Dela via


Realtidsläge i strukturerad direktuppspelning

Important

Den här funktionen finns som allmänt tillgänglig förhandsversion.

Realtidsläge är en utlösartyp för strukturerad direktuppspelning som möjliggör databearbetning med extremt låg svarstid med svarstid från slutpunkt till slutpunkt så låg som fem millisekunder. Använd realtidsläge för driftarbetsbelastningar som kräver omedelbara svar på strömmande data, till exempel bedrägeriidentifiering, realtidsanpassning och omedelbara beslutssystem.

Realtidsläge är tillgängligt i Databricks Runtime 16.4 LTS och senare. Stegvisa installationsinstruktioner finns i Komma igång med realtidsläge. Kodexempel finns i Exempel på realtidsläge.

Vad är realtidsläge?

Operationella vs. analytiska arbetsbelastningar

Strömmande arbetsbelastningar kan i stort sett delas in i analytiska arbetsbelastningar och operativa arbetsbelastningar:

  • Analytiska arbetsbelastningar använder datainmatning och transformering, vanligtvis efter medaljongarkitekturen (till exempel att mata in data i tabellerna brons, silver och guld).
  • Driftarbetsbelastningar förbrukar realtidsdata, tillämpar affärslogik och utlöser underordnade åtgärder eller beslut.

Några exempel på operativa arbetsbelastningar är:

  • Blockera eller flagga en kreditkortstransaktion i realtid om en bedrägeripoäng överskrider ett tröskelvärde, baserat på faktorer som ovanlig plats, stor transaktionsstorlek eller snabba utgiftsmönster.
  • Leverera ett kampanjmeddelande när clickstream-data visar att en användare har bläddrat efter jeans i fem minuter och erbjuder en rabatt på 25% om de köper under de kommande 15 minuterna.

I allmänhet kännetecknas operativa arbetsbelastningar av behovet av subsekund latens från slutpunkt till slutpunkt. Detta kan uppnås med realtidsläge i Apache Spark Structured Streaming.

Så här uppnår realtidsläget låg svarstid

Realtidsläget förbättrar körningsarkitekturen genom att:

  • Köra långvariga batchar (standardvärdet är fem minuter), där systemet bearbetar data när de blir tillgängliga i källan.
  • Schemalägger alla faser av frågan samtidigt. Detta kräver att antalet tillgängliga aktivitetsfack är lika med eller större än antalet aktiviteter för alla faser i en batch.
  • Skicka data mellan faser så snart de har producerats med hjälp av en streamingomkastning.

I slutet av bearbetningen av en batch, och innan nästa batch startar, gör Structured Streaming kontrollpunktsavstämningar och publicerar mätvärden. Batchvaraktigheten påverkar kontrollpunktsfrekvensen:

  • Längre batchar: Mindre frekventa kontrollpunkter, vilket innebär längre repriser vid fel och fördröjd måtttillgänglighet.
  • Kortare batchar: Vanligare kontrollpunkter, vilket kan påverka svarstiden.

Databricks rekommenderar att du jämför realtidsläget mot målarbetsbelastningen för att hitta rätt utlösarintervall.

När du ska använda realtidsläge

Välj realtidsläge när ditt användningsfall kräver:

  • Svarstid under sekund: Program som behöver svara på data inom millisekunder, till exempel system för identifiering av bedrägerier som måste blockera transaktioner i realtid.
  • Operativt beslutsfattande: System som utlöser omedelbara åtgärder baserat på inkommande data, till exempel erbjudanden i realtid, aviseringar eller meddelanden.
  • Kontinuerlig bearbetning: Arbetsbelastningar där data måste bearbetas så snart de kommer, i stället för i periodiska batchar.

Använd mikrobatchläge (standardutlösaren för strukturerad direktuppspelning) när:

  • Analysbearbetning: ETL-pipelines, datatransformeringar och arkitekturimplementeringar för medaljonger där svarstidskrav mäts i sekunder eller minuter.
  • Kostnadsoptimering: Arbetsbelastningar där svarstid under sekund inte krävs, eftersom realtidsläge kräver dedikerade beräkningsresurser.
  • Kontrollpunktsfrekvensen är viktig: Program som drar nytta av mer frekventa kontrollpunkter för snabbare återställning.

Krav och konfiguration

Realtidsläget har specifika krav för beräkningskonfiguration och frågekonfiguration. I det här avsnittet beskrivs de krav och konfigurationssteg som krävs för att använda realtidsläge.

Förutsättningar

