Compartilhar via


Modo em tempo real no Streaming Estruturado

Important

Esse recurso está em Visualização Pública.

O modo em tempo real é um tipo de gatilho para o Streaming Estruturado que permite o processamento de dados de latência ultra-baixa com latência de ponta a ponta de até cinco milissegundos. Use o modo em tempo real para cargas de trabalho operacionais que exigem resposta imediata aos dados de streaming, como detecção de fraudes, personalização em tempo real e sistemas de tomada de decisões instantâneas.

O modo em tempo real está disponível no Databricks Runtime 16.4 LTS e superior. Para obter instruções de configuração passo a passo, consulte Introdução ao modo em tempo real. Para obter exemplos de código, consulte exemplos de modo em tempo real.

O que é o modo em tempo real?

Cargas de trabalho operacionais versus analíticas

As cargas de trabalho de streaming podem ser amplamente divididas em cargas de trabalho analíticas e cargas de trabalho operacionais:

  • Os workloads analíticos usam a ingestão e a transformação de dados, geralmente seguindo a arquitetura do medalhão (por exemplo, ingestão de dados nas tabelas bronze, prata e ouro).
  • As cargas de trabalho operacionais consomem dados em tempo real, aplicam lógica de negócios e disparam ações ou decisões downstream.

Alguns exemplos de cargas de trabalho operacionais são:

  • Bloquear ou sinalizar uma transação de cartão de crédito em tempo real se uma pontuação de fraude exceder um limite, com base em fatores como localização incomum, tamanho de transação grande ou padrões de gastos rápidos.
  • Entregar uma mensagem promocional quando os dados de navegação mostram que um usuário está procurando por jeans há cinco minutos, oferecendo um desconto de 25% se comprar nos próximos 15 minutos.

Em geral, as cargas de trabalho operacionais são caracterizadas pela necessidade de uma latência end-to-end inferior a um segundo. Isso pode ser feito com o modo em tempo real no Streaming Estruturado do Apache Spark.

Como o modo em tempo real obtém baixa latência

O modo em tempo real melhora a arquitetura de execução:

  • Executando lotes de execução longa (o padrão é cinco minutos), no qual o sistema processa dados conforme eles ficam disponíveis na origem.
  • Agendando todos os estágios da consulta simultaneamente. Isso requer que o número de slots de tarefas disponíveis seja igual ou maior que o número de tarefas de todos os estágios em um lote.
  • Transferência de dados entre estágios, assim que são produzidos, usando shuffle em streaming.

Ao final do processamento de um lote e antes do início do próximo lote, os pontos de verificação de Streaming Estruturado progridem e publicam as métricas. A duração do lote afeta a frequência de ponto de verificação:

  • Lotes mais longos: ponto de verificação menos frequente, o que significa repetições mais longas sobre falha e disponibilidade de métricas atrasadas.
  • Lotes mais curtos: pontos de verificação mais frequentes, o que pode afetar a latência.

A Databricks recomenda avaliar o modo em tempo real em relação à carga de trabalho de destino para determinar o intervalo de disparo apropriado.

Quando usar o modo em tempo real

Escolha o modo em tempo real quando o caso de uso exigir:

  • Latência de sub-segundo: aplicativos que precisam responder aos dados dentro de milissegundos, como sistemas de detecção de fraudes que devem bloquear transações em tempo real.
  • Tomada de decisão operacional: sistemas que disparam ações imediatas com base em dados de entrada, como ofertas em tempo real, alertas ou notificações.
  • Processamento contínuo: cargas de trabalho em que os dados devem ser processados assim que chegam, em vez de em lotes periódicos.

Use o modo de microlote (o gatilho de streaming estruturado padrão) quando:

  • Processamento analítico: pipelines de ETL, transformações de dados e implementações de arquitetura de medalhão em que os requisitos de latência são medidos em segundos ou minutos.
  • Otimização de custo: cargas de trabalho em que a latência de sub-segundo não é necessária, pois o modo em tempo real requer recursos de computação dedicados.
  • A frequência do ponto de verificação é importante: aplicativos que se beneficiam de pontos de verificação mais frequentes para uma recuperação mais rápida.

Requisitos e configuração

O modo em tempo real tem requisitos específicos para configuração de computação e configuração de consulta. Esta seção descreve os pré-requisitos e as etapas de configuração necessárias para usar o modo em tempo real.

Pré-requisitos

Para usar o modo em tempo real, você deve atender aos seguintes requisitos:

  • Databricks Runtime 16.4 LTS ou superior: o modo em tempo real só está disponível no DBR 16.4 LTS e versões posteriores.
  • Computação dedicada: você deve usar uma computação dedicada (anteriormente um único usuário). Standard (anteriormente conhecido como compartilhado), Pipelines Declarativos do Lakeflow Spark e clusters sem servidor não são suportados.
  • Sem dimensionamento automático: o dimensionamento automático deve ser desabilitado.
  • Sem Photon: não há suporte para aceleração de fóton com o modo em tempo real.
  • Configuração do Spark: você deve definir spark.databricks.streaming.realTimeMode.enabled como true.

Configuração de computação

Defina sua computação com as seguintes configurações:

  • Definido spark.databricks.streaming.realTimeMode.enabled como true na configuração do Spark.
  • Desabilitar o dimensionamento automático.
  • Desabilitar a aceleração do Photon.
  • Verifique se a computação está configurada como um cluster dedicado (não padrão, Pipelines Declarativos do Lakeflow Spark ou sem servidor).

Para obter instruções passo a passo sobre como criar e configurar a computação para o modo em tempo real, consulte Introdução ao modo em tempo real.

Configuração de consulta

Para executar uma consulta no modo em tempo real, você deve habilitar o gatilho em tempo real. Os gatilhos em tempo real têm suporte apenas no modo de atualização.

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

Dimensionamento de computação

Você pode executar um trabalho em tempo real por recurso de computação se a computação tiver slots de tarefa suficientes.

Para ser executado no modo de baixa latência, o número total de slots de tarefa disponíveis deve ser maior ou igual ao número de tarefas em todos os estágios de consulta.

Exemplos de cálculo de slots

Tipo de pipeline Configuração Slots necessários
Sem estado de estágio único (origem kafka + coletor) maxPartitions = 8 8 slots
Estado com dois estágios (fonte Kafka + embaralhamento) maxPartitions = 8, partições de embaralhamento = 20 28 slots (8 + 20)
Três estágios (origem kafka + shuffle + repartition) maxPartitions = 8, duas fases de shuffle de 20 cada um 48 slots (8 + 20 + 20)

Se você não definir maxPartitions, use o número de partições no tópico Kafka.

Principais considerações

Ao configurar sua computação, considere o seguinte:

  • Ao contrário do modo de microlote, as tarefas em tempo real podem permanecer ociosas enquanto aguardam dados, portanto, o dimensionamento correto é essencial para evitar o desperdício de recursos.
  • Almeje um nível de utilização alvo (por exemplo, 50%) ajustando:
    • maxPartitions (para Kafka)
    • spark.sql.shuffle.partitions (para estágios de embaralhamento)
  • O Databricks recomenda a configuração maxPartitions para que cada tarefa trate várias partições kafka para reduzir a sobrecarga.
  • Ajuste os slots de tarefa por trabalhador para corresponder à carga de trabalho para tarefas simples de estágio único.
  • No caso de trabalhos com muito embaralhamento, faça experiências para encontrar o número mínimo de partições de embaralhamento que evitem listas de pendências e ajuste a partir daí. O computador não agendará a tarefa se não tiver slots suficientes.

Note

A partir do Databricks Runtime 16.4 LTS e superior, todos os pipelines em tempo real usam o ponto de verificação v2, que permite a alternância perfeita entre os modos em tempo real e microlote.

Técnicas de otimização

Technique Habilitado por padrão
Acompanhamento de progresso assíncrono: move a gravação para compensar o log e confirmar o log em um thread assíncrono, reduzindo o tempo entre lotes entre dois micro-lotes. Isso pode ajudar a reduzir a latência de consultas de streaming sem estado. No
Ponto de verificação de estado assíncrono: ajuda a reduzir a latência de consultas de streaming com estado iniciando o processamento do próximo microlote assim que a computação do microlote anterior for concluída, sem aguardar o ponto de verificação de estado. No

Monitoramento e observabilidade

Medir o desempenho da consulta é essencial para cargas de trabalho em tempo real. No modo em tempo real, as métricas tradicionais de duração do lote não refletem a latência real, portanto, você precisa de abordagens alternativas.

A latência de ponta a ponta é específica da carga de trabalho e, às vezes, só pode ser medida com precisão com a lógica de negócios. Por exemplo, se o carimbo de data/hora de origem for gerado no Kafka, você pode calcular a latência como a diferença entre o carimbo de data/hora de saída do Kafka e o carimbo de data/hora de origem.

Você também pode estimar a latência de ponta a ponta usando as métricas internas e as APIs descritas abaixo.

Métricas integradas com StreamingQueryProgress

As métricas a seguir são incluídas no StreamingQueryProgress evento, que é registrado automaticamente nos logs de driver. Você também pode acessá-los através da função de retorno de chamada StreamingQueryListener de onQueryProgress(). QueryProgressEvent.json() ou toString() incluem métricas adicionais de modo em tempo real.

  1. Latência de processamento (processingLatencyMs). O tempo transcorrido entre quando a consulta no modo em tempo real lê um registro e quando grava esse registro no próximo estágio ou downstream. Para consultas de estágio único, isso mede a mesma duração que a latência E2E. O sistema relata essa métrica por tarefa.
  2. Latência de enfileiramento da fonte (sourceQueuingLatencyMs). O tempo decorrido entre quando o sistema grava um registro em um barramento de mensagens, por exemplo, o tempo de adição de log no Kafka, e quando a consulta em modo em tempo real lê o registro pela primeira vez. O sistema relata essa métrica por tarefa.
  3. Latência E2E (e2eLatencyMs). O intervalo entre o momento em que o sistema grava o registro em um barramento de mensagens e quando a consulta no modo de tempo real grava o registro a jusante. O sistema agrega essa métrica por lote em todos os registros processados por todas as tarefas.

Por exemplo:

"rtmMetrics" : {
    "processingLatencyMs" : {
      "P0" : 0,
      "P50" : 0,
      "P90" : 0,
      "P95" : 0,
      "P99" : 0
    },
    "sourceQueuingLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 3
    },
    "e2eLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 4
    },

Medida de latência personalizada com a API de Observação

A API de Observação ajuda a medir a latência sem iniciar outro trabalho. Se você tiver um carimbo de data/hora de origem que se aproxima da hora de chegada dos dados de origem, poderá estimar a latência de cada lote usando a API Observe. Passe o carimbo de data/hora antes de chegar ao coletor:

Python

from datetime import datetime

from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def current_timestamp():
  return datetime.now()

# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
  "latency",
  unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  "observedLatency",
  avg(col("latency")).alias("avg"),
  max(col("latency")).alias("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.

Scala

import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}

val currentTimestampUDF = udf(() => System.currentTimeMillis())

// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
  "latency",
  col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  name = "observedLatency",
  avg(col("latency")).as("avg"),
  max(col("latency")).as("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.

Neste exemplo, um carimbo de data/hora atual é registrado antes de gerar a entrada e a latência é estimada calculando a diferença entre esse carimbo de data/hora e o carimbo de data/hora de origem do registro. Os resultados são incluídos em relatórios de progresso e disponibilizados aos ouvintes. Aqui está um resultado de exemplo:

"observedMetrics" : {
  "observedLatency" : {
    "avg" : 63.8369765176552,
    "max" : 219,
    "p99" : 154,
    "p50" : 49
  }
}

Suporte e limitações de recursos

Esta seção descreve os recursos compatíveis e as limitações atuais do modo em tempo real, incluindo ambientes compatíveis, idiomas, fontes, coletores, operadores e considerações especiais para recursos específicos.

Ambientes, idiomas e modos com suporte

Tipo de computação Supported
Dedicado (anteriormente: usuário único) Yes
Standard (anteriormente: compartilhado) No
Lakeflow Spark Declarative Pipelines Classic (Pipelines Declarativas do Lakeflow Spark Classic) No
Pipelines Declarativos do Lakeflow Spark sem servidor No
Serverless No

Idiomas com suporte:

Linguagem Supported
Scala Yes
Java Yes
Python Yes

Modos de execução com suporte:

Modo de Execução Supported
Modo de atualização Yes
modo de acréscimo No
Modo completo No

Fontes e coletores com suporte

Fontes:

Sources Supported
Apache Kafka Yes
AWS MSK Yes
Hubs de Eventos (usando o Conector Kafka) Yes
Kinesis Sim (somente modo EFO)
Google Pub/Sub No
Apache Pulsar No

Dissipadores:

Sinks Supported
Apache Kafka Yes
Event Hubs (usando o Conector Kafka) Yes
Kinesis No
Google Pub/Sub No
Apache Pulsar No
Coletores arbitrários (usando forEachWriter) Yes

Operadores suportados

Operators Supported
Operações sem estado
Selection Yes
Projection Yes
UDFs
Scala UDF Sim (com algumas limitações)
Python UDF Sim (com algumas limitações)
Agregação
sum Yes
count Yes
max Yes
min Yes
avg Yes
Funções de agregação Yes
Janelas
Tumbling Yes
Sliding Yes
Session No
Deduplicação
dropDuplicates Sim (o estado não está limitado)
dropDuplicatesWithinWatermark No
Stream – Junção de Tabela
Tabela de difusão (deve ser pequena) Yes
Fluxo - Junção de fluxo No
(achatado)MapGroupsWithState No
transformWithState Sim (com algumas diferenças)
união Sim (com algumas limitações)
forEach Yes
forEachBatch No
mapPartitions Não (ver limitação)

Considerações especiais

Alguns operadores e recursos têm considerações ou diferenças específicas quando usados no modo em tempo real.

transformWithState no modo em tempo real

Para criar aplicativos personalizados com estado, o Databricks oferece suporte a transformWithState, uma API no Streaming Estruturado do Apache Spark. Consulte Criar um aplicativo com estado personalizado para obter mais informações sobre a API e os snippets de código.

No entanto, há algumas diferenças entre como a API se comporta no modo em tempo real e as consultas de streaming tradicionais que aproveitam a arquitetura de microlote.

  • O modo em tempo real chama o método handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues) para cada linha.
    • O inputRows iterador retorna um único valor. O modo de micro-lote o chama uma vez para cada chave, e o inputRows iterador retorna todos os valores para uma chave no micro-lote.
    • Você deve estar ciente dessa diferença ao escrever seu código.
  • Não há suporte para temporizadores de evento no modo em tempo real.
  • No modo em tempo real, os temporizadores são atrasados ao disparar dependendo da chegada dos dados:
    • Se um temporizador for agendado para 10:00:00, mas nenhum dado chegar, o temporizador não será acionado imediatamente.
    • Se os dados chegarem às 10:00:10, o temporizador será acionado com um atraso de 10 segundos.
    • Se nenhum dado chegar e o lote de execução longa estiver sendo encerrado, o temporizador será acionado antes que o lote seja encerrado.

UDFs do Python no modo em tempo real

O Databricks dá suporte à maioria das UDFs (funções definidas pelo usuário) do Python no modo em tempo real:

Tipo de UDF Supported
UDF sem estado
UDF escalar do Python (link) Yes
UDF escalar de seta Yes
UDF escalar do Pandas (link) Yes
Função Arrow (mapInArrow) Yes
Função Pandas (link) Yes
UDF de agrupamento com estado (UDAF)
transformWithState (interface somente Row ) Yes
applyInPandasWithState No
UDF de agrupamento não com estado (UDAF)
aplicar No
applyInArrow No
applyInPandas No
Função Table
UDTF (link) No
UC UDF No

Há vários pontos a serem considerados ao usar UDFs do Python no modo em tempo real:

  • Para minimizar a latência, configure o tamanho do lote do Arrow (spark.sql.execution.arrow.maxRecordsPerBatch) para 1.
    • Compensação: essa configuração otimiza a latência em detrimento da taxa de transferência. Para a maioria das cargas de trabalho, essa configuração é recomendada.
    • Aumente o tamanho do lote somente se uma taxa de transferência maior for necessária para acomodar o volume de entrada, aceitando o potencial aumento na latência.
  • As UDFs e as funções do Pandas não têm um bom desempenho com um tamanho de lote do Arrow de 1.
    • Se você utilizar UDFs ou funções com pandas, ajuste o tamanho de bloco do Arrow para um valor mais elevado (por exemplo, 100 ou mais).
    • Observe que isso implica uma latência maior. O Databricks recomenda o uso de Arrow UDF ou função, se possível.
  • Devido ao problema de desempenho com pandas, transformWithState só tem suporte com a Row interface.

Limitations

Limitações de origem

Para o Kinesis, o modo em tempo real não dá suporte ao modo de sondagem. Além disso, repartições frequentes podem afetar negativamente a latência.

Limitações da união

O operador Union tem algumas limitações:

  • O modo em tempo real não dá suporte à auto-união:
    • Kafka: Você não pode usar o mesmo objeto de data frame de origem e unir data frames derivados dele. Solução alternativa: use dataframes diferentes que leem da mesma fonte.
    • Kinesis: você não pode unir quadros de dados derivados da mesma fonte Kinesis com a mesma configuração. Solução alternativa: além de usar DataFrames diferentes, você pode atribuir uma opção "consumerName" diferente a cada DataFrame.
  • O modo em tempo real não dá suporte a operadores com estado (por exemplo, aggregate, deduplicate, transformWithState) definidos antes da União.
  • O modo em tempo real não dá suporte à união com fontes em lotes.

Limitação de MapPartitions

mapPartitions em Scala e em APIs semelhantes do Python (mapInPandas, mapInArrow) pegam um iterador que percorre toda a partição de entrada e geram um iterador que percorre toda a saída com mapeamento arbitrário entre entrada e saída. Essas APIs podem causar problemas de desempenho no Modo Real-Time de Streaming bloqueando toda a saída, o que aumenta a latência. A semântica dessas APIs não suporta bem a propagação de marca d'água.

Use UDFs escalares combinados com transformar tipos de dados complexos ou filter, em vez disso, para obter funções semelhantes.

Próximas Etapas 

Agora que você entende o que é o modo em tempo real e como configurá-lo, explore esses recursos para começar a implementar aplicativos de streaming em tempo real: