Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Esta página descreve os conceitos básicos de marca d'água e fornece recomendações para usar marcas d'água em operações comuns de streaming com estado. Você deve aplicar marcas d'água a operações de streaming com estado para evitar expandir infinitamente a quantidade de dados mantidos no estado, o que pode introduzir problemas de memória ou aumentar as latências de processamento durante operações de streaming de longa execução.
O que é uma marca d'água?
Streaming Estruturado usa marcas d'água para controlar o limite de tempo para continuar processando atualizações de uma determinada entidade de estado. Exemplos comuns de entidades de estado incluem:
- Agregações em uma janela de tempo.
- Chaves exclusivas em uma junção entre dois fluxos.
Ao declarar uma marca d'água, você especifica um campo de marcação temporal e um limite de marca d'água em um DataFrame de streaming. À medida que novos dados chegam, o gerenciador de estado acompanha o timestamp mais recente no campo especificado e processa todos os registros dentro do limite de atraso.
O exemplo a seguir aplica um limite de marca d'água de 10 minutos a uma contagem em janela:
from pyspark.sql.functions import window
(df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
)
Neste exemplo:
- A coluna
event_timeé usada para definir uma marca d'água de 10 minutos e uma janela deslizante de 5 minutos. - Uma contagem é coletada para cada
idobservada para cada janela de 5 minutos não sobrepostas. - As informações de estado são mantidas para cada contagem até que o final da janela seja 10 minutos mais antigo do que a
event_timemais recente observada.
Importante
Os limites de marca d'água garantem que os registros que chegam dentro do limite especificado sejam processados de acordo com a semântica da consulta definida. Registros de chegada tardia que chegam fora do limite especificado ainda podem ser processados usando métricas de consulta, mas isso não é garantido.
Como as marcas d'água afetam o tempo de processamento e a taxa de transferência?
As marcas d'água interagem com os modos de saída a serem controlados quando dados são gravados no coletor. Como as marcas d'água reduzem a quantidade total de informações de estado a serem processadas, o uso efetivo de marcas d'água é essencial para uma taxa de transferência eficiente de streaming com estado.
Observação
Nem todos os modos de saída são compatíveis com todas as operações stateful.
Marcas d'água e modo de saída para agregações em janelas
A tabela a seguir detalha o processamento de consultas com agregação em um timestamp com uma marca d'água definida.
| Modo de saída | Comportamento |
|---|---|
| Acrescentar | As linhas são gravadas na tabela de destino depois que o limite da marca d'água é passado. Todas as gravações são atrasadas com base no limite de tardança. O estado de agregação antigo é descartado depois que o limite é passado. |
| Atualizar | As linhas são gravadas na tabela de destino conforme os resultados são calculados e podem ser atualizadas e substituídas à medida que novos dados chegam. O estado de agregação antigo é descartado depois que o limite é passado. |
| Concluído | O estado de agregação não foi descartado. A tabela de destino é reescrita a cada gatilho. |
Marcas d'água e saída para junções de fluxo a fluxo
As junções entre vários fluxos só dão suporte ao modo de acréscimo e os registros correspondentes são gravados em cada lote descoberto. Para junções internas, o Databricks recomenda definir um limite de marca d'água em cada fonte de dados de streaming. Isso permite que as informações de estado sejam descartadas para registros antigos. Sem marcas d'água, o Streaming Estruturado tenta unir todas as chaves de ambos os lados da junção com cada gatilho.
O Streaming Estruturado tem semântica especial para dar suporte a junções externas. A marca d'água é obrigatória para junções externas, pois indica quando uma chave deve ser gravada com um valor nulo após não encontrar correspondência. Embora junções externas possam ser úteis para registrar dados que não são correspondidos durante o processamento de dados, como as junções gravam apenas em tabelas através de operações de adição, esses dados ausentes não são registrados até que o limiar de atraso tenha passado.
Controlar o limite de tolerância para dados atrasados com política de múltiplas marcas-d'água em Streaming Estruturado
Ao trabalhar com várias entradas de Structured Streaming, você pode definir várias marcas-d'água para controlar os limiares de tolerância para dados que chegam atrasados. Configurar marcas-d'água permite controlar as informações de estado, o que impacta a latência.
Uma consulta de streaming pode ter vários fluxos de entrada que são unidos ou combinados. Cada um dos fluxos de entrada pode ter um limite diferente de dados tardios que precisam ser tolerados para operações com estado mantido. Especifique esses limites usando withWatermarks("eventTime", delay) em cada um dos fluxos de entrada. Veja a seguir uma consulta de exemplo com junções de fluxo a fluxo.
val inputStream1 = ... // delays up to 1 hour
val inputStream2 = ... // delays up to 2 hours
inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
Durante a execução da consulta, o Streaming Estruturado rastreia individualmente o tempo máximo de evento visto em cada fluxo de entrada, calcula marcas-d'água com base no atraso correspondente e escolhe uma única marca-d'água global com elas para ser usada para operações com estado. Por padrão, o mínimo é escolhido como a marca temporal global porque impede que os dados sejam acidentalmente descartados como tarde demais se um dos fluxos ficar atrás dos outros (por exemplo, um dos fluxos para de receber dados devido a falhas no upstream). Em outras palavras, a marca d'água global se move com segurança garantida no ritmo do fluxo mais lento e o resultado da consulta é atrasado de acordo.
Se quiser resultados mais rápidos, você pode definir a política de marca-d'água múltipla para escolher o valor máximo como a marca-d'água global definindo a configuração de SQL spark.sql.streaming.multipleWatermarkPolicy como max (o padrão é min). Isso permite que a marca d'água global se mova no ritmo do fluxo mais rápido. No entanto, essa configuração remove dados dos fluxos mais lentos. O Databricks recomenda usar essa configuração de forma criteriosa.
Aplicar marcas d'água a operações distintas
A distinct operação é um operador com gerenciamento de estado que requer watermarks para evitar o crescimento descontrolado do estado. Sem marcas d'água, o Structured Streaming tenta rastrear todos os registros exclusivos por tempo indefinido, o que pode levar a problemas de memória ou a aumentos nas latências de processamento.
Ao aplicar distinct a um DataFrame de streaming, você deve especificar uma marca d'água em um campo de carimbo de data/hora. A marca d'água controla por quanto tempo o gerenciador de estado mantém registros para eliminação de duplicação. Depois que o limite da marca d'água for aprovado, os registros antigos serão removidos do estado.
O exemplo a seguir aplica uma marca d'água a uma distinct operação:
Python
streamingDf = spark.readStream. ... # columns: eventTime, id, value, ...
# Apply watermark before distinct operation
(streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
)
Scala
val streamingDf = spark.readStream. ... // columns: eventTime, id, value, ...
// Apply watermark before distinct operation
streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
Neste exemplo, os registros duplicados eventTime que chegam dentro de 1 hora após a última observação são removidos do fluxo. As informações de estado para deduplicação são descartadas após a superação do limite.
Importante
Se você precisar deduplicar em colunas específicas em vez de todas as colunas, use dropDuplicates() ou dropDuplicatesWithinWatermark() em vez de distinct. Confira a próxima seção para saber mais detalhes.
Descartar duplicatas dentro da marca d'água
No Databricks Runtime 13.3 LTS ou posterior, você pode deduplicar registros dentro de um limite de marca d'água usando um identificador exclusivo.
O Streaming Estruturado fornece garantias de processamento exatamente uma vez, mas não remove automaticamente a duplicação de registros de fontes de dados. Você pode usar dropDuplicatesWithinWatermark para eliminar a duplicação de registros em qualquer campo especificado, permitindo que você remova duplicatas de um fluxo mesmo que alguns campos sejam diferentes (como hora do evento ou hora de chegada).
Os registros duplicados que chegam na marca d'água especificada têm a garantia de serem descartados. Essa garantia é estrita somente em uma direção, e os registros duplicados que chegam fora do limite especificado também podem ser descartados. Você deve definir o limite de atraso da marca temporal para ser maior do que as diferenças máximas de timestamp entre eventos duplicados a fim de remover todas as duplicatas.
Você deve especificar uma marca d'água para usar o método dropDuplicatesWithinWatermark, como no exemplo a seguir:
Python
streamingDf = spark.readStream. ...
# deduplicate using guid column with watermark based on eventTime column
(streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])
)
Scala
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// deduplicate using guid column with watermark based on eventTime column
streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(Seq("guid"))