Partager via


Rubriques avancées sur l'AUTO CDC

Cette page aborde des sujets avancés sur l'utilisation de AUTO CDC et AUTO CDC FROM SNAPSHOT avec les tables cibles, incluant les opérations DML, la lecture des flux de données modifiées et le suivi des métriques de traitement. Pour une présentation des AUTO CDC API AUTO CDC, consultez Les API AUTO CDC : simplifie la capture de données modifiées avec des pipelines.

Ajouter, modifier ou supprimer des données dans une table de diffusion en continu cible

Si votre pipeline publie des tables dans le catalogue Unity, vous pouvez utiliser des instructions DML (Langage de manipulation de données), y compris les instructions insert, update, delete et merge, pour modifier les tables de streaming cibles créées par des instructions .

Note

  • Les instructions DML qui modifient le schéma de table d’une table de streaming ne sont pas prises en charge. Assurez-vous que vos instructions DML ne tentent pas de faire évoluer le schéma de table.
  • Les instructions DML qui mettent à jour une table de streaming ne peuvent être exécutées que dans un cluster Unity Catalog partagé ou un entrepôt SQL à l’aide de Databricks Runtime 13.3 LTS et versions ultérieures.
  • Étant donné que le streaming nécessite des sources de données en ajout uniquement, si votre traitement nécessite le streaming à partir d'une table source de streaming avec des modifications (par exemple, via des instructions DML), définissez l'indicateur skipChangeCommits lors de la lecture de la table de streaming source. Lorsque skipChangeCommits est défini, les transactions qui suppriment ou modifient des enregistrements sur la table source sont ignorées. Si votre traitement ne nécessite pas de table de flux, vous pouvez utiliser une vue matérialisée (qui n’a pas la restriction d’ajout uniquement) comme table cible.

Étant donné que les pipelines déclaratifs Spark Lakeflow utilisent une colonne spécifiée SEQUENCE BY et propagent les valeurs de séquencement appropriées vers les colonnes __START_AT et __END_AT de la table cible (pour le Type 2 SCD), vous devez vous assurer que les instructions DML utilisent des valeurs valides pour ces colonnes afin de conserver l'ordre approprié des enregistrements. Découvrez comment fonctionne auto CDC.

Pour plus d’informations sur l’utilisation d’instructions DML avec des tables de streaming, consultez Ajouter, modifier ou supprimer des données dans une table de diffusion en continu.

L’exemple suivant insère un enregistrement actif avec une séquence de début de 5 :

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);

Conseil / Astuce

Si vous devez renommer les colonnes __START_AT et __END_AT de votre table cible SCD Type 2 (par exemple, pour répondre aux exigences de structure en aval), créez une vue sur la table cible :

CREATE VIEW my_employees_view AS
SELECT
  *,
  __START_AT AS valid_from,
  __END_AT AS valid_to
FROM my_scd2_target_table;

Lire un flux de données modifiées à partir d’une table cible AUTO CDC

Dans Databricks Runtime 15.2 et versions ultérieures, vous pouvez lire un flux de données modifiées à partir d'une table de streaming qui fait l'objet des requêtes AUTO CDC ou AUTO CDC FROM SNAPSHOT de la même façon que vous lisez un flux de données modifiées à partir d'autres tables Delta. Les éléments suivants sont nécessaires pour lire le flux de données modifiées à partir d’une table de diffusion en continu cible :

  • La table de streaming cible doit être publiée sur le catalogue Unity. Consultez Utiliser le catalogue Unity avec des pipelines.
  • Pour lire le flux de données de modification de la table de diffusion en continu cible, vous devez utiliser Databricks Runtime 15.2 ou version ultérieure. Pour lire le flux de données de modification dans un autre pipeline, le pipeline doit être configuré pour utiliser Databricks Runtime 15.2 ou version ultérieure.

Vous lisez le flux de données de modification à partir d'une table de diffusion en continu cible qui a été créée dans les pipelines déclaratifs de Spark Lakeflow de la même manière que pour la lecture d'un flux de données modifiées à partir d'autres tables Delta. Pour en savoir plus sur l’utilisation de la fonctionnalité de flux de données modifiées Delta, notamment des exemples en Python et SQL, consultez Utiliser le flux de données modifiées Delta Lake sur Azure Databricks.

Note

L’enregistrement de flux de données modifiées inclut des métadonnées identifiant le type d’événement de modification. Lorsqu’un enregistrement est mis à jour dans une table, les métadonnées des enregistrements de modification associés incluent généralement des événements _change_type et des valeurs update_preimage définies par update_postimage.

Toutefois, les valeurs _change_type sont différentes si des mises à jour sont apportées à la table de flux cible, comprenant la modification des valeurs de clé primaire. Lorsque les modifications incluent des mises à jour des clés primaires, les _change_type champs de métadonnées sont définis sur les insert et delete événements. Les modifications apportées aux clés primaires peuvent se produire lorsque des mises à jour manuelles sont effectuées sur l’un des champs de clé avec une instruction UPDATE ou MERGE, ou, pour les tables de type SCD 2, lorsque le champ __start_at change pour refléter une valeur de séquence de début antérieure.

La AUTO CDC requête détermine les valeurs de clé primaire, qui diffèrent pour le traitement SCD type 1 et SCD type 2 :

SCD Type Clé primaire
ScD type 1 et l’interface Python des pipelines La clé primaire est la valeur du keys paramètre dans la create_auto_cdc_flow() fonction. Pour l’interface SQL, la clé primaire est les colonnes définies par la KEYS clause dans l’instruction AUTO CDC ... INTO .
Type SCD 2 La clé primaire est le keys paramètre ou KEYS clause plus la valeur de retour de l’opération coalesce(__START_AT, __END_AT), où __START_AT et __END_AT sont les colonnes correspondantes de la table de flux cible.

Obtenir des données sur les enregistrements traités par une requête CDC dans les pipelines

Note

Les métriques suivantes sont capturées uniquement par AUTO CDC les requêtes et non par AUTO CDC FROM SNAPSHOT les requêtes.

Les métriques suivantes sont capturées par les requêtes AUTO CDC :

  • num_upserted_rows: Le nombre de lignes de sortie fusionnées dans le jeu de données lors d'une mise à jour.
  • num_deleted_rows: nombre de lignes de sortie existantes supprimées du jeu de données pendant une mise à jour.

La métrique num_output_rows, générée pour les flux non-CDC, n'est pas capturée lors des requêtes AUTO CDC.

Quels objets de données sont utilisés pour le traitement CDC dans un pipeline ?

Lorsque vous déclarez la table cible dans le metastore Hive, deux structures de données sont créées :

  • Vue utilisant le nom attribué à la table cible.
  • Table de stockage interne utilisée par le pipeline pour gérer le traitement CDC. Cette table est nommée en prédéfinissant __apply_changes_storage_ le nom de la table cible.

Par exemple, si vous déclarez une table cible nommée dp_cdc_target, vous voyez une vue nommée dp_cdc_target et une table nommée __apply_changes_storage_dp_cdc_target dans le metastore. Consultez la vue pour accéder aux données traitées. Ne modifiez pas directement la table de support.

Note

Ces structures de données s’appliquent uniquement au AUTO CDC traitement, pas au AUTO CDC FROM SNAPSHOT traitement. Ils s’appliquent également uniquement au metastore Hive, et non au catalogue Unity.