Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Les pipelines déclaratifs de Lakeflow Spark simplifient la capture de données modifiées (CDC) avec les API AUTO CDC et AUTO CDC FROM SNAPSHOT. Ces API automatisent la complexité du calcul des dimensions à variation lente (SCD) Type 1 et Type 2 à partir d’un flux CDC ou d’instantanés de base de données. Pour en savoir plus sur ces concepts, consultez Captures et captures instantanées des données modifiées.
Note
Les AUTO CDC API remplacent les APPLY CHANGES API et ont la même syntaxe. Les APPLY CHANGES API sont toujours disponibles, mais Databricks recommande d’utiliser les AUTO CDC API à leur place.
L’API que vous utilisez dépend de la source de vos données modifiées :
-
AUTO CDC: utilisez cette option lorsque la base de données source a un flux CDC activé.AUTO CDCtraite les modifications d’un flux de données modifiées (CDF). Il est pris en charge dans les interfaces SQL et Python du pipeline. -
AUTO CDC FROM SNAPSHOT: Utilisez cette option lorsque la CDC (capture de données modifiées) n'est pas activée sur la base de données source et que seuls des instantanés sont disponibles. Cette API compare les instantanés pour déterminer les modifications, puis les traite. Elle est prise en charge uniquement dans l’interface Python.
Les deux API prennent en charge la mise à jour des tables à l’aide de SCD Type 1 et de Type 2 :
- Utilisez SCD Type 1 pour mettre à jour les enregistrements directement. L’historique n’est pas conservé pour les enregistrements mis à jour.
- Utilisez SCD Type 2 pour conserver un historique des enregistrements, soit sur toutes les mises à jour, soit sur les mises à jour d’un ensemble de colonnes spécifié.
Les AUTO CDC API ne sont pas prises en charge par les pipelines déclaratifs Apache Spark.
Pour la syntaxe et d'autres références, consultez AUTO CDC INTO (pipelines), create_auto_cdc_flow et create_auto_cdc_from_snapshot_flow.
Note
Cette page explique comment mettre à jour des tables dans vos pipelines en fonction des modifications apportées aux données sources. Pour savoir comment enregistrer et interroger des informations de modification au niveau des lignes pour les tables Delta, consultez Utiliser le flux de données de modification Delta Lake sur Azure Databricks.
Spécifications
Pour utiliser les API cdc, votre pipeline doit être configuré pour utiliser le SDP serverless ou le SDP Pro ou Advancedles éditions.
Comment fonctionne AUTO CDC
Pour effectuer le traitement CDC avec AUTO CDC, créez une table de diffusion en continu, puis utilisez l’instruction AUTO CDC ... INTO dans SQL ou la fonction create_auto_cdc_flow() en Python pour spécifier la source, les clés et le séquencement du flux de modification. Pour obtenir une explication sur le fonctionnement de la logique de séquencement et SCD, consultez la capture de données modifiées et les instantanés. Consultez les exemples AUTO CDC.
Pour une hydratation initiale à partir d'une source avec un flux de modification, utilisez AUTO CDC avec un flux once, puis continuez à traiter le flux de modification. Consultez Répliquer une table RDBMS externe à l’aide de l’AUTO CDC.
Pour plus d’informations sur la syntaxe, consultez AUTO CDC INTO (pipelines) ou create_auto_cdc_flow.
Fonctionnement de la capture automatique des données modifiées à partir de la capture instantanée
AUTO CDC FROM SNAPSHOT détermine les modifications apportées aux données sources en comparant les instantanés dans l’ordre. Elle est prise en charge uniquement dans l’interface de pipeline Python. Vous pouvez lire des instantanés à partir d’une table Delta, de fichiers de stockage cloud ou de JDBC directement.
Pour effectuer le traitement CDC avec AUTO CDC FROM SNAPSHOT, créez une table en continu, puis utilisez la fonction create_auto_cdc_from_snapshot_flow() pour spécifier l’instantané, les clés et d’autres arguments. Pour plus d’informations sur les deux modèles d’ingestion et sur le moment où les utiliser, consultez les modèles de traitement par instantané. Consultez les exemples AUTO CDC FROM SNAPSHOT.
Pour plus d’informations sur la syntaxe, consultez create_auto_cdc_from_snapshot_flow.
Utiliser plusieurs colonnes pour le séquencement
Pour séquencer plusieurs colonnes (par exemple, un horodatage et un ID pour rompre les liens), utilisez-les STRUCT pour les combiner. L’API commande d’abord par le premier champ, et en cas d’égalité, considère le deuxième champ, et ainsi de suite.
SQL
SEQUENCE BY STRUCT(timestamp_col, id_col)
Python
sequence_by = struct("timestamp_col", "id_col")
Exemples AUTO de CDC
Les exemples suivants illustrent le traitement SCD Type 1 et Type 2 à l’aide d’une source de flux de données modifiées. L’exemple de données crée de nouveaux enregistrements utilisateur, supprime un enregistrement utilisateur et met à jour les enregistrements utilisateur. Dans l’exemple SCD Type 1, les dernières UPDATE opérations arrivent en retard et sont supprimées de la table cible, ce qui illustre la gestion des événements hors séquence.
Voici les enregistrements d’entrée utilisés dans ces exemples. Ces données sont créées en exécutant la requête dans la section Créer des exemples de données .
| userId | nom | city | opération | Numéro de séquence |
|---|---|---|---|---|
| 124 | Raul | Oaxaca | INSERT | 1 |
| 123 | Isabel | Monterrey | INSERT | 1 |
| 125 | Mercedes | Tijuana | INSERT | 2 |
| 126 | Lys | Cancun | INSERT | 2 |
| 123 | zéro | zéro | Supprimer | 6 |
| 125 | Mercedes | Guadalajara | UPDATE | 6 |
| 125 | Mercedes | Mexicali | UPDATE | 5 |
| 123 | Isabel | Chihuahua | UPDATE | 5 |
Si vous supprimez les marques de commentaire de la ligne finale dans l’exemple de requête de génération de données, il insère l’enregistrement suivant qui spécifie de tronquer la table (effacer la table) à sequenceNum=3:
| userId | nom | city | opération | Numéro de séquence |
|---|---|---|---|---|
| zéro | zéro | zéro | TRONQUER | 3 |
Note
Tous les exemples suivants incluent des options pour spécifier à la fois DELETE et TRUNCATE des opérations, mais chacune est facultative.
Créer des exemples de données
Exécutez les instructions suivantes pour créer un exemple de jeu de données. Ce code n’est pas destiné à être exécuté dans le cadre d’une définition de pipeline. Exécutez-le à partir du dossier d’exploration de votre pipeline, plutôt que du dossier des transformations.
CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;
CREATE TABLE main.cdc_tutorial.users_cdf
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The batch at sequenceNum 6 is the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
Traiter les mises à jour SCD Type 1
SCD Type 1 conserve uniquement la dernière version de chaque enregistrement. L’exemple suivant lit à partir du flux de modifications créé ci-dessus et applique les modifications à une cible de table de streaming. Développez des pipelines déclaratifs Spark Lakeflow pour exécuter ce code.
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")
dp.create_streaming_table("users_current")
dp.create_auto_cdc_flow(
target = "users_current",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
CREATE OR REFRESH STREAMING TABLE users_current;
CREATE FLOW apply_cdc AS AUTO CDC INTO
users_current
FROM
stream(main.cdc_tutorial.users_cdf)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
Après avoir exécuté l’exemple SCD Type 1, la table cible contient les enregistrements suivants :
| userId | nom | city |
|---|---|---|
| 124 | Raul | Oaxaca |
| 125 | Mercedes | Guadalajara |
| 126 | Lys | Cancun |
L’utilisateur 123 (Isabel) a été supprimé et n’apparaît pas. L’utilisateur 125 (Mercedes) affiche uniquement la dernière ville (Guadalajara), car SCD Type 1 remplace les valeurs précédentes. La version antérieure UPDATE a été supprimée à sequenceNum=5, car une mise à jour ultérieure a été reçue à sequenceNum=6.
Après avoir exécuté l’exemple avec l’enregistrement TRUNCATE non commenté, la table est effacée à l’adresse sequenceNum=3. Cela signifie que les enregistrements 124 et 126 ne figurent pas dans la table et que la table cible finale contient uniquement l’enregistrement suivant :
| userId | nom | city |
|---|---|---|
| 125 | Mercedes | Guadalajara |
Traiter les mises à jour SCD Type 2
SCD Type 2 conserve un historique complet des modifications en créant de nouvelles lignes pour chaque version d’un enregistrement, avec __START_AT et __END_AT des colonnes indiquant quand chaque version était active.
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")
dp.create_streaming_table("users_history")
dp.create_auto_cdc_flow(
target = "users_history",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
CREATE OR REFRESH STREAMING TABLE users_history;
CREATE FLOW apply_cdc AS AUTO CDC INTO
users_history
FROM
stream(main.cdc_tutorial.users_cdf)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
Après avoir exécuté l’exemple SCD Type 2, la table cible contient les enregistrements suivants :
| userId | nom | city | __START_AT | __END_AT |
|---|---|---|---|---|
| 123 | Isabel | Monterrey | 1 | 5 |
| 123 | Isabel | Chihuahua | 5 | 6 |
| 124 | Raul | Oaxaca | 1 | zéro |
| 125 | Mercedes | Tijuana | 2 | 5 |
| 125 | Mercedes | Mexicali | 5 | 6 |
| 125 | Mercedes | Guadalajara | 6 | zéro |
| 126 | Lys | Cancun | 2 | zéro |
La table conserve l’historique complet. L’utilisateur 123 a deux versions (terminées à la séquence 6 lors de la suppression). L’utilisateur 125 a trois versions montrant les changements de ville. Les enregistrements avec __END_AT = null sont actuellement actifs.
Suivre un sous-ensemble de colonnes avec SCD Type 2
Par défaut, SCD Type 2 crée une nouvelle version chaque fois qu’une valeur de colonne change. Vous pouvez spécifier un sous-ensemble de colonnes à suivre, afin que les modifications apportées à d’autres colonnes mettent à jour la version actuelle en place plutôt que de générer un nouvel enregistrement d’historique.
L’exemple suivant exclut la colonne city du suivi de l’historique :
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")
dp.create_streaming_table("users_history")
dp.create_auto_cdc_flow(
target = "users_history",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
CREATE OR REFRESH STREAMING TABLE users_history;
CREATE FLOW apply_cdc AS AUTO CDC INTO
users_history
FROM
stream(main.cdc_tutorial.users_cdf)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
Étant donné que city les modifications ne sont pas suivies, les mises à jour de la ville remplacent la ligne actuelle au lieu de créer une nouvelle version. La table cible contient les enregistrements suivants :
| userId | nom | city | __START_AT | __END_AT |
|---|---|---|---|---|
| 123 | Isabel | Chihuahua | 1 | 6 |
| 124 | Raul | Oaxaca | 1 | zéro |
| 125 | Mercedes | Guadalajara | 2 | zéro |
| 126 | Lys | Cancun | 2 | zéro |
Exemples de AUTO CDC FROM SNAPSHOT
Les sections suivantes fournissent des exemples d’utilisation AUTO CDC FROM SNAPSHOT pour traiter des instantanés dans des tables cibles SCD Type 1 ou Type 2. Pour plus d’informations sur l’utilisation de cette API, consultez captures et instantanés de données modifiées.
Exemple : Traiter des instantanés à l’aide du temps d’ingestion du pipeline
Utilisez cette approche lorsque des instantanés arrivent régulièrement et dans l’ordre, et que vous pouvez vous appuyer sur l’horodatage d’exécution du pipeline pour la gestion des versions. Un nouvel instantané est intégré avec chaque mise à jour du pipeline de traitement.
Vous pouvez lire des captures instantanées à partir de plusieurs types sources, notamment les tables Delta, les fichiers de stockage cloud et les connexions JDBC.
Étape 1 : Créer des exemples de données
Créez une table contenant des données instantanées. Exécutez le code suivant à partir d’un notebook ou de Databricks SQL dans le explorations dossier de votre pipeline :
CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;
CREATE TABLE main.cdc_tutorial.snapshot (
userId INT,
city STRING
);
INSERT INTO main.cdc_tutorial.snapshot VALUES
(1, 'Oaxaca'),
(2, 'Monterrey'),
(3, 'Tijuana');
Étape 2 : Exécuter AUTO CDC À PARTIR DU SNAPSHOT
Développez des pipelines déclaratifs Spark Lakeflow pour exécuter le code à cette étape.
Choisissez un type source pour la vue d’instantané (l’exemple de code de création génère une table Delta) :
Option A : Lecture à partir d’une table Delta
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return spark.read.table("main.cdc_tutorial.snapshot")
Option B : Lecture à partir du stockage cloud
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return spark.read.format("csv").option("header", True).load("<snapshot-path>")
Option C : Lecture à partir de JDBC (calcul classique uniquement)
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
Toutes les options, écrire dans la cible
Ajoutez ensuite la table cible et le flux :
dp.create_streaming_table("target")
dp.create_auto_cdc_from_snapshot_flow(
target = "target",
source = "source",
keys = ["userId"],
stored_as_scd_type = 2
)
Après la première exécution du pipeline, tous les enregistrements sont insérés en tant que lignes actives :
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 0 | zéro |
| 2 | Monterrey | 0 | zéro |
| 3 | Tijuana | 0 | zéro |
Note
Pour utiliser SCD Type 1 à la place et conserver uniquement l’état actuel, définissez stored_as_scd_type=1. Dans ce cas, la table cible n'inclut pas les colonnes __START_AT et __END_AT.
Étape 3 : Simuler un nouvel instantané et réexécuter
Mettez à jour la table source pour simuler un nouvel instantané arrivant (exécutez ce code à partir d’un notebook ou d’un fichier SQL dans le explorations dossier de votre pipline) :
TRUNCATE TABLE main.cdc_tutorial.snapshot;
INSERT INTO main.cdc_tutorial.snapshot VALUES
(2, 'Carmel'),
(3, 'Los Angeles'),
(4, 'Death Valley'),
(6, 'Kings Canyon');
Exécuter de nouveau le pipeline.
AUTO CDC FROM SNAPSHOT compare la nouvelle capture instantanée à la précédente et détecte que l’utilisateur 1 a été supprimé, les utilisateurs 2 et 3 ont été mis à jour, et les utilisateurs 4 et 6 ont été insérés. Cela génère un flux de modification et utilise AUTO CDC pour créer la table de sortie.
Après la deuxième exécution avec SCD Type 2, la table cible contient les enregistrements suivants :
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 0 | 1 |
| 2 | Monterrey | 0 | 1 |
| 2 | Carmel | 1 | zéro |
| 3 | Tijuana | 0 | 1 |
| 3 | Los Angeles | 1 | zéro |
| 4 | Vallée de la mort | 1 | zéro |
| 6 | Kings Canyon | 1 | zéro |
L’utilisateur 1 a été supprimé. Les utilisateurs 2 et 3 ont chacun deux versions affichant leurs changements de ville. Les utilisateurs 4 et 6 ont été récemment insérés.
Après la deuxième exécution avec SCD Type 1, la table cible affiche uniquement l’état actuel :
| userId | city |
|---|---|
| 2 | Carmel |
| 3 | Los Angeles |
| 4 | Vallée de la mort |
| 6 | Kings Canyon |
Exemple : Traiter des instantanés à l’aide de fonctions de version
Utilisez cette approche lorsque vous avez besoin d’un contrôle explicite sur l’ordre des instantanés. Par exemple, utilisez cette approche lorsque plusieurs instantanés arrivent en même temps ou que les instantanés arrivent dans le désordre. Vous écrivez une fonction qui spécifie l’instantané à traiter suivant et son numéro de version. L’API traite les instantanés dans l’ordre de version croissant :
- Si plusieurs instantanés sont en stockage, ils sont tous traités dans l’ordre.
- Si un instantané arrive dans le désordre (par exemple,
snapshot_3arrive aprèssnapshot_4), il est ignoré. - S’il n’y a pas de nouveaux instantanés, la fonction retourne
Noneet aucun traitement ne se produit.
Étape 1 : Préparer les fichiers de snapshots
Créez des fichiers CSV contenant des données d’instantané et ajoutez-les à un volume ou à un emplacement de stockage cloud. Nommez les fichiers par ordre chronologique (par exemple, snapshot_1.csv, snapshot_2.csv).
Chaque fichier doit contenir des colonnes pour userId et city. Par exemple:
snapshot_1.csv :
| userId | city |
|---|---|
| 1 | Oaxaca |
| 2 | Monterrey |
| 3 | Tijuana |
snapshot_2.csv :
| userId | city |
|---|---|
| 2 | Carmel |
| 3 | Los Angeles |
| 4 | Vallée de la mort |
Étape 2 : Exécuter AUTO CDC FROM SNAPSHOT avec une fonction de version
Créez un bloc-notes et collez le code de pipeline suivant. Développez ensuite des pipelines déclaratifs Spark Lakeflow.
from pyspark import pipelines as dp
from typing import Optional, Tuple
from pyspark.sql import DataFrame
def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Optional[Tuple[DataFrame, int]]:
snapshot_dir = "/Volumes/main/cdc_tutorial/snapshots/" # or the location you created the sample data
files = dbutils.fs.ls(snapshot_dir)
snapshot_files = [f.name for f in files if f.name.startswith("snapshot_") and f.name.endswith(".csv")]
snapshot_versions = []
for filename in snapshot_files:
try:
version = int(filename.replace("snapshot_", "").replace(".csv", ""))
snapshot_versions.append(version)
except ValueError:
continue
snapshot_versions.sort()
if latest_snapshot_version is None:
if snapshot_versions:
next_version = snapshot_versions[0]
else:
return None
else:
next_versions = [v for v in snapshot_versions if v > latest_snapshot_version]
if next_versions:
next_version = next_versions[0]
else:
return None
snapshot_path = f"{snapshot_dir}snapshot_{next_version}.csv"
df = spark.read.format("csv").option("header", True).load(snapshot_path)
return (df, next_version)
dp.create_streaming_table("main.cdc_tutorial.target_versioned")
dp.create_auto_cdc_from_snapshot_flow(
target = "main.cdc_tutorial.target_versioned",
source = next_snapshot_and_version,
keys = ["userId"],
stored_as_scd_type = 2
)
Note
Pour utiliser SCD Type 1 à la place, définissez stored_as_scd_type=1.
Après le traitement snapshot_1.csv, la table cible contient les enregistrements suivants :
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 1 | zéro |
| 2 | Monterrey | 1 | zéro |
| 3 | Tijuana | 1 | zéro |
Après le traitement snapshot_2.csv, la table cible contient les enregistrements suivants :
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 1 | 2 |
| 2 | Monterrey | 1 | 2 |
| 2 | Carmel | 2 | zéro |
| 3 | Tijuana | 1 | 2 |
| 3 | Los Angeles | 2 | zéro |
| 4 | Vallée de la mort | 2 | zéro |
Note
N’oubliez pas que, pour SCD Type 1, la table ressemble exactement à la capture instantanée la plus récente. La différence est que les requêtes en aval peuvent utiliser le flux de modification pour traiter uniquement les enregistrements modifiés.
Étape 3 : Ajouter de nouveaux instantanés
Ajoutez un nouveau fichier CSV à l’emplacement de stockage avec des données modifiées (par exemple, des valeurs de ville modifiées, de nouvelles lignes ou des lignes supprimées). Ensuite, réexécutez le pipeline pour traiter le nouvel instantané.
Limites
- La colonne de séquencement doit être un type de données triable.
NULLLes valeurs de séquencement ne sont pas prises en charge. -
AUTO CDC FROM SNAPSHOTest pris en charge uniquement dans l’interface de pipeline Python ; l’interface SQL n’est pas prise en charge.
Ressources supplémentaires
- Capture des modifications de données et captures instantanées : découvrez les concepts de capture des modifications de données, les captures instantanées et les types SCD.
- Répliquez une table SGBDR externe à l'aide de
: découvrez comment effectuer une hydratation initiale avec un flux, puis continuez à traiter les modifications. - Thèmes avancés d'AUTO CDC : Découvrez les opérations de modification sur les cibles AUTO CDC, la lecture des flux de données de modification, et le traitement des métriques.
- Tutoriel : Créer un pipeline ETL à l’aide de la capture de données modifiées