Azure OpenAI サービスを使用すると、入力候補 API を求めることで、多くの自然言語タスクを解決できます。 プロンプト ワークフローをいくつかの例から大規模なデータセットに簡単にスケーリングできるように、Azure OpenAI サービスは分散機械学習ライブラリ SynapseML と統合します。 この統合を使用すると、 Apache Spark 分散コンピューティング フレームワークを使用して、OpenAI サービスで何百万ものプロンプトを処理できます。 このチュートリアルでは、Azure OpenAI と Microsoft Fabric を使用して、大規模な言語モデルを分散スケールで適用する方法について説明します。
前提条件
このクイック スタートの主な前提条件には、OpenAI リソースAzure作業と、SynapseML がインストールされた Apache Spark クラスターが含まれます。
Microsoft Fabric サブスクリプションを取得します。 または、無料のMicrosoft Fabric試用版にサインアップします。
Microsoft Fabric にサインインします。
ホーム ページの左下にあるエクスペリエンス スイッチャーを使用して Fabric に切り替えます。
- Microsoft Fabricのデータ サイエンス エクスペリエンスに移動します。
- 新しいノートブックを作成します。
- Azure OpenAI リソース - リソースを作成します
このガイドをノートブックとしてインポートする
次の手順では、このコードを Spark クラスターに追加します。 Spark プラットフォームでノートブックを作成し、このノートブックにコードをコピーしてデモを実行できます。 または、ノートブックをダウンロードして Synapse Analytics にインポートします。
- このデモをノートブックとしてダウンロードします ([未加工] を選択してからファイルを保存します)
- ノートブックを Synapse ワークスペースにインポートするか、Fabric を使用している場合は、Fabric ワークスペースにインポートします
- クラスターに SynapseML をインストールします。 SynapseML Web サイトの下部にある Synapse のインストール手順を参照してください。 Fabric を使用している場合は、 インストール ガイドを確認します。 この手順では、インポートしたノートブックの上部に追加のセルを貼り付ける必要があります。
- ノートブックをクラスターに接続して手順を進め、セルを編集して実行します。
サービス情報を入力する
次に、ノートブック内のセルを編集して、サービスを指定します。 OpenAI サービスに一致するように、 service_name、 deployment_name、 location、および key 変数を設定します。
import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import running_on_synapse, find_secret
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
if running_on_synapse():
from notebookutils.visualization import display
# Fill in the following lines with your service information
# Learn more about selecting which embedding model to choose: https://openai.com/blog/new-and-improved-embedding-model
service_name = "synapseml-openai"
deployment_name = "gpt-4.1-mini"
deployment_name_embeddings = "text-embedding-3-small"
key = find_secret(
"openai-api-key"
) # please replace this line with your key as a string
assert key is not None and service_name is not None
プロンプトのデータセットを作成する
次に、一連の行で構成され、行ごとに 1 つのプロンプトがあるデータフレームを作成します。
ADLS またはその他のデータベースから直接データを読み込むこともできます。 Spark データフレームの読み込みと準備の詳細については、Apache Spark データ読み込みガイドを参照してください。
df = spark.createDataFrame(
[
("Hello my name is",),
("The best code is code that's",),
("SynapseML is ",),
]
).toDF("prompt")
OpenAIPrompt Apache Spark クライアントを作成する
Azure OpenAI サービスをデータフレームに適用するには、分散クライアントとして機能する OpenAIPrompt オブジェクトを作成します。
OpenAIPrompt オブジェクトの適切なセッターを使用して、単一の値またはデータフレーム列を使用してサービス パラメーターを設定します。 この例では、 maxTokens を 200 に設定します。 トークンは約 4 文字で、この制限はプロンプトと結果の合計に適用されます。 データフレームの prompt 列の名前を使用して、 promptCol パラメーターを設定します。
from synapse.ml.services.openai import OpenAIPrompt
completion = (
OpenAIPrompt()
.setSubscriptionKey(key)
.setDeploymentName(deployment_name)
.setCustomServiceName(service_name)
.setMaxTokens(200)
.setPromptCol("prompt")
.setErrorCol("error")
.setOutputCol("completions")
)
OpenAIPrompt クライアントを使用してデータフレームを変換する
データフレームとプロンプト クライアントを作成したら、入力データセットを変換し、サービスが追加するすべての情報を含む completions という名前の列を追加します。 わかりやすくするために、テキストのみを選択します。
from pyspark.sql.functions import col
completed_df = completion.transform(df).cache()
display(
completed_df.select(
col("prompt"),
col("error"),
col("completions.choices.text").getItem(0).alias("text"),
)
)
出力は次のようになります。 完了テキストはサンプルとは異なります。
| プロンプト | error | text |
|---|---|---|
| Hello my name is (こんにちは、私の名前は) | null 値 | マカヴェリ18歳で、カリフォルニア州ロサンゼルス出身の音楽を書いて作るのが大好きで、大きくなったらラッパーになりたい |
| 最適なコードは、次のコードです。 | null 値 | 理解可能これは主観的な声明であり、決定的な答えはありません。 |
| SynapseML は、次世代のAI機能を提供するためのライブラリです。 | null 値 | イベントの将来の結果を予測する方法を学習できる機械学習アルゴリズム。 |
使用例の詳細
テキスト埋め込みの生成
テキストを完成させるだけでなく、ダウンストリーム アルゴリズムやベクター取得アーキテクチャで使用するテキストを埋め込むこともできます。 埋め込みを作成することで、大規模なコレクションからドキュメントを検索および取得できます。 この方法は、プロンプト エンジニアリングがタスクに対して十分でない場合に使用します。
OpenAIEmbeddingの使用方法の詳細については、埋め込みガイドを参照してください。
from synapse.ml.services.openai import OpenAIEmbedding
embedding = (
OpenAIEmbedding()
.setSubscriptionKey(key)
.setDeploymentName(deployment_name_embeddings)
.setCustomServiceName(service_name)
.setTextCol("prompt")
.setErrorCol("error")
.setOutputCol("embeddings")
)
display(embedding.transform(df))
チャット完了
GPT-4o や GPT-4.1 などのモデルでは、1 つのプロンプトではなくチャットを理解します。
OpenAIChatCompletion トランスフォーマーが、この機能を大規模に公開します。
from synapse.ml.services.openai import OpenAIChatCompletion
from pyspark.sql import Row
from pyspark.sql.types import *
def make_message(role, content):
return Row(role=role, content=content, name=role)
chat_df = spark.createDataFrame(
[
(
[
make_message(
"system", "You are an AI chatbot with red as your favorite color"
),
make_message("user", "What's your favorite color"),
],
),
(
[
make_message("system", "You are very excited"),
make_message("user", "How are you today"),
],
),
]
).toDF("messages")
chat_completion = (
OpenAIChatCompletion()
.setSubscriptionKey(key)
.setDeploymentName(deployment_name)
.setCustomServiceName(service_name)
.setMessagesCol("messages")
.setErrorCol("error")
.setOutputCol("chat_completions")
)
display(
chat_completion.transform(chat_df).select(
"messages", "chat_completions.choices.message.content"
)
)
要求のバッチ処理を使用してスループットを向上させる
この例では、プロンプトごとに 1 つずつ、サービスに対して複数の要求を行います。 1 つの要求で複数のプロンプトを完了すために、バッチ モードを使用します。 まず、 OpenAIPrompt オブジェクトで、Prompt 列を "Prompt" に設定する代わりに、BatchPrompt 列に "batchPrompt" を指定します。
そのためには、行ごとにプロンプトの一覧を含むデータフレームを作成します。
batch_df = spark.createDataFrame(
[
(["The time has come", "Pleased to", "Today stocks", "Here's to"],),
(["The only thing", "Ask not what", "Every litter", "I am"],),
]
).toDF("batchPrompt")
次に、 OpenAIPrompt オブジェクトを作成します。 prompt 列を設定するのではなく、列の型が Array[String] の場合は、batchPrompt 列を設定します。
batch_completion = (
OpenAIPrompt()
.setSubscriptionKey(key)
.setDeploymentName(deployment_name)
.setCustomServiceName(service_name)
.setMaxTokens(200)
.setBatchPromptCol("batchPrompt")
.setErrorCol("error")
.setOutputCol("completions")
)
変換の呼び出しでは、行ごとに要求が行われます。 各行に複数のプロンプトが含まれるため、各要求はその行のすべてのプロンプトを送信します。 結果には、要求ごとに1行が含まれます。
completed_batch_df = batch_completion.transform(batch_df).cache()
display(completed_batch_df)
自動ミニバッチャーを使用する
データが列形式の場合は、SynapseML の FixedMiniBatcherTransformerを使用して、データを行形式に入れ替えることができます。
from pyspark.sql.types import StringType
from synapse.ml.stages import FixedMiniBatchTransformer
from synapse.ml.core.spark import FluentAPI
completed_autobatch_df = (
df.coalesce(
1
) # Force a single partition so that our little 4-row dataframe makes a batch of size 4, you can remove this step for large datasets
.mlTransform(FixedMiniBatchTransformer(batchSize=4))
.withColumnRenamed("prompt", "batchPrompt")
.mlTransform(batch_completion)
)
display(completed_autobatch_df)
翻訳のための迅速なエンジニアリング
Azure OpenAI サービスは、prompt エンジニアリングを通じて、さまざまな自然言語タスクを解決できます。 次の例は、言語翻訳のプロンプトを示しています。
translate_df = spark.createDataFrame(
[
("Japanese: Ookina hako \nEnglish: Big box \nJapanese: Midori tako\nEnglish:",),
(
"French: Quel heure et il au Montreal? \nEnglish: What time is it in Montreal? \nFrench: Ou est le poulet? \nEnglish:",
),
]
).toDF("prompt")
display(completion.transform(translate_df))
質問の回答を求めるプロンプト
この例では、一般的な知識に関する質問への回答をモデルに求めます。
qa_df = spark.createDataFrame(
[
(
"Q: Where is the Grand Canyon?\nA: The Grand Canyon is in Arizona.\n\nQ: What is the weight of the Burj Khalifa in kilograms?\nA:",
)
]
).toDF("prompt")
display(completion.transform(qa_df))