Partilhar via


As APIs AUTO CDC: captura simplificada de dados de mudanças com pipelines

O Lakeflow Spark Declarative Pipelines simplifica a captura de dados de alterações (CDC) com as APIs AUTO CDC e AUTO CDC FROM SNAPSHOT. Estas APIs automatizam a complexidade de computar dimensões que mudam lentamente (SCD) Tipo 1 e Tipo 2, seja a partir de um feed CDC ou de instantâneos de bases de dados. Para saber mais sobre estes conceitos, consulte Captura de alterações de dados e capturas instantâneas.

Observação

As AUTO CDC APIs substituem as APPLY CHANGES APIs e mantêm a mesma sintaxe. As APPLY CHANGES APIs ainda estão disponíveis, mas o Databricks recomenda usar as AUTO CDC APIs em seu lugar.

A API que usa depende da origem dos seus dados de alteração:

  • AUTO CDC: Use isto quando a base de dados de origem tiver um feed CDC ativado. AUTO CDC processa alterações a partir de um feed de dados de alteração (CDF). É suportado tanto nas interfaces de SQL e Python do sistema de pipeline.
  • AUTO CDC FROM SNAPSHOT: Use isto quando o CDC não estiver ativado na base de dados de origem e apenas os snapshots estiverem disponíveis. Esta API compara instantâneos para determinar alterações e depois processa-as. É suportado apenas na interface Python.

Ambas as APIs suportam a atualização de tabelas usando SCD Tipo 1 e Tipo 2:

  • Use o SCD Tipo 1 para atualizar registos diretamente. O histórico não é retido para registros atualizados.
  • Use o SCD Tipo 2 para manter um histórico de registos, seja em todas as atualizações ou em atualizações de um conjunto especificado de colunas.

As AUTO CDC APIs não são suportadas pelo Apache Spark Declarative Pipelines.

Para sintaxe e outras referências, veja AUTO CDC INTO (pipelines),create_auto_cdc_flow e create_auto_cdc_from_snapshot_flow.

Observação

Esta página descreve como atualizar tabelas nos seus pipelines com base nas alterações nos dados de origem. Para saber como registar e consultar informações de alteração ao nível da linha para tabelas Delta, consulte Utilizar o feed de dados de mudança do Delta Lake no Azure Databricks.

Requerimentos

Para usar as APIs CDC, seu pipeline deve ser configurado para usar SDP sem servidor ou o SDP Pro ou Advancededições.

Como funciona o AUTO CDC

Para realizar o processamento CDC com AUTO CDC, crie uma tabela de streaming e depois use a instrução AUTO CDC ... INTO em SQL ou a função create_auto_cdc_flow() em Python para especificar a fonte, as chaves e o sequenciamento para o feed de alterações. Para uma explicação de como funcionam a sequenciação e a lógica SCD, veja Alterar captura de dados e instantâneos. Veja os exemplos do AUTO CDC.

Para a inicialização inicial de uma fonte com um feed de alterações, use AUTO CDC com um once fluxo e depois continue a processar o feed de alterações. Veja Replicar uma tabela RDBMS externa usando AUTO CDC.

Para detalhes de sintaxe, consulte AUTO CDC INTO (pipelines) ou create_auto_cdc_flow.

Como funciona o AUTO CDC do SNAPSHOT

AUTO CDC FROM SNAPSHOT determina alterações nos dados de origem comparando instantâneos por ordem. É suportado apenas na interface de pipeline Python. Você pode ler capturas instantâneas de uma tabela Delta, arquivos de armazenamento na cloud ou JDBC diretamente.

Para realizar o processamento CDC com AUTO CDC FROM SNAPSHOT, crie uma tabela de fluxo contínuo e, depois, use a função create_auto_cdc_from_snapshot_flow() para especificar o snapshot, as chaves e outros argumentos. Para obter detalhes sobre os dois padrões de ingestão e quando usar cada um, consulte Padrões de processamento de instantâneos. Veja os exemplos do AUTO CDC FROM SNAPSHOT.

Para detalhes da sintaxe, consulte create_auto_cdc_from_snapshot_flow.

Usar várias colunas para sequenciamento

Para sequenciar por múltiplas colunas (por exemplo, um carimbo temporal e um ID para desempatar), use a STRUCT para as combinar. A API ordena inicialmente pelo primeiro campo e, em caso de empate, considera o segundo campo, e assim sucessivamente.

SQL

SEQUENCE BY STRUCT(timestamp_col, id_col)

Python

sequence_by = struct("timestamp_col", "id_col")

Exemplos AUTO CDC

Os exemplos seguintes demonstram o processamento SCD Tipo 1 e Tipo 2 usando uma fonte de dados de alteração. Os dados de exemplo criam novos registos de utilizador, apagam um registo de utilizador e atualizam os registos de utilizador. No exemplo SCD Tipo 1, as últimas UPDATE operações chegam tarde e são removidas da tabela de destino, demonstrando o manejo de eventos fora de ordem.

