From 0451543d6dcbc2abdf74f11cd3690adaa71edbad Mon Sep 17 00:00:00 2001 From: VC Date: Fri, 6 Oct 2023 16:21:11 +0200 Subject: [PATCH] feat: async + stream --- Cargo.lock | 101 +++++++++++++++++++++++++++++++++++++++++++++++- Cargo.toml | 7 +++- src/lib.rs | 68 ++++++++++---------------------- src/peertube.rs | 18 +++++---- src/youtube.rs | 62 +++++++++++++++-------------- 5 files changed, 170 insertions(+), 86 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d2b9336..bc8b1ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -289,6 +289,17 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" +[[package]] +name = "futures-macro" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.28" @@ -309,6 +320,8 @@ checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" dependencies = [ "futures-core", "futures-io", + "futures-macro", + "futures-sink", "futures-task", "memchr", "pin-project-lite", @@ -500,6 +513,16 @@ version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a9bad9f94746442c783ca431b22403b519cd7fbeed0533fdd6328b2f2212128" +[[package]] +name = "lock_api" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1cc9717a20b1bb222f333e6a92fd32f7d8a18ddc5a3191a11af45dcbf4dcd16" +dependencies = [ + "autocfg", + "scopeguard", +] + [[package]] name = "log" version = "0.4.20" @@ -625,6 +648,29 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "parking_lot" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93f00c865fe7cabf650081affecd3871070f26767e7b2070a3ffae14c654b447" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets", +] + [[package]] name = "percent-encoding" version = "2.3.0" @@ -734,10 +780,12 @@ dependencies = [ "serde_urlencoded", "tokio", "tokio-native-tls", + "tokio-util", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", "winreg", ] @@ -776,6 +824,12 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + [[package]] name = "security-framework" version = "2.9.2" @@ -842,6 +896,15 @@ dependencies = [ "serde", ] +[[package]] +name = "signal-hook-registry" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" +dependencies = [ + "libc", +] + [[package]] name = "slab" version = "0.4.9" @@ -851,6 +914,12 @@ dependencies = [ "autocfg", ] +[[package]] +name = "smallvec" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "942b4a808e05215192e39f4ab80813e599068285906cc91aa64f923db842bd5a" + [[package]] name = "socket2" version = "0.4.9" @@ -936,11 +1005,25 @@ dependencies = [ "libc", "mio", "num_cpus", + "parking_lot", "pin-project-lite", + "signal-hook-registry", "socket2 0.5.4", + "tokio-macros", "windows-sys", ] +[[package]] +name = "tokio-macros" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tokio-native-tls" version = "0.3.1" @@ -976,13 +1059,16 @@ dependencies = [ [[package]] name = "tootube" -version = "0.2.0" +version = "0.3.0" dependencies = [ + "bytes", "clap", "env_logger", + "futures-core", "log", "reqwest", "serde", + "tokio", "toml", ] @@ -1143,6 +1229,19 @@ version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1" +[[package]] +name = "wasm-streams" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4609d447824375f43e1ffbc051b50ad8f4b3ae8219680c94452ea05eb240ac7" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "web-sys" version = "0.3.64" diff --git a/Cargo.toml b/Cargo.toml index de4e77b..437b0bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,18 +1,21 @@ [package] name = "tootube" authors = ["VC "] -version = "0.2.0" +version = "0.3.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -reqwest = { version = "^0.11", features = ["blocking", "json"] } +reqwest = { version = "^0.11", features = ["json", "stream"] } +tokio = { version = "^1", features = ["full"] } clap = "^4" serde = { version = "1.0", features = ["derive"] } toml = "^0.5" log = "^0.4" env_logger = "^0.10" +futures-core = "0.3.28" +bytes = "1.5.0" [profile.release] strip = true diff --git a/src/lib.rs b/src/lib.rs index fd7b5fa..848fce3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,13 +1,9 @@ -use std::{ - error::Error, - fs::create_dir_all, - fs::{remove_file, File}, -}; +use std::error::Error; -use reqwest::Url; +use bytes::Bytes; +use futures_core::stream::Stream; mod error; -use error::TootubeError; mod config; pub use config::parse_toml; @@ -17,58 +13,34 @@ mod peertube; use peertube::{get_latest_video, get_max_resolution_dl}; mod youtube; -use youtube::{create_resumable_upload, upload_video}; +use youtube::{create_resumable_upload, now_kiss}; -const TMP_DIR: &str = "/tmp/tootube"; - -fn dl_video(u: &str) -> Result> { - // create dir - create_dir_all(TMP_DIR)?; - - // get file - let mut response = reqwest::blocking::get(u)?; - - // create local file - let url = Url::parse(u)?; - let dest_filename = url - .path_segments() - .ok_or_else(|| { - TootubeError::new(&format!( - "Cannot determine the destination filename for {u}" - )) - })? - .last() - .ok_or_else(|| { - TootubeError::new(&format!( - "Cannot determine the destination filename for {u}" - )) - })?; - - let dest_filepath = format!("{TMP_DIR}/{dest_filename}"); - - let mut dest_file = File::create(&dest_filepath)?; - - response.copy_to(&mut dest_file)?; - - Ok(dest_filepath) +async fn get_dl_video_stream( + u: &str, +) -> Result>, Box> { + Ok(reqwest::get(u).await?.bytes_stream()) } -pub fn run(config: Config) { +#[tokio::main] +pub async fn run(config: Config) { // Get the latest video object - let latest_vid = get_latest_video(&config.peertube.base_url).unwrap_or_else(|e| { - panic!("Cannot retrieve the latest video, something must have gone terribly wrong: {e}") - }); + let latest_vid = get_latest_video(&config.peertube.base_url) + .await + .unwrap_or_else(|e| { + panic!("Cannot retrieve the latest video, something must have gone terribly wrong: {e}") + }); let dl_url = get_max_resolution_dl(latest_vid.streaming_playlists.as_ref().unwrap()); - let local_path = dl_video(&dl_url) + let pt_stream = get_dl_video_stream(&dl_url) + .await .unwrap_or_else(|e| panic!("Cannot download video at URL {}: {}", dl_url, e)); let resumable_upload_url = create_resumable_upload(&config.youtube, &latest_vid) + .await .unwrap_or_else(|e| panic!("Cannot retrieve the upload’s resumable id: {e}")); - upload_video(&local_path, &resumable_upload_url, &config.youtube) + now_kiss(pt_stream, &resumable_upload_url, &config.youtube) + .await .unwrap_or_else(|e| panic!("Cannot resume upload!: {e}")); - - remove_file(&local_path).unwrap_or_else(|e| panic!("Cannot delete file {}: {}", local_path, e)); } diff --git a/src/peertube.rs b/src/peertube.rs index ca945f4..d4e025a 100644 --- a/src/peertube.rs +++ b/src/peertube.rs @@ -36,11 +36,13 @@ pub struct PeerTubeVideoStreamingPlaylistsFilesResolution { } /// This gets the last video uploaded to the PeerTube server -pub fn get_latest_video(u: &str) -> Result> { - let body = reqwest::blocking::get(format!("{}/api/v1/videos?count=1&sort=-publishedAt", u))? - .json::()?; +pub async fn get_latest_video(u: &str) -> Result> { + let body = reqwest::get(format!("{}/api/v1/videos?count=1&sort=-publishedAt", u)) + .await? + .json::() + .await?; - let vid = get_video_detail(u, &body.data[0].uuid)?; + let vid = get_video_detail(u, &body.data[0].uuid).await?; Ok(vid) } @@ -61,9 +63,11 @@ pub fn get_max_resolution_dl(p: &[PeerTubeVideoStreamingPlaylists]) -> String { } /// This gets all the crispy details about one particular video -fn get_video_detail(u: &str, v: &str) -> Result> { - let body = - reqwest::blocking::get(format!("{}/api/v1/videos/{}", u, v))?.json::()?; +async fn get_video_detail(u: &str, v: &str) -> Result> { + let body = reqwest::get(format!("{}/api/v1/videos/{}", u, v)) + .await? + .json::() + .await?; Ok(body) } diff --git a/src/youtube.rs b/src/youtube.rs index 630e0c6..b5bb729 100644 --- a/src/youtube.rs +++ b/src/youtube.rs @@ -1,8 +1,12 @@ use crate::{config::YoutubeConfig, error::TootubeError, peertube::PeerTubeVideo}; +use bytes::Bytes; +use futures_core::stream::Stream; +use reqwest::{Body, Client}; use serde::{Deserialize, Serialize}; -use std::{error::Error, fs::File, sync::Mutex}; +use std::error::Error; +use tokio::sync::OnceCell; -static ACCESS_TOKEN: Mutex = Mutex::new(String::new()); +static ACCESS_TOKEN: OnceCell = OnceCell::const_new(); #[derive(Serialize, Debug)] struct RefreshTokenRequest { @@ -79,9 +83,9 @@ impl Default for YoutubeUploadParamsStatus { } /// Ensures that Token has been refreshed and that it is unique -fn refresh_token(config: &YoutubeConfig) -> Result> { - if let Ok(mut unlocked_access_token) = ACCESS_TOKEN.lock() { - if unlocked_access_token.is_empty() { +async fn refresh_token(config: &YoutubeConfig) -> Result { + ACCESS_TOKEN + .get_or_try_init(|| async { let refresh_token = RefreshTokenRequest { refresh_token: config.refresh_token.clone(), client_id: config.client_id.clone(), @@ -89,26 +93,26 @@ fn refresh_token(config: &YoutubeConfig) -> Result> { ..Default::default() }; - let client = reqwest::blocking::Client::new(); + let client = Client::new(); let res = client .post("https://accounts.google.com/o/oauth2/token") .json(&refresh_token) - .send()?; + .send() + .await?; - let access_token: AccessTokenResponse = res.json()?; + let access_token: AccessTokenResponse = res.json().await?; - *unlocked_access_token = access_token.access_token.clone(); - } - } - - Ok(ACCESS_TOKEN.lock().unwrap().to_string()) + Ok(access_token.access_token) + }) + .await + .cloned() } -pub fn create_resumable_upload( +pub async fn create_resumable_upload( config: &YoutubeConfig, vid: &PeerTubeVideo, ) -> Result> { - let access_token = refresh_token(config)?; + let access_token = refresh_token(config).await?; let upload_params = YoutubeUploadParams { snippet: { @@ -126,12 +130,12 @@ pub fn create_resumable_upload( }, }; - let client = reqwest::blocking::Client::new(); + let client = Client::new(); let res = client.post("https://www.googleapis.com/upload/youtube/v3/videos?uploadType=resumable&part=snippet%2Cstatus") .header("Authorization", format!("Bearer {}", access_token)) .json(&upload_params) - .send()?; + .send().await?; if res.status().is_success() { Ok(res @@ -145,28 +149,30 @@ pub fn create_resumable_upload( } } -pub fn upload_video( - f_path: &str, - r_url: &str, - config: &YoutubeConfig, +pub async fn now_kiss<'a>( + stream: impl Stream> + + std::marker::Send + + std::marker::Sync + + 'a + 'static, + r_url: &'a str, + config: &'a YoutubeConfig, ) -> Result<(), Box> { // Get access token - let access_token = refresh_token(config)?; + let access_token = refresh_token(config).await?; // Create client - let client = reqwest::blocking::Client::new(); - - let file = File::open(f_path)?; + let client = Client::new(); let res = client .put(r_url) .header("Authorization", format!("Bearer {}", access_token)) - .body(file) - .send()?; + .body(Body::wrap_stream(stream)) + .send() + .await?; if res.status().is_success() { Ok(()) } else { - Err(TootubeError::new(&format!("Cannot upload video: {:?}", res.text()?)).into()) + Err(TootubeError::new(&format!("Cannot upload video: {:?}", res.text().await?)).into()) } }