Partager via


Configurer un magasin d’état RocksDB sur Azure Databricks

RocksDB est le fournisseur de magasin d’état par défaut dans Databricks Runtime 17.3 et versions ultérieures. Pour les versions databricks Runtime inférieures à la version 17.3, vous pouvez activer la gestion de l’état basée sur RocksDB en définissant la configuration suivante dans SparkSession avant de démarrer la requête de diffusion en continu.

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

Vous pouvez activer RocksDB sur les pipelines déclaratifs Spark Lakeflow. Voir Optimisation de la configuration du pipeline pour le traitement d'état.

Activer le point de contrôle du journal des modifications

Dans Databricks Runtime 13.3 LTS et versions ultérieures, vous pouvez activer le changelog pour réduire la durée des points de contrôle et la latence de bout en bout pour les charges de travail de streaming structuré. Databricks recommande d’activer le point de contrôle du journal des modifications pour toutes les requêtes avec état Flux structuré. Le point de contrôle des journaux de modifications est activé par défaut dans Databricks Runtime 17.3 et versions ultérieures.

Traditionnellement, le State Store RocksDB capture des instantanés et télécharge les fichiers de données pendant les points de contrôle. Pour éviter ce coût, le journal des modifications n'écrit vers un stockage durable que les enregistrements modifiés depuis le dernier point de contrôle.

** Le point de vérification du changelog est désactivé par défaut sur les versions de Databricks Runtime inférieures à 17.3. Vous pouvez activer le point de contrôle du journal des modifications au niveau SparkSession à l’aide de la syntaxe suivante :

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

Vous pouvez activer le point de contrôle du journal des modifications sur un flux existant et conserver les informations d’état stockées dans le point de contrôle.

Important

Les requêtes qui ont activé le point de contrôle du journal des modifications ne peuvent être exécutées que sur Databricks Runtime 13.3 LTS et versions ultérieures. Vous pouvez désactiver les points de contrôle du journal des modifications pour revenir au comportement de point de contrôle hérité, mais vous devez continuer à exécuter ces requêtes sur Databricks Runtime 13.3 LTS ou version ultérieure. Vous devez redémarrer le travail pour que ces modifications se produisent.

Métriques de la base de données RocksDB

Chaque opérateur d'état collecte des métriques liées aux opérations de gestion d'état effectuées sur son instance RocksDB pour observer le stockage d'état et potentiellement aider à déboguer la lenteur des tâches.

Dans Databricks Runtime 16.4 LTS et versions ultérieures, les métriques d’une instance de magasin d’états spécifique sont étiquetées avec leur ID de partition et leur nom de magasin, ce qui garantit qu’elles restent distinctes. Toutes les autres métriques sont signalées comme somme agrégée pour chaque opérateur d’état sur toutes les tâches où l’opérateur d’état est en cours d’exécution.

Ces métriques font partie du mappage customMetrics dans les champs stateOperators de StreamingQueryProgress. Voici un exemple de StreamingQueryProgress sous forme de JSON (obtenu à l’aide de StreamingQueryProgress.json()).

{
  "id": "6774075e-8869-454b-ad51-513be86cfd43",
  "runId": "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId": 7,
  "stateOperators": [
    {
      "numRowsTotal": 20000000,
      "numRowsUpdated": 20000000,
      "memoryUsedBytes": 31005397,
      "numRowsDroppedByWatermark": 0,
      "customMetrics": {
        "SnapshotLastUploaded.partition_0_default": 7,
        "SnapshotLastUploaded.partition_1_default": 7,
        "SnapshotLastUploaded.partition_2_default": 6,
        "SnapshotLastUploaded.partition_3_default": 6,
        "SnapshotLastUploaded.partition_4_default": -1,
        "rocksdbBytesCopied": 141037747,
        "rocksdbCommitCheckpointLatency": 2,
        "rocksdbCommitCompactLatency": 22061,
        "rocksdbCommitFileSyncLatencyMs": 1710,
        "rocksdbCommitFlushLatency": 19032,
        "rocksdbCommitPauseLatency": 0,
        "rocksdbCommitWriteBatchLatency": 56155,
        "rocksdbFilesCopied": 2,
        "rocksdbFilesReused": 0,
        "rocksdbGetCount": 40000000,
        "rocksdbGetLatency": 21834,
        "rocksdbPutCount": 1,
        "rocksdbPutLatency": 56155599000,
        "rocksdbReadBlockCacheHitCount": 1988,
        "rocksdbReadBlockCacheMissCount": 40341617,
        "rocksdbSstFileSize": 141037747,
        "rocksdbTotalBytesReadByCompaction": 336853375,
        "rocksdbTotalBytesReadByGet": 680000000,
        "rocksdbTotalBytesReadThroughIterator": 0,
        "rocksdbTotalBytesWrittenByCompaction": 141037747,
        "rocksdbTotalBytesWrittenByPut": 740000012,
        "rocksdbTotalCompactionLatencyMs": 21949695000,
        "rocksdbWriterStallLatencyMs": 0,
        "rocksdbZipFileBytesUncompressed": 7038
      }
    }
  ],
  "sources": [{}],
  "sink": {}
}

Les descriptions détaillées des métriques sont les suivantes :

Nom de métrique Descriptif
rocksdbCommitWriteBatchLatency Temps (en millisecondes) nécessaire pour appliquer les écritures à étapes dans la structure en mémoire (WriteBatch) au RocksDB natif.
rocksdbCommitFlushLatency Temps (en millisecondes) pris pour vider les modifications en mémoire de RocksDB sur le disque local.
rocksdbCommitCompactLatency Temps (en millisecondes) pris pour le compactage (facultatif) au cours de la validation du point de contrôle.
rocksdbCommitPauseLatency Temps (en millisecondes) nécessaires à l'arrêt des threads de travail en arrière-plan (pour le compactage, etc.) dans le cadre du commit du point de contrôle.
rocksdbCommitCheckpointLatency Temps (en millisecondes) pris pour prendre un instantané de RocksDB natif et l’écrire dans un répertoire local.
rocksdbCommitFileSyncLatencyMs Temps (en millisecondes) pris pour synchroniser les fichiers associés à l’instantané RocksDB natif sur un stockage externe (emplacement du point de contrôle).
rocksdbGetLatency Temps moyen (en nanosecondes) requis par l'appel natif RocksDB::Get sous-jacent.
rocksdbPutCount Temps moyen (en nanosecondes) pris par l’appel RocksDB::Put natif sous-jacent.
rocksdbGetCount Nombre d'appels natifs RocksDB::Get (n'inclut pas Gets de WriteBatch - lot en mémoire utilisé pour les écritures intermédiaires).
rocksdbPutCount Nombre d’appels natifs RocksDB::Put (n’inclut pas Puts WriteBatch - batch en mémoire utilisé pour les écritures intermédiaires).
rocksdbTotalBytesReadByGet (Nombre total d'octets lus par Get dans RocksDB) Nombre d’octets non compressés lus via des appels RocksDB::Get natifs.
rocksdbTotalBytesWrittenByPut Nombre d’octets non compressés écrits via des appels RocksDB::Put natifs.
rocksdbReadBlockCacheHitCount Nombre de fois que le cache de bloc RocksDB natif est utilisé pour éviter une lecture des données à partir du disque local.
rocksdbReadBlockCacheMissCount Nombre de fois que le cache de bloc RocksDB natif a manqué et requis une lecture de données à partir du disque local.
rocksdbTotalBytesReadByCompaction (total des octets lus par la compaction dans rocksdb) Nombre d’octets lus à partir du disque local par le processus de compactage RocksDB natif.
rocksdbTotalBytesWrittenByCompaction Nombre d’octets écrits sur le disque local par le processus de compactage RocksDB natif.
rocksdbTotalCompactionLatencyMs Temps (en millisecondes) pris pour les compactages RocksDB (compactage en arrière-plan et compactage facultatif initié lors de la validation).
rocksdbWriterStallLatencyMs Temps (en millisecondes) pendant lequel l’enregistreur est resté bloqué en raison du compactage en arrière-plan ou du vidage des memtables sur le disque.
rocksdbTotalOctetsLusParIterateur Certaines opérations avec état conservé (telles que le traitement du délai d’expiration dans flatMapGroupsWithState ou l'ajout de repères temporels dans les agrégations fenêtrées) nécessitent la lecture de toutes les données dans la base de données via un itérateur. Taille totale des données décompressées lues à l’aide de l’itérateur.