Compartir vía


Datos de flujo como entrada en Stream Analytics

Stream Analytics tiene una integración de primer nivel con los flujos de datos de Azure, como entrada desde cuatro tipos de recursos.

Estos recursos de entrada pueden residir en la misma suscripción de Azure que el trabajo de Stream Analytics o una suscripción diferente.

Compresión

Stream Analytics admite la compresión para todos los orígenes de entrada. Los tipos de compresión admitidos son: Ninguna, Gzip y Deflate. La compatibilidad con la compresión no está disponible para los datos de referencia. Si los datos de entrada fueran datos de Avro comprimidos, Stream Analytics los manipulará de forma transparente. No es necesario especificar el tipo de compresión con la serialización de Avro.

Crear, editar, o probar datos de entrada

Puede usar el portal Azure, Visual Studio y Visual Studio Code para agregar y ver o editar entradas existentes en el trabajo de streaming. También puede probar las conexiones de entrada y las consultas de prueba desde datos de ejemplo desde el portal de Azure, Visual Studio y Visual Studio Code. Al escribir una consulta, se muestra una lista de las entradas en la cláusula FROM. Puede obtener la lista de las entradas disponibles en la página Consulta del portal. Si quisiera usar varias entradas, use o escriba varias consultas .

Nota:

Se recomienda encarecidamente usar las herramientas de Stream Analytics para Visual Studio Code para obtener la mejor experiencia de desarrollo local. Hay brechas de características conocidas en las herramientas de Stream Analytics para Visual Studio 2019 (versión 2.6.3000.0) y no se mejorará en el futuro.

Transmitir datos desde Event Hubs

Azure Event Hubs es un ingeror de eventos de publicación y suscripción altamente escalable. Un centro de eventos puede recopilar millones de eventos por segundo, por lo que se pueden procesar y analizar las grandes cantidades de datos que generan las aplicaciones y los dispositivos conectados. Juntos, Event Hubs y Stream Analytics proporcionan una solución de un extremo a otro para realizar análisis en tiempo real. Event Hubs le permite alimentar eventos en Azure en tiempo real y los trabajos de Stream Analytics pueden procesar esos eventos en tiempo real. Por ejemplo, se pueden enviar clics en la web, lecturas de sensores o eventos de registro en línea a Event Hubs. Así, se pueden crear trabajos de Stream Analytics que usen Event Hubs para los datos de entrada para su filtrado, agregación y correlación en tiempo real.

es la marca de tiempo de la llegada de un evento a un centro de eventos y es la marca de tiempo predeterminada de los eventos procedentes de Event Hubs hacia Stream Analytics. Para procesar los datos como un flujo con una marca de tiempo en la carga del evento, se debe usar la palabra clave TIMESTAMP BY.

Grupo de consumidores de Event Hubs

Se debería configurar cada entrada del centro de eventos para que tenga su propio grupo de consumidores. Cuando un trabajo contiene una autocombinación o tiene varias entradas, es posible que algunas entradas sean leídas por más de un lector de un nivel inferior. Esta situación afecta al número de lectores de un solo grupo de consumidores. El procedimiento recomendado para evitar superar el límite de cinco lectores por grupo de consumidores por cada partición de Event Hubs consiste en designar un grupo de consumidores para cada trabajo de Stream Analytics. También hay un límite de 20 grupos de consumidores para los centros de eventos de nivel estándar. Para obtener más información, consulte Troubleshoot Azure Stream Analytics inputs.

Creación de una entrada desde Event Hubs

En la tabla siguiente se explica cada propiedad de la página New input del portal de Azure para transmitir la entrada de datos desde un centro de eventos:

Propiedad Descripción
Alias de entrada Nombre descriptivo que se usará en la consulta del trabajo para hacer referencia a esta entrada.
Suscripción Elija la suscripción Azure en la que existe el recurso del centro de eventos.
Espacio de nombres del Event Hub El espacio de nombres de Event Hubs es un contenedor de centros de eventos. Al crear un centro de eventos, también se crea el espacio de nombres.
Nombre del centro de eventos Nombre del centro de eventos que se usa como entrada.
Grupo de consumidores del centro de eventos (recomendado) Se recomienda usar un grupo de consumidores distinto para cada trabajo de Stream Analytics. Esta cadena identifica el grupo de consumidores que se usa para la ingesta de datos desde el centro de eventos. Si no se especifica ningún grupo de consumidores, el trabajo de Stream Analytics usa el grupo de consumidores .
Modo de autenticación Especifique el tipo de autenticación que desee usar para conectarse al centro de eventos. Puede usar una connection string o una identidad administrada para autenticarse con el centro de eventos. Para la opción de identidad administrada, podría crear una identidad administrada asignada por el sistema para el trabajo de Stream Analytics o una identidad administrada asignada por el usuario para autenticarse con el centro de eventos. Cuando se usa una identidad administrada, la identidad administrada debe ser miembro de los roles Azure Event Hubs Data Receiver o Azure Event Hubs Data Owner.
Nombre de la política de Event Hub La directiva de acceso compartido que proporciona acceso a Event Hubs. Cada directiva de acceso compartido tiene un nombre, los permisos establecidos y las claves de acceso. Esta opción se rellena automáticamente, a menos que seleccione la opción para proporcionar la configuración de Event Hubs de forma manual.
Clave de partición Es un campo opcional que solo está disponible si el trabajo está configurado para usar el nivel de compatibilidad 1.2 o cualquier nivel superior. Si la entrada está particionada por una propiedad, puede agregar aquí el nombre de esta propiedad. Se usa para mejorar el rendimiento de la consulta si incluye una cláusula o en esta propiedad. Si este trabajo usa el nivel de compatibilidad 1.2 o superior, el valor predeterminado de este campo será
Formato de serialización de eventos Formato de serialización (JSON, CSV, Avro) del flujo de datos entrante. Asegúrese de que el formato JSON responde a la especificación y no incluye un 0 inicial para números decimales.
Codificación Por el momento, UTF-8 es el único formato de codificación compatible.
Tipo de compresión de eventos Tipo de compresión utilizado para leer el flujo de datos entrante, como Ninguno (valor predeterminado), Gzip o Deflate.
Repositorio de esquema Seleccione el registro de esquema con esquemas para los datos de eventos que se reciban del centro de eventos.

Cuando los datos proceden de una entrada de flujo de Event Hubs, tiene acceso a los siguientes campos de metadatos en la consulta de Stream Analytics:

Propiedad Descripción
EventProcessedUtcTime Fecha y hora en que Stream Analytics procesa el evento.
EventEnqueuedUtcTime Fecha y hora en que Event Hubs recibe los eventos.
Id de Partición Identificador de partición de base cero para el adaptador de entrada.

Por ejemplo, si usa estos campos, puede escribir una consulta similar al ejemplo siguiente:

SELECT
    EventProcessedUtcTime,
    EventEnqueuedUtcTime,
    PartitionId
FROM Input

Nota:

Al usar Event Hubs como punto de conexión para rutas de IoT Hub, puede acceder a los metadatos de IoT Hub mediante la función GetMetadataPropertyValue.

Transmisión de datos de IoT Hub

Azure IoT Hub es un ingeror de eventos de publicación y suscripción altamente escalable optimizado para escenarios de IoT.

La marca de tiempo predeterminada de los eventos procedentes de un IoT Hub en Stream Analytics es la marca de tiempo que el evento llegó a la IoT Hub, que es EventEnqueuedUtcTime. Para procesar los datos como un flujo con una marca de tiempo en la carga del evento, se debe usar la palabra clave TIMESTAMP BY.

grupos de consumidores de IoT Hub

Debe configurar cada entrada de Stream Analytics IoT Hub para tener su propio grupo de consumidores. Cuando un trabajo contiene una autocombinación o tiene varias entradas, es posible que alguna entrada sea leída por más de un lector posteriormente. Esta situación afecta al número de lectores de un solo grupo de consumidores. Para evitar superar el límite de Azure IoT Hub de cinco lectores por grupo de consumidores por partición, se recomienda designar un grupo de consumidores para cada trabajo de Stream Analytics.

Configuración de un IoT Hub como entrada de flujo de datos

En la tabla siguiente se explica cada propiedad de la página New input del portal de Azure al configurar un IoT Hub como entrada de flujo.

Propiedad Descripción
Alias de entrada Nombre descriptivo que se usará en la consulta del trabajo para hacer referencia a esta entrada.
Suscripción Elija la suscripción en la que existe el recurso IoT Hub.
IoT Hub Nombre del IoT Hub que se va a usar como entrada.
Grupo de consumidores Se recomienda usar un grupo de consumidores distinto para cada trabajo de Stream Analytics. El grupo de consumidores que se usa para ingerir datos desde Azure IoT Hub. Stream Analytics usa el grupo de consumidores $Default, a menos que se especifique lo contrario.
Nombre de directiva de acceso compartido Directiva de acceso compartido que proporciona acceso al IoT Hub. Cada directiva de acceso compartido tiene un nombre, los permisos establecidos y las claves de acceso.
Clave de directiva de acceso compartido Clave de acceso compartido que se usa para autorizar el acceso a la IoT Hub. Esta opción se rellena automáticamente a menos que seleccione la opción para proporcionar la configuración de IoT Hub manualmente.
Punto de conexión Punto de conexión del IoT Hub.
Clave de partición Es un campo opcional que solo está disponible si el trabajo está configurado para usar el nivel de compatibilidad 1.2 o cualquier nivel superior. Si la entrada está particionada por una propiedad, puede agregar aquí el nombre de esta propiedad. Se usa para mejorar el rendimiento de la consulta si incluye una cláusula PARTITION BY o GROUP BY en esta propiedad. Si este trabajo usase el nivel de compatibilidad 1.2 o superior, el valor predeterminado de este campo será "PartitionId".
Formato de serialización de eventos Formato de serialización (JSON, CSV, Avro) del flujo de datos entrante. Asegúrese de que el formato JSON responde a la especificación y no incluye un 0 inicial para números decimales.
Codificación Por el momento, UTF-8 es el único formato de codificación compatible.
Tipo de compresión de eventos Tipo de compresión utilizado para leer el flujo de datos entrante, como Ninguno (valor predeterminado), Gzip o Deflate.

Cuando se usan datos de flujo desde un IoT Hub, tiene acceso a los siguientes campos de metadatos en la consulta de Stream Analytics:

Propiedad Descripción
EventProcessedUtcTime Fecha y la hora en que se produjo el evento.
EventEnqueuedUtcTime Fecha y hora en que el IoT Hub recibe el evento.
Id de Partición Identificador de partición de base cero para el adaptador de entrada.
IoTHub.MessageId Identificador que se usa para correlacionar la comunicación bidireccional en IoT Hub.
IoTHub.CorrelationId Identificador que se usa en las respuestas de mensajes y los comentarios en IoT Hub.
IoTHub.ConnectionDeviceId Identificador de autenticación que se usa para enviar este mensaje. El IoT Hub marca este valor en los mensajes enlazados al servicio.
IoTHub.ConnectionDeviceGenerationId (identificador de generación de dispositivo de conexión en IoTHub) Identificador de generación del dispositivo autenticado que se ha usado para enviar este mensaje. El IoT Hub marca este valor en los mensajes destinados al servicio.
IoTHub.EnqueuedTime Hora a la que el IoT Hub recibe el mensaje.

Transmisión de datos desde Blob Storage o Data Lake Storage Gen2

En escenarios con grandes cantidades de datos no estructurados para almacenarlos en la nube, Azure Blob Storage o Azure Data Lake Storage Gen2 ofrece una solución rentable y escalable. Los datos de Blob Storage o Azure Data Lake Storage Gen2 se consideran datos en reposo. Sin embargo, Stream Analytics puede procesar estos datos como flujo de datos.

Un escenario que se usa normalmente para utilizar tales entradas con Stream Analytics es el procesamiento de registros. En este escenario, se capturan archivos de datos de telemetría de un sistema y es preciso analizarlos y procesarlos para extraer datos significativos.

La marca de tiempo predeterminada de un evento de Blob Storage o Azure Data Lake Storage Gen2 en Stream Analytics es la marca de tiempo que se modificó por última vez, que es BlobLastModifiedUtcTime. Si un blob se carga en una cuenta de almacenamiento a las 13:00 y el trabajo de Azure Stream Analytics se inicia con la opción Now a las 13:01, no será procesado ya que su hora de modificación está fuera del período de ejecución del trabajo.

Si se carga un blob en un contenedor de cuenta de almacenamiento a las 13:00 y el trabajo Azure Stream Analytics se inicia con la opción Hora personalizada a las 13:00 o antes, el blob se recogerá, ya que su hora de modificación está dentro del período de ejecución del trabajo.

Si se inicia un trabajo de Azure Stream Analytics mediante Now a las 13:00 y se carga un blob en el contenedor de la cuenta de almacenamiento a las 13:01, Azure Stream Analytics selecciona el blob. La marca de tiempo asignada a cada blob se basa únicamente en . La carpeta en la que se encuentra el blob no guarda ninguna relación con la marca de tiempo asignada. Por ejemplo, si hubiera un blob con un de , la marca de tiempo asignada a este blob sería .

Para procesar los datos como un flujo con una marca de tiempo en la carga del evento, se debe usar la palabra clave TIMESTAMP BY. Un trabajo de Stream Analytics extrae datos de entrada de Azure Blob Storage o Azure Data Lake Storage Gen2 cada segundo si el archivo de blob está disponible. Si el archivo de blob no está disponible, hay un retroceso exponencial con un retraso de tiempo máximo de 90 segundos.

Nota:

Stream Analytics no permite agregar contenido a un archivo de blob existente. Stream Analytics solo verá cada archivo una vez y los cambios que se produzcan en él después de que el trabajo lea los datos no se procesan. Se recomienda cargar todos los datos de un archivo de blob a la vez y, a continuación, agregar los eventos más recientes a un archivo de blob diferente y nuevo.

En aquellos escenarios en los que se agregan continuamente muchos blobs que Stream Analytics procesa a medida que se agregan, es posible que se omitan algunos blobs en raras ocasiones debido a la granularidad del objeto . Esto se puede mitigar cargando los blobs con al menos dos segundos de diferencia. Si esta opción no es factible, se puede usar Event Hubs para transmitir grandes volúmenes de eventos.

Configuración de Blob Storage como entrada de flujo

En la tabla siguiente se explica cada propiedad de la página New input del portal de Azure al configurar Blob Storage como entrada de flujo.

Propiedad Descripción
Alias de entrada Nombre amigable que usas en la consulta del trabajo para referenciar esta entrada.
Suscripción Elija la suscripción en la que se encuentra el recurso de almacenamiento.
Cuenta de almacenamiento Nombre de la cuenta de almacenamiento donde se encuentran los archivos de blob.
Clave de cuenta de almacenamiento La clave secreta asociada con la cuenta de almacenamiento. Esta opción se rellena automáticamente, a menos que elija proporcionar la configuración de forma manual.
Contenedor Los contenedores proporcionan una agrupación lógica de los blobs. Puede elegir Usar contenedor existente o Crear uno nuevo para que se cree un nuevo contenedor.
Modo de autenticación Especifique el tipo de autenticación que desea usar para conectarse a la cuenta de almacenamiento. Puede usar una connection string o una identidad administrada para autenticarse con la cuenta de almacenamiento. Para la opción de identidad administrada, podría crear una identidad administrada asignada por el sistema al trabajo de Stream Analytics o una identidad administrada asignada por el usuario para autenticarse con la cuenta de almacenamiento. Cuando se usa una identidad administrada, la identidad administrada debe ser miembro de un rol adecuado de la cuenta de almacenamiento.
Patrón de ruta (opcional) Ruta de archivo que sirve para ubicar los blobs dentro del contenedor especificado. Si desea leer los blobs de la raíz del contenedor, no establezca un patrón de ruta de acceso. Dentro de la ruta, puede especificar una o más instancias de las tres variables siguientes: , o .

Ejemplo 1:

Ejemplo 2:

El carácter no es un valor permitido para el prefijo de ruta de acceso. Solo se permiten caracteres válidos de Azure blob. No incluya nombres de contenedor ni nombres de archivo.
Formato de fecha (opcional) Si usa la variable de fecha en la ruta, formato de fecha por el que se organizan los archivos. Ejemplo:

Cuando la entrada de blob incluye  o en su ruta de acceso, las carpetas se examinan en orden temporal ascendente.
Formato de hora (opcional) Si usa la variable de hora en la ruta, formato de hora por el que se organizan los archivos. Actualmente, el único valor admitido es para las horas.
Clave de partición Es un campo opcional que solo está disponible si el trabajo está configurado para usar el nivel de compatibilidad 1.2 o cualquier nivel superior. Si la entrada está particionada por una propiedad, puede agregar aquí el nombre de esta propiedad. Se usa para mejorar el rendimiento de la consulta si incluye una cláusula PARTITION BY o GROUP BY en esta propiedad. Si este trabajo usase el nivel de compatibilidad 1.2 o superior, el valor predeterminado de este campo será "PartitionId".
Count of input partitions (Recuento de particiones de entrada) Este campo solo está presente cuando {partition} está presente en el patrón de ruta. El valor de esta propiedad es un entero = 1. Siempre que {partition} aparezca en pathPattern, se usará un número entre 0 y el valor de este campo -1.
Formato de serialización de eventos Formato de serialización (JSON, CSV, Avro) del flujo de datos entrante. Asegúrese de que el formato JSON responde a la especificación y no incluye un 0 inicial para números decimales.
Codificación Por el momento, UTF-8 es el único formato de codificación compatible para CSV y JSON.
Compresión Tipo de compresión utilizado para leer el flujo de datos entrante, como Ninguno (valor predeterminado), Gzip o Deflate.

Cuando los datos proceden de un origen de Blob Storage, puede acceder a los siguientes campos de metadatos en la consulta de Stream Analytics:

Propiedad Descripción
Nombre del Blob Nombre del blob de entrada de donde procede el evento.
EventProcessedUtcTime Fecha y hora en que Stream Analytics procesa el evento.
HoraDeÚltimaModificaciónDelBlobUTC Fecha y hora en que se modificó por última vez el blob.
Id de Partición Identificador de partición de base cero para el adaptador de entrada.

Por ejemplo, si usa estos campos, puede escribir una consulta similar al ejemplo siguiente:

SELECT
    BlobName,
    EventProcessedUtcTime,
    BlobLastModifiedUtcTime
FROM Input

Transmisión de datos desde Apache Kafka

Azure Stream Analytics permite conectarse directamente a clústeres de Apache Kafka para ingerir datos. La solución es poco código y está totalmente administrada por el equipo de Azure Stream Analytics en Microsoft, lo que le permite cumplir los estándares de cumplimiento empresarial. La entrada de Kafka es compatible con versiones anteriores y admite todas las versiones con la versión de cliente más reciente a partir de la versión 0.10. Los usuarios pueden conectarse a clústeres de Kafka dentro de una red virtual y a clústeres de Kafka con un punto de conexión público, en función de la configuración. La configuración se basa en las convenciones de configuración de Kafka existentes. Los tipos de compresión admitidos son None, Gzip, Snappy, LZ4 y Zstd.

Para obtener más información, consulte Stream data from Kafka into Azure Stream Analytics (Preview).

Pasos siguientes