Freigeben über


Lernprogramm: Koordinieren von Transaktionen über Tabellen hinweg

Von Bedeutung

Transaktionen, die in verwaltete Delta-Tabellen im Unity-Katalog schreiben, befinden sich in der öffentlichen Vorschau.

Transaktionen, die in verwaltete Iceberg-Tabellen im Unity-Katalog schreiben, befinden sich in der privaten Vorschau. Um dieser Vorschau beizutreten, übermitteln Sie das Vorschauformular für verwaltete Iceberg-Tabellen.

In diesem Tutorial wird veranschaulicht, wie Transaktionen verwendet werden, um Aktualisierungen zwischen mehreren Anweisungen und Tabellen zu koordinieren. Sie lernen beide Transaktionsmodi kennen: nicht interaktive Transaktionen, die automatisch commiten, und interaktive Transaktionen, die Ihnen die explizite Kontrolle geben. Das Lernprogramm veranschaulicht auch die Verwendung von Transaktionen mit gespeicherten Prozeduren und SQL Scripting zum Erstellen von unternehmenskritischen Lagerarbeitslasten in Azure Databricks.

Anforderungen

  • Umgebung: Zugriff auf einen Azure Databricks-Arbeitsbereich.
  • Compute: Unterstützte Computetypen variieren je nach Transaktionsmodus:
    • Ein klassisches oder serverloses SQL Warehouse unterstützt beide Transaktionsmodi.
    • Serverless Compute unterstützt nur nicht interaktive Transaktionen.
    • Klassische Cluster mit Databricks Runtime 18.0 oder höher unterstützen nur nicht interaktive Transaktionen.
  • Berechtigungen: CREATE TABLE in einem Unity-Katalogschema .

Einrichten von Beispieltabellen

Alle Tabellen, die in einer Mehranweisungs- und Mehrtabellentransaktion beschrieben werden, müssen Folgendes erfüllen:

Erstellen Sie zwei Beispieltabellen im SQL-Editor oder einem Notizbuch:

-- Create a table for account data
CREATE TABLE IF NOT EXISTS sample_accounts (
  id INT,
  account_name STRING,
  balance DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES (
  'delta.feature.catalogManaged' = 'supported'
);

-- Create a table for transaction records
CREATE TABLE IF NOT EXISTS sample_transactions (
  id INT,
  account_id INT,
  transaction_type STRING,
  amount DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES (
  'delta.feature.catalogManaged' = 'supported'
);

-- To upgrade an existing table, use:
-- ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');

-- Insert sample data
INSERT INTO sample_accounts VALUES
  (1, 'Alice', 1000.00),
  (2, 'Bob', 500.00);

INSERT INTO sample_transactions VALUES
  (1, 1, 'deposit', 100.00);

Überprüfen Sie das Setup:

SELECT * FROM sample_accounts;
SELECT * FROM sample_transactions;

Ausgabe:

sample_accounts:
id	account_name	balance
1	  Alice	        1000.00
2	  Bob	          500.00

sample_transactions:
id	account_id	transaction_type	amount
1	           1	         deposit	100.00

Nicht interaktive Transaktionen

Nicht interaktive Transaktionen verwenden BEGIN ATOMIC ... END; Syntax. Alle Anweisungen werden als einzelne Atomeinheit ausgeführt. Wenn jede Anweisung erfolgreich ist, übernimmt Azure Databricks die Änderungen automatisch. Wenn eine Anweisung fehlschlägt, setzt Azure Databricks alle Änderungen automatisch zurück. Ausführliche Syntax- und Verwendungsmuster finden Sie unter nicht interaktive Transaktionen.

Ausführen einer erfolgreichen Transaktion

Aktualisieren Sie beide Tabellen atomar:

BEGIN ATOMIC
  -- Update Alice's account balance
  UPDATE sample_accounts
  SET balance = balance + 100.00
  WHERE id = 1;

  -- Record the deposit transaction
  INSERT INTO sample_transactions
  VALUES (2, 1, 'deposit', 100.00);
END;

Überprüfen Sie, ob beide Vorgänge erfolgreich waren:

-- Alice's balance should now be 1100.00
SELECT * FROM sample_accounts WHERE id = 1;

-- Should show two transaction records
SELECT * FROM sample_transactions;

Sowohl die Aktualisierung des Saldos als auch der Transaktionsdatensatz wurden zusammen erstellt. Wenn eine der Anweisungen fehlgeschlagen wäre, wäre keine Änderung übernommen worden, und Databricks hätte die Transaktion ohne Nebenwirkungen beendet.

Verwenden Sie SIGNAL, um eine Transaktion bei einer Bedingung scheitern zu lassen

Sie können innerhalb eines SIGNAL Blocks verwendenBEGIN ATOMIC ... END;, um die Transaktion fehlzuschlagen, wenn eine benutzerdefinierte Bedingung nicht erfüllt ist. Dies ist hilfreich für die Datenüberprüfung vor dem Commit:

BEGIN ATOMIC
  -- Insert new account
  INSERT INTO sample_accounts VALUES (3, 'Charlie', -50.00);

  -- Fail the transaction if balance is negative
  IF (SELECT balance FROM sample_accounts WHERE id = 3) < 0 THEN
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Account balance cannot be negative';
  END IF;
END;

Der SIGNAL Fehler löst einen Fehler aus, der dazu führt, dass die gesamte Transaktion automatisch zurückgerollt wird. Überprüfen Sie, ob der Insert-Vorgang rückgängig gemacht wurde:

-- Should return 0 rows (the transaction was rolled back by SIGNAL)
SELECT * FROM sample_accounts WHERE id = 3;

Anzeigen des automatischen Rollbacks bei Fehlern

Ausführen einer Transaktion mit einer ungültigen Anweisung:

BEGIN ATOMIC
  -- This statement is valid
  INSERT INTO sample_accounts VALUES (4, 'David', 300.00);

  -- This statement will fail (table does not exist)
  INSERT INTO non_existent_table VALUES (1, 2, 3);
END;

Die Transaktion schlägt mit einem Fehler fehl. Überprüfen Sie, ob die erste Anweisung rückgängig gemacht wurde.

-- Should return 0 rows because the transaction was rolled back
SELECT * FROM sample_accounts WHERE id = 4;

Obwohl die erste INSERT Anweisung gültig war, wurde ein Rollback ausgeführt, da die zweite Anweisung fehlgeschlagen ist. Dies veranschaulicht die gesamte oder gar keine Garantie für Transaktionen.

Interaktive Transaktionen

Interaktive Transaktionen geben Ihnen explizite Kontrolle darüber, wann ein Commit ausgeführt oder ein Rollback ausgeführt werden soll. Verwenden Sie BEGIN TRANSACTION , um zu starten, und schließen Sie mit COMMIT ab, um Änderungen zu speichern, oder ROLLBACK , um sie zu verwerfen.

Committen von Änderungen

Starten einer Transaktion:

BEGIN TRANSACTION;

Änderungen vornehmen (noch nicht zugesichert):

INSERT INTO sample_accounts VALUES (5, 'Eve', 850.00);
UPDATE sample_accounts SET balance = balance + 50.00 WHERE id = 2;

Commit, um Änderungen dauerhaft vorzunehmen:

COMMIT;

Überprüfen Sie die Änderungen:

-- Eve's account should now be visible
SELECT * FROM sample_accounts WHERE id = 5;

-- Bob's balance should be 550.00 (500 + 50)
SELECT * FROM sample_accounts WHERE id = 2;

Zurücksetzen von Änderungen

Neue Transaktion starten:

BEGIN TRANSACTION;

Nehmen Sie eine Änderung vor:

INSERT INTO sample_accounts VALUES (6, 'Frank', 600.00);

Überprüfen Sie, ob die Änderung in Ihrer Sitzung sichtbar ist:

-- Should show Frank's account (visible in your session only)
SELECT * FROM sample_accounts WHERE id = 6;

Führen Sie einen Rollback aus, um die Änderung zu verwerfen:

ROLLBACK;

Überprüfen Sie, ob die Änderung verworfen wurde:

-- Should return 0 rows (the insert was rolled back)
SELECT * FROM sample_accounts WHERE id = 6;

Verwendung mit gespeicherten Prozeduren und SQL-Skripting

Sie können Transaktionen mit gespeicherten Prozeduren kombinieren, um wiederverwendbare Transaktionslogik zu erstellen. Dieses Muster eignet sich für komplexe Vorgänge, die Sie häufig ausführen.

  1. Erstellen Sie zwei Tabellen mit aktiviertem Katalog-Management für Commits.

    
    CREATE TABLE orders (order_id STRING, item_sku STRING, quantity_ordered INT)
     TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
    
    CREATE TABLE inventory (item_sku STRING, quantity_in_stock INT)
     TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
    
    
  2. Definieren der gespeicherten Prozedur

    CREATE OR REPLACE PROCEDURE main.retail.apply_order(
        IN  p_order_id      STRING,
        IN  p_customer_id   STRING,
        IN  p_order_amount  DECIMAL(18,2)
    )
    LANGUAGE SQL
    SQL SECURITY INVOKER
    MODIFIES SQL DATA
    AS
    BEGIN
        -- Insert the order
        INSERT INTO main.retail.orders (order_id, customer_id, amount)
        VALUES (p_order_id, p_customer_id, p_order_amount);
    
        -- Update total sales per customer
        MERGE INTO main.retail.total_sales AS t
        USING (
            SELECT
              p_customer_id  AS customer_id,
              p_order_amount AS order_amount
        ) s
          ON t.customer_id = s.customer_id
        WHEN MATCHED THEN
          UPDATE SET t.total_amount = t.total_amount + s.order_amount
        WHEN NOT MATCHED THEN
          INSERT (customer_id, total_amount)
          VALUES (s.customer_id, s.order_amount);
    END;
    
    
  3. Definieren der Transaktion

    BEGIN ATOMIC
        -- Staging batch id for this transaction
        DECLARE new_order_id STRING DEFAULT uuid();
        DECLARE v_batch_id STRING DEFAULT uuid();
    
        -- 1) Stage incoming customer and order rows
        INSERT INTO main.retail.orders_staging (order_id, customer_id, amount, batch_id)
        VALUES (new_order_id, 'CUST_123', 249.99, v_batch_id);
    
        -- 2) Drive final writes from staging to production via stored procedure
        FOR o AS
          SELECT
            order_id,
            customer_id,
            amount
          FROM main.retail.orders_staging
          WHERE batch_id = v_batch_id
        DO
            CALL main.retail.apply_order(
              o.order_id,
              o.customer_id,
              o.amount
            );
        END FOR;
    
        -- 3) Clean up processed staging rows
        DELETE FROM main.retail.orders_staging
        WHERE batch_id = v_batch_id;
    END; -- 4) Commit the transaction
    
    

Wenn ein Teil der Transaktion fehlschlägt, werden alle Änderungen automatisch von Databricks zurückgesetzt.

Bereinigen

Entfernen Sie die Beispieltabellen:

DROP TABLE IF EXISTS sample_accounts;
DROP TABLE IF EXISTS sample_transactions;


DROP TABLE IF EXISTS orders;
DROP TABLE IF EXISTS inventory;

Nächste Schritte