mirror of
https://framagit.org/veretcle/tootube.git
synced 2025-07-20 12:31:19 +02:00
feat: add progress bar to the transferring process
This commit is contained in:
15
src/lib.rs
15
src/lib.rs
@@ -1,5 +1,5 @@
|
|||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use futures_core::stream::Stream;
|
use futures_util::Stream;
|
||||||
use log::debug;
|
use log::debug;
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
|
|
||||||
@@ -18,8 +18,13 @@ use youtube::{add_video_to_playlists, create_resumable_upload, now_kiss};
|
|||||||
|
|
||||||
async fn get_dl_video_stream(
|
async fn get_dl_video_stream(
|
||||||
u: &str,
|
u: &str,
|
||||||
) -> Result<impl Stream<Item = Result<Bytes, reqwest::Error>>, Box<dyn Error>> {
|
) -> Result<(impl Stream<Item = Result<Bytes, reqwest::Error>>, u64), Box<dyn Error>> {
|
||||||
Ok(reqwest::get(u).await?.bytes_stream())
|
let res = reqwest::get(u).await?;
|
||||||
|
let content_lengh = res
|
||||||
|
.content_length()
|
||||||
|
.ok_or(format!("Cannot get content length from {}", u))?;
|
||||||
|
|
||||||
|
Ok((res.bytes_stream(), content_lengh))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
@@ -34,7 +39,7 @@ pub async fn run(config: Config, pl: Vec<String>) {
|
|||||||
let dl_url = get_max_resolution_dl(latest_vid.streaming_playlists.as_ref().unwrap());
|
let dl_url = get_max_resolution_dl(latest_vid.streaming_playlists.as_ref().unwrap());
|
||||||
debug!("PT download URL: {}", &dl_url);
|
debug!("PT download URL: {}", &dl_url);
|
||||||
|
|
||||||
let pt_stream = get_dl_video_stream(&dl_url)
|
let (pt_stream, size) = get_dl_video_stream(&dl_url)
|
||||||
.await
|
.await
|
||||||
.unwrap_or_else(|e| panic!("Cannot download video at URL {}: {}", dl_url, e));
|
.unwrap_or_else(|e| panic!("Cannot download video at URL {}: {}", dl_url, e));
|
||||||
|
|
||||||
@@ -43,7 +48,7 @@ pub async fn run(config: Config, pl: Vec<String>) {
|
|||||||
.unwrap_or_else(|e| panic!("Cannot retrieve the upload’s resumable id: {e}"));
|
.unwrap_or_else(|e| panic!("Cannot retrieve the upload’s resumable id: {e}"));
|
||||||
debug!("YT upload URL: {}", &resumable_upload_url);
|
debug!("YT upload URL: {}", &resumable_upload_url);
|
||||||
|
|
||||||
let yt_video_id = now_kiss(pt_stream, &resumable_upload_url, &config.youtube)
|
let yt_video_id = now_kiss(size, pt_stream, &resumable_upload_url, &config.youtube)
|
||||||
.await
|
.await
|
||||||
.unwrap_or_else(|e| panic!("Cannot resume upload!: {e}"));
|
.unwrap_or_else(|e| panic!("Cannot resume upload!: {e}"));
|
||||||
debug!("YT video ID: {}", &yt_video_id);
|
debug!("YT video ID: {}", &yt_video_id);
|
||||||
|
@@ -1,10 +1,17 @@
|
|||||||
use crate::{config::YoutubeConfig, error::TootubeError, peertube::PeerTubeVideo};
|
use crate::{config::YoutubeConfig, error::TootubeError, peertube::PeerTubeVideo};
|
||||||
|
use async_stream::stream;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use futures_core::stream::Stream;
|
use futures_util::{Stream, StreamExt};
|
||||||
|
use indicatif::{ProgressBar, ProgressStyle};
|
||||||
use log::debug;
|
use log::debug;
|
||||||
use reqwest::{multipart::Form, Body, Client};
|
use reqwest::{multipart::Form, Body, Client};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::{error::Error, io::stdin};
|
use std::{
|
||||||
|
cmp::min,
|
||||||
|
error::Error,
|
||||||
|
io::stdin,
|
||||||
|
marker::{Send, Sync, Unpin},
|
||||||
|
};
|
||||||
use tokio::sync::OnceCell;
|
use tokio::sync::OnceCell;
|
||||||
|
|
||||||
static ACCESS_TOKEN: OnceCell<String> = OnceCell::const_new();
|
static ACCESS_TOKEN: OnceCell<String> = OnceCell::const_new();
|
||||||
@@ -341,22 +348,42 @@ pub async fn create_resumable_upload(
|
|||||||
|
|
||||||
/// This takes the PT stream for download, connects it to YT stream for upload
|
/// This takes the PT stream for download, connects it to YT stream for upload
|
||||||
pub async fn now_kiss<'a>(
|
pub async fn now_kiss<'a>(
|
||||||
stream: impl Stream<Item = Result<Bytes, reqwest::Error>>
|
size: u64,
|
||||||
+ std::marker::Send
|
mut stream: impl Stream<Item = Result<Bytes, reqwest::Error>> + Send + Sync + Unpin + 'a + 'static,
|
||||||
+ std::marker::Sync
|
|
||||||
+ 'a + 'static,
|
|
||||||
r_url: &'a str,
|
r_url: &'a str,
|
||||||
config: &'a YoutubeConfig,
|
config: &'a YoutubeConfig,
|
||||||
) -> Result<String, Box<dyn Error>> {
|
) -> Result<String, Box<dyn Error>> {
|
||||||
// Get access token
|
// Get access token
|
||||||
let access_token = refresh_token(config).await?;
|
let access_token = refresh_token(config).await?;
|
||||||
|
|
||||||
|
// Create the progress bar
|
||||||
|
let pb = ProgressBar::new(size);
|
||||||
|
pb.set_style(ProgressStyle::default_bar()
|
||||||
|
.template("{msg}\n{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})")?
|
||||||
|
.progress_chars("#>-"));
|
||||||
|
pb.set_message("Transferring…");
|
||||||
|
let mut transferring: u64 = 0;
|
||||||
|
|
||||||
|
let async_stream = stream! {
|
||||||
|
while let Some(chunk) = stream.next().await {
|
||||||
|
if let Ok(chunk) = &chunk {
|
||||||
|
let new = min(transferring + (chunk.len() as u64), size);
|
||||||
|
transferring = new;
|
||||||
|
pb.set_position(new);
|
||||||
|
if transferring >= size {
|
||||||
|
pb.finish();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
yield chunk;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
// Create client
|
// Create client
|
||||||
let client = Client::new();
|
let client = Client::new();
|
||||||
let res = client
|
let res = client
|
||||||
.put(r_url)
|
.put(r_url)
|
||||||
.header("Authorization", format!("Bearer {}", access_token))
|
.header("Authorization", format!("Bearer {}", access_token))
|
||||||
.body(Body::wrap_stream(stream))
|
.body(Body::wrap_stream(async_stream))
|
||||||
.send()
|
.send()
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user