From df7552017541f7143103d7a25b50764e5b3546f9 Mon Sep 17 00:00:00 2001 From: VC Date: Fri, 4 Nov 2022 10:23:05 +0100 Subject: [PATCH 1/2] feat: async treatment of all accounts --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/lib.rs | 254 +++++++++++++++++++++++++++---------------------- src/twitter.rs | 16 +--- 4 files changed, 148 insertions(+), 126 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ab595dd..cb23115 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2016,7 +2016,7 @@ dependencies = [ [[package]] name = "scootaloo" -version = "0.7.0" +version = "0.7.1" dependencies = [ "chrono", "clap", diff --git a/Cargo.toml b/Cargo.toml index 828fc6f..ec236f3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "scootaloo" -version = "0.7.0" +version = "0.7.1" authors = ["VC "] edition = "2021" diff --git a/src/lib.rs b/src/lib.rs index 6918cd2..93d45ff 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,144 +21,172 @@ use state::{read_state, write_state, TweetToToot}; use elefren::{prelude::*, status_builder::StatusBuilder}; use log::{debug, error, info, warn}; use rusqlite::Connection; -use std::borrow::Cow; -use tokio::fs::remove_file; +use std::{borrow::Cow, sync::Arc}; +use tokio::{fs::remove_file, spawn, sync::Mutex, task::JoinHandle}; /// This is where the magic happens #[tokio::main] pub async fn run(config: Config) { - // get OAuth2 token - let token = get_oauth2_token(&config.twitter); + // create the task vector for handling multiple accounts + let mut mtask: Vec> = vec![]; - for mastodon_config in config.mastodon.values() { - // open the SQLite connection - let conn = Connection::open(&config.scootaloo.db_path).unwrap_or_else(|e| { + // open the SQLite connection + let conn = Arc::new(Mutex::new( + Connection::open(&config.scootaloo.db_path).unwrap_or_else(|e| { panic!( "Something went wrong when opening the DB {}: {}", &config.scootaloo.db_path, e ) - }); - // retrieve the last tweet ID for the username - let last_tweet_id = read_state(&conn, &mastodon_config.twitter_screen_name, None) - .unwrap_or_else(|e| panic!("Cannot retrieve last_tweet_id: {}", e)) - .map(|s| s.tweet_id); + }), + )); - // get Mastodon instance - let mastodon = get_mastodon_token(mastodon_config); + for mastodon_config in config.mastodon.into_values() { + // create temporary value for each task + let scootaloo_cache_path = config.scootaloo.cache_path.clone(); + let token = get_oauth2_token(&config.twitter); + let task_conn = conn.clone(); - // get user timeline feed (Vec) - let mut feed = get_user_timeline(mastodon_config, &token, last_tweet_id) - .await - .unwrap_or_else(|e| { - panic!( - "Something went wrong when trying to retrieve {}’s timeline: {}", - &mastodon_config.twitter_screen_name, e - ) - }); + let task = spawn(async move { + debug!("Starting treating {}", &mastodon_config.twitter_screen_name); - // empty feed -> exiting - if feed.is_empty() { - info!("Nothing to retrieve since last time, exiting…"); - return; - } + // retrieve the last tweet ID for the username + let lconn = task_conn.lock().await; + let last_tweet_id = read_state(&lconn, &mastodon_config.twitter_screen_name, None) + .unwrap_or_else(|e| panic!("Cannot retrieve last_tweet_id: {}", e)) + .map(|s| s.tweet_id); + drop(lconn); - // order needs to be chronological - feed.reverse(); + // get Mastodon instance + let mastodon = get_mastodon_token(&mastodon_config); - for tweet in &feed { - debug!("Treating Tweet {} inside feed", tweet.id); - // initiate the toot_reply_id var - let mut toot_reply_id: Option = None; - // determine if the tweet is part of a thread (response to self) or a standard response - if let Some(r) = &tweet.in_reply_to_screen_name { - if r.to_lowercase() != mastodon_config.twitter_screen_name.to_lowercase() { - // we are responding not threadin - info!("Tweet is a direct response, skipping"); - continue; - } - info!("Tweet is a thread"); - toot_reply_id = read_state( - &conn, - &mastodon_config.twitter_screen_name, - tweet.in_reply_to_status_id, - ) - .unwrap_or(None) - .map(|s| s.toot_id); - }; + // get user timeline feed (Vec) + let mut feed = + get_user_timeline(&mastodon_config.twitter_screen_name, &token, last_tweet_id) + .await + .unwrap_or_else(|e| { + panic!( + "Something went wrong when trying to retrieve {}’s timeline: {}", + &mastodon_config.twitter_screen_name, e + ) + }); - // build basic status by just yielding text and dereferencing contained urls - let mut status_text = build_basic_status(tweet); + // empty feed -> exiting + if feed.is_empty() { + info!("Nothing to retrieve since last time, exiting…"); + return; + } - let mut status_medias: Vec = vec![]; - // reupload the attachments if any - if let Some(m) = &tweet.extended_entities { - for media in &m.media { - let local_tweet_media_path = - match get_tweet_media(media, &config.scootaloo.cache_path).await { - Ok(m) => m, + // order needs to be chronological + feed.reverse(); + + for tweet in &feed { + debug!("Treating Tweet {} inside feed", tweet.id); + // initiate the toot_reply_id var + let mut toot_reply_id: Option = None; + // determine if the tweet is part of a thread (response to self) or a standard response + if let Some(r) = &tweet.in_reply_to_screen_name { + if r.to_lowercase() != mastodon_config.twitter_screen_name.to_lowercase() { + // we are responding not threading + info!("Tweet is a direct response, skipping"); + continue; + } + info!("Tweet is a thread"); + // get the corresponding toot id + let lconn = task_conn.lock().await; + toot_reply_id = read_state( + &lconn, + &mastodon_config.twitter_screen_name, + tweet.in_reply_to_status_id, + ) + .unwrap_or(None) + .map(|s| s.toot_id); + drop(lconn); + }; + + // build basic status by just yielding text and dereferencing contained urls + let mut status_text = build_basic_status(tweet); + + let mut status_medias: Vec = vec![]; + // reupload the attachments if any + if let Some(m) = &tweet.extended_entities { + for media in &m.media { + let local_tweet_media_path = + match get_tweet_media(media, &scootaloo_cache_path).await { + Ok(m) => m, + Err(e) => { + error!("Cannot get tweet media for {}: {}", &media.url, e); + continue; + } + }; + + let mastodon_media_ids = match mastodon + .media(Cow::from(local_tweet_media_path.to_owned())) + { + Ok(m) => { + remove_file(&local_tweet_media_path) + .await + .unwrap_or_else(|e| + warn!("Attachment for {} has been uploaded, but I’m unable to remove the existing file: {}", &local_tweet_media_path, e) + ); + m.id + } Err(e) => { - error!("Cannot get tweet media for {}: {}", &media.url, e); + error!( + "Attachment {} cannot be uploaded to Mastodon Instance: {}", + &local_tweet_media_path, e + ); continue; } }; - let mastodon_media_ids = match mastodon - .media(Cow::from(local_tweet_media_path.to_owned())) - { - Ok(m) => { - remove_file(&local_tweet_media_path) - .await - .unwrap_or_else(|e| - warn!("Attachment for {} has been uploaded, but I’m unable to remove the existing file: {}", &local_tweet_media_path, e) - ); - m.id - } - Err(e) => { - error!( - "Attachment {} cannot be uploaded to Mastodon Instance: {}", - &local_tweet_media_path, e - ); - continue; - } - }; + status_medias.push(mastodon_media_ids); - status_medias.push(mastodon_media_ids); - - // last step, removing the reference to the media from with the toot’s text - status_text = status_text.replace(&media.url, ""); + // last step, removing the reference to the media from with the toot’s text + status_text = status_text.replace(&media.url, ""); + } } + // finished reuploading attachments, now let’s do the toot baby! + + debug!("Building corresponding Mastodon status"); + + let mut status_builder = StatusBuilder::new(); + + status_builder.status(&status_text).media_ids(status_medias); + + if let Some(i) = toot_reply_id { + status_builder.in_reply_to(&i); + } + + let status = status_builder + .build() + .unwrap_or_else(|_| panic!("Cannot build status with text {}", &status_text)); + + // publish status + // again unwrap is safe here as we are in the main thread + let published_status = mastodon.new_status(status).unwrap(); + // this will panic if it cannot publish the status, which is a good thing, it allows the + // last_tweet gathered not to be written + + let ttt_towrite = TweetToToot { + twitter_screen_name: mastodon_config.twitter_screen_name.clone(), + tweet_id: tweet.id, + toot_id: published_status.id, + }; + + // write the current state (tweet ID and toot ID) to avoid copying it another time + let lconn = task_conn.lock().await; + write_state(&lconn, ttt_towrite) + .unwrap_or_else(|e| panic!("Can’t write the last tweet retrieved: {}", e)); + drop(lconn); } - // finished reuploading attachments, now let’s do the toot baby! + }); - debug!("Building corresponding Mastodon status"); + // push each task into the vec task + mtask.push(task); + } - let mut status_builder = StatusBuilder::new(); - - status_builder.status(&status_text).media_ids(status_medias); - - if let Some(i) = toot_reply_id { - status_builder.in_reply_to(&i); - } - - let status = status_builder - .build() - .unwrap_or_else(|_| panic!("Cannot build status with text {}", &status_text)); - - // publish status - // again unwrap is safe here as we are in the main thread - let published_status = mastodon.new_status(status).unwrap(); - // this will panic if it cannot publish the status, which is a good thing, it allows the - // last_tweet gathered not to be written - - let ttt_towrite = TweetToToot { - twitter_screen_name: mastodon_config.twitter_screen_name.clone(), - tweet_id: tweet.id, - toot_id: published_status.id, - }; - - // write the current state (tweet ID and toot ID) to avoid copying it another time - write_state(&conn, ttt_towrite) - .unwrap_or_else(|e| panic!("Can’t write the last tweet retrieved: {}", e)); - } + // launch and wait for every handle + for handle in mtask { + handle.await.unwrap(); } } diff --git a/src/twitter.rs b/src/twitter.rs index d827885..e9a1816 100644 --- a/src/twitter.rs +++ b/src/twitter.rs @@ -1,4 +1,3 @@ -use crate::config::MastodonConfig; use crate::config::TwitterConfig; use crate::util::cache_media; use crate::ScootalooError; @@ -30,20 +29,15 @@ pub fn get_oauth2_token(config: &TwitterConfig) -> Token { /// Gets Twitter user timeline pub async fn get_user_timeline( - config: &MastodonConfig, + screen_name: &str, token: &Token, lid: Option, ) -> Result, Box> { // fix the page size to 200 as it is the maximum Twitter authorizes - let (_, feed) = user_timeline( - UserID::from(config.twitter_screen_name.to_owned()), - true, - false, - token, - ) - .with_page_size(200) - .older(lid) - .await?; + let (_, feed) = user_timeline(UserID::from(screen_name.to_owned()), true, false, token) + .with_page_size(20) + .older(lid) + .await?; Ok(feed.to_vec()) } From de758c7bda89b0252a0220e5116264bb5ca6f575 Mon Sep 17 00:00:00 2001 From: VC Date: Sat, 5 Nov 2022 07:57:03 +0100 Subject: [PATCH 2/2] refactor: separate function for media ids --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/lib.rs | 53 ++++++++++------------------------------------ src/twitter.rs | 2 +- src/util.rs | 57 +++++++++++++++++++++++++++++++++++++++++++++++--- 5 files changed, 68 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cb23115..c801a9b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2016,7 +2016,7 @@ dependencies = [ [[package]] name = "scootaloo" -version = "0.7.1" +version = "0.7.2" dependencies = [ "chrono", "clap", diff --git a/Cargo.toml b/Cargo.toml index ec236f3..d866bd5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "scootaloo" -version = "0.7.1" +version = "0.7.2" authors = ["VC "] edition = "2021" diff --git a/src/lib.rs b/src/lib.rs index 93d45ff..4c68bb7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,16 +13,17 @@ mod twitter; use twitter::*; mod util; +use crate::util::generate_media_ids; mod state; pub use state::{init_db, migrate_db}; use state::{read_state, write_state, TweetToToot}; use elefren::{prelude::*, status_builder::StatusBuilder}; -use log::{debug, error, info, warn}; +use log::{debug, info}; use rusqlite::Connection; -use std::{borrow::Cow, sync::Arc}; -use tokio::{fs::remove_file, spawn, sync::Mutex, task::JoinHandle}; +use std::sync::Arc; +use tokio::{spawn, sync::Mutex, task::JoinHandle}; /// This is where the magic happens #[tokio::main] @@ -106,46 +107,11 @@ pub async fn run(config: Config) { // build basic status by just yielding text and dereferencing contained urls let mut status_text = build_basic_status(tweet); - let mut status_medias: Vec = vec![]; - // reupload the attachments if any - if let Some(m) = &tweet.extended_entities { - for media in &m.media { - let local_tweet_media_path = - match get_tweet_media(media, &scootaloo_cache_path).await { - Ok(m) => m, - Err(e) => { - error!("Cannot get tweet media for {}: {}", &media.url, e); - continue; - } - }; + // building associative media list + let (media_url, status_medias) = + generate_media_ids(tweet, &scootaloo_cache_path, &mastodon).await; - let mastodon_media_ids = match mastodon - .media(Cow::from(local_tweet_media_path.to_owned())) - { - Ok(m) => { - remove_file(&local_tweet_media_path) - .await - .unwrap_or_else(|e| - warn!("Attachment for {} has been uploaded, but I’m unable to remove the existing file: {}", &local_tweet_media_path, e) - ); - m.id - } - Err(e) => { - error!( - "Attachment {} cannot be uploaded to Mastodon Instance: {}", - &local_tweet_media_path, e - ); - continue; - } - }; - - status_medias.push(mastodon_media_ids); - - // last step, removing the reference to the media from with the toot’s text - status_text = status_text.replace(&media.url, ""); - } - } - // finished reuploading attachments, now let’s do the toot baby! + status_text = status_text.replace(&media_url, ""); debug!("Building corresponding Mastodon status"); @@ -157,6 +123,9 @@ pub async fn run(config: Config) { status_builder.in_reply_to(&i); } + // can be activated for test purposes + // status_builder.visibility(elefren::status_builder::Visibility::Private); + let status = status_builder .build() .unwrap_or_else(|_| panic!("Cannot build status with text {}", &status_text)); diff --git a/src/twitter.rs b/src/twitter.rs index e9a1816..50e4ac8 100644 --- a/src/twitter.rs +++ b/src/twitter.rs @@ -35,7 +35,7 @@ pub async fn get_user_timeline( ) -> Result, Box> { // fix the page size to 200 as it is the maximum Twitter authorizes let (_, feed) = user_timeline(UserID::from(screen_name.to_owned()), true, false, token) - .with_page_size(20) + .with_page_size(200) .older(lid) .await?; diff --git a/src/util.rs b/src/util.rs index 1517718..8aa34a8 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,11 +1,62 @@ -use crate::ScootalooError; +use crate::{twitter::get_tweet_media, ScootalooError}; +use egg_mode::tweet::Tweet; +use elefren::prelude::*; +use log::{error, warn}; use reqwest::Url; -use std::error::Error; +use std::{borrow::Cow, error::Error}; use tokio::{ - fs::{create_dir_all, File}, + fs::{create_dir_all, remove_file, File}, io::copy, }; +/// Generate associative table between media ids and tweet extended entities +pub async fn generate_media_ids( + tweet: &Tweet, + cache_path: &str, + mastodon: &Mastodon, +) -> (String, Vec) { + let mut media_ids: Vec = vec![]; + let mut media_url: String = "".to_string(); + + if let Some(m) = &tweet.extended_entities { + for media in &m.media { + // attribute the media url + media_url = media.url.clone(); + let local_tweet_media_path = match get_tweet_media(media, cache_path).await { + Ok(m) => m, + Err(e) => { + error!("Cannot get tweet media for {}: {}", &media.url, e); + continue; + } + }; + + let mastodon_media_ids = match mastodon + .media(Cow::from(local_tweet_media_path.to_owned())) + { + Ok(m) => { + remove_file(&local_tweet_media_path).await.unwrap_or_else(|e| + warn!("Attachment for {} has been uploaded, but I’m unable to remove the existing file: {}", &local_tweet_media_path, e)); + m.id + } + Err(e) => { + error!( + "Attachment {} cannot be uploaded to Mastodon Instance: {}", + &local_tweet_media_path, e + ); + // file is no longer useful, deleting + remove_file(&local_tweet_media_path).await.unwrap_or_else(|e| + warn!("Attachment for {} has been uploaded, but I’m unable to remove the existing file: {}", &local_tweet_media_path, e)); + continue; + } + }; + + media_ids.push(mastodon_media_ids); + } + } + + (media_url, media_ids) +} + /// Gets and caches Twitter Media inside the determined temp dir pub async fn cache_media(u: &str, t: &str) -> Result> { // create dir