Del via


Datastrømning ind i et søhus med Spark

Struktureret streaming er en skalerbar, fejltolerant strømbehandlingsmotor bygget på Spark. Den behandler en live datastrøm som en tabel, som nye rækker løbende tilføjes til. Structured Streaming understøtter indbyggede filkilder som CSV, JSON, ORC og Parquet samt beskedtjenester som Kafka og Azure Event Hubs.

Denne artikel omhandler opsætning af en streamingkilde som Azure Event Hubs, indlæsning af streamingdata i en Lakehouse Delta-tabel, optimering af skriveydelsen med partitionering og event batching samt at køre streamingjobs pålideligt i produktion.

Opsæt en streamingkilde

For at streame data ind i et søhus skal du først konfigurere en forbindelse til din streamingkilde. Azure Event Hubs er et almindeligt valg. Brug Azure Event Hubs Connector for Apache Spark til at forbinde din Spark-applikation til Azure Event Hubs.

En grundlæggende Event Hubs-konfiguration kræver Event Hubs-navnerumsnavnet, hubnavnet, navnet på delt adgangsnøgle og forbrugergruppen.

En forbrugergruppe er en visning af en hel hændelseshub. Forbrugergrupper gør det muligt for flere forbrugende applikationer hver at have et separat syn på hændelsesstrømmen og læse strømmen uafhængigt i deres eget tempo og med deres egne offsets.

Partitioner i Event Hubs gør det muligt at behandle store mængder begivenheder parallelt. En enkelt processor har en begrænset kapacitet til at håndtere hændelser per sekund, mens flere processorer kan arbejde parallelt på tværs af partitioner.

Hvis for mange partitioner bruges med lav indlæsningshastighed, håndterer partitionlæsere en lille del af dataene, hvilket forårsager ikke-optimal behandling. Det ideelle antal partitioner afhænger af den ønskede behandlingshastighed. Når du øger antallet af gennemstrømningsenheder i dit navnerum, kan du have brug for ekstra partitioner, så samtidige læsere kan opnå deres maksimale gennemstrømning.

Test det bedste antal partitioner til dit gennemstrømningsscenarie. Scenarier med høj gennemstrømning bruger ofte 32 eller flere partitioner.

Delta-tabel som streaming-sænk

Delta Lake er et open source-lagringslag, der leverer ACID-transaktioner (atomicitet, konsistens, isolation og holdbarhed) oven på datalake-lagring. I Fabric Data Engineering understøtter Delta Lake upserts, datakomprimering, tidsrejser, skemaudvikling og åbent format-lagring.

Med delta som outputformat i writeStream, strømmer data direkte ind i en Delta-tabel. Følgende eksempel læser fra Event Hubs, parser beskedens krop og skriver til en Delta-tabel:

import pyspark.sql.functions as f
from pyspark.sql.types import *

df = (
    spark.readStream
    .format("eventhubs")
    .options(**ehConf)
    .load()
)

Schema = StructType([
    StructField("<column_name_01>", StringType(), False),
    StructField("<column_name_02>", StringType(), False),
    StructField("<column_name_03>", DoubleType(), True),
    StructField("<column_name_04>", LongType(), True),
    StructField("<column_name_05>", LongType(), True),
])

rawData = (
    df
    .withColumn("bodyAsString", f.col("body").cast("string"))
    .select(f.from_json("bodyAsString", Schema).alias("events"))
    .select("events.*")
    .writeStream
    .format("delta")
    .option("checkpointLocation", "Files/checkpoint")
    .outputMode("append")
    .toTable("deltaeventstable")
)

I koden format("delta") sættes Delta som outputformat, outputMode("append") skriver kun nye rækker til tabellen og toTable("deltaeventstable") lagrer de streamede data i en administreret Delta-tabel.

Optimer streamingydelse

Når grundlæggende streaming-indlæsning virker, kan du forbedre gennemstrømning og filorganisering med optimeringsteknikkerne i de følgende afsnit.

Partitioneringsdata for skrivninger

For at optimere gennemstrømningen skal du opdele dine data effektivt. Partitionering forbedrer både skrivegennemstrømning og downstream forespørgselsydelse. Du kan partitionere data i hukommelsen, på disken eller begge dele.

På disk — Brug partitionBy() til at organisere data i undermapper baseret på kolonneværdier. Vælg kolonner med god kardinalitet, der producerer filer i optimal størrelse. Undgå kolonner, der skaber for mange små partitioner eller for få store.

I hukommelsen — Brug repartition() eller coalesce() distribuer data på tværs af arbejdernoder, før du skriver:

  • repartition() Øger eller mindsker partitioner med en fuld blanding, hvilket balancerer data jævnt.
  • coalesce() reducerer kun partitioner og minimerer dataflytning.

At kombinere begge tilgange fungerer godt i højkapacitetsscenarier. Følgende eksempel opdeler data i 48 partitioner i hukommelsen (der matcher tilgængelige CPU-kerner) og derefter partitioner på disk med to kolonner:

rawData = (
    df
    .withColumn("bodyAsString", f.col("body").cast("string"))
    .select(f.from_json("bodyAsString", Schema).alias("events"))
    .select("events.*")
    .repartition(48)
    .writeStream
    .format("delta")
    .option("checkpointLocation", "Files/checkpoint")
    .outputMode("append")
    .partitionBy("<column_name_01>", "<column_name_02>")
    .toTable("deltaeventstable")
)

Brug Optimeret Skriv

Som et alternativ til manuel partitionering sammenfletter eller splitter Optimized Write partitioner før skrivning, hvilket maksimerer diskgennemstrømningen uden manuel repartition() eller coalesce() kald. Aktiver det med en Spark-konfiguration:

spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", True)

Med Optimized Write aktiveret kan du fjerne repartition() eller coalesce() fra din kode og lade Spark håndtere partitionsstørrelsen. Du kan stadig bruge til partitionBy() disk-niveau organisering.

Batch-events med triggere

For yderligere at optimere skriveydelsen, batch-events før skrivning til disk. Som standard behandler Spark hver mikrobatch, så snart den forrige er færdig. At sætte et triggerinterval akkumulerer data over en tidsperiode og skriver dem i færre, større operationer. Større batches producerer større Delta-filer og reducerer overhead for små filer.

Følgende eksempel behandler begivenheder med ét minuts intervaller:

rawData = (
    df
    .withColumn("bodyAsString", f.col("body").cast("string"))
    .select(f.from_json("bodyAsString", Schema).alias("events"))
    .select("events.*")
    .writeStream
    .format("delta")
    .option("checkpointLocation", "Files/checkpoint")
    .outputMode("append")
    .partitionBy("<column_name_01>", "<column_name_02>")
    .trigger(processingTime="1 minute")
    .toTable("deltaeventstable")
)

Analyser mængden af indkommende data og vælg et behandlingsinterval, der producerer veldimensionerede Parquet-filer i Delta-tabellen.

Kør streamingjobs i produktion

Spark-notebooks er et effektivt værktøj til udvikling og test af streaming-logik. Men for produktionsarbejdsbelastninger, der skal køre kontinuerligt, brug i stedet Spark-jobdefinitioner. Spark-jobdefinitioner er ikke-interaktive, kodeorienterede opgaver, der kører på en Spark-klynge og giver større robusthed og tilgængelighed.

Infrastrukturen, der kører et streamingjob, kan støde på problemer, der stopper jobbet, såsom hardwarefejl eller patching af infrastrukturen. En genprøvningspolitik genstarter automatisk jobbet, når det stopper uventet. Konfigurer genprøvningspolitikken på en Spark-jobdefinition til at angive, hvor mange gange jobbet skal genstartes (op til uendelige gentagelser) og tidsintervallet mellem gentagelser. Med en genprøv-politik aktiveret fortsætter dit streamingjob med at køre, indtil du eksplicit stopper det.

Fabric overvågningshubben inkluderer en fane Struktureret Streaming med målinger som Input Rate, Process Rate, Input Rows, Batch Varighed og Driftsvarighed.