Delen via


Realtimemodus in gestructureerd streamen

Important

Deze functie bevindt zich in openbare preview-versie.

De realtimemodus is een triggertype voor Structured Streaming waarmee gegevensverwerking met ultra lage latentie met end-to-end latentie tot vijf milliseconden mogelijk is. Gebruik de realtimemodus voor operationele workloads waarvoor directe reactie op streaminggegevens is vereist, zoals fraudedetectie, realtime persoonlijke instellingen en directe besluitvormingssystemen.

De realtimemodus is beschikbaar in Databricks Runtime 16.4 LTS en hoger. Zie Aan de slag met de realtime-modus voor stapsgewijze installatie-instructies. Zie voorbeelden van realtimemodus voor codevoorbeelden.

Wat is real-time modus?

Operationele versus analytische workloads

Streaming-workloads kunnen breed worden onderverdeeld in analytische workloads en operationele workloads.

  • Analytische workloads maken gebruik van gegevensinname en transformatie, meestal volgens de medaillonarchitectuur (bijvoorbeeld het opnemen van gegevens in de bronzen, zilveren en gouden tabellen).
  • Operationele workloads verbruiken realtimegegevens, passen bedrijfslogica toe en activeren downstreamacties of beslissingen.

Enkele voorbeelden van operationele workloads zijn:

  • Het blokkeren of markeren van een creditcardtransactie in realtime als een fraudescore een drempelwaarde overschrijdt, op basis van factoren zoals ongebruikelijke locatie, grote transactiegrootte of snelle uitgavenpatronen.
  • Het versturen van een promotiebericht wanneer clickstream-gegevens laten zien dat een gebruiker al vijf minuten jeans aan het bekijken is, waarbij een korting van 25% wordt aangeboden als ze binnen de komende 15 minuten kopen.

Over het algemeen worden operationele workloads gekenmerkt door de noodzaak van end-to-end latentie van minder dan een seconde. Dit kan worden bereikt met de realtimemodus in Apache Spark Structured Streaming.

Hoe de realtime-modus lage latentie bereikt

De realtimemodus verbetert de uitvoeringsarchitectuur door:

  • Het uitvoeren van langlopende batches (de standaardwaarde is vijf minuten), waarin het systeem gegevens verwerkt zodra deze beschikbaar is in de bron.
  • Alle fasen van de query tegelijk plannen. Hiervoor moet het aantal beschikbare taaksites gelijk zijn aan of groter zijn dan het aantal taken van alle fasen in een batch.
  • Gegevens overdragen tussen fasen zodra deze worden geproduceerd met behulp van een streaming shuffle.

Aan het einde van de verwerking van een batch en voordat de volgende batch wordt gestart, worden controlepunten voor Structured Streaming uitgevoerd en worden metrische data gepubliceerd. De batchduur is van invloed op de controlepuntfrequentie:

  • Langere batches: minder frequente controlepunten, wat betekent dat er langer wordt afgespeeld op fouten en vertraagde beschikbaarheid van metrische gegevens.
  • Kortere batches: frequentere controlepunten, wat de latentie kan beïnvloeden.

Databricks raadt aan om de realtime-modus te benchmarken voor uw doelworkload om het juiste triggerinterval te vinden.

Wanneer gebruikt u de realtimemodus?

Kies de realtimemodus wanneer uw use-case vereist:

  • Sub-seconde latentie: toepassingen die binnen milliseconden op gegevens moeten reageren, zoals systemen voor fraudedetectie die transacties in realtime moeten blokkeren.
  • Operationele besluitvorming: systemen die directe acties activeren op basis van binnenkomende gegevens, zoals realtime aanbiedingen, waarschuwingen of meldingen.
  • Continue verwerking: workloads waar gegevens moeten worden verwerkt zodra ze binnenkomen, in plaats van in periodieke batches.

Gebruik de microbatchmodus (de standaardtrigger voor gestructureerd streamen) wanneer:

  • Analytische verwerking: ETL-pijplijnen, gegevenstransformaties en implementaties van medaillonarchitectuur waarbij latentievereisten in seconden of minuten worden gemeten.
  • Kostenoptimalisatie: workloads waarbij een latentie van minder dan een seconde niet is vereist, omdat voor de realtimemodus toegewezen rekenresources zijn vereist.
  • De controlepuntfrequentie is van belang: toepassingen die profiteren van frequentere controlepunten voor sneller herstel.

Vereisten en configuratie

De realtimemodus heeft specifieke vereisten voor het instellen en uitvoeren van query's. In deze sectie worden de vereisten en configuratiestappen beschreven die nodig zijn voor het gebruik van de realtimemodus.

Vereiste voorwaarden

Als u de realtimemodus wilt gebruiken, moet u voldoen aan de volgende vereisten:

  • Databricks Runtime 16.4 LTS of hoger: de realtimemodus is alleen beschikbaar in DBR 16.4 LTS en latere versies.
  • Toegewezen rekenkracht: u moet een toegewezen rekenproces (voorheen één gebruiker) gebruiken. Standard (voorheen gedeeld), Lakeflow Spark-declaratieve pijplijnen en serverloze clusters worden niet ondersteund.
  • Geen automatische schaalaanpassing: automatische schaalaanpassing moet worden uitgeschakeld.
  • Geen foton: Fotonversnelling wordt niet ondersteund met de realtime-modus.
  • Spark-configuratie: u moet spark.databricks.streaming.realTimeMode.enabled instellen op true.

Berekeningsconfiguratie

Configureer uw rekenproces met de volgende instellingen:

  • Ingesteld spark.databricks.streaming.realTimeMode.enabled op true in de Spark-configuratie.
  • Automatische schaalaanpassing uitschakelen.
  • Schakel Fotonversnelling uit.
  • Zorg ervoor dat de berekening is geconfigureerd als een toegewezen cluster (niet standaard, Lakeflow Spark-declaratieve pijplijnen of serverloos).

Zie Aan de slag met de realtime-modus voor stapsgewijze instructies voor het maken en configureren van berekeningen voor realtimemodus.

Queryconfiguratie

Als u een query in realtime wilt uitvoeren, moet u de realtime-trigger inschakelen. Realtime-triggers worden alleen ondersteund in de updatemodus.

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

Berekening van de computergrootte

U kunt één realtime taak per computing resource uitvoeren als er voldoende taaksloten beschikbaar zijn.

Als u in lage-latentiemodus wilt werken, moet het totale aantal beschikbare taakslots groter zijn dan of gelijk zijn aan het aantal taken over alle querystadia heen.

Voorbeelden van sleufberekeningen

Pijplijntype Configuratie Vereiste aansluitingen
Staatloos met één fase (Kafka-bron + sink) maxPartitions = 8 8 sleuven
Twee-fasen toestandsafhankelijk systeem (Kafka-bron + shuffle) maxPartitions = 8, partities verdelen = 20 28 sleuven (8 + 20)
Drie fasen (Kafka-bron + shuffle + repartition) maxPartitions = 8, twee willekeurige fasen van elk 20 48 sleuven (8 + 20 + 20)

Als u maxPartitions niet instelt, gebruikt u het aantal partities van het Kafka-topic.

Belangrijke overwegingen

Houd rekening met het volgende wanneer u uw rekenproces configureert:

  • In tegenstelling tot de microbatchmodus kunnen realtime taken inactief blijven tijdens het wachten op gegevens, zodat de juiste grootte essentieel is om verspilde resources te voorkomen.
  • Richt u op een doelgebruiksniveau (bijvoorbeeld 50%) door het volgende af te stemmen:
    • maxPartitions (voor Kafka)
    • spark.sql.shuffle.partitions (voor shuffle-fasen)
  • Databricks raadt u aan de instelling zo in te stellen maxPartitions dat elke taak meerdere Kafka-partities verwerkt om de overhead te verminderen.
  • Pas taakslots per werknemer aan zodat deze overeenkomen met de werkbelasting voor eenvoudige taken met één fase.
  • Experimenteer voor shuffle-intensieve taken om het minimale aantal shuffle-partities te vinden dat achterstanden voorkomt, en pas van daaruit aan. Het computersysteem plant de taak niet als het niet genoeg slots heeft.

Note

Vanuit Databricks Runtime 16.4 LTS en hoger gebruiken alle realtime pijplijnen controlepunt v2, waardoor naadloze schakelen tussen realtime- en microbatchmodi mogelijk is.

Optimalisatietechnieken

Technique Standaard ingeschakeld
Asynchrone voortgangstracering: Schrijven naar het offsetlogboek en het commitlogboek wordt naar een asynchrone thread verplaatst, waardoor de onderlinge tijd tussen twee microbatches wordt verminderd. Dit kan helpen de latentie van staatloze streamingquery's te verminderen. No
Asynchrone statuscontrolepunten: helpt de latentie van stateful streamingquery's te verminderen door de volgende microbatch te verwerken zodra de berekening van de vorige microbatch is voltooid, zonder te wachten op statuscontrolepunten. No

bewaking en waarneembaarheid

Het meten van queryprestaties is essentieel voor realtime workloads. In realtime-modus geven traditionele metrische gegevens over batchduur geen werkelijke latentie weer, dus u hebt alternatieve benaderingen nodig.

End-to-end latentie is de workloadspecifiek en kan soms alleen nauwkeurig worden gemeten met bedrijfslogica. Als de brontijdstempel bijvoorbeeld wordt weergegeven in Kafka, kunt u de latentie berekenen als het verschil tussen de tijdstempel van Kafka en de brontijdstempel.

U kunt ook end-to-end latentie schatten met behulp van de ingebouwde metrische gegevens en API's die hieronder worden beschreven.

Ingebouwde metrische gegevens met StreamingQueryProgress

De volgende metrische gegevens worden opgenomen in de StreamingQueryProgress gebeurtenis, die automatisch wordt geregistreerd in de stuurprogrammalogboeken. U kunt ze ook openen via de callback-functie van de StreamingQueryListeneronQueryProgress() functie. QueryProgressEvent.json() ofwel toString() extra metrische gegevens voor de realtimemodus opnemen.

  1. Verwerkingslatentie (processingLatencyMs). De tijd die is verstreken tussen het moment waarop de query in de real-time modus een record leest en wanneer de query deze naar de volgende fase of downstream schrijft. Voor query's met één fase meet dit dezelfde duur als de E2E-latentie. Het systeem rapporteert deze metrische gegevens per taak.
  2. Wachtrijlatentie van de bron (sourceQueuingLatencyMs). De hoeveelheid tijd die is verstreken tussen het moment waarop het systeem een record naar een berichtenbus schrijft, bijvoorbeeld de toevoegtijd van het logboek in Kafka, en wanneer de realtime-modusquery de record voor het eerst leest. Het systeem rapporteert deze metrische gegevens per taak.
  3. E2E Latentie (e2eLatencyMs). De tijd tussen het moment waarop het systeem de record naar een berichtenbus schrijft en wanneer de realtimemodusquery de record downstream schrijft. Het systeem voegt deze metrische waarde per batch samen voor alle records die door alle taken worden verwerkt.

Voorbeeld:

"rtmMetrics" : {
    "processingLatencyMs" : {
      "P0" : 0,
      "P50" : 0,
      "P90" : 0,
      "P95" : 0,
      "P99" : 0
    },
    "sourceQueuingLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 3
    },
    "e2eLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 4
    },

Aangepaste latentiemeting met observeer-API

Met de Observe-API kunt u latentie meten zonder een andere taak te starten. Als u een brontijdstempel hebt die de aankomsttijd van de brongegevens benadert, kunt u met behulp van de Observe-API de latentie van elke batch schatten. Geef de tijdstempel door voordat u de sink bereikt:

Python

from datetime import datetime

from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def current_timestamp():
  return datetime.now()

# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
  "latency",
  unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  "observedLatency",
  avg(col("latency")).alias("avg"),
  max(col("latency")).alias("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.

Scala

import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}

val currentTimestampUDF = udf(() => System.currentTimeMillis())

// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
  "latency",
  col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  name = "observedLatency",
  avg(col("latency")).as("avg"),
  max(col("latency")).as("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.

In dit voorbeeld wordt een huidige tijdstempel vastgelegd voordat de invoer wordt uitgevoerd. Latentie wordt geschat door het verschil tussen deze tijdstempel en de brontijdstempel van de record te berekenen. De resultaten worden opgenomen in voortgangsrapporten en beschikbaar gesteld aan listeners. Hier volgt een voorbeelduitvoer:

"observedMetrics" : {
  "observedLatency" : {
    "avg" : 63.8369765176552,
    "max" : 219,
    "p99" : 154,
    "p50" : 49
  }
}

Functieondersteuning en -beperkingen

In deze sectie worden de ondersteunde functies en huidige beperkingen van de realtimemodus beschreven, waaronder compatibele omgevingen, talen, bronnen, sinks, operators en speciale overwegingen voor specifieke functies.

Ondersteunde omgevingen, talen en modi

Rekentype Supported
Dedicated (voorheen: één gebruiker) Yes
Standard (voorheen: gedeeld) No
Klassieke declaratieve Pijplijnen van Lakeflow Spark No
Lakeflow Spark-declaratieve pijplijnen serverloos No
Serverless No

Ondersteunde talen:

Language Supported
Scala Yes
Java Yes
Python Yes

Ondersteunde uitvoeringsmodi:

Uitvoeringsmodus Supported
Update-modus Yes
Append mode No
Volledige modus No

Ondersteunde bronnen en sinks

Bronnen:

Sources Supported
Apache Kafka Yes
AWS MSK Yes
Event Hubs (met behulp van Kafka-connector) Yes
Kinesis Ja (enkel EFO-modus)
Google Pub/Sub (een berichten- en gebeurtenissenservice van Google) No
Apache Pulsar No

Heatsinks:

Sinks Supported
Apache Kafka Yes
Event Hubs (met behulp van Kafka-connector) Yes
Kinesis No
Google Pub/Sub (een berichten- en gebeurtenissenservice van Google) No
Apache Pulsar No
Willekeurige sinks (met forEachWriter) Yes

Ondersteunde operators

Operators Supported
Staatloze bewerkingen
Selection Yes
Projection Yes
UDF's
Scala UDF Ja (met enkele beperkingen)
Python UDF Ja (met enkele beperkingen)
Aggregatie-
sum Yes
count Yes
max Yes
min Yes
avg Yes
Aggregatiesfuncties Yes
Windowing
Tumbling Yes
Sliding Yes
Session No
Deduplicatie
dropDuplicates Ja (de toestand is onbegrensd)
dropDuplicatesWithinWatermark No
Stream - Tabelkoppeling
Broadcast-tabel (moet klein zijn) Yes
Stream - Stream Toevoegen No
(plat)MapGroupsWithState No
transformWithState Ja (met enkele verschillen)
union Ja (met enkele beperkingen)
forEach Yes
forEachBatch No
mapPartitions Nee (zie beperking)

Speciale overwegingen

Sommige operators en functies hebben specifieke overwegingen of verschillen bij gebruik in realtimemodus.

transformWithState in realtimemodus

Voor het bouwen van aangepaste stateful toepassingen ondersteunt Databricks transformWithState, een API in Apache Spark Structured Streaming. Zie Een aangepaste stateful toepassing bouwen voor meer informatie over de API en codefragmenten.

Er zijn echter enkele verschillen tussen hoe de API zich gedraagt in realtime-modus en traditionele streamingquery's die gebruikmaken van de microbatcharchitectuur.

  • In realtime wordt de handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues) methode voor elke rij aangeroepen.
    • De inputRows iterator retourneert één waarde. De microbatchmodus roept deze eenmaal aan voor elke sleutel en de inputRows iterator retourneert alle waarden voor een sleutel in de microbatch.
    • U moet rekening houden met dit verschil bij het schrijven van uw code.
  • Timers voor gebeurtenistijd worden niet ondersteund in realtimemodus.
  • In realtime-modus worden timers vertraagd bij het activeren, afhankelijk van de aankomst van gegevens:
    • Als een timer is gepland voor 10:00:00, maar er geen gegevens binnenkomen, wordt de timer niet onmiddellijk geactiveerd.
    • Als de gegevens om 10:00:10 binnenkomen, wordt de timer geactiveerd met een vertraging van 10 seconden.
    • Als er geen gegevens binnenkomen en de langlopende batch wordt beëindigd, wordt de timer geactiveerd voordat de batch wordt beëindigd.

