Partilhar via


Modo em tempo real no Structured Streaming

Important

Este recurso está no Public Preview.

O modo em tempo real é um tipo de gatilho para Streaming Estruturado que permite o processamento de dados com latência ultra-baixa com latência de ponta a ponta tão baixa quanto cinco milissegundos. Use o modo em tempo real para cargas de trabalho operacionais que requerem resposta imediata a dados em fluxo, como deteção de fraude, personalização em tempo real e sistemas de tomada de decisão instantânea.

O modo em tempo real está disponível no Databricks Runtime 16.4 LTS e superior. Para instruções de configuração passo a passo, consulte Começar com o modo em tempo real. Para exemplos de código, veja exemplos de modo em tempo real.

O que é o modo em tempo real?

Cargas de trabalho operacionais vs. analíticas

As cargas de trabalho de streaming podem ser amplamente divididas em cargas de trabalho analíticas e cargas de trabalho operacionais:

  • Trabalhos analíticos usam o carregamento e a transformação de dados, normalmente seguindo a arquitetura medalhão (por exemplo, carregamento de dados nas tabelas bronze, prata e ouro).
  • As cargas de trabalho operacionais consomem dados em tempo real, aplicam a lógica de negócios e acionam ações ou decisões downstream.

Alguns exemplos de cargas de trabalho operacionais são:

  • Bloquear ou sinalizar uma transação de cartão de crédito em tempo real se uma pontuação de fraude exceder um limite, com base em fatores como localização incomum, grande tamanho da transação ou padrões de gastos rápidos.
  • Enviar uma mensagem promocional quando os dados de fluxo de cliques indicam que um utilizador está a navegar por jeans há cinco minutos, oferecendo um desconto de 25% se o fizer nos próximos 15 minutos.

Em geral, as cargas de trabalho operacionais caracterizam-se pela necessidade de uma latência de ponta a ponta inferior a um segundo. Isso pode ser alcançado com o modo em tempo real no Apache Spark Structured Streaming.

Como o modo em tempo real alcança baixa latência

O modo em tempo real melhora a arquitetura de execução ao:

  • Executar lotes de longa duração (o padrão é cinco minutos), em que o sistema processa dados à medida que ficam disponíveis na fonte.
  • Agendar todas as fases da consulta em simultâneo. Isso requer que o número de slots de tarefas disponíveis seja igual ou maior do que o número de tarefas de todos os estágios de um lote.
  • Passar dados entre fases assim que são produzidos, usando um streaming shuffle.

No final do processamento de um lote e antes do início do próximo, o Structured Streaming utiliza pontos de verificação para registar o progresso e publica métricas. A duração do lote afeta a frequência dos pontos de controlo:

  • Batches mais longos: Checkpointing menos frequente, o que significa repetições mais longas em caso de falha e atraso na disponibilidade de métricas.
  • Batches mais curtos: Pontos de verificação mais frequentes, o que pode afetar a latência.

O Databricks recomenda comparar o modo em tempo real com a carga de trabalho alvo para encontrar o intervalo de disparo adequado.

Quando usar o modo em tempo real

Escolha o modo em tempo real quando o seu caso de uso exigir:

  • Latência sub-segundo: Aplicações que precisam de responder a dados em milissegundos, como sistemas de deteção de fraude que têm de bloquear transações em tempo real.
  • Tomada de decisões operacionais: Sistemas que desencadeiam ações imediatas com base em dados recebidos, como ofertas em tempo real, alertas ou notificações.
  • Processamento contínuo: Cargas de trabalho em que os dados devem ser processados assim que chegam, em vez de em lotes periódicos.

Use o modo micro-batch (o gatilho padrão de Streaming Estruturado) quando:

  • Processamento analítico: pipelines ETL, transformações de dados e implementações de arquitetura medallion onde os requisitos de latência são medidos em segundos ou minutos.
  • Otimização de custos: Cargas de trabalho onde não é necessária uma latência inferior a um segundo, pois o modo em tempo real requer recursos de computação dedicados.
  • A frequência dos checkpoints importa: Aplicações que beneficiam de checkpoints mais frequentes para uma recuperação mais rápida.

Requisitos e configuração

O modo em tempo real tem requisitos específicos para configuração de computação e configuração de consultas. Esta secção descreve os pré-requisitos e os passos de configuração necessários para utilizar o modo em tempo real.

Pré-requisitos

Para usar o modo em tempo real, deve cumprir os seguintes requisitos:

  • Databricks Runtime 16.4 LTS ou superior: O modo em tempo real está disponível apenas no DBR 16.4 LTS e versões posteriores.
  • Computação dedicada: Deve usar uma computação dedicada (anteriormente de utilizador único). Standard (anteriormente partilhados), Lakeflow Spark Declarative Pipelines e clusters sem servidor não são suportados.
  • Sem autoescalonamento: O autoescalonamento tem de ser desativado.
  • Sem Fotão: A aceleração de fotões não é suportada no modo em tempo real.
  • Configuração da faísca: Deve definir spark.databricks.streaming.realTimeMode.enabled para true.

Configuração de cálculo

Configure o seu computador com as seguintes definições:

  • Defina spark.databricks.streaming.realTimeMode.enabled para true na configuração do Spark.
  • Desative o dimensionamento automático.
  • Desativar a aceleração de fotões.
  • Certifique-se de que o cálculo está configurado como um cluster dedicado (não padrão, Lakeflow Spark Declarative Pipelines, ou serverless).

Para instruções passo a passo sobre como criar e configurar computação para o modo em tempo real, consulte Começar com o modo em tempo real.

Configuração da consulta

Para executar uma consulta em modo em tempo real, deve ativar o gatilho em tempo real. Os gatilhos em tempo real são suportados apenas em modo de atualização.

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

Dimensionamento computacional

Podes executar um trabalho em tempo real por recurso de computação se o cálculo tiver espaços suficientes para tarefas.

Para ser executado no modo de baixa latência, o número total de slots de tarefas disponíveis deve ser maior ou igual ao número de tarefas em todos os estágios de consulta.

Exemplos de cálculo de faixas horárias

Tipo de pipeline Configuração Slots obrigatórios
Sem estado de estágio único (fonte + sumidouro Kafka) maxPartitions = 8 8 encaixes
Com estado em dois estágios (fonte Kafka + embaralhamento) maxPartitions = 8, partições de shuffle = 20 28 slots (8 + 20)
Três estágios (origem Kafka + shuffle + repartição) maxPartitions = 8, dois estágios de embaralhamento de 20 cada 48 slots (8 + 20 + 20)

Se não definires maxPartitions, usa o número de partições no tópico Kafka.

Principais considerações

Ao configurar o seu cálculo, considere o seguinte:

  • Ao contrário do modo de microlote, as tarefas em tempo real podem ficar ociosas enquanto aguardam dados, portanto, o dimensionamento correto é essencial para evitar o desperdício de recursos.
  • Procure atingir um nível de utilização alvo (por exemplo, 50%) ajustando:
    • maxPartitions (para Kafka)
    • spark.sql.shuffle.partitions (para fases aleatórias)
  • O Databricks recomenda definir maxPartitions para que cada tarefa consiga gerir múltiplas partições Kafka para reduzir a sobrecarga.
  • Ajuste os slots de tarefas por trabalhador para corresponder à carga de trabalho para trabalhos simples de um estágio.
  • Para trabalhos com muita reorganização, experimente encontrar o número mínimo de partições de reorganização que evitem acumulações e ajuste a partir daí. O sistema de computação não vai agendar a tarefa se não tiver espaços suficientes.

Note

A partir do Databricks Runtime 16.4 LTS e posteriores, todos os pipelines em tempo real utilizam o checkpoint v2, que permite a transição perfeita entre os modos de tempo real e de microlote.

Técnicas de otimização

Technique Ativado por padrão
pt-PT: Acompanhamento assíncrono do progresso: Move a escrita para o log de offset e o log de confirmação para um processo assíncrono, reduzindo o tempo inter-batch entre dois micro-lotes. Isto pode ajudar a reduzir a latência das consultas de streaming sem estado. No
Checkpointing de estado assíncrono: Ajuda a reduzir a latência das consultas de streaming com estado ao começar a processar o próximo micro-batch assim que o cálculo do micro-batch anterior é concluído, sem esperar pelo checkpointing de estado. No

Monitorização e observabilidade

Medir o desempenho das consultas é essencial para cargas de trabalho em tempo real. No modo em tempo real, as métricas tradicionais de duração em lote não refletem a latência real, por isso precisas de abordagens alternativas.

A latência de ponta a ponta é específica da carga de trabalho e, às vezes, só pode ser medida com precisão com a lógica de negócios. Por exemplo, se o carimbo temporal da fonte for emitido em Kafka, pode calcular a latência como a diferença entre o carimbo temporal de saída do Kafka e o da fonte.

Também pode estimar a latência de ponta a ponta usando as métricas e APIs incorporadas descritas abaixo.

Métricas incorporadas com o StreamingQueryProgress

