From 5383c8d216a60057419bafe4df43e04b619e1a93 Mon Sep 17 00:00:00 2001 From: VC Date: Tue, 14 May 2024 17:29:17 +0200 Subject: [PATCH] =?UTF-8?q?=E2=99=BB=EF=B8=8F:=20refactor=20src/peertube.r?= =?UTF-8?q?s=20and=20src/youtube.rs=20code=20to=20be=20more=20efficient=20?= =?UTF-8?q?regarding=20reqwest=20management?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/lib.rs | 52 ++++--- src/peertube.rs | 258 ++++++++++++++++--------------- src/youtube.rs | 400 ++++++++++++++++++++++++------------------------ 5 files changed, 370 insertions(+), 344 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3c90212..69b094d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1192,7 +1192,7 @@ dependencies = [ [[package]] name = "tootube" -version = "0.6.0" +version = "0.6.1" dependencies = [ "async-stream", "clap", diff --git a/Cargo.toml b/Cargo.toml index c8f3b92..1e3e6b2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "tootube" authors = ["VC "] -version = "0.6.0" +version = "0.6.1" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/src/lib.rs b/src/lib.rs index 5abf2c3..0ca34f6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,24 +8,31 @@ use config::Config; mod peertube; pub use peertube::register as register_peertube; -use peertube::{delete_original_video_source, get_latest_video, get_original_video_source}; +use peertube::PeerTube; mod youtube; pub use youtube::register as register_youtube; -use youtube::{add_video_to_playlists, create_resumable_upload, now_kiss}; +use youtube::YouTube; #[tokio::main] pub async fn run(config: Config, pl: Vec) { + // Create PeerTube struct + let peertube = match &config.peertube.oauth2 { + Some(s) => PeerTube::new(&config.peertube.base_url) + .with_client(&s.client_id, &s.client_secret, &s.refresh_token) + .await + .unwrap_or_else(|e| panic!("Cannot instantiate PeerTube struct: {}", e)), + None => PeerTube::new(&config.peertube.base_url), + }; // Get the latest video object - let latest_vid = get_latest_video(&config.peertube.base_url) - .await - .unwrap_or_else(|e| { - panic!("Cannot retrieve the latest video, something must have gone terribly wrong: {e}") - }); + let latest_vid = peertube.get_latest_video().await.unwrap_or_else(|e| { + panic!("Cannot retrieve the latest video, something must have gone terribly wrong: {e}") + }); // We have a refresh_token, try to use it let source_url = match &config.peertube.oauth2 { - Some(_) => get_original_video_source(&latest_vid.uuid, &config.peertube) + Some(_) => peertube + .get_original_video_source(&latest_vid.uuid) .await .ok(), None => None, @@ -48,30 +55,37 @@ pub async fn run(config: Config, pl: Vec) { debug!("PT download URL: {}", &dl_url); - let resumable_upload_url = create_resumable_upload(&config.youtube, &latest_vid) + let youtube = YouTube::new( + &config.youtube.client_id, + &config.youtube.client_secret, + &config.youtube.refresh_token, + ) + .await + .unwrap_or_else(|e| panic!("Cannot instantiate YouTube struct: {}", e)); + + let resumable_upload_url = youtube + .create_resumable_upload(&latest_vid) .await .unwrap_or_else(|e| panic!("Cannot retrieve the upload’s resumable id: {e}")); debug!("YT upload URL: {}", &resumable_upload_url); - let yt_video_id = now_kiss( - &dl_url, - &resumable_upload_url, - &config.youtube, - &config.tootube, - ) - .await - .unwrap_or_else(|e| panic!("Cannot resume upload!: {e}")); + let yt_video_id = youtube + .now_kiss(&dl_url, &resumable_upload_url, &config.tootube) + .await + .unwrap_or_else(|e| panic!("Cannot resume upload!: {e}")); debug!("YT video ID: {}", &yt_video_id); if !pl.is_empty() { - add_video_to_playlists(&config.youtube, &yt_video_id, &pl) + youtube + .add_video_to_playlists(&yt_video_id, &pl) .await .unwrap_or_else(|e| panic!("Cannot add video to playlist(s): {e}")); } // delete the source video if requested (it won’t be used anymore) if config.peertube.delete_video_source_after_transfer { - delete_original_video_source(&latest_vid.uuid, &config.peertube) + peertube + .delete_original_video_source(&latest_vid.uuid) .await .unwrap_or_else(|e| panic!("Cannot delete source video: {e}")); diff --git a/src/peertube.rs b/src/peertube.rs index 858dba9..29b84be 100644 --- a/src/peertube.rs +++ b/src/peertube.rs @@ -1,6 +1,9 @@ use crate::{config::PeertubeConfig, error::TootubeError}; use log::debug; -use reqwest::Client; +use reqwest::{ + header::{HeaderMap, HeaderValue}, + Client, +}; use rpassword::prompt_password; use serde::{Deserialize, Serialize}; use std::{boxed::Box, cmp::Ordering, error::Error, io::stdin}; @@ -96,28 +99,6 @@ pub struct PeerTubeVideoStreamingPlaylistsFilesResolution { pub id: u16, } -/// This gets the last video uploaded to the PeerTube server -pub async fn get_latest_video(u: &str) -> Result> { - let body = reqwest::get(format!("{}/api/v1/videos?count=1&sort=-publishedAt", u)) - .await? - .json::() - .await?; - - let vid = get_video_detail(u, &body.data[0].uuid).await?; - - Ok(vid) -} - -/// This gets all the crispy details about one particular video -async fn get_video_detail(u: &str, v: &str) -> Result> { - let body = reqwest::get(format!("{}/api/v1/videos/{}", u, v)) - .await? - .json::() - .await?; - - Ok(body) -} - /// This function makes the registration process a little bit easier #[tokio::main] pub async fn register(config: &PeertubeConfig) -> Result<(), Box> { @@ -173,107 +154,140 @@ pub async fn register(config: &PeertubeConfig) -> Result<(), Box> { Ok(()) } -async fn get_refresh_token(config: &PeertubeConfig) -> Result> { - // unwrap is safe here, this function is only called when oauth2 is present - let oauth2_config = config.oauth2.as_ref().unwrap().clone(); - // retrieve the refresh_token from the file - let refresh_token = read_to_string(&oauth2_config.refresh_token).await?; - - debug!( - "Opened file {} to retrieve Token", - &oauth2_config.refresh_token - ); - - let params = PeerTubeUsersToken { - client_id: oauth2_config.client_id.clone(), - client_secret: oauth2_config.client_secret.clone(), - grant_type: "refresh_token".to_string(), - refresh_token: Some(refresh_token), - username: None, - password: None, - }; - - let client = Client::new(); - let oauth2_token = client - .post(format!("{}/api/v1/users/token", config.base_url)) - .form(¶ms) - .send() - .await? - .json::() - .await?; - - debug!("Retrieved access_token: {}", &oauth2_token.access_token); - - // write the new refresh token to the file - write(&oauth2_config.refresh_token, oauth2_token.refresh_token).await?; - - debug!( - "Written refresh_token to file {}", - &oauth2_config.refresh_token - ); - - Ok(oauth2_token.access_token) +#[derive(Debug)] +pub struct PeerTube { + base_url: String, + client: Client, } -pub async fn get_original_video_source( - uuid: &str, - config: &PeertubeConfig, -) -> Result> { - let access_token = get_refresh_token(config).await?; - - let client = Client::new(); - - let source_vid = client - .get(format!("{}/api/v1/videos/{}/source", config.base_url, uuid)) - .header("Authorization", format!("Bearer {}", &access_token)) - .send() - .await? - .json::() - .await?; - - debug!("Got the Source Vid URL: {}", &source_vid.file_download_url); - - let video_file_token = client - .post(format!("{}/api/v1/videos/{}/token", config.base_url, uuid)) - .header("Authorization", format!("Bearer {}", &access_token)) - .send() - .await? - .json::() - .await?; - - debug!("Got the File Token: {}", &video_file_token.files.token); - - Ok(format!( - "{}?videoFileToken={}", - source_vid.file_download_url, video_file_token.files.token - )) -} - -pub async fn delete_original_video_source( - uuid: &str, - config: &PeertubeConfig, -) -> Result<(), Box> { - let access_token = get_refresh_token(config).await?; - - let client = Client::new(); - - let res = client - .delete(format!( - "{}/api/v1/videos/{}/source/file", - config.base_url, uuid - )) - .header("Authorization", format!("Bearer {}", &access_token)) - .send() - .await?; - - if !res.status().is_success() { - return Err(TootubeError::new(&format!( - "Cannot delete source video file {}: {}", - uuid, - res.text().await? - )) - .into()); +impl PeerTube { + /// Create a new PeerTube struct with a basic embedded reqwest::Client + pub fn new(base_url: &str) -> Self { + PeerTube { + base_url: format!("{}/api/v1", base_url), + client: Client::new(), + } } - Ok(()) + /// Retrieve the refresh_token and access_token and update the embedded reqwest::Client to have + /// the default required header + pub async fn with_client( + mut self, + client_id: &str, + client_secret: &str, + refresh_token_path: &str, + ) -> Result> { + let refresh_token = read_to_string(refresh_token_path).await?; + + let params = PeerTubeUsersToken { + client_id: client_id.to_string(), + client_secret: client_secret.to_string(), + grant_type: "refresh_token".to_string(), + refresh_token: Some(refresh_token), + username: None, + password: None, + }; + + let req = self + .client + .post(&format!("{}/users/token", self.base_url)) + .form(¶ms) + .send() + .await? + .json::() + .await?; + + write(refresh_token_path, req.refresh_token).await?; + + let mut headers = HeaderMap::new(); + headers.insert( + "Authorization", + HeaderValue::from_str(&format!("Bearer {}", req.access_token))?, + ); + + self.client = reqwest::Client::builder() + .default_headers(headers) + .build()?; + + Ok(self) + } + + /// This gets the last video uploaded to the PeerTube server + pub async fn get_latest_video(&self) -> Result> { + let body = self + .client + .get(format!( + "{}/videos?count=1&sort=-publishedAt", + self.base_url + )) + .send() + .await? + .json::() + .await?; + + let vid = self.get_video_detail(&body.data[0].uuid).await?; + + Ok(vid) + } + + /// This gets all the crispy details about one particular video + async fn get_video_detail(&self, v: &str) -> Result> { + let body = self + .client + .get(format!("{}/videos/{}", self.base_url, v)) + .send() + .await? + .json::() + .await?; + + Ok(body) + } + + /// Get the original video source + pub async fn get_original_video_source(&self, uuid: &str) -> Result> { + let source_vid = self + .client + .get(format!("{}/videos/{}/source", self.base_url, uuid)) + .send() + .await? + .json::() + .await?; + + debug!("Got the Source Vid URL: {}", &source_vid.file_download_url); + + let video_file_token = self + .client + .post(format!("{}/videos/{}/token", self.base_url, uuid)) + .send() + .await? + .json::() + .await?; + + debug!("Got the File Token: {}", &video_file_token.files.token); + + Ok(format!( + "{}?videoFileToken={}", + source_vid.file_download_url, video_file_token.files.token + )) + } + + /// Delete the original video source + pub async fn delete_original_video_source(&self, uuid: &str) -> Result<(), Box> { + let res = self + .client + .delete(format!("{}/videos/{}/source/file", self.base_url, uuid)) + .send() + .await?; + + if !res.status().is_success() { + return Err(TootubeError::new(&format!( + "Cannot delete source video file {}: {}", + uuid, + res.text().await? + )) + .into()); + } + + Ok(()) + } } diff --git a/src/youtube.rs b/src/youtube.rs index 2b6194d..7d4ea84 100644 --- a/src/youtube.rs +++ b/src/youtube.rs @@ -7,12 +7,13 @@ use async_stream::stream; use futures_util::StreamExt; use indicatif::{ProgressBar, ProgressStyle}; use log::{debug, warn}; -use reqwest::{multipart::Form, Body, Client}; +use reqwest::{ + header::{HeaderMap, HeaderValue}, + multipart::Form, + Body, Client, +}; use serde::{Deserialize, Serialize}; use std::{cmp::min, error::Error, io::stdin}; -use tokio::sync::OnceCell; - -static ACCESS_TOKEN: OnceCell = OnceCell::const_new(); #[derive(Serialize, Debug)] struct RefreshTokenRequest { @@ -193,219 +194,216 @@ pub async fn register(config: &YoutubeConfig) -> Result<(), Box> { Ok(()) } -/// Ensures that Token has been refreshed and that it is unique -async fn refresh_token(config: &YoutubeConfig) -> Result { - ACCESS_TOKEN - .get_or_try_init(|| async { - let refresh_token = RefreshTokenRequest { - refresh_token: config.refresh_token.clone(), - client_id: config.client_id.clone(), - client_secret: config.client_secret.clone(), - ..Default::default() +pub struct YouTube { + client: Client, +} + +impl YouTube { + pub async fn new( + client_id: &str, + client_secret: &str, + refresh_token: &str, + ) -> Result> { + let mut youtube = YouTube { + client: Client::new(), + }; + + let refresh_token = RefreshTokenRequest { + refresh_token: refresh_token.to_string(), + client_id: client_id.to_string(), + client_secret: client_secret.to_string(), + ..Default::default() + }; + + let access_token = youtube + .client + .post("https://accounts.google.com/o/oauth2/token") + .json(&refresh_token) + .send() + .await? + .json::() + .await?; + + let mut headers = HeaderMap::new(); + headers.insert( + "Authorization", + HeaderValue::from_str(&format!("Bearer {}", access_token.access_token))?, + ); + + youtube.client = reqwest::Client::builder() + .default_headers(headers) + .build()?; + + Ok(youtube) + } + /// This function takes a list of playlists keyword and returns a list of playlist ID + async fn get_playlist_ids(&self, pl: &[String]) -> Result, Box> { + let mut page_token = String::new(); + let mut playlists: Vec = vec![]; + + while let Ok(local_pl) = self.client + .get(&format!( + "https://www.googleapis.com/youtube/v3/playlists?part=snippet&mine=true&pageToken={}", + page_token + )) + .send() + .await? + .json::() + .await + { + playlists.append( + &mut local_pl + .items + .iter() + .filter_map(|s| pl.contains(&s.snippet.title).then_some(s.id.clone())) + .collect(), + ); + // if nextPageToken is present, continue the loop + match local_pl.next_page_token { + None => break, + Some(a) => page_token.clone_from(&a), + } + } + + debug!("Playlists IDs: {:?}", &playlists); + Ok(playlists) + } + + /// This function adds the video id to the corresponding named playlist(s) + pub async fn add_video_to_playlists( + &self, + v: &str, + pl: &[String], + ) -> Result<(), Box> { + let playlists_ids = self.get_playlist_ids(pl).await?; + + for pl_id in playlists_ids { + let yt_pl_upload_params = YoutubePlaylistItemsParams { + snippet: YoutubePlaylistItemsParamsSnippet { + playlist_id: pl_id.clone(), + resource_id: YoutubePlaylistItemsParamsSnippetResourceId { + video_id: v.to_string(), + ..Default::default() + }, + ..Default::default() + }, }; - let client = Client::new(); - let res = client - .post("https://accounts.google.com/o/oauth2/token") - .json(&refresh_token) + let res = self + .client + .post("https://youtube.googleapis.com/youtube/v3/playlistItems?part=snippet") + .json(&yt_pl_upload_params) .send() .await?; - let access_token: AccessTokenResponse = res.json().await?; - - debug!("YT Access Token: {}", &access_token.access_token); - Ok(access_token.access_token) - }) - .await - .cloned() -} - -/// This function takes a list of playlists keyword and returns a list of playlist ID -async fn get_playlist_ids( - config: &YoutubeConfig, - pl: &[String], -) -> Result, Box> { - let mut page_token = String::new(); - let mut playlists: Vec = vec![]; - - let access_token = refresh_token(config).await?; - let client = Client::new(); - - while let Ok(local_pl) = client - .get(&format!( - "https://www.googleapis.com/youtube/v3/playlists?part=snippet&mine=true&pageToken={}", - page_token - )) - .header("Authorization", format!("Bearer {}", access_token)) - .send() - .await? - .json::() - .await - { - playlists.append( - &mut local_pl - .items - .iter() - .filter_map(|s| pl.contains(&s.snippet.title).then_some(s.id.clone())) - .collect(), - ); - - // if nextPageToken is present, continue the loop - match local_pl.next_page_token { - None => break, - Some(a) => page_token.clone_from(&a), + if !res.status().is_success() { + return Err(TootubeError::new(&format!( + "Something went wrong when trying to add the video to a playlist: {}", + res.text().await? + )) + .into()); + } } + + Ok(()) } - debug!("Playlists IDs: {:?}", &playlists); - Ok(playlists) -} + /// This function creates a resumable YT upload, putting all the parameters in + pub async fn create_resumable_upload( + &self, + vid: &PeerTubeVideo, + ) -> Result> { + if vid.name.chars().count() > 100 { + warn!( + "PT Video Title ({}) is too long, it will be truncated", + &vid.name + ); + } -/// This function adds the video id to the corresponding named playlist(s) -pub async fn add_video_to_playlists( - config: &YoutubeConfig, - v: &str, - pl: &[String], -) -> Result<(), Box> { - let access_token = refresh_token(config).await?; - let playlists_ids = get_playlist_ids(config, pl).await?; - - for pl_id in playlists_ids { - let yt_pl_upload_params = YoutubePlaylistItemsParams { - snippet: YoutubePlaylistItemsParamsSnippet { - playlist_id: pl_id.clone(), - resource_id: YoutubePlaylistItemsParamsSnippetResourceId { - video_id: v.to_string(), + let upload_params = YoutubeUploadParams { + snippet: { + YoutubeUploadParamsSnippet { + title: vid.name.chars().take(100).collect::(), + description: vid.description.clone(), + tags: vid.tags.clone(), ..Default::default() - }, - ..Default::default() + } + }, + status: { + YoutubeUploadParamsStatus { + ..Default::default() + } }, }; + debug!("YT upload params: {:?}", &upload_params); - let client = Client::new(); - let res = client - .post("https://youtube.googleapis.com/youtube/v3/playlistItems?part=snippet") - .header("Authorization", format!("Bearer {}", access_token)) - .json(&yt_pl_upload_params) + let res = self.client.post("https://www.googleapis.com/upload/youtube/v3/videos?uploadType=resumable&part=snippet%2Cstatus") + .json(&upload_params) + .send().await?; + + if res.status().is_success() { + Ok(res + .headers() + .get("location") + .ok_or("Cannot find suitable header")? + .to_str()? + .to_string()) + } else { + Err(TootubeError::new("Cannot create resumable upload!").into()) + } + } + /// This takes the PT stream for download, connects it to YT stream for upload + pub async fn now_kiss( + &self, + dl_url: &str, + r_url: &str, + pg_conf: &TootubeConfig, + ) -> Result> { + // Get the upstream bytes stream + let res = reqwest::get(dl_url).await?; + let content_lengh = res + .content_length() + .ok_or(format!("Cannot get content length from {}", dl_url))?; + let mut stream = res.bytes_stream(); + + // Create the progress bar + let pb = ProgressBar::new(content_lengh); + pb.set_style( + ProgressStyle::default_bar() + .template(&pg_conf.progress_bar)? + .progress_chars(&pg_conf.progress_chars), + ); + pb.set_message("Transferring…"); + let mut transferring: u64 = 0; + + // yields the stream chunk by chunk, updating the progress bar at the same time + let async_stream = stream! { + while let Some(chunk) = stream.next().await { + if let Ok(chunk) = &chunk { + let new = min(transferring + (chunk.len() as u64), content_lengh); + transferring = new; + pb.set_position(new); + if transferring >= content_lengh { + pb.finish(); + } + } + yield chunk; + } + }; + + // Create client + let res = self + .client + .put(r_url) + .body(Body::wrap_stream(async_stream)) .send() .await?; - if !res.status().is_success() { - return Err(TootubeError::new(&format!( - "Something went wrong when trying to add the video to a playlist: {}", - res.text().await? - )) - .into()); + if res.status().is_success() { + let yt_videos: YoutubeVideos = res.json().await?; + Ok(yt_videos.id) + } else { + Err(TootubeError::new(&format!("Cannot upload video: {:?}", res.text().await?)).into()) } } - - Ok(()) -} - -/// This function creates a resumable YT upload, putting all the parameters in -pub async fn create_resumable_upload( - config: &YoutubeConfig, - vid: &PeerTubeVideo, -) -> Result> { - let access_token = refresh_token(config).await?; - - if vid.name.chars().count() > 100 { - warn!( - "PT Video Title ({}) is too long, it will be truncated", - &vid.name - ); - } - - let upload_params = YoutubeUploadParams { - snippet: { - YoutubeUploadParamsSnippet { - title: vid.name.chars().take(100).collect::(), - description: vid.description.clone(), - tags: vid.tags.clone(), - ..Default::default() - } - }, - status: { - YoutubeUploadParamsStatus { - ..Default::default() - } - }, - }; - debug!("YT upload params: {:?}", &upload_params); - - let client = Client::new(); - let res = client.post("https://www.googleapis.com/upload/youtube/v3/videos?uploadType=resumable&part=snippet%2Cstatus") - .header("Authorization", format!("Bearer {}", access_token)) - .json(&upload_params) - .send().await?; - - if res.status().is_success() { - Ok(res - .headers() - .get("location") - .ok_or("Cannot find suitable header")? - .to_str()? - .to_string()) - } else { - Err(TootubeError::new("Cannot create resumable upload!").into()) - } -} - -/// This takes the PT stream for download, connects it to YT stream for upload -pub async fn now_kiss( - dl_url: &str, - r_url: &str, - config: &YoutubeConfig, - pg_conf: &TootubeConfig, -) -> Result> { - // Get access token - let access_token = refresh_token(config).await?; - - // Get the upstream bytes stream - let res = reqwest::get(dl_url).await?; - let content_lengh = res - .content_length() - .ok_or(format!("Cannot get content length from {}", dl_url))?; - let mut stream = res.bytes_stream(); - - // Create the progress bar - let pb = ProgressBar::new(content_lengh); - pb.set_style( - ProgressStyle::default_bar() - .template(&pg_conf.progress_bar)? - .progress_chars(&pg_conf.progress_chars), - ); - pb.set_message("Transferring…"); - let mut transferring: u64 = 0; - - // yields the stream chunk by chunk, updating the progress bar at the same time - let async_stream = stream! { - while let Some(chunk) = stream.next().await { - if let Ok(chunk) = &chunk { - let new = min(transferring + (chunk.len() as u64), content_lengh); - transferring = new; - pb.set_position(new); - if transferring >= content_lengh { - pb.finish(); - } - } - yield chunk; - } - }; - - // Create client - let client = Client::new(); - let res = client - .put(r_url) - .header("Authorization", format!("Bearer {}", access_token)) - .body(Body::wrap_stream(async_stream)) - .send() - .await?; - - if res.status().is_success() { - let yt_videos: YoutubeVideos = res.json().await?; - Ok(yt_videos.id) - } else { - Err(TootubeError::new(&format!("Cannot upload video: {:?}", res.text().await?)).into()) - } }