Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
O modo em tempo real é um tipo de gatilho para Streaming Estruturado que permite o processamento de dados com latência ultra-baixa com latência de ponta a ponta tão baixa quanto cinco milissegundos. Use o modo em tempo real para cargas de trabalho operacionais que requerem resposta imediata a dados em fluxo, como deteção de fraude, personalização em tempo real e sistemas de tomada de decisão instantânea.
O modo em tempo real está disponível no Databricks Runtime 16.4 LTS e superior. Para instruções de configuração passo a passo, consulte Começar com o modo em tempo real. Para exemplos de código, veja exemplos de modo em tempo real.
O que é o modo em tempo real?
Cargas de trabalho operacionais vs. analíticas
As cargas de trabalho de streaming podem ser amplamente divididas em cargas de trabalho analíticas e cargas de trabalho operacionais:
- Trabalhos analíticos usam o carregamento e a transformação de dados, normalmente seguindo a arquitetura medalhão (por exemplo, carregamento de dados nas tabelas bronze, prata e ouro).
- As cargas de trabalho operacionais consomem dados em tempo real, aplicam a lógica de negócios e acionam ações ou decisões downstream.
Alguns exemplos de cargas de trabalho operacionais são:
- Bloquear ou sinalizar uma transação de cartão de crédito em tempo real se uma pontuação de fraude exceder um limite, com base em fatores como localização incomum, grande tamanho da transação ou padrões de gastos rápidos.
- Enviar uma mensagem promocional quando os dados de fluxo de cliques indicam que um utilizador está a navegar por jeans há cinco minutos, oferecendo um desconto de 25% se o fizer nos próximos 15 minutos.
Em geral, as cargas de trabalho operacionais caracterizam-se pela necessidade de uma latência de ponta a ponta inferior a um segundo. Isso pode ser alcançado com o modo em tempo real no Apache Spark Structured Streaming.
Como o modo em tempo real alcança baixa latência
O modo em tempo real melhora a arquitetura de execução ao:
- Executar lotes de longa duração (o padrão é cinco minutos), em que o sistema processa dados à medida que ficam disponíveis na fonte.
- Agendar todas as fases da consulta em simultâneo. Isso requer que o número de slots de tarefas disponíveis seja igual ou maior do que o número de tarefas de todos os estágios de um lote.
- Passar dados entre fases assim que são produzidos, usando um streaming shuffle.
No final do processamento de um lote e antes do início do próximo, o Structured Streaming utiliza pontos de verificação para registar o progresso e publica métricas. A duração do lote afeta a frequência dos pontos de controlo:
- Batches mais longos: Checkpointing menos frequente, o que significa repetições mais longas em caso de falha e atraso na disponibilidade de métricas.
- Batches mais curtos: Pontos de verificação mais frequentes, o que pode afetar a latência.
O Databricks recomenda comparar o modo em tempo real com a carga de trabalho alvo para encontrar o intervalo de disparo adequado.
Quando usar o modo em tempo real
Escolha o modo em tempo real quando o seu caso de uso exigir:
- Latência sub-segundo: Aplicações que precisam de responder a dados em milissegundos, como sistemas de deteção de fraude que têm de bloquear transações em tempo real.
- Tomada de decisões operacionais: Sistemas que desencadeiam ações imediatas com base em dados recebidos, como ofertas em tempo real, alertas ou notificações.
- Processamento contínuo: Cargas de trabalho em que os dados devem ser processados assim que chegam, em vez de em lotes periódicos.
Use o modo micro-batch (o gatilho padrão de Streaming Estruturado) quando:
- Processamento analítico: pipelines ETL, transformações de dados e implementações de arquitetura medallion onde os requisitos de latência são medidos em segundos ou minutos.
- Otimização de custos: Cargas de trabalho onde não é necessária uma latência inferior a um segundo, pois o modo em tempo real requer recursos de computação dedicados.
- A frequência dos checkpoints importa: Aplicações que beneficiam de checkpoints mais frequentes para uma recuperação mais rápida.
Requisitos e configuração
O modo em tempo real tem requisitos específicos para configuração de computação e configuração de consultas. Esta secção descreve os pré-requisitos e os passos de configuração necessários para utilizar o modo em tempo real.
Pré-requisitos
Para usar o modo em tempo real, deve cumprir os seguintes requisitos:
- Databricks Runtime 16.4 LTS ou superior: O modo em tempo real está disponível apenas no DBR 16.4 LTS e versões posteriores.
- Computação dedicada: Deve usar uma computação dedicada (anteriormente de utilizador único). Standard (anteriormente partilhados), Lakeflow Spark Declarative Pipelines e clusters sem servidor não são suportados.
- Sem autoescalonamento: O autoescalonamento tem de ser desativado.
- Sem Fotão: A aceleração de fotões não é suportada no modo em tempo real.
-
Configuração da faísca: Deve definir
spark.databricks.streaming.realTimeMode.enabledparatrue.
Configuração de cálculo
Configure o seu computador com as seguintes definições:
- Defina
spark.databricks.streaming.realTimeMode.enabledparatruena configuração do Spark. - Desative o dimensionamento automático.
- Desativar a aceleração de fotões.
- Certifique-se de que o cálculo está configurado como um cluster dedicado (não padrão, Lakeflow Spark Declarative Pipelines, ou serverless).
Para instruções passo a passo sobre como criar e configurar computação para o modo em tempo real, consulte Começar com o modo em tempo real.
Configuração da consulta
Para executar uma consulta em modo em tempo real, deve ativar o gatilho em tempo real. Os gatilhos em tempo real são suportados apenas em modo de atualização.
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.outputMode("update")
# In PySpark, the realTime trigger requires specifying the interval.
.trigger(realTime="5 minutes")
.start()
)
Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val readStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic).load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode("update")
.trigger(RealTimeTrigger.apply())
// RealTimeTrigger can also accept an argument specifying the checkpoint interval.
// For example, this code indicates a checkpoint interval of 5 minutes:
// .trigger(RealTimeTrigger.apply("5 minutes"))
.start()
Dimensionamento computacional
Podes executar um trabalho em tempo real por recurso de computação se o cálculo tiver espaços suficientes para tarefas.
Para ser executado no modo de baixa latência, o número total de slots de tarefas disponíveis deve ser maior ou igual ao número de tarefas em todos os estágios de consulta.
Exemplos de cálculo de faixas horárias
| Tipo de pipeline | Configuração | Slots obrigatórios |
|---|---|---|
| Sem estado de estágio único (fonte + sumidouro Kafka) |
maxPartitions = 8 |
8 encaixes |
| Com estado em dois estágios (fonte Kafka + embaralhamento) |
maxPartitions = 8, partições de shuffle = 20 |
28 slots (8 + 20) |
| Três estágios (origem Kafka + shuffle + repartição) |
maxPartitions = 8, dois estágios de embaralhamento de 20 cada |
48 slots (8 + 20 + 20) |
Se não definires maxPartitions, usa o número de partições no tópico Kafka.
Principais considerações
Ao configurar o seu cálculo, considere o seguinte:
- Ao contrário do modo de microlote, as tarefas em tempo real podem ficar ociosas enquanto aguardam dados, portanto, o dimensionamento correto é essencial para evitar o desperdício de recursos.
- Procure atingir um nível de utilização alvo (por exemplo, 50%) ajustando:
-
maxPartitions(para Kafka) -
spark.sql.shuffle.partitions(para fases aleatórias)
-
- O Databricks recomenda definir
maxPartitionspara que cada tarefa consiga gerir múltiplas partições Kafka para reduzir a sobrecarga. - Ajuste os slots de tarefas por trabalhador para corresponder à carga de trabalho para trabalhos simples de um estágio.
- Para trabalhos com muita reorganização, experimente encontrar o número mínimo de partições de reorganização que evitem acumulações e ajuste a partir daí. O sistema de computação não vai agendar a tarefa se não tiver espaços suficientes.
Note
A partir do Databricks Runtime 16.4 LTS e posteriores, todos os pipelines em tempo real utilizam o checkpoint v2, que permite a transição perfeita entre os modos de tempo real e de microlote.
Técnicas de otimização
| Technique | Ativado por padrão |
|---|---|
| pt-PT: Acompanhamento assíncrono do progresso: Move a escrita para o log de offset e o log de confirmação para um processo assíncrono, reduzindo o tempo inter-batch entre dois micro-lotes. Isto pode ajudar a reduzir a latência das consultas de streaming sem estado. | No |
| Checkpointing de estado assíncrono: Ajuda a reduzir a latência das consultas de streaming com estado ao começar a processar o próximo micro-batch assim que o cálculo do micro-batch anterior é concluído, sem esperar pelo checkpointing de estado. | No |
Monitorização e observabilidade
Medir o desempenho das consultas é essencial para cargas de trabalho em tempo real. No modo em tempo real, as métricas tradicionais de duração em lote não refletem a latência real, por isso precisas de abordagens alternativas.
A latência de ponta a ponta é específica da carga de trabalho e, às vezes, só pode ser medida com precisão com a lógica de negócios. Por exemplo, se o carimbo temporal da fonte for emitido em Kafka, pode calcular a latência como a diferença entre o carimbo temporal de saída do Kafka e o da fonte.
Também pode estimar a latência de ponta a ponta usando as métricas e APIs incorporadas descritas abaixo.
Métricas incorporadas com o StreamingQueryProgress
As métricas a seguir são incluídas no StreamingQueryProgress evento, que é registrado automaticamente nos logs do driver. Você também pode acessá-los através da função de retorno do StreamingQueryListener de onQueryProgress().
QueryProgressEvent.json() ou toString() incluem métricas extras do modo em tempo real.
- Latência de processamento (processingLatencyMs). O tempo decorrido entre o momento em que a consulta em modo em tempo real lê um registo e o momento em que a consulta o escreve para a etapa seguinte ou a jusante. Para consultas de estágio único, isso mede a mesma duração que a latência E2E. O sistema reporta esta métrica por tarefa.
- Latência de enfileiramento da fonte (sourceQueuingLatencyMs). O intervalo de tempo decorrido entre o momento em que o sistema escreve um registo num barramento de mensagens, por exemplo, o tempo de anexação do registo no Kafka, e o momento em que a consulta em modo em tempo real lê o registo pela primeira vez. O sistema reporta esta métrica por tarefa.
- Latência E2E (e2eLatencyMs). O tempo entre o momento em que o sistema escreve o registo num barramento de mensagens e o momento em que a consulta em modo de tempo real escreve o registo a jusante. O sistema agrega esta métrica por lote em todos os registos processados por todas as tarefas.
Por exemplo:
"rtmMetrics" : {
"processingLatencyMs" : {
"P0" : 0,
"P50" : 0,
"P90" : 0,
"P95" : 0,
"P99" : 0
},
"sourceQueuingLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 3
},
"e2eLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 4
},
Medição personalizada de latência com a API Observe
A API do Observe ajuda a medir a latência sem iniciar outro trabalho. Se tiver um carimbo temporal de origem que aproxima o tempo de chegada dos dados de origem, pode estimar a latência de cada lote usando a API Observe. Passe o carimbo temporal antes de chegar ao lava-loiça:
Python
from datetime import datetime
from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType
@udf(returnType=TimestampType())
def current_timestamp():
return datetime.now()
# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
"latency",
unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
"observedLatency",
avg(col("latency")).alias("avg"),
max(col("latency")).alias("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.
Scala
import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}
val currentTimestampUDF = udf(() => System.currentTimeMillis())
// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
"latency",
col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
name = "observedLatency",
avg(col("latency")).as("avg"),
max(col("latency")).as("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.
Neste exemplo, um carimbo de data/hora atual é registrado antes de enviar a entrada, e a latência é estimada calculando a diferença entre esse carimbo de data/hora e o carimbo de data/hora de origem do registro. Os resultados são incluídos em relatórios de progresso e disponibilizados aos ouvintes. Aqui está um exemplo de resultado:
"observedMetrics" : {
"observedLatency" : {
"avg" : 63.8369765176552,
"max" : 219,
"p99" : 154,
"p50" : 49
}
}
Suporte a funcionalidades e limitações
Esta secção descreve as funcionalidades suportadas e as limitações atuais do modo em tempo real, incluindo ambientes compatíveis, linguagens, fontes, sumidouros, operadores e considerações especiais para funcionalidades específicas.
Ambientes, linguagens e modos suportados
| Tipo de computação | Supported |
|---|---|
| Dedicado (anteriormente: usuário único) | Yes |
| Padrão (anteriormente: compartilhado) | No |
| Lakeflow Spark Oleodutos Declarativos Classic | No |
| Lakeflow Spark Pipelines Declarativos Sem Servidor | No |
| Serverless | No |
Idiomas suportados:
| Linguagem | Supported |
|---|---|
| Scala | Yes |
| Java | Yes |
| Python | Yes |
Modos de execução suportados:
| Modo de execução | Supported |
|---|---|
| Modo de atualização | Yes |
| modo de acrescento | No |
| Modo completo | No |
Fontes e sumidouros suportados
Fontes:
| Sources | Supported |
|---|---|
| Apache Kafka | Yes |
| AWS MSK | Yes |
| Centros de Eventos (usando o Conector Kafka) | Yes |
| Kinesis | Sim (apenas modo EFO) |
| Google Pub/Sub | No |
| Apache Pulsar | No |
Pias:
| Sinks | Supported |
|---|---|
| Apache Kafka | Yes |
| Centros de Eventos (usando o Conector Kafka) | Yes |
| Kinesis | No |
| Google Pub/Sub | No |
| Apache Pulsar | No |
| Sumidouros arbitrários (usando forEachWriter) | Yes |
Operadores suportados
| Operators | Supported |
|---|---|
| Operações Sem Estado | |
| Selection | Yes |
| Projection | Yes |
| UDF | |
| Scala UDF | Sim (com algumas limitações) |
| Python UDF | Sim (com algumas limitações) |
| Agregação | |
| sum | Yes |
| count | Yes |
| max | Yes |
| min | Yes |
| avg | Yes |
| Funções de agregação | Yes |
| Técnica de Janelação | |
| Tumbling | Yes |
| Sliding | Yes |
| Session | No |
| Desduplicação | |
| dropDuplicates | Sim (o estado é ilimitado) |
| dropDuplicatesWithinWatermark | No |
| Stream - Juntação de Mesa | |
| Tabela de difusão (deverá ser pequena) | Yes |
| Stream - Stream Junte-se | No |
| (plano)MapGroupsWithState | No |
| transformWithState | Sim (com algumas diferenças) |
| união | Sim (com algumas limitações) |
| forEach | Yes |
| paraCadaLote | No |
| mapPartitions | Não (ver limitação) |
Considerações especiais
Alguns operadores e funcionalidades têm considerações ou diferenças específicas quando usados em modo em tempo real.
transformWithState em modo de tempo real
Para a criação de aplicativos baseados em estado personalizados, o Databricks suporta transformWithState, uma API no Apache Spark Structured Streaming. Consulte Criar um aplicativo com monitoração de estado personalizado para obter mais informações sobre a API e trechos de código.
No entanto, há algumas diferenças entre como a API se comporta no modo de tempo real e as consultas de streaming tradicionais que aproveitam a arquitetura de microlote.
- O modo em tempo real chama o método
handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues)para cada linha.- O
inputRowsiterador retorna um único valor. O modo micro-batch chama-o uma vez para cada chave, e oinputRowsiterador devolve todos os valores de uma chave no micro batch. - Deve estar ciente desta diferença ao escrever o seu código.
- O
- Os temporizadores de tempo de evento não são suportados no modo de tempo real.
- No modo em tempo real, os temporizadores atrasam o disparo dependendo da chegada dos dados:
- Se um temporizador estiver agendado para as 10:00:00, mas não houver dados disponíveis, o temporizador não aciona imediatamente.
- Se os dados chegarem às 10:00:10, o temporizador dispara com um atraso de 10 segundos.
- Se não chegarem dados e o processo de longa duração estiver a terminar, o temporizador dispara antes do processo terminar.
UDFs em Python em modo em tempo real
O Databricks suporta a maioria das funções definidas pelo usuário (UDFs) do Python no modo em tempo real:
| Tipo UDF | Supported |
|---|---|
| UDF sem estado | |
| UDF escalar Python (link) | Yes |
| Seta escalar UDF | Yes |
| Pandas escalar UDF (link) | Yes |
Função de seta (mapInArrow) |
Yes |
| Função Pandas (link) | Yes |
| Agrupamento Estadual UDF (UDAF) | |
transformWithState (apenas Row interface) |
Yes |
| applyInPandasWithState | No |
| Agrupamento não estadual UDF (UDAF) | |
| aplicar | No |
| applyInArrow | No |
| applyInPandas | No |
| Função da tabela | |
| UDTF (link) | No |
| UC UDF | No |
Há vários pontos a considerar ao usar UDFs Python no modo de tempo real:
- Para minimizar a latência, configure o tamanho do lote Arrow (
spark.sql.execution.arrow.maxRecordsPerBatch) para 1.- Compensação: essa configuração otimiza a latência em detrimento da taxa de transferência. Para a maioria das cargas de trabalho, essa configuração é recomendada.
- Aumente o tamanho do lote somente se uma taxa de transferência maior for necessária para acomodar o volume de entrada, aceitando o potencial aumento na latência.
- Pandas UDFs e funções não têm um bom desempenho com um tamanho de lote de Arrow de 1.
- Se utilizares UDFs de pandas ou funções, define o tamanho de conjunto do Arrow para um valor mais alto (por exemplo, 100 ou superior).
- Observe que isso implica maior latência. O Databricks recomenda usar o Arrow UDF ou uma função, se possível.
- Devido ao problema de desempenho com pandas, transformWithState só é suportado com a interface
Row.
Limitations
Limitações da fonte
Para o Kinesis, o modo em tempo real não suporta o modo de sondagem. Além disso, as repartições frequentes podem afetar negativamente a latência.
Limitações sindicais
O operador da União tem algumas limitações:
- O modo em tempo real não suporta autounição:
- Kafka: Não podes usar o mesmo objeto de data frame de origem e unir dataframes derivados dele. Solução alternativa: Use DataFrames diferentes que leiam da mesma fonte.
- Kinese: Não se podem unir quadros de dados derivados da mesma fonte Kinesis com a mesma configuração. Solução alternativa: Além de usar DataFrames diferentes, pode atribuir uma opção diferente de 'Nome do consumidor' a cada DataFrame.
- O modo em tempo real não suporta operadores com estado (por exemplo,
aggregate,deduplicate,transformWithState) definidos antes da União. - O modo em tempo real não suporta união com fontes em lote.
Limitação do MapPartitions
mapPartitions em Scala e APIs semelhantes em Python (mapInPandas, mapInArrow) recebem um iterador da partição inteira de entrada e produzem um iterador da saída inteira com mapeamento arbitrário entre entrada e saída. Estas APIs podem causar problemas de desempenho no Modo Real-Time de Streaming ao bloquear toda a saída, o que aumenta a latência. A semântica destas APIs não suporta bem a propagação de metadados.
Use UDFs escalares combinados com Transform tipos de dados complexos ou filter para obter funcionalidades semelhantes.
Passos seguintes
Agora que percebe o que é o modo em tempo real e como o configurar, explore estes recursos para começar a implementar aplicações de streaming em tempo real:
- Comece com o modo em tempo real - Siga instruções passo a passo para configurar, calcular e executar a sua primeira consulta de streaming em tempo real.
- Exemplos de código em modo em tempo real - Explore exemplos funcionais, incluindo fontes e sinks Kafka, consultas com estado, agregações e sinks personalizados.
- Conceitos de Streaming Estruturado - Aprenda os conceitos fundamentais do Streaming Estruturado em Databricks.