Dela via


Strömdata som indata till Stream Analytics

Stream Analytics integreras med Azure dataströmmar som indata från fem typer av resurser:

Dessa indataresurser kan finnas i samma Azure prenumeration som ditt Stream Analytics-jobb eller i en annan prenumeration.

Komprimering

Stream Analytics stöder komprimering för alla indatakällor. Komprimeringstyper som stöds är: None, Gzip och Deflate. Stream Analytics stöder inte komprimering för referensdata. Om indata är komprimerad Avro-data hanterar Stream Analytics dem automatiskt. Du behöver inte ange komprimeringstypen med Avro-serialisering.

Skapa, redigera eller testa indata

Du kan använda portalen Azure, Visual Studio och Visual Studio Code för att lägga till och visa eller redigera befintliga indata i ditt strömningsjobb. Du kan också testa indataanslutningar och testa frågor från exempeldata från Azure-portalen, Visual Studio och Visual Studio Code. När du skriver en fråga listar du indata i FROM-satsen. Du kan hämta listan över tillgängliga indata från sidan Fråga i portalen. Om du vill använda flera indata, JOIN dem eller skriv flera SELECT-frågor.

Anmärkning

Använd Stream Analytics-verktyg för Visual Studio Code för bästa lokala utveckling. Det finns kända funktionsluckor i Stream Analytics-verktyg för Visual Studio 2019 (version 2.6.3000.0) och det kommer inte att förbättras framöver.

Strömma data från Event Hubs

Azure Event Hubs är en mycket skalbar händelseöverförare med publicera-prenumerera-modell. En händelsehubb kan samla in miljontals händelser per sekund så att du kan bearbeta och analysera de enorma mängder data som produceras av dina anslutna enheter och program. Tillsammans tillhandahåller Event Hubs och Stream Analytics en lösning från slutpunkt till slutpunkt för realtidsanalys. Event Hubs matar in händelser i Azure i realtid och Stream Analytics-jobb bearbetar dessa händelser i realtid. Du kan till exempel skicka webbklick, sensoravläsningar eller onlinelogghändelser till Event Hubs. Du kan sedan skapa Stream Analytics-jobb för att använda Event Hubs för indata för realtidsfiltrering, aggregering och korrelation.

EventEnqueuedUtcTime är tidsstämpeln för en händelses ankomst till en händelsehubb och är standardtidsstämpeln för händelser som kommer från Event Hubs till Stream Analytics. Om du vill bearbeta data som en dataström med hjälp av en tidsstämpel i händelsenyttolasten måste du använda nyckelordet TIMESTAMP BY .

Event Hubs-konsumentgrupper

Konfigurera varje inmatning till händelsehubben med en egen konsumentgrupp. När ett jobb innehåller en självkoppling eller har flera indata kan fler än en läsare nedströms läsa indata. Den här situationen påverkar antalet läsare i en enskild konsumentgrupp. Om du vill undvika att överskrida gränsen för Event Hubs på fem läsare per konsumentgrupp per partition anger du en konsumentgrupp för varje Stream Analytics-jobb. Det finns också en gräns på 20 konsumentgrupper för en händelsehubb på standardnivå. För mer information, läs Felsöka Azure Stream Analytics-indata.

Skapa indata från Event Hubs

I följande tabell förklaras varje egenskap i sidan Ny indata i Azure-portalen för att strömma dataindata från en händelsehubb:

Fastighet Beskrivning
Indata-alias Ett vänligt namn som du använder i jobbets fråga för att referera till detta indata.
Subscription Välj den Azure prenumeration där händelsehubbresursen finns.
Event Hub-namnområde Event Hubs-namnområdet är en container för händelsehubbar. När du skapar en eventhub skapar du även ett namespace.
Namn på händelsehubb Namnet på den händelsehubb som ska användas som indata.
Event Hub-konsumentgrupp (rekommenderas) Använd en distinkt konsumentgrupp för varje Stream Analytics-jobb. Den här strängen identifierar konsumentgruppen som ska användas för att mata in data från händelsehubben. Om du inte anger en konsumentgrupp använder Stream Analytics-jobbet konsumentgruppen $Default.
Autentiseringsläge Ange vilken typ av autentisering du vill använda för att ansluta till händelsehubben. Använd en connection string eller en hanterad identitet för att autentisera med händelsehubben. För alternativet hanterad identitet kan du antingen skapa en systemtilldelad hanterad identitet för Stream Analytics-jobbet eller en användartilldelad hanterad identitet för att autentisera med händelsehubben. När du använder en hanterad identitet måste den hanterade identiteten vara medlem i Azure Event Hubs datamottagare eller Azure Event Hubs dataägarroller.
Event Hub-policy namn Principen för delad åtkomst som ger åtkomst till Event Hubs. Varje princip för delad åtkomst har ett namn, behörigheter som du anger och åtkomstnycklar. Det här alternativet fylls i automatiskt, såvida du inte väljer alternativet att ange Event Hubs-inställningarna manuellt.
Partitionsnyckel Det här valfria fältet är endast tillgängligt om du konfigurerar jobbet så att det använder kompatibilitetsnivå 1.2 eller senare. Om dina indata partitioneras av en egenskap lägger du till namnet på den här egenskapen här. Använd den för att förbättra prestanda för din fråga om den innehåller en PARTITION BY eller GROUP BY -sats för den här egenskapen. Om det här jobbet använder kompatibilitetsnivå 1.2 eller senare är det här fältet standard PartitionId.
Format för händelse serialisering Serialiseringsformatet (JSON, CSV, Avro) för den inkommande dataströmmen. Se till att JSON-formatet överensstämmer med specifikationen och inkluderar inte inledande 0 för decimaltal.
Kodning UTF-8 är för närvarande det enda kodningsformat som stöds.
Händelsekomprimeringstyp Komprimeringstypen som används för att läsa inkommande dataström, till exempel Ingen (standard), Gzip eller Deflate.
Schemaregister Välj schemaregistret med scheman för händelsedata som tas emot från händelsehubben.

När dina data kommer från en Event Hubs-indata har du åtkomst till följande metadatafält i Stream Analytics-frågan:

Fastighet Beskrivning
EventProcessedUtcTime Datum och tid när Stream Analytics bearbetar händelsen.
EventEnqueuedUtcTime Datum och tid när Event Hubs tar emot händelserna.
PartitionId Det nollbaserade partitions-ID för indataadapter.

Genom att använda de här fälten kan du skriva en fråga som i följande exempel:

SELECT
    EventProcessedUtcTime,
    EventEnqueuedUtcTime,
    PartitionId
FROM Input

Anmärkning

När du använder Event Hubs som slutpunkt för IoT Hub Vägar kan du komma åt IoT Hub metadata med hjälp av funktionen GetMetadataPropertyValue.

Strömma data från IoT Hub

Azure IoT Hub är en mycket skalbar händelseinsamling optimerad för IoT-tillämpningar.

Standardtidsstämpeln för händelser som kommer från en IoT Hub i Stream Analytics är tidsstämpeln som händelsen kom i IoT Hub, vilket är EventEnqueuedUtcTime. Om du vill bearbeta data som en dataström med hjälp av en tidsstämpel i händelsenyttolasten använder du nyckelordet TIMESTAMP BY .

IoT Hub konsumentgrupper

Konfigurera varje Stream Analytics IoT Hub-indataflöde så att de har en egen konsumentgrupp. När ett jobb innehåller en självkoppling eller när det har flera indata kan fler än en läsare läsa indata. Den här situationen påverkar antalet läsare i en enskild konsumentgrupp. Om du vill undvika att överskrida gränsen för Azure IoT Hub på fem läsare per konsumentgrupp per partition anger du en konsumentgrupp för varje Stream Analytics-jobb.

Konfigurera en IoT Hub som inmatning för dataströmmar

I följande tabell förklaras varje egenskap på sidan Ny indata i Azure-portalen när du konfigurerar en IoT Hub som strömingång.

Fastighet Beskrivning
Indata-alias Ett vänligt namn som du använder i jobbets fråga för att referera till detta indata.
Subscription Välj den prenumeration där den IoT Hub resursen finns.
IoT Hub Namnet på IoT Hub som ska användas som input.
Konsumentgrupp Använd en annan konsumentgrupp för varje Stream Analytics-jobb. Konsumentgruppen matar in data från IoT Hub. Stream Analytics använder konsumentgruppen $Default om du inte anger något annat.
namn på principer för delad åtkomst Principen för delad åtkomst som ger åtkomst till IoT Hub. Varje princip för delad åtkomst har ett namn, behörigheter som du anger och åtkomstnycklar.
Principnyckel för delad åtkomst Den delade åtkomstnyckeln som används för att auktorisera åtkomst till IoT Hub. Det här alternativet fylls i automatiskt om du inte väljer alternativet för att ange inställningarna för IoT Hub manuellt.
Slutpunkt Slutpunkten för IoT Hub.
Partitionsnyckel Det är ett valfritt fält som bara är tillgängligt om du konfigurerar jobbet så att det använder kompatibilitetsnivå 1.2 eller senare. Om du partitionera dina indata efter en egenskap kan du lägga till namnet på den här egenskapen här. Den används för att förbättra din frågas prestanda om den innehåller en PARTITION BY- eller GROUP BY-klausul för den här egenskapen. Om det här jobbet använder kompatibilitetsnivå 1.2 eller senare är det här fältet som standard "PartitionId".
Format för händelse serialisering Serialiseringsformatet (JSON, CSV, Avro) för den inkommande dataströmmen. Se till att JSON-formatet överensstämmer med specifikationen och inkluderar inte inledande 0 för decimaltal.
Kodning UTF-8 är för närvarande det enda kodningsformat som stöds.
Händelsekomprimeringstyp Komprimeringstypen som används för att läsa inkommande dataström, till exempel Ingen (standard), Gzip eller Deflate.

När du använder dataströmdata från en IoT Hub har du åtkomst till följande metadatafält i Stream Analytics-frågan:

Fastighet Beskrivning
EventProcessedUtcTime Datum och tid då händelsen bearbetades.
EventEnqueuedUtcTime Datum och tid när IoT Hub tar emot händelsen.
PartitionId Det nollbaserade partitions-ID för indataadapter.
IoTHub.MessageId Ett ID som används för att korrelera dubbelriktad kommunikation i IoT Hub.
IoTHub.CorrelationId Ett ID som används i meddelandesvar och feedback i IoT Hub.
IoTHub.ConnectionDeviceId Det autentiserings-ID som användes för att skicka det här meddelandet. IoT Hub stämplar det här värdet på tjänstbundna meddelanden.
IoTHub.ConnectionDeviceGenerationId Generations-ID för den autentiserade enhet som användes för att skicka det här meddelandet. IoT Hub stämplar det här värdet på servicebound-meddelanden.
IoTHub.EnqueuedTime Den tid då IoT Hub tar emot meddelandet.

Strömma data från Blob Storage eller Data Lake Storage Gen2

För scenarier som omfattar lagring av stora mängder ostrukturerade data i molnet erbjuder Azure Blob Storage eller Azure Data Lake Storage Gen2 en kostnadseffektiv och skalbar lösning. Data i Blob Storage eller Azure Data Lake Storage Gen2 betraktas som vilande data. Stream Analytics kan dock bearbeta dessa data som en dataström.

Ett vanligt scenario för att använda sådana indata med Stream Analytics är loggbearbetning. I det här scenariot samlar du in telemetridatafiler från ett system och behöver parsa och bearbeta dem för att extrahera meningsfulla data.

Standardtidsstämpeln för en Blob Storage- eller Azure Data Lake Storage Gen2-händelse i Stream Analytics är tidsstämpeln som den senast ändrades, vilket är BlobLastModifiedUtcTime. Om du laddar upp en blob till ett lagringskonto klockan 13:00 och startar Azure Stream Analytics-jobbet med hjälp av alternativet Now kl. 13:01, hämtar inte jobbet bloben eftersom den ändrade tiden ligger utanför jobbkörningsperioden.

Om du laddar upp en blob till en lagringskontocontainer klockan 13:00 och startar Azure Stream Analytics-jobbet med hjälp av Anpassad tid kl. 13:00 eller tidigare, hämtar jobbet bloben eftersom den ändrade tiden infaller inom jobbkörningsperioden.

Om du startar ett Azure Stream Analytics jobb med hjälp av Now kl. 13:00 och laddar upp en blob till lagringskontocontainern kl. 13:01 Azure Stream Analytics hämtar bloben. Tidsstämpeln som tilldelats varje blob baseras endast på BlobLastModifiedTime. Mappen som bloben finns i har ingen relation till den tilldelade tidsstämpeln. Om det till exempel finns en blob 2019/10-01/00/b1.txt med en BlobLastModifiedTime av 2019-11-11, så är 2019-11-11 tidsstämpeln som är tilldelad den här blobben.

Om du vill bearbeta data som en dataström med hjälp av en tidsstämpel i händelsenyttolasten måste du använda nyckelordet TIMESTAMP BY . Ett Stream Analytics-jobb hämtar data från Azure Blob Storage eller Azure Data Lake Storage Gen2 indata varje sekund om blobfilen är tillgänglig. Om blobfilen inte är tillgänglig använder jobbet en exponentiell backoff med en maximal tidsfördröjning på 90 sekunder.

Anmärkning

Stream Analytics har inte stöd för att lägga till innehåll i en befintlig blobfil. Stream Analytics visar endast varje fil en gång och bearbetar inga ändringar som sker i filen när jobbet har läst data. Bästa praxis är att ladda upp alla data för en blobfil samtidigt och sedan lägga till ytterligare nyare händelser i en annan, ny blobfil.

I scenarier där du kontinuerligt lägger till många blobar och Stream Analytics bearbetar blobarna när du lägger till dem kan du hoppa över vissa blobar i sällsynta fall på grund av kornigheten i BlobLastModifiedTime. Du kan åtgärda det här problemet genom att ladda upp blobar med minst två sekunders mellanrum. Om det här alternativet inte är möjligt kan du använda Event Hubs för att strömma stora mängder händelser.

Konfigurera Blob-lagring som en strömingång

I följande tabell förklaras varje egenskap i sidan Ny indata i Azure-portalen när du konfigurerar Blob Storage som indata för dataström.

Fastighet Beskrivning
Indata-alias Ett vänligt namn som du använder i jobbets fråga för att referera till detta indata.
Subscription Välj den prenumeration där lagringsresursen finns.
Lagringskonto Namnet på lagringskontot där blobfilerna finns.
Lagringskontonyckel Den hemliga nyckel som är associerad med lagringskontot. Det här alternativet fylls i automatiskt om du inte väljer alternativet för att ange inställningarna manuellt.
Container Containrar tillhandahåller en logisk gruppering för blobar. Du kan välja antingen Använd befintlig container eller Skapa ny för att skapa en ny container.
Autentiseringsläge Ange vilken typ av autentisering du vill använda för att ansluta till lagringskontot. Du kan använda en connection string eller en hanterad identitet för att autentisera med lagringskontot. För alternativet hanterad identitet kan du antingen skapa en systemtilldelad hanterad identitet till Stream Analytics-jobbet eller en användartilldelad hanterad identitet för att autentisera med lagringskontot. När du använder en hanterad identitet måste den hanterade identiteten vara medlem i en lämplig roll i lagringskontot.
Sökvägsmönster (valfritt) Filsökvägen som används för att hitta blobarna i den angivna containern. Om du vill läsa blobar från containerns rot anger du inget sökvägsmönster. I sökvägen kan du ange en eller flera instanser av följande tre variabler: {date}, {time}eller {partition}

Exempel 1: cluster1/logs/{date}/{time}/{partition}

Exempel 2: cluster1/logs/{date}

Tecknet * är inte ett tillåtet värde för sökvägsprefixet. Endast giltiga Azure blobtecken tillåts. Ta inte med containernamn eller filnamn.
Datumformat (valfritt) Om du använder datumvariabeln i sökvägen, datumformatet där filerna är ordnade. Exempel: YYYY/MM/DD

När blobindata har {date} eller {time} i sin sökväg tittar Stream Analytics på mapparna i stigande tidsordning.
Tidsformat (valfritt) Om du använder tidsvariabeln i sökvägen, tidsformatet där filerna är ordnade. För närvarande är HH det enda värdet som stöds i timmar.
Partitionsnyckel Det är ett valfritt fält som bara är tillgängligt om du konfigurerar jobbet så att det använder kompatibilitetsnivå 1.2 eller senare. Om du partitionera dina indata efter en egenskap kan du lägga till namnet på den här egenskapen här. Den används för att förbättra din frågas prestanda om den innehåller en PARTITION BY- eller GROUP BY-klausul för den här egenskapen. Om det här jobbet använder kompatibilitetsnivå 1.2 eller senare är det här fältet som standard "PartitionId".
Antal indatapartitioner Det här fältet finns bara när {partition} finns i sökvägsmönstret. Värdet för den här egenskapen är ett heltal >=1. Oavsett var {partition} visas i pathPattern används ett tal mellan 0 och värdet för det här fältet -1.
Format för händelse serialisering Serialiseringsformatet (JSON, CSV, Avro) för den inkommande dataströmmen. Se till att JSON-formatet överensstämmer med specifikationen och inkluderar inte inledande 0 för decimaltal.
Kodning FÖR CSV och JSON är UTF-8 för närvarande det enda kodningsformat som stöds.
Compression Komprimeringstypen som används för att läsa inkommande dataström, till exempel Ingen (standard), Gzip eller Deflate.

När dina data kommer från en Blob Storage-källa kan du komma åt följande metadatafält i Stream Analytics-frågan:

Fastighet Beskrivning
BlobName Namnet på den indatablob som händelsen kom från.
EventProcessedUtcTime Datum och tid när Stream Analytics bearbetar händelsen.
BlobLastModifiedUtcTime Datum och tid då bloben senast ändrades.
PartitionId Det nollbaserade partitions-ID för indataadapter.

Genom att använda de här fälten kan du skriva en fråga som i följande exempel:

SELECT
    BlobName,
    EventProcessedUtcTime,
    BlobLastModifiedUtcTime
FROM Input

Strömma data från Apache Kafka

Azure Stream Analytics kan du ansluta direkt till Apache Kafka-kluster för att mata in data. Lösningen är låg kod och hanteras helt av Azure Stream Analytics-teamet på Microsoft, så den uppfyller standarder för affärsefterlevnad. Kafka-indata är bakåtkompatibla och stöder alla versioner från version 0.10 med den senaste klientversionen. Du kan ansluta till Kafka-kluster i ett virtuellt nätverk och Kafka-kluster med en offentlig slutpunkt, beroende på konfigurationerna. Konfigurationen förlitar sig på befintliga Kafka-konfigurationskonventioner. Komprimeringstyper som stöds är None, Gzip, Snappy, LZ4 och Zstd.

Mer information finns i Strömdata från Kafka till Azure Stream Analytics (förhandsversion).

Nästa steg