10 Commits

Author SHA1 Message Date
VC
1fdea7f69d Merge branch 'parallel_medias' into 'main'
feat: async upload of medias

See merge request veretcle/oolatoocs!3
2023-11-16 08:34:56 +00:00
VC
b73d6340c9 feat: async upload of medias 2023-11-15 15:20:03 +01:00
VC
00ba8bda42 Merge branch 'feat_other_medias' into 'main'
feat: separate metadata create + modify upload media for simple media only

See merge request veretcle/oolatoocs!2
2023-11-11 12:16:32 +00:00
VC
6d208f3de3 tamerelol 2023-11-11 12:59:57 +01:00
VC
b0c9485c82 Merge branch 'main' into 'feat_other_medias'
# Conflicts:
#   Cargo.lock
#   Cargo.toml
#   src/lib.rs
#   src/twitter.rs
2023-11-11 11:40:57 +00:00
VC
af7156786b chore: bump version to v1.0.0 2023-11-11 12:16:57 +01:00
VC
b9179d8cce feat: threads 2023-11-11 12:16:18 +01:00
VC
eba13ba095 feat: add video/gif medias 2023-11-10 19:29:29 +01:00
VC
f4c0504c28 Merge branch 'feat_medias' into 'main'
feat: attach medias to tweets

See merge request veretcle/oolatoocs!1
2023-11-09 17:52:56 +00:00
VC
dc9ed538f4 feat: attach medias to tweets 2023-11-09 18:44:31 +01:00
6 changed files with 494 additions and 38 deletions

57
Cargo.lock generated
View File

