Processus réactifs

Lors de la création du moteur d'exécution, nous avons volontairement fourni des méthodes bas niveau de création de graphes de tâche. Cela rend la création de graphes peu aisée, et notamment (comme vu en TD) requiert d'écrire les graphes en partant de la fin. Nous allons maintenant définir une interface de création de processus réactifs en s'inspirant des bibliothèques vues en cours (futures, rayon, etc.).

Pour cette partie (et les suivantes) du projet il est demandé de choisir entre l'implémentation à utilisation unique ou utilisation multiple des noeuds. Le cas de l'utilisation multiple permet de meilleures performances, mais pose des questions additionnelles; en particulier, il faut faire attention à ne pas activer plusieurs fois un noeud au travers du même activateur avant qu'il ait été exécuté !

Les exemples et explications ci-dessous sont données en présupposant un moteur d'exécution séquentiel à utilisation unique des noeuds; c'est à vous d'adapter pour votre moteur d'exécution parallèle.

On supposera les définitions de types suivantes:


# #![allow(unused_variables)]
#fn main() {
type Activator<'r> = <Runtime<'r> as GraphSpec>::Activator;
type Builder<'a, 'r> = ScopedGraphBuilder<'a, Runtime<'r>>;

type Receiver<'r, T> = <<Runtime<'r> as PortSpec<T>>::Port as Port>::Receiver;
type Sender<'r, T> = <<Runtime<'r> as PortSpec<T>>::Port as Port>::Sender;

type ControlEdge<'r> = ControlOutput<Activator<'r>>;
type DataEdge<'r, T> = NodeInput<Activator<'r>, Sender<'r, T>>;
#}

Question 0. En vous inspirant des traits ReceiverExt et SenderExt, écrivez un trait ActivatorExt qui fournit une méthode as_control_output convertissant l'activateur en une arête.

Nous allons définir le trait ProcessOnce représentant un processus réactif pouvant être compilé en un graphe de tâches. Pour compiler un processus réactif P, on lui fournit les arêtes sur lesquelles envoyer ses sorties (qui activeront donc les noeuds appropriés), et il retourne les arêtes sur lesquelles il attend ses entrées (de type P::Inputs).


# #![allow(unused_variables)]
#fn main() {
/// A reactive process which can be compiled to a graph.
pub trait ProcessOnce<'r, Outputs: Tuple>: ProcessOnceExt + 'r {
    /// The input edges produced when compiling this process.
    type Inputs: Tuple;

    /// Compile the reactive process down to a graph.
    fn compile_once<'a>(self, b: &mut Builder<'a, 'r>, outputs: Outputs) -> Self::Inputs;
}

/// A trait containing utility functions combinators for processes.
///
/// This is a separate trait from ProcessOnce in order to simplify the
/// typechecking: when rust sees code such as `p1.join(p2)`; it only needs to check
/// that `p1` implements `ProcessOnceExt` and can delay the actual typechecking
/// of the input/output edges until a larger definition is provided.
pub trait ProcessOnceExt {
    fn map<F, I, O>(self, fun: F) -> Map<Self, F, I, O>
    where Self: Sized,
          F: FnOnce(I) -> O,
    {
        Map { process: self, fun, _funtype: PhantomData }
    }

    // TODO: Add combinators here :)
}

struct Map<P, F, I, O> {
    process: P,
    fun: F,

    /// This is used to tell Rust type inference about the `I` and `O` types to
    /// use for this function.  The same type `F` can implement `FnOnce(I) -> O`
    /// for multiple `I` and `O` types, and without this, the typechecker can
    /// easily get confused as to which input/output types we actually meant
    /// and require manual annotations.
    /// This ensures that a single, coherent type is used for a `Map` value
    /// even if the underlying `F` type is compatible with several input/output
    /// types.
    _funtype: PhantomData<fn(I) -> O>,
}
#}

Lors de l'implémentation du trait ProcessOnce on essayera autant que possible de rester général sur les types Outputs compatibles. Par exemple on définit ainsi un processus appliquant un ReceiverOnce:


# #![allow(unused_variables)]
#fn main() {
struct Recv<R> {
    receiver: R,
}

impl<R> ProcessOnceExt for Recv<R> {}

