Compartir a través de


Biblioteca System.Threading.Channels

El espacio de nombres System.Threading.Channels proporciona un conjunto de estructuras de datos de sincronización para pasar datos entre productores y consumidores de forma asincrónica. La biblioteca tiene como destino .NET Standard y funciona en todas las implementaciones de .NET.

Esta biblioteca está disponible en el paquete NuGet System.Threading.Channels. Pero si usa .NET Core 3.0 o una versión posterior, el paquete se incluye como parte del marco.

Modelo de programación conceptual de productor/consumidor

Los canales son una implementación del modelo de programación conceptual de productor/consumidor. En este modelo de programación, los productores generan datos de forma asincrónica y los consumidores consumen esos datos de forma asincrónica. En otras palabras, este modelo pasa datos de una entidad a otra a través de una cola de "primero en entrar, primero en salir" (FIFO). Piense en los canales como cualquier otro tipo de colección genérica común, tal como un List<T>. La principal diferencia es que esta colección administra la sincronización y proporciona varios modelos de consumo mediante las opciones de creación de fábrica. Estas opciones controlan el comportamiento de los canales, como el número de elementos que pueden almacenar y qué ocurre si se alcanza ese límite, o bien si varios productores o varios consumidores acceden al canal de manera simultánea.

Estrategias de límite

En función de cómo se crea una instancia de Channel<T>, su lector y su escritor se comportan de forma diferente.

Para crear un canal que especifique una capacidad máxima, llame a Channel.CreateBounded. Para crear un canal que se use con cualquier número de lectores y escritores simultáneamente, llame a Channel.CreateUnbounded. Cada estrategia de delimitación expone diversas opciones definidas por el creador, ya sea BoundedChannelOptions o UnboundedChannelOptions respectivamente.

Nota

Independientemente de la estrategia de delimitación, un canal siempre lanza un ChannelClosedException cuando se usa después de haber sido cerrado.

Canales ilimitados

Para crear un canal sin límites, llame a una de las sobrecargas de Channel.CreateUnbounded.

var channel = Channel.CreateUnbounded<T>();

Cuando se crea un canal ilimitado, de manera predeterminada, el canal lo puede utilizar cualquier número de lectores y escritores simultáneamente. Como alternativa, puede especificar un comportamiento no predeterminado al crear un canal ilimitado si proporciona una instancia de UnboundedChannelOptions. La capacidad del canal es ilimitada y todas las escrituras se realizan de forma sincrónica. Para obtener más ejemplos, vea Patrones de creación ilimitados.

Canales limitados

Para crear un canal limitado, llame a una de las sobrecargas de Channel.CreateBounded:

var channel = Channel.CreateBounded<T>(7);

El código anterior crea un canal que tiene una capacidad máxima de 7 elementos. Al crear un canal limitado, este está limitado a una capacidad máxima. Cuando se alcanza el límite, el comportamiento predeterminado es que el canal bloquee de forma asincrónica el productor hasta que haya espacio disponible. Puede configurar este comportamiento especificando una opción al crear el canal. Los canales limitados se pueden crear con cualquier valor de capacidad mayor que cero. Para obtener otros ejemplos, vea Patrones de creación limitados.

Comportamiento del modo completo

Al usar un canal limitado, puede especificar el comportamiento al que se adhiere el canal cuando se alcanza el límite configurado. En la tabla siguiente se enumeran los comportamientos de modo completo para cada valor de BoundedChannelFullMode:

Importancia Comportamiento
BoundedChannelFullMode.Wait Este es el valor predeterminado. Las llamadas a WriteAsync esperan a que haya espacio disponible para completar la operación de escritura. Las llamadas a TryWrite devuelven false inmediatamente.
BoundedChannelFullMode.DropNewest Quita y omite el elemento más reciente en el canal con el fin de dejar espacio para el elemento que se va a escribir.
BoundedChannelFullMode.DropOldest Elimina e ignora el elemento más antiguo del canal para dejar espacio al elemento que se va a escribir.
BoundedChannelFullMode.DropWrite Suelta el elemento que se está escribiendo.

Importante

Cada vez que un elemento Channel<TWrite,TRead>.Writer produce más rápido que lo que un elemento Channel<TWrite,TRead>.Reader puede consumir, el escritor del canal experimenta presión retroactiva.

API de productores

La funcionalidad del productor se presenta en Channel<TWrite,TRead>.Writer. Las API de productor y el comportamiento esperado se detallan en la tabla siguiente:

Interfaz de Programación de Aplicaciones (API) Comportamiento esperado
ChannelWriter<T>.Complete Marca el canal como completo, lo que significa que ya no se escriben más elementos en él.
ChannelWriter<T>.TryComplete Intenta marcar el canal como completo, lo que significa que ya no se escriben más datos en él.
ChannelWriter<T>.TryWrite Intenta escribir el elemento especificado en el canal. Cuando se usa con un canal ilimitado, siempre devuelve true, a menos que el escritor del canal señale la finalización con ChannelWriter<T>.Complete o ChannelWriter<T>.TryComplete.
ChannelWriter<T>.WaitToWriteAsync Devuelve una instancia de ValueTask<TResult> que se completa cuando hay espacio disponible para escribir un elemento.
ChannelWriter<T>.WriteAsync Escribe asincrónicamente un elemento en el canal.

APIs de consumidores

La funcionalidad del consumidor se expone en Channel<TWrite,TRead>.Reader. Las API de consumidor y el comportamiento esperado se detallan en la tabla siguiente:

Interfaz de Programación de Aplicaciones (API) Comportamiento esperado
ChannelReader<T>.ReadAllAsync Crea una instancia de IAsyncEnumerable<T> que permite leer todos los datos del canal.
ChannelReader<T>.ReadAsync Lee asincrónicamente un elemento desde el canal.
ChannelReader<T>.TryPeek Intenta consultar un elemento del canal.
ChannelReader<T>.TryRead Intenta leer un elemento del canal.
ChannelReader<T>.WaitToReadAsync Devuelve una instancia de ValueTask<TResult> que se completa cuando hay datos disponibles para leer.

Patrones de uso común

Hay varios patrones de uso para los canales. La API está diseñada para ser sencilla, coherente y lo más flexible posible. Todos los métodos asincrónicos devuelven un elemento ValueTask (o ValueTask<bool>) que representa una operación asíncrona ligera que puede evitar la asignación si la operación se completa sincrónicamente o potencialmente de forma asincrónica. Además, la API está diseñada para admitir composición, de modo que el creador de un canal hace promesas sobre su uso previsto. Cuando se crea un canal con determinados parámetros, la implementación interna puede funcionar de forma más eficaz sabiendo estas promesas.

Patrones de creación

Imagine que está creando una solución de productor/consumidor para un sistema de posicionamiento global (GPS). Quiere realizar un seguimiento de las coordenadas de un dispositivo a lo largo del tiempo. Un objeto de coordenadas de ejemplo podría tener este aspecto:

/// <summary>
/// A representation of a device's coordinates, 
/// which includes latitude and longitude.
/// </summary>
/// <param name="DeviceId">A unique device identifier.</param>
/// <param name="Latitude">The latitude of the device.</param>
/// <param name="Longitude">The longitude of the device.</param>
public readonly record struct Coordinates(
    Guid DeviceId,
    double Latitude,
    double Longitude);

Patrones de creación sin límites

Un patrón de uso común es crear un canal ilimitado predeterminado:

var channel = Channel.CreateUnbounded<Coordinates>();

Pero en su lugar, supongamos que quiere crear un canal ilimitado con varios productores y consumidores:

var channel = Channel.CreateUnbounded<Coordinates>(
    new UnboundedChannelOptions
    {
        SingleWriter = false,
        SingleReader = false,
        AllowSynchronousContinuations = true
    });

En este caso, todas las escrituras son sincrónicas, incluso WriteAsync. Esto se debe a que un canal ilimitado siempre tiene espacio disponible de inmediato para una escritura de forma eficaz. Pero con AllowSynchronousContinuations establecido en true, las operaciones de escritura pueden terminar haciendo el trabajo asociado a un lector ejecutando sus continuaciones. Esto no afecta a la sincronización de la operación.

Patrones de creación limitada

Con los canales limitados, el consumidor debe conocer la capacidad de configuración del canal para ayudar a garantizar el consumo adecuado. Es decir, el consumidor debe saber qué comportamiento muestra el canal cuando se alcanza el límite configurado. Vamos a explorar algunos de los patrones comunes de creación limitada.

La manera más sencilla de crear un canal limitado es especificar una capacidad:

var channel = Channel.CreateBounded<Coordinates>(1);

El código anterior crea un canal limitado con una capacidad máxima de 1. Hay otras opciones disponibles; algunas son las mismas que en un canal ilimitado, mientras que otras son específicas de los canales ilimitados:

