Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Los canales de difusión son un tipo especial de mecanismo de difusión que se puede usar para enviar mensajes a todos los suscriptores. A diferencia de los proveedores de streaming, los canales de difusión no son persistentes, no almacenan mensajes y no son un reemplazo de las secuencias persistentes. Con los canales de difusión, los granos se suscriben implícitamente al canal de difusión y reciben mensajes de difusión de un productor. Esto desacopla el remitente y el receptor del mensaje, lo que resulta útil para escenarios en los que el remitente y el receptor no se conocen de antemano.
Para usar un canal de difusión, primero debe configurar los flujos de Orleans y luego habilitar la difusión en su canal usando AddBroadcastChannel durante la configuración del silo.
siloBuilder.AddBroadcastChannel(OrleansBroadcastChannelNames.ReadmodelChanges);
Escenario de ejemplo
Considere un escenario en el que tiene un agente que necesita obtener actualizaciones del precio de las acciones de un proveedor. El proveedor de precios de acciones es un servicio en segundo plano que publica actualizaciones de precios de acciones en un canal de difusión. Los granos se suscriben implícitamente al canal de difusión y reciben precios de acciones actualizados. El siguiente diagrama muestra los escenarios.
En el diagrama anterior:
- El silo publica actualizaciones de los precios de las acciones en el canal de transmisión.
- El grano se suscribe al canal de difusión y recibe actualizaciones de los precios de las acciones.
- El cliente consume las actualizaciones de precios de acciones del grano de acciones.
El canal de difusión desacopla al productor y al consumidor de las actualizaciones de precios de acciones. El productor publica actualizaciones de los precios de las acciones en el canal de difusión y el consumidor se suscribe al canal de difusión y recibe actualizaciones de los precios de las acciones.
Definir un grano de consumidor
Para consumir mensajes de canal de difusión, los granos deben implementar la interfaz IOnBroadcastChannelSubscribed. Esta interfaz permite suscripciones implícitas, lo que significa que los granos se suscriben automáticamente al canal de difusión cuando se activan. La implementación usa el IBroadcastChannelSubscription.Attach método para asociarse al canal de difusión. El método Attach toma un parámetro de tipo genérico para el tipo de mensaje que va a recibir.
En primer lugar, defina la interfaz de grano que usan los consumidores para interactuar con el grano:
namespace BroadcastChannel.GrainInterfaces;
public interface ILiveStockGrain : IGrainWithGuidKey
{
ValueTask<Stock> GetStock(StockSymbol symbol);
}
La ILiveStockGrain interfaz usa IGrainWithGuidKey, lo que significa que el grano se identifica mediante una clave GUID. A continuación, implemente el componente que se suscribe al canal de emisión:
using System.Collections.Concurrent;
using BroadcastChannel.GrainInterfaces;
using Orleans.BroadcastChannel;
namespace BroadcastChannel.Silo;
[ImplicitChannelSubscription]
public sealed class LiveStockGrain :
Grain,
ILiveStockGrain,
IOnBroadcastChannelSubscribed
{
private readonly IDictionary<StockSymbol, Stock> _stockCache =
new ConcurrentDictionary<StockSymbol, Stock>();
public ValueTask<Stock> GetStock(StockSymbol symbol) =>
_stockCache.TryGetValue(symbol, out Stock? stock) is false
? new ValueTask<Stock>(Task.FromException<Stock>(new KeyNotFoundException()))
: new ValueTask<Stock>(stock);
public Task OnSubscribed(IBroadcastChannelSubscription subscription) =>
subscription.Attach<Stock>(OnStockUpdated, OnError);
private Task OnStockUpdated(Stock stock)
{
if (stock is { GlobalQuote: { } })
{
_stockCache[stock.GlobalQuote.Symbol] = stock;
}
return Task.CompletedTask;
}
private static Task OnError(Exception ex)
{
Console.Error.WriteLine($"An error occurred: {ex}");
return Task.CompletedTask;
}
}
En el código anterior:
- El grano
LiveStockGrainimplementa la interfazIOnBroadcastChannelSubscribed. - El atributo
[ImplicitChannelSubscription]marca este elemento para la suscripción automática a canales de difusión. - Se llama al método
OnSubscribedautomáticamente cuando el grano se activa (en su primer uso o después de recuperarse de un fallo). - El parámetro
subscriptionse usa para llamar al métodoAttachpara asociarlo al canal de difusión.- El método
OnStockUpdatedse pasa aAttachcomo una función que se desencadena cuando se recibe el mensajeStock. - El método
OnErrorse pasa aAttachcomo un callback que se activa cuando se produce un error.
- El método
Este ejemplo de grano contiene los últimos precios de acciones publicados en el canal de difusión. Cualquier cliente que solicite este grano para el último precio de acciones obtiene el precio más reciente del canal de difusión.
Publicación de mensajes en un canal de difusión
Para publicar mensajes en el canal de difusión, debe obtener una referencia a él. Para ello, obtenga el IBroadcastChannelProvider del IClusterClient. Con el proveedor, llame al IBroadcastChannelProvider.GetChannelWriter método para obtener una instancia de IBroadcastChannelWriter<T>. El escritor se usa para publicar mensajes en el canal de difusión.
En primer lugar, defina una constante para el nombre del canal para asegurarse de que el productor y los consumidores usan el mismo identificador de canal:
namespace BroadcastChannel.GrainInterfaces;
public sealed class ChannelNames
{
public const string LiveStockTicker = "live-stock-ticker";
}
A continuación, cree un publicador que envíe mensajes al canal de difusión:
using System.Diagnostics;
using BroadcastChannel.GrainInterfaces;
using Microsoft.Extensions.Hosting;
using Orleans.BroadcastChannel;
namespace BroadcastChannel.Silo.Services;
internal sealed class StockWorker : BackgroundService
{
private readonly StockClient _stockClient;
private readonly IBroadcastChannelProvider _provider;
private readonly List<StockSymbol> _symbols = Enum.GetValues<StockSymbol>().ToList();
public StockWorker(
StockClient stockClient, IClusterClient clusterClient) =>
(_stockClient, _provider) =
(stockClient, clusterClient.GetBroadcastChannelProvider(ChannelNames.LiveStockTicker));
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
// Capture the starting timestamp.
long startingTimestamp = Stopwatch.GetTimestamp();
// Get all updated stock values.
Stock[] stocks = await Task.WhenAll(
tasks: _symbols.Select(selector: _stockClient.GetStockAsync));
// Get the live stock ticker broadcast channel.
ChannelId channelId = ChannelId.Create(ChannelNames.LiveStockTicker, Guid.Empty);
IBroadcastChannelWriter<Stock> channelWriter = _provider.GetChannelWriter<Stock>(channelId);
// Broadcast all stock updates on this channel.
await Task.WhenAll(
stocks.Where(s => s is not null).Select(channelWriter.Publish));
// Use the elapsed time to calculate a 15 second delay.
int elapsed = Stopwatch.GetElapsedTime(startingTimestamp).Milliseconds;
int remaining = Math.Max(0, 15_000 - elapsed);
await Task.Delay(remaining, stoppingToken);
}
}
}
En el código anterior:
- La clase
StockWorkeres un servicio en segundo plano que publica mensajes en el canal de difusión. - El constructor toma un
StockClienty un IClusterClient como parámetros. - Desde la instancia de cliente del clúster, el GetBroadcastChannelProvider método se usa para obtener el proveedor del canal de difusión para el
LiveStockTickercanal. - El
ChannelId.Createmétodo crea un identificador de canal mediante:- El nombre del canal (
ChannelNames.LiveStockTicker): debe coincidir con el nombre usado al configurar el canal de difusión en la configuración del silo. -
Guid.Emptycomo espacio de nombres: para los canales de difusión, todos los suscriptores reciben todos los mensajes, por lo que el espacio de nombres normalmente se establece enGuid.Emptypara indicar una sola difusión compartida.
- El nombre del canal (
- Utilizando el
StockClient, la claseStockWorkerobtiene el precio más reciente de cada símbolo bursátil. - Cada 15 segundos, la
StockWorkerclase publicaStockmensajes en el canal de difusión.
La publicación de mensajes en un canal de difusión se desacopla del grano del consumidor. El productor no sabe sobre granos de consumo específicos. En su lugar, se publica en el canal de difusión y todos los granos suscritos implícitamente reciben automáticamente los mensajes.
Canales de difusión frente a secuencias
Los canales de difusión y Orleans las secuencias (incluidas las secuencias en memoria) son mecanismos de mensajería, pero sirven para propósitos diferentes y tienen características diferentes. En la tabla siguiente se comparan las diferencias clave:
| Característica | Canales de difusión | Orleans secuencias |
|---|---|---|
| Modelo de suscripción | Implícito: los granos se suscriben automáticamente cuando se activan | Explícito: los granos deben suscribirse explícitamente a secuencias |
| Persistencia de mensajes | No persistente: los mensajes se pierden si no hay suscriptores activos | Puede ser persistente (colas de Azure, Event Hubs) o transitoria (en memoria) |
| Entrega de mensajes | Mejor esfuerzo, entrega de fuego y olvido | Depende del proveedor: puede soportar la entrega al menos una vez o exactamente una vez. |
| Caso de uso | Difusión del mismo mensaje a todos los granos interesados en tiempo real | Mensajería de punto a punto o publicación-suscripción con garantías de entrega |
| Historial de mensajes | Sin historial de mensajes: solo las difusiones actuales | Las secuencias pueden admitir suscripciones con capacidad de retroceso con historial de mensajes |
| Escalabilidad | Optimizado para distribución ramificada a muchos consumidores | Optimizado para el procesamiento basado en cola con presión inversa |
| Ciclo de vida del consumidor | Los consumidores se gestionan de forma implícita mediante Orleans | Los consumidores deben administrar el ciclo de vida de la suscripción |
| Configuración | Simple: solo requiere el nombre del canal. | Más complejo: requiere la configuración del proveedor de flujos. |
Cuándo usar canales de difusión
Use canales de difusión cuando:
- Debe enviar el mismo mensaje a todas las instancias de un tipo de grano.
- La entrega de mensajes no es crítica (las pérdidas ocasionales son aceptables).
- Quiere una suscripción implícita sin administrar el ciclo de vida de la suscripción.
- Necesita actualizaciones en tiempo real sin historial de mensajes.
- Quiere una configuración e instalación sencillas.
Cuándo usar flujos
Utiliza flujos de datos cuando:
- Necesitas entrega garantizada de mensajes.
- Necesita funcionalidades de persistencia y reproducción de mensajes.
- Desea un control explícito sobre el ciclo de vida de la suscripción.
- Necesita mecanismos de control de flujo y de contrapresión.
- El patrón de mensajería es de punto a punto o requiere un enrutamiento más complejo.
- Usted está integrando con sistemas de puesta en cola externos (Event Hubs, Service Bus, Kafka).