構造化ストリーミングは、Spark 上に構築されたスケーラブルでフォールト トレラントなストリーム処理エンジンです。 ライブ データ ストリームは、新しい行が継続的に追加されるテーブルとして扱われます。 Structured Streaming では、CSV、JSON、ORC、Parquet などの組み込みのファイル ソースと、Kafka や Azure Event Hubs などのメッセージング サービスがサポートされます。
この記事では、Azure Event Hubs などのストリーミング ソースの設定、Lakehouse Delta テーブルへのストリーミング データの取り込み、パーティション分割とイベント バッチ処理による書き込みパフォーマンスの最適化、運用環境でのストリーミング ジョブの確実な実行について説明します。
ストリーミング ソースを設定する
レイクハウスにデータをストリーミングするには、まずストリーミング ソースへの接続を構成します。 Azure Event Hubs は一般的な選択肢です。 Azure Event Hubs Connector for Apache Spark を使用して、Spark アプリケーションを Azure Event Hubs に接続します。
基本的な Event Hubs 構成には、Event Hubs 名前空間名、ハブ名、共有アクセス キー名、コンシューマー グループが必要です。
コンシューマー グループは、イベント ハブ全体のビューです。 コンシューマー グループを使用すると、複数の消費アプリケーションがそれぞれイベントストリームの個別のビューを持ち、独自のペースで独自のオフセットを使用してストリームを個別に読み取ります。
Event Hubs のパーティションを使用すると、大量のイベントを並列で処理できます。 1 つのプロセッサの 1 秒あたりのイベント処理容量は限られていますが、複数のプロセッサはパーティション間で並列に動作できます。
インジェスト率の低いパーティションが多すぎると、パーティション リーダーはデータのごく一部を処理し、最適ではない処理を引き起こします。 パーティションの理想的な数は、必要な処理速度によって異なります。 名前空間内のスループット ユニットの数を増やすと、同時実行リーダーが最大スループットを実現できるように、追加のパーティションが必要になる場合があります。
スループット シナリオに最適なパーティション数をテストします。 スループットが高いシナリオでは、通常、32 個以上のパーティションが使用されます。
ストリーミング シンクとしてのデルタテーブル
Delta Lake は、データ レイク ストレージの上に ACID (原子性、一貫性、分離性、持続性) トランザクションを提供するオープンソースストレージレイヤーです。 Fabric Data Engineering では、Delta Lake はアップサート、データ圧縮、タイム トラベル、スキーマの進化、オープン形式のストレージをサポートしています。
deltaを出力形式としてwriteStreamすると、ストリーミング データは Delta テーブルに直接流れます。 次の例では、Event Hubs から読み取り、メッセージ本文を解析し、Delta テーブルに書き込みます。
import pyspark.sql.functions as f
from pyspark.sql.types import *
df = (
spark.readStream
.format("eventhubs")
.options(**ehConf)
.load()
)
Schema = StructType([
StructField("<column_name_01>", StringType(), False),
StructField("<column_name_02>", StringType(), False),
StructField("<column_name_03>", DoubleType(), True),
StructField("<column_name_04>", LongType(), True),
StructField("<column_name_05>", LongType(), True),
])
rawData = (
df
.withColumn("bodyAsString", f.col("body").cast("string"))
.select(f.from_json("bodyAsString", Schema).alias("events"))
.select("events.*")
.writeStream
.format("delta")
.option("checkpointLocation", "Files/checkpoint")
.outputMode("append")
.toTable("deltaeventstable")
)
このコードでは、delta を出力形式として設定 format("delta") 、 outputMode("append") テーブルに新しい行のみを書き込み、ストリーミング されたデータをマネージド Delta テーブルに保持 toTable("deltaeventstable") 。
ストリーミング パフォーマンスを最適化する
基本的なストリーミング インジェストが機能したら、次のセクションの最適化手法を使用してスループットとファイルの編成を改善できます。
書き込み用のデータをパーティション分割する
スループットを最適化するには、データを効果的にパーティション分割します。 パーティション分割により、書き込みスループットとダウンストリーム クエリパフォーマンスの両方が向上します。 メモリ内、ディスク上、またはその両方でデータをパーティション分割できます。
ディスク上 — partitionBy() を使用して、列の値に基づいてデータをサブディレクトリに整理します。 最適なサイズのファイルを生成するカーディナリティの良い列を選択します。 小さなパーティションが多すぎたり、大きなパーティションが少なすぎたりする列は避けてください。
メモリ内 — repartition() または coalesce() を使用して、書き込む前にワーカー ノード間でデータを分散します。
-
repartition()は、フル シャッフルでパーティションを増減し、データを均等に分散します。 -
coalesce()では、パーティションのみが減少し、データ移動が最小限に抑えられます。
両方の方法を組み合わせると、高スループットのシナリオに適しています。 次の例では、メモリ内の 48 個のパーティション (使用可能な CPU コアと一致する) にデータを分割し、次にディスク上のパーティションを 2 つの列で分割します。
rawData = (
df
.withColumn("bodyAsString", f.col("body").cast("string"))
.select(f.from_json("bodyAsString", Schema).alias("events"))
.select("events.*")
.repartition(48)
.writeStream
.format("delta")
.option("checkpointLocation", "Files/checkpoint")
.outputMode("append")
.partitionBy("<column_name_01>", "<column_name_02>")
.toTable("deltaeventstable")
)
最適化された書き込みを使用する
手動によるパーティション分割の代わりに、書き込みの最適化によって、書き込み前にパーティションがマージまたは分割され、手動 repartition() や coalesce() 呼び出しなしでディスク スループットが最大化されます。 Spark 構成で有効にします。
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", True)
最適化された書き込みを有効にすると、コードから repartition() または coalesce() を削除し、Spark でパーティションのサイズ設定を処理できます。 ディスク レベルの編成には引き続き partitionBy() を使用できます。
トリガーを使用したバッチ イベント
書き込みパフォーマンスをさらに最適化するには、ディスクに書き込む前にイベントをバッチ処理します。 既定では、Spark は前のマイクロバッチが完了するとすぐに各マイクロバッチを処理します。 トリガー間隔を設定すると、一定期間にわたってデータが蓄積され、より少ない大きな操作で書き込まれます。 バッチが大きいほど、より大きなデルタファイルが生成され、小さいファイルにかかるオーバーヘッドが削減されます。
次の例では、1 分間隔でイベントを処理します。
rawData = (
df
.withColumn("bodyAsString", f.col("body").cast("string"))
.select(f.from_json("bodyAsString", Schema).alias("events"))
.select("events.*")
.writeStream
.format("delta")
.option("checkpointLocation", "Files/checkpoint")
.outputMode("append")
.partitionBy("<column_name_01>", "<column_name_02>")
.trigger(processingTime="1 minute")
.toTable("deltaeventstable")
)
受信データの量を分析し、Delta テーブルで適切なサイズの Parquet ファイルを生成する処理間隔を選択します。
運用環境でストリーミング ジョブを実行する
Spark ノートブックは、ストリーミング ロジックを開発およびテストするための効果的なツールです。 ただし、継続的に実行する必要がある運用ワークロードの場合は、代わりに Spark ジョブ定義を使用してください。 Spark ジョブ定義は、Spark クラスターで実行され、堅牢性と可用性を高める非対話型のコード指向タスクです。
ストリーミング ジョブを実行しているインフラストラクチャでは、ハードウェア障害やインフラストラクチャの修正プログラムの適用など、ジョブを停止する問題が発生する可能性があります。 再試行ポリシーは、予期しない停止時にジョブを自動的に再起動します。 Spark ジョブ定義で再試行ポリシーを構成して、ジョブを再起動する回数 (最大無限再試行回数) と再試行間隔を指定します。 再試行ポリシーが有効になっている場合、ストリーミング ジョブは明示的に停止するまで実行を続けます。
ファブリック監視ハブには、入力レート、プロセス レート、入力行、バッチ期間、操作期間などのメトリックを含む [構造化ストリーミング] タブが含まれています。