Partager via


Les APIs AUTO CDC : Simplifiez la capture des modifications de données grâce aux pipelines

Les pipelines déclaratifs de Lakeflow Spark simplifient la capture de données modifiées (CDC) avec les API AUTO CDC et AUTO CDC FROM SNAPSHOT. Ces API automatisent la complexité du calcul des dimensions à variation lente (SCD) Type 1 et Type 2 à partir d’un flux CDC ou d’instantanés de base de données. Pour en savoir plus sur ces concepts, consultez Captures et captures instantanées des données modifiées.

Note

Les AUTO CDC API remplacent les APPLY CHANGES API et ont la même syntaxe. Les APPLY CHANGES API sont toujours disponibles, mais Databricks recommande d’utiliser les AUTO CDC API à leur place.

L’API que vous utilisez dépend de la source de vos données modifiées :

  • AUTO CDC: utilisez cette option lorsque la base de données source a un flux CDC activé. AUTO CDC traite les modifications d’un flux de données modifiées (CDF). Il est pris en charge dans les interfaces SQL et Python du pipeline.
  • AUTO CDC FROM SNAPSHOT: Utilisez cette option lorsque la CDC (capture de données modifiées) n'est pas activée sur la base de données source et que seuls des instantanés sont disponibles. Cette API compare les instantanés pour déterminer les modifications, puis les traite. Elle est prise en charge uniquement dans l’interface Python.

Les deux API prennent en charge la mise à jour des tables à l’aide de SCD Type 1 et de Type 2 :

  • Utilisez SCD Type 1 pour mettre à jour les enregistrements directement. L’historique n’est pas conservé pour les enregistrements mis à jour.
  • Utilisez SCD Type 2 pour conserver un historique des enregistrements, soit sur toutes les mises à jour, soit sur les mises à jour d’un ensemble de colonnes spécifié.

Les AUTO CDC API ne sont pas prises en charge par les pipelines déclaratifs Apache Spark.

Pour la syntaxe et d'autres références, consultez AUTO CDC INTO (pipelines), create_auto_cdc_flow et create_auto_cdc_from_snapshot_flow.

Note

Cette page explique comment mettre à jour des tables dans vos pipelines en fonction des modifications apportées aux données sources. Pour savoir comment enregistrer et interroger des informations de modification au niveau des lignes pour les tables Delta, consultez Utiliser le flux de données de modification Delta Lake sur Azure Databricks.

Spécifications

Pour utiliser les API cdc, votre pipeline doit être configuré pour utiliser le SDP serverless ou le SDP Pro ou Advancedles éditions.

Comment fonctionne AUTO CDC

Pour effectuer le traitement CDC avec AUTO CDC, créez une table de diffusion en continu, puis utilisez l’instruction AUTO CDC ... INTO dans SQL ou la fonction create_auto_cdc_flow() en Python pour spécifier la source, les clés et le séquencement du flux de modification. Pour obtenir une explication sur le fonctionnement de la logique de séquencement et SCD, consultez la capture de données modifiées et les instantanés. Consultez les exemples AUTO CDC.

Pour une hydratation initiale à partir d'une source avec un flux de modification, utilisez AUTO CDC avec un flux once, puis continuez à traiter le flux de modification. Consultez Répliquer une table RDBMS externe à l’aide de l’AUTO CDC.

Pour plus d’informations sur la syntaxe, consultez AUTO CDC INTO (pipelines) ou create_auto_cdc_flow.

Fonctionnement de la capture automatique des données modifiées à partir de la capture instantanée

AUTO CDC FROM SNAPSHOT détermine les modifications apportées aux données sources en comparant les instantanés dans l’ordre. Elle est prise en charge uniquement dans l’interface de pipeline Python. Vous pouvez lire des instantanés à partir d’une table Delta, de fichiers de stockage cloud ou de JDBC directement.

Pour effectuer le traitement CDC avec AUTO CDC FROM SNAPSHOT, créez une table en continu, puis utilisez la fonction create_auto_cdc_from_snapshot_flow() pour spécifier l’instantané, les clés et d’autres arguments. Pour plus d’informations sur les deux modèles d’ingestion et sur le moment où les utiliser, consultez les modèles de traitement par instantané. Consultez les exemples AUTO CDC FROM SNAPSHOT.

Pour plus d’informations sur la syntaxe, consultez create_auto_cdc_from_snapshot_flow.

Utiliser plusieurs colonnes pour le séquencement

Pour séquencer plusieurs colonnes (par exemple, un horodatage et un ID pour rompre les liens), utilisez-les STRUCT pour les combiner. L’API commande d’abord par le premier champ, et en cas d’égalité, considère le deuxième champ, et ainsi de suite.

SQL

SEQUENCE BY STRUCT(timestamp_col, id_col)

Python

sequence_by = struct("timestamp_col", "id_col")

Exemples AUTO de CDC

Les exemples suivants illustrent le traitement SCD Type 1 et Type 2 à l’aide d’une source de flux de données modifiées. L’exemple de données crée de nouveaux enregistrements utilisateur, supprime un enregistrement utilisateur et met à jour les enregistrements utilisateur. Dans l’exemple SCD Type 1, les dernières UPDATE opérations arrivent en retard et sont supprimées de la table cible, ce qui illustre la gestion des événements hors séquence.

Voici les enregistrements d’entrée utilisés dans ces exemples. Ces données sont créées en exécutant la requête dans la section Créer des exemples de données .

userId nom city opération Numéro de séquence
124 Raul Oaxaca INSERT 1
123 Isabel Monterrey INSERT 1
125 Mercedes Tijuana INSERT 2
126 Lys Cancun INSERT 2
123 zéro zéro Supprimer 6
125 Mercedes Guadalajara UPDATE 6
125 Mercedes Mexicali UPDATE 5
123 Isabel Chihuahua UPDATE 5

Si vous supprimez les marques de commentaire de la ligne finale dans l’exemple de requête de génération de données, il insère l’enregistrement suivant qui spécifie de tronquer la table (effacer la table) à sequenceNum=3:

userId nom city opération Numéro de séquence
zéro zéro zéro TRONQUER 3

Note

Tous les exemples suivants incluent des options pour spécifier à la fois DELETE et TRUNCATE des opérations, mais chacune est facultative.

Créer des exemples de données

Exécutez les instructions suivantes pour créer un exemple de jeu de données. Ce code n’est pas destiné à être exécuté dans le cadre d’une définition de pipeline. Exécutez-le à partir du dossier d’exploration de votre pipeline, plutôt que du dossier des transformations.

CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;

CREATE TABLE main.cdc_tutorial.users_cdf
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The batch at sequenceNum 6 is the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

Traiter les mises à jour SCD Type 1

SCD Type 1 conserve uniquement la dernière version de chaque enregistrement. L’exemple suivant lit à partir du flux de modifications créé ci-dessus et applique les modifications à une cible de table de streaming. Développez des pipelines déclaratifs Spark Lakeflow pour exécuter ce code.

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
  return spark.readStream.table("main.cdc_tutorial.users_cdf")

dp.create_streaming_table("users_current")

dp.create_auto_cdc_flow(
  target = "users_current",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

CREATE OR REFRESH STREAMING TABLE users_current;

CREATE FLOW apply_cdc AS AUTO CDC INTO
  users_current
FROM
  stream(main.cdc_tutorial.users_cdf)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

Après avoir exécuté l’exemple SCD Type 1, la table cible contient les enregistrements suivants :

userId nom city
124 Raul Oaxaca
125 Mercedes Guadalajara
126 Lys Cancun

L’utilisateur 123 (Isabel) a été supprimé et n’apparaît pas. L’utilisateur 125 (Mercedes) affiche uniquement la dernière ville (Guadalajara), car SCD Type 1 remplace les valeurs précédentes. La version antérieure UPDATE a été supprimée à sequenceNum=5, car une mise à jour ultérieure a été reçue à sequenceNum=6.

Après avoir exécuté l’exemple avec l’enregistrement TRUNCATE non commenté, la table est effacée à l’adresse sequenceNum=3. Cela signifie que les enregistrements 124 et 126 ne figurent pas dans la table et que la table cible finale contient uniquement l’enregistrement suivant :

userId nom city
125 Mercedes Guadalajara

Traiter les mises à jour SCD Type 2

SCD Type 2 conserve un historique complet des modifications en créant de nouvelles lignes pour chaque version d’un enregistrement, avec __START_AT et __END_AT des colonnes indiquant quand chaque version était active.

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
  return spark.readStream.table("main.cdc_tutorial.users_cdf")

dp.create_streaming_table("users_history")

dp.create_auto_cdc_flow(
  target = "users_history",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

CREATE OR REFRESH STREAMING TABLE users_history;

CREATE FLOW apply_cdc AS AUTO CDC INTO
  users_history
FROM
  stream(main.cdc_tutorial.users_cdf)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

Après avoir exécuté l’exemple SCD Type 2, la table cible contient les enregistrements suivants :

userId nom city __START_AT __END_AT
123 Isabel Monterrey 1 5
123 Isabel Chihuahua 5 6
124 Raul Oaxaca 1 zéro
125 Mercedes Tijuana 2 5
125 Mercedes Mexicali 5 6
125 Mercedes Guadalajara 6 zéro
126 Lys Cancun 2 zéro

La table conserve l’historique complet. L’utilisateur 123 a deux versions (terminées à la séquence 6 lors de la suppression). L’utilisateur 125 a trois versions montrant les changements de ville. Les enregistrements avec __END_AT = null sont actuellement actifs.

Suivre un sous-ensemble de colonnes avec SCD Type 2

Par défaut, SCD Type 2 crée une nouvelle version chaque fois qu’une valeur de colonne change. Vous pouvez spécifier un sous-ensemble de colonnes à suivre, afin que les modifications apportées à d’autres colonnes mettent à jour la version actuelle en place plutôt que de générer un nouvel enregistrement d’historique.

L’exemple suivant exclut la colonne city du suivi de l’historique :

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
  return spark.readStream.table("main.cdc_tutorial.users_cdf")

dp.create_streaming_table("users_history")

dp.create_auto_cdc_flow(
  target = "users_history",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

CREATE OR REFRESH STREAMING TABLE users_history;

CREATE FLOW apply_cdc AS AUTO CDC INTO
  users_history
FROM
  stream(main.cdc_tutorial.users_cdf)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

Étant donné que city les modifications ne sont pas suivies, les mises à jour de la ville remplacent la ligne actuelle au lieu de créer une nouvelle version. La table cible contient les enregistrements suivants :

userId nom city __START_AT __END_AT
123 Isabel Chihuahua 1 6
124 Raul Oaxaca 1 zéro
125 Mercedes Guadalajara 2 zéro
126 Lys Cancun 2 zéro

Exemples de AUTO CDC FROM SNAPSHOT

Les sections suivantes fournissent des exemples d’utilisation AUTO CDC FROM SNAPSHOT pour traiter des instantanés dans des tables cibles SCD Type 1 ou Type 2. Pour plus d’informations sur l’utilisation de cette API, consultez captures et instantanés de données modifiées.

Exemple : Traiter des instantanés à l’aide du temps d’ingestion du pipeline

Utilisez cette approche lorsque des instantanés arrivent régulièrement et dans l’ordre, et que vous pouvez vous appuyer sur l’horodatage d’exécution du pipeline pour la gestion des versions. Un nouvel instantané est intégré avec chaque mise à jour du pipeline de traitement.

Vous pouvez lire des captures instantanées à partir de plusieurs types sources, notamment les tables Delta, les fichiers de stockage cloud et les connexions JDBC.

Étape 1 : Créer des exemples de données

Créez une table contenant des données instantanées. Exécutez le code suivant à partir d’un notebook ou de Databricks SQL dans le explorations dossier de votre pipeline :

CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;

CREATE TABLE main.cdc_tutorial.snapshot (
  userId INT,
  city STRING
);

INSERT INTO main.cdc_tutorial.snapshot VALUES
  (1, 'Oaxaca'),
  (2, 'Monterrey'),
  (3, 'Tijuana');

Étape 2 : Exécuter AUTO CDC À PARTIR DU SNAPSHOT

Développez des pipelines déclaratifs Spark Lakeflow pour exécuter le code à cette étape.

Choisissez un type source pour la vue d’instantané (l’exemple de code de création génère une table Delta) :

Option A : Lecture à partir d’une table Delta
from pyspark import pipelines as dp

@dp.view(name="source")
def source():
  return spark.read.table("main.cdc_tutorial.snapshot")
Option B : Lecture à partir du stockage cloud
from pyspark import pipelines as dp

@dp.view(name="source")
def source():
  return spark.read.format("csv").option("header", True).load("<snapshot-path>")
Option C : Lecture à partir de JDBC (calcul classique uniquement)
from pyspark import pipelines as dp

@dp.view(name="source")
def source():
  return (spark.read
    .format("jdbc")
    .option("url", "<jdbc-url>")
    .option("dbtable", "<table-name>")
    .option("user", "<username>")
    .option("password", "<password>")
    .load()
  )

Toutes les options, écrire dans la cible

Ajoutez ensuite la table cible et le flux :

dp.create_streaming_table("target")

dp.create_auto_cdc_from_snapshot_flow(
  target = "target",
  source = "source",
  keys = ["userId"],
  stored_as_scd_type = 2
)

Après la première exécution du pipeline, tous les enregistrements sont insérés en tant que lignes actives :

userId city __START_AT __END_AT
1 Oaxaca 0 zéro
2 Monterrey 0 zéro
3 Tijuana 0 zéro

Note

Pour utiliser SCD Type 1 à la place et conserver uniquement l’état actuel, définissez stored_as_scd_type=1. Dans ce cas, la table cible n'inclut pas les colonnes __START_AT et __END_AT.

Étape 3 : Simuler un nouvel instantané et réexécuter

Mettez à jour la table source pour simuler un nouvel instantané arrivant (exécutez ce code à partir d’un notebook ou d’un fichier SQL dans le explorations dossier de votre pipline) :

TRUNCATE TABLE main.cdc_tutorial.snapshot;

INSERT INTO main.cdc_tutorial.snapshot VALUES
  (2, 'Carmel'),
  (3, 'Los Angeles'),
  (4, 'Death Valley'),
  (6, 'Kings Canyon');

Exécuter de nouveau le pipeline. AUTO CDC FROM SNAPSHOT compare la nouvelle capture instantanée à la précédente et détecte que l’utilisateur 1 a été supprimé, les utilisateurs 2 et 3 ont été mis à jour, et les utilisateurs 4 et 6 ont été insérés. Cela génère un flux de modification et utilise AUTO CDC pour créer la table de sortie.

Après la deuxième exécution avec SCD Type 2, la table cible contient les enregistrements suivants :

userId city __START_AT __END_AT
1 Oaxaca 0 1
2 Monterrey 0 1
2 Carmel 1 zéro
3 Tijuana 0 1
3 Los Angeles 1 zéro
4 Vallée de la mort 1 zéro
6 Kings Canyon 1 zéro

L’utilisateur 1 a été supprimé. Les utilisateurs 2 et 3 ont chacun deux versions affichant leurs changements de ville. Les utilisateurs 4 et 6 ont été récemment insérés.

Après la deuxième exécution avec SCD Type 1, la table cible affiche uniquement l’état actuel :

userId city
2 Carmel
3 Los Angeles
4 Vallée de la mort
6 Kings Canyon

Exemple : Traiter des instantanés à l’aide de fonctions de version

Utilisez cette approche lorsque vous avez besoin d’un contrôle explicite sur l’ordre des instantanés. Par exemple, utilisez cette approche lorsque plusieurs instantanés arrivent en même temps ou que les instantanés arrivent dans le désordre. Vous écrivez une fonction qui spécifie l’instantané à traiter suivant et son numéro de version. L’API traite les instantanés dans l’ordre de version croissant :

  • Si plusieurs instantanés sont en stockage, ils sont tous traités dans l’ordre.
  • Si un instantané arrive dans le désordre (par exemple, snapshot_3 arrive après snapshot_4), il est ignoré.
  • S’il n’y a pas de nouveaux instantanés, la fonction retourne None et aucun traitement ne se produit.

Étape 1 : Préparer les fichiers de snapshots

Créez des fichiers CSV contenant des données d’instantané et ajoutez-les à un volume ou à un emplacement de stockage cloud. Nommez les fichiers par ordre chronologique (par exemple, snapshot_1.csv, snapshot_2.csv).

Chaque fichier doit contenir des colonnes pour userId et city. Par exemple:

snapshot_1.csv :

userId city
1 Oaxaca
2 Monterrey
3 Tijuana

snapshot_2.csv :

userId city
2 Carmel
3 Los Angeles
4 Vallée de la mort

Étape 2 : Exécuter AUTO CDC FROM SNAPSHOT avec une fonction de version

Créez un bloc-notes et collez le code de pipeline suivant. Développez ensuite des pipelines déclaratifs Spark Lakeflow.

from pyspark import pipelines as dp
from typing import Optional, Tuple
from pyspark.sql import DataFrame

def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Optional[Tuple[DataFrame, int]]:
  snapshot_dir = "/Volumes/main/cdc_tutorial/snapshots/" # or the location you created the sample data

  files = dbutils.fs.ls(snapshot_dir)
  snapshot_files = [f.name for f in files if f.name.startswith("snapshot_") and f.name.endswith(".csv")]

  snapshot_versions = []
  for filename in snapshot_files:
    try:
      version = int(filename.replace("snapshot_", "").replace(".csv", ""))
      snapshot_versions.append(version)
    except ValueError:
      continue

  snapshot_versions.sort()

  if latest_snapshot_version is None:
    if snapshot_versions:
      next_version = snapshot_versions[0]
    else:
      return None
  else:
    next_versions = [v for v in snapshot_versions if v > latest_snapshot_version]
    if next_versions:
      next_version = next_versions[0]
    else:
      return None

  snapshot_path = f"{snapshot_dir}snapshot_{next_version}.csv"
  df = spark.read.format("csv").option("header", True).load(snapshot_path)
  return (df, next_version)


dp.create_streaming_table("main.cdc_tutorial.target_versioned")

dp.create_auto_cdc_from_snapshot_flow(
  target = "main.cdc_tutorial.target_versioned",
  source = next_snapshot_and_version,
  keys = ["userId"],
  stored_as_scd_type = 2
)

Note

Pour utiliser SCD Type 1 à la place, définissez stored_as_scd_type=1.

Après le traitement snapshot_1.csv, la table cible contient les enregistrements suivants :

userId city __START_AT __END_AT
1 Oaxaca 1 zéro
2 Monterrey 1 zéro
3 Tijuana 1 zéro

Après le traitement snapshot_2.csv, la table cible contient les enregistrements suivants :

userId city __START_AT __END_AT
1 Oaxaca 1 2
2 Monterrey 1 2
2 Carmel 2 zéro
3 Tijuana 1 2
3 Los Angeles 2 zéro
4 Vallée de la mort 2 zéro

Note

N’oubliez pas que, pour SCD Type 1, la table ressemble exactement à la capture instantanée la plus récente. La différence est que les requêtes en aval peuvent utiliser le flux de modification pour traiter uniquement les enregistrements modifiés.

Étape 3 : Ajouter de nouveaux instantanés

Ajoutez un nouveau fichier CSV à l’emplacement de stockage avec des données modifiées (par exemple, des valeurs de ville modifiées, de nouvelles lignes ou des lignes supprimées). Ensuite, réexécutez le pipeline pour traiter le nouvel instantané.

Limites

  • La colonne de séquencement doit être un type de données triable. NULL Les valeurs de séquencement ne sont pas prises en charge.
  • AUTO CDC FROM SNAPSHOT est pris en charge uniquement dans l’interface de pipeline Python ; l’interface SQL n’est pas prise en charge.

Ressources supplémentaires