Compartilhar via


As APIs AUTO CDC: simplificam a captura de dados de alterações com pipelines

O Lakeflow Spark Declarative Pipelines simplifica a captura de alteração de dados (CDC) com AUTO CDC e AUTO CDC FROM SNAPSHOT APIs. Essas APIs automatizam o processamento complexo das dimensões de alteração lenta (SCD - Slowly Changing Dimensions) Tipo 1 e Tipo 2, a partir de dados gerados por CDC ou instantâneos de banco de dados. Para saber mais sobre esses conceitos, veja captura de dados de mudança e instantâneos.

Observação

As AUTO CDC APIs substituem as APPLY CHANGES APIs e têm a mesma sintaxe. As APPLY CHANGES APIs ainda estão disponíveis, mas o Databricks recomenda usar as AUTO CDC APIs em seu lugar.

A API que você usa depende da origem dos dados de alteração:

  • AUTO CDC: Use isso quando o banco de dados de origem estiver com o feed CDC habilitado. AUTO CDC processa alterações de um fluxo de dados de alteração (CDF). Ele tem suporte nas interfaces SQL e Python do pipeline.
  • AUTO CDC FROM SNAPSHOT: use isso quando o CDC não estiver habilitado no banco de dados de origem e somente os instantâneos estiverem disponíveis. Essa API compara instantâneos para determinar as alterações e processá-las. Ele tem suporte apenas na interface do Python.

Ambas as APIs dão suporte à atualização de tabelas usando o SCD Tipo 1 e o Tipo 2:

  • Use o SCD Tipo 1 para atualizar registros diretamente. O histórico não é mantido para registros atualizados.
  • Use o SCD Tipo 2 para manter um histórico de registros, em todas as atualizações ou em atualizações para um conjunto especificado de colunas.

As AUTO CDC APIs não têm suporte dos Pipelines Declarativos do Apache Spark.

Para obter sintaxe e outras referências, consulte AUTO CDC INTO (pipelines), create_auto_cdc_flow e create_auto_cdc_from_snapshot_flow.

Observação

Esta página descreve como atualizar tabelas em seus pipelines com base em alterações nos dados de origem. Para saber como registrar e consultar informações de alteração no nível de linha para tabelas Delta, confira Usar o feed de dados de alterações do Delta Lake no Azure Databricks.

Requirements

Para usar as APIs CDC, o pipeline deve ser configurado para usar o SDP sem servidor ou o SDP Pro ou Advancededições.

Como funciona o AUTO CDC

Para executar o processamento CDC com AUTO CDC, crie uma tabela de streaming e use a instrução AUTO CDC ... INTO no SQL ou a função create_auto_cdc_flow() no Python para especificar a origem, as chaves e a ordenação para o feed de alterações. Para obter uma explicação de como o sequenciamento e a lógica SCD funcionam, consulte captura de dados em tempo real e instantâneos. Veja os exemplos de AUTO CDC.

Para hidratação inicial de uma fonte com um feed de alterações, use AUTO CDC com um once fluxo e continue processando o feed de alterações. Consulte Replicar uma tabela RDBMS externa usando AUTO CDC.

Para obter detalhes de sintaxe, consulte AUTO CDC INTO (pipelines) ou create_auto_cdc_flow.

Como funciona o AUTO CDC FROM SNAPSHOT

AUTO CDC FROM SNAPSHOT determina as alterações nos dados de origem comparando instantâneos em ordem. Ele só tem suporte na interface de pipeline do Python. Você pode ler instantâneos de uma tabela Delta, arquivos de armazenamento em nuvem ou JDBC diretamente.

Para executar o processamento CDC com AUTO CDC FROM SNAPSHOT, crie uma tabela de streaming e, em seguida, use a função create_auto_cdc_from_snapshot_flow() para especificar o snapshot, as chaves e outros argumentos. Para obter detalhes sobre os dois padrões de ingestão e quando usar cada um, consulte Padrões de Processamento de Instantâneo. Veja os exemplos AUTO CDC FROM SNAPSHOT.

Para obter detalhes de sintaxe, consulte create_auto_cdc_from_snapshot_flow.

Usar várias colunas para sequenciamento

Para sequenciar por várias colunas (por exemplo, um carimbo de data/hora e uma ID para quebrar vínculos), use uma STRUCT para combiná-las. A API ordena primeiro pelo primeiro campo e, em caso de empate, considera o segundo campo e assim por diante.

SQL

SEQUENCE BY STRUCT(timestamp_col, id_col)

Python

sequence_by = struct("timestamp_col", "id_col")

Exemplos de AUTO CDC

Os exemplos a seguir demonstram o processamento SCD Tipo 1 e Tipo 2 usando uma fonte de fluxo de dados de alteração. Os dados de exemplo criam novos registros de usuário, excluem um registro de usuário e atualizam os registros do usuário. No exemplo SCD Tipo 1, as últimas UPDATE operações chegam atrasadas e são excluídas da tabela de destino, demonstrando como eventos fora de ordem são tratados.

Veja a seguir os registros de entrada usados nestes exemplos. Esses dados são criados executando a consulta na seção Criar dados de exemplo .

userId nome city operação sequenceNum
124 Raul Oaxaca INSERT 1
123 Isabel Monterrey INSERT 1
125 Mercedes Tijuana INSERT 2
126 Lírio Cancun INSERT 2
123 nulo nulo DELETE 6
125 Mercedes Guadalajara UPDATE 6
125 Mercedes Mexicali UPDATE 5
123 Isabel Chihuahua UPDATE 5

Se você remover o comentário da linha final na consulta de geração de dados de exemplo, ela inserirá o seguinte registro que especifica truncar a tabela em sequenceNum=3:

userId nome city operação sequenceNum
nulo nulo nulo TRUNCAR 3

Observação

Todos os exemplos a seguir incluem opções para especificar tanto as operações DELETE quanto TRUNCATE, mas cada uma é opcional.

Criar dados de exemplo

Execute as instruções a seguir para criar um conjunto de dados de exemplo. Esse código não se destina a ser executado como parte de uma definição de pipeline. Execute-o na pasta de exploração do pipeline, em vez da pasta de transformações.

CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;

CREATE TABLE main.cdc_tutorial.users_cdf
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The batch at sequenceNum 6 is the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

Processar atualizações do TIPO 1 do SCD

O SCD Type 1 mantém apenas a versão mais recente de cada registro. O exemplo a seguir lê a partir do feed de dados de alteração criado acima e aplica alterações a uma tabela de streaming de destino. Desenvolva pipelines declarativos do Lakeflow Spark para executar esse código.

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
  return spark.readStream.table("main.cdc_tutorial.users_cdf")

dp.create_streaming_table("users_current")

dp.create_auto_cdc_flow(
  target = "users_current",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

CREATE OR REFRESH STREAMING TABLE users_current;

CREATE FLOW apply_cdc AS AUTO CDC INTO
  users_current
FROM
  stream(main.cdc_tutorial.users_cdf)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

Depois de executar o exemplo scd tipo 1, a tabela de destino contém os seguintes registros:

userId nome city
124 Raul Oaxaca
125 Mercedes Guadalajara
126 Lírio Cancun

O usuário 123 (Isabel) foi excluído e não aparece. O usuário 125 (Mercedes) mostra apenas a cidade mais recente (Guadalajara) porque o SCD Tipo 1 substitui os valores anteriores. O anterior UPDATE em sequenceNum=5 foi removido porque uma atualização posterior chegou em sequenceNum=6.

Depois de executar o exemplo com o TRUNCATE registro sem comentário, a tabela é apagada em sequenceNum=3. Isso significa que os registros 124 e 126 não estão na tabela e a tabela de destino final contém apenas o seguinte registro:

userId nome city
125 Mercedes Guadalajara

Processar atualizações do SCD Tipo 2

O SCD Tipo 2 preserva um histórico completo de alterações criando novas linhas para cada versão de um registro, com __START_AT e __END_AT colunas indicando quando cada versão estava ativa.

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
  return spark.readStream.table("main.cdc_tutorial.users_cdf")

dp.create_streaming_table("users_history")

dp.create_auto_cdc_flow(
  target = "users_history",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

CREATE OR REFRESH STREAMING TABLE users_history;

CREATE FLOW apply_cdc AS AUTO CDC INTO
  users_history
FROM
  stream(main.cdc_tutorial.users_cdf)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

Depois de executar o exemplo scd tipo 2, a tabela de destino contém os seguintes registros:

userId nome city __START_AT __END_AT
123 Isabel Monterrey 1 5
123 Isabel Chihuahua 5 6
124 Raul Oaxaca 1 nulo
125 Mercedes Tijuana 2 5
125 Mercedes Mexicali 5 6
125 Mercedes Guadalajara 6 nulo
126 Lírio Cancun 2 nulo

A tabela preserva o histórico completo. O usuário 123 possui duas versões (encerradas na sequência 6 quando foram excluídas). O usuário 125 tem três versões mostrando alterações na cidade. Os registros com __END_AT = null estão ativos no momento.

Rastrear um subconjunto de colunas com SCD Tipo 2

Por padrão, o SCD Type 2 cria uma nova versão sempre que qualquer valor de coluna é alterado. Você pode especificar um subconjunto de colunas a serem controladas, de modo que as alterações em outras colunas atualizem a versão atual em vigor em vez de gerar um novo registro de histórico.

O exemplo a seguir exclui a city coluna do acompanhamento de histórico:

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
  return spark.readStream.table("main.cdc_tutorial.users_cdf")

dp.create_streaming_table("users_history")

dp.create_auto_cdc_flow(
  target = "users_history",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

CREATE OR REFRESH STREAMING TABLE users_history;

CREATE FLOW apply_cdc AS AUTO CDC INTO
  users_history
FROM
  stream(main.cdc_tutorial.users_cdf)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

Como city as alterações não são controladas, as atualizações da cidade substituem a linha atual em vez de criar uma nova versão. A tabela de destino contém os seguintes registros:

userId nome city __START_AT __END_AT
123 Isabel Chihuahua 1 6
124 Raul Oaxaca 1 nulo
125 Mercedes Guadalajara 2 nulo
126 Lírio Cancun 2 nulo

Exemplos de AUTO CDC DE SNAPSHOT

As seções a seguir fornecem exemplos de como usar AUTO CDC FROM SNAPSHOT para processar instantâneos em tabelas de destino SCD Tipo 1 ou Tipo 2. Para obter informações sobre quando usar essa API, consulte a captura de dados de alteração e instantâneos.

Exemplo: processar instantâneos usando o tempo de ingestão do pipeline

Use esta abordagem quando os snapshots chegarem regularmente e em ordem, e você puder depender do timestamp de execução do pipeline para controle de versão. Um novo instantâneo é ingerido com cada atualização do pipeline de dados.

Você pode ler instantâneos de diversos tipos de fontes, incluindo tabelas Delta, arquivos em armazenamentos na nuvem e conexões JDBC.

Etapa 1: Criar dados de exemplo

Crie uma tabela contendo dados de captura. Execute o seguinte código de um notebook ou do Databricks SQL na pasta explorations do seu pipeline:

CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;

CREATE TABLE main.cdc_tutorial.snapshot (
  userId INT,
  city STRING
);

INSERT INTO main.cdc_tutorial.snapshot VALUES
  (1, 'Oaxaca'),
  (2, 'Monterrey'),
  (3, 'Tijuana');

Etapa 2: Executar O CDC AUTOMÁTICO DO INSTANTÂNEO

Desenvolva pipelines declarativos do Lakeflow Spark para executar o código nesta etapa.

Escolha um tipo de origem para a visualização de instantâneo (o código de criação de amostra gera uma tabela Delta):

Opção A: Ler de uma tabela Delta
from pyspark import pipelines as dp

@dp.view(name="source")
def source():
  return spark.read.table("main.cdc_tutorial.snapshot")
Opção B: Leitura do armazenamento em nuvem
from pyspark import pipelines as dp

@dp.view(name="source")
def source():
  return spark.read.format("csv").option("header", True).load("<snapshot-path>")
Opção C: Leitura do JDBC (somente computação clássica)
from pyspark import pipelines as dp

@dp.view(name="source")
def source():
  return (spark.read
    .format("jdbc")
    .option("url", "<jdbc-url>")
    .option("dbtable", "<table-name>")
    .option("user", "<username>")
    .option("password", "<password>")
    .load()
  )

Todas as opções, gravar no destino

Em seguida, adicione a tabela de destino e o fluxo:

dp.create_streaming_table("target")

dp.create_auto_cdc_from_snapshot_flow(
  target = "target",
  source = "source",
  keys = ["userId"],
  stored_as_scd_type = 2
)

Após a execução inicial do pipeline, todos os registros são inseridos como linhas ativas:

userId city __START_AT __END_AT
1 Oaxaca 0 nulo
2 Monterrey 0 nulo
3 Tijuana 0 nulo

Observação

Para usar o SCD Tipo 1 e manter apenas o estado atual, defina stored_as_scd_type=1. Nesse caso, a tabela de destino não inclui as colunas __START_AT e __END_AT.

Etapa 3: Simular um novo instantâneo e executar a simulação novamente

Atualize a tabela de origem para simular um novo instantâneo que está chegando (execute este código de um notebook ou arquivo SQL na pasta do seu pipeline explorations):

TRUNCATE TABLE main.cdc_tutorial.snapshot;

INSERT INTO main.cdc_tutorial.snapshot VALUES
  (2, 'Carmel'),
  (3, 'Los Angeles'),
  (4, 'Death Valley'),
  (6, 'Kings Canyon');

Executar o pipeline novamente. AUTO CDC FROM SNAPSHOT compara o novo instantâneo com o anterior e detecta que o usuário 1 foi excluído, os usuários 2 e 3 foram atualizados e os usuários 4 e 6 foram inseridos. Isso gera um feed de alterações e usa AUTO CDC para criar a tabela de saída.

Após a segunda execução com o SCD Tipo 2, a tabela de destino contém os seguintes registros:

userId city __START_AT __END_AT
1 Oaxaca 0 1
2 Monterrey 0 1
2 Carmel 1 nulo
3 Tijuana 0 1
3 Los Angeles 1 nulo
4 Vale da Morte 1 nulo
6 Kings Canyon 1 nulo

O usuário 1 foi encerrado (excluído). Os usuários 2 e 3 têm duas versões mostrando mudanças em suas cidades. Os usuários 4 e 6 foram inseridos recentemente.

Após a segunda execução com o SCD Tipo 1, a tabela de destino mostra apenas o estado atual:

userId city
2 Carmel
3 Los Angeles
4 Vale da Morte
6 Kings Canyon

Exemplo: processar instantâneos usando funções de versão

Use essa abordagem quando precisar de controle explícito sobre a ordenação de instantâneos. Por exemplo, use essa abordagem quando vários instantâneos chegarem ao mesmo tempo ou instantâneos chegarem fora de ordem. Você escreve uma função que especifica qual instantâneo processar em seguida e seu número de versão. A API processa instantâneos na ordem crescente de versão.

  • Se vários instantâneos estiverem no armazenamento, todos eles serão processados em ordem.
  • Se um instantâneo chegar fora de ordem (por exemplo, snapshot_3 chega depois snapshot_4), ele é ignorado.
  • Se não houver novos instantâneos, a função retornará None e não haverá nenhum processamento.

Etapa 1: Preparar arquivos de instantâneo

Crie arquivos CSV contendo dados de instantâneo e adicione-os a um volume ou local de armazenamento em nuvem. Nomeie os arquivos cronologicamente (por exemplo, snapshot_1.csv, ). snapshot_2.csv

Cada arquivo deve conter colunas para userId e city. Por exemplo:

snapshot_1.csv:

userId city
1 Oaxaca
2 Monterrey
3 Tijuana

snapshot_2.csv:

userId city
2 Carmel
3 Los Angeles
4 Vale da Morte

Etapa 2: Executar AUTO CDC FROM SNAPSHOT com uma função de versão

Crie um novo notebook e cole o seguinte código de pipeline. Em seguida, desenvolva pipelines declarativos do Lakeflow Spark.

from pyspark import pipelines as dp
from typing import Optional, Tuple
from pyspark.sql import DataFrame

def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Optional[Tuple[DataFrame, int]]:
  snapshot_dir = "/Volumes/main/cdc_tutorial/snapshots/" # or the location you created the sample data

  files = dbutils.fs.ls(snapshot_dir)
  snapshot_files = [f.name for f in files if f.name.startswith("snapshot_") and f.name.endswith(".csv")]

  snapshot_versions = []
  for filename in snapshot_files:
    try:
      version = int(filename.replace("snapshot_", "").replace(".csv", ""))
      snapshot_versions.append(version)
    except ValueError:
      continue

  snapshot_versions.sort()

  if latest_snapshot_version is None:
    if snapshot_versions:
      next_version = snapshot_versions[0]
    else:
      return None
  else:
    next_versions = [v for v in snapshot_versions if v > latest_snapshot_version]
    if next_versions:
      next_version = next_versions[0]
    else:
      return None

  snapshot_path = f"{snapshot_dir}snapshot_{next_version}.csv"
  df = spark.read.format("csv").option("header", True).load(snapshot_path)
  return (df, next_version)


dp.create_streaming_table("main.cdc_tutorial.target_versioned")

dp.create_auto_cdc_from_snapshot_flow(
  target = "main.cdc_tutorial.target_versioned",
  source = next_snapshot_and_version,
  keys = ["userId"],
  stored_as_scd_type = 2
)

Observação

Em vez disso, para usar o SCD Tipo 1, defina stored_as_scd_type=1.

Após o processamento snapshot_1.csv, a tabela de destino contém os seguintes registros:

userId city __START_AT __END_AT
1 Oaxaca 1 nulo
2 Monterrey 1 nulo
3 Tijuana 1 nulo

Após o processamento snapshot_2.csv, a tabela de destino contém os seguintes registros:

userId city __START_AT __END_AT
1 Oaxaca 1 2
2 Monterrey 1 2
2 Carmel 2 nulo
3 Tijuana 1 2
3 Los Angeles 2 nulo
4 Vale da Morte 2 nulo

Observação

Lembre-se de que, para SCD Tipo 1, a tabela se parece exatamente com o instantâneo mais recente. A diferença é que as consultas downstream podem usar o feed de alterações para processar apenas registros alterados.

Etapa 3: Adicionar novos instantâneos

Adicione um novo arquivo CSV ao local de armazenamento com dados modificados (por exemplo, valores de cidade alterados, novas linhas ou linhas removidas). Em seguida, execute o pipeline novamente para processar o novo instantâneo.

Limitações

  • A coluna de sequenciamento deve ser um tipo de dados classificável. NULL Não há suporte para valores de sequenciamento.
  • AUTO CDC FROM SNAPSHOT tem suporte apenas na interface de pipeline do Python; não há suporte para a interface SQL.

Recursos adicionais