Compartir a través de


Tutorial: Creación de un motor de búsqueda personalizado y un sistema de respuesta a preguntas

En este tutorial, cargará facturas en Spark, extraerá datos estructurados con Azure Inteligencia de documentos, traducirá texto, enriquecerá con Azure OpenAI y escribirá resultados en un índice de Azure AI Search que puede consultar. El tutorial tarda aproximadamente 30 minutos en completarse.

Prerrequisitos

Configuración de dependencias

Importe paquetes y conéctese a los recursos de Azure usados en este flujo de trabajo.

import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import running_on_synapse, find_secret

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

cognitive_key = find_secret("Foundry-resource-key") # replace with your Azure AI services key
cognitive_location = "eastus"

translator_key = find_secret("translator-key") # replace with your Azure Translator resource key
translator_location = "eastus"

search_key = find_secret("azure-search-key") # replace with your Azure AI Search key
search_service = "mmlspark-azure-search"
search_index = "form-demo-index-5"

openai_key = find_secret("openai-api-key") # replace with your Azure OpenAI key
openai_service_name = "synapseml-openai"
openai_deployment_name = "gpt-4o-mini"
openai_url = f"https://{openai_service_name}.openai.azure.com/"

Carga de datos en Spark

Este código carga algunos archivos externos de una cuenta de almacenamiento de Azure que se usa con fines de demostración. Los archivos son varias facturas y el código los carga en un marco de datos.

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


def blob_to_url(blob):
    [prefix, postfix] = blob.split("@")
    container = prefix.split("/")[-1]
    split_postfix = postfix.split("/")
    account = split_postfix[0]
    filepath = "/".join(split_postfix[1:])
    return "https://{}/{}/{}".format(account, container, filepath)


df2 = (
    spark.read.format("binaryFile")
    .load("wasbs://ignite2021@mmlsparkdemo.blob.core.windows.net/form_subset/*")
    .select("path")
    .limit(10)
    .select(udf(blob_to_url, StringType())("path").alias("url"))
    .cache()
)

display(df2)

Aplicación de la inteligencia de documentos

Este código carga el transformador AnalyzeInvoices y pasa una referencia a la trama de datos que contiene las facturas. Llama al modelo de factura preconstruido de la Inteligencia de Documentos de Azure.

from synapse.ml.cognitive import AnalyzeInvoices

analyzed_df = (
    AnalyzeInvoices()
    .setSubscriptionKey(cognitive_key)
    .setLocation(cognitive_location)
    .setImageUrlCol("url")
    .setOutputCol("invoices")
    .setErrorCol("errors")
    .setConcurrency(5)
    .transform(df2)
    .cache()
)

display(analyzed_df)

Simplificar la producción de inteligencia de documentos

El transformador FormOntologyLearner deduce una estructura tabular de la salida dinámica AnalyzeInvoices , organizándola en columnas y filas para un análisis más sencillo de bajada.

from synapse.ml.cognitive import FormOntologyLearner

organized_df = (
    FormOntologyLearner()
    .setInputCol("invoices")
    .setOutputCol("extracted")
    .fit(analyzed_df)
    .transform(analyzed_df)
    .select("url", "extracted.*")
    .cache()
)

display(organized_df)

Mediante un dataframe tabular, puede aplanar las tablas anidadas que se encuentran en los formularios utilizando SparkSQL.

from pyspark.sql.functions import explode, col

itemized_df = (
    organized_df.select("*", explode(col("Items")).alias("Item"))
    .drop("Items")
    .select("Item.*", "*")
    .drop("Item")
)

display(itemized_df)

Agregar traducciones

Este código carga Translate, un transformador que llama a la Azure Translator en el servicio Foundry Tools. El texto original, que se encuentra en inglés en la columna "Description", se traduce automáticamente a varios idiomas. Toda la salida se consolida en el array "output.translations".

from synapse.ml.cognitive import Translate

translated_df = (
    Translate()
    .setSubscriptionKey(translator_key)
    .setLocation(translator_location)
    .setTextCol("Description")
    .setErrorCol("TranslationError")
    .setOutputCol("output")
    .setToLanguage(["zh-Hans", "fr", "ru", "cy"])
    .setConcurrency(5)
    .transform(itemized_df)
    .withColumn("Translations", col("output.translations")[0])
    .drop("output", "TranslationError")
    .cache()
)

display(translated_df)

Traducción de productos a emojis con OpenAI

from synapse.ml.cognitive.openai import OpenAIPrompt
from pyspark.sql.functions import trim, split

emoji_template = """ 
  Your job is to translate item names into emoji. Do not add anything but the emoji and end the translation with a comma
  
  Two Ducks: 🦆🦆,
  Light Bulb: 💡,
  Three Peaches: 🍑🍑🍑,
  Two kitchen stoves: ♨️♨️,
  A red car: 🚗,
  A person and a cat: 🧍🐈,
  A {Description}: """

prompter = (
    OpenAIPrompt()
    .setSubscriptionKey(openai_key)
    .setDeploymentName(openai_deployment_name)
    .setUrl(openai_url)
    .setMaxTokens(5)
    .setPromptTemplate(emoji_template)
    .setErrorCol("error")
    .setOutputCol("Emoji")
)

emoji_df = (
    prompter.transform(translated_df)
    .withColumn("Emoji", trim(split(col("Emoji"), ",").getItem(0)))
    .drop("error", "prompt")
    .cache()
)
display(emoji_df.select("Description", "Emoji"))

Inferir continente de dirección del proveedor con OpenAI

continent_template = """
Which continent does the following address belong to? 

Pick one value from Europe, Australia, North America, South America, Asia, Africa, Antarctica. 

Dont respond with anything but one of the above. If you don't know the answer or cannot figure it out from the text, return None. End your answer with a comma.

Address: "6693 Ryan Rd, North Whales",
Continent: Europe,
Address: "6693 Ryan Rd",
Continent: None,
Address: "{VendorAddress}",
Continent:"""

continent_df = (
    prompter.setOutputCol("Continent")
    .setPromptTemplate(continent_template)
    .transform(emoji_df)
    .withColumn("Continent", trim(split(col("Continent"), ",").getItem(0)))
    .drop("error", "prompt")
    .cache()
)
display(continent_df.select("VendorAddress", "Continent"))

Creación de un índice de Azure AI Search para los formularios

from synapse.ml.cognitive import *
from pyspark.sql.functions import monotonically_increasing_id, lit

(
    continent_df.withColumn("DocID", monotonically_increasing_id().cast("string"))
    .withColumn("SearchAction", lit("upload"))
    .writeToAzureSearch(
        subscriptionKey=search_key,
        actionCol="SearchAction",
        serviceName=search_service,
        indexName=search_index,
        keyCol="DocID",
    )
)

Prueba de una consulta de búsqueda

import requests

search_url = "https://{}.search.windows.net/indexes/{}/docs/search?api-version=2024-07-01".format(
    search_service, search_index
)
requests.post(
    search_url, json={"search": "door"}, headers={"api-key": search_key}
).json()

Creación de un bot de chat que puede usar Azure AI Search como herramienta

import json
from openai import AzureOpenAI

client = AzureOpenAI(
    api_key=openai_key,
    api_version="2024-10-21",
    azure_endpoint=openai_url,
)

chat_context_prompt = f"""
You are a chatbot designed to answer questions with the help of a search engine that has the following information:

{continent_df.columns}

If you dont know the answer to a question say "I dont know". Do not lie or hallucinate information. Be brief. If you need to use the search engine to solve the please output a json in the form of {{"query": "example_query"}}
"""


def search_query_prompt(question):
    return f"""
Given the search engine above, what would you search for to answer the following question?

Question: "{question}"

Please output a json in the form of {{"query": "example_query"}}
"""


def search_result_prompt(query):
    search_results = requests.post(
        search_url, json={"search": query}, headers={"api-key": search_key}
    ).json()
    return f"""

You previously ran a search for "{query}" which returned the following results:

{search_results}

You should use the results to help you answer questions. If you dont know the answer to a question say "I dont know". Do not lie or hallucinate information. Be Brief and mention which query you used to solve the problem. 
"""


def prompt_gpt(messages):
    response = client.chat.completions.create(
        model=openai_deployment_name, messages=messages, max_tokens=None, top_p=0.95
    )
    return response.choices[0].message.content


def custom_chatbot(question):
    while True:
        try:
            query = json.loads(
                prompt_gpt(
                    [
                        {"role": "system", "content": chat_context_prompt},
                        {"role": "user", "content": search_query_prompt(question)},
                    ]
                )
            )["query"]

            return prompt_gpt(
                [
                    {"role": "system", "content": chat_context_prompt},
                    {"role": "system", "content": search_result_prompt(query)},
                    {"role": "user", "content": question},
                ]
            )
        except Exception as e:
            raise e

Formular una pregunta al bot de chat

custom_chatbot("What did Luke Diaz buy?")

Comprobación de los resultados

display(
    continent_df.where(col("CustomerName") == "Luke Diaz")
    .select("Description")
    .distinct()
)