Compartir a través de


Integración de Azure Functions con Azure Data Explorer mediante enlaces de entrada y salida (versión preliminar)

Importante

Este conector se puede usar en Inteligencia en tiempo real en Microsoft Fabric. Use las instrucciones de este artículo con las siguientes excepciones:

Azure Functions permite ejecutar código sin servidor en la nube según una programación o en respuesta a un evento. Mediante el uso de enlaces de entrada y salida de Azure Data Explorer para Azure Functions, puede integrar Azure Data Explorer en los flujos de trabajo para ingerir datos y ejecutar consultas en el clúster.

Requisitos previos

Pruebe la integración mediante el proyecto de ejemplo.

Cómo utilizar vinculaciones de Azure Data Explorer para Azure Functions

Para obtener información sobre cómo usar enlaces de Azure Data Explorer para Azure Functions, consulte los siguientes artículos:

Escenarios para usar enlaces de Azure Data Explorer para Azure Functions

En las secciones siguientes se describen algunos escenarios comunes para usar enlaces de Azure Data Explorer para Azure Functions.

Vinculaciones de entrada

Los enlaces de entrada ejecutan una consulta del lenguaje de consulta kusto (KQL) o una función KQL, opcionalmente con parámetros, y devuelven la salida a la función.

En las secciones siguientes se describe cómo usar enlaces de entrada en algunos escenarios comunes.

Escenario 1: un punto de conexión HTTP para consultar datos de un clúster

Use enlaces de entrada cuando necesite exponer datos de Azure Data Explorer a través de una API REST. En este escenario, se usa un desencadenador HTTP de Azure Functions para consultar datos en el clúster. El escenario es útil en situaciones en las que es necesario proporcionar acceso mediante programación a los datos de Azure Data Explorer para aplicaciones o servicios externos. Exponer los datos a través de una API REST permite a las aplicaciones consumir fácilmente los datos sin necesidad de que se conecten directamente al clúster.

El código define una función con un desencadenador HTTP y un enlace de entrada de Azure Data Explorer. El enlace de entrada especifica la consulta que se ejecutará en la tabla Productos de la base de datos productsdb. La función usa la columna productId como predicado que se pasa como parámetro.

{
    [FunctionName("GetProduct")]
    public static async Task<IActionResult> RunAsync(
        [HttpTrigger(AuthorizationLevel.User, "get", Route = "getproducts/{productId}")]
        HttpRequest req,
        [Kusto(Database:"productsdb" ,
        KqlCommand = "declare query_parameters (productId:long);Products | where ProductID == productId" ,
        KqlParameters = "@productId={productId}",
        Connection = "KustoConnectionString")]
        IAsyncEnumerable<Product> products)
    {
        IAsyncEnumerator<Product> enumerator = products.GetAsyncEnumerator();
        var productList = new List<Product>();
        while (await enumerator.MoveNextAsync())
        {
            productList.Add(enumerator.Current);
        }
        await enumerator.DisposeAsync();
        return new OkObjectResult(productList);
    }
}

Invoque la función de la siguiente manera:

curl https://myfunctionapp.azurewebsites.net/api/getproducts/1

Escenario 2: un desencadenador programado para exportar datos desde un clúster

El escenario siguiente es aplicable en situaciones en las que los datos deben exportarse según una programación basada en el tiempo.

El código define una función con un desencadenador de temporizador que exporta una agregación de datos de ventas de la base de datos productsdb a un archivo CSV en Azure Blob Storage.

public static async Task Run([TimerTrigger("0 0 1 * * *")] TimerInfo myTimer,
    [Kusto(ConnectionStringSetting = "KustoConnectionString",
            DatabaseName = "productsdb",
            Query = "ProductSales | where OrderDate >= ago(1d) | summarize Sales = sum(ProductSales) by ProductName | top 10 by Sales desc")] IEnumerable<dynamic> queryResults,
[Blob("salescontainer/productsblob.csv", FileAccess.Write, Connection = "BlobStorageConnection")] CloudBlockBlob outputBlob,
ILogger log)
{
    // Write the query results to a CSV file
    using (var stream = new MemoryStream())
    using (var writer = new StreamWriter(stream))
    using (var csv = new CsvWriter(writer, CultureInfo.InvariantCulture))
    {
        csv.WriteRecords(queryResults);
        writer.Flush();
        stream.Position = 0;
        await outputBlob.UploadFromStreamAsync(stream);
    }
}

Vinculaciones de salida

Las vinculaciones de salida toman una o varias filas y las insertan en una tabla de Azure Data Explorer.

En las secciones siguientes se describe cómo usar enlaces de salida en algunos escenarios comunes.

Escenario 1: punto de conexión HTTP para ingerir datos en un clúster

Use este escenario cuando necesite procesar solicitudes HTTP entrantes e ingerir los datos en el clúster. Mediante un vínculo de salida, puede escribir los datos recibidos de la solicitud en tablas de Azure Data Explorer.

El código define una función con un desencadenador HTTP y un enlace de salida de Azure Data Explorer. Esta función toma una carga JSON en el cuerpo de la solicitud HTTP y la escribe en la tabla productos de la base de datos productsdb.

public static IActionResult Run(
    [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "addproductuni")]
    HttpRequest req, ILogger log,
    [Kusto(Database:"productsdb" ,
    TableName ="products" ,
    Connection = "KustoConnectionString")] out Product product)
{
    log.LogInformation($"AddProduct function started");
    string body = new StreamReader(req.Body).ReadToEnd();
    product = JsonConvert.DeserializeObject<Product>(body);
    string productString = string.Format(CultureInfo.InvariantCulture, "(Name:{0} ID:{1} Cost:{2})",
                product.Name, product.ProductID, product.Cost);
    log.LogInformation("Ingested product {}", productString);
    return new CreatedResult($"/api/addproductuni", product);
}

Invoque la función de la siguiente manera:

curl -X POST https://myfunctionapp.azurewebsites.net/api/addproductuni -d '{"Name":"Product1","ProductID":1,"Cost":100,"ActivatedOn":"2023-01-02T00:00:00"}'

Escenario 2: Ingesta de datos de RabbitMQ u otros sistemas de mensajería compatibles con Azure

Use este escenario cuando necesite ingerir datos de un sistema de mensajería en el clúster. Mediante un enlace de salida, puede ingerir datos entrantes del sistema de mensajería en tablas de Azure Data Explorer.

El código define una función con un desencadenador RabbitMQ. La función ingiere mensajes, datos en formato JSON, en la tabla products de la base de datos productsdb .

public class QueueTrigger
{
    [FunctionName("QueueTriggerBinding")]
    [return: Kusto(Database: "productsdb",
                TableName = "products",
                Connection = "KustoConnectionString")]
    public static Product Run(
        [RabbitMQTrigger(queueName: "bindings.products.queue", ConnectionStringSetting = "rabbitMQConnectionAppSetting")] Product product,
        ILogger log)
    {
        log.LogInformation($"Dequeued product {product.ProductID}");
        return product;
    }
}

Para más información sobre las funciones, consulte Documentación de Azure Functions. La extensión de Azure Data Explorer está disponible en: