Compartilhar via


Replicar uma tabela RDBMS externa usando AUTO CDC

Esta página explica como replicar uma tabela de um RDBMS (sistema de gerenciamento de banco de dados relacional externo) para o Azure Databricks usando a AUTO CDC API em pipelines. Você aprenderá:

  • Padrões comuns para configurar as fontes.
  • Como executar uma cópia completa única dos dados existentes usando um once fluxo.
  • Como ingerir continuamente novas alterações usando um change fluxo.

Esse padrão é ideal para criar tabelas SCD (dimensão de alteração lenta) ou manter uma tabela de destino em sincronia com um sistema externo de registro.

Antes de começar

Este guia pressupõe que você tenha acesso aos seguintes conjuntos de dados de sua origem:

  • Um instantâneo completo da tabela de origem no armazenamento em nuvem. Esse conjunto de dados é usado para a carga inicial.
  • Um feed contínuo de alterações, preenchido no mesmo local de armazenamento em nuvem (por exemplo, usando Debezium, Kafka ou CDC baseado em log). Este fluxo é a entrada para o processo em andamento AUTO CDC.

Configurar exibições de origem

Primeiro, defina duas exibições de origem para preencher a rdbms_orders tabela de destino de um caminho orders_snapshot_path de armazenamento em nuvem. Ambos são criados como exibições de streaming sobre dados brutos no armazenamento em nuvem. O uso de exibições proporciona maior eficiência porque os dados não precisam ser gravados antes de serem usados no AUTO CDC processo.

  • A primeira exibição de origem é um instantâneo completo (full_orders_snapshot)
  • O segundo é um fluxo de alterações contínuo (rdbms_orders_change_feed).

Os exemplos neste guia usam o armazenamento em nuvem como origem, mas você pode usar qualquer fonte com suporte por tabelas de streaming.

full_orders_snapshot()

Esta etapa cria um pipeline com uma exibição que lê o instantâneo completo inicial dos dados de pedidos.

Python

O seguinte exemplo do Python:

  • Uso de spark.readStream com Carregador Automático (format("cloudFiles"))
  • Lê arquivos JSON de um diretório definido por orders_snapshot_path
  • Define includeExistingFiles para true garantir que os dados históricos já presentes no caminho sejam processados
  • Defina inferColumnTypes para true para inferir o esquema automaticamente
  • Retorna todas as colunas com .select("\*")
@dp.view()
def full_orders_snapshot():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.includeExistingFiles", "true")
        .option("cloudFiles.inferColumnTypes", "true")
        .load(orders_snapshot_path)
        .select("*")
    )

SQL

O exemplo SQL a seguir passa opções como um mapa de pares chave-valor de strings. orders_snapshot_path deve estar disponível como uma variável SQL (por exemplo, definida usando parâmetros de pipeline ou interpolada manualmente).

CREATE OR REFRESH VIEW full_orders_snapshot
AS SELECT *
FROM STREAM read_files("${orders_snapshot_path}", "json", map(
  "cloudFiles.includeExistingFiles", "true",
  "cloudFiles.inferColumnTypes", "true"
));

rdbms_orders_change_feed()

Esta etapa cria uma segunda visualização que lê dados de alteração incremental (por exemplo, de logs CDC ou tabelas de alteração). Ele lê orders_cdc_path e pressupõe que os arquivos JSON no estilo CDC são inseridos nesse caminho regularmente.

Python

@dp.view()
def rdbms_orders_change_feed():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_cdc_path)

SQL

No exemplo sql a seguir, ${orders_cdc_path} é uma variável e pode ser interpolada definindo um valor em suas configurações de pipeline ou definindo explicitamente uma variável em seu código.

CREATE OR REFRESH VIEW rdbms_orders_change_feed
AS SELECT *
FROM STREAM read_files("${orders_cdc_path}", "json", map(
"cloudFiles.includeExistingFiles", "true",
"cloudFiles.inferColumnTypes", "true"
));

Hidratação inicial (uma vez que o fluxo)

Agora que as fontes estão configuradas, AUTO CDC a lógica mescla ambas as fontes em uma tabela de streaming de destino. Primeiro, use um fluxo único AUTO CDC com ONCE=TRUE para copiar o conteúdo completo da tabela RDBMS em uma tabela de transmissão de dados. Isso prepara a tabela de destino com dados históricos sem reproduzi-la em atualizações futuras.

Python

from pyspark import pipelines as dp

# Step 1: Create the target streaming table

dp.create_streaming_table("rdbms_orders")

# Step 2: Once Flow — Load initial snapshot of full RDBMS table

dp.create_auto_cdc_flow(
  flow_name = "initial_load_orders",
  once = True,  # one-time load
  target = "rdbms_orders",
  source = "full_orders_snapshot",  # e.g., ingested from JDBC into bronze
  keys = ["order_id"],
  sequence_by = "timestamp",
  stored_as_scd_type = "1"
)

SQL


-- Step 1: Create the target streaming table
CREATE OR REFRESH STREAMING TABLE rdbms_orders;

-- Step 2: Once Flow for initial snapshot
CREATE FLOW rdbms_orders_hydrate
AS AUTO CDC ONCE INTO rdbms_orders
FROM stream(full_orders_snapshot)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;

O once fluxo é executado apenas uma vez. Novos arquivos que são adicionados à full_orders_snapshot após a criação do pipeline são ignorados.

Importante

Executar uma atualização completa na rdbms_orders tabela de streaming executa novamente o once fluxo. Se os dados de instantâneo iniciais no armazenamento em nuvem tiverem sido removidos, isso resultará em perda de dados.

Feed contínuo de alterações (fluxo de alteração)

Após a carga inicial do instantâneo, use outro AUTO CDC fluxo para incorporar continuamente as alterações do feed CDC do RDBMS. Isso mantém sua rdbms_orders tabela atualizada com inserções, atualizações e exclusões.

Python

from pyspark import pipelines as dp

# Step 3: Change Flow — Ingest ongoing CDC stream from source system

dp.create_auto_cdc_flow(
flow_name = "orders_incremental_cdc",
target = "rdbms_orders",
source = "rdbms_orders_change_feed", # e.g., ingested from Kafka or Debezium
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)

SQL

-- Step 3: Continuous CDC ingestion
CREATE FLOW rdbms_orders_continuous
AS AUTO CDC INTO rdbms_orders
FROM stream(rdbms_orders_change_feed)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;

Considerações

Idempotência do preenchimento retroativo Um once fluxo só é executado novamente quando a tabela de destino é totalmente atualizada.
Vários fluxos Você pode usar vários fluxos de alteração para integrar correções, dados que chegam tardiamente ou feeds alternativos, mas todos devem compartilhar um esquema e as chaves.
Atualização completa Uma atualização completa na rdbms_orders tabela de streaming executa novamente o once fluxo. Isso poderá levar à perda de dados se o local de armazenamento em nuvem inicial tiver removido os dados de instantâneo iniciais.
Ordem de execução de fluxo A ordem de execução do fluxo não importa. O resultado final é o mesmo.

Recursos adicionais

  • Conector do SQL Server totalmente gerenciado no Lakeflow Connect