Files
BaC/content/2021-04-25-Rust-faire-joujou-avec-async.md
2025-02-27 12:52:48 +01:00

291 lines
16 KiB
Markdown
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

+++
title = "Rust: faire joujou avec Async"
date = 2021-04-25
[taxonomies]
tags = [ "rust", "codaz" ]
+++
Quest-quon veut? *Maintenant!*
Et quand est-ce quon le veut? *De lasync!*
<!-- more -->
# Quest-ce que cest encore que ce machin?
Si tu fais un peu de Rust ces derniers temps, tu as du voir tout un tas de librairies se convertir à l`async` sans forcément comprendre pourquoi. Async/await, cest juste une manière de faire des choses de manières concurrentes. En gros, au lieu de chercher à exécuter le programme dans lordre, on va chercher à faire des actions un peu dans le désordre pour éviter de bloquer le thread principal.
Tu vas alors me dire: *mais on a déjà de la programmation parallèle pour ça non?* Et ça, je répondrais oui. Sauf quen fait, non. Dans des threads classiques, toutes les actions sont effectuées en **parallèle** (pense: plusieurs processeurs exécutant des actions complètement différentes). En **asynchrone**, lidée est dexécuter tout ça dans le même thread mais en profitant des temps morts à droite à gauche pour essayer de gagner du temps. Lexemple typique est celui de lécriture dans un fichier: cest une opération qui bloque le thread dexécution pendant longtemps pour rien : le processeur ne glande rien et attend juste le disque, donc autant lui faire exécuter dautres choses en attendant que ça passe et récupérer linformation sur lécriture du fichier un peu plus tard.
Bon, ça évidemment, cest la jolie théorie quon décrit un peu partout. Mais concrètement comment ça se passe?
# Mettre des choses dans Async/Await, bah en fait, cest pas si simple que ça…
Si je reprends lexemple donné sur le site de [tokio](https://tokio.rs/tokio/tutorial/hello-tokio) à part mettre des `await` partout, tu fais pas grand-chose dasynchrone là-dedans. Et en fait, cest vrai : basiquement, ça ne fait rien dasynchrone. En fait, `async/await` en Rust peut effectivement commencer à exécuter des trucs en asynchrone sans quon lui demande mais à un moment, on est bien obligé de récupérer le résultat.
Donc si ton programme fait juste des `await` toutes les 3 lignes, bah en fait, tu gagnes à peu près rien…
Ça commence bien…
# Allez, un exemple pour essayer
Comme tu peux éventuellement commencer à ten douter, jai passé un bon moment à me gratter la tête avec ses histoires dasync, sans vraiment comprendre lintérêt que ça avait (oui, on a déjà du parallèle si besoin donc bon…). Donc, voilà, je te propose un petit exemple qui permet de mettre en lumière certaines choses. Ça nest pas une réponse universelles à tout, ça nest peut-être même pas un bon exemple, mais disons que cest là que je suis arrivé après plusieurs heures de recherche et de tatônnement (jai aussi pu constater que les ressources disponibles sur la toile sur le sujet était soit obsolètes, soit pas vraiment claires).
## Bon admettons…
Donc, admettons quon ait 4 API en entrée (nous lappellerons *API entrante*) quon cherche à interroger. Ces 4 API vont renvoyer un nombre arbitraire de nombres et on va les envoyer vers une autre API (que nous appellerons *API sortante*) qui va déterminer si les nombres sont pairs ou impairs (oui, je sais cest complètement con, mais cest pour les besoins de lexercice). Pour bien se rendre compte du fonctionnement du truc, on va admettre que les API mettent énormément de temps à répondre (plusieurs secondes dans certains cas). Lidée est donc de voir si on peut optimiser et paralléliser.
Voici donc le code permettant de demandes des infos à lAPI entrante:
```rust
async fn poll() -> Vec<u32> {
let rand_waiter = rand::random::<u8>()/20; // on calcule un nombre aléatoire compris en 0 et 12
println!("poll: Waiting {}s for poll response…", rand_waiter);
sleep(Duration::from_secs(rand_waiter.into())).await; // on attend exactement ce nombre
// on génère autant de nombre sur 32 bits quon a attendu de secondes
let mut polling_res = vec![];
for _ in 0..rand_waiter {
let res = rand::random::<u32>();
polling_res.push(res);
}
println!("poll: Finished polling!");
polling_res
}
```
*Évidemment, nous utilisons ici les primitives `Duration` et `sleep` provenant de `tokio`.*
Notre API sortant va du coup ressembler à ça:
```rust
async fn treat(n: u32) -> bool {
let rand_waiter = rand::random::<u8>()/10; // on calcule un nombre aléatoire entre 25
println!("treat: Treating {}", n);
println!("treat: Waiting {}s for treatment response…", rand_waiter);
sleep(Duration::from_secs(rand_waiter.into())).await;
println!("treat: Finished treating {}", n);
n%2 == 0
}
```
## Approche naïve
Lapproche ultra de base consiste donc à mettre les 4 API entrantes dans un tableau, à les interroger une par une et ensuite pour chaque résultat reçu (tous les résultats arrivent en même temps en loccurence, on attend simplement que chaque API entrante réponde lensemble de ses résultats), on les balance dans lAPI sortante. On pourrait donc imaginer un code de ce type:
```rust
async fn main() {
let apis = vec!["A","B","C","D"];
for api in apis {
let numbers = poll().await;
for number in numbers {
let res = treat(number).await;
if res {
println!("main: {} was a good one, {}!", res, api);
} else {
println!("main: nah {} no good, better luck next time, {}", res, api);
}
}
}
}
```
Et quand on exécute, on constate effectivement que tout est parfaitement synchrone (et donc complètement con):
```
poll: Waiting 3s for poll response…
poll: Finished polling!
treat: Treating 4097894384
treat: Waiting 7s for treatment response…
treat: Finished treating 4097894384
main: true was a good one, A!
treat: Treating 4056821172
treat: Waiting 14s for treatment response…
treat: Finished treating 4056821172
main: true was a good one, A!
[…]
poll: Waiting 2s for poll response…
poll: Finished polling!
treat: Treating 4156474264
treat: Waiting 20s for treatment response…
treat: Finished treating 4156474264
main: true was a good one, D!
treat: Treating 1467559148
treat: Waiting 13s for treatment response…
treat: Finished treating 1467559148
main: true was a good one, D!
```
Ça asynchrone donc pas des masses :/. En fait, le souci vient du fait quon attend le résultat de chaque opération. Pour la première opération, on ne peut pas vraiment faire autrement (toutes les réponses arrivent en même temps de la partie de lAPI entrante). Par contre, pour le traitement, on pourrait imaginer de le faire en retirant le if derrière. Ça donnerait donc un truc de ce style:
```rust
for number in numbers {
treat(number);
}
```
Et là, tu te rends compte de la principale particularité de limplémentation d`async` dans Rust: si tu ne lui demandes rien, un `future` (donc une fonction `async` ici) ne sexécute pas. **En gros, si tu lattends pas, il fait rien ce con.**
## Bordel, mais comment on va résoudre ça???
Il va falloir mettre notre traitement dans une autre fonction qui va se charger de le faire avancer pendant quon fait autre chose. On ne peut pas le coller juste dans un block `async`, il faut le coller dans une fonction qui va le faire avancer (en gros). Donc on peut utiliser `tokio::spawn` dans la librairie `tokio` pour ce faire:
```rust
tokio::spawn(async move {
treat(number).await;
});
```
Là, ce nest plus gênant de déclencher lattente du résultat parce que finalement, le résultat est attendu dans une sorte de thread détaché du thread principal (mais pas vraiment un thread puisque cest pas du parallèle…). Et donc, là, ça fonctionne nettement mieux:
```
poll: Waiting 7s for poll response…
poll: Finished polling!
poll: Waiting 1s for poll response…
treat: Treating 1725139708
treat: Waiting 14s for treatment response…
treat: Treating 2961643129
treat: Waiting 22s for treatment response…
treat: Treating 3490534924
treat: Waiting 7s for treatment response…
treat: Treating 481316326
treat: Waiting 23s for treatment response…
treat: Treating 3335438155
treat: Waiting 9s for treatment response…
treat: Treating 2314014323
treat: Waiting 14s for treatment response…
treat: Treating 4156517114
treat: Waiting 0s for treatment response…
treat: Finished treating 4156517114
poll: Finished polling!
poll: Waiting 10s for poll response…
[…]
poll: Finished polling!
treat: Treating 279666034
treat: Waiting 22s for treatment response…
treat: Treating 945860350
treat: Waiting 23s for treatment response…
treat: Treating 1248492005
treat: Waiting 24s for treatment response…
```
Oups… On traite donc bien en parallèle, mais le souci cest quon attend pas vraiment que le traitement soit fini. **En gros, non seulement il faut le lancer dans un morceau de code à part, mais à un moment, il faut se poser la question de comment on attend que le morceau de code en question ait fini dexécuter.** Bon, en plus, en loccurence, on ne traite pas vraiment comme on le devrait. En fait, idéalement, il faudrait lancer la consultation de lAPI entrante en parallèle aussi (parce que là, ce nest pas fait), puis lancer le traitement de chaque résultat en parallèle, puis attendre que lensemble ait fini avant de sortir complètement.
# *craquement de doigts*
Une première approche consiste donc à lancer en parallèle lensemble des traitement initiaux sur lAPI entrante, de récupérer ses résultats et de les transmettre au thread principal pour quil puisse faire les traitements. De cette manière, ça ira un peu plus vite dans le sens où lon peut déjà mettre en attente notre volontairement-super-lente API entrante. Pour cela, il va falloir insérer un composant de communication entre ses différents thread. Dans les faits, ça marche exactement de la même manière que les threads classiques de Rust avec des transmetteurs, un récepteur et des messages (dans le cas présent, il existe bien sûr dautres types de messages et de mode de transmission). Si lon veut pouvoir préciser à quelle API on parle, il va aussi falloir créer une nouvelle structure qui va permettre de transmettre le nom de lAPI entrante en même temps que son résultat.
*Ce dernier point est juste cosmétique, on pourrait très bien faire sans, mais je trouve ça plus zoli avec…*
Voilà ce que ça va donner:
```rust
#[derive(Debug, Clone)]
struct ResAPI {
api: String,
id: u32,
}
#[tokio::main]
async fn main() {
let apis = vec!["A","B","C","D"];
let api_len = apis.len();
// on crée un canal de communication
let (tx, mut rx) = mpsc::channel(api_len);
for api in apis {
// quon clone pour chaque API entrante
let tx = tx.clone();
tokio::spawn(async move {
let vu32 = poll().await;
for v in vu32 {
let ra = ResAPI {
api: api.to_string(),
id: v,
};
// on envoie le résultat formé
tx.send(ra).await.unwrap();
}
});
}
// on détruit explicitement le canal de communication restant (celui qui na jamais été cloné
// en fait)
drop(tx);
while let Some(m) = rx.recv().await {
println!("main: Receiving {} from {}", m.id, &m.api);
let res = treat(m.id).await;
if res {
println!("main: {} was a good one, {}!", res, &m.api);
} else {
println!("main: nah {} no good, better luck next time, {}", res, &m.api);
}
}
}
```
Quand on exécute, ça fonctionne effectivement bien mieux. On constate bien que les 4 API entrantes sont consultées en même temps. On constate également que la première API qui a terminé va déclencher la suite des opérations. Mais… mais on a toujours un souci: on attend systématiquement le traitement de lAPI sortante et on perd du coup beaucoup de temps. Idéalement, il faudrait également rendre asynchrone le traitement des résultats.
# Où lon se rend compte que les grands principes de la science sont constants
Les mêmes causes produisant les mêmes effets, si lon ne fait que `spawn` des traitements de lAPI sortante, ça va effectivement paralléliser les traitements, mais on va en perdre au passage (tout traitement non terminé lorsque le thread principal meurt, mourra avec lui). On pourrait reprendre la même stratégie que précédemment mais dabord, ce ne serait pas drôle et ensuite, on est quand même obligé de déterminer la capacité dun canal de communication **avant** de louvrir. Dans notre cas, ça pourrait fonctionner puisquon connait à lavance le nombre maximum de résultalts, mais on pourrait imaginer des cas où lon en est pas capable (ou en tout cas, pas suffisamment précisément).
Donc, on va essayer une autre stratégie: lancer lensemble des threads de traitement dans un coin, stocker leur représentation dans un vecteur et simplement attendre jusquà ce que lensemble du vecteur ait terminé. Malheureusement `tokio` ne propose pas une telle fonction, il va donc falloir aller taper dans une autre librairie `futures` qui elle nous offre `join_all` permettant de faire exactement ce que lon veut. Voyons les modifs de code à partir de la boucle de réception:
```rust
// on crée un vecteur permettant de stocker tout ce petit monde
let mut tasks: Vec<_> = vec![];
while let Some(m) = rx.recv().await {
println!("main: Receiving {} from {}", m.id, &m.api);
// on lance des tâches comme précédemment
let task = tokio::spawn(async move {
println!("main: Beginning treatment for {} from {}", m.id, &m.api);
let t = treat(m.id).await;
if t {
println!("main: {}, that was a good one, {}!", m.id, &m.api);
} else {
println!("main: nah {}, better luck next time, {}!", m.id, &m.api);
}
});
// sauf quon met le résultat de cette tâche (le JoinHandle en fait) dans un tableau
tasks.push(task);
println!("main: Finished receiving");
}
println!("main: Back to main thread and were waiting for everyone now…");
// mainteant quon a tout reçu, on attend simplement que chaque thread se termine
join_all(tasks).await;
```
Et le tour est joué! Les traitement entrants sont bien asynchrones, les traitements sortants également, et le programme principal attend sagement que tout le monde ait terminé avant de lui-même séteindre bien gentiment.
# Conclusage
Ce nest certaiment pas le plus optimal et ce nest peut-être pas un modèle à suivre. Je suis certain par exemple quune fonction comme `join_all` doit poser des problèmes dans certains cas, ou ne peut pas accepter plus dun certain nombre de tâches, etc… Néanmoins, ça démontre bien un truc: faire de lasynchrone, cest bien joli, mais à moins davoir un cas assez spécifique ou de savoir ce que lon fait, ça ne servira clairement pas à tout le monde et clairement pas tout le temps.
Du coup, Messieurs les développeurs, essayer de proposer une alternative bloquante à vos librairies: ça mintéresse beaucoup davoir de lasynchrone (vraiment) mais les cas où je vais réellement en avoir besoin et réellement men servir, ce ne sera clairement pas tous les jours, surtout vu la complexité du truc.
Ah et dernière chose: quand je dis **bloquant**, cest vraiment **bloquant**. Oui, je te regarde `reqwest::blocking` qui en fait planque du `tokio` dedans…