use chrono::{DateTime, Utc}; use log::debug; use rusqlite::{params, Connection, OptionalExtension}; use std::error::Error; /// Struct for each query line #[derive(Debug)] pub struct TootRecord { // Mastodon part pub toot_id: u64, // Bluesky part pub record_uri: String, pub root_record_uri: String, pub datetime: Option>, } /// Deletes a given state pub fn delete_state(conn: &Connection, toot_id: u64) -> Result<(), Box> { debug!("Deleting Toot ID {}", toot_id); conn.execute( &format!("DELETE FROM toot_record WHERE toot_id = {}", toot_id), [], )?; Ok(()) } /// Retrieves all tweets associated to a toot in the form of a vector pub fn read_all_state(conn: &Connection, toot_id: u64) -> Result, Box> { let query = format!( "SELECT record_uri FROM toot_record WHERE toot_id = {};", toot_id ); let mut stmt = conn.prepare(&query)?; let mut rows = stmt.query([])?; let mut record_v: Vec = Vec::new(); while let Some(row) = rows.next()? { record_v.push(row.get(0)?); } Ok(record_v) } /// if None is passed, read the last tweet from DB /// if a tweet_id is passed, read this particular tweet from DB pub fn read_state(conn: &Connection, s: Option) -> Result, Box> { debug!("Reading toot_id {:?}", s); let begin_query = "SELECT *, UNIXEPOCH(datetime) AS unix_datetime FROM toot_record"; let query: String = match s { Some(i) => format!("{begin_query} WHERE toot_id = {i} ORDER BY record_uri DESC LIMIT 1"), None => format!("{begin_query} ORDER BY toot_id DESC LIMIT 1"), }; let mut stmt = conn.prepare(&query)?; let t = stmt .query_row([], |row| { Ok(TootRecord { toot_id: row.get("toot_id")?, record_uri: row.get("record_uri")?, root_record_uri: row.get("root_record_uri")?, datetime: Some( DateTime::from_timestamp(row.get("unix_datetime").unwrap(), 0).unwrap(), ), }) }) .optional()?; Ok(t) } /// Writes last treated tweet id and toot id to the db pub fn write_state(conn: &Connection, t: TootRecord) -> Result<(), Box> { debug!("Write struct {:?}", t); conn.execute( "INSERT INTO toot_record (toot_id, record_uri, root_record_uri) VALUES (?1, ?2, ?3)", params![t.toot_id, t.record_uri, t.root_record_uri], )?; Ok(()) } /// Initiates the DB from path pub fn init_db(d: &str) -> Result<(), Box> { debug!("Initializing DB for {}", env!("CARGO_PKG_NAME")); let conn = Connection::open(d)?; conn.execute( "CREATE TABLE IF NOT EXISTS toot_record ( toot_id INTEGER, record_uri VARCHAR(128) PRIMARY KEY, root_record_uri VARCHAR(128) DEFAULT '', datetime INTEGER DEFAULT CURRENT_TIMESTAMP )", [], )?; Ok(()) } /// Migrate DB from 3+ to 4+ pub fn migrate_db(d: &str) -> Result<(), Box> { debug!("Migration DB for Oolatoocs"); let conn = Connection::open(d)?; let res = conn.execute("SELECT datetime FROM toot_record;", []); // If the column can be selected then, it’s OK // if not, see if the error is a missing column and add it match res { Err(e) => match e.to_string().as_str() { "no such table: toot_record" => migrate_db_alter_table(&conn), // table does not exist "Execute returned results - did you mean to call query?" => Ok(()), // return results, // column does // exist _ => Err(e.into()), }, Ok(_) => Ok(()), } } /// Creates a new table, copy the data from the old table and rename it fn migrate_db_alter_table(c: &Connection) -> Result<(), Box> { // create the new table c.execute( "CREATE TABLE IF NOT EXISTS toot_record ( toot_id INTEGER, record_uri VARCHAR(128) PRIMARY KEY, root_record_uri VARCHAR(128) DEFAULT '', datetime INTEGER DEFAULT CURRENT_TIMESTAMP )", [], )?; // copy data from the old table c.execute( "INSERT INTO toot_record (toot_id, record_uri, root_record_uri, datetime) SELECT toot_id, record_uri, root_record_uri, datetime FROM toot_tweet_record WHERE record_uri != '';", [], )?; // drop the old table c.execute("DROP TABLE IF EXISTS toot_tweet_record;", [])?; Ok(()) } #[cfg(test)] mod tests { use super::*; use std::{fs::remove_file, path::Path}; #[test] fn test_init_db() { let d = "/tmp/test_init_db.sqlite"; init_db(d).unwrap(); // check that file exist assert!(Path::new(d).exists()); // open said file let conn = Connection::open(d).unwrap(); conn.execute("SELECT * from toot_record;", []).unwrap(); remove_file(d).unwrap(); } #[test] fn test_init_init_db() { // init_db fn should be idempotent so let’s test that let d = "/tmp/test_init_init_db.sqlite"; init_db(d).unwrap(); let conn = Connection::open(d).unwrap(); conn.execute( "INSERT INTO toot_record (record_uri, toot_id) VALUES ('a', 1001);", [], ) .unwrap(); init_db(d).unwrap(); remove_file(d).unwrap(); } #[test] fn test_write_state() { let d = "/tmp/test_write_state.sqlite"; init_db(d).unwrap(); let conn = Connection::open(d).unwrap(); let t_in = TootRecord { toot_id: 987654321, record_uri: "a".to_string(), root_record_uri: "c".to_string(), datetime: None, }; write_state(&conn, t_in).unwrap(); let mut stmt = conn .prepare("SELECT *, UNIXEPOCH(datetime) AS unix_datetime FROM toot_record;") .unwrap(); let t_out = stmt .query_row([], |row| { Ok(TootRecord { toot_id: row.get("toot_id").unwrap(), record_uri: row.get("record_uri").unwrap(), root_record_uri: row.get("root_record_uri").unwrap(), datetime: Some( DateTime::from_timestamp(row.get("unix_datetime").unwrap(), 0).unwrap(), ), }) }) .unwrap(); assert_eq!(t_out.toot_id, 987654321); assert_eq!(t_out.record_uri, "a".to_string()); assert_eq!(t_out.root_record_uri, "c".to_string()); remove_file(d).unwrap(); } #[test] fn test_none_to_tweet_id_read_state() { let d = "/tmp/test_none_to_tweet_id_read_state.sqlite"; init_db(d).unwrap(); let conn = Connection::open(d).unwrap(); conn.execute( "INSERT INTO toot_record (toot_id, record_uri) VALUES (101, 'abc'), (102, 'def');", [], ) .unwrap(); let t_out = read_state(&conn, None).unwrap().unwrap(); remove_file(d).unwrap(); assert_eq!(t_out.toot_id, 102); assert_eq!(t_out.record_uri, "def".to_string()); } #[test] fn test_none_to_none_read_state() { let d = "/tmp/test_none_to_none_read_state.sqlite"; init_db(d).unwrap(); let conn = Connection::open(d).unwrap(); let t_out = read_state(&conn, None).unwrap(); remove_file(d).unwrap(); assert!(t_out.is_none()); } #[test] fn test_tweet_id_to_none_read_state() { let d = "/tmp/test_tweet_id_to_none_read_state.sqlite"; init_db(d).unwrap(); let conn = Connection::open(d).unwrap(); conn.execute( "INSERT INTO toot_record (toot_id, record_uri) VALUES (100, 'abc');", [], ) .unwrap(); let t_out = read_state(&conn, Some(1200)).unwrap(); remove_file(d).unwrap(); assert!(t_out.is_none()); } #[test] fn test_tweet_id_to_tweet_id_read_state() { let d = "/tmp/test_tweet_id_to_tweet_id_read_state.sqlite"; init_db(d).unwrap(); let conn = Connection::open(d).unwrap(); conn.execute( "INSERT INTO toot_record (toot_id, record_uri) VALUES (100, 'abc');", [], ) .unwrap(); let t_out = read_state(&conn, Some(100)).unwrap().unwrap(); remove_file(d).unwrap(); assert_eq!(t_out.toot_id, 100); assert_eq!(t_out.record_uri, "abc".to_string()); } #[test] fn test_last_toot_id_read_state() { let d = "/tmp/test_last_toot_id_read_state.sqlite"; init_db(d).unwrap(); let conn = Connection::open(d).unwrap(); conn.execute( "INSERT INTO toot_record (toot_id, record_uri) VALUES (1000, 'abc'), (1000, 'def');", [], ) .unwrap(); let t_out = read_state(&conn, Some(1000)).unwrap().unwrap(); remove_file(d).unwrap(); assert_eq!(t_out.toot_id, 1000); assert_eq!(t_out.record_uri, "def".to_string()); } #[test] fn test_migrate_db() { // this should be idempotent let d = "/tmp/test_migrate_db.sqlite"; let conn = Connection::open(d).unwrap(); conn.execute( "CREATE TABLE IF NOT EXISTS toot_tweet_record ( toot_id INTEGER, tweet_id INTEGER PRIMARY KEY, record_uri VARCHAR(128) DEFAULT '', root_record_uri VARCHAR(128) DEFAULT '', datetime INTEGER DEFAULT CURRENT_TIMESTAMP )", [], ) .unwrap(); conn.execute( "INSERT INTO toot_tweet_record (tweet_id, toot_id, record_uri) VALUES (0, 0, ''), (1, 1, 'abc');", [], ) .unwrap(); migrate_db(d).unwrap(); let last_state = read_state(&conn, None).unwrap().unwrap(); assert_eq!(last_state.toot_id, 1); migrate_db(d).unwrap(); // shouldn’t do anything remove_file(d).unwrap(); } #[test] fn test_delete_state() { let d = "/tmp/test_delete_state.sqlite"; init_db(d).unwrap(); let conn = Connection::open(d).unwrap(); conn.execute( "INSERT INTO toot_record(toot_id, record_uri) VALUES (0, 'abc');", [], ) .unwrap(); delete_state(&conn, 0).unwrap(); let mut stmt = conn .prepare("SELECT *, UNIXEPOCH(datetime) AS unix_datetime FROM toot_record;") .unwrap(); let t_out = stmt.query_row([], |row| { Ok(TootRecord { toot_id: row.get("toot_id").unwrap(), record_uri: row.get("record_uri").unwrap(), root_record_uri: row.get("root_record_uri").unwrap(), datetime: Some( DateTime::from_timestamp(row.get("unix_datetime").unwrap(), 0).unwrap(), ), }) }); assert!(t_out.is_err_and(|x| x == rusqlite::Error::QueryReturnedNoRows)); conn.execute( "INSERT INTO toot_record(toot_id, record_uri) VALUES(42, 'abc'), (42, 'def');", [], ) .unwrap(); delete_state(&conn, 42).unwrap(); let mut stmt = conn .prepare("SELECT *, UNIXEPOCH(datetime) AS unix_datetime FROM toot_record;") .unwrap(); let t_out = stmt.query_row([], |row| { Ok(TootRecord { toot_id: row.get("toot_id").unwrap(), record_uri: row.get("record_uri").unwrap(), root_record_uri: row.get("root_record_uri").unwrap(), datetime: Some( DateTime::from_timestamp(row.get("unix_datetime").unwrap(), 0).unwrap(), ), }) }); assert!(t_out.is_err_and(|x| x == rusqlite::Error::QueryReturnedNoRows)); remove_file(d).unwrap(); } #[test] fn test_read_all_state() { let d = "/tmp/read_all_state.sqlite"; init_db(d).unwrap(); let conn = Connection::open(d).unwrap(); conn.execute( "INSERT INTO toot_record (toot_id, record_uri) VALUES (42, 'abc'), (42, 'def'), (43, 'ghi');", [], ) .unwrap(); let record_v1 = read_all_state(&conn, 43).unwrap(); let record_v2 = read_all_state(&conn, 42).unwrap(); assert_eq!(record_v1, vec!["ghi".to_string()]); assert_eq!(record_v2, vec!["abc".to_string(), "def".to_string()]); remove_file(d).unwrap(); } }