Seguem-se os registos de entrada usados nestes exemplos. Estes dados são criados ao executar a consulta na secção Criar dados de exemplo .

userId nome city operação sequênciaNum
124 Raul Oaxaca INSERT 1
123 Isabel Monterrey INSERT 1
125 Mercedes Tijuana INSERT 2
126 Lírio Cancún INSERT 2
123 null null DELETE 6
125 Mercedes Guadalajara UPDATE 6
125 Mercedes Mexicali UPDATE 5
123 Isabel Chihuahua UPDATE 5

Se descomentar a última linha na consulta de geração de dados de exemplo, insere o seguinte registo que especifica truncar a tabela (limpar a tabela) em sequenceNum=3:

userId nome city operação sequênciaNum
null null null TRUNCAR 3

Observação

Todos os exemplos a seguir incluem opções para especificar ambas DELETE e TRUNCATE operações, mas cada uma é opcional.

Criar dados de exemplo

Execute as seguintes instruções para criar um conjunto de dados de exemplo. Este código não se destina a ser executado como parte de uma definição de pipeline. Executa-o a partir da pasta de exploração do teu pipeline, em vez da pasta de transformações.

CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;

CREATE TABLE main.cdc_tutorial.users_cdf
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The batch at sequenceNum 6 is the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

Processar as atualizações do Tipo 1 de SCD

O SCD Tipo 1 mantém apenas a versão mais recente de cada registo. O exemplo seguinte extrai dados a partir do feed de alterações criado acima e aplica alterações a um alvo de tabela de streaming. Desenvolver Pipelines Declarativos Lakeflow Spark para executar este código.

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
  return spark.readStream.table("main.cdc_tutorial.users_cdf")

dp.create_streaming_table("users_current")