Om du vill använda realtidsläge måste du uppfylla följande krav:

  • Databricks Runtime 16.4 LTS eller senare: Realtidsläge är endast tillgängligt i DBR 16.4 LTS och senare versioner.
  • Dedikerad beräkning: Du måste använda en dedikerad beräkning (tidigare en enskild användare). Standard (tidigare delat), Lakeflow Spark Deklarativa Pipelines och serverlösa kluster stöds inte.
  • Ingen autoskalning: Autoskalning måste inaktiveras.
  • Ingen foton: Fotoacceleration stöds inte i realtidsläge.
  • Spark-konfiguration: Du måste ange spark.databricks.streaming.realTimeMode.enabled till true.

Beräkningskonfiguration

Konfigurera din beräkning med följande inställningar:

  • Ange spark.databricks.streaming.realTimeMode.enabled till true i Spark-konfigurationen.
  • Inaktivera automatisk skalning.
  • Inaktivera fotoacceleration.
  • Se till att datorkapaciteten är konfigurerad som ett dedikerat kluster (inte standard, Lakeflow Spark Deklarativa Pipelines eller serverlös).

Stegvisa instruktioner för hur du skapar och konfigurerar beräkning för realtidsläge finns i Komma igång med realtidsläge.

Frågekonfiguration

Om du vill köra en fråga i realtidsläge måste du aktivera realtidsutlösaren. Realtidsutlösare stöds endast i uppdateringsläge.

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

Beräkningsstorlek

Du kan köra ett realtidsjobb per beräkningsresurs om resursen har tillräckligt med uppgiftsplatser.

Om du vill köra i läge med låg latens måste det totala antalet tillgängliga aktivitetsfack vara större än eller lika med antalet aktiviteter i alla frågesteg.

Exempel på platsberäkningar

Typ av pipeline Konfiguration Nödvändiga fack
Tillståndslös i en fas (Kafka-källa + mottagare) maxPartitions = 8 8 platser
Tillståndskänsligt i två steg (Kafka-källa + shuffle) maxPartitions = 8, shuffle partitioner = 20 28 platser (8 + 20)
Tresteg (Kafka-källa + shuffle + ompartition) maxPartitions = 8, två blandningssteg på 20 vardera 48 platser (8 + 20 + 20)

Om du inte anger maxPartitions, använd antalet partitioner i Kafka-ämnet.

Viktiga överväganden

Tänk på följande när du konfigurerar din beräkning:

  • Till skillnad från mikrobatchläge kan realtidsuppgifter vara inaktiva i väntan på data, så rätt storleksändring är viktigt för att undvika bortkastade resurser.
  • Sikta på en målanvändningsnivå (till exempel 50%) genom att justera:
    • maxPartitions (för Kafka)
    • spark.sql.shuffle.partitions (för shuffle-steg)
  • Databricks rekommenderar att du ställer in maxPartitions så att varje uppgift hanterar flera Kafka-partitioner för att minska kostnaderna.
  • Justera arbetsuppgifter per anställd för att matcha arbetsbelastningen för enkla, enetapsjobb.
  • För shuffle-heavy-jobb experimenterar du med att hitta det minsta antalet shuffle-partitioner som undviker kvarvarande uppgifter och justerar därifrån. Beräkningen schemalägger inte jobbet om det inte har tillräckligt med platser.

Note

Från Databricks Runtime 16.4 LTS och senare använder alla realtidspipelines kontrollpunkt v2, vilket möjliggör sömlös växling mellan realtids- och mikrobatchlägen.

Optimeringstekniker

Technique Aktiverad som standard
Asynkron förloppsspårning: Flyttar skrivning till offsetlogg och commit-logg till en asynkron tråd, vilket minskar tiden mellan två mikrobatchar. Detta kan bidra till att minska svarstiden för tillståndslösa strömningsfrågor. No
Asynkron kontrollpunkt för tillstånd: Hjälper till att minska svarstiden för tillståndskänsliga strömningsfrågor genom att börja bearbeta nästa mikrobatch så snart beräkningen av föregående mikrobatch slutförs, utan att vänta på tillståndskontroll. No

övervakning och observerbarhet

Det är viktigt att mäta frågeprestanda för arbetsbelastningar i realtid. I realtidsläge återspeglar traditionella mått för batchvaraktighet inte faktisk svarstid, så du behöver alternativa metoder.

Svarstiden från slutpunkt till slutpunkt är arbetsbelastningsspecifik och kan ibland bara mätas korrekt med affärslogik. Om källtidsstämpeln till exempel är utdata i Kafka kan du beräkna svarstiden som skillnaden mellan Kafkas tidsstämpel för utdata och källtidsstämpeln.

Du kan också uppskatta svarstiden från slutpunkt till slutpunkt med hjälp av de inbyggda måtten och API:erna som beskrivs nedan.

Inbyggda mått med StreamingQueryProgress

