次の方法で共有


テーブル履歴の操作

テーブルを変更する各操作では、新しいテーブル バージョンが作成されます。 履歴情報とタイム トラベルを使用して、操作を監査したり、テーブルをロールバックしたり、特定の時点でテーブルをクエリしたりします。

注意

Databricks では、データ アーカイブの長期的なバックアップ ソリューションとしてテーブル履歴を使用することはお勧めしません。 データとログリテンション期間の両方の構成を大きな値に設定していない限り、タイム トラベル操作には過去 7 日間のみを使用してください。

テーブル履歴の取得

history コマンドを実行して、テーブルへの書き込みごとに操作、ユーザー、タイムスタンプなどの情報を取得します。 操作は、逆の時系列順で返されます。

テーブル履歴の保持期間は、テーブル設定 logRetentionDuration によって決まり、既定では 30 日間です。

注意

タイム トラベルとテーブル履歴は、さまざまな保持しきい値によって制御されます。 「 タイム トラベルとは」を参照してください。

DESCRIBE HISTORY table_name       -- get the full history of the table

DESCRIBE HISTORY table_name LIMIT 1  -- get the last operation only

Spark SQL 構文の詳細については、DESCRIBE HISTORYを参照してください。

Scala、Java、Python 構文の詳細については、Delta Lake API ドキュメントを参照してください。

カタログ エクスプローラー では、この詳細なテーブル情報と履歴が視覚的に表示されます。 テーブル スキーマとサンプル データに加えて、[履歴] タブをクリックして DESCRIBE HISTORY で表示されるテーブル履歴を確認することもできます。

履歴スキーマ

history 操作の出力には、次の列があります。

タイプ 説明
バージョン 長い 操作によって生成されたテーブルのバージョン。
timestamp timestamp このバージョンがコミットされた時点。
ユーザーID 文字列 操作を実行したユーザーの ID。
ユーザー名 文字列 操作を実行したユーザーの名前。
操作 文字列 操作の名前。
操作パラメーター マップ 操作のパラメーター (述語など)。
ジョブ 構造体 操作を実行したジョブの詳細。
ノートブック 構造体 操作が実行されたノートブックの詳細。
クラスターID 文字列 操作が実行されたクラスターの ID。
バージョンを読む 長い 書き込み操作を実行するために読み取ったテーブルのバージョン。
分離レベル (isolationLevel) 文字列 この操作に使用された分離レベル。
isBlindAppend Boolean この操作によってデータが追加されたかどうか。
operationMetrics マップ 操作のメトリック (変更された行数やファイル数など)。
ユーザーメタデータ 文字列 ユーザー定義のコミット メタデータ (指定されている場合)
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+

注意

操作パラメーターの理解partitionBy

partitionBy フィールドは、テーブルのパーティション スキーマを定義または変更する CREATE 操作と OVERWRITE 操作でのみ意味があります。

既存のテーブルへの追加操作 (APPEND、INSERT、UPDATE、DELETE、MERGE) の場合、このフィールドには、使用される書き込み方法 ([].save()) に応じて、空の配列.saveAsTable()またはパーティション列が表示される場合があります。 この不整合は想定される動作であり、書き込みの検証には使用しないでください。

重要

追加操作を検証するために、履歴の partitionBy に依存しないでください。 値は実装の詳細によって異なりますが、パーティションへのデータの書き込み方法には影響しません。

Example

date列でパーティション分割されたテーブルについて考えてみましょう。

# Initial table creation - partitionBy is populated
df.write.format("delta") \
  .partitionBy("date") \
  .saveAsTable("sales_data")

履歴の CREATE 操作には、次の情報が表示されます。

operationParameters: {
  "mode": "ErrorIfExists",
  "partitionBy": "[\"date\"]"
}

このテーブルにデータを追加する場合:

# Subsequent append - partitionBy shows empty
new_df.write.format("delta") \
  .mode("append") \
  .saveAsTable("sales_data")

APPEND 操作は次を示します。

operationParameters: {
  "mode": "Append",
  "partitionBy": "[]"
}

空の partitionBy 値が必要です。 データは、テーブルの既存のパーティション スキーマに基づいて正しいパーティションに書き込まれます。 パスに .save() すると、このフィールドにパーティション列が表示されることがありますが、この違いは実装の詳細であり、書き込み動作には影響しません。

操作メトリック

history操作は、operationMetrics 列マップ内の操作メトリックのコレクションを返します。

次の表は、マップのキー定義を操作別に示しています。

