Partager via


Mode temps réel dans Structured Streaming

Important

Cette fonctionnalité est disponible en préversion publique.

Le mode en temps réel est un type de déclencheur pour Structured Streaming qui permet un traitement de données ultra-faible latence avec une latence de bout en bout aussi faible que cinq millisecondes. Utilisez le mode en temps réel pour les charges de travail opérationnelles qui nécessitent une réponse immédiate aux données de streaming, telles que la détection des fraudes, la personnalisation en temps réel et les systèmes décisionnels instantanés.

Le mode en temps réel est disponible dans Databricks Runtime 16.4 LTS et versions ultérieures. Pour obtenir des instructions de configuration pas à pas, consultez Prise en main du mode en temps réel. Pour obtenir des exemples de code, consultez des exemples de mode temps réel.

Qu’est-ce que le mode en temps réel ?

Charges de travail opérationnelles et analytiques

Les charges de travail de streaming peuvent être largement divisées en charges de travail analytiques et charges de travail opérationnelles :

  • Les charges de travail analytiques utilisent l'ingestion et la transformation des données, en suivant généralement l'architecture en médaillon (par exemple, l'ingestion des données dans les tables bronze, argent et or).
  • Les charges de travail opérationnelles consomment des données en temps réel, appliquent la logique métier et déclenchent des actions ou des décisions en aval.

Voici quelques exemples de charges de travail opérationnelles :

  • Blocage ou indicateur d’une transaction de carte de crédit en temps réel si un score de fraude dépasse un seuil, en fonction de facteurs tels que l’emplacement inhabituel, la grande taille des transactions ou les modèles de dépenses rapides.
  • Envoyer un message promotionnel lorsque les données de parcours indiquent qu’un utilisateur regarde des jeans depuis cinq minutes, en offrant une remise de 25% s'il achète dans les 15 prochaines minutes.

En général, les charges de travail opérationnelles sont caractérisées par la nécessité d’une latence de bout en bout sous-seconde. Cela peut être réalisé avec le mode en temps réel dans Apache Spark Structured Streaming.

Comment le mode en temps réel atteint une faible latence

Le mode en temps réel améliore l’architecture d’exécution en :

  • Exécution de lots de longue durée (valeur par défaut de cinq minutes), dans lesquels le système traite les données au fur et à mesure qu'elles deviennent disponibles dans la source.
  • Planification simultanée de toutes les étapes de la requête. Cela nécessite que le nombre d’emplacements de tâches disponibles soit égal ou supérieur au nombre de tâches de toutes les étapes d’un lot.
  • Passage de données entre les étapes dès qu’elles sont produites à l’aide d’un shuffle en streaming.

À la fin du traitement d’un lot et avant le démarrage du lot suivant, les points de contrôle Structured Streaming progressent et publient des métriques. La durée du lot affecte la fréquence de point de contrôle :

  • Lots de traitement plus longs : points de contrôle moins fréquents, ce qui signifie des reprises plus longues en cas d’échec et une disponibilité différée des métriques.
  • Lots plus courts : points de contrôle plus fréquents, ce qui peut affecter la latence.

Databricks recommande d’évaluer le mode en temps réel par rapport à votre charge de travail cible pour trouver l’intervalle de déclencheur approprié.

Quand utiliser le mode en temps réel

Choisissez le mode en temps réel lorsque votre cas d’usage nécessite :

  • Latence inférieure à la seconde : les applications qui doivent répondre aux données en millisecondes, telles que les systèmes de détection des fraudes qui doivent bloquer les transactions en temps réel.
  • Prise de décision opérationnelle : systèmes qui déclenchent des actions immédiates basées sur des données entrantes, telles que des offres, des alertes ou des notifications en temps réel.
  • Traitement continu : charges de travail où les données doivent être traitées dès qu’elles arrivent, plutôt que dans des lots périodiques.

Utilisez le mode micro-batch (déclencheur Structured Streaming par défaut) quand :

  • Traitement analytique : pipelines ETL, transformations de données et implémentations d’architecture de médaillon où les exigences de latence sont mesurées en secondes ou en minutes.
  • Optimisation des coûts : charges de travail où une latence inférieure à la seconde n’est pas nécessaire, car le mode en temps réel nécessite des ressources de calcul dédiées.
  • La fréquence des points de contrôle est importante : les applications qui bénéficient de points de contrôle plus fréquents pour une récupération plus rapide.

Exigences et configuration

Le mode en temps réel présente des exigences spécifiques pour la configuration du calcul et la configuration des requêtes. Cette section décrit les prérequis et les étapes de configuration nécessaires pour utiliser le mode en temps réel.

Prerequisites

Pour utiliser le mode en temps réel, vous devez répondre aux exigences suivantes :

  • Databricks Runtime 16.4 LTS ou version ultérieure : le mode temps réel est disponible uniquement dans DBR 16.4 LTS et versions ultérieures.
  • Calcul dédié : vous devez utiliser un calcul dédié (anciennement mono-utilisateur). Les clusters standard (anciennement partagés), les pipelines déclaratifs de Spark Lakeflow et les clusters serverless ne sont pas pris en charge.
  • Aucune mise à l’échelle automatique : la mise à l’échelle automatique doit être désactivée.
  • Pas de Photon : l’accélération Photon n’est pas prise en charge avec le mode en temps réel.
  • Configuration Spark : vous devez définir spark.databricks.streaming.realTimeMode.enabled à true.

Configuration du calcul

Configurez votre calcul avec les paramètres suivants :

  • Défini spark.databricks.streaming.realTimeMode.enabled sur true dans la configuration Spark.
  • Désactivez la mise à l’échelle automatique.
  • Désactivez l’accélération photon.
  • Vérifiez que les ressources de calcul sont configurées en tant que cluster dédié (et non standard, pipelines déclaratifs Lakeflow Spark ou sans serveur).

Pour obtenir des instructions pas à pas sur la création et la configuration du calcul pour le mode en temps réel, consultez Prise en main du mode en temps réel.

Configuration des requêtes

Pour exécuter une requête en mode temps réel, vous devez activer le déclencheur en temps réel. Les déclencheurs en temps réel sont pris en charge uniquement en mode mise à jour.

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

Dimensionnement des ressources de calcul

Vous pouvez exécuter un travail en temps réel par ressource de calcul si le calcul a suffisamment d’emplacements de tâches.

Pour s’exécuter en mode faible latence, le nombre total d’emplacements de tâches disponibles doit être supérieur ou égal au nombre de tâches dans toutes les phases de requête.

Exemples de calcul d'emplacement

Type de pipeline Paramétrage Emplacements requis
Étape unique sans état (source Kafka + puits) maxPartitions = 8 8 emplacements
Avec état à deux phases (source Kafka + shuffle) maxPartitions = 8, partitions aléatoires = 20 28 emplacements (8 + 20)
Trois étapes (source Kafka + mélange + répartition) maxPartitions = 8, deux phases aléatoires de 20 chacune 48 emplacements (8 + 20 + 20)

Si vous ne définissez maxPartitionspas, utilisez le nombre de partitions dans la rubrique Kafka.

Considérations clés

Lorsque vous configurez votre calcul, tenez compte des éléments suivants :

  • Contrairement au mode micro-batch, les tâches en temps réel peuvent rester inactives lors de l’attente des données, de sorte que le dimensionnement approprié est essentiel pour éviter les ressources perdues.
  • Visez un niveau d’utilisation cible (par exemple, 50%) en paramétrant :
    • maxPartitions (pour Kafka)
    • spark.sql.shuffle.partitions (pour les phases de lecture aléatoire)
  • Databricks recommande de définir maxPartitions afin que chaque tâche gère plusieurs partitions Kafka pour réduire la surcharge.
  • Ajustez les emplacements de tâche par travailleur pour qu'ils correspondent à la charge de travail des tâches simples à une étape.
  • Pour les travaux à forte charge de lecture aléatoire, essayez de trouver le nombre minimal de partitions de lecture aléatoire qui évitent les retards, et ajustez-les en fonction. Le système de calcul ne planifiera pas la tâche s'il n'a pas suffisamment de capacités.

Note

À partir de Databricks Runtime 16.4 LTS et versions supérieures, tous les pipelines en temps réel utilisent le point de contrôle v2, ce qui permet un basculement transparent entre les modes en temps réel et micro-lots.

Techniques d’optimisation

Technique Activée par défaut
Suivi de progression asynchrone : transfère l'écriture dans le journal de décalage et le journal de validation dans un thread asynchrone, ce qui réduit le temps de traitement entre deux micro-lots. Cela peut aider à réduire la latence des requêtes stateless de streaming. No
Point de contrôle d’état asynchrone : permet de réduire la latence des requêtes de streaming avec état en commençant à traiter le micro-lot suivant dès que le calcul du micro-lot précédent se termine, sans attendre le point de contrôle d’état. No

Surveillance et observabilité

La mesure des performances des requêtes est essentielle pour les charges de travail en temps réel. En mode temps réel, les métriques de durée de traitement par lots traditionnelles ne reflètent pas la latence réelle. Vous avez donc besoin d’autres approches.

La latence de bout en bout est spécifique à la charge de travail et peut parfois être mesurée avec précision avec la logique métier. Par exemple, si l’horodatage de la source est émis dans Kafka, vous pouvez calculer la latence comme la différence entre l’horodatage de sortie de Kafka et l’horodatage de la source.

Vous pouvez également estimer la latence de bout en bout à l’aide des métriques et API intégrées décrites ci-dessous.

Métriques intégrées avec StreamingQueryProgress

Les métriques suivantes sont incluses dans l’événement StreamingQueryProgress , qui est automatiquement journalisée dans les journaux du pilote. Vous pouvez également y accéder par le biais de la fonction de rappel de StreamingQueryListeneronQueryProgress(). QueryProgressEvent.json() ou toString() inclure des métriques en mode temps réel supplémentaires.

  1. Latence de traitement (processingLatencyMs). Temps écoulé entre le moment où la requête en mode temps réel lit un enregistrement et lorsque la requête l’écrit à la phase suivante ou en aval. Pour les requêtes à phase unique, cela mesure la même durée que la latence E2E. Le système signale cette métrique par tâche.
  2. Latence de mise en file d’attente source (sourceQueuingLatencyMs). Durée écoulée entre le moment où le système écrit un enregistrement dans un bus de messages, par exemple, le temps d’ajout du journal dans Kafka et lorsque la requête en mode temps réel lit d’abord l’enregistrement. Le système signale cette métrique par tâche.
  3. Latence E2E (e2eLatencyMs). Temps entre le moment où le système écrit l’enregistrement dans un bus de messages et lorsque la requête en mode temps réel écrit l’enregistrement en aval. Le système agrège cette métrique par lot sur tous les enregistrements traités par toutes les tâches.

Par exemple:

"rtmMetrics" : {
    "processingLatencyMs" : {
      "P0" : 0,
      "P50" : 0,
      "P90" : 0,
      "P95" : 0,
      "P99" : 0
    },
    "sourceQueuingLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 3
    },
    "e2eLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 4
    },

Mesure de latence personnalisée avec l’API Observer

L’API Observer permet de mesurer la latence sans lancer un autre travail. Si vous disposez d’un horodatage source qui correspond approximativement à l’heure d’arrivée des données sources, vous pouvez estimer la latence de chaque lot à l’aide de l’API Observer. Passez l’horodatage avant d’atteindre le puits de données :

Python

from datetime import datetime

from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def current_timestamp():
  return datetime.now()

# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
  "latency",
  unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  "observedLatency",
  avg(col("latency")).alias("avg"),
  max(col("latency")).alias("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.

Scala

import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}

val currentTimestampUDF = udf(() => System.currentTimeMillis())

// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
  "latency",
  col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  name = "observedLatency",
  avg(col("latency")).as("avg"),
  max(col("latency")).as("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.

Dans cet exemple, un horodatage actuel est enregistré avant de sortir l’entrée, et la latence est estimée en calculant la différence entre cet horodatage et l’horodatage source de l’enregistrement. Les résultats sont inclus dans les rapports d’avancement et mis à la disposition des écouteurs. Voici un exemple de sortie :

"observedMetrics" : {
  "observedLatency" : {
    "avg" : 63.8369765176552,
    "max" : 219,
    "p99" : 154,
    "p50" : 49
  }
}

Prise en charge et limitations des fonctionnalités

Cette section décrit les fonctionnalités prises en charge et les limitations actuelles du mode en temps réel, notamment les environnements compatibles, les langages, les sources, les récepteurs, les opérateurs et les considérations spéciales relatives aux fonctionnalités spécifiques.

Environnements, langues et modes pris en charge

Type de capacité de calcul Supported
Dédié (anciennement : utilisateur unique) Yes
Standard (anciennement : partagé) No
Pipelines déclaratifs Spark Lakeflow Classic No
Pipelines déclaratifs Spark Lakeflow sans serveur No
Serverless No

Langues prises en charge :

Language Supported
Scala Yes
Java Yes
Python Yes

Modes d’exécution pris en charge :

Mode d’exécution Supported
Mode de mise à jour Yes
Append mode No
Mode complet No

Sources et puits pris en charge

Sources:

Sources Supported
Apache Kafka Yes
AWS MSK Yes
Event Hubs (à l’aide du connecteur Kafka) Yes
Kinesis Oui (mode EFO uniquement)
Google Pub/Sub No
Apache Pulsar No

Éviers:

Sinks Supported
Apache Kafka Yes
Event Hubs (à l’aide du connecteur Kafka) Yes
Kinesis No
Google Pub/Sub No
Apache Pulsar No
Récepteurs arbitraires (à l’aide de forEachWriter) Yes

Opérateurs pris en charge

Operators Supported
Opérations sans état
Selection Yes
Projection Yes
Fonctions définies par l’utilisateur
Scala UDF Oui (avec certaines limitations)
Python UDF Oui (avec certaines limitations)
Agrégation
sum Yes
count Yes
max Yes
min Yes
avg Yes
Fonctions d’agrégation Yes
Fenêtrage
Tumbling Yes
Sliding Yes
Session No
Déduplication
dropDuplicates Oui (l’état n’est pas lié)
dropDuplicatesWithinWatermark No
Stream - Jointure de table
Table de diffusion (doit être petite) Yes
Jointure flux-flux No
(plat)MapGroupsWithState No
transformWithState Oui (avec certaines différences)
union Oui (avec certaines limitations)
forEach Yes
forEachBatch No
mapPartitions Non (voir limitation)

Considérations spéciales

Certains opérateurs et fonctionnalités ont des considérations ou des différences spécifiques lorsqu’ils sont utilisés en mode temps réel.

transformWithState en mode temps réel

Pour la création d’applications personnalisées avec état, Databricks prend en charge transformWithState, une API dans Apache Spark Structured Streaming. Consultez Créer une application avec état personnalisé pour plus d’informations sur l’API et les extraits de code.

Toutefois, il existe des différences entre le comportement de l’API en mode réel et les requêtes de diffusion en continu traditionnelles qui tirent parti de l’architecture de micro-lots.

  • Le mode en temps réel appelle la handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues) méthode pour chaque ligne.
    • L’itérateur inputRows retourne une valeur unique. Le mode micro-lot l'appelle une fois pour chaque clé, et l'itérateur inputRows renvoie toutes les valeurs d'une clé dans le micro-lot.
    • Vous devez connaître cette différence lors de l’écriture de votre code.
  • Les temporisateurs d’événements ne sont pas pris en charge en mode temps réel.
  • En mode temps réel, les minuteurs sont retardés en fonction de l’arrivée des données :
    • Si un minuteur est planifié pour 10:00:00, mais qu’aucune donnée n’arrive, le minuteur ne se déclenche pas immédiatement.
    • Si les données arrivent à 10:00:10, le minuteur se déclenche avec un délai de 10 secondes.
    • Si aucune donnée n’arrive et que le traitement de longue durée se termine, le minuteur se déclenche avant l’arrêt du traitement.

Fonctions définies par l’utilisateur Python en mode temps réel

Databricks prend en charge la majorité des fonctions utilisateur définies en Python en mode temps réel.

Type UDF Supported
UDF sans état
UDF scalaire Python (lien) Yes
Fonctions UDF scalaires de flèche Yes
UDF scalaire Pandas (lien) Yes
Fonction fléchée (mapInArrow) Yes
Fonction de pandas (lien) Yes
Regroupement avec état UDF (UDAF)
transformWithState (interface uniquement Row ) Yes
applyInPandasWithState No
UDF sans état (UDAF)
apply No
applyInArrow No
applyInPandas No
Table, fonction
UDTF (lien) No
UC UDF No

Il existe plusieurs points à prendre en compte lors de l'utilisation des UDFs Python en mode temps réel :

  • Pour réduire la latence, réglez la taille du lot Arrow (spark.sql.execution.arrow.maxRecordsPerBatch) à 1.
    • Compromis : cette configuration optimise la latence au détriment du débit. Pour la plupart des charges de travail, ce paramètre est recommandé.
    • Augmentez la taille du lot uniquement si un débit plus élevé est nécessaire pour prendre en charge le volume d’entrée, en acceptant l’augmentation potentielle de la latence.
  • Les fonctions pandas et UDF ne fonctionnent pas correctement avec une taille de lot Arrow de 1.
    • Si vous utilisez des fonctions UDFs ou pandas, définissez la taille du lot Arrow sur une valeur supérieure (par exemple, 100 ou plus).
    • Notez que cela implique une latence plus élevée. Databricks recommande d’utiliser les UDF Arrow ou une autre fonction si possible.
  • En raison du problème de performances avec pandas, transformWithState est uniquement pris en charge avec l’interface Row .

Limitations

Limitations de la source

Pour Kinesis, le mode en temps réel ne prend pas en charge le mode d’interrogation. De plus, les repartitions fréquentes peuvent avoir un impact négatif sur la latence.

Limitations de l’union

L’opérateur Union présente certaines limitations :

  • Le mode en temps réel ne prend pas en charge l’auto-union :
    • Kafka : Vous ne pouvez pas utiliser le même objet de trame de données source et combiner des trames de données dérivées à partir de celui-ci par union. Solution de contournement : utilisez différents DataFrames qui lisent à partir de la même source.
    • Kinesis : vous ne pouvez pas faire l'union des blocs de données dérivés de la même source Kinesis avec la même configuration. Solution de contournement : Outre l’utilisation de différents DataFrames, vous pouvez attribuer une autre option « consumerName » à chaque DataFrame.
  • Le mode en temps réel ne prend pas en charge les opérateurs avec état (par exemple, aggregate, deduplicate, transformWithState) définis avant l’Union.
  • Le mode en temps réel ne prend pas en charge l’union avec les sources de traitement par lots.

Limitation de MapPartitions

mapPartitions dans Scala et les API Python similaires (mapInPandas, mapInArrow) prennent un itérateur de la partition d’entrée entière et produisent un itérateur de la sortie entière avec un mappage arbitraire entre l’entrée et la sortie. Ces API peuvent entraîner des problèmes de performances en streaming Real-Time Mode en bloquant l’intégralité de la sortie, ce qui augmente la latence. La sémantique de ces API ne prend pas bien en charge la propagation des filigranes.

Utilisez des fonctions scalaires définies par l'utilisateur combinées avec la transformation des types de données complexes ou filter pour atteindre des fonctionnalités similaires.

Étapes suivantes

Maintenant que vous comprenez quel est le mode en temps réel et comment le configurer, explorez ces ressources pour commencer à implémenter des applications de streaming en temps réel :