impl<'r, O: 'r, R: 'r> ProcessOnce<'r, (O, )> for Recv<R>
where
    R: ReceiverOnce,
    O: OutputEdgeOnce<Runtime<'r>, Item = R::Item>,
{
    type Inputs = (ControlEdge<'r>, );

    fn compile_once<'a>(self, b: &mut Builder<'a, 'r>, (output, ): (O, )) -> Self::Inputs {
        let activator = b.node(TaskNode {
            inputs: (),
            outputs: (output, ),
            task: StrictTask::new(move || (self.receiver.recv_once(), )),
        }).add_activator();

        (activator.as_control_output(), )
    }
}
#}

Processus réactifs de base

Question 1. Écrivez une fonction ready qui prend une valeur de type V et construit un processus renvoyant la valeur, et une fonction lazy qui encapsule une cloture dans un processus réactif qui retourne le résultat de son exécution.

Question 2. Implémentez le trait ProcessOnce pour les valeurs de type Map défini ci-dessus.

Question 3. Ajoutez au trait ProcessOnceExt une méthode flatten qui exécute le processus retourné par un autre processus, ainsi qu'une méthode then équivalente à map(..).flatten(). Vous pourrez vous inspirez de la déclaration de ces méthodes pour le trait Future. Il vous faudra probablement également implémenter le trait TaskOnce.

Question 4 (optionnel). Dans la question précédente, les fonctions flatten et then que nous avons définies demandent de compiler un nouveau processus pendant son exécution, ce qui peut être couteux. En réutilisant le processus Recv donné en example plus haut, ajoutez une méthode static_then au trait ProcessOnceExt. Un exemple d'utilisation est donné ci-dessous.


# #![allow(unused_variables)]
#fn main() {
ready(3).static_then(|(recv_three, )| {
    println!("Currently compiling...");
    recv_three.map(|(three, )| println!("{} = 3", three))
})
#}

Question 5. Lorsque l'on compile un processus, on veut pouvoir récupérer sa valeur. Implémentez le trait OutputEdgeOnce pour la structure ProcessOutput ci-dessous, puis écrivez une fonction execute_process qui prend un processus en argument, l'exécute dans un nouveau moteur d'exécution, et écrit sa valeur dans une variable donnée en argument (on a besoin de prendre la variable de sortie en argument car autrement la durée de vie 'r du moteur d'exécution devrait être inférieure à celle de la fonction execute_process, ce que l'on ne peut pas exprimer en Rust).


# #![allow(unused_variables)]
#fn main() {
struct ProcessOutput<'a, T: 'a> {
    target: &'a mut T,
}

fn execute_process<'r, I: 'r, T: 'r, P>(process: P, input: I, out: &'r mut T)
where
    P: ProcessOnce<'r, (ProcessOutput<'r, T>, )>,
    P::Inputs: OutputEdgeOnce<Runtime<'r>, Item = I>,
{
    // TODO
}
#}

Composition parallèle

On veut maintenant implémenter la composition parallèle || sous la forme d'une méthode join, telle que p1.join(p2) créé un nouveau processus retournant le couple des valeurs produites par p1 et p2.

Question 6. Implémentez la méthode join, puis exécutez avec votre moteur un processus qui affiche "a" et "b" en parallèle.

Horloges

On souhaite maintenant introduire une notion d'instants dans le moteur d'exécution. Pour cela, on va utiliser la même technique que dans l'implémentation de ReactiveML: une liste de noeuds à activer à l'instant suivant.

Question 7. Ajoutez au moteur d'exécution une méthode pause qui prend en argument un Handle et l'enregistre dans une liste next qui sera exécutée à l'instant suivant, puis adaptez la méthode execute en conséquence.

Question 8. Ajoutez une méthode pause au trait ProcessOnceExt, puis écrivez le processus ReactiveML suivant en Rust et exécutez-le:

(print_endline "instant 0") ||
(pause; print_endline "instant 1") ||
(pause; pause; print_endline "instant 2")

Répétition

On veut pouvoir implémenter la construction while de ReactiveML. Pour ce faire, on définit le type LoopStatus:


# #![allow(unused_variables)]
#fn main() {
pub enum LoopStatus<V> { Continue, Done(V) }
#}

Question 9. Implémentez une fonction while_ qui prend en argument une fonction retournant un processus de valeur de retour LoopStatus et retourne un processus effectuant la boucle. On notera qu'il est impossible de faire une version statique de cette fonction à moins d'utiliser le moteur à exécutions multiples; il faut donc recompiler un processus à chaque itération.