操作 メトリックの名前 説明
WRITE、CREATE TABLE AS SELECT、REPLACE TABLE AS SELECT、COPY INTO
numFiles 書き込まれたファイルの数。
出力バイト数 (numOutputBytes) 書き込まれたコンテンツのサイズ (バイト単位)。
numOutputRows 書き込まれた行の数。
ストリーミング UPDATE
追加ファイル数 追加されたファイルの数。
numRemovedFiles 削除されたファイルの数。
numOutputRows 書き込まれた行の数。
出力バイト数 (numOutputBytes) 書き込みのサイズ (バイト単位)。
削除
追加ファイル数 追加されたファイルの数。 テーブルのパーティションが削除された場合には提供されません。
numRemovedFiles 削除されたファイルの数。
削除された行数 (numDeletedRows) 削除された行の数。 テーブルのパーティションが削除された場合には提供されません。
numCopiedRows ファイルの削除処理中にコピーされた行の数。
実行時間 (ミリ秒) 操作全体の実行にかかった時間。
scanTimeMs ファイルの一致をスキャンするのに要した時間。
rewriteTimeMs 一致したファイルの書き換えに要した時間。
切り捨てる
numRemovedFiles 削除されたファイルの数。
実行時間 (ミリ秒) 操作全体の実行にかかった時間。
マージ
numSourceRows ソース データフレーム内の行の数。
ターゲット行挿入数 ターゲット テーブルに挿入された行の数。
更新された対象行数 ターゲット テーブル内で更新された行の数。
対象行削除数 ターゲット テーブル内で削除された行の数。
ターゲット行コピー数 コピーされたターゲット行の数。
numOutputRows 書き出された行数の合計。
追加された対象ファイル数 シンク (ターゲット) に追加されたファイルの数。
削除された対象ファイル数 シンク (ターゲット) から削除されたファイルの数。
実行時間 (ミリ秒) 操作全体の実行にかかった時間。
scanTimeMs ファイルの一致をスキャンするのに要した時間。
rewriteTimeMs 一致したファイルの書き換えに要した時間。
UPDATE
追加ファイル数 追加されたファイルの数。
numRemovedFiles 削除されたファイルの数。
更新行数 更新された行の数。
numCopiedRows ファイルの更新処理中にコピーされた行の数。
実行時間 (ミリ秒) 操作全体の実行にかかった時間。
scanTimeMs ファイルの一致をスキャンするのに要した時間。
rewriteTimeMs 一致したファイルの書き換えに要した時間。
FSCK numRemovedFiles 削除されたファイルの数。
変換 変換済みファイル数 変換された Parquet ファイルの数。
OPTIMIZE
追加ファイル数 追加されたファイルの数。
numRemovedFiles 最適化されたファイルの数。
numAddedBytes テーブルが最適化された後に追加されたバイト数。
numRemovedBytes (削除済みバイト数) 削除されたバイト数。
最小ファイルサイズ テーブルが最適化された後の最小ファイル サイズ。
p25FileSize テーブルが最適化された後の 25 パーセンタイル ファイルのサイズ。
p50FileSize テーブルが最適化された後のファイル サイズの中央値。
p75FileSize テーブルが最適化された後の 75 パーセンタイル ファイルのサイズ。
最大ファイルサイズ テーブルが最適化された後の最大ファイル サイズ。
クローン
元テーブルのサイズ 複製されたバージョンのソース テーブルのサイズ (バイト単位)。
sourceNumOfFiles 複製されたバージョンのソース テーブル内のファイルの数。
numRemovedFiles 前のテーブルが置き換えられた場合にターゲット テーブルから削除されたファイルの数。
削除されたファイルのサイズ 前のテーブルが置き換えられた場合にターゲット テーブルから削除されたファイルの合計サイズ (バイト単位)。
コピー済みファイル数 新しい場所にコピーされたファイルの数。 シャロー複製の場合は 0。
コピーされたファイルサイズ 新しい場所にコピーされたファイルの合計サイズ (バイト単位)。 シャロー複製の場合は 0。
RESTORE
復元後のテーブルサイズ 復元後のテーブル サイズ (バイト単位)。
復元後のファイル数 復元後のテーブル内のファイルの数。
numRemovedFiles 復元操作によって削除されたファイルの数。
復元されたファイル数 復元の結果として追加されたファイルの数。
削除されたファイルのサイズ 復元によって削除されたファイルのサイズ (バイト単位)。
復元ファイルサイズ 復元によって追加されたファイルのサイズ (バイト単位)。
VACUUM
削除されたファイル数 削除されたファイルの数。
バキューム済みディレクトリ数 バキューム処理されたディレクトリの数。
削除するファイルの数 削除するファイルの数。

タイム トラベルとは

