次の方法で共有


型の拡大

重要

この機能は、Databricks Runtime 15.4 LTS 以降の パブリック プレビュー 段階にあります。

型拡大が有効になっているテーブルを使用すると、基になるデータ ファイルを書き換えることなく、列データ型をより広い型に変更できます。 列の型を手動で変更するか、スキーマの進化を使用して列型を進化させることができます。

重要

型拡大は、Databricks Runtime 15.4 LTS 以降で使用できます。 型拡大が有効になっているテーブルは、Databricks Runtime 15.4 LTS 以降でのみ読み取ることができます。

型の拡大には Delta Lake が必要です。 すべての Unity カタログ マネージド テーブルでは、既定で Delta Lake が使用されます。

サポートされている型の変更

以下の規則に従って、型を拡大することができます。

ソースの種類 サポートされるより拡大された型
BYTE SHORTINTBIGINTDECIMALDOUBLE
SHORT intBIGINTDECIMALDOUBLE
INT BIGINTDECIMALDOUBLE
BIGINT DECIMAL
FLOAT DOUBLE
DECIMAL DECIMAL より高い精度と規模で
DATE TIMESTAMP_NTZ

構造体、マップ、配列の中で入れ子になったフィールドおよび最上位レベルの列に対する型の変更がサポートされています。

重要

操作によって整数型が decimal または double に昇格され、ダウンストリーム インジェストによって値が整数列に書き戻されると、Spark は既定で値の小数部を切り捨てます。 割り当てポリシーの動作の詳細については、「 ストアの割り当て」を参照してください。

何らかの数値型を decimal に変更する場合、合計の有効桁数は元の有効桁数以上となる必要があります。 スケールも大きくする場合は、合計有効桁数を対応する分だけ増やす必要があります。

byteshortint 型の最小のターゲットは、decimal(10,0) です。 long の最小ターゲットは decimal(20,0) です。

decimal(10,1) を持つフィールドに 2 個の桁を追加したい場合、最小のターゲットは decimal(12,3) になります。

型の拡大を有効にする

既存のテーブルで型拡大を有効にするには、 delta.enableTypeWidening テーブルプロパティを trueに設定します。

  ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')

テーブルの作成時に型拡大を有効にすることもできます。

  CREATE TABLE T(c1 INT) TBLPROPERTIES('delta.enableTypeWidening' = 'true')

重要

型拡大を有効にすると、テーブル機能 typeWideningが設定され、リーダー プロトコルとライター プロトコルがアップグレードされます。 型拡大が有効になっているテーブルを操作するには、Databricks Runtime 15.4 以降を使用する必要があります。 外部クライアントもテーブルと対話する場合は、このテーブル機能がサポートされていることを確認します。 Delta Lake の機能の互換性とプロトコルに関する記事を参照してください。

型の変更を手動で適用する

以下のように、ALTER COLUMN コマンドを使用して、型を手動で変更します。

ALTER TABLE <table_name> ALTER COLUMN <col_name> TYPE <new_type>

この操作では、基になるデータ ファイルを書き換えずにテーブル スキーマを更新します。 詳細については、ALTER TABLE を参照してください。

スキーマの自動進化を使用して型を拡大する

スキーマの進化は、入力データの種類と一致するようにターゲット テーブルのデータ型を更新する拡大型と連携します。

型拡大が有効になっていない場合、スキーマの進化は常に、ターゲット テーブル内の列型と一致するようにデータをダウンキャストしようとします。 ターゲット テーブルのデータ型を自動的に拡大しない場合は、スキーマの進化を有効にしてワークロードを実行する前に、型の拡大を無効にします。

インジェスト中にスキーマの進化を使用して列のデータ型を拡大するには、次の条件を満たす必要があります。

  • 書き込みコマンドは、スキーマの自動進化を有効にして実行されます。
  • ターゲット テーブルで、型拡大が有効になっています。
  • ソース列の種類は、ターゲット列の種類よりも広くなります。
  • 型の拡大が型の変更をサポートしている。

これらの条件をすべて満たしていない型の不一致は、通常のスキーマ適用規則に従います。 「スキーマの適用」を参照してください。

次の例は、一般的な書き込み操作のスキーマの進化に対する型拡大のしくみを示しています。

Python

# Create target table with INT column and source table with BIGINT column
spark.sql("CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")
spark.sql("CREATE TABLE source_table (id BIGINT, data STRING)")

# Example 1: Automatic type widening in saveAsTable()
spark.table("source_table").write.mode("append").option("mergeSchema", "true").saveAsTable("target_table")

# Example 2: Automatic type widening in MERGE INTO
from delta.tables import DeltaTable

source_df = spark.table("source_table")
target_table = DeltaTable.forName(spark, "target_table")

(target_table.alias("target")
  .merge(source_df.alias("source"), "target.id = source.id")
  .withSchemaEvolution()
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)

スカラ (プログラミング言語)

// Create target table with INT column and source table with BIGINT column
spark.sql("CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")
spark.sql("CREATE TABLE source_table (id BIGINT, data STRING)")

// Example 1: Automatic type widening in saveAsTable()
spark.table("source_table").write.mode("append").option("mergeSchema", "true").saveAsTable("target_table")

// Example 2: Automatic type widening in MERGE INTO
import io.delta.tables.DeltaTable

val sourceDf = spark.table("source_table")
val targetTable = DeltaTable.forName(spark, "target_table")

targetTable.alias("target")
  .merge(sourceDf.alias("source"), "target.id = source.id")
  .withSchemaEvolution()
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

SQL

-- Create target table with INT column and source table with BIGINT column
CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true');
CREATE TABLE source_table (id BIGINT, data STRING);

-- Example 1: Automatic type widening in INSERT INTO
---- Enable schema evolution
SET spark.databricks.delta.schema.autoMerge.enabled = true;
---- Insert data with BIGINT value column - automatically widens INT to BIGINT
INSERT INTO target_table SELECT * FROM source_table;

-- Example 2: Automatic type widening in MERGE INTO
MERGE WITH SCHEMA EVOLUTION INTO target_table
USING source_table
ON target_table.id = source_table.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

タイプ拡大テーブル フィーチャを無効にする

プロパティを false に設定することで、有効なテーブルで誤って型拡大を防ぐことができます。

  ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'false')

この設定では、テーブルに対する将来の型の変更を防ぐことができますが、拡大テーブルの型機能を削除したり、以前の型の変更を元に戻したりすることはありません。

タイプ拡大テーブル フィーチャを完全に削除する必要がある場合は、次の例に示すように DROP FEATURE コマンドを使用できます。

 ALTER TABLE <table-name> DROP FEATURE 'typeWidening' [TRUNCATE HISTORY]

Databricks Runtime 15.4 LTS を使用して型拡大を有効にしたテーブルでは、代わりに機能 typeWidening-preview を削除する必要があります。

型拡大を削除すると、Databricks は現在のテーブル スキーマに準拠していないすべてのデータ ファイルを書き換えます。 Delta Lake テーブル機能の削除とテーブル プロトコルのダウングレードに関する記事を参照してください。

Delta テーブルからのストリーミング

構造化ストリーミングでの型拡大のサポートは、Databricks Runtime 16.4 LTS 以降で利用できます。

型拡大が有効になっている Delta テーブルからストリーミングする場合は、ターゲット テーブルの mergeSchema オプションを使用してスキーマの進化を有効にすることで、ストリーミング クエリの自動型拡大を構成できます。 ターゲット テーブルでは、型拡大が有効になっている必要があります。 「 タイプ拡大を有効にする」を参照してください。

Python

(spark.readStream
  .table("delta_source_table")
  .writeStream
  .option("checkpointLocation", "/path/to/checkpointLocation")
  .option("mergeSchema", "true")
  .toTable("output_table")
)

スカラ (プログラミング言語)

spark.readStream
  .table("delta_source_table")
  .writeStream
  .option("checkpointLocation", "/path/to/checkpointLocation")
  .option("mergeSchema", "true")
  .toTable("output_table")

mergeSchemaが有効で、ターゲット テーブルで型拡大が有効になっている場合:

  • 型の変更は、手動による介入を必要とせずに、ダウンストリーム テーブルに自動的に適用されます。
  • 新しい列は、ダウンストリーム テーブル スキーマに自動的に追加されます。

