diff --git a/src/lib.rs b/src/lib.rs index af8210e..38fe096 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,6 +27,8 @@ use tokio::{spawn, sync::Mutex}; use futures::StreamExt; +const DEFAULT_RATE_LIMIT: usize = 4; + /// This is where the magic happens #[tokio::main] pub async fn run(config: Config) { @@ -42,103 +44,104 @@ pub async fn run(config: Config) { let mut stream = futures::stream::iter(config.mastodon.into_values()) .map(|mastodon_config| { - // create temporary value for each task - let scootaloo_cache_path = config.scootaloo.cache_path.clone(); - let token = get_oauth2_token(&config.twitter); - let task_conn = conn.clone(); + // create temporary value for each task + let scootaloo_cache_path = config.scootaloo.cache_path.clone(); + let token = get_oauth2_token(&config.twitter); + let task_conn = conn.clone(); - spawn(async move { - info!("Starting treating {}", &mastodon_config.twitter_screen_name); + spawn(async move { + info!("Starting treating {}", &mastodon_config.twitter_screen_name); - // retrieve the last tweet ID for the username - let lconn = task_conn.lock().await; - let last_tweet_id = - read_state(&lconn, &mastodon_config.twitter_screen_name, None)?.map(|r| r.tweet_id); - drop(lconn); + // retrieve the last tweet ID for the username + let lconn = task_conn.lock().await; + let last_tweet_id = read_state(&lconn, &mastodon_config.twitter_screen_name, None)? + .map(|r| r.tweet_id); + drop(lconn); - // get user timeline feed (Vec) - let mut feed = - get_user_timeline(&mastodon_config.twitter_screen_name, &token, last_tweet_id) - .await?; + // get user timeline feed (Vec) + let mut feed = + get_user_timeline(&mastodon_config.twitter_screen_name, &token, last_tweet_id) + .await?; - // empty feed -> exiting - if feed.is_empty() { - info!("Nothing to retrieve since last time, exiting…"); - return Ok(()); - } - - // get Mastodon instance - let mastodon = get_mastodon_token(&mastodon_config); - - // order needs to be chronological - feed.reverse(); - - for tweet in &feed { - info!("Treating Tweet {} inside feed", tweet.id); - // initiate the toot_reply_id var - let mut toot_reply_id: Option = None; - // determine if the tweet is part of a thread (response to self) or a standard response - if let Some(r) = &tweet.in_reply_to_screen_name { - if r.to_lowercase() != mastodon_config.twitter_screen_name.to_lowercase() { - // we are responding not threading - info!("Tweet is a direct response, skipping"); - continue; - } - info!("Tweet is a thread"); - // get the corresponding toot id - let lconn = task_conn.lock().await; - toot_reply_id = read_state( - &lconn, - &mastodon_config.twitter_screen_name, - tweet.in_reply_to_status_id, - ) - .unwrap_or(None) - .map(|s| s.toot_id); - drop(lconn); - }; - - // build basic status by just yielding text and dereferencing contained urls - let mut status_text = build_basic_status(tweet); - - // building associative media list - let (media_url, status_medias) = - generate_media_ids(tweet, &scootaloo_cache_path, &mastodon).await; - - status_text = status_text.replace(&media_url, ""); - - info!("Building corresponding Mastodon status"); - - let mut status_builder = StatusBuilder::new(); - - status_builder.status(&status_text).media_ids(status_medias); - - if let Some(i) = toot_reply_id { - status_builder.in_reply_to(&i); + // empty feed -> exiting + if feed.is_empty() { + info!("Nothing to retrieve since last time, exiting…"); + return Ok(()); } - // can be activated for test purposes - // status_builder.visibility(elefren::status_builder::Visibility::Private); + // get Mastodon instance + let mastodon = get_mastodon_token(&mastodon_config); - let status = status_builder.build()?; + // order needs to be chronological + feed.reverse(); - let published_status = mastodon.new_status(status)?; - // this will return if it cannot publish the status preventing the last_tweet from - // being written into db + for tweet in &feed { + info!("Treating Tweet {} inside feed", tweet.id); + // initiate the toot_reply_id var + let mut toot_reply_id: Option = None; + // determine if the tweet is part of a thread (response to self) or a standard response + if let Some(r) = &tweet.in_reply_to_screen_name { + if r.to_lowercase() != mastodon_config.twitter_screen_name.to_lowercase() { + // we are responding not threading + info!("Tweet is a direct response, skipping"); + continue; + } + info!("Tweet is a thread"); + // get the corresponding toot id + let lconn = task_conn.lock().await; + toot_reply_id = read_state( + &lconn, + &mastodon_config.twitter_screen_name, + tweet.in_reply_to_status_id, + ) + .unwrap_or(None) + .map(|s| s.toot_id); + drop(lconn); + }; - let ttt_towrite = TweetToToot { - twitter_screen_name: mastodon_config.twitter_screen_name.clone(), - tweet_id: tweet.id, - toot_id: published_status.id, - }; + // build basic status by just yielding text and dereferencing contained urls + let mut status_text = build_basic_status(tweet); - // write the current state (tweet ID and toot ID) to avoid copying it another time - let lconn = task_conn.lock().await; - write_state(&lconn, ttt_towrite)?; - drop(lconn); - } - Ok::<(), ScootalooError>(()) - }); - }).buffer_unordered(4); + // building associative media list + let (media_url, status_medias) = + generate_media_ids(tweet, &scootaloo_cache_path, &mastodon).await; + + status_text = status_text.replace(&media_url, ""); + + info!("Building corresponding Mastodon status"); + + let mut status_builder = StatusBuilder::new(); + + status_builder.status(&status_text).media_ids(status_medias); + + if let Some(i) = toot_reply_id { + status_builder.in_reply_to(&i); + } + + // can be activated for test purposes + // status_builder.visibility(elefren::status_builder::Visibility::Private); + + let status = status_builder.build()?; + + let published_status = mastodon.new_status(status)?; + // this will return if it cannot publish the status preventing the last_tweet from + // being written into db + + let ttt_towrite = TweetToToot { + twitter_screen_name: mastodon_config.twitter_screen_name.clone(), + tweet_id: tweet.id, + toot_id: published_status.id, + }; + + // write the current state (tweet ID and toot ID) to avoid copying it another time + let lconn = task_conn.lock().await; + write_state(&lconn, ttt_towrite)?; + drop(lconn); + } + Ok::<(), ScootalooError>(()) + }) + }) + .buffer_unordered(config.scootaloo.rate_limit.unwrap_or(DEFAULT_RATE_LIMIT)); // launch and wait for every handle while let Some(result) = stream.next().await {