Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Important
Deze functie bevindt zich in openbare preview-versie.
De realtimemodus is een triggertype voor Structured Streaming waarmee gegevensverwerking met ultra lage latentie met end-to-end latentie tot vijf milliseconden mogelijk is. Gebruik de realtimemodus voor operationele workloads waarvoor directe reactie op streaminggegevens is vereist, zoals fraudedetectie, realtime persoonlijke instellingen en directe besluitvormingssystemen.
De realtimemodus is beschikbaar in Databricks Runtime 16.4 LTS en hoger. Zie Aan de slag met de realtime-modus voor stapsgewijze installatie-instructies. Zie voorbeelden van realtimemodus voor codevoorbeelden.
Wat is real-time modus?
Operationele versus analytische workloads
Streaming-workloads kunnen breed worden onderverdeeld in analytische workloads en operationele workloads.
- Analytische workloads maken gebruik van gegevensinname en transformatie, meestal volgens de medaillonarchitectuur (bijvoorbeeld het opnemen van gegevens in de bronzen, zilveren en gouden tabellen).
- Operationele workloads verbruiken realtimegegevens, passen bedrijfslogica toe en activeren downstreamacties of beslissingen.
Enkele voorbeelden van operationele workloads zijn:
- Het blokkeren of markeren van een creditcardtransactie in realtime als een fraudescore een drempelwaarde overschrijdt, op basis van factoren zoals ongebruikelijke locatie, grote transactiegrootte of snelle uitgavenpatronen.
- Het versturen van een promotiebericht wanneer clickstream-gegevens laten zien dat een gebruiker al vijf minuten jeans aan het bekijken is, waarbij een korting van 25% wordt aangeboden als ze binnen de komende 15 minuten kopen.
Over het algemeen worden operationele workloads gekenmerkt door de noodzaak van end-to-end latentie van minder dan een seconde. Dit kan worden bereikt met de realtimemodus in Apache Spark Structured Streaming.
Hoe de realtime-modus lage latentie bereikt
De realtimemodus verbetert de uitvoeringsarchitectuur door:
- Het uitvoeren van langlopende batches (de standaardwaarde is vijf minuten), waarin het systeem gegevens verwerkt zodra deze beschikbaar is in de bron.
- Alle fasen van de query tegelijk plannen. Hiervoor moet het aantal beschikbare taaksites gelijk zijn aan of groter zijn dan het aantal taken van alle fasen in een batch.
- Gegevens overdragen tussen fasen zodra deze worden geproduceerd met behulp van een streaming shuffle.
Aan het einde van de verwerking van een batch en voordat de volgende batch wordt gestart, worden controlepunten voor Structured Streaming uitgevoerd en worden metrische data gepubliceerd. De batchduur is van invloed op de controlepuntfrequentie:
- Langere batches: minder frequente controlepunten, wat betekent dat er langer wordt afgespeeld op fouten en vertraagde beschikbaarheid van metrische gegevens.
- Kortere batches: frequentere controlepunten, wat de latentie kan beïnvloeden.
Databricks raadt aan om de realtime-modus te benchmarken voor uw doelworkload om het juiste triggerinterval te vinden.
Wanneer gebruikt u de realtimemodus?
Kies de realtimemodus wanneer uw use-case vereist:
- Sub-seconde latentie: toepassingen die binnen milliseconden op gegevens moeten reageren, zoals systemen voor fraudedetectie die transacties in realtime moeten blokkeren.
- Operationele besluitvorming: systemen die directe acties activeren op basis van binnenkomende gegevens, zoals realtime aanbiedingen, waarschuwingen of meldingen.
- Continue verwerking: workloads waar gegevens moeten worden verwerkt zodra ze binnenkomen, in plaats van in periodieke batches.
Gebruik de microbatchmodus (de standaardtrigger voor gestructureerd streamen) wanneer:
- Analytische verwerking: ETL-pijplijnen, gegevenstransformaties en implementaties van medaillonarchitectuur waarbij latentievereisten in seconden of minuten worden gemeten.
- Kostenoptimalisatie: workloads waarbij een latentie van minder dan een seconde niet is vereist, omdat voor de realtimemodus toegewezen rekenresources zijn vereist.
- De controlepuntfrequentie is van belang: toepassingen die profiteren van frequentere controlepunten voor sneller herstel.
Vereisten en configuratie
De realtimemodus heeft specifieke vereisten voor het instellen en uitvoeren van query's. In deze sectie worden de vereisten en configuratiestappen beschreven die nodig zijn voor het gebruik van de realtimemodus.
Vereiste voorwaarden
Als u de realtimemodus wilt gebruiken, moet u voldoen aan de volgende vereisten:
- Databricks Runtime 16.4 LTS of hoger: de realtimemodus is alleen beschikbaar in DBR 16.4 LTS en latere versies.
- Toegewezen rekenkracht: u moet een toegewezen rekenproces (voorheen één gebruiker) gebruiken. Standard (voorheen gedeeld), Lakeflow Spark-declaratieve pijplijnen en serverloze clusters worden niet ondersteund.
- Geen automatische schaalaanpassing: automatische schaalaanpassing moet worden uitgeschakeld.
- Geen foton: Fotonversnelling wordt niet ondersteund met de realtime-modus.
-
Spark-configuratie: u moet
spark.databricks.streaming.realTimeMode.enabledinstellen optrue.
Berekeningsconfiguratie
Configureer uw rekenproces met de volgende instellingen:
- Ingesteld
spark.databricks.streaming.realTimeMode.enabledoptruein de Spark-configuratie. - Automatische schaalaanpassing uitschakelen.
- Schakel Fotonversnelling uit.
- Zorg ervoor dat de berekening is geconfigureerd als een toegewezen cluster (niet standaard, Lakeflow Spark-declaratieve pijplijnen of serverloos).
Zie Aan de slag met de realtime-modus voor stapsgewijze instructies voor het maken en configureren van berekeningen voor realtimemodus.
Queryconfiguratie
Als u een query in realtime wilt uitvoeren, moet u de realtime-trigger inschakelen. Realtime-triggers worden alleen ondersteund in de updatemodus.
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.outputMode("update")
# In PySpark, the realTime trigger requires specifying the interval.
.trigger(realTime="5 minutes")
.start()
)
Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val readStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic).load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode("update")
.trigger(RealTimeTrigger.apply())
// RealTimeTrigger can also accept an argument specifying the checkpoint interval.
// For example, this code indicates a checkpoint interval of 5 minutes:
// .trigger(RealTimeTrigger.apply("5 minutes"))
.start()
Berekening van de computergrootte
U kunt één realtime taak per computing resource uitvoeren als er voldoende taaksloten beschikbaar zijn.
Als u in lage-latentiemodus wilt werken, moet het totale aantal beschikbare taakslots groter zijn dan of gelijk zijn aan het aantal taken over alle querystadia heen.
Voorbeelden van sleufberekeningen
| Pijplijntype | Configuratie | Vereiste aansluitingen |
|---|---|---|
| Staatloos met één fase (Kafka-bron + sink) |
maxPartitions = 8 |
8 sleuven |
| Twee-fasen toestandsafhankelijk systeem (Kafka-bron + shuffle) |
maxPartitions = 8, partities verdelen = 20 |
28 sleuven (8 + 20) |
| Drie fasen (Kafka-bron + shuffle + repartition) |
maxPartitions = 8, twee willekeurige fasen van elk 20 |
48 sleuven (8 + 20 + 20) |
Als u maxPartitions niet instelt, gebruikt u het aantal partities van het Kafka-topic.
Belangrijke overwegingen
Houd rekening met het volgende wanneer u uw rekenproces configureert:
- In tegenstelling tot de microbatchmodus kunnen realtime taken inactief blijven tijdens het wachten op gegevens, zodat de juiste grootte essentieel is om verspilde resources te voorkomen.
- Richt u op een doelgebruiksniveau (bijvoorbeeld 50%) door het volgende af te stemmen:
-
maxPartitions(voor Kafka) -
spark.sql.shuffle.partitions(voor shuffle-fasen)
-
- Databricks raadt u aan de instelling zo in te stellen
maxPartitionsdat elke taak meerdere Kafka-partities verwerkt om de overhead te verminderen. - Pas taakslots per werknemer aan zodat deze overeenkomen met de werkbelasting voor eenvoudige taken met één fase.
- Experimenteer voor shuffle-intensieve taken om het minimale aantal shuffle-partities te vinden dat achterstanden voorkomt, en pas van daaruit aan. Het computersysteem plant de taak niet als het niet genoeg slots heeft.
Note
Vanuit Databricks Runtime 16.4 LTS en hoger gebruiken alle realtime pijplijnen controlepunt v2, waardoor naadloze schakelen tussen realtime- en microbatchmodi mogelijk is.
Optimalisatietechnieken
| Technique | Standaard ingeschakeld |
|---|---|
| Asynchrone voortgangstracering: Schrijven naar het offsetlogboek en het commitlogboek wordt naar een asynchrone thread verplaatst, waardoor de onderlinge tijd tussen twee microbatches wordt verminderd. Dit kan helpen de latentie van staatloze streamingquery's te verminderen. | No |
| Asynchrone statuscontrolepunten: helpt de latentie van stateful streamingquery's te verminderen door de volgende microbatch te verwerken zodra de berekening van de vorige microbatch is voltooid, zonder te wachten op statuscontrolepunten. | No |
bewaking en waarneembaarheid
Het meten van queryprestaties is essentieel voor realtime workloads. In realtime-modus geven traditionele metrische gegevens over batchduur geen werkelijke latentie weer, dus u hebt alternatieve benaderingen nodig.
End-to-end latentie is de workloadspecifiek en kan soms alleen nauwkeurig worden gemeten met bedrijfslogica. Als de brontijdstempel bijvoorbeeld wordt weergegeven in Kafka, kunt u de latentie berekenen als het verschil tussen de tijdstempel van Kafka en de brontijdstempel.
U kunt ook end-to-end latentie schatten met behulp van de ingebouwde metrische gegevens en API's die hieronder worden beschreven.
Ingebouwde metrische gegevens met StreamingQueryProgress
De volgende metrische gegevens worden opgenomen in de StreamingQueryProgress gebeurtenis, die automatisch wordt geregistreerd in de stuurprogrammalogboeken. U kunt ze ook openen via de callback-functie van de StreamingQueryListeneronQueryProgress() functie.
QueryProgressEvent.json() ofwel toString() extra metrische gegevens voor de realtimemodus opnemen.
- Verwerkingslatentie (processingLatencyMs). De tijd die is verstreken tussen het moment waarop de query in de real-time modus een record leest en wanneer de query deze naar de volgende fase of downstream schrijft. Voor query's met één fase meet dit dezelfde duur als de E2E-latentie. Het systeem rapporteert deze metrische gegevens per taak.
- Wachtrijlatentie van de bron (sourceQueuingLatencyMs). De hoeveelheid tijd die is verstreken tussen het moment waarop het systeem een record naar een berichtenbus schrijft, bijvoorbeeld de toevoegtijd van het logboek in Kafka, en wanneer de realtime-modusquery de record voor het eerst leest. Het systeem rapporteert deze metrische gegevens per taak.
- E2E Latentie (e2eLatencyMs). De tijd tussen het moment waarop het systeem de record naar een berichtenbus schrijft en wanneer de realtimemodusquery de record downstream schrijft. Het systeem voegt deze metrische waarde per batch samen voor alle records die door alle taken worden verwerkt.
Voorbeeld:
"rtmMetrics" : {
"processingLatencyMs" : {
"P0" : 0,
"P50" : 0,
"P90" : 0,
"P95" : 0,
"P99" : 0
},
"sourceQueuingLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 3
},
"e2eLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 4
},
Aangepaste latentiemeting met observeer-API
Met de Observe-API kunt u latentie meten zonder een andere taak te starten. Als u een brontijdstempel hebt die de aankomsttijd van de brongegevens benadert, kunt u met behulp van de Observe-API de latentie van elke batch schatten. Geef de tijdstempel door voordat u de sink bereikt:
Python
from datetime import datetime
from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType
@udf(returnType=TimestampType())
def current_timestamp():
return datetime.now()
# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
"latency",
unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
"observedLatency",
avg(col("latency")).alias("avg"),
max(col("latency")).alias("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.
Scala
import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}
val currentTimestampUDF = udf(() => System.currentTimeMillis())
// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
"latency",
col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
name = "observedLatency",
avg(col("latency")).as("avg"),
max(col("latency")).as("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.
In dit voorbeeld wordt een huidige tijdstempel vastgelegd voordat de invoer wordt uitgevoerd. Latentie wordt geschat door het verschil tussen deze tijdstempel en de brontijdstempel van de record te berekenen. De resultaten worden opgenomen in voortgangsrapporten en beschikbaar gesteld aan listeners. Hier volgt een voorbeelduitvoer:
"observedMetrics" : {
"observedLatency" : {
"avg" : 63.8369765176552,
"max" : 219,
"p99" : 154,
"p50" : 49
}
}
Functieondersteuning en -beperkingen
In deze sectie worden de ondersteunde functies en huidige beperkingen van de realtimemodus beschreven, waaronder compatibele omgevingen, talen, bronnen, sinks, operators en speciale overwegingen voor specifieke functies.
Ondersteunde omgevingen, talen en modi
| Rekentype | Supported |
|---|---|
| Dedicated (voorheen: één gebruiker) | Yes |
| Standard (voorheen: gedeeld) | No |
| Klassieke declaratieve Pijplijnen van Lakeflow Spark | No |
| Lakeflow Spark-declaratieve pijplijnen serverloos | No |
| Serverless | No |
Ondersteunde talen:
| Language | Supported |
|---|---|
| Scala | Yes |
| Java | Yes |
| Python | Yes |
Ondersteunde uitvoeringsmodi:
| Uitvoeringsmodus | Supported |
|---|---|
| Update-modus | Yes |
| Append mode | No |
| Volledige modus | No |
Ondersteunde bronnen en sinks
Bronnen:
| Sources | Supported |
|---|---|
| Apache Kafka | Yes |
| AWS MSK | Yes |
| Event Hubs (met behulp van Kafka-connector) | Yes |
| Kinesis | Ja (enkel EFO-modus) |
| Google Pub/Sub (een berichten- en gebeurtenissenservice van Google) | No |
| Apache Pulsar | No |
Heatsinks:
| Sinks | Supported |
|---|---|
| Apache Kafka | Yes |
| Event Hubs (met behulp van Kafka-connector) | Yes |
| Kinesis | No |
| Google Pub/Sub (een berichten- en gebeurtenissenservice van Google) | No |
| Apache Pulsar | No |
| Willekeurige sinks (met forEachWriter) | Yes |
Ondersteunde operators
| Operators | Supported |
|---|---|
| Staatloze bewerkingen | |
| Selection | Yes |
| Projection | Yes |
| UDF's | |
| Scala UDF | Ja (met enkele beperkingen) |
| Python UDF | Ja (met enkele beperkingen) |
| Aggregatie- | |
| sum | Yes |
| count | Yes |
| max | Yes |
| min | Yes |
| avg | Yes |
| Aggregatiesfuncties | Yes |
| Windowing | |
| Tumbling | Yes |
| Sliding | Yes |
| Session | No |
| Deduplicatie | |
| dropDuplicates | Ja (de toestand is onbegrensd) |
| dropDuplicatesWithinWatermark | No |
| Stream - Tabelkoppeling | |
| Broadcast-tabel (moet klein zijn) | Yes |
| Stream - Stream Toevoegen | No |
| (plat)MapGroupsWithState | No |
| transformWithState | Ja (met enkele verschillen) |
| union | Ja (met enkele beperkingen) |
| forEach | Yes |
| forEachBatch | No |
| mapPartitions | Nee (zie beperking) |
Speciale overwegingen
Sommige operators en functies hebben specifieke overwegingen of verschillen bij gebruik in realtimemodus.
transformWithState in realtimemodus
Voor het bouwen van aangepaste stateful toepassingen ondersteunt Databricks transformWithState, een API in Apache Spark Structured Streaming. Zie Een aangepaste stateful toepassing bouwen voor meer informatie over de API en codefragmenten.
Er zijn echter enkele verschillen tussen hoe de API zich gedraagt in realtime-modus en traditionele streamingquery's die gebruikmaken van de microbatcharchitectuur.
- In realtime wordt de
handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues)methode voor elke rij aangeroepen.- De
inputRowsiterator retourneert één waarde. De microbatchmodus roept deze eenmaal aan voor elke sleutel en deinputRowsiterator retourneert alle waarden voor een sleutel in de microbatch. - U moet rekening houden met dit verschil bij het schrijven van uw code.
- De
- Timers voor gebeurtenistijd worden niet ondersteund in realtimemodus.
- In realtime-modus worden timers vertraagd bij het activeren, afhankelijk van de aankomst van gegevens:
- Als een timer is gepland voor 10:00:00, maar er geen gegevens binnenkomen, wordt de timer niet onmiddellijk geactiveerd.
- Als de gegevens om 10:00:10 binnenkomen, wordt de timer geactiveerd met een vertraging van 10 seconden.
- Als er geen gegevens binnenkomen en de langlopende batch wordt beëindigd, wordt de timer geactiveerd voordat de batch wordt beëindigd.
Python UDFs in real-time modus
Databricks ondersteunt het merendeel van door de gebruiker gedefinieerde Python-functies (UDF's) in realtime:
| UDF type | Supported |
|---|---|
| Staatloze UDF | |
| Python scalaire UDF (koppeling) | Yes |
| Pijl scalaire UDF | Yes |
| Pandas scalar UDF (koppeling) | Yes |
Pijlfunctie (mapInArrow) |
Yes |
| Pandas functie (koppeling) | Yes |
| Stateful Grouping UDF (UDAF) | |
transformWithState (alleen Row interface) |
Yes |
| applyInPandasWithState | No |
| Niet-opbouwend groepering-UDF (UDAF) | |
| apply | No |
| applyInArrow | No |
| applyInPandas | No |
| Tabelfunctie | |
| UDTF (koppeling) | No |
| UC UDF | No |
Er zijn verschillende punten om rekening mee te houden bij het gebruik van Python UDF's in realtime-modus:
- Als u de latentie wilt minimaliseren, configureert u de grootte van de pijlbatch (
spark.sql.execution.arrow.maxRecordsPerBatch) op 1.- Afweging: Deze configuratie optimaliseert voor een lagere latentie ten koste van de doorvoer. Voor de meeste workloads wordt deze instelling aanbevolen.
- Verhoog de batchgrootte alleen als een hogere doorvoer is vereist voor invoervolume, waarbij de potentiële toename van de latentie wordt geaccepteerd.
- Pandas UDFs en functies presteren niet goed met een Arrow batch grootte van 1.
- Als u pandas UDF's of functies gebruikt, stelt u de Arrow-batchgrootte in op een hogere waarde (bijvoorbeeld 100 of hoger).
- Houd er rekening mee dat dit een hogere latentie impliceert. Databricks raadt het gebruik van Arrow UDF of functie aan, indien mogelijk.
- Vanwege het prestatieprobleem met pandas wordt transformWithState alleen ondersteund met de
Rowinterface.
Limitations
Bronbeperkingen
Voor Kinesis biedt de realtime-modus geen ondersteuning voor de polling-modus. Bovendien kunnen frequente repartities een negatieve invloed hebben op de latentie.
Beperkingen van de unie
De operator Union heeft enkele beperkingen:
- Realtime-modus biedt geen ondersteuning voor zelf-samenvoeging:
- Kafka: U kunt niet hetzelfde gegevensframeobject van de bron gebruiken en afgeleide gegevensframes hiervan samenvoegen. Tijdelijke oplossing: gebruik verschillende DataFrames die uit dezelfde bron worden gelezen.
- Kinesis: U kunt geen gegevensframes samenvoegen die zijn afgeleid van dezelfde Kinesis-bron met dezelfde configuratie. Tijdelijke oplossing: Naast het gebruik van verschillende DataFrames kunt u een andere optie 'consumerName' toewijzen aan elk DataFrame.
- In realtime-modus worden stateful operators (bijvoorbeeld
aggregate,deduplicatetransformWithState) die vóór de Unie zijn gedefinieerd, niet ondersteund. - De realtimemodus biedt geen ondersteuning voor samenvoeging met batchbronnen.
Beperking van MapPartitions
mapPartitions in Scala en vergelijkbare Python-API's (mapInPandas, mapInArrow) nemen een iterator van de gehele invoerpartitie en produceren een iterator voor de gehele uitvoer met arbitraire koppeling tussen invoer en uitvoer. Deze API's kunnen prestatieproblemen veroorzaken in de streamingmodus Real-Time door de volledige uitvoer te blokkeren, waardoor de latentie toeneemt. De semantiek van deze API's biedt geen ondersteuning voor het doorgeven van watermerken.
Gebruik scalaire UDF's in combinatie met complexe gegevenstypen transformeren of filter in plaats daarvan om vergelijkbare functionaliteit te bereiken.
Volgende stappen
Nu u begrijpt wat de realtimemodus is en hoe u deze configureert, verkent u deze resources om realtime streamingtoepassingen te implementeren:
- Aan de slag met de realtime-modus : volg stapsgewijze instructies voor het configureren van rekenkracht en het uitvoeren van uw eerste realtime streamingquery.
- Voorbeelden van code in realtimemodus : verken werkvoorbeelden, waaronder Kafka-bronnen en -sinks, stateful query's, aggregaties en aangepaste sinks.
- Structured Streaming-concepten : leer de basisconcepten van Structured Streaming op Databricks.