Compartilhar via


O que é streaming com estado?

Esta página explica as consultas de Streaming Estruturado com estado, incluindo operações com estado, recomendações de otimização, encadeamento de vários operadores com estado e rebalanceamento de estado.

Uma consulta de Streaming Estruturado com estado requer atualizações incrementais para informações de estado intermediário, enquanto uma consulta de Streaming Estruturado sem estado rastreia apenas informações sobre quais linhas foram processadas da origem para o coletor. Para obter recursos de otimização disponíveis para consultas sem estado, consulte Otimizar consultas de streaming sem estado.

Operações com estado

As operações com estado incluem agregação de streaming, distinct, dropDuplicates, junções entre fluxos e aplicativos com estado personalizados.

As informações de estado intermediário necessárias para consultas de Streaming Estruturado com estado podem levar a problemas inesperados de latência e produção se configurados incorretamente.

No Databricks Runtime 13.3 LTS ou posterior, você pode habilitar o ponto de verificação do changelog com o RocksDB para reduzir a duração do ponto de verificação e a latência de ponta a ponta para cargas de trabalho de Streaming Estruturado. O Databricks recomenda habilitar o ponto de verificação do changelog para todas as consultas com estado de Streaming Estruturado. Confira Habilitar o ponto de verificação do log de alterações.

Otimizar consultas de streaming estruturadas com estado

A Databricks recomenda o seguinte para consultas de Streaming Estruturadas com estado:

  • Use instâncias otimizadas para computação como trabalhadores.
  • Defina o número de partições embaralhadas como 1 a 2 vezes o número de núcleos no cluster.
  • Definir a configuração spark.sql.streaming.noDataMicroBatches.enabled como false na SparkSession. Isso impede que o mecanismo de microlote de streaming processe microlotes que não contêm dados. Definir essa configuração false também pode resultar em operações com estado que usam marcas d'água ou tempo limite de processamento não obtendo saída de dados até que novos dados cheguem em vez de imediatamente.

Databricks recomenda usar RocksDB com checkpoint de changelog para gerenciar o estado dos fluxos com estado. Confira Configurar o repositório de estado do RocksDB no Azure Databricks.

Observação

O esquema de gerenciamento de estado não pode ser alterado entre reinicializações de consulta. Se uma consulta tiver sido iniciada com o gerenciamento padrão, você deverá reiniciá-la do zero com um novo local de ponto de verificação para alterar o repositório de estado.

Trabalhe com vários operadores de estado no Streaming Estruturado

No Databricks Runtime 13.3 LTS ou versões posteriores, o Azure Databricks oferece suporte avançado para operadores com estado em workloads de Streaming Estruturado. Você pode encadear vários operadores com estado, o que significa que você pode encaminhar a saída de uma operação, como uma agregação em janelas, para outra operação com estado, como uma junção.

No Databricks Runtime 16.2 ou posterior, você pode usar transformWithState em cargas de trabalho com vários operadores com estado. Consulte Criar um aplicativo com estado personalizado.

Os exemplos a seguir demonstram vários padrões que você pode usar.

Importante

As seguintes limitações existem ao se trabalhar com vários operadores com estado:

  • Não há suporte para operadores personalizados herdados com estado (FlatMapGroupWithState e applyInPandasWithState) .
  • Há suporte apenas para o modo de saída de acréscimo.

Agregação encadeada de janelas temporais

Python

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()

Scala (linguagem de programação)

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()

Agregação por janelas de tempo em dois fluxos de dados diferentes seguida de junção de janelas entre fluxos.

Python

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

Scala (linguagem de programação)

val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

Junção entre fluxos baseada em intervalo de tempo seguida pela agregação da janela temporal

Python

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

Scala (linguagem de programação)

val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()

Rebalanceamento de estado para Streaming Estruturado

O rebalanceamento de estado é habilitado por padrão para todas as cargas de trabalho de streaming em Lakeflow Spark Declarative Pipelines. No Databricks Runtime 11.3 LTS ou posterior, você pode definir a seguinte opção de configuração na configuração do cluster Spark para habilitar o reequilíbrio de estado:

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

O rebalanceamento de estado beneficia pipelines de Streaming Estruturado com estado que passam por eventos de redimensionamento de cluster. As operações de streaming sem estado não se beneficiam, independentemente da alteração dos tamanhos do cluster.

Observação

O autoescalonamento de computação tem limitações ao reduzir o tamanho do cluster para cargas de trabalho de Structured Streaming. O Databricks recomenda usar os pipelines declarativos do Lakeflow Spark com dimensionamento automático aprimorado para cargas de trabalho de streaming. Consulte Otimize a utilização do cluster dos pipelines declarativos do Lakeflow Spark com dimensionamento automático.

Os eventos de redimensionamento do cluster acionam o reequilíbrio de estado. Os microlotes podem ter maior latência durante o reequilíbrio de eventos à medida que o estado é carregado do armazenamento em nuvem para os novos executores.