Idea

A pipeline is an idea to consume a defined block of codes one after another. It's also a common issue to share data between executed blocks and return result at the end.

+--------------------------------------------------+
|                                                  |
|  +-------------+     PIPE1,data1                 |
|  | PIPE1       +---------------------+           |
|  +-------------+                     |           |
|       |                              |           |
|       | PIPE1,data1                  |           |
|       v                    RESULTS   |           | 
|  +-------------+  PIPE2    +---------v--------+  |
|  | PIPE2       +---------->+                  |  |
|  +-------------+           | PIPE1 - instance |  |
|    |                       | data1            |  |
|    | PIPE2, PIPE1          | PIPE2 - instance |  |
|    v data1                 | PIPE3 - instance |  |
|  +-------------+ PIPE3     | data3            |  |
   |             | data3     |                  |  | 
|  | PIPE3       +---------->+                  |  |
|  +-------------+           |                  |  |
|                            +------------------+  |
|                                      |           |
+----------------------+---------------+-----------+
                       |
                       | EXECUTE(..)
                       v
            +----------+------+
            |  RESULTS        |
            +-----------------+

Implementation

A pipeline is represented by Pipeline class (IPipeline interface). The pipe must inherit from IPipe or IOptionalPipe.

Defining pipeline

Let's look at the simples possible example.

var pipeline = new Pipeline();
pipeline.Add<Pipe1>();
pipeline.Add<Pipe2>();
await pipeline.Execute(CancellationToken.None);

And the pipes definitions.


class Pipe1 : IPipe
{
    public ValueTask<PipeResult> Execute(CancellationToken cancellationToken)
    {
        return new ValueTask<PipeResult>(PipeResult.Continue);
    }
}

class Pipe2 : IPipe
{
    public ValueTask<PipeResult> Execute(CancellationToken cancellationToken)
    {
        return new ValueTask<PipeResult>(PipeResult.Continue);
    }
}

It's worth to mention that IPipeline defines a lot of methods to manipulate pipelines such as Remove, AddOrReplace, Insert and many more.

Available data

It's possible to pass into Pipeline class some additional data. It's worth to mention that Pipeline class accepts IServiceProvider in its constructor. `IServiceProvider' is used in case of not found required dependency in pipeline additional data or pipes results.

Passing data between pipes

Each executed pipe and its results are available for further pipes as simple as parameter in constructor (ctor injection).

class Pipe2 : IPipe
{
    public Pipe2(Pipe1 pipe1) // that's ok Pipe1 was executed 
    {
    } 
    (...)
}

During pipe execution process it's possible to pass some extra data.

class Pipe1 : IPipe
{
    public string PipeData => "Pipe1 data";

    public ValueTask<PipeResult> Execute(CancellationToken cancellationToken)
    {
        var result = PipeResult.NewContinue("some extra data"); //extra data 
        return new ValueTask<PipeResult>(result);
    }
}

Some facts - Each pipe overwites result of the same type. - Pipeline additional data and pipes results are the first choice. Only if not found IServiceProvider is used.

Optional pipes

Pipes can be defined as IPipe or IOptionalPipe. The IPipe in case of being unable to resolve its references will throw an exception. On the other hand, IOptionalPipe will not throw an exception and also will try to execute after any further pipe execution.

It's a very simple, but powerful concept. It's possible to define pipes that will be waiting till proper data become available.


var pipeline = new Pipeline();
pipeline.Add<OptionalPipe>();
pipeline.Add<Pipe1>();
pipeline.Add<Pipe2>();
pipeline.Add<Pipe3>();
var result = await pipeline.Execute(CancellationToken.None);
result.GetOptional<OptionalPipe>().Should().NotBeNull();
//that's because OptionalPipe breaks pipeline 
result.GetOptional<Pipe3>().Should().BeNull();

class OptionalPipe : IOptionalPipe
{
    // until pipe2 will not show up OptionalPipe won't be created and executed
    public OptionalPipe(Pipe2 pipe2)
    {
        pipe2.Should().NotBeNull();
    }

    public ValueTask<PipeResult> Execute(CancellationToken cancellationToken)
    {
        return new ValueTask<PipeResult>(PipeResult.Break);
    }
}

class Pipe1 : IPipe
{
    public ValueTask<PipeResult> Execute(CancellationToken cancellationToken)
    {
        return new ValueTask<PipeResult>(PipeResult.Continue);
    }
}

class Pipe2 : IPipe
{
    public ValueTask<PipeResult> Execute(CancellationToken cancellationToken)
    {
        return new ValueTask<PipeResult>(PipeResult.Continue);
    }
}

class Pipe3 : IPipe
{
    public ValueTask<PipeResult> Execute(CancellationToken cancellationToken)
    {
        return new ValueTask<PipeResult>(PipeResult.Continue);
    }
}

Consuming pipeline data

Pipeline execution results with IPipelineResult which holds all exposed pipes data.

var result = await pipeline.Execute(CancellationToken.None);
result.Get<Pipe1>().Should().NotBeNull();
result.Get<SomeData>().Should().NotBeNull();

Disposing pipeline result

Pipeline result inherited from IAsyncDisposable and can be disposed.

await using var result = await pipeline.Execute(CancellationToken.None);
// or
var result = await pipeline.Execute(CancellationToken.None);
await result.DisposeAsync();

Below code presents implementation of DisposeAsync from PipelineResult class.

public async ValueTask DisposeAsync()
{
    foreach (var pair in availableData)
    {
        if (pair.Value is IAsyncDisposable asyncDisposable)
            await asyncDisposable.DisposeAsync().ConfigureAwait(false);
        else if(pair.Value is IDisposable disposable)
        {
            disposable.Dispose();
        }
    }
    availableData.Clear();
}

Breaking the pipeline

It's possible to break pipeline execution in each of pipe.


var pipeline = new Pipeline();
pipeline.Add<Pipe1>();
pipeline.Add<BreakPipe>();
pipeline.Add<Pipe2>();
var result = await pipeline.Execute(CancellationToken.None);

result.GetOptional<Pipe2>().Should().BeNull();


class BreakPipe : IPipe
{
    public ValueTask<PipeResult> Execute(CancellationToken cancellationToken)
    {
        return new ValueTask<PipeResult>(PipeResult.Break);
    }
}
(...)

Examples

Examples can be found at pipeline examples.