タイム トラベルでは、タイムスタンプまたはテーブルのバージョン (トランザクション ログに記録される) に基づいて、以前のテーブル バージョンのクエリがサポートされます。 タイム トラベルは、次のようなアプリケーションに使用できます。

  • 分析、レポート、または出力 (機械学習モデルの出力など) を再作成する。 これは、特に規制対象の業界でデバッグや監査を行う際に役立ちます。
  • 複雑なテンポラル クエリを記述する。
  • データの間違いを修正する。
  • 急速に変化するテーブルに対応する一連のクエリに対してスナップショット分離を提供する。

重要

Databricks Runtime 18.0 以降では、 deletedFileRetentionDuration テーブル プロパティより古いバージョン (既定では 7 日間) を要求した場合、タイム トラベル クエリはブロックされます。 Unity カタログのマネージド テーブルの場合、これは Databricks Runtime 12.2 以降に適用されます。

タイム トラベル構文

テーブル名の指定の後に句を追加して、タイム トラベルを使用してテーブルにクエリを実行します。

  • timestamp_expression には次のいずれかを指定できます。
    • '2018-10-18T22:15:12.013Z'、つまり、タイムスタンプにキャストできる文字列です
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18'、つまり、日付文字列です
    • current_timestamp() - interval 12 hours
    • date_sub(current_date(), 1)
    • タイムスタンプにキャストされる (できる) その他の式
  • version は、DESCRIBE HISTORY table_spec の出力から取得できる long 型の値です。

timestamp_expressionversion もサブクエリにすることはできません。

日付またはタイムスタンプ文字列のみが使用できます。 たとえば、"2019-01-01""2019-01-01T00:00:00.000Z"す。 構文の例については、次のコードを参照してください。

SQL

SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;

Python

df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")

また、@ 構文を使用して、タイムスタンプまたはバージョンをテーブル名の一部として指定することもできます。 タイムスタンプは yyyyMMddHHmmssSSS 形式である必要があります。 バージョンの前に @ を付加することで、v の後にバージョンを指定できます。 構文の例については、次のコードを参照してください。

SQL

SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123

Python

spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")

トランザクション ログ チェックポイントとは

テーブルのバージョンは、テーブル データと共に格納されるトランザクション ログ ディレクトリ内に JSON ファイルとして記録されます。 チェックポイント クエリを最適化するために、テーブルのバージョンは Parquet チェックポイント ファイルに集計されるため、テーブル履歴のすべての JSON バージョンを読み取る必要はありません。 Azure Databricks を使用すると、データ サイズとワークロードのチェックポイント処理の頻度が最適化されます。 ユーザーはチェックポイントを直接操作する必要はありません。 チェックポイント処理の頻度は、予告なく変更される場合があります。

タイム トラベル クエリのデータ保持を構成する

以前のテーブル バージョンに対してクエリを実行するには、そのバージョンのログとデータのファイルの "両方" を保持している必要があります。

データ ファイルは、テーブルに対して VACUUM を実行すると削除されます。 ログ ファイルの削除は、テーブルのバージョンをチェックポイント処理した後に自動的に管理されます。

ほとんどのテーブルはVACUUMが定期的に実行されるため、ポイントインタイムクエリはVACUUMの保持しきい値 (既定では7日間)を考慮する必要があります。

テーブルのデータ保持しきい値を増やすには、次のテーブル プロパティを構成する必要があります。

  • delta.logRetentionDuration = "interval <interval>": テーブルの履歴を保持する期間を制御します。 既定では、 interval 30 daysです。
  • delta.deletedFileRetentionDuration = "interval <interval>": 現在のテーブル バージョンで参照されなくなったデータ ファイルを削除するために VACUUM で使用するしきい値を決定します。 既定では、 interval 7 daysです。

テーブルの作成時にテーブルのプロパティを指定することも、 ALTER TABLE ステートメントで設定することもできます。 テーブル のプロパティのリファレンスを参照してください

注意

Databricks Runtime 18.0 以降では、 logRetentionDurationdeletedFileRetentionDuration以上である必要があります。 Unity カタログのマネージド テーブルの場合、これは Databricks Runtime 12.2 以降に適用されます。

30 日間の履歴データにアクセスするには、 delta.deletedFileRetentionDuration = "interval 30 days" ( delta.logRetentionDurationの既定の設定と一致) を設定します。

データ保持のしきい値を大きくすると、より多くのデータ ファイルが維持されるため、ストレージ コストが増加する可能性があります。

テーブルを以前の状態に復元する

