Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Den här sidan beskriver de grundläggande begreppen om vattenmärkning och ger rekommendationer för att använda vattenmärken i vanliga operationer med tillståndskänslig strömning. Du måste använda vattenstämplar för tillståndsfulla streamingoperationer för att undvika att oändligt utöka mängden data som lagras i tillståndet, vilket kan orsaka minnesproblem eller öka processfördröjningar under långvariga streamingoperationer.
Vad är en vattenstämpel?
Structured Streaming använder vattenstämplar för att kontrollera tröskelvärdet för hur länge uppdateringar för ett givet tillståndsobjekt ska fortsätta bearbetas. Vanliga exempel på tillståndsentiteter är:
- Sammansättningar över ett tidsfönster.
- Unika nycklar i en koppling mellan två strömmar.
När du deklarerar en vattenstämpel anger du ett tidsstämpelfält och ett tröskelvärde för vattenstämpel på en strömmande DataFrame. När nya data tas emot spårar tillståndshanteraren den senaste tidsstämpeln i det angivna fältet och bearbetar alla poster inom tröskelvärdet för fördröjning.
I följande exempel tillämpas ett tröskelvärde på 10 minuter för ett antal fönster:
from pyspark.sql.functions import window
(df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
)
I det här exemplet:
- Kolumnen
event_timeanvänds för att definiera en vattenstämpel på 10 minuter och ett rullande fönster på 5 minuter. - För varje
idsom observeras samlas ett antal in för varje icke-överlappande 5-minutersfönster. - Tillståndsinformation bibehålls för varje räkning tills fönstrets slut är 10 minuter äldre än den senaste observerade
event_time.
Viktigt!
Tröskelvärden för vattenstämpel garanterar att poster som kommer inom det angivna tröskelvärdet bearbetas enligt semantiken för den definierade frågan. Poster som anländer sent och utanför det angivna tröskelvärdet kan fortfarande bearbetas med hjälp av frågemetoder, men detta är inte garanterat.
Hur påverkar vattenstämplar bearbetningstiden och dataflödet?
Vattenstämplar interagerar med utdatalägen för att styra när data skrivs till mottagaren. Eftersom vattenstämplar minskar den totala mängden tillståndsinformation som ska bearbetas är effektiv användning av vattenstämplar avgörande för effektivt tillståndskänsligt strömmande dataflöde.
Anmärkning
Alla utdatalägen stöds inte för alla tillståndskänsliga åtgärder.
Vattenstämplar och utgångsläge för fönsteraggregeringar
I följande tabell detaljeras bearbetning för frågor med aggregering med en tidsstämpel och en definierad vattenstämpel.
| Utdataläge | Funktionssätt |
|---|---|
| Lägga till | Rader skrivs till måltabellen när vattenstämpeltröskeln har passerat. Alla skrivningar fördröjs baserat på tröskelvärdet för fördröjning. Det gamla aggregeringstillståndet tas bort när tröskelvärdet har passerat. |
| Update | Rader skrivs till måltabellen när resultaten beräknas och kan uppdateras och skrivas över när nya data tas emot. Det gamla aggregeringstillståndet tas bort när tröskelvärdet har passerat. |
| Klart | Aggregatstatusen tas inte bort. Måltabellen skrivs om med varje utlösare. |
Vattenstämplar och utdata för strömströmanslutningar
Kopplingar mellan flera strömmar stöder endast tilläggsläge, och matchade poster skrivs i varje batch när de identifieras. För inre kopplingar rekommenderar Databricks att du anger ett tröskelvärde för vattenstämpel för varje strömmande datakälla. Detta gör att tillståndsinformation kan ignoreras för gamla poster. Utan vattenstämplar försöker Structured Streaming ansluta varje nyckel från båda sidor av kopplingen med varje utlösare.
Strukturerad direktuppspelning har särskilda semantik för att stödja yttre kopplingar. Vattenstämpling är obligatorisk för outer joins, eftersom det indikerar när en nyckel måste skrivas med ett nullvärde efter att den inte har hittat någon match. Även om yttre kopplingar kan vara användbara för att registrera poster som aldrig matchas under databearbetningen, eftersom kopplingar endast skrivs till tabeller som tilläggsåtgärder, registreras inte dessa saknade data förrän efter att tröskelvärdet för fördröjning har passerat.
Kontrollera tröskelvärdet för sena data med principen för flera vattenstämplar i strukturerad direktuppspelning
När du arbetar med flera indata för strukturerad direktuppspelning kan du ange flera vattenstämplar för att kontrollera toleranströsklar för data som kommer sent. Genom att konfigurera watermarks kan du kontrollera tillståndsinformationen och påverka latensen.
En direktuppspelningsfråga kan ha flera indataströmmar som är sammankopplade eller kopplade. Var och en av indataströmmarna kan ha olika tröskelvärden för sena data som måste tolereras för tillståndskänsliga åtgärder. Ange dessa tröskelvärden för var och en av indataströmmarna med hjälp av withWatermarks("eventTime", delay). Följande är en exempelfråga med stream-stream-kopplingar.
val inputStream1 = ... // delays up to 1 hour
val inputStream2 = ... // delays up to 2 hours
inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
När du kör förfrågan spårar Structured Streaming individuellt den maximala observerade händelsetiden i varje indataström, beräknar vattenstämplar baserat på motsvarande fördröjning och väljer en enda global vattenstämpel utifrån dessa som ska användas för tillståndsberoende operationer. Som standardinställning väljs minimivärdet som globalt vattenmärke eftersom det förhindrar att data oavsiktligt tas bort som för sent om en av strömmarna hamnar bakom de andra (till exempel slutar en av strömmarna att ta emot data på grund av uppströmsfel). Med andra ord rör sig den globala vattenstämpeln säkert i den långsammaste strömmens takt och frågeutdata fördröjs i enlighet därmed.
Om du vill få snabbare resultat kan du ange principen för flera vattenstämplar för att välja det maximala värdet som global vattenstämpel genom att ange SQL-konfigurationen spark.sql.streaming.multipleWatermarkPolicy till max (standardvärdet är min). På så sätt kan den globala vattenstämpeln röra sig i den snabbaste strömmens takt. Den här konfigurationen släpper dock data från de långsammaste strömmarna. Databricks rekommenderar att du använder den här konfigurationen med omdöme.
Tillämpa vattenstämplar på distinkta åtgärder
Åtgärden distinct är en tillståndskänslig operator som kräver vattenstämplar för att förhindra obundna tillståndstillväxt. Utan vattenstämplar försöker Structured Streaming spåra varje unik post på obestämd tid, vilket kan leda till minnesproblem eller ökade svarstider för bearbetning.
När du applicerar distinct på en strömmande DataFrame måste du specificera en vattenstämpel i ett tidsstämpelfält. Vattenstämpeln styr hur länge tillståndshanteraren bibehåller dokumentation för deduplicering. När vattenstämpeltröskeln har passerat tas gamla poster bort från systemet.
I följande exempel tillämpas en vattenstämpel på en distinct åtgärd:
Python
streamingDf = spark.readStream. ... # columns: eventTime, id, value, ...
# Apply watermark before distinct operation
(streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
)
Scala
val streamingDf = spark.readStream. ... // columns: eventTime, id, value, ...
// Apply watermark before distinct operation
streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
I det här exemplet tas dubblettregister som anländer inom 1 timme efter den senaste observerade eventTime bort från strömmen. Tillståndsinformation för deduplicering tas bort när tröskelvärdet har passerat.
Viktigt!
Om du behöver deduplicera på specifika kolumner i stället för alla kolumner, använd dropDuplicates() eller dropDuplicatesWithinWatermark() i stället för distinct. Mer information finns i nästa avsnitt.
Ta bort dubbletter inom vattenstämpeln
I Databricks Runtime 13.3 LTS eller senare kan du deduplicera poster inom ett tröskelvärde för vattenstämpel med hjälp av en unik identifierare.
Strukturerad direktuppspelning ger bearbetningsgarantier exakt en gång, men deduplicerar inte automatiskt poster från datakällor. Du kan använda dropDuplicatesWithinWatermark för att deduplicera poster i ett angivet fält, så att du kan ta bort dubbletter från en dataström även om vissa fält skiljer sig åt (till exempel händelsetid eller ankomsttid).
Dubbletter av poster som tas emot inom den angivna tidsgränsen garanteras att tas bort. Den här garantin är strikt i endast en riktning, och dubbletter av poster som kommer utanför det angivna tröskelvärdet kan också tas bort. Du måste ange tröskelvärdet för fördröjning för vattenstämpeln som är längre än maximala tidsstämpelskillnader mellan duplicerade händelser för att ta bort alla dubbletter.
Du måste ange en vattenstämpel för att använda dropDuplicatesWithinWatermark metoden, som i följande exempel:
Python
streamingDf = spark.readStream. ...
# deduplicate using guid column with watermark based on eventTime column
(streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])
)
Scala
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// deduplicate using guid column with watermark based on eventTime column
streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(Seq("guid"))