From 1d4ae8f1525e63eb2aa26f820bc0793a1355f5e9 Mon Sep 17 00:00:00 2001 From: VC Date: Tue, 17 Oct 2023 09:26:07 +0200 Subject: [PATCH] feat: add progress bar to the transferring process --- src/lib.rs | 15 ++++++++++----- src/youtube.rs | 41 ++++++++++++++++++++++++++++++++++------- 2 files changed, 44 insertions(+), 12 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 3f7ee6b..8074de0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,5 @@ use bytes::Bytes; -use futures_core::stream::Stream; +use futures_util::Stream; use log::debug; use std::error::Error; @@ -18,8 +18,13 @@ use youtube::{add_video_to_playlists, create_resumable_upload, now_kiss}; async fn get_dl_video_stream( u: &str, -) -> Result>, Box> { - Ok(reqwest::get(u).await?.bytes_stream()) +) -> Result<(impl Stream>, u64), Box> { + let res = reqwest::get(u).await?; + let content_lengh = res + .content_length() + .ok_or(format!("Cannot get content length from {}", u))?; + + Ok((res.bytes_stream(), content_lengh)) } #[tokio::main] @@ -34,7 +39,7 @@ pub async fn run(config: Config, pl: Vec) { let dl_url = get_max_resolution_dl(latest_vid.streaming_playlists.as_ref().unwrap()); debug!("PT download URL: {}", &dl_url); - let pt_stream = get_dl_video_stream(&dl_url) + let (pt_stream, size) = get_dl_video_stream(&dl_url) .await .unwrap_or_else(|e| panic!("Cannot download video at URL {}: {}", dl_url, e)); @@ -43,7 +48,7 @@ pub async fn run(config: Config, pl: Vec) { .unwrap_or_else(|e| panic!("Cannot retrieve the upload’s resumable id: {e}")); debug!("YT upload URL: {}", &resumable_upload_url); - let yt_video_id = now_kiss(pt_stream, &resumable_upload_url, &config.youtube) + let yt_video_id = now_kiss(size, pt_stream, &resumable_upload_url, &config.youtube) .await .unwrap_or_else(|e| panic!("Cannot resume upload!: {e}")); debug!("YT video ID: {}", &yt_video_id); diff --git a/src/youtube.rs b/src/youtube.rs index a36493c..01ad123 100644 --- a/src/youtube.rs +++ b/src/youtube.rs @@ -1,10 +1,17 @@ use crate::{config::YoutubeConfig, error::TootubeError, peertube::PeerTubeVideo}; +use async_stream::stream; use bytes::Bytes; -use futures_core::stream::Stream; +use futures_util::{Stream, StreamExt}; +use indicatif::{ProgressBar, ProgressStyle}; use log::debug; use reqwest::{multipart::Form, Body, Client}; use serde::{Deserialize, Serialize}; -use std::{error::Error, io::stdin}; +use std::{ + cmp::min, + error::Error, + io::stdin, + marker::{Send, Sync, Unpin}, +}; use tokio::sync::OnceCell; static ACCESS_TOKEN: OnceCell = OnceCell::const_new(); @@ -341,22 +348,42 @@ pub async fn create_resumable_upload( /// This takes the PT stream for download, connects it to YT stream for upload pub async fn now_kiss<'a>( - stream: impl Stream> - + std::marker::Send - + std::marker::Sync - + 'a + 'static, + size: u64, + mut stream: impl Stream> + Send + Sync + Unpin + 'a + 'static, r_url: &'a str, config: &'a YoutubeConfig, ) -> Result> { // Get access token let access_token = refresh_token(config).await?; + // Create the progress bar + let pb = ProgressBar::new(size); + pb.set_style(ProgressStyle::default_bar() + .template("{msg}\n{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})")? + .progress_chars("#>-")); + pb.set_message("Transferring…"); + let mut transferring: u64 = 0; + + let async_stream = stream! { + while let Some(chunk) = stream.next().await { + if let Ok(chunk) = &chunk { + let new = min(transferring + (chunk.len() as u64), size); + transferring = new; + pb.set_position(new); + if transferring >= size { + pb.finish(); + } + } + yield chunk; + } + }; + // Create client let client = Client::new(); let res = client .put(r_url) .header("Authorization", format!("Bearer {}", access_token)) - .body(Body::wrap_stream(stream)) + .body(Body::wrap_stream(async_stream)) .send() .await?;