Delen via


Gegevens streamen als invoer in Stream Analytics

Stream Analytics kan worden geïntegreerd met Azure gegevensstromen als invoer van vijf soorten resources:

Deze invoerbronnen kunnen zich in hetzelfde Azure abonnement bevinden als uw Stream Analytics-taak of in een ander abonnement.

Compressie

Stream Analytics ondersteunt compressie voor alle invoerbronnen. Ondersteunde compressietypen zijn: Geen, Gzip en Deflate. Stream Analytics biedt geen ondersteuning voor compressie voor referentiegegevens. Als de invoergegevens Avro-gegevens gecomprimeerd zijn, verwerkt Stream Analytics deze transparant. U hoeft het compressietype niet op te geven met Avro-serialisatie.

Invoer maken, bewerken of testen

U kunt de Azure portal, Visual Studio en Visual Studio Code gebruiken om bestaande invoer in uw streamingtaak toe te voegen en weer te geven of te bewerken. U kunt ook invoerverbindingen testen en query's testen vanuit voorbeeldgegevens uit de Azure-portal, Visual Studio en Visual Studio Code. Wanneer u een query schrijft, geeft u de invoer in de FROM-component weer. U kunt de lijst met beschikbare invoer ophalen op de pagina Query in de portal. Als u meerdere invoergegevens wilt gebruiken, JOIN ze of schrijf meerdere SELECT queries.

Opmerking

Gebruik voor de beste lokale ontwikkelervaring Stream Analytics-hulpprogramma's voor Visual Studio Code. Er zijn bekende functie-hiaten in Stream Analytics-hulpprogramma's voor Visual Studio 2019 (versie 2.6.3000.0) en dit wordt in de toekomst niet verbeterd.

Gegevens streamen vanuit Event Hubs

Azure Event Hubs is een zeer schaalbaar publiceren-abonneren gebeurtenisontvanger. Een Event Hub kan miljoenen gebeurtenissen per seconde verzamelen, zodat u de enorme hoeveelheden gegevens kunt verwerken en analyseren die worden geproduceerd door uw verbonden apparaten en toepassingen. Event Hubs en Stream Analytics bieden samen een end-to-end oplossing voor realtime analyse. Event Hubs levert gebeurtenissen aan Azure in realtime, en Stream Analytics-taken verwerken die gebeurtenissen in realtime. U kunt bijvoorbeeld webklikken, sensormetingen of onlinelogboeken naar Event Hubs verzenden. Vervolgens kunt u Stream Analytics-taken maken om Event Hubs te gebruiken voor de invoergegevens voor realtime filteren, samenvoegen en correlatie.

EventEnqueuedUtcTime is de tijdstempel van de aankomst van een gebeurtenis in een Event Hub en is de standaardtijdstempel van gebeurtenissen die afkomstig zijn van Event Hubs naar Stream Analytics. Als u de gegevens wilt verwerken als een stroom met behulp van een tijdstempel in de nettolading van de gebeurtenis, moet u het trefwoord TIMESTAMP BY gebruiken.

Event Hubs-consumentengroepen

Configureer elke Event Hub-invoer met een eigen consumentengroep. Wanneer een taak een self-join bevat of meerdere invoerwaarden heeft, kan meer dan één lezer downstream enkele invoer lezen. Deze situatie is van invloed op het aantal lezers in één consumentengroep. Om te voorkomen dat de Event Hubs-limiet van vijf lezers per consumentengroep per partitie wordt overschreden, wijst u een consumentengroep aan voor elke Stream Analytics-taak. Er is ook een limiet van 20 consumentengroepen voor een Event Hub van de Standard-laag. Zie Troubleshoot Azure Stream Analytics inputs voor meer informatie.

Een invoer maken vanuit Event Hubs

In de volgende tabel wordt elke eigenschap op de pagina Nieuwe invoer in de Azure-portal uitgelegd om gegevensinvoer van een Event Hub te streamen:

Vastgoed Beschrijving
invoeralias Een gemakkelijk te gebruiken naam die u in de query van de job gebruikt om naar deze invoer te verwijzen.
Subscription Kies het Azure-abonnement waarin de Event Hub-resource bestaat.
Event Hub-naamruimte De Event Hubs-naamruimte is een container voor Event Hubs. Wanneer u een Event Hub maakt, maakt u ook de naamruimte.
Event Hub-naam De naam van de Event Hub die moet worden gebruikt als invoer.
Event Hub-consumentengroep (aanbevolen) Gebruik een afzonderlijke consumentengroep voor elke Stream Analytics-taak. Deze tekenreeks identificeert de consumentengroep die moet worden gebruikt voor het opnemen van gegevens uit de Event Hub. Als u geen consumentengroep opgeeft, gebruikt de Stream Analytics-taak de $Default consumentengroep.
Verificatiemodus Geef het type verificatie op dat u wilt gebruiken om verbinding te maken met de Event Hub. Gebruik een connection string of een beheerde identiteit om te verifiëren bij de Event Hub. Voor de optie beheerde identiteit kunt u een door het systeem toegewezen beheerde identiteit maken voor de Stream Analytics-taak of een door de gebruiker toegewezen beheerde identiteit om te verifiëren met de Event Hub. Wanneer u een beheerde identiteit gebruikt, moet de beheerde identiteit lid zijn van de rollen Azure Event Hubs Gegevensontvanger of Azure Event Hubs Gegevenseigenaar.
Naam van Event Hub-beleid Het beleid voor gedeelde toegang dat toegang biedt tot de Event Hubs. Elk beleid voor gedeelde toegang heeft een naam, machtigingen die u instelt en toegangssleutels. Deze optie wordt automatisch ingevuld, tenzij u de optie selecteert om de Event Hubs-instellingen handmatig op te geven.
Partitiesleutel Dit optionele veld is alleen beschikbaar als u uw taak configureert voor het gebruik van compatibiliteitsniveau 1.2 of hoger. Als uw invoer is gepartitioneerd door een eigenschap, voegt u hier de naam van deze eigenschap toe. Gebruik deze om de prestaties van uw query te verbeteren als deze een PARTITION BY of GROUP BY component voor deze eigenschap bevat. Als voor deze taak compatibiliteitsniveau 1.2 of hoger wordt gebruikt, wordt dit veld standaard ingesteld op PartitionId.
Indeling van gebeurtenisserialisatie De serialisatie-indeling (JSON, CSV, Avro) van de binnenkomende gegevensstroom. Zorg ervoor dat de JSON-indeling overeenkomt met de specificatie en geen voorlooptekens bevat voor decimale getallen.
Codering UTF-8 is momenteel de enige ondersteunde coderingsindeling.
Gebeurteniscompressietype Het compressietype dat wordt gebruikt voor het lezen van de binnenkomende gegevensstroom, zoals None (standaard), Gzip of Deflate.
Schemaregister Selecteer het schemaregister met schema's voor gebeurtenisgegevens die worden ontvangen van de Event Hub.

Wanneer uw gegevens afkomstig zijn van een Event Hubs-stroominvoer, hebt u toegang tot de volgende metagegevensvelden in uw Stream Analytics-query:

Vastgoed Beschrijving
EventProcessedUtcTime De datum en tijd waarop Stream Analytics de gebeurtenis verwerkt.
EventEnqueuedUtcTime De datum en tijd waarop Event Hubs de gebeurtenissen ontvangt.
Partitie-id De nul-gebaseerde partitie-ID voor de invoeradapter.

Met deze velden kunt u een query schrijven zoals in het volgende voorbeeld:

SELECT
    EventProcessedUtcTime,
    EventEnqueuedUtcTime,
    PartitionId
FROM Input

Opmerking

Wanneer u Event Hubs gebruikt als eindpunt voor IoT Hub Routes, hebt u toegang tot de IoT Hub metagegevens met behulp van de functie GetMetadataPropertyValue.

Gegevens streamen vanuit IoT Hub

Azure IoT Hub is een zeer schaalbare gebeurtenisverwerker voor publiceren en abonneren, geoptimaliseerd voor IoT-scenario's.

De standaardtijdstempel van gebeurtenissen die afkomstig zijn van een IoT Hub in Stream Analytics, is de tijdstempel dat de gebeurtenis is aangekomen in de IoT Hub. Dit is EventEnqueuedUtcTime. Als u de gegevens wilt verwerken als een stroom met behulp van een tijdstempel in de nettolading van de gebeurtenis, gebruikt u het trefwoord TIMESTAMP BY .

IoT Hub consumentengroepen

Configureer elke Stream Analytics-IoT Hub invoer om een eigen consumentengroep te hebben. Wanneer een taak een self-join bevat of als deze meerdere invoer heeft, kan meer dan één lezer wat invoer lezen. Deze situatie is van invloed op het aantal lezers in één consumentengroep. Om te voorkomen dat de Azure IoT Hub limiet van vijf lezers per consumentengroep per partitie wordt overschreden, wijst u een consumentengroep aan voor elke Stream Analytics-taak.

Een IoT Hub configureren als invoer van een gegevensstroom

In de volgende tabel wordt elke eigenschap op de pagina Nieuwe invoer in de Azure-portal uitgelegd wanneer u een IoT Hub configureert als stroominvoer.

Vastgoed Beschrijving
invoeralias Een gemakkelijk te gebruiken naam die u in de query van de job gebruikt om naar deze invoer te verwijzen.
Subscription Kies het abonnement waarin de IoT Hub resource bestaat.
IoT Hub De naam van de IoT Hub die moet worden gebruikt als invoer.
consumentengroep Gebruik een andere consumentengroep voor elke Stream Analytics-taak. De consumentengroep neemt gegevens op uit de IoT Hub. Stream Analytics maakt gebruik van de $Default consumentengroep, tenzij u anders opgeeft.
naam van beleid voor gedeelde toegang Het beleid voor gedeelde toegang dat toegang biedt tot de IoT Hub. Elk beleid voor gedeelde toegang heeft een naam, machtigingen die u instelt en toegangssleutels.
Sleutel voor gedeeld toegangsbeleid De gedeelde toegangssleutel die wordt gebruikt om toegang tot de IoT Hub te autoriseren. Deze optie wordt automatisch ingevuld, tenzij u de optie selecteert om de IoT Hub instellingen handmatig op te geven.
Eindpunt Het eindpunt voor de IoT Hub.
Partitiesleutel Het is een optioneel veld dat alleen beschikbaar is als u uw taak configureert voor het gebruik van compatibiliteitsniveau 1.2 of hoger. Als u uw invoer partitioneren door een eigenschap, kunt u hier de naam van deze eigenschap toevoegen. Deze wordt gebruikt voor het verbeteren van de prestaties van uw query als deze een PARTITION BY- of GROUP BY-component voor deze eigenschap bevat. Als voor deze taak compatibiliteitsniveau 1.2 of hoger wordt gebruikt, wordt dit veld standaard ingesteld op PartitionId.
Indeling van gebeurtenisserialisatie De serialisatie-indeling (JSON, CSV, Avro) van de binnenkomende gegevensstroom. Zorg ervoor dat de JSON-indeling overeenkomt met de specificatie en geen voorlooptekens bevat voor decimale getallen.
Codering UTF-8 is momenteel de enige ondersteunde coderingsindeling.
Gebeurteniscompressietype Het compressietype dat wordt gebruikt voor het lezen van de binnenkomende gegevensstroom, zoals None (standaard), Gzip of Deflate.

Wanneer u streamgegevens uit een IoT Hub gebruikt, hebt u toegang tot de volgende metagegevensvelden in uw Stream Analytics-query:

Vastgoed Beschrijving
EventProcessedUtcTime De datum en tijd waarop de gebeurtenis is verwerkt.
EventEnqueuedUtcTime De datum en tijd waarop de IoT Hub de gebeurtenis ontvangt.
Partitie-id De nul-gebaseerde partitie-ID voor de invoeradapter.
IoTHub.MessageId Een id die wordt gebruikt om communicatie in twee richtingen te correleren in IoT Hub.
IoTHub.CorrelationId Een id die wordt gebruikt in berichtantwoorden en feedback in IoT Hub.
IoTHub.ConnectionDeviceId De verificatie-id die wordt gebruikt om dit bericht te verzenden. De IoT Hub stempelt deze waarde op servicegebonden berichten.
IoTHub.ConnectionDeviceGenerationId De generatie-id van het geverifieerde apparaat dat is gebruikt om dit bericht te verzenden. De IoT Hub stempelt deze waarde op uitgaande berichten van de service.
IoTHub.EnqueuedTime Het tijdstip waarop de IoT Hub het bericht ontvangt.

Gegevens streamen vanuit Blob Storage of Data Lake Storage Gen2

Voor scenario's waarbij grote hoeveelheden ongestructureerde gegevens in de cloud moeten worden opgeslagen, biedt Azure Blob Storage of Azure Data Lake Storage Gen2 een rendabele en schaalbare oplossing. Gegevens in Blob Storage of Azure Data Lake Storage Gen2 worden beschouwd als gegevens in rust. Stream Analytics kan deze gegevens echter verwerken als een gegevensstroom.

Een veelgebruikt scenario voor het gebruik van dergelijke invoer met Stream Analytics is logboekverwerking. In dit scenario legt u telemetriegegevensbestanden van een systeem vast en moet u deze parseren en verwerken om zinvolle gegevens te extraheren.

De standaardtijdstempel van een Blob-opslag of Azure Data Lake Storage Gen2 gebeurtenis in Stream Analytics is de tijdstempel die het laatst is gewijzigd. Dit is BlobLastModifiedUtcTime. Als u een blob uploadt naar een opslagaccount om 13:00 uur en de Azure Stream Analytics taak start met behulp van de optie Now om 13:01, wordt de blob niet opgehaald omdat de gewijzigde tijd buiten de taakuitvoeringsperiode valt.

Als u een blob uploadt naar een opslagaccountcontainer om 13:00 uur en de Azure Stream Analytics taak start met behulp van Custom Time om 13:00 of eerder, wordt de blob opgehaald omdat de gewijzigde tijd binnen de uitvoeringsperiode van de taak valt.

Als u een Azure Stream Analytics taak start met behulp van Now om 13:00 en om 13:00 een blob uploadt naar de opslagaccountcontainer, Azure Stream Analytics de blob ophaalt. De tijdstempel die aan elke blob is toegewezen, is alleen gebaseerd op BlobLastModifiedTime. De map waarin de blob zich bevindt, heeft geen relatie met de tijdstempel die is toegewezen. Als er bijvoorbeeld een blob 2019/10-01/00/b1.txt is met een BlobLastModifiedTime van 2019-11-11, wordt de tijdstempel die aan deze blob is toegewezen 2019-11-11.

Als u de gegevens als een stroom wilt verwerken met behulp van een tijdstempel in de nettolading van de gebeurtenis, moet u het trefwoord TIMESTAMP BY gebruiken. Een Stream Analytics-taak haalt elke seconde gegevens op uit Azure Blob Storage of Azure Data Lake Storage Gen2 invoer als het blobbestand beschikbaar is. Als het blobbestand niet beschikbaar is, gebruikt de taak een exponentieel uitstel met een maximale vertraging van 90 seconden.

Opmerking

Stream Analytics biedt geen ondersteuning voor het toevoegen van inhoud aan een bestaand blobbestand. Stream Analytics bekijkt elk bestand slechts één keer en verwerkt geen wijzigingen die in het bestand optreden nadat de taak de gegevens heeft gelezen. Het is raadzaam om alle gegevens voor een blobbestand tegelijk te uploaden en vervolgens extra nieuwere gebeurtenissen toe te voegen aan een ander, nieuw blobbestand.

In scenario's waarin u continu veel blobs toevoegt en Stream Analytics de blobs verwerkt terwijl u ze toevoegt, kunt u sommige blobs in zeldzame gevallen overslaan vanwege de granulariteit van de BlobLastModifiedTime. U kunt dit probleem verhelpen door blobs ten minste twee seconden uit elkaar te uploaden. Als deze optie niet haalbaar is, kunt u Event Hubs gebruiken om grote hoeveelheden gebeurtenissen te streamen.

Blob-opslag configureren als stroominvoer

In de volgende tabel wordt elke eigenschap op de pagina Nieuwe invoer in de Azure-portal uitgelegd wanneer u Blob Storage configureert als stroominvoer.

Vastgoed Beschrijving
invoeralias Een gemakkelijk te gebruiken naam die u in de query van de job gebruikt om naar deze invoer te verwijzen.
Subscription Kies het abonnement waarin de opslagresource bestaat.
Opslagaccount De naam van het opslagaccount waarin de blobbestanden zich bevinden.
Sleutel van opslagaccount De geheime sleutel die is gekoppeld aan het opslagaccount. Deze optie wordt automatisch ingevuld, tenzij u de optie selecteert om de instellingen handmatig op te geven.
Container Containers bieden een logische groepering voor blobs. U kunt bestaande container gebruiken of Nieuwe container maken kiezen.
Verificatiemodus Geef het type verificatie op dat u wilt gebruiken om verbinding te maken met het opslagaccount. U kunt een connection string of een beheerde identiteit gebruiken om te verifiëren met het opslagaccount. Voor de optie beheerde identiteit kunt u een door het systeem toegewezen beheerde identiteit maken aan de Stream Analytics-taak of een door de gebruiker toegewezen beheerde identiteit om te verifiëren met het opslagaccount. Wanneer u een beheerde identiteit gebruikt, moet de beheerde identiteit lid zijn van een geschikte rol in het opslagaccount.
Padpatroon (optioneel) Het bestandspad dat wordt gebruikt om de blobs in de opgegeven container te vinden. Als u blobs wilt lezen uit de root van de container, stel dan geen padpatroon in. In het pad kunt u een of meer exemplaren van de volgende drie variabelen opgeven: {date}, {time}, of {partition}

Voorbeeld 1: cluster1/logs/{date}/{time}/{partition}

Voorbeeld 2: cluster1/logs/{date}

Het * teken is geen toegestane waarde voor het padvoorvoegsel. Alleen geldige Azure blobtekens zijn toegestaan. Neem geen containernamen of bestandsnamen op.
Datumnotatie (optioneel) Als u de datumvariabele in het pad gebruikt, specificeert u de datumformaat waarin de bestanden zijn geordend. Voorbeeld: YYYY/MM/DD

Wanneer de blob-invoer {date} of {time} in het pad heeft, bekijkt Stream Analytics de mappen in oplopende tijdsvolgorde.
Tijdnotatie (optioneel) Als u de tijdvariabele in het pad gebruikt, bepaalt u de tijdnotatie waarin de bestanden zijn ingedeeld. Momenteel is de enige ondersteunde waarde voor uren HH.
Partitiesleutel Het is een optioneel veld dat alleen beschikbaar is als u uw taak configureert voor het gebruik van compatibiliteitsniveau 1.2 of hoger. Als u uw invoer partitioneren door een eigenschap, kunt u hier de naam van deze eigenschap toevoegen. Deze wordt gebruikt voor het verbeteren van de prestaties van uw query als deze een PARTITION BY- of GROUP BY-component voor deze eigenschap bevat. Als voor deze taak compatibiliteitsniveau 1.2 of hoger wordt gebruikt, wordt dit veld standaard ingesteld op PartitionId.
Aantal invoerpartities Dit veld is alleen aanwezig wanneer {partition} aanwezig is in padpatroon. De waarde van deze eigenschap is een geheel getal >=1. Waar {partition} ook wordt weergegeven in pathPattern, wordt een getal tussen 0 en de waarde van dit veld -1 gebruikt.
Indeling van gebeurtenisserialisatie De serialisatie-indeling (JSON, CSV, Avro) van de binnenkomende gegevensstroom. Zorg ervoor dat de JSON-indeling overeenkomt met de specificatie en geen voorlooptekens bevat voor decimale getallen.
Codering Voor CSV en JSON is UTF-8 momenteel de enige ondersteunde coderingsindeling.
Compression Het compressietype dat wordt gebruikt voor het lezen van de binnenkomende gegevensstroom, zoals None (standaard), Gzip of Deflate.

Wanneer uw gegevens afkomstig zijn van een Blob Storage-bron, hebt u toegang tot de volgende metagegevensvelden in uw Stream Analytics-query:

Vastgoed Beschrijving
BlobName De naam van de invoerblob waaruit de gebeurtenis afkomstig is.
EventProcessedUtcTime De datum en tijd waarop Stream Analytics de gebeurtenis verwerkt.
BlobLastModifiedUtcTime De datum en tijd waarop de blob voor het laatst is gewijzigd.
Partitie-id De nul-gebaseerde partitie-ID voor de invoeradapter.

Met deze velden kunt u een query schrijven zoals in het volgende voorbeeld:

SELECT
    BlobName,
    EventProcessedUtcTime,
    BlobLastModifiedUtcTime
FROM Input

Gegevens streamen vanuit Apache Kafka

Azure Stream Analytics kunt u rechtstreeks verbinding maken met Apache Kafka-clusters om gegevens op te nemen. De oplossing is weinig code en wordt volledig beheerd door het Azure Stream Analytics-team van Microsoft, zodat deze voldoet aan de standaarden voor zakelijke naleving. De Kafka-invoer is achterwaarts compatibel en ondersteunt alle versies vanaf versie 0.10 met de nieuwste clientrelease. U kunt verbinding maken met Kafka-clusters in een virtueel netwerk en Kafka-clusters met een openbaar eindpunt, afhankelijk van de configuraties. De configuratie is afhankelijk van bestaande Kafka-configuratieconventies. Ondersteunde compressietypen zijn None, Gzip, Snappy, LZ4 en Zstd.

Zie Stream data from Kafka into Azure Stream Analytics (preview) voor meer informatie.

Volgende stappen