As métricas a seguir são incluídas no StreamingQueryProgress evento, que é registrado automaticamente nos logs do driver. Você também pode acessá-los através da função de retorno do StreamingQueryListener de onQueryProgress(). QueryProgressEvent.json() ou toString() incluem métricas extras do modo em tempo real.

  1. Latência de processamento (processingLatencyMs). O tempo decorrido entre o momento em que a consulta em modo em tempo real lê um registo e o momento em que a consulta o escreve para a etapa seguinte ou a jusante. Para consultas de estágio único, isso mede a mesma duração que a latência E2E. O sistema reporta esta métrica por tarefa.
  2. Latência de enfileiramento da fonte (sourceQueuingLatencyMs). O intervalo de tempo decorrido entre o momento em que o sistema escreve um registo num barramento de mensagens, por exemplo, o tempo de anexação do registo no Kafka, e o momento em que a consulta em modo em tempo real lê o registo pela primeira vez. O sistema reporta esta métrica por tarefa.
  3. Latência E2E (e2eLatencyMs). O tempo entre o momento em que o sistema escreve o registo num barramento de mensagens e o momento em que a consulta em modo de tempo real escreve o registo a jusante. O sistema agrega esta métrica por lote em todos os registos processados por todas as tarefas.

Por exemplo:

"rtmMetrics" : {
    "processingLatencyMs" : {
      "P0" : 0,
      "P50" : 0,
      "P90" : 0,
      "P95" : 0,
      "P99" : 0
    },
    "sourceQueuingLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 3
    },
    "e2eLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 4
    },

Medição personalizada de latência com a API Observe

A API do Observe ajuda a medir a latência sem iniciar outro trabalho. Se tiver um carimbo temporal de origem que aproxima o tempo de chegada dos dados de origem, pode estimar a latência de cada lote usando a API Observe. Passe o carimbo temporal antes de chegar ao lava-loiça:

Python

from datetime import datetime

from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def current_timestamp():
  return datetime.now()

# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
  "latency",
  unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  "observedLatency",
  avg(col("latency")).alias("avg"),
  max(col("latency")).alias("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.

Scala

import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}

val currentTimestampUDF = udf(() => System.currentTimeMillis())

// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
  "latency",
  col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  name = "observedLatency",
  avg(col("latency")).as("avg"),
  max(col("latency")).as("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.

Neste exemplo, um carimbo de data/hora atual é registrado antes de enviar a entrada, e a latência é estimada calculando a diferença entre esse carimbo de data/hora e o carimbo de data/hora de origem do registro. Os resultados são incluídos em relatórios de progresso e disponibilizados aos ouvintes. Aqui está um exemplo de resultado:

"observedMetrics" : {
  "observedLatency" : {
    "avg" : 63.8369765176552,
    "max" : 219,
    "p99" : 154,
    "p50" : 49
  }
}

Suporte a funcionalidades e limitações

Esta secção descreve as funcionalidades suportadas e as limitações atuais do modo em tempo real, incluindo ambientes compatíveis, linguagens, fontes, sumidouros, operadores e considerações especiais para funcionalidades específicas.

Ambientes, linguagens e modos suportados

Tipo de computação Supported
Dedicado (anteriormente: usuário único) Yes
Padrão (anteriormente: compartilhado) No
Lakeflow Spark Oleodutos Declarativos Classic No
Lakeflow Spark Pipelines Declarativos Sem Servidor No
Serverless No

Idiomas suportados:

Linguagem Supported
Scala Yes
Java Yes
Python Yes

Modos de execução suportados:

Modo de execução Supported
Modo de atualização Yes
modo de acrescento No
Modo completo No

Fontes e sumidouros suportados

Fontes:

Sources Supported
Apache Kafka Yes
AWS MSK Yes
Centros de Eventos (usando o Conector Kafka) Yes
Kinesis Sim (apenas modo EFO)
Google Pub/Sub No
Apache Pulsar No

Pias:

Sinks Supported
Apache Kafka Yes
Centros de Eventos (usando o Conector Kafka) Yes
Kinesis No
Google Pub/Sub No
Apache Pulsar No
Sumidouros arbitrários (usando forEachWriter) Yes

Operadores suportados

Operators Supported
Operações Sem Estado
Selection Yes
Projection Yes
UDF
Scala UDF Sim (com algumas limitações)
Python UDF Sim (com algumas limitações)
Agregação
sum Yes
count Yes
max Yes
min Yes
avg Yes
Funções de agregação Yes
Técnica de Janelação
Tumbling Yes
Sliding Yes
Session No
Desduplicação
dropDuplicates Sim (o estado é ilimitado)
dropDuplicatesWithinWatermark No
Stream - Juntação de Mesa
Tabela de difusão (deverá ser pequena) Yes
Stream - Stream Junte-se No
(plano)MapGroupsWithState No
transformWithState Sim (com algumas diferenças)
união Sim (com algumas limitações)
forEach Yes
paraCadaLote No
mapPartitions Não (ver limitação)

Considerações especiais

Alguns operadores e funcionalidades têm considerações ou diferenças específicas quando usados em modo em tempo real.

transformWithState em modo de tempo real

Para a criação de aplicativos baseados em estado personalizados, o Databricks suporta transformWithState, uma API no Apache Spark Structured Streaming. Consulte Criar um aplicativo com monitoração de estado personalizado para obter mais informações sobre a API e trechos de código.

No entanto, há algumas diferenças entre como a API se comporta no modo de tempo real e as consultas de streaming tradicionais que aproveitam a arquitetura de microlote.

  • O modo em tempo real chama o método handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues) para cada linha.
    • O inputRows iterador retorna um único valor. O modo micro-batch chama-o uma vez para cada chave, e o inputRows iterador devolve todos os valores de uma chave no micro batch.
    • Deve estar ciente desta diferença ao escrever o seu código.
  • Os temporizadores de tempo de evento não são suportados no modo de tempo real.
  • No modo em tempo real, os temporizadores atrasam o disparo dependendo da chegada dos dados:
    • Se um temporizador estiver agendado para as 10:00:00, mas não houver dados disponíveis, o temporizador não aciona imediatamente.
    • Se os dados chegarem às 10:00:10, o temporizador dispara com um atraso de 10 segundos.
    • Se não chegarem dados e o processo de longa duração estiver a terminar, o temporizador dispara antes do processo terminar.

