Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
En basklass för läsare av strömmande datakällor.
Datakällströmläsare ansvarar för att mata ut data från en strömmande datakälla. Implementera den här klassen och returnera en instans från DataSource.streamReader() för att göra en datakälla läsbar som en strömmande källa.
Syntax
from pyspark.sql.datasource import DataSourceStreamReader
class MyDataSourceStreamReader(DataSourceStreamReader):
def initialOffset(self):
...
def partitions(self, start, end):
...
def read(self, partition):
...
Methods
| Metod | Beskrivning |
|---|---|
initialOffset() |
Returnerar den första förskjutningen av den strömmande datakällan som en diktamen. En ny direktuppspelningsfråga börjar läsas från den här förskjutningen. Omstartade frågor återupptas från förskjutningen med kontrollpunkter i stället. |
partitions(start, end) |
Returnerar en sekvens med InputPartition objekt som representerar data mellan start och end förskjutningar. Returnerar en tom sekvens om start är endlika med . |
read(partition) |
Genererar data för en viss partition och returnerar en iterator med tupplar, rader eller PyArrow-objekt RecordBatch . Varje tuppeln eller raden konverteras till en rad i den slutliga dataramen. Den här metoden är abstrakt och måste implementeras. |
commit(end) |
Informerar källan om att Spark har slutfört bearbetningen av alla data för förskjutningar som är mindre än eller lika med end. Spark begär endast förskjutningar som är större än end i framtiden. |
stop() |
Stoppar källan och frigör alla resurser som den har allokerat. Anropas när strömningsfrågan avslutas. |
Notes
-
read()är statisk och tillståndslös. Få inte åtkomst till föränderliga klassmedlemmar eller behåll minnesinternt tillstånd mellan olika anrop avread(). - Alla partitionsvärden som returneras av
partitions()måste vara valbara objekt. - Förskjutningar representeras som en diktat eller rekursiv diktering vars nycklar och värden är primitiva typer: heltal, sträng eller booleskt värde.
Exempel
Implementera en strömmande läsare som läser från en sekvens med indexerade poster:
from pyspark.sql.datasource import (
DataSource,
DataSourceStreamReader,
InputPartition,
)
class MyDataSourceStreamReader(DataSourceStreamReader):
def initialOffset(self):
return {"index": 0}
def latestOffset(self, start, limit):
return {"index": start["index"] + 10}
def partitions(self, start, end):
return [
InputPartition(i)
for i in range(start["index"], end["index"])
]
def read(self, partition):
yield (partition.value, f"record-{partition.value}")
def commit(self, end):
print(f"Committed up to offset {end}")
def stop(self):
print("Stopping stream reader")