use crate::{ config::{TootubeConfig, YoutubeConfig}, error::TootubeError, peertube::PeerTubeVideo, }; use async_stream::stream; use futures_util::StreamExt; use indicatif::{ProgressBar, ProgressStyle}; use log::{debug, warn}; use reqwest::{multipart::Form, Body, Client}; use serde::{Deserialize, Serialize}; use std::{cmp::min, error::Error, io::stdin}; use tokio::sync::OnceCell; static ACCESS_TOKEN: OnceCell = OnceCell::const_new(); #[derive(Serialize, Debug)] struct RefreshTokenRequest { refresh_token: String, client_id: String, client_secret: String, redirect_uri: String, grant_type: String, } impl Default for RefreshTokenRequest { fn default() -> Self { RefreshTokenRequest { refresh_token: "".to_string(), client_id: "".to_string(), client_secret: "".to_string(), redirect_uri: "urn:ietf:wg:oauth:2.0:oob".to_string(), grant_type: "refresh_token".to_string(), } } } #[derive(Deserialize, Debug)] struct AccessTokenResponse { access_token: String, } #[derive(Deserialize, Debug)] struct RegistrationAccessTokenResponse { refresh_token: String, } #[derive(Serialize, Debug)] struct YoutubeUploadParams { snippet: YoutubeUploadParamsSnippet, status: YoutubeUploadParamsStatus, } #[derive(Serialize, Debug)] struct YoutubeUploadParamsSnippet { title: String, description: String, #[serde(rename = "categoryId")] category_id: String, #[serde(rename = "defaultAudioLanguage")] default_audio_language: String, tags: Option>, } impl Default for YoutubeUploadParamsSnippet { fn default() -> Self { YoutubeUploadParamsSnippet { title: "".to_string(), description: "".to_string(), category_id: "20".to_string(), default_audio_language: "fr".to_string(), tags: None, } } } #[derive(Serialize, Debug)] struct YoutubeUploadParamsStatus { #[serde(rename = "selfDeclaredMadeForKids")] self_declared_made_for_kids: bool, #[serde(rename = "privacyStatus")] privacy_status: String, license: String, } impl Default for YoutubeUploadParamsStatus { fn default() -> Self { YoutubeUploadParamsStatus { self_declared_made_for_kids: false, privacy_status: "public".to_string(), license: "creativeCommon".to_string(), } } } #[derive(Serialize, Debug)] struct YoutubePlaylistItemsParams { snippet: YoutubePlaylistItemsParamsSnippet, } #[derive(Serialize, Debug)] struct YoutubePlaylistItemsParamsSnippet { #[serde(rename = "playlistId")] playlist_id: String, position: u16, #[serde(rename = "resourceId")] resource_id: YoutubePlaylistItemsParamsSnippetResourceId, } impl Default for YoutubePlaylistItemsParamsSnippet { fn default() -> Self { YoutubePlaylistItemsParamsSnippet { playlist_id: "".to_string(), position: 0, resource_id: YoutubePlaylistItemsParamsSnippetResourceId { ..Default::default() }, } } } #[derive(Serialize, Debug)] struct YoutubePlaylistItemsParamsSnippetResourceId { kind: String, #[serde(rename = "videoId")] video_id: String, } impl Default for YoutubePlaylistItemsParamsSnippetResourceId { fn default() -> Self { YoutubePlaylistItemsParamsSnippetResourceId { kind: "youtube#video".to_string(), video_id: "".to_string(), } } } #[derive(Deserialize, Debug)] struct YoutubeVideos { id: String, } #[derive(Deserialize, Debug)] struct YoutubePlaylistListResponse { #[serde(rename = "nextPageToken")] next_page_token: Option, items: Vec, } #[derive(Deserialize, Debug)] struct YoutubePlaylistListResponseItem { id: String, snippet: YoutubePlaylistListResponseItemSnippet, } #[derive(Deserialize, Debug)] struct YoutubePlaylistListResponseItemSnippet { title: String, } /// This function makes the registration process a little bit easier #[tokio::main] pub async fn register(config: &YoutubeConfig) -> Result<(), Box> { println!("Click on the link below to authorize {} to upload to YouTube and deal with your playlists:", env!("CARGO_PKG_NAME")); println!("https://accounts.google.com/o/oauth2/v2/auth?client_id={}&redirect_uri=urn:ietf:wg:oauth:2.0:oob&scope=https://www.googleapis.com/auth/youtube%20https://www.googleapis.com/auth/youtube.upload&response_type=code", config.client_id); println!("Paste the returned authorization code:"); let mut input = String::new(); stdin() .read_line(&mut input) .expect("Unable to read back the authorization code!"); let form = Form::new() .text("code", input) .text("client_id", config.client_id.clone()) .text("client_secret", config.client_secret.clone()) .text("redirect_uri", "urn:ietf:wg:oauth:2.0:oob") .text("grant_type", "authorization_code"); let client = Client::new(); let res = client .post("https://accounts.google.com/o/oauth2/token") .multipart(form) .send() .await?; let refresh_token: RegistrationAccessTokenResponse = res.json().await?; println!("You can now paste the following line inside the `youtube` section of your tootube.toml file:"); println!("refresh_token=\"{}\"", refresh_token.refresh_token); Ok(()) } /// Ensures that Token has been refreshed and that it is unique async fn refresh_token(config: &YoutubeConfig) -> Result { ACCESS_TOKEN .get_or_try_init(|| async { let refresh_token = RefreshTokenRequest { refresh_token: config.refresh_token.clone(), client_id: config.client_id.clone(), client_secret: config.client_secret.clone(), ..Default::default() }; let client = Client::new(); let res = client .post("https://accounts.google.com/o/oauth2/token") .json(&refresh_token) .send() .await?; let access_token: AccessTokenResponse = res.json().await?; debug!("YT Access Token: {}", &access_token.access_token); Ok(access_token.access_token) }) .await .cloned() } /// This function takes a list of playlists keyword and returns a list of playlist ID async fn get_playlist_ids( config: &YoutubeConfig, pl: &[String], ) -> Result, Box> { let mut page_token = String::new(); let mut playlists: Vec = vec![]; let access_token = refresh_token(config).await?; let client = Client::new(); while let Ok(local_pl) = client .get(&format!( "https://www.googleapis.com/youtube/v3/playlists?part=snippet&mine=true&pageToken={}", page_token )) .header("Authorization", format!("Bearer {}", access_token)) .send() .await? .json::() .await { playlists.append( &mut local_pl .items .iter() .filter_map(|s| pl.contains(&s.snippet.title).then_some(s.id.clone())) .collect(), ); // if nextPageToken is present, continue the loop match local_pl.next_page_token { None => break, Some(a) => page_token.clone_from(&a), } } debug!("Playlists IDs: {:?}", &playlists); Ok(playlists) } /// This function adds the video id to the corresponding named playlist(s) pub async fn add_video_to_playlists( config: &YoutubeConfig, v: &str, pl: &[String], ) -> Result<(), Box> { let access_token = refresh_token(config).await?; let playlists_ids = get_playlist_ids(config, pl).await?; for pl_id in playlists_ids { let yt_pl_upload_params = YoutubePlaylistItemsParams { snippet: YoutubePlaylistItemsParamsSnippet { playlist_id: pl_id.clone(), resource_id: YoutubePlaylistItemsParamsSnippetResourceId { video_id: v.to_string(), ..Default::default() }, ..Default::default() }, }; let client = Client::new(); let res = client .post("https://youtube.googleapis.com/youtube/v3/playlistItems?part=snippet") .header("Authorization", format!("Bearer {}", access_token)) .json(&yt_pl_upload_params) .send() .await?; if !res.status().is_success() { return Err(TootubeError::new(&format!( "Something went wrong when trying to add the video to a playlist: {}", res.text().await? )) .into()); } } Ok(()) } /// This function creates a resumable YT upload, putting all the parameters in pub async fn create_resumable_upload( config: &YoutubeConfig, vid: &PeerTubeVideo, ) -> Result> { let access_token = refresh_token(config).await?; if vid.name.chars().count() > 100 { warn!( "PT Video Title ({}) is too long, it will be truncated", &vid.name ); } let upload_params = YoutubeUploadParams { snippet: { YoutubeUploadParamsSnippet { title: vid.name.chars().take(100).collect::(), description: vid.description.clone(), tags: vid.tags.clone(), ..Default::default() } }, status: { YoutubeUploadParamsStatus { ..Default::default() } }, }; debug!("YT upload params: {:?}", &upload_params); let client = Client::new(); let res = client.post("https://www.googleapis.com/upload/youtube/v3/videos?uploadType=resumable&part=snippet%2Cstatus") .header("Authorization", format!("Bearer {}", access_token)) .json(&upload_params) .send().await?; if res.status().is_success() { Ok(res .headers() .get("location") .ok_or("Cannot find suitable header")? .to_str()? .to_string()) } else { Err(TootubeError::new("Cannot create resumable upload!").into()) } } /// This takes the PT stream for download, connects it to YT stream for upload pub async fn now_kiss( dl_url: &str, r_url: &str, config: &YoutubeConfig, pg_conf: &TootubeConfig, ) -> Result> { // Get access token let access_token = refresh_token(config).await?; // Get the upstream bytes stream let res = reqwest::get(dl_url).await?; let content_lengh = res .content_length() .ok_or(format!("Cannot get content length from {}", dl_url))?; let mut stream = res.bytes_stream(); // Create the progress bar let pb = ProgressBar::new(content_lengh); pb.set_style( ProgressStyle::default_bar() .template(&pg_conf.progress_bar)? .progress_chars(&pg_conf.progress_chars), ); pb.set_message("Transferring…"); let mut transferring: u64 = 0; // yields the stream chunk by chunk, updating the progress bar at the same time let async_stream = stream! { while let Some(chunk) = stream.next().await { if let Ok(chunk) = &chunk { let new = min(transferring + (chunk.len() as u64), content_lengh); transferring = new; pb.set_position(new); if transferring >= content_lengh { pb.finish(); } } yield chunk; } }; // Create client let client = Client::new(); let res = client .put(r_url) .header("Authorization", format!("Bearer {}", access_token)) .body(Body::wrap_stream(async_stream)) .send() .await?; if res.status().is_success() { let yt_videos: YoutubeVideos = res.json().await?; Ok(yt_videos.id) } else { Err(TootubeError::new(&format!("Cannot upload video: {:?}", res.text().await?)).into()) } }