UDFs em Python em modo em tempo real

O Databricks suporta a maioria das funções definidas pelo usuário (UDFs) do Python no modo em tempo real:

Tipo UDF Supported
UDF sem estado
UDF escalar Python (link) Yes
Seta escalar UDF Yes
Pandas escalar UDF (link) Yes
Função de seta (mapInArrow) Yes
Função Pandas (link) Yes
Agrupamento Estadual UDF (UDAF)
transformWithState (apenas Row interface) Yes
applyInPandasWithState No
Agrupamento não estadual UDF (UDAF)
aplicar No
applyInArrow No
applyInPandas No
Função da tabela
UDTF (link) No
UC UDF No

Há vários pontos a considerar ao usar UDFs Python no modo de tempo real:

  • Para minimizar a latência, configure o tamanho do lote Arrow (spark.sql.execution.arrow.maxRecordsPerBatch) para 1.
    • Compensação: essa configuração otimiza a latência em detrimento da taxa de transferência. Para a maioria das cargas de trabalho, essa configuração é recomendada.
    • Aumente o tamanho do lote somente se uma taxa de transferência maior for necessária para acomodar o volume de entrada, aceitando o potencial aumento na latência.
  • Pandas UDFs e funções não têm um bom desempenho com um tamanho de lote de Arrow de 1.
    • Se utilizares UDFs de pandas ou funções, define o tamanho de conjunto do Arrow para um valor mais alto (por exemplo, 100 ou superior).
    • Observe que isso implica maior latência. O Databricks recomenda usar o Arrow UDF ou uma função, se possível.
  • Devido ao problema de desempenho com pandas, transformWithState só é suportado com a interface Row.

Limitations

Limitações da fonte

Para o Kinesis, o modo em tempo real não suporta o modo de sondagem. Além disso, as repartições frequentes podem afetar negativamente a latência.

Limitações sindicais

O operador da União tem algumas limitações:

  • O modo em tempo real não suporta autounição:
    • Kafka: Não podes usar o mesmo objeto de data frame de origem e unir dataframes derivados dele. Solução alternativa: Use DataFrames diferentes que leiam da mesma fonte.
    • Kinese: Não se podem unir quadros de dados derivados da mesma fonte Kinesis com a mesma configuração. Solução alternativa: Além de usar DataFrames diferentes, pode atribuir uma opção diferente de 'Nome do consumidor' a cada DataFrame.
  • O modo em tempo real não suporta operadores com estado (por exemplo, aggregate, deduplicate, transformWithState) definidos antes da União.
  • O modo em tempo real não suporta união com fontes em lote.

Limitação do MapPartitions

mapPartitions em Scala e APIs semelhantes em Python (mapInPandas, mapInArrow) recebem um iterador da partição inteira de entrada e produzem um iterador da saída inteira com mapeamento arbitrário entre entrada e saída. Estas APIs podem causar problemas de desempenho no Modo Real-Time de Streaming ao bloquear toda a saída, o que aumenta a latência. A semântica destas APIs não suporta bem a propagação de metadados.

Use UDFs escalares combinados com Transform tipos de dados complexos ou filter para obter funcionalidades semelhantes.

Passos seguintes

Agora que percebe o que é o modo em tempo real e como o configurar, explore estes recursos para começar a implementar aplicações de streaming em tempo real: