use crate::config::TwitterConfig; use crate::error::OolatoocsError; use chrono::Utc; use futures::{stream, StreamExt}; use log::{debug, error, warn}; use megalodon::entities::{ attachment::{Attachment, AttachmentType}, Poll, }; use oauth1_request::Token; use reqwest::{ multipart::{Form, Part}, Body, Client, }; use serde::{Deserialize, Serialize}; use std::{error::Error, ops::Not}; use tokio::time::{sleep, Duration}; const TWITTER_API_TWEET_URL: &str = "https://api.twitter.com/2/tweets"; const TWITTER_UPLOAD_MEDIA_URL: &str = "https://upload.twitter.com/1.1/media/upload.json"; const TWITTER_METADATA_MEDIA_URL: &str = "https://upload.twitter.com/1.1/media/metadata/create.json"; // I don’t know, don’t ask me #[derive(oauth1_request::Request)] struct EmptyRequest {} #[derive(Serialize, Debug)] struct Tweet { text: String, #[serde(skip_serializing_if = "Option::is_none")] media: Option, #[serde(skip_serializing_if = "Option::is_none")] reply: Option, #[serde(skip_serializing_if = "Option::is_none")] poll: Option, } #[derive(Serialize, Debug)] struct TweetMediasIds { media_ids: Vec, } #[derive(Serialize, Debug)] struct TweetReply { in_reply_to_tweet_id: String, } #[derive(Serialize, Debug)] pub struct TweetPoll { pub options: Vec, pub duration_minutes: u16, } #[derive(Deserialize, Debug)] struct TweetResponse { data: TweetResponseData, } #[derive(Deserialize, Debug)] struct TweetResponseData { id: String, } #[derive(Deserialize, Debug)] struct UploadMediaResponse { media_id: u64, processing_info: Option, } #[derive(Deserialize, Debug)] struct UploadMediaResponseProcessingInfo { state: UploadMediaResponseProcessingInfoState, check_after_secs: Option, } #[derive(Deserialize, Debug)] enum UploadMediaResponseProcessingInfoState { #[serde(rename = "failed")] Failed, #[serde(rename = "succeeded")] Succeeded, #[serde(rename = "pending")] Pending, #[serde(rename = "in_progress")] InProgress, } #[derive(Serialize, Debug)] struct MediaMetadata { media_id: u64, alt_text: MediaMetadataAltText, } #[derive(Serialize, Debug)] struct MediaMetadataAltText { text: String, } #[derive(Serialize, Debug, oauth1_request::Request)] struct UploadMediaCommand { command: String, media_id: String, } /// This function returns the OAuth1 Token object from TwitterConfig fn get_token(config: &TwitterConfig) -> Token { oauth1_request::Token::from_parts( config.consumer_key.to_string(), config.consumer_secret.to_string(), config.oauth_token.to_string(), config.oauth_token_secret.to_string(), ) } pub async fn generate_media_ids(config: &TwitterConfig, media_attach: &[Attachment]) -> Vec { let mut medias: Vec = vec![]; let media_attachments = media_attach.to_owned(); let mut stream = stream::iter(media_attachments) .map(|media| { let twitter_config = config.clone(); tokio::task::spawn(async move { match media.r#type { AttachmentType::Image => { upload_simple_media(&twitter_config, &media.url, &media.description).await } AttachmentType::Gifv => { upload_chunk_media(&twitter_config, &media.url, "tweet_gif").await } AttachmentType::Video => { upload_chunk_media(&twitter_config, &media.url, "tweet_video").await } _ => Err::>( OolatoocsError::new(&format!( "Cannot treat this type of media: {}", &media.url )) .into(), ), } }) }) .buffered(4); while let Some(result) = stream.next().await { match result { Ok(Ok(v)) => medias.push(v), Ok(Err(e)) => warn!("Cannot treat media: {}", e), Err(e) => error!("Something went wrong when joining the main thread: {}", e), } } medias } /// This function uploads simple images from Mastodon to Twitter and returns the media id from Twitter async fn upload_simple_media( config: &TwitterConfig, u: &str, d: &Option, ) -> Result> { // initiate request parameters let empty_request = EmptyRequest {}; // Why? Because fuck you, that’s why! let token = get_token(config); // retrieve the length and bytes stream from the given URL let dl = reqwest::get(u).await?; let content_length = dl .content_length() .ok_or(format!("Cannot get content length for {}", u))?; let stream = dl.bytes_stream(); debug!("Ref download URL: {}", u); // upload the media let client = Client::new(); let res = client .post(TWITTER_UPLOAD_MEDIA_URL) .header( "Authorization", oauth1_request::post( TWITTER_UPLOAD_MEDIA_URL, &empty_request, &token, oauth1_request::HMAC_SHA1, ), ) .multipart(Form::new().part( "media", Part::stream_with_length(Body::wrap_stream(stream), content_length), )) .send() .await? .json::() .await?; debug!("Media ID: {}", res.media_id); // update the metadata if let Some(metadata) = d { debug!("Metadata found! Processing…"); metadata_create(config, res.media_id, metadata).await?; } Ok(res.media_id) } /// This function updates the metadata given the current media_id and token async fn metadata_create( config: &TwitterConfig, id: u64, m: &str, ) -> Result<(), Box> { let token = get_token(config); let empty_request = EmptyRequest {}; let media_metadata = MediaMetadata { media_id: id, alt_text: MediaMetadataAltText { text: m.to_string(), }, }; debug!("Metadata to process: {}", m); let client = Client::new(); let metadata = client .post(TWITTER_METADATA_MEDIA_URL) .header( "Authorization", oauth1_request::post( TWITTER_METADATA_MEDIA_URL, &empty_request, &token, oauth1_request::HMAC_SHA1, ), ) .json(&media_metadata) .send() .await?; debug!("Metadata processed with return code: {}", metadata.status()); Ok(()) } /// This posts video/gif to Twitter and returns the media id from Twitter async fn upload_chunk_media( config: &TwitterConfig, u: &str, t: &str, ) -> Result> { let empty_request = EmptyRequest {}; let token = get_token(config); // retrieve the length, type and bytes stream from the given URL let mut dl = reqwest::get(u).await?; let content_length = dl .content_length() .ok_or(format!("Cannot get content length for {}", u))?; let content_headers = dl.headers().clone(); let content_type = content_headers .get("Content-Type") .ok_or(format!("Cannot get content type for {}", u))? .to_str()?; debug!("Init the slot for uploading media: {}", u); // init the slot for uploading let client = Client::new(); let orig_media_id = client .post(TWITTER_UPLOAD_MEDIA_URL) .header( "Authorization", oauth1_request::post( TWITTER_UPLOAD_MEDIA_URL, &empty_request, &token, oauth1_request::HMAC_SHA1, ), ) .multipart( Form::new() .text("command", "INIT") .text("media_type", content_type.to_owned()) .text("total_bytes", content_length.to_string()) .text("media_category", t.to_string()), ) .send() .await? .json::() .await?; debug!("Slot initiated with ID: {}", orig_media_id.media_id); debug!("Appending media to ID: {}", orig_media_id.media_id); // append the media to the corresponding slot let mut segment: u8 = 0; while let Some(chunk) = dl.chunk().await? { debug!( "Appending segment {} for media ID {}", segment, orig_media_id.media_id ); let chunk_size: u64 = chunk.len().try_into().unwrap(); let res = client .post(TWITTER_UPLOAD_MEDIA_URL) .header( "Authorization", oauth1_request::post( TWITTER_UPLOAD_MEDIA_URL, &empty_request, &token, oauth1_request::HMAC_SHA1, ), ) .multipart( Form::new() .text("command", "APPEND") .text("media_id", orig_media_id.media_id.to_string()) .text("segment_index", segment.to_string()) .part("media", Part::stream_with_length(chunk, chunk_size)), ) .send() .await?; if !res.status().is_success() { return Err( OolatoocsError::new(&format!("Cannot upload part {} of {}", segment, u)).into(), ); } segment += 1; } debug!("Finalize media ID: {}", orig_media_id.media_id); // Finalizing task let fin = client .post(TWITTER_UPLOAD_MEDIA_URL) .header( "Authorization", oauth1_request::post( TWITTER_UPLOAD_MEDIA_URL, &empty_request, &token, oauth1_request::HMAC_SHA1, ), ) .multipart( Form::new() .text("command", "FINALIZE") .text("media_id", orig_media_id.media_id.to_string()), ) .send() .await? .json::() .await?; if let Some(p_info) = fin.processing_info { if let Some(wait_sec) = p_info.check_after_secs { debug!( "Processing is not finished yet for ID {}, waiting {} secs", orig_media_id.media_id, wait_sec ); // getting here, we have a status and a check_after_secs // this status can be anything but we will check it afterwards // whatever happens, we can wait here before proceeding sleep(Duration::from_secs(wait_sec)).await; let command = UploadMediaCommand { command: "STATUS".to_string(), media_id: orig_media_id.media_id.to_string(), }; loop { debug!( "Checking on status for ID {} after waiting {} secs", orig_media_id.media_id, wait_sec ); let status = client .get(TWITTER_UPLOAD_MEDIA_URL) .header( "Authorization", oauth1_request::get( TWITTER_UPLOAD_MEDIA_URL, &command, &token, oauth1_request::HMAC_SHA1, ), ) .query(&command) .send() .await? .json::() .await?; let p_status = status.processing_info.unwrap(); // shouldn’t be None at this point match p_status.state { UploadMediaResponseProcessingInfoState::Failed => { debug!("Processing has failed!"); return Err(OolatoocsError::new(&format!( "Upload for {} (id: {}) has failed", u, orig_media_id.media_id )) .into()); } UploadMediaResponseProcessingInfoState::Succeeded => { debug!("Processing has succeeded, exiting loop!"); break; } UploadMediaResponseProcessingInfoState::Pending | UploadMediaResponseProcessingInfoState::InProgress => { debug!( "Processing still pending, waiting {} secs more…", p_status.check_after_secs.unwrap() // unwrap is safe here, // check_after_secs is only present // when status is pending or in // progress ); sleep(Duration::from_secs(p_status.check_after_secs.unwrap())).await; continue; } } } } } Ok(orig_media_id.media_id) } pub fn transform_poll(p: &Poll) -> TweetPoll { let poll_end_datetime = p.expires_at.unwrap(); // should be safe at this point let now = Utc::now(); let diff = poll_end_datetime.signed_duration_since(now); TweetPoll { options: p.options.iter().map(|i| i.title.clone()).collect(), duration_minutes: diff.num_minutes().try_into().unwrap(), // safe here, number is positive // and can’t be over 21600 } } /// This posts Tweets with all the associated medias pub async fn post_tweet( config: &TwitterConfig, content: String, medias: Vec, reply_to: Option, poll: Option, ) -> Result> { let empty_request = EmptyRequest {}; // Why? Because fuck you, that’s why! let token = get_token(config); let tweet = Tweet { text: content, media: medias.is_empty().not().then(|| TweetMediasIds { media_ids: medias.iter().map(|m| m.to_string()).collect(), }), reply: reply_to.map(|s| TweetReply { in_reply_to_tweet_id: s.to_string(), }), poll, }; let client = Client::new(); let res = client .post(TWITTER_API_TWEET_URL) .header( "Authorization", oauth1_request::post( TWITTER_API_TWEET_URL, &empty_request, &token, oauth1_request::HMAC_SHA1, ), ) .json(&tweet) .send() .await? .json::() .await?; Ok(res.data.id.parse::().unwrap()) }