Processus réactifs
Lors de la création du moteur d'exécution, nous avons volontairement fourni des méthodes bas niveau de création de graphes de tâche. Cela rend la création de graphes peu aisée, et notamment (comme vu en TD) requiert d'écrire les graphes en partant de la fin. Nous allons maintenant définir une interface de création de processus réactifs en s'inspirant des bibliothèques vues en cours (futures, rayon, etc.).
Pour cette partie (et les suivantes) du projet il est demandé de choisir entre l'implémentation à utilisation unique ou utilisation multiple des noeuds. Le cas de l'utilisation multiple permet de meilleures performances, mais pose des questions additionnelles; en particulier, il faut faire attention à ne pas activer plusieurs fois un noeud au travers du même activateur avant qu'il ait été exécuté !
Les exemples et explications ci-dessous sont données en présupposant un moteur d'exécution séquentiel à utilisation unique des noeuds; c'est à vous d'adapter pour votre moteur d'exécution parallèle.
On supposera les définitions de types suivantes:
# #![allow(unused_variables)] #fn main() { type Activator<'r> = <Runtime<'r> as GraphSpec>::Activator; type Builder<'a, 'r> = ScopedGraphBuilder<'a, Runtime<'r>>; type Receiver<'r, T> = <<Runtime<'r> as PortSpec<T>>::Port as Port>::Receiver; type Sender<'r, T> = <<Runtime<'r> as PortSpec<T>>::Port as Port>::Sender; type ControlEdge<'r> = ControlOutput<Activator<'r>>; type DataEdge<'r, T> = NodeInput<Activator<'r>, Sender<'r, T>>; #}
Question 0. En vous inspirant des traits ReceiverExt
et SenderExt
,
écrivez un trait ActivatorExt
qui fournit une méthode as_control_output
convertissant l'activateur en une arête.
Nous allons définir le trait ProcessOnce
représentant un processus réactif
pouvant être compilé en un graphe de tâches. Pour compiler un processus
réactif P
, on lui fournit les arêtes sur lesquelles envoyer ses sorties (qui
activeront donc les noeuds appropriés), et il retourne les arêtes sur
lesquelles il attend ses entrées (de type P::Inputs
).
# #![allow(unused_variables)] #fn main() { /// A reactive process which can be compiled to a graph. pub trait ProcessOnce<'r, Outputs: Tuple>: ProcessOnceExt + 'r { /// The input edges produced when compiling this process. type Inputs: Tuple; /// Compile the reactive process down to a graph. fn compile_once<'a>(self, b: &mut Builder<'a, 'r>, outputs: Outputs) -> Self::Inputs; } /// A trait containing utility functions combinators for processes. /// /// This is a separate trait from ProcessOnce in order to simplify the /// typechecking: when rust sees code such as `p1.join(p2)`; it only needs to check /// that `p1` implements `ProcessOnceExt` and can delay the actual typechecking /// of the input/output edges until a larger definition is provided. pub trait ProcessOnceExt { fn map<F, I, O>(self, fun: F) -> Map<Self, F, I, O> where Self: Sized, F: FnOnce(I) -> O, { Map { process: self, fun, _funtype: PhantomData } } // TODO: Add combinators here :) } struct Map<P, F, I, O> { process: P, fun: F, /// This is used to tell Rust type inference about the `I` and `O` types to /// use for this function. The same type `F` can implement `FnOnce(I) -> O` /// for multiple `I` and `O` types, and without this, the typechecker can /// easily get confused as to which input/output types we actually meant /// and require manual annotations. /// This ensures that a single, coherent type is used for a `Map` value /// even if the underlying `F` type is compatible with several input/output /// types. _funtype: PhantomData<fn(I) -> O>, } #}
Lors de l'implémentation du trait ProcessOnce
on essayera autant que possible
de rester général sur les types Outputs
compatibles. Par exemple on
définit ainsi un processus appliquant un ReceiverOnce
:
# #![allow(unused_variables)] #fn main() { struct Recv<R> { receiver: R, } impl<R> ProcessOnceExt for Recv<R> {} impl<'r, O: 'r, R: 'r> ProcessOnce<'r, (O, )> for Recv<R> where R: ReceiverOnce, O: OutputEdgeOnce<Runtime<'r>, Item = R::Item>, { type Inputs = (ControlEdge<'r>, ); fn compile_once<'a>(self, b: &mut Builder<'a, 'r>, (output, ): (O, )) -> Self::Inputs { let activator = b.node(TaskNode { inputs: (), outputs: (output, ), task: StrictTask::new(move || (self.receiver.recv_once(), )), }).add_activator(); (activator.as_control_output(), ) } } #}
Processus réactifs de base
Question 1. Écrivez une fonction ready
qui prend une valeur de type V
et construit un processus renvoyant la valeur, et une fonction lazy
qui
encapsule une cloture dans un processus réactif qui retourne le résultat de son
exécution.
Question 2. Implémentez le trait ProcessOnce
pour les valeurs de type
Map
défini ci-dessus.
Question 3. Ajoutez au trait ProcessOnceExt
une méthode flatten
qui
exécute le processus retourné par un autre processus, ainsi qu'une méthode
then
équivalente à map(..).flatten()
. Vous pourrez vous inspirez de la
déclaration de ces méthodes pour le trait Future
. Il vous faudra
probablement également implémenter le trait TaskOnce
.
Question 4 (optionnel). Dans la question précédente, les fonctions
flatten
et then
que nous avons définies demandent de compiler un nouveau
processus pendant son exécution, ce qui peut être couteux. En réutilisant le
processus Recv
donné en example plus haut, ajoutez une méthode static_then
au trait ProcessOnceExt
. Un exemple d'utilisation est donné ci-dessous.
# #![allow(unused_variables)] #fn main() { ready(3).static_then(|(recv_three, )| { println!("Currently compiling..."); recv_three.map(|(three, )| println!("{} = 3", three)) }) #}
Question 5. Lorsque l'on compile un processus, on veut pouvoir récupérer sa
valeur. Implémentez le trait OutputEdgeOnce
pour la structure
ProcessOutput
ci-dessous, puis écrivez une fonction execute_process
qui
prend un processus en argument, l'exécute dans un nouveau moteur d'exécution,
et écrit sa valeur dans une variable donnée en argument (on a besoin de prendre
la variable de sortie en argument car autrement la durée de vie 'r
du moteur
d'exécution devrait être inférieure à celle de la fonction execute_process
,
ce que l'on ne peut pas exprimer en Rust).
# #![allow(unused_variables)] #fn main() { struct ProcessOutput<'a, T: 'a> { target: &'a mut T, } fn execute_process<'r, I: 'r, T: 'r, P>(process: P, input: I, out: &'r mut T) where P: ProcessOnce<'r, (ProcessOutput<'r, T>, )>, P::Inputs: OutputEdgeOnce<Runtime<'r>, Item = I>, { // TODO } #}
Composition parallèle
On veut maintenant implémenter la composition parallèle ||
sous la forme
d'une méthode join
, telle que p1.join(p2)
créé un nouveau processus
retournant le couple des valeurs produites par p1
et p2
.
Question 6. Implémentez la méthode join
, puis exécutez avec votre moteur
un processus qui affiche "a"
et "b"
en parallèle.
Horloges
On souhaite maintenant introduire une notion d'instants dans le moteur d'exécution. Pour cela, on va utiliser la même technique que dans l'implémentation de ReactiveML: une liste de noeuds à activer à l'instant suivant.
Question 7. Ajoutez au moteur d'exécution une méthode pause
qui prend en
argument un Handle
et l'enregistre dans une liste next
qui sera exécutée à
l'instant suivant, puis adaptez la méthode execute
en conséquence.
Question 8. Ajoutez une méthode pause
au trait ProcessOnceExt
, puis
écrivez le processus ReactiveML suivant en Rust et exécutez-le:
(print_endline "instant 0") ||
(pause; print_endline "instant 1") ||
(pause; pause; print_endline "instant 2")
Répétition
On veut pouvoir implémenter la construction while
de ReactiveML. Pour ce faire, on définit le type LoopStatus
:
# #![allow(unused_variables)] #fn main() { pub enum LoopStatus<V> { Continue, Done(V) } #}
Question 9. Implémentez une fonction while_
qui prend en argument une
fonction retournant un processus de valeur de retour LoopStatus
et retourne
un processus effectuant la boucle. On notera qu'il est impossible de faire une
version statique de cette fonction à moins d'utiliser le moteur à exécutions
multiples; il faut donc recompiler un processus à chaque itération.