Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Important
Den här funktionen finns som allmänt tillgänglig förhandsversion.
Realtidsläge är en utlösartyp för strukturerad direktuppspelning som möjliggör databearbetning med extremt låg svarstid med svarstid från slutpunkt till slutpunkt så låg som fem millisekunder. Använd realtidsläge för driftarbetsbelastningar som kräver omedelbara svar på strömmande data, till exempel bedrägeriidentifiering, realtidsanpassning och omedelbara beslutssystem.
Realtidsläge är tillgängligt i Databricks Runtime 16.4 LTS och senare. Stegvisa installationsinstruktioner finns i Komma igång med realtidsläge. Kodexempel finns i Exempel på realtidsläge.
Vad är realtidsläge?
Operationella vs. analytiska arbetsbelastningar
Strömmande arbetsbelastningar kan i stort sett delas in i analytiska arbetsbelastningar och operativa arbetsbelastningar:
- Analytiska arbetsbelastningar använder datainmatning och transformering, vanligtvis efter medaljongarkitekturen (till exempel att mata in data i tabellerna brons, silver och guld).
- Driftarbetsbelastningar förbrukar realtidsdata, tillämpar affärslogik och utlöser underordnade åtgärder eller beslut.
Några exempel på operativa arbetsbelastningar är:
- Blockera eller flagga en kreditkortstransaktion i realtid om en bedrägeripoäng överskrider ett tröskelvärde, baserat på faktorer som ovanlig plats, stor transaktionsstorlek eller snabba utgiftsmönster.
- Leverera ett kampanjmeddelande när clickstream-data visar att en användare har bläddrat efter jeans i fem minuter och erbjuder en rabatt på 25% om de köper under de kommande 15 minuterna.
I allmänhet kännetecknas operativa arbetsbelastningar av behovet av subsekund latens från slutpunkt till slutpunkt. Detta kan uppnås med realtidsläge i Apache Spark Structured Streaming.
Så här uppnår realtidsläget låg svarstid
Realtidsläget förbättrar körningsarkitekturen genom att:
- Köra långvariga batchar (standardvärdet är fem minuter), där systemet bearbetar data när de blir tillgängliga i källan.
- Schemalägger alla faser av frågan samtidigt. Detta kräver att antalet tillgängliga aktivitetsfack är lika med eller större än antalet aktiviteter för alla faser i en batch.
- Skicka data mellan faser så snart de har producerats med hjälp av en streamingomkastning.
I slutet av bearbetningen av en batch, och innan nästa batch startar, gör Structured Streaming kontrollpunktsavstämningar och publicerar mätvärden. Batchvaraktigheten påverkar kontrollpunktsfrekvensen:
- Längre batchar: Mindre frekventa kontrollpunkter, vilket innebär längre repriser vid fel och fördröjd måtttillgänglighet.
- Kortare batchar: Vanligare kontrollpunkter, vilket kan påverka svarstiden.
Databricks rekommenderar att du jämför realtidsläget mot målarbetsbelastningen för att hitta rätt utlösarintervall.
När du ska använda realtidsläge
Välj realtidsläge när ditt användningsfall kräver:
- Svarstid under sekund: Program som behöver svara på data inom millisekunder, till exempel system för identifiering av bedrägerier som måste blockera transaktioner i realtid.
- Operativt beslutsfattande: System som utlöser omedelbara åtgärder baserat på inkommande data, till exempel erbjudanden i realtid, aviseringar eller meddelanden.
- Kontinuerlig bearbetning: Arbetsbelastningar där data måste bearbetas så snart de kommer, i stället för i periodiska batchar.
Använd mikrobatchläge (standardutlösaren för strukturerad direktuppspelning) när:
- Analysbearbetning: ETL-pipelines, datatransformeringar och arkitekturimplementeringar för medaljonger där svarstidskrav mäts i sekunder eller minuter.
- Kostnadsoptimering: Arbetsbelastningar där svarstid under sekund inte krävs, eftersom realtidsläge kräver dedikerade beräkningsresurser.
- Kontrollpunktsfrekvensen är viktig: Program som drar nytta av mer frekventa kontrollpunkter för snabbare återställning.
Krav och konfiguration
Realtidsläget har specifika krav för beräkningskonfiguration och frågekonfiguration. I det här avsnittet beskrivs de krav och konfigurationssteg som krävs för att använda realtidsläge.
Förutsättningar
Om du vill använda realtidsläge måste du uppfylla följande krav:
- Databricks Runtime 16.4 LTS eller senare: Realtidsläge är endast tillgängligt i DBR 16.4 LTS och senare versioner.
- Dedikerad beräkning: Du måste använda en dedikerad beräkning (tidigare en enskild användare). Standard (tidigare delat), Lakeflow Spark Deklarativa Pipelines och serverlösa kluster stöds inte.
- Ingen autoskalning: Autoskalning måste inaktiveras.
- Ingen foton: Fotoacceleration stöds inte i realtidsläge.
-
Spark-konfiguration: Du måste ange
spark.databricks.streaming.realTimeMode.enabledtilltrue.
Beräkningskonfiguration
Konfigurera din beräkning med följande inställningar:
- Ange
spark.databricks.streaming.realTimeMode.enabledtilltruei Spark-konfigurationen. - Inaktivera automatisk skalning.
- Inaktivera fotoacceleration.
- Se till att datorkapaciteten är konfigurerad som ett dedikerat kluster (inte standard, Lakeflow Spark Deklarativa Pipelines eller serverlös).
Stegvisa instruktioner för hur du skapar och konfigurerar beräkning för realtidsläge finns i Komma igång med realtidsläge.
Frågekonfiguration
Om du vill köra en fråga i realtidsläge måste du aktivera realtidsutlösaren. Realtidsutlösare stöds endast i uppdateringsläge.
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.outputMode("update")
# In PySpark, the realTime trigger requires specifying the interval.
.trigger(realTime="5 minutes")
.start()
)
Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val readStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic).load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode("update")
.trigger(RealTimeTrigger.apply())
// RealTimeTrigger can also accept an argument specifying the checkpoint interval.
// For example, this code indicates a checkpoint interval of 5 minutes:
// .trigger(RealTimeTrigger.apply("5 minutes"))
.start()
Beräkningsstorlek
Du kan köra ett realtidsjobb per beräkningsresurs om resursen har tillräckligt med uppgiftsplatser.
Om du vill köra i läge med låg latens måste det totala antalet tillgängliga aktivitetsfack vara större än eller lika med antalet aktiviteter i alla frågesteg.
Exempel på platsberäkningar
| Typ av pipeline | Konfiguration | Nödvändiga fack |
|---|---|---|
| Tillståndslös i en fas (Kafka-källa + mottagare) |
maxPartitions = 8 |
8 platser |
| Tillståndskänsligt i två steg (Kafka-källa + shuffle) |
maxPartitions = 8, shuffle partitioner = 20 |
28 platser (8 + 20) |
| Tresteg (Kafka-källa + shuffle + ompartition) |
maxPartitions = 8, två blandningssteg på 20 vardera |
48 platser (8 + 20 + 20) |
Om du inte anger maxPartitions, använd antalet partitioner i Kafka-ämnet.
Viktiga överväganden
Tänk på följande när du konfigurerar din beräkning:
- Till skillnad från mikrobatchläge kan realtidsuppgifter vara inaktiva i väntan på data, så rätt storleksändring är viktigt för att undvika bortkastade resurser.
- Sikta på en målanvändningsnivå (till exempel 50%) genom att justera:
-
maxPartitions(för Kafka) -
spark.sql.shuffle.partitions(för shuffle-steg)
-
- Databricks rekommenderar att du ställer in
maxPartitionsså att varje uppgift hanterar flera Kafka-partitioner för att minska kostnaderna. - Justera arbetsuppgifter per anställd för att matcha arbetsbelastningen för enkla, enetapsjobb.
- För shuffle-heavy-jobb experimenterar du med att hitta det minsta antalet shuffle-partitioner som undviker kvarvarande uppgifter och justerar därifrån. Beräkningen schemalägger inte jobbet om det inte har tillräckligt med platser.
Note
Från Databricks Runtime 16.4 LTS och senare använder alla realtidspipelines kontrollpunkt v2, vilket möjliggör sömlös växling mellan realtids- och mikrobatchlägen.
Optimeringstekniker
| Technique | Aktiverad som standard |
|---|---|
| Asynkron förloppsspårning: Flyttar skrivning till offsetlogg och commit-logg till en asynkron tråd, vilket minskar tiden mellan två mikrobatchar. Detta kan bidra till att minska svarstiden för tillståndslösa strömningsfrågor. | No |
| Asynkron kontrollpunkt för tillstånd: Hjälper till att minska svarstiden för tillståndskänsliga strömningsfrågor genom att börja bearbeta nästa mikrobatch så snart beräkningen av föregående mikrobatch slutförs, utan att vänta på tillståndskontroll. | No |
övervakning och observerbarhet
Det är viktigt att mäta frågeprestanda för arbetsbelastningar i realtid. I realtidsläge återspeglar traditionella mått för batchvaraktighet inte faktisk svarstid, så du behöver alternativa metoder.
Svarstiden från slutpunkt till slutpunkt är arbetsbelastningsspecifik och kan ibland bara mätas korrekt med affärslogik. Om källtidsstämpeln till exempel är utdata i Kafka kan du beräkna svarstiden som skillnaden mellan Kafkas tidsstämpel för utdata och källtidsstämpeln.
Du kan också uppskatta svarstiden från slutpunkt till slutpunkt med hjälp av de inbyggda måtten och API:erna som beskrivs nedan.
Inbyggda mått med StreamingQueryProgress
Följande mått ingår i StreamingQueryProgress händelsen, som loggas automatiskt i drivrutinsloggarna. Du kan också komma åt dem via StreamingQueryListeneråteranropsfunktionen onQueryProgress() .
QueryProgressEvent.json() eller toString() inkludera extra realtidslägesmått.
- Bearbetningssvarstid (processingLatencyMs). Den tid som förflutit mellan när realtidsförfrågan läser en post och när den skriver den till nästa steg eller vidare nedströms. För enstegsfrågor mäter detta samma varaktighet som E2E-svarstid. Systemet rapporterar detta mått per uppgift.
- Svarstid för källköer (sourceQueuingLatencyMs). Den tid som förflutit mellan när systemet skriver en post till en meddelandebuss, till exempel loggens tilläggstid i Kafka, och när realtidslägesfrågan först läser posten. Systemet rapporterar det här måttet per aktivitet.
- E2E-svarstid (e2eLatencyMs). Tiden mellan när systemet skriver posten till en meddelandebuss och när realtidsfrågan skriver posten vidare. Systemet aggregerar det här mätvärdet för varje batch över alla poster som bearbetas av alla uppgifter.
Till exempel:
"rtmMetrics" : {
"processingLatencyMs" : {
"P0" : 0,
"P50" : 0,
"P90" : 0,
"P95" : 0,
"P99" : 0
},
"sourceQueuingLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 3
},
"e2eLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 4
},
Anpassad svarstidsmätning med Observera API
Observera API:et hjälper till att mäta svarstiden utan att starta ett annat jobb. Om du har en källtidsstämpel som beräknar källdatans ankomsttid kan du uppskatta varje batchs svarstid med hjälp av Observera API.et. Skicka tidsstämpeln innan du når diskbänken:
Python
from datetime import datetime
from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType
@udf(returnType=TimestampType())
def current_timestamp():
return datetime.now()
# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
"latency",
unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
"observedLatency",
avg(col("latency")).alias("avg"),
max(col("latency")).alias("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.
Scala
import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}
val currentTimestampUDF = udf(() => System.currentTimeMillis())
// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
"latency",
col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
name = "observedLatency",
avg(col("latency")).as("avg"),
max(col("latency")).as("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.
I det här exemplet registreras en aktuell tidsstämpel innan posten matas ut, och svarstiden beräknas genom att skillnaden mellan den här tidsstämpeln och postens källtidsstämpel beräknas. Resultaten ingår i förloppsrapporterna och görs tillgängliga för lyssnare. Här är exempel på utdata:
"observedMetrics" : {
"observedLatency" : {
"avg" : 63.8369765176552,
"max" : 219,
"p99" : 154,
"p50" : 49
}
}
Funktionsstöd och begränsningar
I det här avsnittet beskrivs de funktioner och aktuella begränsningar som stöds i realtidsläget, inklusive kompatibla miljöer, språk, källor, mottagare, operatorer och särskilda överväganden för specifika funktioner.
Miljöer, språk och lägen som stöds
| Typ av beräkning | Supported |
|---|---|
| Dedikerad (tidigare: enskild användare) | Yes |
| Standard (tidigare: delad) | No |
| Lakeflow Spark Deklarativa Pipelines: Klassisk | No |
| Lakeflow Spark-deklarativa pipelines serverlösa | No |
| Serverless | No |
Språk som stöds:
| Language | Supported |
|---|---|
| Scala | Yes |
| Java | Yes |
| Python | Yes |
Körningslägen som stöds:
| Körningsläge | Supported |
|---|---|
| Uppdateringsläge | Yes |
| Append mode | No |
| Fullständigt läge | No |
Källor och mottagare som stöds
Källor:
| Sources | Supported |
|---|---|
| Apache Kafka | Yes |
| AWS MSK | Yes |
| Event Hubs (med Kafka Connector) | Yes |
| Kinesis | Ja (endast EFO-läge) |
| Google Pub/Sub | No |
| Apache Pulsar | No |
Mottagare:
| Sinks | Supported |
|---|---|
| Apache Kafka | Yes |
| Event Hubs (med Kafka Connector) | Yes |
| Kinesis | No |
| Google Pub/Sub | No |
| Apache Pulsar | No |
| Godtyckliga noder (med forEachWriter) | Yes |
Operatorer som stöds
| Operators | Supported |
|---|---|
| Tillståndslösa operationer | |
| Selection | Yes |
| Projection | Yes |
| UDF:er | |
| Scala UDF | Ja (med vissa begränsningar) |
| Python-användardefinierad funktion (UDF) | Ja (med vissa begränsningar) |
| sammansättning | |
| sum | Yes |
| count | Yes |
| max | Yes |
| min | Yes |
| avg | Yes |
| Sammansättningsfunktioner | Yes |
| Fönster | |
| Tumbling | Yes |
| Sliding | Yes |
| Session | No |
| Deduplication | |
| dropDuplicates | Ja (tillståndet är obundet) |
| dropDuplicatesWithinWatermark | No |
| Stream – Tabell-sammanfogning | |
| Broadcast-tabell (bör vara liten) | Yes |
| Stream – Stream Join | No |
| (platt)MapGroupsWithState | No |
| transformWithState | Ja (med vissa skillnader) |
| union | Ja (med vissa begränsningar) |
| forEach | Yes |
| forEachBatch | No |
| mapPartitions | Nej (se begränsning) |
Särskilda överväganden
Vissa operatorer och funktioner har specifika överväganden eller skillnader när de används i realtidsläge.
transformWithState i realtidsläge
För att skapa anpassade tillståndskänsliga program stöder DatabrickstransformWithState, ett API inom Apache Spark Structured Streaming. Mer information om API och kodfragment finns i Skapa ett anpassat tillståndskänsligt program .
Det finns dock vissa skillnader mellan hur API:et beter sig i realtidsläge och traditionella strömningsfrågor som utnyttjar mikrobatcharkitekturen.
- Realtidsläget anropar
handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues)metoden för varje rad.- Iteratorn
inputRowsreturnerar ett enda värde. Mikrobatchläget anropar den en gång för varje nyckel, ochinputRowsiteratorn returnerar alla värden för en nyckel i mikrobatchen. - Du måste vara medveten om den här skillnaden när du skriver koden.
- Iteratorn
- Tidsinställda händelser stöds inte i realtidsläge.
- I realtidsläge fördröjs timers aktivering beroende på dataankomst:
- Om en timer är schemalagd till 10:00:00 men inga data anländer, utlöses inte timern omedelbart.
- Om data kommer klockan 10:00:10 utlöses timern med en fördröjning på 10 sekunder.
- Om inga data tas emot och den långvariga batchen avslutas utlöses timern innan batchen avslutas.
Python-UDF:er i realtidsläge
Databricks stöder de flesta användardefinierade Python-funktioner (UDF: er) i realtidsläge:
| UDF-typ | Supported |
|---|---|
| Tillståndslös UDF | |
| Python-skalär UDF (länk) | Yes |
| Arrow scalar UDF | Yes |
| Pandas scalar UDF (länk) | Yes |
Pilfunktion (mapInArrow) |
Yes |
| Pandas-funktion (länk) | Yes |
| Stateful Grouping UDF (UDAF) | |
transformWithState (endast Row gränssnitt) |
Yes |
| applyInPandasWithState | No |
| Icke-tillståndskänslig gruppering av UDF (UDAF) | |
| apply | No |
| applyInArrow | No |
| applyInPandas | No |
| Tabellfunktion | |
| UDTF (länk) | No |
| UC UDF | No |
Det finns flera saker att tänka på när du använder Python-UDF:er i realtidsläge:
- För att minimera svarstiden konfigurerar du pilbatchstorleken (
spark.sql.execution.arrow.maxRecordsPerBatch) till 1.- Kompromiss: Den här konfigurationen optimerar för svarstid på bekostnad av dataflödet. För de flesta arbetsbelastningar rekommenderas den här inställningen.
- Öka batchstorleken endast om ett högre dataflöde krävs för att hantera indatavolymen och acceptera den potentiella ökningen av svarstiden.
- Pandas UDF:ar och funktioner fungerar inte bra med en Arrow-batchstorlek på 1.
- Om du använder Pandas UDF:er eller funktioner anger du pilbatchstorleken till ett högre värde (till exempel 100 eller högre).
- Observera att detta innebär högre svarstid. Databricks rekommenderar att du använder Arrow UDF eller funktionen om möjligt.
- På grund av prestandaproblemet med Pandas stöds transformWithState endast med
Rowgränssnittet.
Limitations
Källbegränsningar
För Kinesis stöder realtidsläget inte avsökningsläge. Dessutom kan frekventa ompartitioner påverka svarstiden negativt.
Unionsbegränsningar
Unionsoperatören har vissa begränsningar:
- Realtidsläget stöder inte självunion:
- Kafka: Du kan inte använda samma källdataramobjekt och unionera härledda dataramar från det. Lösning: Använd olika DataFrames som läser från samma källa.
- Kinesis: Du kan inte förena dataramar som härletts från samma Kinesis-källa med samma konfiguration. Lösning: Förutom att använda olika DataFrames kan du tilldela varje DataFrame ett annat "consumerName"-alternativ.
- Realtidsläget stöder inte tillståndskänsliga operatorer (till exempel ,
aggregatededuplicate,transformWithState) som definierats före unionen. - Realtidsläget stöder inte union med batchkällor.
MapPartitions-begränsning
mapPartitions i Scala och liknande Python-API:er (mapInPandas, mapInArrow) tar du en iterator för hela indatapartitionen och skapar en iterator för hela utdata med godtycklig mappning mellan indata och utdata. Dessa API:er kan orsaka prestandaproblem i strömningsläge Real-Time genom att blockera hela utdata, vilket ökar svarstiden. Semantiken för dessa API:er stöder inte effektivt vattenstämpelspridning.
Använd skalära UDF:er i kombination med Transformera komplexa datatyper eller filter i stället för att uppnå liknande funktioner.
Nästa steg
Nu när du förstår vad realtidsläge är och hur du konfigurerar det kan du utforska dessa resurser för att börja implementera realtidsströmningsprogram:
- Kom igång med realtidsläge – Följ stegvisa instruktioner för att konfigurera beräkning och köra din första realtidsströmningsfråga.
- Kodexempel i realtidsläge – Utforska arbetsexempel som Kafka-källor och mottagare, tillståndskänsliga frågor, sammansättningar och anpassade mottagare.
- Koncept för strukturerad direktuppspelning – Lär dig grundbegreppen för strukturerad direktuppspelning på Databricks.