Partager via


Tutoriel : Coordonner les transactions entre les tables

Important

Les transactions qui écrivent dans les tables Delta gérées par Unity Catalog sont en Aperçu public.

Les transactions qui écrivent dans les tables Iceberg gérées par le catalogue Unity sont en préversion privée. Pour rejoindre cette préversion, envoyez le formulaire d’inscription en préversion des tables Iceberg managées.

Ce tutoriel montre comment utiliser des transactions pour coordonner les mises à jour entre plusieurs instructions et tables. Vous découvrez les deux modes de transaction : les transactions non interactives, qui valident automatiquement et les transactions interactives, ce qui vous donne un contrôle explicite. Le tutoriel montre également l’utilisation de transactions avec des procédures stockées et des scripts SQL pour créer des charges de travail d’entreposage stratégiques sur Azure Databricks.

Exigences

  • Environnement : accès à un espace de travail Azure Databricks.
  • Calcul : Les types de calcul pris en charge varient selon le mode transactionnel :
    • Un entrepôt SQL classique ou serverless prend en charge les deux modes de transaction.
    • Le calcul serverless prend uniquement en charge les transactions non interactives.
    • Les clusters classiques exécutant Databricks Runtime 18.0 ou version ultérieure prennent uniquement en charge les transactions non interactives.
  • Privilèges : CREATE TABLE dans un schéma de Unity Catalog.

Configurer des exemples de tables

Toutes les tables écrites dans une transaction multi-instruction, multi-table doivent :

Créez deux exemples de tables dans l’Éditeur SQL ou un notebook :

-- Create a table for account data
CREATE TABLE IF NOT EXISTS sample_accounts (
  id INT,
  account_name STRING,
  balance DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES (
  'delta.feature.catalogManaged' = 'supported'
);

-- Create a table for transaction records
CREATE TABLE IF NOT EXISTS sample_transactions (
  id INT,
  account_id INT,
  transaction_type STRING,
  amount DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES (
  'delta.feature.catalogManaged' = 'supported'
);

-- To upgrade an existing table, use:
-- ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');

-- Insert sample data
INSERT INTO sample_accounts VALUES
  (1, 'Alice', 1000.00),
  (2, 'Bob', 500.00);

INSERT INTO sample_transactions VALUES
  (1, 1, 'deposit', 100.00);

Vérifiez la configuration :

SELECT * FROM sample_accounts;
SELECT * FROM sample_transactions;

Output:

sample_accounts:
id	account_name	balance
1	  Alice	        1000.00
2	  Bob	          500.00

sample_transactions:
id	account_id	transaction_type	amount
1	           1	         deposit	100.00

Transactions non interactives

Les transactions non interactives utilisent la syntaxe BEGIN ATOMIC ... END;. Toutes les instructions s’exécutent en tant qu’unité atomique unique. Si chaque instruction réussit, Azure Databricks valide automatiquement. Si une instruction échoue, Azure Databricks restaure automatiquement toutes les modifications. Pour obtenir des modèles détaillés de syntaxe et d’utilisation, consultez les transactions non interactives.

Exécuter une transaction réussie

Mettez à jour les deux tables de manière atomique :

BEGIN ATOMIC
  -- Update Alice's account balance
  UPDATE sample_accounts
  SET balance = balance + 100.00
  WHERE id = 1;

  -- Record the deposit transaction
  INSERT INTO sample_transactions
  VALUES (2, 1, 'deposit', 100.00);
END;

Vérifiez que les deux opérations ont réussi :

-- Alice's balance should now be 1100.00
SELECT * FROM sample_accounts WHERE id = 1;

-- Should show two transaction records
SELECT * FROM sample_transactions;

La mise à jour du solde et le journal de transaction ont été créés ensemble. Si l’une ou l’autre instruction a échoué, aucune modification n’aurait été validée et Databricks aurait arrêté la transaction sans effets secondaires.

Utiliser SIGNAL pour échouer une transaction sur une condition

Vous pouvez utiliser SIGNAL à l’intérieur d’un BEGIN ATOMIC ... END; bloc pour échouer la transaction lorsqu’une condition définie par l’utilisateur n’est pas remplie. Cela est utile pour la validation des données avant de confirmer :

BEGIN ATOMIC
  -- Insert new account
  INSERT INTO sample_accounts VALUES (3, 'Charlie', -50.00);

  -- Fail the transaction if balance is negative
  IF (SELECT balance FROM sample_accounts WHERE id = 3) < 0 THEN
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Account balance cannot be negative';
  END IF;
END;

Le SIGNAL déclenche une erreur, ce qui entraîne le retour en arrière automatique de l’intégralité de la transaction. Vérifiez que l’insertion a été annulée :

-- Should return 0 rows (the transaction was rolled back by SIGNAL)
SELECT * FROM sample_accounts WHERE id = 3;

Afficher la restauration automatique en cas d’échec

Exécutez une transaction avec une déclaration invalide :

BEGIN ATOMIC
  -- This statement is valid
  INSERT INTO sample_accounts VALUES (4, 'David', 300.00);

  -- This statement will fail (table does not exist)
  INSERT INTO non_existent_table VALUES (1, 2, 3);
END;

La transaction échoue avec une erreur. Vérifiez que la première déclaration a été annulée.

-- Should return 0 rows because the transaction was rolled back
SELECT * FROM sample_accounts WHERE id = 4;

Même si la première INSERT instruction était valide, elle a été restaurée parce que la deuxième instruction a échoué. Cela illustre la garantie tout ou rien des transactions.

Transactions interactives

Les transactions interactives vous donnent un contrôle explicite sur le moment où confirmer ou annuler. Utilisez BEGIN TRANSACTION pour démarrer, puis COMMIT pour enregistrer les modifications ou ROLLBACK pour les ignorer.

Valider les modifications

Démarrez une transaction :

BEGIN TRANSACTION;

Apportez des modifications (pas encore validées) :

INSERT INTO sample_accounts VALUES (5, 'Eve', 850.00);
UPDATE sample_accounts SET balance = balance + 50.00 WHERE id = 2;

Confirmez des modifications permanentes :

COMMIT;

Vérifiez les modifications :

-- Eve's account should now be visible
SELECT * FROM sample_accounts WHERE id = 5;

-- Bob's balance should be 550.00 (500 + 50)
SELECT * FROM sample_accounts WHERE id = 2;

Restaurer les modifications

Démarrez une nouvelle transaction :

BEGIN TRANSACTION;

Apportez une modification :

INSERT INTO sample_accounts VALUES (6, 'Frank', 600.00);

Vérifiez que la modification est visible dans votre session :

-- Should show Frank's account (visible in your session only)
SELECT * FROM sample_accounts WHERE id = 6;

Annulez pour supprimer la modification :

ROLLBACK;

Vérifiez que la modification a été ignorée :

-- Should return 0 rows (the insert was rolled back)
SELECT * FROM sample_accounts WHERE id = 6;

Utiliser avec des procédures stockées et un script SQL

Vous pouvez combiner des transactions avec des procédures stockées pour créer une logique de transaction réutilisable. Ce modèle est utile pour les opérations complexes que vous exécutez fréquemment.

  1. Créer deux tables avec des validations gérées par le catalogue activées

    
    CREATE TABLE orders (order_id STRING, item_sku STRING, quantity_ordered INT)
     TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
    
    CREATE TABLE inventory (item_sku STRING, quantity_in_stock INT)
     TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
    
    
  2. Définir une procédure stockée

    CREATE OR REPLACE PROCEDURE main.retail.apply_order(
        IN  p_order_id      STRING,
        IN  p_customer_id   STRING,
        IN  p_order_amount  DECIMAL(18,2)
    )
    LANGUAGE SQL
    SQL SECURITY INVOKER
    MODIFIES SQL DATA
    AS
    BEGIN
        -- Insert the order
        INSERT INTO main.retail.orders (order_id, customer_id, amount)
        VALUES (p_order_id, p_customer_id, p_order_amount);
    
        -- Update total sales per customer
        MERGE INTO main.retail.total_sales AS t
        USING (
            SELECT
              p_customer_id  AS customer_id,
              p_order_amount AS order_amount
        ) s
          ON t.customer_id = s.customer_id
        WHEN MATCHED THEN
          UPDATE SET t.total_amount = t.total_amount + s.order_amount
        WHEN NOT MATCHED THEN
          INSERT (customer_id, total_amount)
          VALUES (s.customer_id, s.order_amount);
    END;
    
    
  3. Définir la transaction

    BEGIN ATOMIC
        -- Staging batch id for this transaction
        DECLARE new_order_id STRING DEFAULT uuid();
        DECLARE v_batch_id STRING DEFAULT uuid();
    
        -- 1) Stage incoming customer and order rows
        INSERT INTO main.retail.orders_staging (order_id, customer_id, amount, batch_id)
        VALUES (new_order_id, 'CUST_123', 249.99, v_batch_id);
    
        -- 2) Drive final writes from staging to production via stored procedure
        FOR o AS
          SELECT
            order_id,
            customer_id,
            amount
          FROM main.retail.orders_staging
          WHERE batch_id = v_batch_id
        DO
            CALL main.retail.apply_order(
              o.order_id,
              o.customer_id,
              o.amount
            );
        END FOR;
    
        -- 3) Clean up processed staging rows
        DELETE FROM main.retail.orders_staging
        WHERE batch_id = v_batch_id;
    END; -- 4) Commit the transaction
    
    

Si une partie de la transaction échoue, Databricks restaure automatiquement toutes les modifications.

Nettoyer

Supprimez les exemples de tables :

DROP TABLE IF EXISTS sample_accounts;
DROP TABLE IF EXISTS sample_transactions;


DROP TABLE IF EXISTS orders;
DROP TABLE IF EXISTS inventory;

Étapes suivantes