feat: async + stream

This commit is contained in:
VC
2023-10-06 16:21:11 +02:00
parent b810de5e57
commit 0451543d6d
5 changed files with 170 additions and 86 deletions

View File

@@ -1,8 +1,12 @@
use crate::{config::YoutubeConfig, error::TootubeError, peertube::PeerTubeVideo};
use bytes::Bytes;
use futures_core::stream::Stream;
use reqwest::{Body, Client};
use serde::{Deserialize, Serialize};
use std::{error::Error, fs::File, sync::Mutex};
use std::error::Error;
use tokio::sync::OnceCell;
static ACCESS_TOKEN: Mutex<String> = Mutex::new(String::new());
static ACCESS_TOKEN: OnceCell<String> = OnceCell::const_new();
#[derive(Serialize, Debug)]
struct RefreshTokenRequest {
@@ -79,9 +83,9 @@ impl Default for YoutubeUploadParamsStatus {
}
/// Ensures that Token has been refreshed and that it is unique
fn refresh_token(config: &YoutubeConfig) -> Result<String, Box<dyn Error>> {
if let Ok(mut unlocked_access_token) = ACCESS_TOKEN.lock() {
if unlocked_access_token.is_empty() {
async fn refresh_token(config: &YoutubeConfig) -> Result<String, reqwest::Error> {
ACCESS_TOKEN
.get_or_try_init(|| async {
let refresh_token = RefreshTokenRequest {
refresh_token: config.refresh_token.clone(),
client_id: config.client_id.clone(),
@@ -89,26 +93,26 @@ fn refresh_token(config: &YoutubeConfig) -> Result<String, Box<dyn Error>> {
..Default::default()
};
let client = reqwest::blocking::Client::new();
let client = Client::new();
let res = client
.post("https://accounts.google.com/o/oauth2/token")
.json(&refresh_token)
.send()?;
.send()
.await?;
let access_token: AccessTokenResponse = res.json()?;
let access_token: AccessTokenResponse = res.json().await?;
*unlocked_access_token = access_token.access_token.clone();
}
}
Ok(ACCESS_TOKEN.lock().unwrap().to_string())
Ok(access_token.access_token)
})
.await
.cloned()
}
pub fn create_resumable_upload(
pub async fn create_resumable_upload(
config: &YoutubeConfig,
vid: &PeerTubeVideo,
) -> Result<String, Box<dyn Error>> {
let access_token = refresh_token(config)?;
let access_token = refresh_token(config).await?;
let upload_params = YoutubeUploadParams {
snippet: {
@@ -126,12 +130,12 @@ pub fn create_resumable_upload(
},
};
let client = reqwest::blocking::Client::new();
let client = Client::new();
let res = client.post("https://www.googleapis.com/upload/youtube/v3/videos?uploadType=resumable&part=snippet%2Cstatus")
.header("Authorization", format!("Bearer {}", access_token))
.json(&upload_params)
.send()?;
.send().await?;
if res.status().is_success() {
Ok(res
@@ -145,28 +149,30 @@ pub fn create_resumable_upload(
}
}
pub fn upload_video(
f_path: &str,
r_url: &str,
config: &YoutubeConfig,
pub async fn now_kiss<'a>(
stream: impl Stream<Item = Result<Bytes, reqwest::Error>>
+ std::marker::Send
+ std::marker::Sync
+ 'a + 'static,
r_url: &'a str,
config: &'a YoutubeConfig,
) -> Result<(), Box<dyn Error>> {
// Get access token
let access_token = refresh_token(config)?;
let access_token = refresh_token(config).await?;
// Create client
let client = reqwest::blocking::Client::new();
let file = File::open(f_path)?;
let client = Client::new();
let res = client
.put(r_url)
.header("Authorization", format!("Bearer {}", access_token))
.body(file)
.send()?;
.body(Body::wrap_stream(stream))
.send()
.await?;
if res.status().is_success() {
Ok(())
} else {
Err(TootubeError::new(&format!("Cannot upload video: {:?}", res.text()?)).into())
Err(TootubeError::new(&format!("Cannot upload video: {:?}", res.text().await?)).into())
}
}