Condividi tramite


Analizzare i dati JSON e Avro in Azure Stream Analytics

Il servizio Azure Stream Analytics supporta l'elaborazione di eventi in formati di dati CSV, JSON e Avro. Entrambi i dati JSON e Avro possono essere strutturati e possono contenere alcuni tipi complessi come oggetti annidati (record) e matrici.

Tipi di dati record

I tipi di dati record vengono usati per rappresentare le matrici JSON e Avro quando vengono usati formati corrispondenti nei flussi di dati di input. Questi esempi illustrano un sensore di esempio che legge gli eventi di input in formato JSON. Di seguito è riportato un esempio di un singolo evento:

{
    "DeviceId" : "12345",
    "Location" :
    {
        "Lat": 47,
        "Long": 122
    },
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "SensorMetadata" : 
        {
        "Manufacturer":"ABC",
        "Version":"1.2.45"
        }
    }
}

Accedere ai campi annidati nello schema noto

Usare la notazione punto (.) per accedere ai campi annidati direttamente dalla query. Ad esempio, questa query seleziona le coordinate di latitudine e longitudine nella proprietà Location dei dati JSON precedenti. Usare la notazione del punto per spostarsi tra più livelli, come illustrato nel frammento di codice seguente:

SELECT
    DeviceID,
    Location.Lat,
    Location.Long,
    SensorReadings.Temperature,
    SensorReadings.SensorMetadata.Version
FROM input

Il risultato è:

|DeviceID|Lat|Long|Temperature|Version|
|-|-|-|-|-|
|12345|47|122|80|1.2.45|

Selezionare tutte le proprietà

È possibile usare il carattere jolly * per selezionare tutte le proprietà di un record annidato. Si consideri l'esempio seguente:

SELECT
    DeviceID,
    Location.*
FROM input

Il risultato è:

|DeviceID|Lat|Long|
|-|-|-|
|12345|47|122|

Accedere ai campi annidati quando il nome della proprietà è una variabile

Usare la funzione GetRecordPropertyValue se il nome della proprietà è una variabile. Questa funzione ti aiuta a costruire query dinamiche senza dover codificare manualmente i nomi delle proprietà.

Si supponga, ad esempio, che il flusso di dati di esempio debba essere unito a dati di riferimento contenenti soglie per ogni sensore di dispositivo. Un frammento di tali dati di riferimento è illustrato nel frammento di codice seguente.

{
    "DeviceId" : "12345",
    "SensorName" : "Temperature",
    "Value" : 85
},
{
    "DeviceId" : "12345",
    "SensorName" : "Humidity",
    "Value" : 65
}

L'obiettivo è unire il set di dati di esempio dall'inizio dell'articolo a tali dati di riferimento e restituire un evento per ogni misura del sensore al di sopra della soglia. Questo join significa che il singolo evento può generare più eventi di output se più sensori superano le rispettive soglie. Per ottenere risultati simili senza un join, vedere l'esempio seguente:

SELECT
    input.DeviceID,
    thresholds.SensorName,
    "Alert: Sensor above threshold" AS AlertMessage
FROM input      -- stream input
JOIN thresholds -- reference data input
ON
    input.DeviceId = thresholds.DeviceId
WHERE
    GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value

GetRecordPropertyValue seleziona la proprietà in SensorReadings che corrisponde al nome della proprietà proveniente dai dati di riferimento. Estrae quindi il valore associato da SensorReadings.

Il risultato è:

|DeviceID|SensorName|AlertMessage|
| - | - | - |
| 12345 | Humidity | Alert: Sensor above threshold |

Convertire i campi di un record in eventi separati

Per convertire i campi di record in eventi separati, utilizzare l'operatore APPLY insieme alla funzione GetRecordProperties .

Usando i dati di esempio originali, è possibile usare la query seguente per estrarre proprietà in eventi diversi:

SELECT
    event.DeviceID,
    sensorReading.PropertyName,
    sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading

Il risultato è:

|DeviceID|SensorName|AlertMessage|
|-|-|-|
|12345|Temperature|80|
|12345|Humidity|70|
|12345|CustomSensor01|5|
|12345|CustomSensor02|99|
|12345|SensorMetadata|[object Object]|

Usando WITH, è possibile instradare tali eventi a destinazioni diverse:

WITH Stage0 AS
(
    SELECT
        event.DeviceID,
        sensorReading.PropertyName,
        sensorReading.PropertyValue
    FROM input as event
    CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
)

SELECT DeviceID, PropertyValue AS Temperature INTO TemperatureOutput FROM Stage0 WHERE PropertyName = 'Temperature'
SELECT DeviceID, PropertyValue AS Humidity INTO HumidityOutput FROM Stage0 WHERE PropertyName = 'Humidity'

Analizzare il record JSON nei dati di riferimento SQL

Quando si usa Azure SQL Database come dati di riferimento nel processo, è possibile includere una colonna che contiene dati in formato JSON. L'esempio seguente illustra questo formato:

|DeviceID|Data|
|-|-|
|12345|{"key": "value1"}|
|54321|{"key": "value2"}|

È possibile analizzare il record JSON nella colonna Data scrivendo una semplice funzione javaScript definita dall'utente.

function parseJson(string) {
return JSON.parse(string);
}

Per accedere ai campi dei record JSON, creare un passaggio nella query di Analisi di flusso, come illustrato nell'esempio seguente.

WITH parseJson as
(
SELECT DeviceID, udf.parseJson(sqlRefInput.Data) as metadata,
FROM sqlRefInput
)

SELECT metadata.key
INTO output
FROM streamInput
JOIN parseJson 
ON streamInput.DeviceID = parseJson.DeviceID

Tipi di dati array

I tipi di dati matrice sono una raccolta ordinata di valori. Questa sezione descrive in dettaglio alcune operazioni tipiche sui valori di matrice. Questi esempi usano le funzioni GetArrayElement, GetArrayElements, GetArrayLength e l'operatore APPLY .

Ecco un esempio di evento. Sia CustomSensor03 che SensorMetadata sono di tipo matrice:

{
    "DeviceId" : "12345",
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "CustomSensor03": [12,-5,0]
     },
    "SensorMetadata":[
        {          
            "smKey":"Manufacturer",
            "smValue":"ABC"                
        },
        {
            "smKey":"Version",
            "smValue":"1.2.45"
        }
    ]
}

Usare un elemento specifico dell'array

Selezionare l'elemento matrice in corrispondenza di un indice specificato (selezionare il primo elemento della matrice):

SELECT
    GetArrayElement(SensorReadings.CustomSensor03, 0) AS firstElement
FROM input

Il risultato è:

|firstElement|
|-|
|12|

Selezionare la lunghezza della matrice

SELECT
    GetArrayLength(SensorReadings.CustomSensor03) AS arrayLength
FROM input

Il risultato è:

|arrayLength|
|-|
|3|

Convertire gli elementi di matrice in eventi distinti

Selezionare tutti gli elementi della matrice come singoli eventi. L'operatore APPLY insieme alla funzione predefinita GetArrayElements estrae tutti gli elementi della matrice come singoli eventi:

SELECT
    DeviceId,
	CustomSensor03Record.ArrayIndex,
	CustomSensor03Record.ArrayValue
FROM input
CROSS APPLY GetArrayElements(SensorReadings.CustomSensor03) AS CustomSensor03Record

Il risultato è:

|DeviceId|ArrayIndex|ArrayValue|
|-|-|-|
|12345|0|12|
|12345|1|-5|
|12345|2|0|
SELECT   
    i.DeviceId,	
    SensorMetadataRecords.ArrayValue.smKey as smKey,
    SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords

Il risultato è:

|DeviceId|smKey|smValue|
|-|-|-|
|12345|Manufacturer|ABC|
|12345|Version|1.2.45|

Per visualizzare i campi estratti nelle colonne, eseguire il pivot del set di dati usando la sintassi WITH insieme all'operazione JOIN . Questo join richiede una condizione limite temporale che impedisce la duplicazione:

WITH DynamicCTE AS (
	SELECT   
		i.DeviceId,
		SensorMetadataRecords.ArrayValue.smKey as smKey,
		SensorMetadataRecords.ArrayValue.smValue as smValue
	FROM input i
	CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords 
)

SELECT
	i.DeviceId,
	i.Location.*,
	V.smValue AS 'smVersion',
	M.smValue AS 'smManufacturer'
FROM input i
LEFT JOIN DynamicCTE V ON V.smKey = 'Version' and V.DeviceId = i.DeviceId AND DATEDIFF(minute,i,V) BETWEEN 0 AND 0 
LEFT JOIN DynamicCTE M ON M.smKey = 'Manufacturer' and M.DeviceId = i.DeviceId AND DATEDIFF(minute,i,M) BETWEEN 0 AND 0

Il risultato è:

|DeviceId|Lat|Long|smVersion|smManufacturer|
|-|-|-|-|-|
|12345|47|122|1.2.45|ABC|

Tipi di dati in Azure Stream Analytics