diff --git a/Cargo.lock b/Cargo.lock index 6494c04..6dd956e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1156,10 +1156,9 @@ dependencies = [ [[package]] name = "tootube" -version = "0.5.0" +version = "0.5.1" dependencies = [ "async-stream", - "bytes", "clap", "env_logger", "futures-util", diff --git a/Cargo.toml b/Cargo.toml index fd07e07..b021514 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "tootube" authors = ["VC "] -version = "0.5.0" +version = "0.5.1" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -14,7 +14,6 @@ serde = { version = "^1", features = ["derive"] } toml = "^0.5" log = "^0.4" env_logger = "^0.10" -bytes = "^1.5" indicatif = "^0.17" async-stream = "^0.3" futures-util = "^0.3" diff --git a/src/lib.rs b/src/lib.rs index 8074de0..2175604 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,4 @@ -use bytes::Bytes; -use futures_util::Stream; use log::debug; -use std::error::Error; mod error; @@ -16,17 +13,6 @@ mod youtube; pub use youtube::register; use youtube::{add_video_to_playlists, create_resumable_upload, now_kiss}; -async fn get_dl_video_stream( - u: &str, -) -> Result<(impl Stream>, u64), Box> { - let res = reqwest::get(u).await?; - let content_lengh = res - .content_length() - .ok_or(format!("Cannot get content length from {}", u))?; - - Ok((res.bytes_stream(), content_lengh)) -} - #[tokio::main] pub async fn run(config: Config, pl: Vec) { // Get the latest video object @@ -39,16 +25,12 @@ pub async fn run(config: Config, pl: Vec) { let dl_url = get_max_resolution_dl(latest_vid.streaming_playlists.as_ref().unwrap()); debug!("PT download URL: {}", &dl_url); - let (pt_stream, size) = get_dl_video_stream(&dl_url) - .await - .unwrap_or_else(|e| panic!("Cannot download video at URL {}: {}", dl_url, e)); - let resumable_upload_url = create_resumable_upload(&config.youtube, &latest_vid) .await .unwrap_or_else(|e| panic!("Cannot retrieve the upload’s resumable id: {e}")); debug!("YT upload URL: {}", &resumable_upload_url); - let yt_video_id = now_kiss(size, pt_stream, &resumable_upload_url, &config.youtube) + let yt_video_id = now_kiss(&dl_url, &resumable_upload_url, &config.youtube) .await .unwrap_or_else(|e| panic!("Cannot resume upload!: {e}")); debug!("YT video ID: {}", &yt_video_id); diff --git a/src/youtube.rs b/src/youtube.rs index 01ad123..126b51f 100644 --- a/src/youtube.rs +++ b/src/youtube.rs @@ -1,17 +1,11 @@ use crate::{config::YoutubeConfig, error::TootubeError, peertube::PeerTubeVideo}; use async_stream::stream; -use bytes::Bytes; -use futures_util::{Stream, StreamExt}; +use futures_util::StreamExt; use indicatif::{ProgressBar, ProgressStyle}; use log::debug; use reqwest::{multipart::Form, Body, Client}; use serde::{Deserialize, Serialize}; -use std::{ - cmp::min, - error::Error, - io::stdin, - marker::{Send, Sync, Unpin}, -}; +use std::{cmp::min, error::Error, io::stdin}; use tokio::sync::OnceCell; static ACCESS_TOKEN: OnceCell = OnceCell::const_new(); @@ -347,30 +341,37 @@ pub async fn create_resumable_upload( } /// This takes the PT stream for download, connects it to YT stream for upload -pub async fn now_kiss<'a>( - size: u64, - mut stream: impl Stream> + Send + Sync + Unpin + 'a + 'static, - r_url: &'a str, - config: &'a YoutubeConfig, +pub async fn now_kiss( + dl_url: &str, + r_url: &str, + config: &YoutubeConfig, ) -> Result> { // Get access token let access_token = refresh_token(config).await?; + // Get the upstream bytes stream + let res = reqwest::get(dl_url).await?; + let content_lengh = res + .content_length() + .ok_or(format!("Cannot get content length from {}", dl_url))?; + let mut stream = res.bytes_stream(); + // Create the progress bar - let pb = ProgressBar::new(size); + let pb = ProgressBar::new(content_lengh); pb.set_style(ProgressStyle::default_bar() .template("{msg}\n{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})")? .progress_chars("#>-")); pb.set_message("Transferring…"); let mut transferring: u64 = 0; + // yields the stream chunk by chunk, updating the progress bar at the same time let async_stream = stream! { while let Some(chunk) = stream.next().await { if let Ok(chunk) = &chunk { - let new = min(transferring + (chunk.len() as u64), size); + let new = min(transferring + (chunk.len() as u64), content_lengh); transferring = new; pb.set_position(new); - if transferring >= size { + if transferring >= content_lengh { pb.finish(); } }