dp.create_auto_cdc_flow(
  target = "users_current",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

CREATE OR REFRESH STREAMING TABLE users_current;

CREATE FLOW apply_cdc AS AUTO CDC INTO
  users_current
FROM
  stream(main.cdc_tutorial.users_cdf)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

Após a execução do exemplo SCD Tipo 1, a tabela alvo contém os seguintes registos:

userId nome city
124 Raul Oaxaca
125 Mercedes Guadalajara
126 Lírio Cancún

O utilizador 123 (Isabel) foi eliminado e não aparece. O utilizador 125 (Mercedes) mostra apenas a cidade mais recente (Guadalajara) porque o SCD Tipo 1 sobrespõe valores anteriores. A versão anterior em UPDATE em sequenceNum=5 foi abandonada porque chegou uma atualização posterior em sequenceNum=6.

Após executar o exemplo com o TRUNCATE registo sem comentários, a tabela é limpa em sequenceNum=3. Isto significa que os registos 124 e 126 não estão na tabela, e a tabela alvo final contém apenas o seguinte registo:

userId nome city
125 Mercedes Guadalajara

Processar atualizações do Tipo 2 de SCD

O SCD Tipo 2 preserva um histórico completo das alterações criando novas linhas para cada versão de um registo, com __START_AT colunas e __END_AT a indicar quando cada versão esteve ativa.

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
  return spark.readStream.table("main.cdc_tutorial.users_cdf")

dp.create_streaming_table("users_history")

dp.create_auto_cdc_flow(
  target = "users_history",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

CREATE OR REFRESH STREAMING TABLE users_history;

CREATE FLOW apply_cdc AS AUTO CDC INTO
  users_history
FROM
  stream(main.cdc_tutorial.users_cdf)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

Após executar o exemplo SCD Tipo 2, a tabela alvo contém os seguintes registos:

userId nome city __START_AT __END_AT
123 Isabel Monterrey 1 5
123 Isabel Chihuahua 5 6
124 Raul Oaxaca 1 null
125 Mercedes Tijuana 2 5
125 Mercedes Mexicali 5 6
125 Mercedes Guadalajara 6 null
126 Lírio Cancún 2 null

A tabela preserva a história completa. O Utilizador 123 tem duas versões (terminadas na sequência 6 quando eliminadas). Utilizador 125 tem três versões que mostram alterações de cidade. Os registos com __END_AT = null estão atualmente ativos.

Rastreie um subconjunto de colunas com SCD Tipo 2

Por defeito, o SCD Tipo 2 cria uma nova versão sempre que o valor de qualquer coluna muda. Pode especificar um subconjunto de colunas a acompanhar, para que as alterações noutras colunas atualizem a versão atual em vez de gerar um novo registo de histórico.

O exemplo seguinte exclui a city coluna do registo histórico:

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
  return spark.readStream.table("main.cdc_tutorial.users_cdf")

dp.create_streaming_table("users_history")

dp.create_auto_cdc_flow(
  target = "users_history",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

CREATE OR REFRESH STREAMING TABLE users_history;

CREATE FLOW apply_cdc AS AUTO CDC INTO
  users_history
FROM
  stream(main.cdc_tutorial.users_cdf)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

Como city as alterações não são acompanhadas, as atualizações da cidade sobrescrevem a linha atual em vez de criar uma nova versão. A tabela alvo contém os seguintes registos:

userId nome city __START_AT __END_AT
123 Isabel Chihuahua 1 6
124 Raul Oaxaca 1 null
125 Mercedes Guadalajara 2 null
126 Lírio Cancún 2 null

Exemplos de CD AUTOMÁTICO A PARTIR DE SNAPSHOT

As secções seguintes fornecem exemplos da utilização de AUTO CDC FROM SNAPSHOT para processar instantâneos em tabelas alvo SCD Tipo 1 ou Tipo 2. Para contexto sobre quando usar esta API, veja Alterar captura de dados e instantâneos.

Exemplo: Instantâneos de processos usando o tempo de ingestão do pipeline

Utilize esta abordagem quando os snapshots chegarem regularmente e em ordem e pode confiar no timestamp da execução do pipeline para a versionação. Um novo snapshot é importado a cada atualização do pipeline.

Pode ler snapshots de vários tipos de fonte, incluindo tabelas Delta, ficheiros de armazenamento na cloud e ligações JDBC.

Passo 1: Criar dados de exemplo

Crie uma tabela com dados de instantâneo. Execute o seguinte código a partir de um notebook ou do Databricks SQL na explorations pasta do seu pipeline:

CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;

CREATE TABLE main.cdc_tutorial.snapshot (
  userId INT,
  city STRING
);

INSERT INTO main.cdc_tutorial.snapshot VALUES
  (1, 'Oaxaca'),
  (2, 'Monterrey'),
  (3, 'Tijuana');

Passo 2: Executar AUTO CDC A PARTIR DO SNAPSHOT

Desenvolva Pipelines Declarativos Lakeflow Spark para executar o código nesta etapa.

Escolha um tipo de fonte para a visualização snapshot (o código de criação do exemplo gera uma tabela Delta):

Opção A: Ler de uma tabela Delta
from pyspark import pipelines as dp

@dp.view(name="source")
def source():
  return spark.read.table("main.cdc_tutorial.snapshot")
Opção B: Ler a partir de armazenamento na cloud
from pyspark import pipelines as dp

@dp.view(name="source")
def source():
  return spark.read.format("csv").option("header", True).load("<snapshot-path>")
Opção C: Ler do JDBC (apenas computação clássica)
from pyspark import pipelines as dp

@dp.view(name="source")
def source():
  return (spark.read
    .format("jdbc")
    .option("url", "<jdbc-url>")
    .option("dbtable", "<table-name>")
    .option("user", "<username>")
    .option("password", "<password>")
    .load()
  )

Escrever todas as opções no destino

Depois adiciona a tabela alvo e o fluxo:

dp.create_streaming_table("target")

dp.create_auto_cdc_from_snapshot_flow(
  target = "target",
  source = "source",
  keys = ["userId"],
  stored_as_scd_type = 2
)

Após a primeira execução do pipeline, todos os registos são inseridos como linhas ativas:

userId city __START_AT __END_AT
1 Oaxaca 0 null
2 Monterrey 0 null
3 Tijuana 0 null

Observação

Para usar SCD Tipo 1 em vez disso e manter apenas o estado atual, defina stored_as_scd_type=1. Neste caso, a tabela alvo não inclui as colunas __START_AT e __END_AT.

Passo 3: Simular um novo snapshot e repetir

Atualize a tabela de origem para simular a chegada de um novo snapshot (execute este código a partir de um caderno ou ficheiro SQL na explorations pasta do seu pipline):

TRUNCATE TABLE main.cdc_tutorial.snapshot;

INSERT INTO main.cdc_tutorial.snapshot VALUES
  (2, 'Carmel'),
  (3, 'Los Angeles'),
  (4, 'Death Valley'),
  (6, 'Kings Canyon');

Executar o pipeline novamente. AUTO CDC FROM SNAPSHOT compara o novo snapshot com o anterior e deteta que o utilizador 1 foi eliminado, os utilizadores 2 e 3 foram atualizados, e os utilizadores 4 e 6 foram inseridos. Isto gera um feed de alterações e é usado AUTO CDC para criar a tabela de saída.

Após a segunda execução com SCD Tipo 2, a tabela alvo contém os seguintes registos:

userId city __START_AT __END_AT
1 Oaxaca 0 1
2 Monterrey 0 1
2 Carmel 1 null
3 Tijuana 0 1
3 Los Angeles 1 null
4 Vale da Morte 1 null
6 Kings Canyon 1 null

O Utilizador 1 foi encerrado (apagado). Os utilizadores 2 e 3 têm cada um duas versões que mostram as alterações das suas cidades. Os utilizadores 4 e 6 foram recém-inseridos.

Após a segunda execução com SCD Tipo 1, a tabela alvo mostra apenas o estado atual:

userId city
2 Carmel
3 Los Angeles
4 Vale da Morte
6 Kings Canyon

Exemplo: Processar snapshots usando funções de versão

Use esta abordagem quando precisar de controlo explícito sobre a ordenação de snapshots. Por exemplo, use esta abordagem quando vários snapshots chegam ao mesmo tempo, ou quando chegam fora de ordem. Escreves uma função que especifica qual snapshot processar a seguir e o seu número de versão. A interface da API processa capturas por ordem crescente de versões.

  • Se vários snapshots estiverem armazenados, todos são processados por ordem.
  • Se um snapshot chegar fora de ordem (por exemplo, snapshot_3 chegar depois de snapshot_4), é ignorado.
  • Se não houver novos snapshots, a função retorna None e não ocorre qualquer processamento.

Passo 1: Preparar ficheiros de instantâneo

Crie ficheiros CSV contendo dados snapshot e adicione-os a um volume ou local de armazenamento na cloud. Nomeie os ficheiros cronologicamente (por exemplo, snapshot_1.csv, snapshot_2.csv).

Cada ficheiro deve conter colunas para userId e city. Por exemplo:

snapshot_1.csv:

userId city
1 Oaxaca
2 Monterrey
3 Tijuana

snapshot_2.csv:

userId city
2 Carmel
3 Los Angeles
4 Vale da Morte

Passo 2: Execute AUTO CDC A PARTIR DO SNAPSHOT com uma função de versão

Crie um novo notebook e cole o seguinte código do pipeline. Depois desenvolver pipelines declarativos Lakeflow Spark.

from pyspark import pipelines as dp
from typing import Optional, Tuple
from pyspark.sql import DataFrame

def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Optional[Tuple[DataFrame, int]]:
  snapshot_dir = "/Volumes/main/cdc_tutorial/snapshots/" # or the location you created the sample data

  files = dbutils.fs.ls(snapshot_dir)
  snapshot_files = [f.name for f in files if f.name.startswith("snapshot_") and f.name.endswith(".csv")]

  snapshot_versions = []
  for filename in snapshot_files:
    try:
      version = int(filename.replace("snapshot_", "").replace(".csv", ""))
      snapshot_versions.append(version)
    except ValueError:
      continue

  snapshot_versions.sort()

  if latest_snapshot_version is None:
    if snapshot_versions:
      next_version = snapshot_versions[0]
    else:
      return None
  else:
    next_versions = [v for v in snapshot_versions if v > latest_snapshot_version]
    if next_versions:
      next_version = next_versions[0]
    else:
      return None

  snapshot_path = f"{snapshot_dir}snapshot_{next_version}.csv"
  df = spark.read.format("csv").option("header", True).load(snapshot_path)
  return (df, next_version)


dp.create_streaming_table("main.cdc_tutorial.target_versioned")

dp.create_auto_cdc_from_snapshot_flow(
  target = "main.cdc_tutorial.target_versioned",
  source = next_snapshot_and_version,
  keys = ["userId"],
  stored_as_scd_type = 2
)

Observação

Para usar SCD Tipo 1 em vez disso, defina stored_as_scd_type=1.

Após o processamento de snapshot_1.csv, a tabela de destino contém os seguintes registos:

userId city __START_AT __END_AT
1 Oaxaca 1 null
2 Monterrey 1 null
3 Tijuana 1 null

Após o processamento de snapshot_2.csv, a tabela alvo contém os seguintes registros:

userId city __START_AT __END_AT
1 Oaxaca 1 2
2 Monterrey 1 2
2 Carmel 2 null
3 Tijuana 1 2
3 Los Angeles 2 null
4 Vale da Morte 2 null

Observação

Lembra-te que, para SCD Tipo 1, a tabela parece exatamente como o snapshot mais recente. A diferença é que as consultas posteriores podem usar o feed de alterações para processar apenas registos alterados.

Passo 3: Adicionar novos snapshots

Adicione um novo ficheiro CSV à localização de armazenamento com dados modificados (por exemplo, valores de cidade alterados, novas linhas ou linhas removidas). Depois executa novamente o pipeline para processar o novo snapshot.

Limitações

  • A coluna de sequenciação deve ser um tipo de dado ordenável. NULL Os valores de sequenciação não são suportados.
  • AUTO CDC FROM SNAPSHOT é suportado apenas na interface do pipeline Python; a interface SQL não é suportada.

Recursos adicionais