Futures et Streams
Dans ce TD, vous allez construire un serveur de messagerie (pensez à IRC par exemple) en utilisant les futures et les streams. On commencera par coder un serveur qui renvoie tous les messages à son émetteur, puis un serveur qui diffuse tous les messages à tous les clients.
Contrairement aux TDs précédents, le but ici n'est pas de paralléliser le code en utilisant plusieurs cœurs du processeur mais d'exécuteur plusieurs tâches lentes à cause de l'interaction avec les entrées/sorties sur le même cœur.
Bibliothèques utilisées
Pour construire le serveur, nous nous appuierons sur l'ensemble de bibliothèques tokio qui permet d'exposer l'interface réseau sous forme de futures et de streams. En particulier, nous utiliserons les bibliothèques tokio-core pour accéder aux fonctionnalités de base, tokio-io pour accéder aux flux de bytes transitant sur le réseau et tokio-codec pour décoder les messages envoyés par le client. Le décodage des messages entrants se fera également à l'aide de la librairie bytes qui permet de représenter des buffers de bytes.
Enfin, les futures et les streams fournis par Tokio se basent sur les traits définis dans la bibliothèque futures. En particulier, nous vous conseillons vivement de regarder la documentation des traits Future, Stream et Sink pour voir comment vous pourrez manipuler les futures et les streams.
Vous devrez ajouter les dépendances ci-dessous au fichier Cargo.toml
.
[dependencies]
bytes = "0.4"
tokio-core = "0.1"
tokio-io = "0.1"
tokio-codec = "0.1"
futures = "0.1"
Par ailleurs, le TD vous demande d'écrire deux programmes différents : un client et un
serveur. Vous pouvez le faire en créant deux fichiers src/bin/client.rs
et
src/bin/server.rs
qui seront lancés avec cargo run --bin client
et cargo run --bin server
. Si vous souhaitez partager du code entre les deux programmes, vous pouvez le
placer dans src/lib.rs
puis importer le module avec extern crate <votre_crate>;
(<votre_crate>
est le nom indiqué dans le Cargo.toml
dans la section package
, par
exemple extern crate td2;
).
Gestion des erreurs et décodage des messages
Afin de vous aider à construire le serveur, on vous donne le type ci-dessous pour
représenter les erreurs. Ce type peut être construit automatiquement à partir des objets
des types std::io::Error
et mpsc::SendError
. Pour transformer une future ou un stream
ayant un de ces deux types d'erreurs en une future ou un stream sur notre type d'erreur,
il suffit d'appeler la méthode .from_err::<Error>()
. De la même manière, on peut appeler
sink_from_err::<Error>()
sur un sink. On peux aussi afficher l'erreur en la passant en
argument des macros de formatage des chaines de caractères (par exemple
println).
# #![allow(unused_variables)] #fn main() { extern crate bytes; extern crate futures; extern crate tokio_codec; extern crate tokio_core; extern crate tokio_io; use futures::unsync::mpsc; enum Error { /// Error produced by an IO opperation. IoError(std::io::Error), /// Error produced by an MPSC channel. SendError, /// The client sent an invalid frame. InvalidFrame, } impl From<std::io::Error> for Error { fn from(e: std::io::Error) -> Error { Error::IoError(e) } } impl<T> From<mpsc::SendError<T>> for Error { fn from(_: mpsc::SendError<T>) -> Error { Error::SendError } } impl std::fmt::Display for Error { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match *self { Error::IoError(ref e) => e.fmt(f), Error::SendError => write!(f, "mpsc send failed"), Error::InvalidFrame => write!(f, "invalid frame"), } } } #}
On donne aussi le type Codec
pour décrire comment interpréter les bytes arrivant sur
l'interface réseau et comment renvoyer les messages dans l'autre sens. La méthode
decode
découpe le flux d'entrée à chaque retour à la ligne et renvoie une String
par
ligne. La méthode encode
écrit les String
qu'on lui donne en entrée séparées par des
retours à la ligne.
On remarquera que encode
prend en réalité un compteur de référence vers une String
.
Cela évite de copier la chaine de caractères si elle est envoyée vers plusieurs clients.
Pour plus d'informations, allez voir la documentation de Rc
.
# #![allow(unused_variables)] #fn main() { use bytes::BytesMut; use std::rc::Rc; use tokio_codec::{Decoder, Encoder}; struct Codec; impl Decoder for Codec { type Item = String; type Error = Error; fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Error> { if let Some(i) = buf.iter().position(|&b| b == b'\n') { // Remove the serialized frame from the buffer. let line = buf.split_to(i); // Also remove the '\n' buf.split_to(1); // Turn this data into a UTF string and return it in a Frame. match std::str::from_utf8(&line) { Ok(s) => Ok(Some(s.to_string())), Err(_) => Err(Error::InvalidFrame), } } else { Ok(None) } } } impl Encoder for Codec { type Item = Rc<String>; type Error = Error; fn encode(&mut self, msg: Rc<String>, buf: &mut BytesMut) -> Result<(), Error> { buf.extend(msg.as_bytes()); buf.extend(b"\n"); Ok(()) } } #}
Client de messagerie
On vous donne maintenant le code d'un client pour communiquer avec le serveur. Vous
pourrez utiliser ce client pour tester votre serveur. Le client utilise les mêmes types
Error
et Codec
que le serveur.
use futures::{stream, Future, Sink, Stream}; use tokio_core::net::TcpStream; use tokio_core::reactor::Core; fn main() { let mut core = Core::new().unwrap(); let handle = core.handle(); let addr = "127.0.0.1:12345".parse().unwrap(); // Establish a conection to the server. `TcpStream::connect` produces a future that we // must resolve with the event loop. let socket = core.run(TcpStream::connect(&addr, &handle)).unwrap(); // Obtain a sink and a stream to interface with the socket. let (writer, reader) = Codec.framed(socket).split(); // Create a future that prints each message to the console. let printer = reader.for_each(|msg| { println!("{}", msg); Ok(()) }); // Add the future to the event loop, panic if an error is encountered. handle.spawn(printer.map_err(|err| panic!("{}", err))); // Create a future than send 10^9 messages. let sender = stream::iter_ok::<_, ()>(0..1_000_000_000) // Convert numbers to messages .map(|i| Rc::new(format!("msg {}", i))) // Send all messages to the sink. .forward(writer.sink_map_err(|err| panic!("{}", err))); // Spin-up the event loop until `sender` is completed. core.run(sender).unwrap(); }
Question 1. Lisez attentivement et comprenez le code ci-dessus. En particulier vous vous poserez les questions suivantes:
- Quel est le type de
TcpStream::connect(&addr, &handle)
? - Que fait la méthode
Core::run
? - Quels sont les types de
writer
et dereader
? - Que fait
reader.for_each
? Que renvoie-t-il ? A quel moment la clôture passée en argument est-elle exécutée ? - Que fait
handle.spawn
? - Que font les méthodes
map
etforward
?
Serveur d'écho
On veut maintenant créer un serveur qui renvoie à chaque client les messages que celui-ci
lui a envoyé. Afin de traiter les connexions entrantes, on utilisera le
TcpListener. Un exemple d'utilisation est fournis ci-dessous. La méthode
listener.incoming()
retourne le stream des connexions entrantes.
# #![allow(unused_variables)] #fn main() { use tokio_core::net::TcpListener; // Create the event loop that will drive this server let mut core = Core::new().unwrap(); let handle = core.handle(); // Bind the server's socket let addr = "127.0.0.1:12345".parse().unwrap(); let listener = TcpListener::bind(&addr, &handle).unwrap(); // Handle the stream of incoming connections. let server = listener.incoming().for_each(|(socket, _)| { ... // TODO: handle incoming connections }); // Spin up the server on the event loop core.run(server).unwrap(); #}
Question 2. Créez un serveur qui renvoie chaque message reçu au client émetteur.
Votre serveur devra continuer de fonctionner même si un client est tué ou si il envoie un
caractère non valide (on peut tester ce dernier point en ouvrant une connexion avec telnet
et en tapant Ctrl+C
).
Serveur de messagerie
Nous allons maintenant modifier le serveur d'écho pour envoyer les messages à tous les clients. Le serveur devra répondre aux contraintes ci dessous:
- Tous les clients doivent recevoir tous les messages.
- Les messages doivent arriver dans le même ordre pour tous les clients
- Le serveur doit continuer à fonctionner même si un client se déconnecte ou envoie des données illégales.
Pour séquentialiser les messages, vous pourrez utiliser les files de messages MPSC (Multiple Producer, Single Consumer).
Question 3. Implémenter le serveur de messagerie. Modifiez le client pour mesurer le nombre de messages que peut supporter le serveur avec 1, 2 ou 4 clients.
Question 4. On souhaite compléter le serveur et/ou le client de messagerie par un pool de threads capables de gérer en parallèle des traitements calculatoires sur les messages. Par exemple, on pourra ajouter du (dé)chiffrement, de la traduction automatique, de l'analyse syntaxique et sémantique pour exploiter les données échangées par les clients, etc. Modifiez le serveur et/ou le client pour que les messages soient envoyées via une future dans un tel pool de tâches calculatoires, avant de repartir (éventuellement modifié) dans le schéma de diffusion de messages implémenté ci-dessus.