💥: now incompatible with Twitter

This commit is contained in:
VC
2025-01-13 17:11:19 +01:00
parent e99a666b18
commit 9da43beb34
7 changed files with 688 additions and 1058 deletions

View File

@@ -5,18 +5,9 @@ use std::fs::read_to_string;
pub struct Config {
pub oolatoocs: OolatoocsConfig,
pub mastodon: MastodonConfig,
pub twitter: TwitterConfig,
pub bluesky: BlueskyConfig,
}
#[derive(Debug, Deserialize, Clone)]
pub struct TwitterConfig {
pub consumer_key: String,
pub consumer_secret: String,
pub oauth_token: String,
pub oauth_token_secret: String,
}
#[derive(Debug, Deserialize)]
pub struct OolatoocsConfig {
pub db_path: String,

View File

@@ -7,7 +7,7 @@ mod config;
pub use config::{parse_toml, Config};
mod state;
use state::{delete_state, read_all_state, read_state, write_state, TootTweetRecord};
use state::{delete_state, read_all_state, read_state, write_state, TootRecord};
pub use state::{init_db, migrate_db};
mod mastodon;
@@ -17,9 +17,6 @@ use mastodon::{get_mastodon_instance, get_mastodon_timeline_since, get_status_ed
mod utils;
use utils::{generate_multi_tweets, strip_everything};
mod twitter;
use twitter::{delete_tweet, generate_media_ids, post_tweet, transform_poll};
mod bsky;
use bsky::{build_post_record, generate_media_records, get_session, BskyReply};
@@ -48,20 +45,13 @@ pub async fn run(config: &Config) {
// a date has been found
if d > t.datetime.unwrap() {
debug!("Last toot date is posterior to the previously written tweet, deleting…");
let (local_tweet_ids, local_record_uris) = read_all_state(&conn, t.toot_id)
.unwrap_or_else(|e| {
let local_record_uris =
read_all_state(&conn, t.toot_id).unwrap_or_else(|e| {
panic!(
"Cannot fetch all tweets associated with Toot ID {}: {}",
"Cannot fetch all records associated with Toot ID {}: {}",
t.toot_id, e
)
});
for local_tweet_id in local_tweet_ids.into_iter() {
delete_tweet(&config.twitter, local_tweet_id)
.await
.unwrap_or_else(|e| {
panic!("Cannot delete Tweet ID ({}): {}", t.tweet_id, e)
});
}
for local_record_uri in local_record_uris.into_iter() {
bluesky
.delete_record(&local_record_uri)
@@ -100,38 +90,20 @@ pub async fn run(config: &Config) {
};
// threads if necessary
let (mut tweet_reply_to, mut record_reply_to) = toot
.in_reply_to_id
.and_then(|t| {
read_state(&conn, Some(t.parse::<u64>().unwrap()))
.ok()
.flatten()
.map(|s| {
(
s.tweet_id,
BskyReply {
record_uri: s.record_uri.to_owned(),
root_record_uri: s.root_record_uri.to_owned(),
},
)
})
})
.unzip();
let mut record_reply_to = toot.in_reply_to_id.and_then(|t| {
read_state(&conn, Some(t.parse::<u64>().unwrap()))
.ok()
.flatten()
.map(|s| BskyReply {
record_uri: s.record_uri.to_owned(),
root_record_uri: s.root_record_uri.to_owned(),
})
});
// if the toot is too long, we cut it in half here
if let Some((first_half, second_half)) = generate_multi_tweets(&tweet_content) {
tweet_content = second_half;
// post the first half
let tweet_reply_id =
post_tweet(&config.twitter, &first_half, vec![], tweet_reply_to, None)
.await
.unwrap_or_else(|e| {
panic!(
"Cannot post the first half of {} for Twitter: {}",
&toot.id, e
)
});
let record = build_post_record(
&config.bluesky,
&first_half,
@@ -150,9 +122,8 @@ pub async fn run(config: &Config) {
// write it to db
write_state(
&conn,
TootTweetRecord {
TootRecord {
toot_id: toot.id.parse::<u64>().unwrap(),
tweet_id: tweet_reply_id,
record_uri: record_reply_id.data.uri.to_owned(),
root_record_uri: record_reply_to
.as_ref()
@@ -164,8 +135,8 @@ pub async fn run(config: &Config) {
)
.unwrap_or_else(|e| {
panic!(
"Cannot store Toot/Tweet/Record ({}/{}/{}): {}",
&toot.id, tweet_reply_id, &record_reply_id.data.uri, e
"Cannot store Toot/Tweet/Record ({}/{}): {}",
&toot.id, &record_reply_id.data.uri, e
)
});
@@ -177,28 +148,12 @@ pub async fn run(config: &Config) {
v.root_record_uri.clone()
}),
});
tweet_reply_to = Some(tweet_reply_id);
};
// treats poll if any
let in_poll = toot.poll.map(|p| transform_poll(&p));
// treats medias
let record_medias = generate_media_records(&bluesky, &toot.media_attachments).await;
let tweet_medias = generate_media_ids(&config.twitter, &toot.media_attachments).await;
// posts corresponding tweet
let tweet_id = post_tweet(
&config.twitter,
&tweet_content,
tweet_medias,
tweet_reply_to,
in_poll,
)
.await
.unwrap_or_else(|e| panic!("Cannot Tweet {}: {}", toot.id, e));
let record = build_post_record(
&config.bluesky,
&tweet_content,
@@ -217,9 +172,8 @@ pub async fn run(config: &Config) {
// writes the current state of the tweet
write_state(
&conn,
TootTweetRecord {
TootRecord {
toot_id: toot.id.parse::<u64>().unwrap(),
tweet_id,
record_uri: created_record.data.uri.clone(),
root_record_uri: record_reply_to
.as_ref()
@@ -229,6 +183,6 @@ pub async fn run(config: &Config) {
datetime: None,
},
)
.unwrap_or_else(|e| panic!("Cannot store Toot/Tweet ({}/{}): {}", &toot.id, tweet_id, e));
.unwrap_or_else(|e| panic!("Cannot store Toot/Tweet ({}): {}", &toot.id, e));
}
}

View File

@@ -5,11 +5,9 @@ use std::error::Error;
/// Struct for each query line
#[derive(Debug)]
pub struct TootTweetRecord {
pub struct TootRecord {
// Mastodon part
pub toot_id: u64,
// Twitter part
pub tweet_id: u64,
// Bluesky part
pub record_uri: String,
pub root_record_uri: String,
@@ -20,44 +18,36 @@ pub struct TootTweetRecord {
pub fn delete_state(conn: &Connection, toot_id: u64) -> Result<(), Box<dyn Error>> {
debug!("Deleting Toot ID {}", toot_id);
conn.execute(
&format!("DELETE FROM toot_tweet_record WHERE toot_id = {}", toot_id),
&format!("DELETE FROM toot_record WHERE toot_id = {}", toot_id),
[],
)?;
Ok(())
}
/// Retrieves all tweets associated to a toot in the form of a vector
pub fn read_all_state(
conn: &Connection,
toot_id: u64,
) -> Result<(Vec<u64>, Vec<String>), Box<dyn Error>> {
pub fn read_all_state(conn: &Connection, toot_id: u64) -> Result<Vec<String>, Box<dyn Error>> {
let query = format!(
"SELECT tweet_id, record_uri FROM toot_tweet_record WHERE toot_id = {};",
"SELECT record_uri FROM toot_record WHERE toot_id = {};",
toot_id
);
let mut stmt = conn.prepare(&query)?;
let mut rows = stmt.query([])?;
let mut tweet_v: Vec<u64> = Vec::new();
let mut record_v: Vec<String> = Vec::new();
while let Some(row) = rows.next()? {
tweet_v.push(row.get(0)?);
record_v.push(row.get(1)?);
record_v.push(row.get(0)?);
}
Ok((tweet_v, record_v))
Ok(record_v)
}
/// if None is passed, read the last tweet from DB
/// if a tweet_id is passed, read this particular tweet from DB
pub fn read_state(
conn: &Connection,
s: Option<u64>,
) -> Result<Option<TootTweetRecord>, Box<dyn Error>> {
pub fn read_state(conn: &Connection, s: Option<u64>) -> Result<Option<TootRecord>, Box<dyn Error>> {
debug!("Reading toot_id {:?}", s);
let begin_query = "SELECT *, UNIXEPOCH(datetime) AS unix_datetime FROM toot_tweet_record";
let begin_query = "SELECT *, UNIXEPOCH(datetime) AS unix_datetime FROM toot_record";
let query: String = match s {
Some(i) => format!("{begin_query} WHERE toot_id = {i} ORDER BY tweet_id DESC LIMIT 1"),
Some(i) => format!("{begin_query} WHERE toot_id = {i} ORDER BY record_uri DESC LIMIT 1"),
None => format!("{begin_query} ORDER BY toot_id DESC LIMIT 1"),
};
@@ -65,9 +55,8 @@ pub fn read_state(
let t = stmt
.query_row([], |row| {
Ok(TootTweetRecord {
Ok(TootRecord {
toot_id: row.get("toot_id")?,
tweet_id: row.get("tweet_id")?,
record_uri: row.get("record_uri")?,
root_record_uri: row.get("root_record_uri")?,
datetime: Some(
@@ -81,11 +70,11 @@ pub fn read_state(
}
/// Writes last treated tweet id and toot id to the db
pub fn write_state(conn: &Connection, t: TootTweetRecord) -> Result<(), Box<dyn Error>> {
pub fn write_state(conn: &Connection, t: TootRecord) -> Result<(), Box<dyn Error>> {
debug!("Write struct {:?}", t);
conn.execute(
"INSERT INTO toot_tweet_record (toot_id, tweet_id, record_uri, root_record_uri) VALUES (?1, ?2, ?3, ?4)",
params![t.toot_id, t.tweet_id, t.record_uri, t.root_record_uri],
"INSERT INTO toot_record (toot_id, record_uri, root_record_uri) VALUES (?1, ?2, ?3)",
params![t.toot_id, t.record_uri, t.root_record_uri],
)?;
Ok(())
@@ -100,10 +89,9 @@ pub fn init_db(d: &str) -> Result<(), Box<dyn Error>> {
let conn = Connection::open(d)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS toot_tweet_record (
"CREATE TABLE IF NOT EXISTS toot_record (
toot_id INTEGER,
tweet_id INTEGER PRIMARY KEY,
record_uri VARCHAR(128) DEFAULT '',
record_uri VARCHAR(128) PRIMARY KEY,
root_record_uri VARCHAR(128) DEFAULT '',
datetime INTEGER DEFAULT CURRENT_TIMESTAMP
)",
@@ -113,20 +101,20 @@ pub fn init_db(d: &str) -> Result<(), Box<dyn Error>> {
Ok(())
}
/// Migrate DB from 1.6+ to 3+
/// Migrate DB from 3+ to 4+
pub fn migrate_db(d: &str) -> Result<(), Box<dyn Error>> {
debug!("Migration DB for Oolatoocs");
let conn = Connection::open(d)?;
let res = conn.execute("SELECT datetime FROM toot_tweet_record;", []);
let res = conn.execute("SELECT datetime FROM toot_record;", []);
// If the column can be selected then, its OK
// if not, see if the error is a missing column and add it
match res {
Err(e) => match e.to_string().as_str() {
"no such table: toot_tweet_record" => migrate_db_alter_table(&conn), // table does not exist
"Execute returned results - did you mean to call query?" => Ok(()), // return results,
"no such table: toot_record" => migrate_db_alter_table(&conn), // table does not exist
"Execute returned results - did you mean to call query?" => Ok(()), // return results,
// column does
// exist
_ => Err(e.into()),
@@ -139,10 +127,9 @@ pub fn migrate_db(d: &str) -> Result<(), Box<dyn Error>> {
fn migrate_db_alter_table(c: &Connection) -> Result<(), Box<dyn Error>> {
// create the new table
c.execute(
"CREATE TABLE IF NOT EXISTS toot_tweet_record (
"CREATE TABLE IF NOT EXISTS toot_record (
toot_id INTEGER,
tweet_id INTEGER PRIMARY KEY,
record_uri VARCHAR(128) DEFAULT '',
record_uri VARCHAR(128) PRIMARY KEY,
root_record_uri VARCHAR(128) DEFAULT '',
datetime INTEGER DEFAULT CURRENT_TIMESTAMP
)",
@@ -151,13 +138,14 @@ fn migrate_db_alter_table(c: &Connection) -> Result<(), Box<dyn Error>> {
// copy data from the old table
c.execute(
"INSERT INTO toot_tweet_record (toot_id, tweet_id, datetime)
SELECT toot_id, tweet_id, datetime FROM tweet_to_toot;",
"INSERT INTO toot_record (toot_id, record_uri, root_record_uri, datetime)
SELECT toot_id, record_uri, root_record_uri, datetime FROM toot_tweet_record
WHERE record_uri != '';",
[],
)?;
// drop the old table
c.execute("DROP TABLE IF EXISTS tweet_to_toot;", [])?;
c.execute("DROP TABLE IF EXISTS toot_tweet_record;", [])?;
Ok(())
}
@@ -178,8 +166,7 @@ mod tests {
// open said file
let conn = Connection::open(d).unwrap();
conn.execute("SELECT * from toot_tweet_record;", [])
.unwrap();
conn.execute("SELECT * from toot_record;", []).unwrap();
remove_file(d).unwrap();
}
@@ -194,9 +181,9 @@ mod tests {
let conn = Connection::open(d).unwrap();
conn.execute(
"INSERT INTO toot_tweet_record (tweet_id, toot_id)
"INSERT INTO toot_record (record_uri, toot_id)
VALUES
(100, 1001);",
('a', 1001);",
[],
)
.unwrap();
@@ -214,9 +201,8 @@ mod tests {
let conn = Connection::open(d).unwrap();
let t_in = TootTweetRecord {
let t_in = TootRecord {
toot_id: 987654321,
tweet_id: 123456789,
record_uri: "a".to_string(),
root_record_uri: "c".to_string(),
datetime: None,
@@ -225,14 +211,13 @@ mod tests {
write_state(&conn, t_in).unwrap();
let mut stmt = conn
.prepare("SELECT *, UNIXEPOCH(datetime) AS unix_datetime FROM toot_tweet_record;")
.prepare("SELECT *, UNIXEPOCH(datetime) AS unix_datetime FROM toot_record;")
.unwrap();
let t_out = stmt
.query_row([], |row| {
Ok(TootTweetRecord {
Ok(TootRecord {
toot_id: row.get("toot_id").unwrap(),
tweet_id: row.get("tweet_id").unwrap(),
record_uri: row.get("record_uri").unwrap(),
root_record_uri: row.get("root_record_uri").unwrap(),
datetime: Some(
@@ -243,7 +228,6 @@ mod tests {
.unwrap();
assert_eq!(t_out.toot_id, 987654321);
assert_eq!(t_out.tweet_id, 123456789);
assert_eq!(t_out.record_uri, "a".to_string());
assert_eq!(t_out.root_record_uri, "c".to_string());
@@ -259,10 +243,10 @@ mod tests {
let conn = Connection::open(d).unwrap();
conn.execute(
"INSERT INTO toot_tweet_record (toot_id, tweet_id, record_uri)
"INSERT INTO toot_record (toot_id, record_uri)
VALUES
(101, 1001, 'abc'),
(102, 1002, 'def');",
(101, 'abc'),
(102, 'def');",
[],
)
.unwrap();
@@ -272,7 +256,6 @@ mod tests {
remove_file(d).unwrap();
assert_eq!(t_out.toot_id, 102);
assert_eq!(t_out.tweet_id, 1002);
assert_eq!(t_out.record_uri, "def".to_string());
}
@@ -300,9 +283,9 @@ mod tests {
let conn = Connection::open(d).unwrap();
conn.execute(
"INSERT INTO toot_tweet_record (toot_id, tweet_id, record_uri)
"INSERT INTO toot_record (toot_id, record_uri)
VALUES
(100, 1000, 'abc');",
(100, 'abc');",
[],
)
.unwrap();
@@ -323,9 +306,9 @@ mod tests {
let conn = Connection::open(d).unwrap();
conn.execute(
"INSERT INTO toot_tweet_record (toot_id, tweet_id, record_uri)
"INSERT INTO toot_record (toot_id, record_uri)
VALUES
(100, 1000, 'abc');",
(100, 'abc');",
[],
)
.unwrap();
@@ -335,7 +318,6 @@ mod tests {
remove_file(d).unwrap();
assert_eq!(t_out.toot_id, 100);
assert_eq!(t_out.tweet_id, 1000);
assert_eq!(t_out.record_uri, "abc".to_string());
}
@@ -348,10 +330,10 @@ mod tests {
let conn = Connection::open(d).unwrap();
conn.execute(
"INSERT INTO toot_tweet_record (toot_id, tweet_id, record_uri)
"INSERT INTO toot_record (toot_id, record_uri)
VALUES
(1000, 100, 'abc'),
(1000, 101, 'def');",
(1000, 'abc'),
(1000, 'def');",
[],
)
.unwrap();
@@ -361,7 +343,6 @@ mod tests {
remove_file(d).unwrap();
assert_eq!(t_out.toot_id, 1000);
assert_eq!(t_out.tweet_id, 101);
assert_eq!(t_out.record_uri, "def".to_string());
}
@@ -372,9 +353,11 @@ mod tests {
let conn = Connection::open(d).unwrap();
conn.execute(
"CREATE TABLE IF NOT EXISTS tweet_to_toot (
tweet_id INTEGER,
toot_id INTEGER PRIMARY KEY,
"CREATE TABLE IF NOT EXISTS toot_tweet_record (
toot_id INTEGER,
tweet_id INTEGER PRIMARY KEY,
record_uri VARCHAR(128) DEFAULT '',
root_record_uri VARCHAR(128) DEFAULT '',
datetime INTEGER DEFAULT CURRENT_TIMESTAMP
)",
[],
@@ -382,7 +365,7 @@ mod tests {
.unwrap();
conn.execute(
"INSERT INTO tweet_to_toot (tweet_id, toot_id) VALUES (0, 0), (1, 1);",
"INSERT INTO toot_tweet_record (tweet_id, toot_id, record_uri) VALUES (0, 0, ''), (1, 1, 'abc');",
[],
)
.unwrap();
@@ -391,7 +374,6 @@ mod tests {
let last_state = read_state(&conn, None).unwrap().unwrap();
assert_eq!(last_state.tweet_id, 1);
assert_eq!(last_state.toot_id, 1);
migrate_db(d).unwrap(); // shouldnt do anything
@@ -408,7 +390,7 @@ mod tests {
let conn = Connection::open(d).unwrap();
conn.execute(
"INSERT INTO toot_tweet_record(toot_id, tweet_id, record_uri) VALUES (0, 0, 'abc');",
"INSERT INTO toot_record(toot_id, record_uri) VALUES (0, 'abc');",
[],
)
.unwrap();
@@ -416,13 +398,12 @@ mod tests {
delete_state(&conn, 0).unwrap();
let mut stmt = conn
.prepare("SELECT *, UNIXEPOCH(datetime) AS unix_datetime FROM toot_tweet_record;")
.prepare("SELECT *, UNIXEPOCH(datetime) AS unix_datetime FROM toot_record;")
.unwrap();
let t_out = stmt.query_row([], |row| {
Ok(TootTweetRecord {
Ok(TootRecord {
toot_id: row.get("toot_id").unwrap(),
tweet_id: row.get("tweet_id").unwrap(),
record_uri: row.get("record_uri").unwrap(),
root_record_uri: row.get("root_record_uri").unwrap(),
datetime: Some(
@@ -434,7 +415,7 @@ mod tests {
assert!(t_out.is_err_and(|x| x == rusqlite::Error::QueryReturnedNoRows));
conn.execute(
"INSERT INTO toot_tweet_record(toot_id, tweet_id, record_uri) VALUES(42, 102, 'abc'), (42, 103, 'def');",
"INSERT INTO toot_record(toot_id, record_uri) VALUES(42, 'abc'), (42, 'def');",
[],
)
.unwrap();
@@ -442,13 +423,12 @@ mod tests {
delete_state(&conn, 42).unwrap();
let mut stmt = conn
.prepare("SELECT *, UNIXEPOCH(datetime) AS unix_datetime FROM toot_tweet_record;")
.prepare("SELECT *, UNIXEPOCH(datetime) AS unix_datetime FROM toot_record;")
.unwrap();
let t_out = stmt.query_row([], |row| {
Ok(TootTweetRecord {
Ok(TootRecord {
toot_id: row.get("toot_id").unwrap(),
tweet_id: row.get("tweet_id").unwrap(),
record_uri: row.get("record_uri").unwrap(),
root_record_uri: row.get("root_record_uri").unwrap(),
datetime: Some(
@@ -471,16 +451,13 @@ mod tests {
let conn = Connection::open(d).unwrap();
conn.execute(
"INSERT INTO toot_tweet_record (toot_id, tweet_id, record_uri) VALUES (42, 102, 'abc'), (42, 103, 'def'), (43, 105, 'ghi');",
"INSERT INTO toot_record (toot_id, record_uri) VALUES (42, 'abc'), (42, 'def'), (43, 'ghi');",
[],
)
.unwrap();
let (tweet_v1, record_v1) = read_all_state(&conn, 43).unwrap();
let (tweet_v2, record_v2) = read_all_state(&conn, 42).unwrap();
assert_eq!(tweet_v1, vec![105]);
assert_eq!(tweet_v2, vec![102, 103]);
let record_v1 = read_all_state(&conn, 43).unwrap();
let record_v2 = read_all_state(&conn, 42).unwrap();
assert_eq!(record_v1, vec!["ghi".to_string()]);
assert_eq!(record_v2, vec!["abc".to_string(), "def".to_string()]);

View File

@@ -1,556 +0,0 @@
use crate::config::TwitterConfig;
use crate::error::OolatoocsError;
use chrono::Utc;
use futures::{stream, StreamExt};
use log::{debug, error, warn};
use megalodon::entities::{
attachment::{Attachment, AttachmentType},
Poll,
};
use oauth1_request::Token;
use reqwest::{
multipart::{Form, Part},
Body, Client,
};
use serde::{Deserialize, Serialize};
use std::{error::Error, ops::Not};
use tokio::time::{sleep, Duration};
const TWITTER_API_TWEET_URL: &str = "https://api.twitter.com/2/tweets";
const TWITTER_UPLOAD_MEDIA_URL: &str = "https://upload.twitter.com/1.1/media/upload.json";
const TWITTER_METADATA_MEDIA_URL: &str =
"https://upload.twitter.com/1.1/media/metadata/create.json";
// I dont know, dont ask me
#[derive(oauth1_request::Request)]
struct EmptyRequest {}
#[derive(Serialize, Debug)]
struct Tweet {
text: String,
#[serde(skip_serializing_if = "Option::is_none")]
media: Option<TweetMediasIds>,
#[serde(skip_serializing_if = "Option::is_none")]
reply: Option<TweetReply>,
#[serde(skip_serializing_if = "Option::is_none")]
poll: Option<TweetPoll>,
}
#[derive(Serialize, Debug)]
struct TweetMediasIds {
media_ids: Vec<String>,
}
#[derive(Serialize, Debug)]
struct TweetReply {
in_reply_to_tweet_id: String,
}
#[derive(Serialize, Debug)]
pub struct TweetPoll {
pub options: Vec<String>,
pub duration_minutes: u16,
}
#[derive(Deserialize, Debug)]
struct TweetResponse {
data: TweetResponseData,
}
#[derive(Deserialize, Debug)]
struct TweetResponseData {
id: String,
}
#[derive(Deserialize, Debug)]
struct UploadMediaResponse {
media_id: u64,
processing_info: Option<UploadMediaResponseProcessingInfo>,
}
#[derive(Deserialize, Debug)]
struct UploadMediaResponseProcessingInfo {
state: UploadMediaResponseProcessingInfoState,
check_after_secs: Option<u64>,
}
#[derive(Deserialize, Debug)]
enum UploadMediaResponseProcessingInfoState {
#[serde(rename = "failed")]
Failed,
#[serde(rename = "succeeded")]
Succeeded,
#[serde(rename = "pending")]
Pending,
#[serde(rename = "in_progress")]
InProgress,
}
#[derive(Serialize, Debug)]
struct MediaMetadata {
media_id: u64,
alt_text: MediaMetadataAltText,
}
#[derive(Serialize, Debug)]
struct MediaMetadataAltText {
text: String,
}
#[derive(Serialize, Debug, oauth1_request::Request)]
struct UploadMediaCommand {
command: String,
media_id: String,
}
/// This function returns the OAuth1 Token object from TwitterConfig
fn get_token(config: &TwitterConfig) -> Token {
oauth1_request::Token::from_parts(
config.consumer_key.to_string(),
config.consumer_secret.to_string(),
config.oauth_token.to_string(),
config.oauth_token_secret.to_string(),
)
}
/// This functions deletes a tweet, given its id
pub async fn delete_tweet(config: &TwitterConfig, id: u64) -> Result<(), Box<dyn Error>> {
debug!("Deleting Tweet {}", id);
let empty_request = EmptyRequest {}; // Why? Because fuck you, thats why!
let token = get_token(config);
let delete_uri = format!("{}/{}", TWITTER_API_TWEET_URL, id);
let client = Client::new();
let res = client
.delete(&delete_uri)
.header(
"Authorization",
oauth1_request::delete(
&delete_uri,
&empty_request,
&token,
oauth1_request::HMAC_SHA1,
),
)
.send()
.await?;
if !res.status().is_success() {
return Err(OolatoocsError::new(&format!("Cannot delete Tweet {}", id)).into());
}
Ok(())
}
/// This function generates a media_ids vec to be used by Twitter
pub async fn generate_media_ids(config: &TwitterConfig, media_attach: &[Attachment]) -> Vec<u64> {
let mut medias: Vec<u64> = vec![];
let media_attachments = media_attach.to_owned();
let mut stream = stream::iter(media_attachments)
.map(|media| {
let twitter_config = config.clone();
tokio::task::spawn(async move {
match media.r#type {
AttachmentType::Image => {
upload_simple_media(&twitter_config, &media.url, &media.description).await
}
AttachmentType::Gifv => {
upload_chunk_media(&twitter_config, &media.url, "tweet_gif").await
}
AttachmentType::Video => {
upload_chunk_media(&twitter_config, &media.url, "tweet_video").await
}
_ => Err::<u64, Box<dyn Error + Send + Sync>>(
OolatoocsError::new(&format!(
"Cannot treat this type of media: {}",
&media.url
))
.into(),
),
}
})
})
.buffered(4);
while let Some(result) = stream.next().await {
match result {
Ok(Ok(v)) => medias.push(v),
Ok(Err(e)) => warn!("Cannot treat media: {}", e),
Err(e) => error!("Something went wrong when joining the main thread: {}", e),
}
}
medias
}
/// This function uploads simple images from Mastodon to Twitter and returns the media id from Twitter
async fn upload_simple_media(
config: &TwitterConfig,
u: &str,
d: &Option<String>,
) -> Result<u64, Box<dyn Error + Send + Sync>> {
// initiate request parameters
let empty_request = EmptyRequest {}; // Why? Because fuck you, thats why!
let token = get_token(config);
// retrieve the length and bytes stream from the given URL
let dl = reqwest::get(u).await?;
let content_length = dl
.content_length()
.ok_or(format!("Cannot get content length for {}", u))?;
let stream = dl.bytes_stream();
debug!("Ref download URL: {}", u);
// upload the media
let client = Client::new();
let res = client
.post(TWITTER_UPLOAD_MEDIA_URL)
.header(
"Authorization",
oauth1_request::post(
TWITTER_UPLOAD_MEDIA_URL,
&empty_request,
&token,
oauth1_request::HMAC_SHA1,
),
)
.multipart(Form::new().part(
"media",
Part::stream_with_length(Body::wrap_stream(stream), content_length),
))
.send()
.await?
.json::<UploadMediaResponse>()
.await?;
debug!("Media ID: {}", res.media_id);
// update the metadata
if let Some(metadata) = d {
debug!("Metadata found! Processing…");
metadata_create(config, res.media_id, metadata).await?;
}
Ok(res.media_id)
}
/// This function updates the metadata given the current media_id and token
async fn metadata_create(
config: &TwitterConfig,
id: u64,
m: &str,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let token = get_token(config);
let empty_request = EmptyRequest {};
let media_metadata = MediaMetadata {
media_id: id,
alt_text: MediaMetadataAltText {
text: m.to_string(),
},
};
debug!("Metadata to process: {}", m);
let client = Client::new();
let metadata = client
.post(TWITTER_METADATA_MEDIA_URL)
.header(
"Authorization",
oauth1_request::post(
TWITTER_METADATA_MEDIA_URL,
&empty_request,
&token,
oauth1_request::HMAC_SHA1,
),
)
.json(&media_metadata)
.send()
.await?;
debug!("Metadata processed with return code: {}", metadata.status());
Ok(())
}
/// This posts video/gif to Twitter and returns the media id from Twitter
async fn upload_chunk_media(
config: &TwitterConfig,
u: &str,
t: &str,
) -> Result<u64, Box<dyn Error + Send + Sync>> {
let empty_request = EmptyRequest {};
let token = get_token(config);
// retrieve the length, type and bytes stream from the given URL
let mut dl = reqwest::get(u).await?;
let content_length = dl
.content_length()
.ok_or(format!("Cannot get content length for {}", u))?;
let content_headers = dl.headers().clone();
let content_type = content_headers
.get("Content-Type")
.ok_or(format!("Cannot get content type for {}", u))?
.to_str()?;
debug!("Init the slot for uploading media: {}", u);
// init the slot for uploading
let client = Client::new();
let orig_media_id = client
.post(TWITTER_UPLOAD_MEDIA_URL)
.header(
"Authorization",
oauth1_request::post(
TWITTER_UPLOAD_MEDIA_URL,
&empty_request,
&token,
oauth1_request::HMAC_SHA1,
),
)
.multipart(
Form::new()
.text("command", "INIT")
.text("media_type", content_type.to_owned())
.text("total_bytes", content_length.to_string())
.text("media_category", t.to_string()),
)
.send()
.await?
.json::<UploadMediaResponse>()
.await?;
debug!("Slot initiated with ID: {}", orig_media_id.media_id);
debug!("Appending media to ID: {}", orig_media_id.media_id);
// append the media to the corresponding slot
let mut segment: u8 = 0;
while let Some(chunk) = dl.chunk().await? {
debug!(
"Appending segment {} for media ID {}",
segment, orig_media_id.media_id
);
let chunk_size: u64 = chunk.len().try_into().unwrap();
let res = client
.post(TWITTER_UPLOAD_MEDIA_URL)
.header(
"Authorization",
oauth1_request::post(
TWITTER_UPLOAD_MEDIA_URL,
&empty_request,
&token,
oauth1_request::HMAC_SHA1,
),
)
.multipart(
Form::new()
.text("command", "APPEND")
.text("media_id", orig_media_id.media_id.to_string())
.text("segment_index", segment.to_string())
.part("media", Part::stream_with_length(chunk, chunk_size)),
)
.send()
.await?;
if !res.status().is_success() {
return Err(
OolatoocsError::new(&format!("Cannot upload part {} of {}", segment, u)).into(),
);
}
segment += 1;
}
debug!("Finalize media ID: {}", orig_media_id.media_id);
// Finalizing task
let fin = client
.post(TWITTER_UPLOAD_MEDIA_URL)
.header(
"Authorization",
oauth1_request::post(
TWITTER_UPLOAD_MEDIA_URL,
&empty_request,
&token,
oauth1_request::HMAC_SHA1,
),
)
.multipart(
Form::new()
.text("command", "FINALIZE")
.text("media_id", orig_media_id.media_id.to_string()),
)
.send()
.await?
.json::<UploadMediaResponse>()
.await?;
if let Some(p_info) = fin.processing_info {
if let Some(wait_sec) = p_info.check_after_secs {
debug!(
"Processing is not finished yet for ID {}, waiting {} secs",
orig_media_id.media_id, wait_sec
);
// getting here, we have a status and a check_after_secs
// this status can be anything but we will check it afterwards
// whatever happens, we can wait here before proceeding
sleep(Duration::from_secs(wait_sec)).await;
let command = UploadMediaCommand {
command: "STATUS".to_string(),
media_id: orig_media_id.media_id.to_string(),
};
loop {
debug!(
"Checking on status for ID {} after waiting {} secs",
orig_media_id.media_id, wait_sec
);
let status = client
.get(TWITTER_UPLOAD_MEDIA_URL)
.header(
"Authorization",
oauth1_request::get(
TWITTER_UPLOAD_MEDIA_URL,
&command,
&token,
oauth1_request::HMAC_SHA1,
),
)
.query(&command)
.send()
.await?
.json::<UploadMediaResponse>()
.await?;
let p_status = status.processing_info.unwrap(); // shouldnt be None at this point
match p_status.state {
UploadMediaResponseProcessingInfoState::Failed => {
debug!("Processing has failed!");
return Err(OolatoocsError::new(&format!(
"Upload for {} (id: {}) has failed",
u, orig_media_id.media_id
))
.into());
}
UploadMediaResponseProcessingInfoState::Succeeded => {
debug!("Processing has succeeded, exiting loop!");
break;
}
UploadMediaResponseProcessingInfoState::Pending
| UploadMediaResponseProcessingInfoState::InProgress => {
debug!(
"Processing still pending, waiting {} secs more…",
p_status.check_after_secs.unwrap() // unwrap is safe here,
// check_after_secs is only present
// when status is pending or in
// progress
);
sleep(Duration::from_secs(p_status.check_after_secs.unwrap())).await;
continue;
}
}
}
}
}
Ok(orig_media_id.media_id)
}
pub fn transform_poll(p: &Poll) -> TweetPoll {
let poll_end_datetime = p.expires_at.unwrap(); // should be safe at this point
let now = Utc::now();
let diff = poll_end_datetime.signed_duration_since(now);
TweetPoll {
options: p
.options
.iter()
.map(|i| i.title.chars().take(25).collect::<String>())
.collect(),
duration_minutes: diff.num_minutes().try_into().unwrap(), // safe here, number is positive
// and cant be over 21600
}
}
/// This posts Tweets with all the associated medias
pub async fn post_tweet(
config: &TwitterConfig,
content: &str,
medias: Vec<u64>,
reply_to: Option<u64>,
poll: Option<TweetPoll>,
) -> Result<u64, Box<dyn Error>> {
let empty_request = EmptyRequest {}; // Why? Because fuck you, thats why!
let token = get_token(config);
let tweet = Tweet {
text: content.to_string(),
media: medias.is_empty().not().then(|| TweetMediasIds {
media_ids: medias.iter().map(|m| m.to_string()).collect(),
}),
reply: reply_to.map(|s| TweetReply {
in_reply_to_tweet_id: s.to_string(),
}),
poll,
};
let client = Client::new();
let res = client
.post(TWITTER_API_TWEET_URL)
.header(
"Authorization",
oauth1_request::post(
TWITTER_API_TWEET_URL,
&empty_request,
&token,
oauth1_request::HMAC_SHA1,
),
)
.json(&tweet)
.send()
.await?
.json::<TweetResponse>()
.await?;
Ok(res.data.id.parse::<u64>().unwrap())
}
#[cfg(test)]
mod tests {
use super::*;
use megalodon::entities::PollOption;
#[test]
fn test_transform_poll() {
let poll = Poll {
id: "youpi".to_string(),
expires_at: Some(Utc::now()),
expired: false,
multiple: false,
votes_count: 0,
voters_count: None,
options: vec![
PollOption {
title: "Je suis beaucoup trop long comme option, tronque-moi !".to_string(),
votes_count: None,
},
PollOption {
title: "nope".to_string(),
votes_count: None,
},
],
voted: None,
emojis: vec![],
};
let tweet_poll_res = transform_poll(&poll);
let tweet_pool_expected = TweetPoll {
duration_minutes: 0,
options: vec!["Je suis beaucoup trop lon".to_string(), "nope".to_string()],
};
assert_eq!(tweet_poll_res.options, tweet_pool_expected.options);
}
}