Condividi tramite


Trasmettere dati come input in Analisi di flusso

Stream Analytics si integra con i flussi di dati di Azure come sorgenti di input provenienti da cinque tipi di risorse.

Queste risorse di input possono trovarsi nella stessa sottoscrizione di Azure del processo Stream Analytics o in una sottoscrizione diversa.

Compressione

Analisi di flusso supporta la compressione per tutte le origini di input. I tipi di compressione supportati sono: Nessuno, Gzip e Deflate. Analisi di flusso non supporta la compressione per i dati di riferimento. Se i dati di input sono compressi in formato Avro, Analisi di flusso li gestisce in modo trasparente. Non è necessario specificare il tipo di compressione con la serializzazione Avro.

Creare, modificare o testare gli input

È possibile usare il portale Azure, Visual Studio e Visual Studio Code per aggiungere e visualizzare o modificare gli input esistenti nel processo di streaming. È anche possibile testare le connessioni di input e testare le query dai dati di esempio dal portale di Azure, Visual Studio e Visual Studio Code. Quando si scrive una query, si elencano gli input nella clausola FROM. È possibile ottenere l'elenco degli input disponibili dalla pagina Query nel portale. Se si vogliono usare più input, JOIN o scrivere più SELECT query.

Nota

Per un'esperienza di sviluppo locale ottimale, usare gli strumenti Stream Analytics per Visual Studio Code. Esistono lacune nelle funzionalità note negli strumenti di Analisi di flusso per Visual Studio 2019 (versione 2.6.3000.0) e non verrà migliorata in futuro.

Trasmettere dati da Hub eventi

Azure Event Hubs è un sistema di acquisizione eventi publish-subscribe altamente scalabile. Un hub eventi può raccogliere milioni di eventi al secondo per consentire di elaborare e analizzare enormi quantità di dati generati dalle applicazioni e dai dispositivi connessi. Insieme, Hub eventi e Analisi di flusso offrono una soluzione end-to-end per l'analisi in tempo reale. Hub eventi inserisce eventi in Azure in tempo reale e i processi di Analisi di flusso elaborano tali eventi in tempo reale. È ad esempio possibile inviare clic sul Web, letture di sensori o eventi di log online agli Event Hubs È quindi possibile creare processi di Analisi di flusso per usare Hub eventi per i dati di input per il filtro, l'aggregazione e la correlazione in tempo reale.

EventEnqueuedUtcTime è il timestamp dell'arrivo di un evento in un hub eventi ed è il timestamp predefinito degli eventi provenienti da Hub eventi ad Analisi di flusso. Per elaborare i dati come flusso usando un timestamp nel payload dell'evento, è necessario usare la parola chiave TIMESTAMP BY .

Gruppi di consumer di Event Hubs

Configurare ogni input di event hub con il proprio gruppo di consumatori. Quando un processo contiene un self-join o ha più input, più di un lettore downstream potrebbe leggere alcuni input. Questa situazione influisce sul numero di lettori in un singolo gruppo di consumer. Per evitare di superare il limite di Event Hub di cinque lettori per gruppo di consumatori per partizione, designare un gruppo di consumatori per ogni processo di Stream Analytics. Esiste anche un limite di 20 gruppi di consumer per un hub eventi di livello Standard. Per altre informazioni, vedere Troubleshoot Azure Stream Analytics input.

Creare un input da Hub eventi

La tabella seguente illustra ogni proprietà nella pagina Nuovo input nel portale di Azure per trasmettere l'input dei dati da un hub eventi:

Proprietà Descrizione
Alias di input Nome descrittivo che viene usato nella query del processo per fare riferimento a questo input.
Abbonamento Scegliere la sottoscrizione Azure in cui è presente la risorsa dell'hub eventi.
Namespace dell'Event Hub Lo spazio dei nomi di Hub eventi è un contenitore per hub eventi. Quando si crea un hub eventi, viene creato anche lo spazio dei nomi.
Nome dell'Hub degli eventi Nome dell'hub eventi da usare come input.
Gruppo di consumatori dell'hub eventi (scelta consigliata) Usare un gruppo di consumer distinto per ogni job di Stream Analytics. Questa stringa identifica il gruppo di consumer da usare per l'inserimento di dati dall'hub eventi. Se non si specifica un gruppo di consumatori, il processo di Analisi di Flusso utilizza il $Default gruppo di consumatori.
Modalità di autenticazione Specificare il tipo di autenticazione da usare per connettersi all'hub eventi. Usare una stringa di connessione o un'identità gestita per l'autenticazione con l'hub degli eventi. Per l'opzione identità gestita, è possibile creare un'identità gestita assegnata dal sistema per il processo di Analisi di flusso o un'identità gestita assegnata dall'utente per l'autenticazione con l'hub eventi. Quando si utilizza un'identità gestita, questa deve essere membro dei ruoli Azure Event Hubs Data Receiver o Azure Event Hubs Data Owner.
Nome della policy dell'Event Hub Politica di accesso condiviso che fornisce l'accesso a Event Hubs. Tutti i criteri di accesso condiviso dispongono di un nome e di autorizzazioni impostati, nonché di chiavi di accesso. Questa opzione viene popolata automaticamente, a meno che non si selezioni l'opzione per fornire manualmente le impostazioni di Hub eventi.
Chiave partizione Questo campo facoltativo è disponibile solo se si configura il processo per l'uso del livello di compatibilità 1.2 o superiore. Se l'input è partizionato da una proprietà, aggiungere qui il nome di questa proprietà. Usa per migliorare le prestazioni della query se include una clausola PARTITION BY o GROUP BY su questa proprietà. Se questo processo usa il livello di compatibilità 1.2 o versione successiva, il valore predefinito di questo campo è PartitionId.
Formato di serializzazione degli eventi Formato di serializzazione (JSON, CSV, Avro) del flusso di dati in ingresso. Verificare che il formato JSON sia allineato alla specifica e che non includa 0 iniziale per i numeri decimali.
Codifica L'unico formato di codifica attualmente supportato è UTF-8.
Tipo di compressione degli eventi Tipo di compressione usato per leggere il flusso di dati in ingresso, ad esempio Nessuno (impostazione predefinita), Gzip o Deflate.
Registro schemi Selezionare il registro degli schemi con gli schemi per i dati degli eventi ricevuti dall'hub eventi.

Quando i dati provengono da un input del flusso di Event Hubs, è possibile accedere ai seguenti campi di metadati nella query di Stream Analytics:

Proprietà Descrizione
EventProcessedUtcTime Data e ora in cui Analisi di flusso elabora l'evento.
EventEnqueuedUtcTime Data e ora in cui Hub eventi riceve gli eventi.
PartitionId ID di partizione in base zero per l'adattatore di input.

Usando questi campi, è possibile scrivere una query come nell'esempio seguente:

SELECT
    EventProcessedUtcTime,
    EventEnqueuedUtcTime,
    PartitionId
FROM Input

Nota

Quando si usano Event Hubs come endpoint per le Route di IoT Hub, è possibile accedere ai metadati di IoT Hub usando la funzione GetMetadataPropertyValue.

Trasmettere dati da IoT Hub

Azure IoT Hub è un sistema di ingestione eventi publish-subscribe altamente scalabile ottimizzato per gli scenari IoT.

Il timestamp predefinito degli eventi provenienti da un IoT Hub in Analisi di flusso è il timestamp che l'evento è arrivato nel IoT Hub, ovvero EventEnqueuedUtcTime. Per elaborare i dati come flusso usando un timestamp nel payload dell'evento, usare la parola chiave TIMESTAMP BY .

IoT Hub gruppi di consumatori

Configurare ogni input di Stream Analytics IoT Hub per avere un proprio gruppo di consumatori. Quando un'attività contiene un'auto-associazione o ha più input, può avvenire che più di un lettore legga lo stesso input. Questa situazione influisce sul numero di lettori in un singolo gruppo di consumer. Per evitare di superare il limite di Azure IoT Hub di cinque lettori per gruppo di consumatori per partizione, designare un proprio gruppo di consumatori per ogni processo di Stream Analytics.

Configurare un IoT Hub come input del flusso di dati

La tabella seguente illustra ogni proprietà nella pagina Nuovo input del portale di Azure quando si configura un IoT Hub come input del flusso.

Proprietà Descrizione
Alias di input Nome descrittivo che viene usato nella query del processo per fare riferimento a questo input.
Abbonamento Scegliere la sottoscrizione in cui esiste la risorsa IoT Hub.
IoT Hub Nome del IoT Hub da usare come input.
Gruppo di consumer Usare un gruppo di consumatori diverso per ogni processo di Stream Analytics. Il gruppo di utenti riceve i dati dall'IoT Hub. Stream Analytics utilizza il gruppo di consumatori $Default, a meno che non venga specificato diversamente.
Nome criteri di accesso condiviso Criteri di accesso condiviso che forniscono l'accesso al IoT Hub. Tutti i criteri di accesso condiviso dispongono di un nome e di autorizzazioni impostati, nonché di chiavi di accesso.
Chiave criteri di accesso condiviso Chiave di accesso condivisa usata per autorizzare l'accesso al IoT Hub. Questa opzione viene popolata automaticamente, a meno che non si selezioni l'opzione per fornire manualmente le impostazioni di IoT Hub.
Punto finale Endpoint per il IoT Hub.
Chiave partizione Si tratta di un campo facoltativo disponibile solo se si configura il processo per l'uso del livello di compatibilità 1.2 o superiore. Se si partiziona l'input in base a una proprietà, è possibile aggiungere il nome di questa proprietà qui. Viene utilizzato per migliorare le prestazioni della query quando include un'istruzione PARTITION BY o GROUP BY su questa proprietà. Se questo processo usa il livello di compatibilità 1.2 o versione successiva, per impostazione predefinita questo campo è "PartitionId".
Formato di serializzazione degli eventi Formato di serializzazione (JSON, CSV, Avro) del flusso di dati in ingresso. Verificare che il formato JSON sia allineato alla specifica e che non includa 0 iniziale per i numeri decimali.
Codifica L'unico formato di codifica attualmente supportato è UTF-8.
Tipo di compressione degli eventi Tipo di compressione usato per leggere il flusso di dati in ingresso, ad esempio Nessuno (impostazione predefinita), Gzip o Deflate.

Quando si usano i dati di flusso da un IoT Hub, è possibile accedere ai campi di metadati seguenti nella query di Analisi di flusso:

Proprietà Descrizione
EventProcessedUtcTime Data e ora di elaborazione dell'evento.
EventEnqueuedUtcTime Data e ora in cui il IoT Hub riceve l'evento.
PartitionId ID di partizione in base zero per l'adattatore di input.
IoTHub.MessageId ID usato per correlare la comunicazione bidirezionale in IoT Hub.
IoTHub.CorrelationId ID usato nelle risposte ai messaggi e nei commenti e suggerimenti in IoT Hub.
IoTHub.ConnectionDeviceId ID di autenticazione usato per inviare questo messaggio. Il IoT Hub contrassegna questo valore nei messaggi associati al servizio.
IoTHub.ConnectionDeviceGenerationId ID di generazione del dispositivo autenticato che è stato usato per inviare questo messaggio. Il IoT Hub contrassegna questo valore nei messaggi servicebound.
IoTHub.EnqueuedTime Ora in cui il IoT Hub riceve il messaggio.

Trasmettere dati dall'archiviazione BLOB o Data Lake Storage Gen2

Per gli scenari che comportano l'archiviazione di grandi quantità di dati non strutturati nel cloud, Azure archiviazione BLOB o Azure Data Lake Storage Gen2 offre una soluzione conveniente e scalabile. I dati nell'archivio BLOB o Azure Data Lake Storage Gen2 sono considerati dati inattivi. Tuttavia, Analisi di flusso può elaborare questi dati come flusso di dati.

Uno scenario comunemente usato per l'uso di tali input con Analisi di flusso è l'elaborazione dei log. In questo scenario si acquisiscono i file di dati di telemetria da un sistema ed è necessario analizzarli ed elaborarli per estrarre dati significativi.

Il timestamp predefinito di un'archiviazione BLOB o di un evento Azure Data Lake Storage Gen2 in Analisi di flusso è il timestamp dell'ultima modifica, ovvero BlobLastModifiedUtcTime. Se si carica un BLOB in un account di archiviazione alle 13:00 e si avvia il processo di Azure Stream Analytics usando l'opzione Now alle 13:01, il processo non preleva il BLOB perché il tempo modificato non rientra nel periodo di esecuzione del processo.

Se si carica un BLOB in un contenitore dell'account di archiviazione alle 13:00 e si avvia il processo di Azure Stream Analytics usando Tempo personalizzato alle 13:00 o prima, il processo preleva il BLOB perché l'ora di modifica è compresa nel periodo in cui il processo viene eseguito.

Se avvii un processo Azure Stream Analytics usando Now alle 13:00 e carichi un BLOB nel contenitore dell'account su Azure Storage alle 13:01, Azure Stream Analytics preleva il BLOB. Il timestamp assegnato a ogni BLOB è basato solo su BlobLastModifiedTime. La cartella in cui si trova il BLOB non ha alcuna relazione con il timestamp assegnato. Ad esempio, se è presente un BLOB 2019/10-01/00/b1.txt con un BlobLastModifiedTime di 2019-11-11, il timestamp assegnato a questo BLOB è 2019-11-11.

Per elaborare i dati come flusso usando un timestamp nel payload dell'evento, è necessario usare la parola chiave TIMESTAMP BY . Un processo di analisi di flusso estrae i dati dall'archiviazione BLOB di Azure o dall'Azure Data Lake Storage Gen2 ogni secondo, se il file BLOB è disponibile. Se il file BLOB non è disponibile, il processo usa un backoff esponenziale con un ritardo massimo di 90 secondi.

Nota

Analisi di flusso non supporta l'aggiunta di contenuto a un file BLOB esistente. Analisi di flusso visualizza ogni file una sola volta e non elabora le modifiche apportate nel file dopo che il processo legge i dati. La procedura consigliata consiste nel caricare simultaneamente tutti i dati per un file di BLOB e quindi aggiungere gli altri eventi più nuovi in un nuovo file di BLOB diverso.

Negli scenari in cui si aggiungono continuamente molti BLOB e Analisi di flusso elabora i BLOB durante l'aggiunta, è possibile ignorare alcuni BLOB in rari casi a causa della granularità di BlobLastModifiedTime. È possibile attenuare questo problema caricando i BLOB a distanza di almeno due secondi. Se questa opzione non è fattibile, è possibile usare Hub eventi per trasmettere grandi volumi di eventi.

Configurare l'archiviazione BLOB come input del flusso

La tabella seguente illustra ogni proprietà nella pagina Nuovo input nel portale di Azure quando si configura l'archiviazione BLOB come input del flusso.

Proprietà Descrizione
Alias di input Nome descrittivo che viene usato nella query del processo per fare riferimento a questo input.
Abbonamento Scegliere la sottoscrizione in cui esiste la risorsa di archiviazione.
Account di archiviazione Nome dell'account di archiviazione in cui si trovano i file BLOB.
Chiave dell'account di archiviazione Chiave privata associata all'account di archiviazione. Questa opzione viene popolata automaticamente, a meno che non si selezioni l'opzione per fornire manualmente le impostazioni.
Contenitore I contenitori forniscono un raggruppamento logico per i BLOB. È possibile scegliere Usa contenitore esistente o Crea nuovo per creare un nuovo contenitore.
Modalità di autenticazione Specificare il tipo di autenticazione da usare per connettersi all'account di archiviazione. È possibile usare un connection string o un'identità gestita per l'autenticazione con l'account di archiviazione. Per l'opzione identità gestita, è possibile creare un'identità gestita assegnata dal sistema al processo di Analisi di flusso o un'identità gestita assegnata dall'utente per l'autenticazione con l'account di archiviazione. Quando si usa un'identità gestita, l'identità gestita deve essere membro di un ruolo appropriato nell'account di archiviazione.
Modello di percorso (facoltativo) Percorso del file utilizzato per trovare i BLOB nel contenitore specificato. Se si desidera leggere i BLOB dalla radice del contenitore, non impostare un modello di percorso. All'interno del percorso è possibile specificare una o più istanze delle tre variabili seguenti: {date}, {time}o {partition}

Esempio 1: cluster1/logs/{date}/{time}/{partition}

Esempio 2: cluster1/logs/{date}

Il * carattere non è un valore consentito per il prefisso del percorso. Sono consentiti solo caratteri Azure Blob validi. Non includere nomi di contenitori o nomi di file.
Formato data (facoltativo) Formato della data in base al quale vengono organizzati i file, se si usa la variabile date nel percorso. Esempio: YYYY/MM/DD

Quando l'input del BLOB ha {date} o {time} nel percorso, Stream Analytics esamina le cartelle in ordine cronologico crescente.
Formato ora (facoltativo) Formato dell'ora in base al quale vengono organizzati i file, se si usa la variabile time nel percorso. Attualmente l'unico valore supportato è HH per ore.
Chiave partizione Si tratta di un campo facoltativo disponibile solo se si configura il processo per l'uso del livello di compatibilità 1.2 o superiore. Se si partiziona l'input in base a una proprietà, è possibile aggiungere il nome di questa proprietà qui. Viene utilizzato per migliorare le prestazioni della query quando include un'istruzione PARTITION BY o GROUP BY su questa proprietà. Se questo processo usa il livello di compatibilità 1.2 o versione successiva, per impostazione predefinita questo campo è "PartitionId".
Conteggio delle partizioni di input Questo campo è presente solo quando {partition} è presente nel modello di percorso. Il valore di questa proprietà è un numero intero >=1. Ovunque venga visualizzato {partition} in pathPattern, verrà usato un numero compreso tra 0 e il valore di questo campo -1.
Formato di serializzazione degli eventi Formato di serializzazione (JSON, CSV, Avro) del flusso di dati in ingresso. Verificare che il formato JSON sia allineato alla specifica e che non includa 0 iniziale per i numeri decimali.
Codifica Per CSV e JSON, l'unico formato di codifica attualmente supportato è UTF-8.
Compression Tipo di compressione usato per leggere il flusso di dati in ingresso, ad esempio Nessuno (impostazione predefinita), Gzip o Deflate.

Quando i dati provengono da un'origine di archiviazione BLOB, è possibile accedere ai campi di metadati seguenti nella query di Analisi di flusso:

Proprietà Descrizione
BlobName Nome del BLOB di input da cui proviene l'evento.
EventProcessedUtcTime Data e ora in cui Analisi di flusso elabora l'evento.
BlobLastModifiedUtcTime Data e ora dell'ultima modifica del BLOB.
PartitionId ID di partizione in base zero per l'adattatore di input.

Usando questi campi, è possibile scrivere una query come nell'esempio seguente:

SELECT
    BlobName,
    EventProcessedUtcTime,
    BlobLastModifiedUtcTime
FROM Input

Trasmettere dati da Apache Kafka

Azure Stream Analytics consente di connettersi direttamente ai cluster Apache Kafka per inserire dati. La soluzione è a basso codice e completamente gestita dal team Azure Stream Analytics microsoft, in modo che soddisfi gli standard di conformità aziendali. L'input Kafka è compatibile con le versioni precedenti e supporta tutte le versioni a partire dalla versione 0.10 con la versione più recente del client. È possibile connettersi ai cluster Kafka all'interno di una rete virtuale e di cluster Kafka con un endpoint pubblico, a seconda delle configurazioni. La configurazione si basa sulle convenzioni di configurazione Kafka esistenti. I tipi di compressione supportati sono Nessuno, Gzip, Snappy, LZ4 e Zstd.

Per ulteriori informazioni, vedere la pagina Trasmissione dei dati da Kafka in Azure Stream Analytics (Anteprima).

Passaggi successivi