Freigeben über


Streamen von Daten als Eingabe in Stream Analytics

Stream Analytics lässt sich in Azure-Datenströme als Eingaben aus fünf verschiedenen Ressourcentypen integrieren.

Diese Eingaberessourcen können im selben Azure Abonnement wie Ihr Stream Analytics-Auftrag oder in einem anderen Abonnement vorhanden sein.

Komprimierung

Stream Analytics unterstützt die Komprimierung für alle Eingabequellen. Unterstützte Komprimierungstypen sind: None, Gzip und Deflate. Stream Analytics unterstützt keine Komprimierung für Referenzdaten. Wenn die Eingabedaten komprimierte Avro-Daten sind, verarbeitet Stream Analytics sie ohne weiteres Zutun. Sie müssen bei der Avro-Serialisierung keinen Komprimierungstyp angeben.

Erstellen, Bearbeiten oder Testen von Eingaben

Sie können das portal Azure, Visual Studio und Visual Studio Code verwenden, um vorhandene Eingaben für Ihren Streamingauftrag hinzuzufügen und anzuzeigen oder zu bearbeiten. Sie können auch Eingabeverbindungen und Abfragen aus Beispieldaten aus dem Azure Portal, Visual Studio und Visual Studio Code testen. Beim Schreiben einer Abfrage listen Sie die Eingabe in der FROM-Klausel auf. Sie können die Liste der verfügbaren Eingaben über die Abfrageseite im Portal abrufen. Wenn Sie mehrere Eingaben verwenden möchten, JOIN sie oder schreiben Sie mehrere SELECT Abfragen.

Hinweis

Verwenden Sie Stream Analytics-Tools für Visual Studio Code. Es gibt bekannte Featurelücken in Stream Analytics-Tools für Visual Studio 2019 (Version 2.6.3000.0), und es wird in Zukunft nicht verbessert.

Streamen von Daten aus Event Hubs

Azure Event Hubs ist ein hochgradig skalierbarer Ereignis-Ingestor im Publisher-Subscriber-Modell. Ein Event Hub kann Millionen von Ereignissen pro Sekunde erfassen. Auf diese Weise können Sie riesige Datenmengen verarbeiten und analysieren, die von vernetzten Geräten und Anwendungen erzeugt werden. Gemeinsam bieten Event Hubs und Stream Analytics eine End-to-End-Lösung für Echtzeitanalysen. Event Hubs leitet Ereignisse in Azure in Echtzeitmodi weiter, und Stream Analytics Aufträge verarbeiten diese Ereignisse kontinuierlich in Echtzeit. Beispielsweise können Sie Webklicks, Sensormesswerte oder Onlineprotokollereignisse an Event Hubs senden. Anschließend können Sie Stream Analytics-Aufträge erstellen, um Event Hubs für Eingabedaten zum Filtern, Aggregieren und Korrelieren in Echtzeit zu verwenden.

EventEnqueuedUtcTime ist der Zeitstempel der Ankunft eines Ereignisses in einem Event Hub und ist der Standardzeitstempel von Ereignissen, die von Event Hubs zu Stream Analytics stammen. Um die Daten als Datenstrom mithilfe eines Zeitstempels in der Ereignisnutzlast zu verarbeiten, müssen Sie das SCHLÜSSELwort TIMESTAMP BY verwenden.

Event Hubs-Consumergruppen

Konfigurieren Sie jeden Event Hub-Eingang mit einer eigenen Consumer-Gruppe. Wenn ein Job eine Selbstverknüpfung enthält oder mehrere Eingaben enthält, lesen mehrere Leser weiter unten einige der Eingaben. Dies wirkt sich auf die Anzahl der Leser in einer einzelnen Consumergruppe aus. Um zu vermeiden, dass die Event Hubs-Grenze von fünf Lesern pro Consumergruppe pro Partition überschritten wird, legen Sie eine Consumergruppe für jeden Stream Analytics-Auftrag fest. Darüber hinaus gilt ein Grenzwert von 20 Verbrauchergruppen für einen Event Hub im Standard-Tarif. Weitere Informationen finden Sie unter Troubleshoot Azure Stream Analytics Eingabedaten.

Erstellen einer Eingabe aus Event Hubs

In der folgenden Tabelle werden die einzelnen Eigenschaften in der Neue Eingabeseite im Azure Portal erläutert, um Dateneingaben von einem Event Hub zu streamen:

Eigenschaft BESCHREIBUNG
Eingabealias Ein Anzeigename, der in der Auftragsabfrage verwendet wird, um auf diese Eingabe zu verweisen.
Subscription Wählen Sie das Azure Abonnement aus, in dem die Event Hub-Ressource vorhanden ist.
Event Hub-Namespace Der Event Hubs-Namespace ist ein Container für eine Gruppe von Event Hubs. Wenn Sie einen Event Hub erstellen, erstellen Sie auch den Namespace.
Name des Event Hub Der Name des Event Hubs, der als Eingabe verwendet wird.
Event Hub Consumer Group (empfohlen) Verwenden Sie eine unterschiedliche Verbrauchergruppe für jeden Stream Analytics-Auftrag. Diese Zeichenfolge identifiziert die Verbrauchergruppe, die zum Erfassen von Daten aus dem Event Hub verwendet wird. Wenn Sie keine Consumergruppe angeben, verwendet der Stream Analytics-Auftrag die $Default Consumergruppe.
Authentifizierungsmodus Geben Sie den Authentifizierungstyp an, den Sie zum Herstellen einer Verbindung mit dem Event Hub verwenden möchten. Verwenden Sie eine connection string oder eine verwaltete Identität, um sich beim Event Hub zu authentifizieren. Für die Option „Verwaltete Identität“ können Sie entweder eine vom System zugewiesene verwaltete Identität für den Stream Analytics-Auftrag oder eine vom Benutzer zugewiesene verwaltete Identität erstellen, um sich beim Event Hub zu authentifizieren. Wenn Sie eine verwaltete Identität verwenden, muss die verwaltete Identität Mitglied der Rollen Azure Event Hubs Datenempfänger oder Azure Event Hubs Datenbesitzer sein.
Name der Event Hub-Richtlinie Die SAS-Richtlinie, die Zugriff auf Event Hubs ermöglicht. Jede gemeinsame Zugriffsrichtlinie hat einen Namen, die von Ihnen festgelegten Berechtigungen und Zugriffsschlüssel. Diese Option wird automatisch aufgefüllt, es sei denn, Sie wählen die Option aus, um die Event Hubs-Einstellungen manuell bereitzustellen.
Partitionsschlüssel Dieses optionale Feld ist nur verfügbar, wenn Sie Ihren Auftrag für die Verwendung der Kompatibilitätsebene 1.2 oder höher konfigurieren. Wenn Ihre Eingabe von einer Eigenschaft partitioniert wird, fügen Sie hier den Namen dieser Eigenschaft hinzu. Verwenden Sie es, um die Leistung Ihrer Abfrage zu verbessern, wenn sie eine PARTITION BY oder GROUP BY Klausel für diese Eigenschaft enthält. Wenn für diesen Auftrag die Kompatibilitätsebene 1.2 oder höher verwendet wird, wird dieses Feld standardmäßig auf PartitionId.
Ereignis serialisierungsformat Das Serialisierungsformat (JSON, CSV, Avro) des eingehenden Datenstroms. Stellen Sie sicher, dass das JSON-Format an der Spezifikation ausgerichtet ist und keine führende 0 für Dezimalzahlen enthält.
Codieren UTF-8 ist derzeit das einzige unterstützte Codierungsformat.
Ereigniskomprimierungstyp Der Komprimierungstyp, der zum Lesen des eingehenden Datenstroms verwendet wird, z. B. Keiner (Standard), Gzip oder Deflate.
Schemaregistrierung Wählen Sie die Schemaregistrierung mit Schemas für Ereignisdaten aus, die vom Event Hub empfangen werden.

Wenn Ihre Daten aus einer Event Hubs-Streamingeingabe stammen, haben Sie Zugriff auf die folgenden Metadatenfelder in ihrer Stream Analytics-Abfrage:

Eigenschaft BESCHREIBUNG
EventProcessedUtcTime Das Datum und die Uhrzeit, zu dem Stream Analytics das Ereignis verarbeitet.
EventEnqueuedUtcTime Das Datum und die Uhrzeit, zu dem Event Hubs die Ereignisse empfängt.
Partition-ID Die nullbasierte Partitions-ID für den Eingabeadapter.

Mithilfe dieser Felder können Sie eine Abfrage wie das folgende Beispiel schreiben:

SELECT
    EventProcessedUtcTime,
    EventEnqueuedUtcTime,
    PartitionId
FROM Input

Hinweis

Wenn Sie Event Hubs als Endpunkt für IoT Hub Routen verwenden, können Sie mithilfe der Funktion GetMetadataPropertyValue auf die IoT Hub Metadaten zugreifen.

Streamen von Daten aus IoT Hub

Azure IoT Hub ist ein hochskalierbarer Ereignisverarbeiter im Veröffentlichungs-Abonnenten-Modell, optimiert für IoT-Szenarien.

Der Standardzeitstempel von Ereignissen, die aus einem IoT Hub in Stream Analytics stammen, ist der Zeitstempel, den das Ereignis im IoT Hub eingegangen ist, das EventEnqueuedUtcTime ist. Verwenden Sie das Schlüsselwort TIMESTAMP BY, um Daten als Stream unter Verwendung eines Zeitstempels in der Ereignisdatenlast zu verarbeiten.

IoT Hub Verbrauchergruppen

Konfigurieren Sie jede Stream Analytics-IoT Hub Eingabe so, dass sie über eine eigene Consumergruppe verfügt. Wenn ein Job eine Selbstverknüpfung oder mehrere Eingaben enthält, können mehrere Leser dieselben Eingaben lesen. Dies wirkt sich auf die Anzahl der Leser in einer einzelnen Consumergruppe aus. Um zu vermeiden, dass die Azure IoT Hub Grenze von fünf Lesern pro Consumergruppe pro Partition überschritten wird, legen Sie eine Consumergruppe für jeden Stream Analytics-Auftrag fest.

Konfigurieren eines IoT Hub als Datenstromeingabe

In der folgenden Tabelle werden die einzelnen Eigenschaften in der Neue Eingabeseite im Azure Portal erläutert, wenn Sie eine IoT Hub als Datenstromeingabe konfigurieren.

Eigenschaft BESCHREIBUNG
Eingabealias Ein Anzeigename, der in der Auftragsabfrage verwendet wird, um auf diese Eingabe zu verweisen.
Subscription Wählen Sie das Abonnement aus, in dem die IoT Hub Ressource vorhanden ist.
IoT Hub Der Name des IoT-Hubs, der als Eingabe verwendet werden soll.
Consumergruppe Verwenden Sie für jeden Stream Analytics-Auftrag eine andere Verbrauchergruppe. Die Verbrauchergruppe erfasst Daten aus dem IoT Hub. Stream Analytics verwendet die $Default-Consumergruppe, sofern nicht anders angegeben.
Name der Richtlinie für gemeinsamen Zugriff Die Richtlinie für den freigegebenen Zugriff, die Zugriff auf die IoT Hub ermöglicht. Jede gemeinsame Zugriffsrichtlinie hat einen Namen, die von Ihnen festgelegten Berechtigungen und Zugriffsschlüssel.
Schlüssel für SAS-Richtlinie Der freigegebene Zugriffsschlüssel, der zum Autorisieren des Zugriffs auf die IoT Hub verwendet wird. Diese Option wird automatisch ausgefüllt, es sei denn, Sie wählen die Option aus, um die IoT Hub Einstellungen manuell bereitzustellen.
Endpunkt Der Endpunkt des IoT Hub.
Partitionsschlüssel Es ist ein optionales Feld, das nur verfügbar ist, wenn Sie Ihren Auftrag für die Verwendung der Kompatibilitätsebene 1.2 oder höher konfigurieren. Wenn Sie Ihre Eingabe durch eine Eigenschaft partitionieren, können Sie hier den Namen dieser Eigenschaft hinzufügen. Es wird verwendet, um die Leistung Ihrer Abfrage zu verbessern, wenn diese eine PARTITION BY- oder GROUP BY-Klausel für diese Eigenschaft enthält. Wenn dieser Auftrag den Kompatibilitätsgrad 1.2 oder höher verwendet, wird dieses Feld standardmäßig auf „PartitionId“ festgelegt.
Ereignis serialisierungsformat Das Serialisierungsformat (JSON, CSV, Avro) des eingehenden Datenstroms. Stellen Sie sicher, dass das JSON-Format an der Spezifikation ausgerichtet ist und keine führende 0 für Dezimalzahlen enthält.
Codieren UTF-8 ist derzeit das einzige unterstützte Codierungsformat.
Ereigniskomprimierungstyp Der Komprimierungstyp, der zum Lesen des eingehenden Datenstroms verwendet wird, z. B. Keiner (Standard), Gzip oder Deflate.

Wenn Sie Daten aus einem IoT Hub verwenden, haben Sie Zugriff auf die folgenden Metadatenfelder in Ihrer Stream Analytics-Abfrage:

Eigenschaft BESCHREIBUNG
EventProcessedUtcTime Das Datum und die Uhrzeit der Verarbeitung des Ereignisses.
EventEnqueuedUtcTime Das Datum und die Uhrzeit, zu dem die IoT Hub das Ereignis empfängt.
Partition-ID Die nullbasierte Partitions-ID für den Eingabeadapter.
IoTHub.MessageId Eine ID, die verwendet wird, um die bidirektionale Kommunikation in IoT Hub zu korrelieren.
IoTHub.CorrelationId Eine ID, die in Nachrichtenantworten und Feedbacks in IoT Hub verwendet wird.
IoTHub.ConnectionDeviceId Die Authentifizierung-ID, die zum Senden dieser Nachricht verwendet wird. Der IoT Hub stempelt diesen Wert für dienstgebundene Nachrichten.
IoTHub.ConnectionDeviceGenerationId Die Generierungs-ID des authentifizierten Geräts, das zum Senden dieser Nachricht verwendet wurde. Der IoT Hub stempelt diesen Wert für dienstgebundene Nachrichten.
IoTHub.EnqueuedTime Der Zeitpunkt, zu dem die IoT Hub die Nachricht empfängt.

Streamen von Daten aus Blob Storage oder Data Lake Storage Gen2

Für Szenarien, in denen große Mengen unstrukturierter Daten in der Cloud gespeichert werden, bietet Azure Blob Storage oder Azure Data Lake Storage Gen2 eine kostengünstige und skalierbare Lösung. Daten im BLOB-Speicher oder Azure Data Lake Storage Gen2 gelten als ruhende Daten. Stream Analytics kann diese Daten jedoch als Datenstrom verarbeiten.

Ein häufig verwendetes Szenario für die Verwendung solcher Eingaben mit Stream Analytics ist die Protokollverarbeitung. In diesem Szenario erfassen Sie Telemetriedatendateien aus einem System und müssen diese analysieren und verarbeiten, um aussagekräftige Daten zu extrahieren.

Der Standardzeitstempel eines Blob-Speichers oder Azure Data Lake Storage Gen2-Ereignisses in Stream Analytics ist der Zeitstempel, der zuletzt geändert wurde, was BlobLastModifiedUtcTime ist. Wenn Sie ein Blob um 13:00 in ein Speicherkonto hochladen und den Azure Stream Analytics-Auftrag mithilfe der Option Now um 13:01 starten, wird das Blob nicht übernommen, da seine Änderungszeit außerhalb des Zeitraums des Auftragslaufs liegt.

Wenn Sie einen BLOB in einen Speicherkontocontainer um 13:00 Hochladen und den Azure Stream Analytics Auftrag mithilfe von Custom Time um 13:00 oder früher starten, nimmt der Auftrag das Blob auf, da die geänderte Zeit in den Ausführungszeitraum des Auftrags fällt.

Wenn Sie einen Azure Stream Analytics Auftrag mit Now um 13:00 beginnen und ein Blob in den Speicherkonto-Container um 13:01 hochladen, wird das Blob von Azure Stream Analytics verarbeitet. Der jedem Blob zugewiesene Zeitstempel basiert nur auf BlobLastModifiedTime. Der Ordner, in dem sich das Blob befindet, hat keine Beziehung zum zugewiesenen Zeitstempel. Wenn zum Beispiel ein Blob 2019/10-01/00/b1.txt mit einem BlobLastModifiedTime von 2019-11-11 existiert, wird diesem Blob der Zeitstempel 2019-11-11 zugewiesen.

Um die Daten als Datenstrom mithilfe eines Zeitstempels in der Ereignisnutzlast zu verarbeiten, müssen Sie das SCHLÜSSELwort TIMESTAMP BY verwenden. Ein Stream Analytics-Auftrag ruft jede Sekunde Daten aus Azure Blob Storage oder Azure Data Lake Storage Gen2 ab, wenn die Blob-Datei verfügbar ist. Wenn die Blob-Datei nicht verfügbar ist, verwendet der Job einen exponentiellen Backoff mit einer maximalen Zeitverzögerung von 90 Sekunden.

Hinweis

Stream Analytics unterstützt das Hinzufügen von Inhalten zu einer vorhandenen BLOB-Datei nicht. Stream Analytics verarbeitet jede Datei nur einmal und berücksichtigt keine Änderungen, die in der Datei auftreten, nachdem der Job die Daten gelesen hat. Die Methode, alle Daten für eine Blobdatei auf einmal hochzuladen und dann zusätzliche neuere Ereignisse einer anderen, neuen Blobdatei hinzuzufügen, hat sich bewährt.

In Szenarien, in denen Sie kontinuierlich viele Blobs hinzufügen und Stream Analytics die Blobs während des Hinzufügens verarbeiten, können Sie einige Blobs in seltenen Fällen aufgrund der Granularität der BlobLastModifiedTime. Sie können dieses Problem beheben, indem Sie Blobs mindestens zwei Sekunden auseinander hochladen. Wenn diese Option nicht praktikabel ist, können Sie Event Hubs verwenden, um große Mengen von Ereignissen zu streamen.

Konfigurieren von Blob Storage als Datenstromeingabe

In der folgenden Tabelle werden die einzelnen Eigenschaften in der Neue Eingabeseite im Azure Portal erläutert, wenn Sie Blob Storage als Datenstromeingabe konfigurieren.

Eigenschaft BESCHREIBUNG
Eingabealias Ein Anzeigename, der in der Auftragsabfrage verwendet wird, um auf diese Eingabe zu verweisen.
Subscription Wählen Sie das Abonnement, in dem die Speicherressource vorhanden ist.
Speicherkonto Der Name des Speicherkontos, in dem die Blobdateien sich befinden.
Speicherkontoschlüssel Der geheime Schlüssel, der dem Speicherkonto zugeordnet ist. Diese Option wird automatisch ausgefüllt, es sei denn, Sie wählen die Option zum manuellen Bereitstellen der Einstellungen aus.
Container Container bieten eine logische Gruppierung für Blobs. Sie können entweder vorhandenen Container verwenden oder "Neu erstellen " auswählen, um einen neuen Container zu erstellen.
Authentifizierungsmodus Geben Sie den Authentifizierungstyp an, den Sie zum Herstellen einer Verbindung mit dem Speicherkonto verwenden möchten. Sie können eine connection string oder eine verwaltete Identität verwenden, um sich mit dem Speicherkonto zu authentifizieren. Für die Option „Verwaltete Identität“ können Sie entweder eine vom System zugewiesene verwaltete Identität für den Stream Analytics-Auftrag oder eine vom Benutzer zugewiesene verwaltete Identität erstellen, um sich beim Speicherkonto zu authentifizieren. Wenn Sie eine verwaltete Identität verwenden, muss die verwaltete Identität Mitglied einer geeigneten Rolle für das Speicherkonto sein.
Pfadmuster (optional) Der Dateipfad, der verwendet wird, um die Blobs im angegebenen Container zu suchen. Wenn Sie Blobs aus dem Containerstamm lesen möchten, legen Sie kein Pfadmuster fest. Innerhalb des Pfads können Sie eine oder mehrere Instanzen der folgenden drei Variablen angeben: {date}, , , {time}oder {partition}

Beispiel 1: cluster1/logs/{date}/{time}/{partition}

Beispiel 2: cluster1/logs/{date}

Das * Zeichen ist kein zulässiger Wert für das Pfadpräfix. Es sind nur gültige Azure BLOB-Zeichen zulässig. Schließen Sie keine Containernamen oder Dateinamen ein.
Datumsformat (optional) Wenn Sie die Datumsvariable im Pfad verwenden, wird das Datumsformat, in dem die Dateien organisiert sind, verwendet. Beispiel: YYYY/MM/DD

Wenn sich {date} oder {time} im Pfad der BLOB-Eingabe befinden, untersucht Stream Analytics die Ordner in zeitlicher Reihenfolge.
Zeitformat (optional) Wenn Sie die Zeitvariable im Pfad verwenden, wird das Zeitformat, in dem die Dateien organisiert sind, verwendet. Derzeit ist HH der einzige unterstützte Wert stundenlang.
Partitionsschlüssel Es ist ein optionales Feld, das nur verfügbar ist, wenn Sie Ihren Auftrag für die Verwendung der Kompatibilitätsebene 1.2 oder höher konfigurieren. Wenn Sie Ihre Eingabe durch eine Eigenschaft partitionieren, können Sie hier den Namen dieser Eigenschaft hinzufügen. Es wird verwendet, um die Leistung Ihrer Abfrage zu verbessern, wenn diese eine PARTITION BY- oder GROUP BY-Klausel für diese Eigenschaft enthält. Wenn dieser Auftrag den Kompatibilitätsgrad 1.2 oder höher verwendet, wird dieses Feld standardmäßig auf „PartitionId“ festgelegt.
Anzahl der Eingabepartitionen Dieses Feld ist nur vorhanden, wenn {partition} im Pfadmuster vorhanden ist. Der Wert dieser Eigenschaft ist eine ganze Zahl >=1. Wo immer {partition} in „pathPattern“ auftritt, wird eine Zahl zwischen 0 und dem Wert dieses Felds minus 1 verwendet.
Ereignis serialisierungsformat Das Serialisierungsformat (JSON, CSV, Avro) des eingehenden Datenstroms. Stellen Sie sicher, dass das JSON-Format an der Spezifikation ausgerichtet ist und keine führende 0 für Dezimalzahlen enthält.
Codieren Bei CSV und JSON ist UTF-8 gegenwärtig das einzige unterstützte Codierungsformat.
Compression Der Komprimierungstyp, der zum Lesen des eingehenden Datenstroms verwendet wird, z. B. Keiner (Standard), Gzip oder Deflate.

Wenn Ihre Daten aus einer Blob-Speicherquelle stammen, können Sie in Ihrer Stream Analytics-Abfrage auf die folgenden Metadatenfelder zugreifen:

Eigenschaft BESCHREIBUNG
BlobName Der Name des Eingabe-Blobs, aus dem das Ereignis stammt.
EventProcessedUtcTime Das Datum und die Uhrzeit, zu dem Stream Analytics das Ereignis verarbeitet.
BlobLastModifiedUtcTime Das Datum und die Uhrzeit der letzten Änderung des Blobs.
Partition-ID Die nullbasierte Partitions-ID für den Eingabeadapter.

Mithilfe dieser Felder können Sie eine Abfrage wie das folgende Beispiel schreiben:

SELECT
    BlobName,
    EventProcessedUtcTime,
    BlobLastModifiedUtcTime
FROM Input

Streamen von Daten aus Apache Kafka

mit Azure Stream Analytics können Sie eine direkte Verbindung mit Apache Kafka-Clustern herstellen, um Daten aufzunehmen. Die Lösung ist niedriger Code und wird vollständig vom Azure Stream Analytics-Team bei Microsoft verwaltet, sodass sie die Business Compliance-Standards erfüllt. Die Kafka-Eingabe ist abwärtskompatibel und unterstützt alle Versionen ab Version 0.10 mit der neuesten Clientversion. Sie können je nach Konfiguration eine Verbindung mit Kafka-Clustern in einem virtuellen Netzwerk und Kafka-Clustern mit einem öffentlichen Endpunkt herstellen. Die Konfiguration basiert auf vorhandenen Kafka-Konfigurationskonventionen. Unterstützte Komprimierungstypen sind None, Gzip, Snappy, LZ4 und Zstd.

Weitere Informationen finden Sie unter Stream-Daten von Kafka in Azure Stream Analytics (Vorschau).

Nächste Schritte