diff --git a/Cargo.lock b/Cargo.lock index 15e573d..3c7b354 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2103,12 +2103,13 @@ dependencies = [ [[package]] name = "scootaloo" -version = "0.8.3" +version = "0.9.0" dependencies = [ "chrono", "clap", "egg-mode", "elefren", + "futures 0.3.25", "html-escape", "log", "mime", diff --git a/Cargo.toml b/Cargo.toml index 9d88ddf..47873b6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "scootaloo" -version = "0.8.3" +version = "0.9.0" authors = ["VC "] edition = "2021" @@ -13,7 +13,8 @@ toml = "^0.5" clap = "^4" egg-mode = "^0.16" rusqlite = "^0.27" -tokio = { version = "1", features = ["full"]} +tokio = { version = "^1", features = ["full"]} +futures = "^0.3" elefren = "^0.22" html-escape = "^0.2" reqwest = "^0.11" diff --git a/src/lib.rs b/src/lib.rs index f5e0822..af8210e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,12 +25,11 @@ use rusqlite::Connection; use std::sync::Arc; use tokio::{spawn, sync::Mutex}; +use futures::StreamExt; + /// This is where the magic happens #[tokio::main] pub async fn run(config: Config) { - // create the task vector for handling multiple accounts - let mut mtask = vec![]; - // open the SQLite connection let conn = Arc::new(Mutex::new( Connection::open(&config.scootaloo.db_path).unwrap_or_else(|e| { @@ -41,13 +40,14 @@ pub async fn run(config: Config) { }), )); - for mastodon_config in config.mastodon.into_values() { + let mut stream = futures::stream::iter(config.mastodon.into_values()) + .map(|mastodon_config| { // create temporary value for each task let scootaloo_cache_path = config.scootaloo.cache_path.clone(); let token = get_oauth2_token(&config.twitter); let task_conn = conn.clone(); - let task = spawn(async move { + spawn(async move { info!("Starting treating {}", &mastodon_config.twitter_screen_name); // retrieve the last tweet ID for the username @@ -138,14 +138,11 @@ pub async fn run(config: Config) { } Ok::<(), ScootalooError>(()) }); - - // push each task into the vec task - mtask.push(task); - } + }).buffer_unordered(4); // launch and wait for every handle - for handle in mtask { - match handle.await { + while let Some(result) = stream.next().await { + match result { Ok(Err(e)) => eprintln!("Error within thread: {}", e), Err(e) => eprintln!("Error with thread: {}", e), _ => (), diff --git a/src/util.rs b/src/util.rs index 4409d64..cc873bd 100644 --- a/src/util.rs +++ b/src/util.rs @@ -15,6 +15,8 @@ use tokio::{ io::copy, }; +use futures::{stream, stream::StreamExt}; + /// Generate associative table between media ids and tweet extended entities pub async fn generate_media_ids( tweet: &Tweet, @@ -25,45 +27,40 @@ pub async fn generate_media_ids( let mut media_ids: Vec = vec![]; if let Some(m) = &tweet.extended_entities { - // create tasks list - let mut tasks = vec![]; - - // size of media_ids vector, should be equal to the media vector - media_ids.resize(m.media.len(), String::new()); - info!("{} medias in tweet", m.media.len()); - for (i, media) in m.media.iter().enumerate() { - // attribute media url - media_url = media.url.clone(); + let medias = m.media.clone(); - // clone everything we need - let cache_path = String::from(cache_path); - let media = media.clone(); - let mastodon = mastodon.clone(); + let mut stream = stream::iter(medias) + .map(|media| { + // attribute media url + media_url = media.url.clone(); - let task = tokio::task::spawn(async move { - info!("Start treating {}", media.media_url_https); - // get the tweet embedded media - let local_tweet_media_path = get_tweet_media(&media, &cache_path).await?; + // clone everything we need + let cache_path = String::from(cache_path); + let mastodon = mastodon.clone(); - // upload media to Mastodon - let mastodon_media = - mastodon.media(Cow::from(local_tweet_media_path.to_owned()))?; - // at this point, we can safely erase the original file - // it doesn’t matter if we can’t remove, cache_media fn is idempotent - remove_file(&local_tweet_media_path).await.ok(); + tokio::task::spawn(async move { + info!("Start treating {}", media.media_url_https); + // get the tweet embedded media + let local_tweet_media_path = get_tweet_media(&media, &cache_path).await?; - Ok::<(usize, String), ScootalooError>((i, mastodon_media.id)) - }); + // upload media to Mastodon + let mastodon_media = + mastodon.media(Cow::from(local_tweet_media_path.to_owned()))?; + // at this point, we can safely erase the original file + // it doesn’t matter if we can’t remove, cache_media fn is idempotent + remove_file(&local_tweet_media_path).await.ok(); - tasks.push(task); - } + Ok::(mastodon_media.id) + }) + }) + .buffered(4); // there are max four medias per tweet and they need to be treated in + // order - for task in tasks { - match task.await { - // insert the media at the right place - Ok(Ok((i, v))) => media_ids[i] = v, + while let Some(result) = stream.next().await { + match result { + Ok(Ok(v)) => media_ids.push(v), Ok(Err(e)) => warn!("Cannot treat media: {}", e), Err(e) => error!("Something went wrong when joining the main thread: {}", e), }