var channel = Channel.CreateBounded<Coordinates>(
    new BoundedChannelOptions(1_000)
    {
        SingleWriter = true,
        SingleReader = false,
        AllowSynchronousContinuations = false,
        FullMode = BoundedChannelFullMode.DropWrite
    });

En el código anterior, el canal se crea como un canal acotado con capacidad para 1,000 elementos, con un único escritor pero muchos lectores. Su comportamiento en el modo completo se define como DropWrite, lo que significa que descarta el elemento que se está escribiendo si el canal está completo.

Para monitorizar los elementos que se descartan al usar canales limitados, registre un callback itemDropped:

var channel = Channel.CreateBounded(
    new BoundedChannelOptions(10)
    {
        AllowSynchronousContinuations = true,
        FullMode = BoundedChannelFullMode.DropOldest
    },
    static void (Coordinates dropped) =>
        Console.WriteLine($"Coordinates dropped: {dropped}"));

Cada vez que el canal esté lleno y se agregue un nuevo elemento, se invocará la llamada de retorno itemDropped. En este ejemplo, el callback proporcionado escribe el elemento en la consola, pero puede realizar cualquier otra acción que desee.

Modelos de productor

Imagínate en este escenario que el productor está escribiendo nuevas coordenadas en el canal. El productor puede hacerlo llamando a TryWrite:

static void ProduceWithWhileAndTryWrite(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }
    }
}

El código anterior del productor:

  • Acepta Channel<Coordinates>.Writer (ChannelWriter<Coordinates>) como argumento, junto con el elemento Coordinates inicial.
  • Define un bucle while condicional que intenta mover las coordenadas mediante TryWrite.

Un productor alternativo podría usar el método WriteAsync:

static async ValueTask ProduceWithWhileWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        await writer.WriteAsync(
            item: coordinates = coordinates with
            {
                Latitude = coordinates.Latitude + .5,
                Longitude = coordinates.Longitude + 1
            });
    }

    writer.Complete();
}

De nuevo, Channel<Coordinates>.Writer se usa en un bucle while. Pero esta vez se llama al método WriteAsync. El método continúa solo después de escribir las coordenadas. Cuando se cierra el bucle while, se realiza una llamada a Complete, lo que indica que no se escriben más datos en el canal.

Otro patrón de productor es usar el método WaitToWriteAsync; tenga en cuenta el código siguiente:

static async ValueTask ProduceWithWaitToWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 } &&
        await writer.WaitToWriteAsync())
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }

        await Task.Delay(TimeSpan.FromMilliseconds(10));
    }

    writer.Complete();
}

Como parte del elemento while condicional, se usa el resultado de la llamada a WaitToWriteAsync para determinar si se va a continuar el bucle.

Patrones de consumidor

Hay varios patrones comunes de consumidor de canales. Cuando un canal no termina nunca, lo que significa que genera datos de manera indefinida, el consumidor puede usar un bucle while (true) y leer los datos a medida que estén disponibles:

static async ValueTask ConsumeWithWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (true)
    {
        // May throw ChannelClosedException if
        // the parent channel's writer signals complete.
        Coordinates coordinates = await reader.ReadAsync();
        Console.WriteLine(coordinates);
    }
}

Nota

Este código produce una excepción si el canal está cerrado.

Otro consumidor podría evitar este problema mediante un bucle while anidado, tal como se muestra en el código siguiente:

static async ValueTask ConsumeWithNestedWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (await reader.WaitToReadAsync())
    {
        while (reader.TryRead(out Coordinates coordinates))
        {
            Console.WriteLine(coordinates);
        }
    }
}

En el código anterior, el consumidor espera a leer datos. Una vez que los datos están disponibles, el consumidor intenta leerlos. Estos bucles se siguen evaluando hasta que el productor del canal indica que ya no tiene datos para leer. Dicho esto, cuando se sabe que un productor tiene un número finito de elementos que produce y señala la finalización, el consumidor puede usar la semántica await foreach para iterar sobre los elementos:

static async ValueTask ConsumeWithAwaitForeachAsync(
    ChannelReader<Coordinates> reader)
{
    await foreach (Coordinates coordinates in reader.ReadAllAsync())
    {
        Console.WriteLine(coordinates);
    }
}

El código anterior usa el método ReadAllAsync para leer todas las coordenadas del canal.

Vea también