diff --git a/Cargo.lock b/Cargo.lock index 15e573d..3c7b354 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2103,12 +2103,13 @@ dependencies = [ [[package]] name = "scootaloo" -version = "0.8.3" +version = "0.9.0" dependencies = [ "chrono", "clap", "egg-mode", "elefren", + "futures 0.3.25", "html-escape", "log", "mime", diff --git a/Cargo.toml b/Cargo.toml index 9d88ddf..47873b6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "scootaloo" -version = "0.8.3" +version = "0.9.0" authors = ["VC "] edition = "2021" @@ -13,7 +13,8 @@ toml = "^0.5" clap = "^4" egg-mode = "^0.16" rusqlite = "^0.27" -tokio = { version = "1", features = ["full"]} +tokio = { version = "^1", features = ["full"]} +futures = "^0.3" elefren = "^0.22" html-escape = "^0.2" reqwest = "^0.11" diff --git a/README.md b/README.md index 8f0694a..be90edc 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,7 @@ First up, create a configuration file (default path is `/usr/local/etc/scootaloo [scootaloo] db_path = "/var/lib/scootaloo/scootaloo.sqlite" ## file containing the SQLite Tweet corresponding Toot DB, must be writeable cache_path = "/tmp/scootaloo" ## a dir where the temporary files will be download, must be writeable +rate_limit = 4 ## optional, default 4, number of accounts handled simultaneously [twitter] ## Consumer/Access key for Twitter (can be generated at https://developer.twitter.com/en/apps) diff --git a/src/config.rs b/src/config.rs index c5a6728..35fa6f3 100644 --- a/src/config.rs +++ b/src/config.rs @@ -32,6 +32,7 @@ pub struct MastodonConfig { pub struct ScootalooConfig { pub db_path: String, pub cache_path: String, + pub rate_limit: Option<usize>, } /// Parses the TOML file into a Config Struct diff --git a/src/lib.rs b/src/lib.rs index f5e0822..38fe096 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,12 +25,13 @@ use rusqlite::Connection; use std::sync::Arc; use tokio::{spawn, sync::Mutex}; +use futures::StreamExt; + +const DEFAULT_RATE_LIMIT: usize = 4; + /// This is 
where the magic happens #[tokio::main] pub async fn run(config: Config) { - // create the task vector for handling multiple accounts - let mut mtask = vec![]; - // open the SQLite connection let conn = Arc::new(Mutex::new( Connection::open(&config.scootaloo.db_path).unwrap_or_else(|e| { @@ -41,111 +42,110 @@ pub async fn run(config: Config) { }), )); - for mastodon_config in config.mastodon.into_values() { - // create temporary value for each task - let scootaloo_cache_path = config.scootaloo.cache_path.clone(); - let token = get_oauth2_token(&config.twitter); - let task_conn = conn.clone(); + let mut stream = futures::stream::iter(config.mastodon.into_values()) + .map(|mastodon_config| { + // create temporary value for each task + let scootaloo_cache_path = config.scootaloo.cache_path.clone(); + let token = get_oauth2_token(&config.twitter); + let task_conn = conn.clone(); - let task = spawn(async move { - info!("Starting treating {}", &mastodon_config.twitter_screen_name); + spawn(async move { + info!("Starting treating {}", &mastodon_config.twitter_screen_name); - // retrieve the last tweet ID for the username - let lconn = task_conn.lock().await; - let last_tweet_id = - read_state(&lconn, &mastodon_config.twitter_screen_name, None)?.map(|r| r.tweet_id); - drop(lconn); + // retrieve the last tweet ID for the username + let lconn = task_conn.lock().await; + let last_tweet_id = read_state(&lconn, &mastodon_config.twitter_screen_name, None)? 
+ .map(|r| r.tweet_id); + drop(lconn); - // get user timeline feed (Vec) - let mut feed = - get_user_timeline(&mastodon_config.twitter_screen_name, &token, last_tweet_id) - .await?; + // get user timeline feed (Vec) + let mut feed = + get_user_timeline(&mastodon_config.twitter_screen_name, &token, last_tweet_id) + .await?; - // empty feed -> exiting - if feed.is_empty() { - info!("Nothing to retrieve since last time, exiting…"); - return Ok(()); - } - - // get Mastodon instance - let mastodon = get_mastodon_token(&mastodon_config); - - // order needs to be chronological - feed.reverse(); - - for tweet in &feed { - info!("Treating Tweet {} inside feed", tweet.id); - // initiate the toot_reply_id var - let mut toot_reply_id: Option = None; - // determine if the tweet is part of a thread (response to self) or a standard response - if let Some(r) = &tweet.in_reply_to_screen_name { - if r.to_lowercase() != mastodon_config.twitter_screen_name.to_lowercase() { - // we are responding not threading - info!("Tweet is a direct response, skipping"); - continue; - } - info!("Tweet is a thread"); - // get the corresponding toot id - let lconn = task_conn.lock().await; - toot_reply_id = read_state( - &lconn, - &mastodon_config.twitter_screen_name, - tweet.in_reply_to_status_id, - ) - .unwrap_or(None) - .map(|s| s.toot_id); - drop(lconn); - }; - - // build basic status by just yielding text and dereferencing contained urls - let mut status_text = build_basic_status(tweet); - - // building associative media list - let (media_url, status_medias) = - generate_media_ids(tweet, &scootaloo_cache_path, &mastodon).await; - - status_text = status_text.replace(&media_url, ""); - - info!("Building corresponding Mastodon status"); - - let mut status_builder = StatusBuilder::new(); - - status_builder.status(&status_text).media_ids(status_medias); - - if let Some(i) = toot_reply_id { - status_builder.in_reply_to(&i); + // empty feed -> exiting + if feed.is_empty() { + info!("Nothing to retrieve 
since last time, exiting…"); + return Ok(()); } - // can be activated for test purposes - // status_builder.visibility(elefren::status_builder::Visibility::Private); + // get Mastodon instance + let mastodon = get_mastodon_token(&mastodon_config); - let status = status_builder.build()?; + // order needs to be chronological + feed.reverse(); - let published_status = mastodon.new_status(status)?; - // this will return if it cannot publish the status preventing the last_tweet from - // being written into db + for tweet in &feed { + info!("Treating Tweet {} inside feed", tweet.id); + // initiate the toot_reply_id var + let mut toot_reply_id: Option = None; + // determine if the tweet is part of a thread (response to self) or a standard response + if let Some(r) = &tweet.in_reply_to_screen_name { + if r.to_lowercase() != mastodon_config.twitter_screen_name.to_lowercase() { + // we are responding not threading + info!("Tweet is a direct response, skipping"); + continue; + } + info!("Tweet is a thread"); + // get the corresponding toot id + let lconn = task_conn.lock().await; + toot_reply_id = read_state( + &lconn, + &mastodon_config.twitter_screen_name, + tweet.in_reply_to_status_id, + ) + .unwrap_or(None) + .map(|s| s.toot_id); + drop(lconn); + }; - let ttt_towrite = TweetToToot { - twitter_screen_name: mastodon_config.twitter_screen_name.clone(), - tweet_id: tweet.id, - toot_id: published_status.id, - }; + // build basic status by just yielding text and dereferencing contained urls + let mut status_text = build_basic_status(tweet); - // write the current state (tweet ID and toot ID) to avoid copying it another time - let lconn = task_conn.lock().await; - write_state(&lconn, ttt_towrite)?; - drop(lconn); - } - Ok::<(), ScootalooError>(()) - }); + // building associative media list + let (media_url, status_medias) = + generate_media_ids(tweet, &scootaloo_cache_path, &mastodon).await; - // push each task into the vec task - mtask.push(task); - } + status_text = 
status_text.replace(&media_url, ""); + + info!("Building corresponding Mastodon status"); + + let mut status_builder = StatusBuilder::new(); + + status_builder.status(&status_text).media_ids(status_medias); + + if let Some(i) = toot_reply_id { + status_builder.in_reply_to(&i); + } + + // can be activated for test purposes + // status_builder.visibility(elefren::status_builder::Visibility::Private); + + let status = status_builder.build()?; + + let published_status = mastodon.new_status(status)?; + // this will return if it cannot publish the status preventing the last_tweet from + // being written into db + + let ttt_towrite = TweetToToot { + twitter_screen_name: mastodon_config.twitter_screen_name.clone(), + tweet_id: tweet.id, + toot_id: published_status.id, + }; + + // write the current state (tweet ID and toot ID) to avoid copying it another time + let lconn = task_conn.lock().await; + write_state(&lconn, ttt_towrite)?; + drop(lconn); + } + Ok::<(), ScootalooError>(()) + }) + }) + .buffer_unordered(config.scootaloo.rate_limit.unwrap_or(DEFAULT_RATE_LIMIT)); // launch and wait for every handle - for handle in mtask { - match handle.await { + while let Some(result) = stream.next().await { + match result { Ok(Err(e)) => eprintln!("Error within thread: {}", e), Err(e) => eprintln!("Error with thread: {}", e), _ => (), diff --git a/src/util.rs b/src/util.rs index 4409d64..cc873bd 100644 --- a/src/util.rs +++ b/src/util.rs @@ -15,6 +15,8 @@ use tokio::{ io::copy, }; +use futures::{stream, stream::StreamExt}; + /// Generate associative table between media ids and tweet extended entities pub async fn generate_media_ids( tweet: &Tweet, @@ -25,45 +27,40 @@ pub async fn generate_media_ids( let mut media_ids: Vec = vec![]; if let Some(m) = &tweet.extended_entities { - // create tasks list - let mut tasks = vec![]; - - // size of media_ids vector, should be equal to the media vector - media_ids.resize(m.media.len(), String::new()); - info!("{} medias in tweet", 
m.media.len()); - for (i, media) in m.media.iter().enumerate() { - // attribute media url - media_url = media.url.clone(); + let medias = m.media.clone(); - // clone everything we need - let cache_path = String::from(cache_path); - let media = media.clone(); - let mastodon = mastodon.clone(); + let mut stream = stream::iter(medias) + .map(|media| { + // attribute media url + media_url = media.url.clone(); - let task = tokio::task::spawn(async move { - info!("Start treating {}", media.media_url_https); - // get the tweet embedded media - let local_tweet_media_path = get_tweet_media(&media, &cache_path).await?; + // clone everything we need + let cache_path = String::from(cache_path); + let mastodon = mastodon.clone(); - // upload media to Mastodon - let mastodon_media = - mastodon.media(Cow::from(local_tweet_media_path.to_owned()))?; - // at this point, we can safely erase the original file - // it doesn’t matter if we can’t remove, cache_media fn is idempotent - remove_file(&local_tweet_media_path).await.ok(); + tokio::task::spawn(async move { + info!("Start treating {}", media.media_url_https); + // get the tweet embedded media + let local_tweet_media_path = get_tweet_media(&media, &cache_path).await?; - Ok::<(usize, String), ScootalooError>((i, mastodon_media.id)) - }); + // upload media to Mastodon + let mastodon_media = + mastodon.media(Cow::from(local_tweet_media_path.to_owned()))?; + // at this point, we can safely erase the original file + // it doesn’t matter if we can’t remove, cache_media fn is idempotent + remove_file(&local_tweet_media_path).await.ok(); - tasks.push(task); - } + Ok::<String, ScootalooError>(mastodon_media.id) + }) + }) + .buffered(4); // there are max four medias per tweet and they need to be treated in + // order - for task in tasks { - match task.await { - // insert the media at the right place - Ok(Ok((i, v))) => media_ids[i] = v, + while let Some(result) = stream.next().await { + match result { + Ok(Ok(v)) => media_ids.push(v), Ok(Err(e)) => warn!("Cannot 
treat media: {}", e), Err(e) => error!("Something went wrong when joining the main thread: {}", e), }