Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Cet article explique comment utiliser Apache Kafka en tant que source ou récepteur lors de l’exécution de charges de travail Structured Streaming sur Azure Databricks.
Pour plus d’informations sur Kafka, consultez la documentation Apache Kafka.
Lire les données de Kafka
Azure Databricks fournit le kafka mot clé en tant que format de données pour configurer les connexions à Kafka. Voici un exemple de lecture en continu :
Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
Scala
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
SQL
CREATE OR REFRESH STREAMING TABLE <table_name> AS
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>'
);
Azure Databricks prend également en charge la sémantique de lecture par lots, comme illustré dans l’exemple suivant :
Python
df = (spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)
Scala
val df = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
SQL
SELECT * FROM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>',
startingOffsets => 'earliest',
endingOffsets => 'latest'
);
Pour le chargement par lots incrémentiel, Databricks recommande d’utiliser Kafka avec Trigger.AvailableNow. Voir AvailableNow: Traitement par lots incrémentiel.
Dans Databricks Runtime 13.3 LTS et versions ultérieures, Azure Databricks fournit également une fonction SQL pour lire les données Kafka. La diffusion en continu avec SQL est prise en charge uniquement dans les pipelines déclaratifs Spark Lakeflow ou avec des tables de diffusion en continu dans Databricks SQL. Consultez read_kafkaTVF.
Configurer le lecteur Kafka de flux structuré
L’option suivante doit être définie pour la source Kafka pour les requêtes de traitement par lots et de diffusion en continu :
| Choix | Valeur | Description |
|---|---|---|
kafka.bootstrap.servers |
Liste séparée par des virgules de host :port | Serveurs de démarrage de cluster Kafka |
En outre, l’une des options suivantes est nécessaire pour spécifier les rubriques auxquelles s’abonner :
| Choix | Valeur | Description |
|---|---|---|
subscribe |
Liste séparée par des virgules des rubriques. | Liste de rubriques auxquelles s’abonner. |
subscribePattern |
Chaîne regex Java. | Modèle utilisé pour s’abonner à une ou plusieurs rubriques. |
assign |
Chaîne JSON {"topicA":[0,1],"topic":[2,4]}. |
TopicPartitions spécifique à consommer. |
Consultez la page Options pour obtenir la liste complète des options disponibles.
Schéma pour les enregistrements Kafka
Les enregistrements retournés par le lecteur Kafka Structured Streaming auront le schéma suivant :
| Colonne | Type |
|---|---|
key |
binary |
value |
binary |
topic |
string |
partition |
int |
offset |
long |
timestamp |
long |
timestampType |
int |
Les key et value sont toujours désérialisées en tant que tableaux d’octets avec le ByteArrayDeserializer. Utilisez des opérations dataFrame (telles que cast("string") ou from_avro) pour désérialiser explicitement les clés et les valeurs.
Écrire des données dans Kafka
Voici un exemple ci-après pour une écriture en streaming dans Kafka :
Python
(df.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)
Scala
df.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
Azure Databricks prend également en charge la sémantique d’écriture dans des récepteurs de données Kafka, comme illustré dans l’exemple suivant :
Python
(df.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)
Scala
df.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
Configurer la fonction d’écriture Kafka de flux structuré
Important
Databricks Runtime 13.3 LTS et versions ultérieures incluent une version plus récente de la bibliothèque kafka-clients qui active par défaut les écritures idempotentes. Si un récepteur Kafka utilise la version 2.8.0 ou antérieure avec des listes de contrôle d’accès configurées, mais sans activation de IDEMPOTENT_WRITE, l’écriture échoue avec le message d’erreur org.apache.kafka.common.KafkaException:Cannot execute transactional method because we are in an error state.
Pour résoudre cette erreur, mettez à jour vers Kafka version 2.8.0 ou supérieure, ou définissez l’option .option(“kafka.enable.idempotence”, “false”) pendant la configuration de la fonction d’écriture de flux structurés.
Voici les options courantes définies lors de l’écriture dans Kafka :
| Choix | Valeur | Valeur par défaut | Description |
|---|---|---|---|
kafka.boostrap.servers |
Une liste des machines virtuelles séparée par des virgules de <host:port> |
aucune | [Obligatoire] Configuration de bootstrap.servers Kafka. |
topic |
STRING |
non défini | [Facultatif] Définit la rubrique pour toutes les lignes à écrire. Cette option remplace toute colonne de rubrique qui existe dans les données. |
includeHeaders |
BOOLEAN |
false |
[Facultatif] Indique s’il faut inclure les en-têtes Kafka dans la ligne. |
Consultez la page Options pour obtenir la liste complète des options disponibles.
Schéma pour le producteur Kafka
Lors de l’écriture de données dans Kafka, le DataFrame fourni peut inclure les champs suivants :
| Nom de colonne | Obligatoire ou facultatif | Type |
|---|---|---|
key |
optionnel |
STRING ou BINARY |
value |
obligatoire |
STRING ou BINARY |
headers |
optionnel | ARRAY |
topic |
facultatif (ignoré si topic est défini comme option d’enregistreur) |
STRING |
partition |
optionnel | INT |
Authentification
Azure Databricks prend en charge plusieurs méthodes d’authentification pour Kafka, notamment les informations d’identification du service Unity Catalog, SASL/SSL et les options spécifiques au cloud pour AWS MSK, Azure Event Hubs et Google Cloud Managed Kafka. Consultez Authentification.
Récupérer des métriques Kafka
Vous pouvez surveiller le retard d'une requête de diffusion en continu par rapport à Kafka en utilisant les métriques avgOffsetsBehindLatest, maxOffsetsBehindLatest, et minOffsetsBehindLatest. Ces rapports indiquent le décalage moyen, maximal et minimal entre toutes les partitions de rubriques abonnées, par rapport aux derniers décalages dans Kafka. Consultez Lecture des métriques de manière interactive.
Pour estimer la quantité de données que la requête n’a pas encore consommées, utilisez la estimatedTotalBytesBehindLatest métrique. Cette métrique estime le nombre total d’octets restants sur toutes les partitions abonnées en fonction des lots traités au cours des 300 dernières secondes. Vous pouvez modifier la fenêtre de temps utilisée pour cette estimation en définissant l’option bytesEstimateWindowLength . Par exemple, pour la définir sur 10 minutes :
Python
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)
Scala
val df = spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") // m for minutes, you can also use "600s" for 600 seconds
Si vous exécutez le flux dans un notebook, vous pouvez voir ces métriques sous l’onglet Données brutes du tableau de bord de progression des requêtes de diffusion en continu :
{
"sources": [
{
"description": "KafkaV2[Subscribe[topic]]",
"metrics": {
"avgOffsetsBehindLatest": "4.0",
"maxOffsetsBehindLatest": "4",
"minOffsetsBehindLatest": "4",
"estimatedTotalBytesBehindLatest": "80.0"
}
}
]
}
Pour plus d’informations , consultez Surveillance des requêtes de streaming structuré sur Azure Databricks .
Exemple de code : Kafka vers Delta
L’exemple suivant illustre un flux de travail complet pour diffuser en continu des données de Kafka vers une table Delta. Ce modèle est idéal pour les charges de travail d’ingestion de données en quasi-temps réel.
Cet exemple utilise un schéma JSON fixe. Pour d’autres formats comme Avro ou Protobuf, utilisez from_avro ou from_protobuf. Vous pouvez également intégrer un registre de schémas. Consultez l’exemple avec le Registre de schémas.
Python
from pyspark.sql.functions import from_json, col
# Define simple JSON schemas for key and value
key_schema = "user_id STRING"
value_schema = "event_type STRING, event_ts TIMESTAMP"
# Configure Kafka options with service credentials
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9092",
"subscribe": "<topic-name>",
"databricks.serviceCredential": "<service-credential-name>",
}
# Read from Kafka and parse JSON
parsed_df = (spark.readStream
.format("kafka")
.options(**kafka_options)
.load()
.select(
from_json(col("key").cast("string"), key_schema).alias("key"),
from_json(col("value").cast("string"), value_schema).alias("value")
)
.select("key.*", "value.*")
)
# Write to Delta table
query = (parsed_df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.trigger(processingTime="10 seconds")
.toTable("catalog.schema.events_table")
)
query.awaitTermination()
Scala
import org.apache.spark.sql.functions.{from_json, col}
import org.apache.spark.sql.streaming.Trigger
// Define JSON schemas for key and value
val keySchema = "user_id STRING"
val valueSchema = "event_type STRING, event_ts TIMESTAMP"
// Configure Kafka options with service credentials
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9092",
"subscribe" -> "<topic-name>",
"databricks.serviceCredential" -> "<service-credential-name>"
)
// Read from Kafka and parse JSON
val parsedDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
.select(
from_json(col("key").cast("string"), keySchema).alias("key"),
from_json(col("value").cast("string"), valueSchema).alias("value")
)
.select("key.*", "value.*")
// Write to Delta table
val query = parsedDF.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.trigger(Trigger.ProcessingTime("10 seconds"))
.toTable("catalog.schema.events_table")
query.awaitTermination()
SQL
-- Create a streaming table from Kafka using read_kafka
CREATE OR REFRESH STREAMING TABLE catalog.schema.events_table AS
SELECT
key::string:user_id AS user_id,
value::string:event_type AS event_type,
to_timestamp(value::string:event_ts) AS event_ts
FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9092',
subscribe => '<topic-name>',
serviceCredential => '<service-credential-name>'
);