RESTORE コマンドを使用して、テーブルを以前の状態に復元できます。 テーブルは、以前の状態に復元できる履歴バージョンを内部的に保持します。 RESTORE コマンドのオプションとして、以前の状態に対応するバージョンまたは以前の状態が作成されたときのタイムスタンプがサポートされています。

重要

  • 既に復元されたテーブルを復元できます。
  • 複製されたテーブルを復元できます。
  • 復元するテーブルに対する MODIFY アクセス許可が必要です。
  • 手動または vacuum によってデータ ファイルが削除されている場合、以前のバージョンにテーブルを復元することはできません。 それでも、spark.sql.files.ignoreMissingFilestrue に設定されている場合は、このバージョンに部分的に復元できます。
  • 以前の状態に復元するためのタイムスタンプ形式は yyyy-MM-dd HH:mm:ss です。 日付 (yyyy-MM-dd) 文字列のみを指定することもできます。
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;

構文の詳細については、RESTOREを参照してください。

重要

復元は、データ変更操作と認識されます。 RESTORE コマンドによって追加されたログ エントリには、dataChange が true に設定されています。 テーブルへの更新を処理する 構造化ストリーミング ジョブなどのダウンストリーム アプリケーションがある場合、復元操作によって追加されたデータ変更ログ エントリは新しいデータ更新と見なされ、それらの処理によってデータが重複する可能性があります。

次に例を示します。

テーブルのバージョン 操作 ログの更新 データ変更ログの更新のレコード
0 INSERT AddFile(/path/to/file-1, dataChange = true) (name = Viktor、age = 29、(name = George、age = 55)
1 INSERT AddFile(/path/to/file-2, dataChange = true) (name = George、age = 39)
2 OPTIMIZE ファイルを追加(AddFile):/path/to/file-3(データ変更 = なし)、ファイルを削除(RemoveFile):/path/to/file-1、ファイルを削除(RemoveFile):/path/to/file-2 (最適化圧縮はテーブル内のデータを変更しないため、レコードはありません)
3 RESTORE(version=1) ファイルを削除する(/path/to/file-3)、ファイルを追加する(/path/to/file-1, dataChange = true)、ファイルを追加する(/path/to/file-2, dataChange = true) (name = Viktor、age = 29)、(name = George、age = 55)、(name = George、age = 39)

前の例では、 RESTORE コマンドを実行すると、テーブル バージョン 0 と 1 の読み取り時に既に表示されていた更新が行われます。 ストリーミング クエリでこのテーブルが読み取られていた場合、これらのファイルは新しく追加されたデータと見なされ、再び処理されます。

復元のメトリック

RESTORE 操作が完了すると、次のメトリックが単一行データフレームとして報告されます。

  • table_size_after_restore: 復元後のテーブルのサイズ。

  • num_of_files_after_restore: 復元後のテーブル内のファイルの数。

  • num_removed_files: テーブルから削除された (論理的に削除された) ファイルの数。

  • num_restored_files: ロールバックによって復元されたファイルの数。

  • removed_files_size: テーブルから削除されたファイルの合計サイズ (バイト単位)。

  • restored_files_size: 復元されたファイルの合計サイズ (バイト単位)。

    復元のメトリックのサンプル

タイム トラベルの使用例

  • ユーザー 111 のテーブルの誤った削除を修正する:

    INSERT INTO my_table
      SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
      WHERE userId = 111
    
  • テーブルの誤った更新を修正する:

    MERGE INTO my_table target
      USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
      ON source.userId = target.userId
      WHEN MATCHED THEN UPDATE SET *
    
  • 先週に追加された新しい顧客の数を照会する:

    SELECT
    (
      SELECT count(distinct userId)
      FROM my_table
    )
    -
    (
      SELECT count(distinct userId)
      FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7)
    ) AS new_customers
    

Spark セッションで最後のコミットのバージョンを見つける方法

すべてのスレッドとすべてのテーブルの全体で、現在の SparkSession によって書き込まれた最後のコミットのバージョン番号を取得するには、SQL 構成 spark.databricks.delta.lastCommitVersionInSession のクエリを実行します。

注意

Apache Iceberg テーブルの場合は、spark.databricks.iceberg.lastCommitVersionInSessionではなくspark.databricks.delta.lastCommitVersionInSessionを使用します。

SQL

SET spark.databricks.delta.lastCommitVersionInSession

Python

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

スカラ (プログラミング言語)

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

SparkSession によるコミットが行われていない場合は、キーのクエリを実行すると空の値が返されます。

注意

複数のスレッド間で同じ SparkSession を共有する場合は、複数のスレッド間で変数を共有するのと似ています。構成値が同時に更新されるため、競合状態に達する可能性があります。