Följande mått ingår i StreamingQueryProgress händelsen, som loggas automatiskt i drivrutinsloggarna. Du kan också komma åt dem via StreamingQueryListeneråteranropsfunktionen onQueryProgress() . QueryProgressEvent.json() eller toString() inkludera extra realtidslägesmått.

  1. Bearbetningssvarstid (processingLatencyMs). Den tid som förflutit mellan när realtidsförfrågan läser en post och när den skriver den till nästa steg eller vidare nedströms. För enstegsfrågor mäter detta samma varaktighet som E2E-svarstid. Systemet rapporterar detta mått per uppgift.
  2. Svarstid för källköer (sourceQueuingLatencyMs). Den tid som förflutit mellan när systemet skriver en post till en meddelandebuss, till exempel loggens tilläggstid i Kafka, och när realtidslägesfrågan först läser posten. Systemet rapporterar det här måttet per aktivitet.
  3. E2E-svarstid (e2eLatencyMs). Tiden mellan när systemet skriver posten till en meddelandebuss och när realtidsfrågan skriver posten vidare. Systemet aggregerar det här mätvärdet för varje batch över alla poster som bearbetas av alla uppgifter.

Till exempel:

"rtmMetrics" : {
    "processingLatencyMs" : {
      "P0" : 0,
      "P50" : 0,
      "P90" : 0,
      "P95" : 0,
      "P99" : 0
    },
    "sourceQueuingLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 3
    },
    "e2eLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 4
    },

Anpassad svarstidsmätning med Observera API

Observera API:et hjälper till att mäta svarstiden utan att starta ett annat jobb. Om du har en källtidsstämpel som beräknar källdatans ankomsttid kan du uppskatta varje batchs svarstid med hjälp av Observera API.et. Skicka tidsstämpeln innan du når diskbänken:

Python

from datetime import datetime

from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def current_timestamp():
  return datetime.now()

# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
  "latency",
  unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  "observedLatency",
  avg(col("latency")).alias("avg"),
  max(col("latency")).alias("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.

Scala

import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}

val currentTimestampUDF = udf(() => System.currentTimeMillis())

// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
  "latency",
  col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  name = "observedLatency",
  avg(col("latency")).as("avg"),
  max(col("latency")).as("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.

I det här exemplet registreras en aktuell tidsstämpel innan posten matas ut, och svarstiden beräknas genom att skillnaden mellan den här tidsstämpeln och postens källtidsstämpel beräknas. Resultaten ingår i förloppsrapporterna och görs tillgängliga för lyssnare. Här är exempel på utdata:

"observedMetrics" : {
  "observedLatency" : {
    "avg" : 63.8369765176552,
    "max" : 219,
    "p99" : 154,
    "p50" : 49
  }
}

Funktionsstöd och begränsningar

I det här avsnittet beskrivs de funktioner och aktuella begränsningar som stöds i realtidsläget, inklusive kompatibla miljöer, språk, källor, mottagare, operatorer och särskilda överväganden för specifika funktioner.

Miljöer, språk och lägen som stöds

Typ av beräkning Supported
Dedikerad (tidigare: enskild användare) Yes
Standard (tidigare: delad) No
Lakeflow Spark Deklarativa Pipelines: Klassisk No
Lakeflow Spark-deklarativa pipelines serverlösa No
Serverless No

Språk som stöds:

Language Supported
Scala Yes
Java Yes
Python Yes

Körningslägen som stöds:

Körningsläge Supported
Uppdateringsläge Yes
Append mode No
Fullständigt läge No

Källor och mottagare som stöds

Källor:

Sources Supported
Apache Kafka Yes
AWS MSK Yes
Event Hubs (med Kafka Connector) Yes
Kinesis Ja (endast EFO-läge)
Google Pub/Sub No
Apache Pulsar No

Mottagare:

Sinks Supported
Apache Kafka Yes
Event Hubs (med Kafka Connector) Yes
Kinesis No
Google Pub/Sub No
Apache Pulsar No
Godtyckliga noder (med forEachWriter) Yes

Operatorer som stöds

Operators Supported
Tillståndslösa operationer
Selection Yes
Projection Yes
UDF:er
Scala UDF Ja (med vissa begränsningar)
Python-användardefinierad funktion (UDF) Ja (med vissa begränsningar)
sammansättning
sum Yes
count Yes
max Yes
min Yes
avg Yes
Sammansättningsfunktioner Yes
Fönster
Tumbling Yes
Sliding Yes
Session No
Deduplication
dropDuplicates Ja (tillståndet är obundet)
dropDuplicatesWithinWatermark No
Stream – Tabell-sammanfogning
Broadcast-tabell (bör vara liten) Yes
Stream – Stream Join No
(platt)MapGroupsWithState No
transformWithState Ja (med vissa skillnader)
union Ja (med vissa begränsningar)
forEach Yes
forEachBatch No
mapPartitions Nej (se begränsning)

Särskilda överväganden

Vissa operatorer och funktioner har specifika överväganden eller skillnader när de används i realtidsläge.

transformWithState i realtidsläge

För att skapa anpassade tillståndskänsliga program stöder DatabrickstransformWithState, ett API inom Apache Spark Structured Streaming. Mer information om API och kodfragment finns i Skapa ett anpassat tillståndskänsligt program .

Det finns dock vissa skillnader mellan hur API:et beter sig i realtidsläge och traditionella strömningsfrågor som utnyttjar mikrobatcharkitekturen.

  • Realtidsläget anropar handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues) metoden för varje rad.
    • Iteratorn inputRows returnerar ett enda värde. Mikrobatchläget anropar den en gång för varje nyckel, och inputRows iteratorn returnerar alla värden för en nyckel i mikrobatchen.
    • Du måste vara medveten om den här skillnaden när du skriver koden.
  • Tidsinställda händelser stöds inte i realtidsläge.
  • I realtidsläge fördröjs timers aktivering beroende på dataankomst:
    • Om en timer är schemalagd till 10:00:00 men inga data anländer, utlöses inte timern omedelbart.
    • Om data kommer klockan 10:00:10 utlöses timern med en fördröjning på 10 sekunder.
    • Om inga data tas emot och den långvariga batchen avslutas utlöses timern innan batchen avslutas.

