Still a WIP: need to use async reqwest to respect the global async context (reqwest::blocking uses async internally anyway, so it is not really synchronous)
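In practice this means swapping reqwest::blocking for the plain async reqwest client and letting it run on the surrounding tokio runtime. A minimal sketch of that pattern, assuming reqwest 0.11 and tokio 1.x with the macros, rt-multi-thread and fs features (names here are illustrative, not Scootaloo's actual API):

// downloads a URL to a local file with the async reqwest client,
// reusing the caller's tokio runtime instead of the hidden one
// that reqwest::blocking would spin up
use std::error::Error;
use tokio::{fs::File, io::AsyncWriteExt};

async fn download(url: &str, dest: &str) -> Result<(), Box<dyn Error>> {
    let mut response = reqwest::get(url).await?;
    let mut file = File::create(dest).await?;
    // stream the body chunk by chunk instead of buffering it whole
    while let Some(chunk) = response.chunk().await? {
        file.write_all(&chunk).await?;
    }
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    download("https://example.org/picture.jpg", "/tmp/picture.jpg").await
}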

VC
2021-04-18 17:00:44 +00:00
parent 8bcf078ad9
commit 2e052ebf6a
6 changed files with 316 additions and 255 deletions


@@ -1,18 +1,14 @@
// std
use std::{
path::Path,
borrow::Cow,
collections::HashMap,
io::{stdin, copy},
io::stdin,
fmt,
fs::{read_to_string, write, create_dir_all, File, remove_file},
fs::{read_to_string, write},
error::Error,
sync::{Arc, Mutex},
};
//tokio
use tokio::runtime::Runtime;
use tokio_compat_02::FutureExt;
// toml
use serde::Deserialize;
@@ -37,34 +33,45 @@ use elefren::{
};
// reqwest
use reqwest::blocking::Client;
use reqwest::Url;
// tokio
use tokio::{
io::copy,
fs::{File, create_dir_all, remove_file},
sync::mpsc,
};
// htmlescape
use htmlescape::decode_html;
// log
use log::{info, warn, error, debug};
/**********
* Generic usage functions
***********/
/*
* Those functions are related to the Twitter side of things
*/
/// Read last tweet id from a file
/// Reads last tweet id from a file
fn read_state(s: &str) -> Option<u64> {
let state = read_to_string(s);
if let Ok(s) = state {
debug!("Last Tweet ID (from file): {}", &s);
return s.parse::<u64>().ok();
}
None
}
/// Write last treated tweet id to a file
/// Writes last treated tweet id to a file
fn write_state(f: &str, s: u64) -> Result<(), std::io::Error> {
write(f, format!("{}", s))
}
/// Get twitter oauth2 token
/// Gets Twitter oauth2 token
fn get_oauth2_token(config: &Config) -> Token {
let con_token = KeyPair::new(String::from(&config.twitter.consumer_key), String::from(&config.twitter.consumer_secret));
let access_token = KeyPair::new(String::from(&config.twitter.access_key), String::from(&config.twitter.access_secret));
@@ -75,19 +82,18 @@ fn get_oauth2_token(config: &Config) -> Token {
}
}
/// Get twitter user timeline
fn get_user_timeline(config: &Config, token: Token, lid: Option<u64>) -> Result<Vec<Tweet>, Box<dyn Error>> {
/// Gets Twitter user timeline
async fn get_user_timeline(config: &Config, token: Token, lid: Option<u64>) -> Result<Vec<Tweet>, Box<dyn Error>> {
// fix the page size to 200 as it is the maximum Twitter authorizes
let rt = Runtime::new()?;
let (_timeline, feed) = rt.block_on(user_timeline(UserID::from(String::from(&config.twitter.username)), true, false, &token)
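// the timeline call can now simply be awaited on the ambient runtime,
// instead of spinning up a dedicated Runtime and going through tokio-compat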
let (_, feed) = user_timeline(UserID::from(String::from(&config.twitter.username)), true, false, &token)
.with_page_size(200)
.older(lid)
.compat())?;
.await?;
Ok(feed.to_vec())
}
/// decode urls from UrlEntities
/// Decodes urls from UrlEntities
fn decode_urls(urls: &Vec<UrlEntity>) -> HashMap<String, String> {
let mut decoded_urls = HashMap::new();
@@ -101,6 +107,8 @@ fn decode_urls(urls: &Vec<UrlEntity>) -> HashMap<String, String> {
decoded_urls
}
/// Decodes the Twitter mention to something that will make sense once Twitter has joined the
/// Fediverse
fn twitter_mentions(ums: &Vec<MentionEntity>) -> HashMap<String, String> {
let mut decoded_mentions = HashMap::new();
@@ -111,18 +119,18 @@ fn twitter_mentions(ums: &Vec<MentionEntity>) -> HashMap<String, String> {
decoded_mentions
}
/// Retrieve a single media from a tweet and store it in a temporary file
fn get_tweet_media(m: &MediaEntity, t: &str) -> Result<String, Box<dyn Error>> {
/// Retrieves a single media from a tweet and stores it in a temporary file
async fn get_tweet_media(m: &MediaEntity, t: &str) -> Result<String, Box<dyn Error>> {
match m.media_type {
MediaType::Photo => {
return cache_media(&m.media_url_https, t);
return cache_media(&m.media_url_https, t).await;
},
_ => {
match &m.video_info {
Some(v) => {
for variant in &v.variants {
if variant.content_type == "video/mp4" {
return cache_media(&variant.url, t);
return cache_media(&variant.url, t).await;
}
}
return Err(Box::new(ScootalooError::new(format!("Media Type for {} is video but no mp4 file URL is available", &m.url).as_str())));
@@ -138,7 +146,7 @@ fn get_tweet_media(m: &MediaEntity, t: &str) -> Result<String, Box<dyn Error>> {
/*
* Those functions are related to the Mastodon side of things
*/
/// Get Mastodon Data
/// Gets Mastodon Data
fn get_mastodon_token(masto: &MastodonConfig) -> Mastodon {
let data = Data {
base: Cow::from(String::from(&masto.base)),
@@ -151,7 +159,7 @@ fn get_mastodon_token(masto: &MastodonConfig) -> Mastodon {
Mastodon::from(data)
}
/// build toot text from tweet
/// Builds toot text from tweet
fn build_basic_status(tweet: &Tweet) -> Result<String, Box<dyn Error>> {
let mut toot = String::from(&tweet.text);
@@ -177,35 +185,40 @@ fn build_basic_status(tweet: &Tweet) -> Result<String, Box<dyn Error>> {
/*
* Generic private functions
*/
fn cache_media(u: &str, t: &str) -> Result<String, Box<dyn Error>> {
/// Gets and caches Twitter Media inside the determined temp dir
async fn cache_media(u: &str, t: &str) -> Result<String, Box<dyn Error>> {
// create dir
if !Path::new(t).is_dir() {
create_dir_all(t)?;
}
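// create_dir_all succeeds if the directory already exists, so the explicit Path check is no longer needed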
create_dir_all(t).await?;
// get file
let client = Client::new();
let mut response = client.get(u).send()?;
let mut response = reqwest::get(u).await?;
// create local file
let dest_filename = match response.url()
.path_segments()
.and_then(|segments| segments.last()) {
Some(r) => r,
None => {
return Err(Box::new(ScootalooError::new(format!("Cannot determine the destination filename for {}", u).as_str())));
},
};
let url = Url::parse(u)?;
let dest_filename = url.path_segments().ok_or_else(|| Box::new(ScootalooError::new(format!("Cannot determine the destination filename for {}", u).as_str())))?
.last().ok_or_else(|| Box::new(ScootalooError::new(format!("Cannot determine the destination filename for {}", u).as_str())))?;
let dest_filepath = format!("{}/{}", t, dest_filename);
let mut dest_file = File::create(&dest_filepath)?;
let mut dest_file = File::create(&dest_filepath).await?;
copy(&mut response, &mut dest_file)?;
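// stream the body to disk chunk by chunk; tokio::io::copy accepts
// &[u8] (a view into each Bytes chunk) as its AsyncRead source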
while let Some(chunk) = response.chunk().await? {
copy(&mut &*chunk, &mut dest_file).await?;
}
Ok(dest_filepath)
}
/**********
* This is the struct that holds the Mastodon Media ID and the Twitter Media URL at the same time
**********/
#[derive(Debug)]
struct ScootalooSpawnResponse {
mastodon_media_id: String,
twitter_media_url: String,
}
/**********
* local error handler
**********/
@@ -287,7 +300,7 @@ pub fn parse_toml(toml_file: &str) -> Config {
/// Generic register function
/// As this function is supposed to be run only once, it will panic for every error it encounters
/// Most of this function is a direct copy/paste of the official `mammut` crate
/// Most of this function is a direct copy/paste of the official `elefren` crate
pub fn register(host: &str) {
let mut builder = App::builder();
builder.client_name(Cow::from(String::from(env!("CARGO_PKG_NAME"))))
@@ -315,7 +328,8 @@ pub fn register(host: &str) {
}
/// This is where the magic happens
pub fn run(config: Config) {
#[tokio::main]
pub async fn run(config: Config) {
// retrieve the last tweet ID for the username
let last_tweet_id = read_state(&config.scootaloo.last_tweet_path);
@@ -323,16 +337,18 @@ pub fn run(config: Config) {
let token = get_oauth2_token(&config);
// get Mastodon instance
let mastodon = get_mastodon_token(&config.mastodon);
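// wrap the Mastodon client in Arc<Mutex<…>> so it can be shared with the spawned upload tasks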
let mastodon = Arc::new(Mutex::new(get_mastodon_token(&config.mastodon)));
// get user timeline feed (Vec<tweet>)
let mut feed = get_user_timeline(&config, token, last_tweet_id).unwrap_or_else(|e|
let mut feed = get_user_timeline(&config, token, last_tweet_id)
.await
.unwrap_or_else(|e|
panic!("Something went wrong when trying to retrieve {}s timeline: {}", &config.twitter.username, e)
);
// empty feed -> exiting
if feed.is_empty() {
println!("Nothing to retrieve since last time, exiting…");
info!("Nothing to retrieve since last time, exiting…");
return;
}
@@ -340,10 +356,12 @@ pub fn run(config: Config) {
feed.reverse();
for tweet in &feed {
debug!("Treating Tweet {} inside feed", tweet.id);
// determine if the tweet is part of a thread (response to self) or a standard response
if let Some(r) = &tweet.in_reply_to_screen_name {
if &r.to_lowercase() != &config.twitter.username.to_lowercase() {
// we are responding, not threading
info!("Tweet is a direct response, skipping");
continue;
}
};
@@ -352,7 +370,7 @@ pub fn run(config: Config) {
let mut status_text = match build_basic_status(tweet) {
Ok(t) => t,
Err(e) => {
println!("Could not create status from tweet {}: {}", tweet.id ,e);
error!("Could not create status from tweet {}: {}", tweet.id ,e);
continue;
},
};
@@ -361,35 +379,64 @@ pub fn run(config: Config) {
// reupload the attachments if any
if let Some(m) = &tweet.extended_entities {
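// bounded channel the spawned upload tasks use to send their results back to this loop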
let (tx, mut rx) = mpsc::channel(4);
for media in &m.media {
let local_tweet_media_path = match get_tweet_media(&media, &config.scootaloo.cache_path) {
Ok(m) => m,
Err(e) => {
println!("Cannot get tweet media for {}: {}", &media.url, e);
continue;
},
};
// creating a new tx for this initial loop
let tx = tx.clone();
// cloning a new handle to the shared mastodon mutex for this task
let mastodon = mastodon.clone();
// unfortunately for this to be thread safe, we need to clone a lot of structures
let media = media.clone();
let cache_path = config.scootaloo.cache_path.clone();
let mastodon_media_ids = match mastodon.media(Cow::from(String::from(&local_tweet_media_path))) {
Ok(m) => {
remove_file(&local_tweet_media_path).unwrap_or_else(|e|
println!("Attachment for {} has been upload, but Im unable to remove the existing file: {}", &local_tweet_media_path, e)
);
m.id
},
Err(e) => {
println!("Cannot attach media {} to Mastodon Instance: {}", &local_tweet_media_path, e);
continue;
tokio::spawn(async move {
debug!("Spawing new async thread to treat {}", &media.id);
let local_tweet_media_path = match get_tweet_media(&media, &cache_path).await {
Ok(m) => m,
Err(e) => {
// we could have panicked here, no issue, but I'm not comfortable using
// that for now
warn!("Cannot get tweet media for {}: {}", &media.url, e);
return;
}
};
// we cannot match on the locked call directly, because the mastodon lock guard would
// then live across the awaits below
let mas_result = mastodon.lock().unwrap().media(Cow::from(String::from(&local_tweet_media_path)));
match mas_result {
Ok(m) => {
remove_file(&local_tweet_media_path).await.unwrap_or_else(|e|
warn!("Attachment {} has been uploaded but Im unable to remove the existing file: {}", &local_tweet_media_path, e)
);
// we can unwrap here because we're inside a spawned task
tx.send(ScootalooSpawnResponse {
mastodon_media_id: m.id.clone(),
twitter_media_url: local_tweet_media_path.clone()
}).await.unwrap();
},
Err(e) => {
error!("Attachment {} cannot be uploaded to Mastodon Instance: {}", &local_tweet_media_path, e);
}
}
};
});
}
status_medias.push(mastodon_media_ids);
// dropping the last tx otherwise recv() will wait indefinitely
drop(tx);
// last step, removing the reference to the media from with the toots text
status_text = status_text.replace(&media.url, "");
while let Some(i) = rx.recv().await {
// pushes the media into the media vec
status_medias.push(i.mastodon_media_id);
// removes the URL from the original Tweet text
status_text = status_text.replace(&i.twitter_media_url, "");
}
}
// finished reuploading attachments, now let's do the toot, baby!
debug!("Building corresponding Mastodon status");
let status = StatusBuilder::new()
.status(&status_text)
.media_ids(status_medias)
@@ -397,7 +444,8 @@ pub fn run(config: Config) {
.expect(format!("Cannot build status with text {}", &status_text).as_str());
// publish status
mastodon.new_status(status).unwrap();
// again unwrap is safe here as we are in the main thread
mastodon.lock().unwrap().new_status(status).unwrap();
// this will panic if it cannot publish the status, which is a good thing: it keeps the
// gathered last_tweet id from being written