@@ -444,19 +444,46 @@ dependencies = [
]
[[package]]
name = "futures-channel"
version = "0.3.28"
name = "futures"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2"
checksum = "da0290714b38af9b4a7b094b8a37086d1b4e61f2df9122c3cad2577669145335"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
name = "futures-core"
version = "0.3.28"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c"
checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c"
[[package]]
name = "futures-executor"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f4fb8693db0cf099eadcca0efe2a5a22e4550f98ed16aba6c48700da29597bc"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
@@ -466,9 +493,9 @@ checksum = "8bf34a163b5c4c52d0478a4d757da8fb65cabef42ba90515efee0f6f9fa45aaa"
[[package]]
name = "futures-macro"
version = "0.3.28"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb"
dependencies = [
"proc-macro2",
"quote 1.0.33",
@@ -477,22 +504,23 @@ dependencies = [
[[package]]
name = "futures-sink"
version = "0.3.28"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e"
checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817"
[[package]]
name = "futures-task"
version = "0.3.28"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65"
checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2"
[[package]]
name = "futures-util"
version = "0.3.28"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533"
checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
@@ -1058,11 +1086,12 @@ checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
[[package]]
name = "oolatoocs"
version = "0.1.0"
version = "1.1.0"
dependencies = [
"clap",
"dissolve",
"env_logger",
"futures",
"log 0.4.20",
"megalodon",
"oauth1-request",

View File

@@ -1,6 +1,6 @@
[package]
name = "oolatoocs"
version = "0.1.0"
version = "1.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -9,14 +9,15 @@ edition = "2021"
clap = "^4"
dissolve = "0.2.2"
env_logger = "^0.10"
futures = "0.3.29"
log = "^0.4"
megalodon = "^0.11"
oauth1-request = "^0.6"
regex = "1.10.2"
reqwest = { version = "0.11.22", features = ["json"] }
reqwest = { version = "0.11.22", features = ["json", "stream", "multipart"] }
rusqlite = "^0.27"
serde = { version = "^1.0", features = ["derive"] }
tokio = { version = "^1.33", features = ["rt-multi-thread", "macros"] }
tokio = { version = "^1.33", features = ["rt-multi-thread", "macros", "time"] }
toml = "^0.8"
[profile.release]

View File

@@ -8,7 +8,7 @@ pub struct Config {
pub twitter: TwitterConfig,
}
#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Clone)]
pub struct TwitterConfig {
pub consumer_key: String,
pub consumer_secret: String,

25
src/error.rs Normal file
View File

@@ -0,0 +1,25 @@
use std::{
error::Error,
fmt::{Display, Formatter, Result},
};
/// Error type carrying a plain, human-readable message.
#[derive(Debug)]
pub struct OolatoocsError {
    /// Message rendered verbatim by the `Display` implementation.
    details: String,
}

impl OolatoocsError {
    /// Builds an error from `msg`, copying it into an owned `String`.
    pub fn new(msg: &str) -> OolatoocsError {
        let details = String::from(msg);
        Self { details }
    }
}

// Default `Error` methods are enough: no `source()` is overridden here.
impl Error for OolatoocsError {}

impl Display for OolatoocsError {
    /// Writes the stored message as-is, ignoring any width/fill flags.
    fn fmt(&self, f: &mut Formatter) -> Result {
        f.write_str(&self.details)
    }
}

View File

@@ -1,3 +1,6 @@
mod error;
pub use error::OolatoocsError;
mod config;
pub use config::{parse_toml, Config};
@@ -15,9 +18,13 @@ use utils::strip_everything;
mod twitter;
#[allow(unused_imports)]
use twitter::{post_tweet, upload_media};
use twitter::{post_tweet, upload_chunk_media, upload_simple_media};
use futures::{stream, StreamExt};
use log::{error, warn};
use megalodon::entities::attachment::AttachmentType;
use rusqlite::Connection;
use std::error::Error;
#[tokio::main]
pub async fn run(config: &Config) {
@@ -36,12 +43,61 @@ pub async fn run(config: &Config) {
let Ok(tweet_content) = strip_everything(&toot.content, &toot.tags) else {
continue; // skip in case we cant strip something
};
let mut medias: Vec<u64> = vec![];
// if we wanted to cut toot in half, now would be the right time to do so
// treating medias (nothing for now)
let tweet_id = post_tweet(&config.twitter, &tweet_content, &[])
let media_attachments = toot.media_attachments.clone();
let mut stream = stream::iter(media_attachments)
.map(|media| {
let twitter_config = config.twitter.clone();
tokio::task::spawn(async move {
match media.r#type {
AttachmentType::Image => {
upload_simple_media(&twitter_config, &media.url, &media.description)
.await
}
AttachmentType::Gifv => {
upload_chunk_media(&twitter_config, &media.url, "tweet_gif").await
}
AttachmentType::Video => {
upload_chunk_media(&twitter_config, &media.url, "tweet_video").await
}
_ => Err::<u64, Box<dyn Error + Send + Sync>>(
OolatoocsError::new(&format!(
"Cannot treat this type of media: {}",
&media.url
))
.into(),
),
}
})
})
.buffered(4);
while let Some(result) = stream.next().await {
match result {
Ok(Ok(v)) => medias.push(v),
Ok(Err(e)) => warn!("Cannot treat media: {}", e),
Err(e) => error!("Something went wrong when joining the main thread: {}", e),
}
}
// threads if necessary
let reply_to = toot.in_reply_to_id.and_then(|t| {
read_state(&conn, Some(t.parse::<u64>().unwrap()))
.ok()
.flatten()
.map(|s| s.tweet_id)
});
// posts corresponding tweet
let tweet_id = post_tweet(&config.twitter, &tweet_content, &medias, &reply_to)
.await
.unwrap_or_else(|e| panic!("Cannot Tweet {}: {}", toot.id, e));
// writes the current state of the tweet
write_state(
&conn,
TweetToToot {

View File

@@ -1,26 +1,92 @@
use crate::config::TwitterConfig;
use crate::error::OolatoocsError;
use log::debug;
use oauth1_request::Token;
use reqwest::Client;
use reqwest::{
multipart::{Form, Part},
Body, Client,
};
use serde::{Deserialize, Serialize};
use std::error::Error;
use std::{error::Error, ops::Not};
use tokio::time::{sleep, Duration};
/// I dont know, dont ask me
const TWITTER_API_TWEET_URL: &str = "https://api.twitter.com/2/tweets";
const TWITTER_UPLOAD_MEDIA_URL: &str = "https://upload.twitter.com/1.1/media/upload.json";
const TWITTER_METADATA_MEDIA_URL: &str =
"https://upload.twitter.com/1.1/media/metadata/create.json";
// I dont know, dont ask me
#[derive(oauth1_request::Request)]
struct EmptyRequest {}
#[derive(Serialize, Debug)]
pub struct Tweet {
pub text: String,
struct Tweet {
text: String,
#[serde(skip_serializing_if = "Option::is_none")]
media: Option<TweetMediasIds>,
#[serde(skip_serializing_if = "Option::is_none")]
reply: Option<TweetReply>,
}
/// Serialized as the optional `media` field of a `Tweet`.
#[derive(Serialize, Debug)]
struct TweetMediasIds {
/// Media identifiers, sent as strings (they are built from `u64` ids in `post_tweet`).
media_ids: Vec<String>,
}
/// Serialized as the optional `reply` field of a `Tweet`.
#[derive(Serialize, Debug)]
struct TweetReply {
/// Identifier of the tweet being replied to, sent as a string.
in_reply_to_tweet_id: String,
}
#[derive(Deserialize, Debug)]
pub struct TweetResponse {
pub data: TweetResponseData,
struct TweetResponse {
data: TweetResponseData,
}
#[derive(Deserialize, Debug)]
pub struct TweetResponseData {
pub id: String,
struct TweetResponseData {
id: String,
}
/// JSON body deserialized from responses of the media upload endpoint
/// (`TWITTER_UPLOAD_MEDIA_URL`).
#[derive(Deserialize, Debug)]
struct UploadMediaResponse {
/// Identifier of the uploaded media; this is what the upload functions return.
media_id: u64,
/// Processing details; `None` when the response does not carry any.
processing_info: Option<UploadMediaResponseProcessingInfo>,
}
/// Processing details attached to an `UploadMediaResponse`.
#[derive(Deserialize, Debug)]
struct UploadMediaResponseProcessingInfo {
/// Current processing state of the media.
state: UploadMediaResponseProcessingInfoState,
/// Number of seconds `upload_chunk_media` sleeps before polling the status (again).
/// Optional in the payload — presumably only sent while processing is not finished;
/// confirm against the Twitter API documentation.
check_after_secs: Option<u64>,
}
/// Processing state of an uploaded media.
///
/// Variants are deserialized from their snake_case names: `failed`, `succeeded`,
/// `pending` and `in_progress`.
#[derive(Deserialize, Debug)]
#[serde(rename_all = "snake_case")]
enum UploadMediaResponseProcessingInfoState {
    Failed,
    Succeeded,
    Pending,
    InProgress,
}
/// JSON body posted by `metadata_create` to `TWITTER_METADATA_MEDIA_URL`.
#[derive(Serialize, Debug)]
struct MediaMetadata {
/// Identifier of the media the metadata applies to.
media_id: u64,
/// Alternative text to attach to the media.
alt_text: MediaMetadataAltText,
}
/// Wrapper object serialized as the `alt_text` field of `MediaMetadata`.
#[derive(Serialize, Debug)]
struct MediaMetadataAltText {
/// The alternative text itself.
text: String,
}
/// Command sent with a GET to the media upload endpoint.
///
/// `upload_chunk_media` passes the same value both to the OAuth1 signer
/// (`oauth1_request::get`) and to the request as its query string.
#[derive(Serialize, Debug, oauth1_request::Request)]
struct UploadMediaCommand {
/// Command name; the only value built in this file is `"STATUS"`.
command: String,
/// Identifier of the media the command applies to, as a string.
media_id: String,
}
/// This function returns the OAuth1 Token object from TwitterConfig
@@ -33,32 +99,311 @@ fn get_token(config: &TwitterConfig) -> Token {
)
}
/// This function uploads media from Mastodon to Twitter and returns the media id from Twitter
#[allow(dead_code)]
pub async fn upload_media(_u: &str) -> Result<u64, Box<dyn Error>> {
Ok(0)
/// Uploads a simple image from Mastodon to Twitter and returns the Twitter media id.
///
/// The file behind `u` is downloaded and its byte stream is forwarded as the `media`
/// part of a single multipart upload. When `d` holds a description, it is then
/// attached to the uploaded media as alt text through `metadata_create`.
///
/// # Errors
///
/// Returns an error when:
/// * the download fails or answers with a non-success HTTP status;
/// * the download response carries no content length;
/// * the upload request fails or its response cannot be decoded;
/// * the alt text request fails to be sent.
pub async fn upload_simple_media(
    config: &TwitterConfig,
    u: &str,
    d: &Option<String>,
) -> Result<u64, Box<dyn Error + Send + Sync>> {
    // The OAuth1 signature is computed over an empty parameter set.
    // NOTE(review): presumably because the multipart body takes no part in the
    // signature — confirm against the OAuth1 / Twitter documentation.
    let empty_request = EmptyRequest {};
    let token = get_token(config);

    // Retrieve the length and bytes stream from the given URL. A non-success status is
    // rejected here so that an error page is never forwarded to Twitter as media.
    let dl = reqwest::get(u).await?.error_for_status()?;
    let content_length = dl
        .content_length()
        .ok_or_else(|| format!("Cannot get content length for {}", u))?;
    let stream = dl.bytes_stream();

    debug!("Ref download URL: {}", u);

    // Upload the media.
    let client = Client::new();
    let res = client
        .post(TWITTER_UPLOAD_MEDIA_URL)
        .header(
            "Authorization",
            oauth1_request::post(
                TWITTER_UPLOAD_MEDIA_URL,
                &empty_request,
                &token,
                oauth1_request::HMAC_SHA1,
            ),
        )
        .multipart(Form::new().part(
            "media",
            Part::stream_with_length(Body::wrap_stream(stream), content_length),
        ))
        .send()
        .await?
        .json::<UploadMediaResponse>()
        .await?;

    debug!("Media ID: {}", res.media_id);

    // Update the metadata (alt text) when a description is available.
    if let Some(metadata) = d {
        debug!("Metadata found! Processing…");
        metadata_create(config, res.media_id, metadata).await?;
    }

    Ok(res.media_id)
}
/// Attaches `m` as alt text to the Twitter media identified by `id`.
///
/// The HTTP status of the answer is only logged: a non-success status still yields
/// `Ok(())`. An error is returned only when the request itself cannot be sent.
async fn metadata_create(
    config: &TwitterConfig,
    id: u64,
    m: &str,
) -> Result<(), Box<dyn Error + Send + Sync>> {
    let token = get_token(config);
    let payload = MediaMetadata {
        media_id: id,
        alt_text: MediaMetadataAltText { text: m.to_owned() },
    };

    debug!("Metadata to process: {}", m);

    // The signature is computed over an empty parameter set; the JSON payload is only
    // sent as the request body.
    let authorization = oauth1_request::post(
        TWITTER_METADATA_MEDIA_URL,
        &EmptyRequest {},
        &token,
        oauth1_request::HMAC_SHA1,
    );

    let response = Client::new()
        .post(TWITTER_METADATA_MEDIA_URL)
        .header("Authorization", authorization)
        .json(&payload)
        .send()
        .await?;

    debug!("Metadata processed with return code: {}", response.status());

    Ok(())
}
/// This posts video/gif to Twitter and returns the media id from Twitter
pub async fn upload_chunk_media(
config: &TwitterConfig,
u: &str,
t: &str,
) -> Result<u64, Box<dyn Error + Send + Sync>> {
let empty_request = EmptyRequest {};
let token = get_token(config);
// retrieve the length, type and bytes stream from the given URL
let mut dl = reqwest::get(u).await?;
let content_length = dl
.content_length()
.ok_or(format!("Cannot get content length for {}", u))?;
let content_headers = dl.headers().clone();
let content_type = content_headers
.get("Content-Type")
.ok_or(format!("Cannot get content type for {}", u))?
.to_str()?;
debug!("Init the slot for uploading media: {}", u);
// init the slot for uploading
let client = Client::new();
let orig_media_id = client
.post(TWITTER_UPLOAD_MEDIA_URL)
.header(
"Authorization",
oauth1_request::post(
TWITTER_UPLOAD_MEDIA_URL,
&empty_request,
&token,
oauth1_request::HMAC_SHA1,
),
)
.multipart(
Form::new()
.text("command", "INIT")
.text("media_type", content_type.to_owned())
.text("total_bytes", content_length.to_string())
.text("media_category", t.to_string()),
)
.send()
.await?
.json::<UploadMediaResponse>()
.await?;
debug!("Slot initiated with ID: {}", orig_media_id.media_id);
debug!("Appending media to ID: {}", orig_media_id.media_id);
// append the media to the corresponding slot
let mut segment: u8 = 0;
while let Some(chunk) = dl.chunk().await? {
debug!(
"Appending segment {} for media ID {}",
segment, orig_media_id.media_id
);
let chunk_size: u64 = chunk.len().try_into().unwrap();
let res = client
.post(TWITTER_UPLOAD_MEDIA_URL)
.header(
"Authorization",
oauth1_request::post(
TWITTER_UPLOAD_MEDIA_URL,
&empty_request,
&token,
oauth1_request::HMAC_SHA1,
),
)
.multipart(
Form::new()
.text("command", "APPEND")
.text("media_id", orig_media_id.media_id.to_string())
.text("segment_index", segment.to_string())
.part("media", Part::stream_with_length(chunk, chunk_size)),
)
.send()
.await?;
if !res.status().is_success() {
return Err(
OolatoocsError::new(&format!("Cannot upload part {} of {}", segment, u)).into(),
);
}
segment += 1;
}
debug!("Finalize media ID: {}", orig_media_id.media_id);
// Finalizing task
let fin = client
.post(TWITTER_UPLOAD_MEDIA_URL)
.header(
"Authorization",
oauth1_request::post(
TWITTER_UPLOAD_MEDIA_URL,
&empty_request,
&token,
oauth1_request::HMAC_SHA1,
),
)
.multipart(
Form::new()
.text("command", "FINALIZE")
.text("media_id", orig_media_id.media_id.to_string()),
)
.send()
.await?
.json::<UploadMediaResponse>()
.await?;
if let Some(p_info) = fin.processing_info {
if let Some(wait_sec) = p_info.check_after_secs {
debug!(
"Processing is not finished yet for ID {}, waiting {} secs",
orig_media_id.media_id, wait_sec
);
// getting here, we have a status and a check_after_secs
// this status can be anything but we will check it afterwards
// whatever happens, we can wait here before proceeding
sleep(Duration::from_secs(wait_sec)).await;
let command = UploadMediaCommand {
command: "STATUS".to_string(),
media_id: orig_media_id.media_id.to_string(),
};
loop {
debug!(
"Checking on status for ID {} after waiting {} secs",
orig_media_id.media_id, wait_sec
);
let status = client
.get(TWITTER_UPLOAD_MEDIA_URL)
.header(
"Authorization",
oauth1_request::get(
TWITTER_UPLOAD_MEDIA_URL,
&command,
&token,
oauth1_request::HMAC_SHA1,
),
)
.query(&command)
.send()
.await?
.json::<UploadMediaResponse>()
.await?;
let p_status = status.processing_info.unwrap(); // shouldnt be None at this point
match p_status.state {
UploadMediaResponseProcessingInfoState::Failed => {
debug!("Processing has failed!");
return Err(OolatoocsError::new(&format!(
"Upload for {} (id: {}) has failed",
u, orig_media_id.media_id
))
.into());
}
UploadMediaResponseProcessingInfoState::Succeeded => {
debug!("Processing has succeeded, exiting loop!");
break;
}
UploadMediaResponseProcessingInfoState::Pending
| UploadMediaResponseProcessingInfoState::InProgress => {
debug!(
"Processing still pending, waiting {} secs more…",
p_status.check_after_secs.unwrap() // unwrap is safe here,
// check_after_secs is only present
// when status is pending or in
// progress
);
sleep(Duration::from_secs(p_status.check_after_secs.unwrap())).await;
continue;
}
}
}
}
}
Ok(orig_media_id.media_id)
}
/// This posts Tweets with all the associated medias
pub async fn post_tweet(
config: &TwitterConfig,
content: &str,
_medias: &[u64],
medias: &[u64],
reply_to: &Option<u64>,
) -> Result<u64, Box<dyn Error>> {
let uri = "https://api.twitter.com/2/tweets";
let empty_request = EmptyRequest {}; // Why? Because fuck you, thats why!
let token = get_token(config);
let tweet = Tweet {
text: content.to_string(),
media: medias.is_empty().not().then(|| TweetMediasIds {
media_ids: medias.iter().map(|m| m.to_string()).collect(),
}),
reply: reply_to.map(|s| TweetReply {
in_reply_to_tweet_id: s.to_string(),
}),
};
let client = Client::new();
let res = client
.post(uri)
.post(TWITTER_API_TWEET_URL)
.header(
"Authorization",
oauth1_request::post(uri, &empty_request, &token, oauth1_request::HMAC_SHA1),
oauth1_request::post(
TWITTER_API_TWEET_URL,
&empty_request,
&token,
oauth1_request::HMAC_SHA1,
),
)
.json(&tweet)
.send()