Python UDFs in real-time modus

Databricks ondersteunt het merendeel van door de gebruiker gedefinieerde Python-functies (UDF's) in realtime:

UDF type Supported
Staatloze UDF
Python scalaire UDF (koppeling) Yes
Pijl scalaire UDF Yes
Pandas scalar UDF (koppeling) Yes
Pijlfunctie (mapInArrow) Yes
Pandas functie (koppeling) Yes
Stateful Grouping UDF (UDAF)
transformWithState (alleen Row interface) Yes
applyInPandasWithState No
Niet-opbouwend groepering-UDF (UDAF)
apply No
applyInArrow No
applyInPandas No
Tabelfunctie
UDTF (koppeling) No
UC UDF No

Er zijn verschillende punten om rekening mee te houden bij het gebruik van Python UDF's in realtime-modus:

  • Als u de latentie wilt minimaliseren, configureert u de grootte van de pijlbatch (spark.sql.execution.arrow.maxRecordsPerBatch) op 1.
    • Afweging: Deze configuratie optimaliseert voor een lagere latentie ten koste van de doorvoer. Voor de meeste workloads wordt deze instelling aanbevolen.
    • Verhoog de batchgrootte alleen als een hogere doorvoer is vereist voor invoervolume, waarbij de potentiële toename van de latentie wordt geaccepteerd.
  • Pandas UDFs en functies presteren niet goed met een Arrow batch grootte van 1.
    • Als u pandas UDF's of functies gebruikt, stelt u de Arrow-batchgrootte in op een hogere waarde (bijvoorbeeld 100 of hoger).
    • Houd er rekening mee dat dit een hogere latentie impliceert. Databricks raadt het gebruik van Arrow UDF of functie aan, indien mogelijk.
  • Vanwege het prestatieprobleem met pandas wordt transformWithState alleen ondersteund met de Row interface.

Limitations

Bronbeperkingen

Voor Kinesis biedt de realtime-modus geen ondersteuning voor de polling-modus. Bovendien kunnen frequente repartities een negatieve invloed hebben op de latentie.

Beperkingen van de unie

De operator Union heeft enkele beperkingen:

  • Realtime-modus biedt geen ondersteuning voor zelf-samenvoeging:
    • Kafka: U kunt niet hetzelfde gegevensframeobject van de bron gebruiken en afgeleide gegevensframes hiervan samenvoegen. Tijdelijke oplossing: gebruik verschillende DataFrames die uit dezelfde bron worden gelezen.
    • Kinesis: U kunt geen gegevensframes samenvoegen die zijn afgeleid van dezelfde Kinesis-bron met dezelfde configuratie. Tijdelijke oplossing: Naast het gebruik van verschillende DataFrames kunt u een andere optie 'consumerName' toewijzen aan elk DataFrame.
  • In realtime-modus worden stateful operators (bijvoorbeeld aggregate, deduplicatetransformWithState) die vóór de Unie zijn gedefinieerd, niet ondersteund.
  • De realtimemodus biedt geen ondersteuning voor samenvoeging met batchbronnen.

Beperking van MapPartitions

mapPartitions in Scala en vergelijkbare Python-API's (mapInPandas, mapInArrow) nemen een iterator van de gehele invoerpartitie en produceren een iterator voor de gehele uitvoer met arbitraire koppeling tussen invoer en uitvoer. Deze API's kunnen prestatieproblemen veroorzaken in de streamingmodus Real-Time door de volledige uitvoer te blokkeren, waardoor de latentie toeneemt. De semantiek van deze API's biedt geen ondersteuning voor het doorgeven van watermerken.

Gebruik scalaire UDF's in combinatie met complexe gegevenstypen transformeren of filter in plaats daarvan om vergelijkbare functionaliteit te bereiken.

Volgende stappen 

Nu u begrijpt wat de realtimemodus is en hoe u deze configureert, verkent u deze resources om realtime streamingtoepassingen te implementeren: