Partager via


Se connecter à Apache Kafka

Cet article explique comment utiliser Apache Kafka en tant que source ou récepteur lors de l’exécution de charges de travail Structured Streaming sur Azure Databricks.

Pour plus d’informations sur Kafka, consultez la documentation Apache Kafka.

Lire les données de Kafka

Azure Databricks fournit le kafka mot clé en tant que format de données pour configurer les connexions à Kafka. Voici un exemple de lecture en continu :

Python

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()

SQL

CREATE OR REFRESH STREAMING TABLE <table_name> AS
SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>'
);

Azure Databricks prend également en charge la sémantique de lecture par lots, comme illustré dans l’exemple suivant :

Python

df = (spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)

Scala

val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>',
  startingOffsets => 'earliest',
  endingOffsets => 'latest'
);

Pour le chargement par lots incrémentiel, Databricks recommande d’utiliser Kafka avec Trigger.AvailableNow. Voir AvailableNow: Traitement par lots incrémentiel.

Dans Databricks Runtime 13.3 LTS et versions ultérieures, Azure Databricks fournit également une fonction SQL pour lire les données Kafka. La diffusion en continu avec SQL est prise en charge uniquement dans les pipelines déclaratifs Spark Lakeflow ou avec des tables de diffusion en continu dans Databricks SQL. Consultez read_kafkaTVF.

Configurer le lecteur Kafka de flux structuré

L’option suivante doit être définie pour la source Kafka pour les requêtes de traitement par lots et de diffusion en continu :

Choix Valeur Description
kafka.bootstrap.servers Liste séparée par des virgules de host :port Serveurs de démarrage de cluster Kafka

En outre, l’une des options suivantes est nécessaire pour spécifier les rubriques auxquelles s’abonner :

Choix Valeur Description
subscribe Liste séparée par des virgules des rubriques. Liste de rubriques auxquelles s’abonner.
subscribePattern Chaîne regex Java. Modèle utilisé pour s’abonner à une ou plusieurs rubriques.
assign Chaîne JSON {"topicA":[0,1],"topic":[2,4]}. TopicPartitions spécifique à consommer.

Consultez la page Options pour obtenir la liste complète des options disponibles.

Schéma pour les enregistrements Kafka

Les enregistrements retournés par le lecteur Kafka Structured Streaming auront le schéma suivant :

Colonne Type
key binary
value binary
topic string
partition int
offset long
timestamp long
timestampType int

Les key et value sont toujours désérialisées en tant que tableaux d’octets avec le ByteArrayDeserializer. Utilisez des opérations dataFrame (telles que cast("string") ou from_avro) pour désérialiser explicitement les clés et les valeurs.

Écrire des données dans Kafka

Voici un exemple ci-après pour une écriture en streaming dans Kafka :

Python

(df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()
)

Scala

df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()

Azure Databricks prend également en charge la sémantique d’écriture dans des récepteurs de données Kafka, comme illustré dans l’exemple suivant :

Python

(df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)

Scala

df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()

Configurer la fonction d’écriture Kafka de flux structuré

Important

Databricks Runtime 13.3 LTS et versions ultérieures incluent une version plus récente de la bibliothèque kafka-clients qui active par défaut les écritures idempotentes. Si un récepteur Kafka utilise la version 2.8.0 ou antérieure avec des listes de contrôle d’accès configurées, mais sans activation de IDEMPOTENT_WRITE, l’écriture échoue avec le message d’erreur org.apache.kafka.common.KafkaException:Cannot execute transactional method because we are in an error state.

Pour résoudre cette erreur, mettez à jour vers Kafka version 2.8.0 ou supérieure, ou définissez l’option .option(“kafka.enable.idempotence”, “false”) pendant la configuration de la fonction d’écriture de flux structurés.

Voici les options courantes définies lors de l’écriture dans Kafka :

Choix Valeur Valeur par défaut Description
kafka.boostrap.servers Une liste des machines virtuelles séparée par des virgules de <host:port> aucune [Obligatoire] Configuration de bootstrap.servers Kafka.
topic STRING non défini [Facultatif] Définit la rubrique pour toutes les lignes à écrire. Cette option remplace toute colonne de rubrique qui existe dans les données.
includeHeaders BOOLEAN false [Facultatif] Indique s’il faut inclure les en-têtes Kafka dans la ligne.

Consultez la page Options pour obtenir la liste complète des options disponibles.

Schéma pour le producteur Kafka

Lors de l’écriture de données dans Kafka, le DataFrame fourni peut inclure les champs suivants :

Nom de colonne Obligatoire ou facultatif Type
key optionnel STRING ou BINARY
value obligatoire STRING ou BINARY
headers optionnel ARRAY
topic facultatif (ignoré si topic est défini comme option d’enregistreur) STRING
partition optionnel INT

Authentification

Azure Databricks prend en charge plusieurs méthodes d’authentification pour Kafka, notamment les informations d’identification du service Unity Catalog, SASL/SSL et les options spécifiques au cloud pour AWS MSK, Azure Event Hubs et Google Cloud Managed Kafka. Consultez Authentification.

Récupérer des métriques Kafka

Vous pouvez surveiller le retard d'une requête de diffusion en continu par rapport à Kafka en utilisant les métriques avgOffsetsBehindLatest, maxOffsetsBehindLatest, et minOffsetsBehindLatest. Ces rapports indiquent le décalage moyen, maximal et minimal entre toutes les partitions de rubriques abonnées, par rapport aux derniers décalages dans Kafka. Consultez Lecture des métriques de manière interactive.

Pour estimer la quantité de données que la requête n’a pas encore consommées, utilisez la estimatedTotalBytesBehindLatest métrique. Cette métrique estime le nombre total d’octets restants sur toutes les partitions abonnées en fonction des lots traités au cours des 300 dernières secondes. Vous pouvez modifier la fenêtre de temps utilisée pour cette estimation en définissant l’option bytesEstimateWindowLength . Par exemple, pour la définir sur 10 minutes :

Python

df = (spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") // m for minutes, you can also use "600s" for 600 seconds

Si vous exécutez le flux dans un notebook, vous pouvez voir ces métriques sous l’onglet Données brutes du tableau de bord de progression des requêtes de diffusion en continu :

{
  "sources": [
    {
      "description": "KafkaV2[Subscribe[topic]]",
      "metrics": {
        "avgOffsetsBehindLatest": "4.0",
        "maxOffsetsBehindLatest": "4",
        "minOffsetsBehindLatest": "4",
        "estimatedTotalBytesBehindLatest": "80.0"
      }
    }
  ]
}

Pour plus d’informations , consultez Surveillance des requêtes de streaming structuré sur Azure Databricks .

Exemple de code : Kafka vers Delta

L’exemple suivant illustre un flux de travail complet pour diffuser en continu des données de Kafka vers une table Delta. Ce modèle est idéal pour les charges de travail d’ingestion de données en quasi-temps réel.

Cet exemple utilise un schéma JSON fixe. Pour d’autres formats comme Avro ou Protobuf, utilisez from_avro ou from_protobuf. Vous pouvez également intégrer un registre de schémas. Consultez l’exemple avec le Registre de schémas.

Python

from pyspark.sql.functions import from_json, col

# Define simple JSON schemas for key and value
key_schema = "user_id STRING"
value_schema = "event_type STRING, event_ts TIMESTAMP"

# Configure Kafka options with service credentials
kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-server>:9092",
  "subscribe": "<topic-name>",
  "databricks.serviceCredential": "<service-credential-name>",
}

# Read from Kafka and parse JSON
parsed_df = (spark.readStream
  .format("kafka")
  .options(**kafka_options)
  .load()
  .select(
    from_json(col("key").cast("string"), key_schema).alias("key"),
    from_json(col("value").cast("string"), value_schema).alias("value")
  )
  .select("key.*", "value.*")
)

# Write to Delta table
query = (parsed_df.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .trigger(processingTime="10 seconds")
  .toTable("catalog.schema.events_table")
)

query.awaitTermination()

Scala

import org.apache.spark.sql.functions.{from_json, col}
import org.apache.spark.sql.streaming.Trigger

// Define JSON schemas for key and value
val keySchema = "user_id STRING"
val valueSchema = "event_type STRING, event_ts TIMESTAMP"

// Configure Kafka options with service credentials
val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-server>:9092",
  "subscribe" -> "<topic-name>",
  "databricks.serviceCredential" -> "<service-credential-name>"
)

// Read from Kafka and parse JSON
val parsedDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()
  .select(
    from_json(col("key").cast("string"), keySchema).alias("key"),
    from_json(col("value").cast("string"), valueSchema).alias("value")
  )
  .select("key.*", "value.*")

// Write to Delta table
val query = parsedDF.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .toTable("catalog.schema.events_table")

query.awaitTermination()

SQL

-- Create a streaming table from Kafka using read_kafka
CREATE OR REFRESH STREAMING TABLE catalog.schema.events_table AS
SELECT
  key::string:user_id AS user_id,
  value::string:event_type AS event_type,
  to_timestamp(value::string:event_ts) AS event_ts
FROM STREAM read_kafka(
  bootstrapServers => '<bootstrap-server>:9092',
  subscribe => '<topic-name>',
  serviceCredential => '<service-credential-name>'
);