Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
O Lakeflow Spark Declarative Pipelines simplifica a captura de dados de alterações (CDC) com as APIs AUTO CDC e AUTO CDC FROM SNAPSHOT. Estas APIs automatizam a complexidade de computar dimensões que mudam lentamente (SCD) Tipo 1 e Tipo 2, seja a partir de um feed CDC ou de instantâneos de bases de dados. Para saber mais sobre estes conceitos, consulte Captura de alterações de dados e capturas instantâneas.
Observação
As AUTO CDC APIs substituem as APPLY CHANGES APIs e mantêm a mesma sintaxe. As APPLY CHANGES APIs ainda estão disponíveis, mas o Databricks recomenda usar as AUTO CDC APIs em seu lugar.
A API que usa depende da origem dos seus dados de alteração:
-
AUTO CDC: Use isto quando a base de dados de origem tiver um feed CDC ativado.AUTO CDCprocessa alterações a partir de um feed de dados de alteração (CDF). É suportado tanto nas interfaces de SQL e Python do sistema de pipeline. -
AUTO CDC FROM SNAPSHOT: Use isto quando o CDC não estiver ativado na base de dados de origem e apenas os snapshots estiverem disponíveis. Esta API compara instantâneos para determinar alterações e depois processa-as. É suportado apenas na interface Python.
Ambas as APIs suportam a atualização de tabelas usando SCD Tipo 1 e Tipo 2:
- Use o SCD Tipo 1 para atualizar registos diretamente. O histórico não é retido para registros atualizados.
- Use o SCD Tipo 2 para manter um histórico de registos, seja em todas as atualizações ou em atualizações de um conjunto especificado de colunas.
As AUTO CDC APIs não são suportadas pelo Apache Spark Declarative Pipelines.
Para sintaxe e outras referências, veja AUTO CDC INTO (pipelines),create_auto_cdc_flow e create_auto_cdc_from_snapshot_flow.
Observação
Esta página descreve como atualizar tabelas nos seus pipelines com base nas alterações nos dados de origem. Para saber como registar e consultar informações de alteração ao nível da linha para tabelas Delta, consulte Utilizar o feed de dados de mudança do Delta Lake no Azure Databricks.
Requerimentos
Para usar as APIs CDC, seu pipeline deve ser configurado para usar SDP sem servidor ou o SDP Pro ou Advancededições.
Como funciona o AUTO CDC
Para realizar o processamento CDC com AUTO CDC, crie uma tabela de streaming e depois use a instrução AUTO CDC ... INTO em SQL ou a função create_auto_cdc_flow() em Python para especificar a fonte, as chaves e o sequenciamento para o feed de alterações. Para uma explicação de como funcionam a sequenciação e a lógica SCD, veja Alterar captura de dados e instantâneos. Veja os exemplos do AUTO CDC.
Para a inicialização inicial de uma fonte com um feed de alterações, use AUTO CDC com um once fluxo e depois continue a processar o feed de alterações. Veja Replicar uma tabela RDBMS externa usando AUTO CDC.
Para detalhes de sintaxe, consulte AUTO CDC INTO (pipelines) ou create_auto_cdc_flow.
Como funciona o AUTO CDC do SNAPSHOT
AUTO CDC FROM SNAPSHOT determina alterações nos dados de origem comparando instantâneos por ordem. É suportado apenas na interface de pipeline Python. Você pode ler capturas instantâneas de uma tabela Delta, arquivos de armazenamento na cloud ou JDBC diretamente.
Para realizar o processamento CDC com AUTO CDC FROM SNAPSHOT, crie uma tabela de fluxo contínuo e, depois, use a função create_auto_cdc_from_snapshot_flow() para especificar o snapshot, as chaves e outros argumentos. Para obter detalhes sobre os dois padrões de ingestão e quando usar cada um, consulte Padrões de processamento de instantâneos. Veja os exemplos do AUTO CDC FROM SNAPSHOT.
Para detalhes da sintaxe, consulte create_auto_cdc_from_snapshot_flow.
Usar várias colunas para sequenciamento
Para sequenciar por múltiplas colunas (por exemplo, um carimbo temporal e um ID para desempatar), use a STRUCT para as combinar. A API ordena inicialmente pelo primeiro campo e, em caso de empate, considera o segundo campo, e assim sucessivamente.
SQL
SEQUENCE BY STRUCT(timestamp_col, id_col)
Python
sequence_by = struct("timestamp_col", "id_col")
Exemplos AUTO CDC
Os exemplos seguintes demonstram o processamento SCD Tipo 1 e Tipo 2 usando uma fonte de dados de alteração. Os dados de exemplo criam novos registos de utilizador, apagam um registo de utilizador e atualizam os registos de utilizador. No exemplo SCD Tipo 1, as últimas UPDATE operações chegam tarde e são removidas da tabela de destino, demonstrando o manejo de eventos fora de ordem.
Seguem-se os registos de entrada usados nestes exemplos. Estes dados são criados ao executar a consulta na secção Criar dados de exemplo .
| userId | nome | city | operação | sequênciaNum |
|---|---|---|---|---|
| 124 | Raul | Oaxaca | INSERT | 1 |
| 123 | Isabel | Monterrey | INSERT | 1 |
| 125 | Mercedes | Tijuana | INSERT | 2 |
| 126 | Lírio | Cancún | INSERT | 2 |
| 123 | null | null | DELETE | 6 |
| 125 | Mercedes | Guadalajara | UPDATE | 6 |
| 125 | Mercedes | Mexicali | UPDATE | 5 |
| 123 | Isabel | Chihuahua | UPDATE | 5 |
Se descomentar a última linha na consulta de geração de dados de exemplo, insere o seguinte registo que especifica truncar a tabela (limpar a tabela) em sequenceNum=3:
| userId | nome | city | operação | sequênciaNum |
|---|---|---|---|---|
| null | null | null | TRUNCAR | 3 |
Observação
Todos os exemplos a seguir incluem opções para especificar ambas DELETE e TRUNCATE operações, mas cada uma é opcional.
Criar dados de exemplo
Execute as seguintes instruções para criar um conjunto de dados de exemplo. Este código não se destina a ser executado como parte de uma definição de pipeline. Executa-o a partir da pasta de exploração do teu pipeline, em vez da pasta de transformações.
CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;
CREATE TABLE main.cdc_tutorial.users_cdf
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The batch at sequenceNum 6 is the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
Processar as atualizações do Tipo 1 de SCD
O SCD Tipo 1 mantém apenas a versão mais recente de cada registo. O exemplo seguinte extrai dados a partir do feed de alterações criado acima e aplica alterações a um alvo de tabela de streaming. Desenvolver Pipelines Declarativos Lakeflow Spark para executar este código.
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")
dp.create_streaming_table("users_current")
dp.create_auto_cdc_flow(
target = "users_current",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
CREATE OR REFRESH STREAMING TABLE users_current;
CREATE FLOW apply_cdc AS AUTO CDC INTO
users_current
FROM
stream(main.cdc_tutorial.users_cdf)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
Após a execução do exemplo SCD Tipo 1, a tabela alvo contém os seguintes registos:
| userId | nome | city |
|---|---|---|
| 124 | Raul | Oaxaca |
| 125 | Mercedes | Guadalajara |
| 126 | Lírio | Cancún |
O utilizador 123 (Isabel) foi eliminado e não aparece. O utilizador 125 (Mercedes) mostra apenas a cidade mais recente (Guadalajara) porque o SCD Tipo 1 sobrespõe valores anteriores. A versão anterior em UPDATE em sequenceNum=5 foi abandonada porque chegou uma atualização posterior em sequenceNum=6.
Após executar o exemplo com o TRUNCATE registo sem comentários, a tabela é limpa em sequenceNum=3. Isto significa que os registos 124 e 126 não estão na tabela, e a tabela alvo final contém apenas o seguinte registo:
| userId | nome | city |
|---|---|---|
| 125 | Mercedes | Guadalajara |
Processar atualizações do Tipo 2 de SCD
O SCD Tipo 2 preserva um histórico completo das alterações criando novas linhas para cada versão de um registo, com __START_AT colunas e __END_AT a indicar quando cada versão esteve ativa.
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")
dp.create_streaming_table("users_history")
dp.create_auto_cdc_flow(
target = "users_history",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
CREATE OR REFRESH STREAMING TABLE users_history;
CREATE FLOW apply_cdc AS AUTO CDC INTO
users_history
FROM
stream(main.cdc_tutorial.users_cdf)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
Após executar o exemplo SCD Tipo 2, a tabela alvo contém os seguintes registos:
| userId | nome | city | __START_AT | __END_AT |
|---|---|---|---|---|
| 123 | Isabel | Monterrey | 1 | 5 |
| 123 | Isabel | Chihuahua | 5 | 6 |
| 124 | Raul | Oaxaca | 1 | null |
| 125 | Mercedes | Tijuana | 2 | 5 |
| 125 | Mercedes | Mexicali | 5 | 6 |
| 125 | Mercedes | Guadalajara | 6 | null |
| 126 | Lírio | Cancún | 2 | null |
A tabela preserva a história completa. O Utilizador 123 tem duas versões (terminadas na sequência 6 quando eliminadas). Utilizador 125 tem três versões que mostram alterações de cidade. Os registos com __END_AT = null estão atualmente ativos.
Rastreie um subconjunto de colunas com SCD Tipo 2
Por defeito, o SCD Tipo 2 cria uma nova versão sempre que o valor de qualquer coluna muda. Pode especificar um subconjunto de colunas a acompanhar, para que as alterações noutras colunas atualizem a versão atual em vez de gerar um novo registo de histórico.
O exemplo seguinte exclui a city coluna do registo histórico:
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")
dp.create_streaming_table("users_history")
dp.create_auto_cdc_flow(
target = "users_history",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
CREATE OR REFRESH STREAMING TABLE users_history;
CREATE FLOW apply_cdc AS AUTO CDC INTO
users_history
FROM
stream(main.cdc_tutorial.users_cdf)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
Como city as alterações não são acompanhadas, as atualizações da cidade sobrescrevem a linha atual em vez de criar uma nova versão. A tabela alvo contém os seguintes registos:
| userId | nome | city | __START_AT | __END_AT |
|---|---|---|---|---|
| 123 | Isabel | Chihuahua | 1 | 6 |
| 124 | Raul | Oaxaca | 1 | null |
| 125 | Mercedes | Guadalajara | 2 | null |
| 126 | Lírio | Cancún | 2 | null |
Exemplos de CD AUTOMÁTICO A PARTIR DE SNAPSHOT
As secções seguintes fornecem exemplos da utilização de AUTO CDC FROM SNAPSHOT para processar instantâneos em tabelas alvo SCD Tipo 1 ou Tipo 2. Para contexto sobre quando usar esta API, veja Alterar captura de dados e instantâneos.
Exemplo: Instantâneos de processos usando o tempo de ingestão do pipeline
Utilize esta abordagem quando os snapshots chegarem regularmente e em ordem e pode confiar no timestamp da execução do pipeline para a versionação. Um novo snapshot é importado a cada atualização do pipeline.
Pode ler snapshots de vários tipos de fonte, incluindo tabelas Delta, ficheiros de armazenamento na cloud e ligações JDBC.
Passo 1: Criar dados de exemplo
Crie uma tabela com dados de instantâneo. Execute o seguinte código a partir de um notebook ou do Databricks SQL na explorations pasta do seu pipeline:
CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;
CREATE TABLE main.cdc_tutorial.snapshot (
userId INT,
city STRING
);
INSERT INTO main.cdc_tutorial.snapshot VALUES
(1, 'Oaxaca'),
(2, 'Monterrey'),
(3, 'Tijuana');
Passo 2: Executar AUTO CDC A PARTIR DO SNAPSHOT
Desenvolva Pipelines Declarativos Lakeflow Spark para executar o código nesta etapa.
Escolha um tipo de fonte para a visualização snapshot (o código de criação do exemplo gera uma tabela Delta):
Opção A: Ler de uma tabela Delta
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return spark.read.table("main.cdc_tutorial.snapshot")
Opção B: Ler a partir de armazenamento na cloud
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return spark.read.format("csv").option("header", True).load("<snapshot-path>")
Opção C: Ler do JDBC (apenas computação clássica)
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
Escrever todas as opções no destino
Depois adiciona a tabela alvo e o fluxo:
dp.create_streaming_table("target")
dp.create_auto_cdc_from_snapshot_flow(
target = "target",
source = "source",
keys = ["userId"],
stored_as_scd_type = 2
)
Após a primeira execução do pipeline, todos os registos são inseridos como linhas ativas:
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 0 | null |
| 2 | Monterrey | 0 | null |
| 3 | Tijuana | 0 | null |
Observação
Para usar SCD Tipo 1 em vez disso e manter apenas o estado atual, defina stored_as_scd_type=1. Neste caso, a tabela alvo não inclui as colunas __START_AT e __END_AT.
Passo 3: Simular um novo snapshot e repetir
Atualize a tabela de origem para simular a chegada de um novo snapshot (execute este código a partir de um caderno ou ficheiro SQL na explorations pasta do seu pipline):
TRUNCATE TABLE main.cdc_tutorial.snapshot;
INSERT INTO main.cdc_tutorial.snapshot VALUES
(2, 'Carmel'),
(3, 'Los Angeles'),
(4, 'Death Valley'),
(6, 'Kings Canyon');
Executar o pipeline novamente.
AUTO CDC FROM SNAPSHOT compara o novo snapshot com o anterior e deteta que o utilizador 1 foi eliminado, os utilizadores 2 e 3 foram atualizados, e os utilizadores 4 e 6 foram inseridos. Isto gera um feed de alterações e é usado AUTO CDC para criar a tabela de saída.
Após a segunda execução com SCD Tipo 2, a tabela alvo contém os seguintes registos:
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 0 | 1 |
| 2 | Monterrey | 0 | 1 |
| 2 | Carmel | 1 | null |
| 3 | Tijuana | 0 | 1 |
| 3 | Los Angeles | 1 | null |
| 4 | Vale da Morte | 1 | null |
| 6 | Kings Canyon | 1 | null |
O Utilizador 1 foi encerrado (apagado). Os utilizadores 2 e 3 têm cada um duas versões que mostram as alterações das suas cidades. Os utilizadores 4 e 6 foram recém-inseridos.
Após a segunda execução com SCD Tipo 1, a tabela alvo mostra apenas o estado atual:
| userId | city |
|---|---|
| 2 | Carmel |
| 3 | Los Angeles |
| 4 | Vale da Morte |
| 6 | Kings Canyon |
Exemplo: Processar snapshots usando funções de versão
Use esta abordagem quando precisar de controlo explícito sobre a ordenação de snapshots. Por exemplo, use esta abordagem quando vários snapshots chegam ao mesmo tempo, ou quando chegam fora de ordem. Escreves uma função que especifica qual snapshot processar a seguir e o seu número de versão. A interface da API processa capturas por ordem crescente de versões.
- Se vários snapshots estiverem armazenados, todos são processados por ordem.
- Se um snapshot chegar fora de ordem (por exemplo,
snapshot_3chegar depois desnapshot_4), é ignorado. - Se não houver novos snapshots, a função retorna
Nonee não ocorre qualquer processamento.
Passo 1: Preparar ficheiros de instantâneo
Crie ficheiros CSV contendo dados snapshot e adicione-os a um volume ou local de armazenamento na cloud. Nomeie os ficheiros cronologicamente (por exemplo, snapshot_1.csv, snapshot_2.csv).
Cada ficheiro deve conter colunas para userId e city. Por exemplo:
snapshot_1.csv:
| userId | city |
|---|---|
| 1 | Oaxaca |
| 2 | Monterrey |
| 3 | Tijuana |
snapshot_2.csv:
| userId | city |
|---|---|
| 2 | Carmel |
| 3 | Los Angeles |
| 4 | Vale da Morte |
Passo 2: Execute AUTO CDC A PARTIR DO SNAPSHOT com uma função de versão
Crie um novo notebook e cole o seguinte código do pipeline. Depois desenvolver pipelines declarativos Lakeflow Spark.
from pyspark import pipelines as dp
from typing import Optional, Tuple
from pyspark.sql import DataFrame
def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Optional[Tuple[DataFrame, int]]:
snapshot_dir = "/Volumes/main/cdc_tutorial/snapshots/" # or the location you created the sample data
files = dbutils.fs.ls(snapshot_dir)
snapshot_files = [f.name for f in files if f.name.startswith("snapshot_") and f.name.endswith(".csv")]
snapshot_versions = []
for filename in snapshot_files:
try:
version = int(filename.replace("snapshot_", "").replace(".csv", ""))
snapshot_versions.append(version)
except ValueError:
continue
snapshot_versions.sort()
if latest_snapshot_version is None:
if snapshot_versions:
next_version = snapshot_versions[0]
else:
return None
else:
next_versions = [v for v in snapshot_versions if v > latest_snapshot_version]
if next_versions:
next_version = next_versions[0]
else:
return None
snapshot_path = f"{snapshot_dir}snapshot_{next_version}.csv"
df = spark.read.format("csv").option("header", True).load(snapshot_path)
return (df, next_version)
dp.create_streaming_table("main.cdc_tutorial.target_versioned")
dp.create_auto_cdc_from_snapshot_flow(
target = "main.cdc_tutorial.target_versioned",
source = next_snapshot_and_version,
keys = ["userId"],
stored_as_scd_type = 2
)
Observação
Para usar SCD Tipo 1 em vez disso, defina stored_as_scd_type=1.
Após o processamento de snapshot_1.csv, a tabela de destino contém os seguintes registos:
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 1 | null |
| 2 | Monterrey | 1 | null |
| 3 | Tijuana | 1 | null |
Após o processamento de snapshot_2.csv, a tabela alvo contém os seguintes registros:
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 1 | 2 |
| 2 | Monterrey | 1 | 2 |
| 2 | Carmel | 2 | null |
| 3 | Tijuana | 1 | 2 |
| 3 | Los Angeles | 2 | null |
| 4 | Vale da Morte | 2 | null |
Observação
Lembra-te que, para SCD Tipo 1, a tabela parece exatamente como o snapshot mais recente. A diferença é que as consultas posteriores podem usar o feed de alterações para processar apenas registos alterados.
Passo 3: Adicionar novos snapshots
Adicione um novo ficheiro CSV à localização de armazenamento com dados modificados (por exemplo, valores de cidade alterados, novas linhas ou linhas removidas). Depois executa novamente o pipeline para processar o novo snapshot.
Limitações
- A coluna de sequenciação deve ser um tipo de dado ordenável.
NULLOs valores de sequenciação não são suportados. -
AUTO CDC FROM SNAPSHOTé suportado apenas na interface do pipeline Python; a interface SQL não é suportada.
Recursos adicionais
- Captação de Dados de Alterações e snapshots: Aprenda sobre conceitos de CDC, snapshots e tipos de SCD.
-
Replicar uma tabela RDBMS externa usando
AUTO CDC: Aprenda a realizar a hidratação inicial com umoncefluxo e depois continue a processar as alterações. - Tópicos avançados do AUTO CDC: Aprenda sobre operações de alteração nos alvos do AUTO CDC, leitura de feeds de dados de alteração e métricas de processamento.
- Tutorial: Criar um pipeline de ETL usando a captura de dados de alteração