refactor: use futures instead of tokio for media upload

This commit is contained in:
VC
2022-11-14 22:12:02 +01:00
parent 89de1cf7a3
commit ce84c05581
4 changed files with 41 additions and 45 deletions

3
Cargo.lock generated
View File

@@ -2103,12 +2103,13 @@ dependencies = [
[[package]] [[package]]
name = "scootaloo" name = "scootaloo"
version = "0.8.3" version = "0.9.0"
dependencies = [ dependencies = [
"chrono", "chrono",
"clap", "clap",
"egg-mode", "egg-mode",
"elefren", "elefren",
"futures 0.3.25",
"html-escape", "html-escape",
"log", "log",
"mime", "mime",

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "scootaloo" name = "scootaloo"
version = "0.8.3" version = "0.9.0"
authors = ["VC <veretcle+framagit@mateu.be>"] authors = ["VC <veretcle+framagit@mateu.be>"]
edition = "2021" edition = "2021"
@@ -13,7 +13,8 @@ toml = "^0.5"
clap = "^4" clap = "^4"
egg-mode = "^0.16" egg-mode = "^0.16"
rusqlite = "^0.27" rusqlite = "^0.27"
tokio = { version = "1", features = ["full"]} tokio = { version = "^1", features = ["full"]}
futures = "^0.3"
elefren = "^0.22" elefren = "^0.22"
html-escape = "^0.2" html-escape = "^0.2"
reqwest = "^0.11" reqwest = "^0.11"

View File

@@ -25,12 +25,11 @@ use rusqlite::Connection;
use std::sync::Arc; use std::sync::Arc;
use tokio::{spawn, sync::Mutex}; use tokio::{spawn, sync::Mutex};
use futures::StreamExt;
/// This is where the magic happens /// This is where the magic happens
#[tokio::main] #[tokio::main]
pub async fn run(config: Config) { pub async fn run(config: Config) {
// create the task vector for handling multiple accounts
let mut mtask = vec![];
// open the SQLite connection // open the SQLite connection
let conn = Arc::new(Mutex::new( let conn = Arc::new(Mutex::new(
Connection::open(&config.scootaloo.db_path).unwrap_or_else(|e| { Connection::open(&config.scootaloo.db_path).unwrap_or_else(|e| {
@@ -41,13 +40,14 @@ pub async fn run(config: Config) {
}), }),
)); ));
for mastodon_config in config.mastodon.into_values() { let mut stream = futures::stream::iter(config.mastodon.into_values())
.map(|mastodon_config| {
// create temporary value for each task // create temporary value for each task
let scootaloo_cache_path = config.scootaloo.cache_path.clone(); let scootaloo_cache_path = config.scootaloo.cache_path.clone();
let token = get_oauth2_token(&config.twitter); let token = get_oauth2_token(&config.twitter);
let task_conn = conn.clone(); let task_conn = conn.clone();
let task = spawn(async move { spawn(async move {
info!("Starting treating {}", &mastodon_config.twitter_screen_name); info!("Starting treating {}", &mastodon_config.twitter_screen_name);
// retrieve the last tweet ID for the username // retrieve the last tweet ID for the username
@@ -138,14 +138,11 @@ pub async fn run(config: Config) {
} }
Ok::<(), ScootalooError>(()) Ok::<(), ScootalooError>(())
}); });
}).buffer_unordered(4);
// push each task into the vec task
mtask.push(task);
}
// launch and wait for every handle // launch and wait for every handle
for handle in mtask { while let Some(result) = stream.next().await {
match handle.await { match result {
Ok(Err(e)) => eprintln!("Error within thread: {}", e), Ok(Err(e)) => eprintln!("Error within thread: {}", e),
Err(e) => eprintln!("Error with thread: {}", e), Err(e) => eprintln!("Error with thread: {}", e),
_ => (), _ => (),

View File

@@ -15,6 +15,8 @@ use tokio::{
io::copy, io::copy,
}; };
use futures::{stream, stream::StreamExt};
/// Generate associative table between media ids and tweet extended entities /// Generate associative table between media ids and tweet extended entities
pub async fn generate_media_ids( pub async fn generate_media_ids(
tweet: &Tweet, tweet: &Tweet,
@@ -25,24 +27,20 @@ pub async fn generate_media_ids(
let mut media_ids: Vec<String> = vec![]; let mut media_ids: Vec<String> = vec![];
if let Some(m) = &tweet.extended_entities { if let Some(m) = &tweet.extended_entities {
// create tasks list
let mut tasks = vec![];
// size of media_ids vector, should be equal to the media vector
media_ids.resize(m.media.len(), String::new());
info!("{} medias in tweet", m.media.len()); info!("{} medias in tweet", m.media.len());
for (i, media) in m.media.iter().enumerate() { let medias = m.media.clone();
let mut stream = stream::iter(medias)
.map(|media| {
// attribute media url // attribute media url
media_url = media.url.clone(); media_url = media.url.clone();
// clone everything we need // clone everything we need
let cache_path = String::from(cache_path); let cache_path = String::from(cache_path);
let media = media.clone();
let mastodon = mastodon.clone(); let mastodon = mastodon.clone();
let task = tokio::task::spawn(async move { tokio::task::spawn(async move {
info!("Start treating {}", media.media_url_https); info!("Start treating {}", media.media_url_https);
// get the tweet embedded media // get the tweet embedded media
let local_tweet_media_path = get_tweet_media(&media, &cache_path).await?; let local_tweet_media_path = get_tweet_media(&media, &cache_path).await?;
@@ -54,16 +52,15 @@ pub async fn generate_media_ids(
// it doesnt matter if we cant remove, cache_media fn is idempotent // it doesnt matter if we cant remove, cache_media fn is idempotent
remove_file(&local_tweet_media_path).await.ok(); remove_file(&local_tweet_media_path).await.ok();
Ok::<(usize, String), ScootalooError>((i, mastodon_media.id)) Ok::<String, ScootalooError>(mastodon_media.id)
}); })
})
.buffered(4); // there are max four medias per tweet and they need to be treated in
// order
tasks.push(task); while let Some(result) = stream.next().await {
} match result {
Ok(Ok(v)) => media_ids.push(v),
for task in tasks {
match task.await {
// insert the media at the right place
Ok(Ok((i, v))) => media_ids[i] = v,
Ok(Err(e)) => warn!("Cannot treat media: {}", e), Ok(Err(e)) => warn!("Cannot treat media: {}", e),
Err(e) => error!("Something went wrong when joining the main thread: {}", e), Err(e) => error!("Something went wrong when joining the main thread: {}", e),
} }