mergeSchemaが有効になっていない場合、値はspark.sql.storeAssignmentPolicy構成に従って処理されます。既定では、ターゲット列の種類と一致するように値がダウンキャストされます。 割り当てポリシーの動作の詳細については、「 ストアの割り当て」を参照してください。

手動によるタイプ変更の承認

Delta テーブルからストリーミングする場合は、スキーマ追跡の場所を指定して、型の変更を含む非加法スキーマの変更を追跡できます。 Databricks Runtime 18.0 以降ではスキーマ追跡の場所を指定する必要があり、Databricks Runtime 18.1 以降では省略可能です。

Python

checkpoint_path = "/path/to/checkpointLocation"

(spark.readStream
  .option("schemaTrackingLocation", checkpoint_path)
  .table("delta_source_table")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("output_table")
)

スカラ (プログラミング言語)

val checkpointPath = "/path/to/checkpointLocation"

spark.readStream
  .option("schemaTrackingLocation", checkpointPath)
  .table("delta_source_table")
  .writeStream
  .option("checkpointLocation", checkpointPath)
  .toTable("output_table")

スキーマ追跡の場所を指定した後、ストリームは、型の変更を検出してから停止すると、追跡対象のスキーマを進化させます。 その時点で、ダウンストリーム テーブルで型の拡大を有効にしたり、ストリーミング クエリを更新したりするなど、型の変更を処理するために必要なアクションを実行できます。

処理を再開するには、Spark 構成 spark.databricks.delta.streaming.allowSourceColumnTypeChange または DataFrame リーダー オプションを allowSourceColumnTypeChange設定します。

Python

checkpoint_path = "/path/to/checkpointLocation"

(spark.readStream
  .option("schemaTrackingLocation", checkpoint_path)
  .option("allowSourceColumnTypeChange", "<delta_source_table_version>")
  # alternatively to allow all future type changes for this stream:
  # .option("allowSourceColumnTypeChange", "always")
  .table("delta_source_table")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("output_table")
)

スカラ (プログラミング言語)

val checkpointPath = "/path/to/checkpointLocation"

spark.readStream
  .option("schemaTrackingLocation", checkpointPath)
  .option("allowSourceColumnTypeChange", "<delta_source_table_version>")
  // alternatively to allow all future type changes for this stream:
  // .option("allowSourceColumnTypeChange", "always")
  .table("delta_source_table")
  .writeStream
  .option("checkpointLocation", checkpointPath)
  .toTable("output_table")

SQL

  -- To unblock for this particular stream just for this series of schema change(s):
  SET spark.databricks.delta.streaming.allowSourceColumnTypeChange.ckpt_<checkpoint_id> = "<delta_source_table_version>"
  -- To unblock for this particular stream:
  SET spark.databricks.delta.streaming.allowSourceColumnTypeChange = "<delta_source_table_version>"
  -- To unblock for all streams:
  SET spark.databricks.delta.streaming.allowSourceColumnTypeChange = "always"

ストリームが停止すると、チェックポイント ID <checkpoint_id> と Delta Lake ソース テーブルのバージョン <delta_source_table_version> がエラー メッセージに表示されます。

Lakeflow Spark 宣言型パイプライン

Lakeflow Spark 宣言パイプラインでの型拡大のサポートは、プレビュー チャネルで利用できます。

Lakeflow Spark 宣言型パイプラインの型拡大は、パイプライン レベルまたは個々のテーブルで有効にすることができます。 型拡大では、ストリーミング テーブルの完全な更新を必要とせずに、パイプラインの実行中に列の型を自動的に拡大できます。 具体化されたビューでの型の変更は常に完全な再計算をトリガーし、型の変更がソース テーブルに適用されると、そのテーブルに依存する具体化されたビューでは、新しい型を反映するために完全な再計算が必要になります。

パイプライン全体の型拡大を有効にする

パイプライン内のすべてのテーブルに対して型拡大を有効にするには、パイプライン構成を pipelines.enableTypeWidening設定します。

JSON

{
  "configuration": {
    "pipelines.enableTypeWidening": "true"
  }
}

YAML

configuration:
  pipelines.enableTypeWidening: 'true'

特定のテーブルの型拡大を有効にする

また、テーブル プロパティを delta.enableTypeWidening設定して、個々のテーブルに対して型拡大を有効にすることもできます。

Python

import dlt

@dlt.table(
  table_properties={"delta.enableTypeWidening": "true"}
)
def my_table():
  return spark.readStream.table("source_table")

SQL

CREATE OR REFRESH STREAMING TABLE my_table
TBLPROPERTIES ('delta.enableTypeWidening' = 'true')
AS SELECT * FROM source_table

ダウンストリーム リーダーとの互換性

型拡大が有効になっているテーブルは、Databricks Runtime 15.4 LTS 以降でのみ読み取ることができます。 パイプラインで型拡大が有効になっているテーブルを Databricks Runtime 14.3 以下のリーダーが読み取り可能にする場合は、次のいずれかを行う必要があります。

  • プロパティ delta.enableTypeWidening/pipelines.enableTypeWidening を削除するか false に設定して、型拡大を無効にし、テーブルの完全な更新をトリガーします。
  • テーブルで 互換モード を有効にします。

デルタ・シェアリング

デルタ共有での型拡大のサポートは、Databricks Runtime 16.1 以降で利用できます。

Databricks 間での Delta Sharing において、型の拡張が有効となった Delta Lake テーブルの共有がサポートされています。 プロバイダーと受信者は、Databricks Runtime 16.1 以降である必要があります。

Delta Sharing を使用して型拡大が有効になっている Delta Lake テーブルから変更データ フィードを読み取る場合は、応答形式を delta に設定する必要があります。

spark.read
  .format("deltaSharing")
  .option("responseFormat", "delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", "<start version>")
  .option("endingVersion", "<end version>")
  .load("<table>")

型の変更間での変更データ フィードの読み取りはサポートされていません。 代わりに、操作を 2 つの個別の読み取りに分割する必要があります。1 つは型の変更を含むテーブル バージョンで終わり、もう 1 つは型の変更を含むバージョンから始まります。

制限事項

Apache Iceberg の互換性

Apache Iceberg では、型の拡大によってカバーされるすべての型変更がサポートされるわけではありません。 Iceberg スキーマの進化に関するページを参照してください。 特に、Azure Databricks では、次の種類の変更はサポートされていません。

  • byteshortintlongまたはdecimaldouble
  • 10進法スケールの拡大
  • date から timestampNTZ

Delta Lake テーブルで Iceberg との互換性を持つ UniForm を有効にすると、これらの種類の変更のいずれかを適用するとエラーが発生します。 デルタテーブルをIcebergクライアントで読む方法については、を参照してください。

これらのサポートされていない型の変更のいずれかを Delta Lake テーブルに適用する場合は、次の 2 つのオプションがあります。

  1. Iceberg メタデータを再生成する: 次のコマンドを使用して、タイプ拡大テーブル機能なしで Iceberg メタデータを再生成します。

    ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.universalFormat.config.icebergCompatVersion' = '<version>')
    

    これにより、互換性のない型の変更を適用した後も、均一な互換性を維持できます。

  2. タイプ 拡大テーブル フィーチャを削除する: タイプ 拡大テーブル フィーチャを無効にするを参照してください。

型依存関数

一部の SQL 関数は、入力データ型に依存する結果を返します。 たとえば、 hash 関数は 、引数の型が異なる場合、同じ論理値に対して異なるハッシュ値を返します。 hash(1::INT)hash(1::BIGINT)とは異なる結果を返します。

その他の型依存関数は、 xxhash64bit_getbit_reversetypeofです。

これらの関数を使用するクエリで安定した結果を得るには、値を目的の型に明示的にキャストします。

Python

spark.read.table("table_name") \
  .selectExpr("hash(CAST(column_name AS BIGINT))")

スカラ (プログラミング言語)

spark.read.table("main.johan_lasperas.dlt_type_widening_bronze2")
  .selectExpr("hash(CAST(a AS BIGINT))")

SQL

-- Use explicit casting for stable hash values
SELECT hash(CAST(column_name AS BIGINT)) FROM table_name

その他の制限事項

  • 種類が変更された Delta Lake テーブルからストリーミングする場合、SQL を使用してスキーマ追跡の場所を指定することはできません。
  • Delta Sharing を使用して、Databricks 以外のコンシューマーに対して型の幅を広げる機能がオンになっているテーブルを共有することはできません。