Python-UDF:er i realtidsläge

Databricks stöder de flesta användardefinierade Python-funktioner (UDF: er) i realtidsläge:

UDF-typ Supported
Tillståndslös UDF
Python-skalär UDF (länk) Yes
Arrow scalar UDF Yes
Pandas scalar UDF (länk) Yes
Pilfunktion (mapInArrow) Yes
Pandas-funktion (länk) Yes
Stateful Grouping UDF (UDAF)
transformWithState (endast Row gränssnitt) Yes
applyInPandasWithState No
Icke-tillståndskänslig gruppering av UDF (UDAF)
apply No
applyInArrow No
applyInPandas No
Tabellfunktion
UDTF (länk) No
UC UDF No

Det finns flera saker att tänka på när du använder Python-UDF:er i realtidsläge:

  • För att minimera svarstiden konfigurerar du pilbatchstorleken (spark.sql.execution.arrow.maxRecordsPerBatch) till 1.
    • Kompromiss: Den här konfigurationen optimerar för svarstid på bekostnad av dataflödet. För de flesta arbetsbelastningar rekommenderas den här inställningen.
    • Öka batchstorleken endast om ett högre dataflöde krävs för att hantera indatavolymen och acceptera den potentiella ökningen av svarstiden.
  • Pandas UDF:ar och funktioner fungerar inte bra med en Arrow-batchstorlek på 1.
    • Om du använder Pandas UDF:er eller funktioner anger du pilbatchstorleken till ett högre värde (till exempel 100 eller högre).
    • Observera att detta innebär högre svarstid. Databricks rekommenderar att du använder Arrow UDF eller funktionen om möjligt.
  • På grund av prestandaproblemet med Pandas stöds transformWithState endast med Row gränssnittet.

Limitations

Källbegränsningar

För Kinesis stöder realtidsläget inte avsökningsläge. Dessutom kan frekventa ompartitioner påverka svarstiden negativt.

Unionsbegränsningar

Unionsoperatören har vissa begränsningar:

  • Realtidsläget stöder inte självunion:
    • Kafka: Du kan inte använda samma källdataramobjekt och unionera härledda dataramar från det. Lösning: Använd olika DataFrames som läser från samma källa.
    • Kinesis: Du kan inte förena dataramar som härletts från samma Kinesis-källa med samma konfiguration. Lösning: Förutom att använda olika DataFrames kan du tilldela varje DataFrame ett annat "consumerName"-alternativ.
  • Realtidsläget stöder inte tillståndskänsliga operatorer (till exempel , aggregatededuplicate, transformWithState) som definierats före unionen.
  • Realtidsläget stöder inte union med batchkällor.

MapPartitions-begränsning

mapPartitions i Scala och liknande Python-API:er (mapInPandas, mapInArrow) tar du en iterator för hela indatapartitionen och skapar en iterator för hela utdata med godtycklig mappning mellan indata och utdata. Dessa API:er kan orsaka prestandaproblem i strömningsläge Real-Time genom att blockera hela utdata, vilket ökar svarstiden. Semantiken för dessa API:er stöder inte effektivt vattenstämpelspridning.

Använd skalära UDF:er i kombination med Transformera komplexa datatyper eller filter i stället för att uppnå liknande funktioner.

Nästa steg

Nu när du förstår vad realtidsläge är och hur du konfigurerar det kan du utforska dessa resurser för att börja implementera realtidsströmningsprogram: