Dela via


DataSourceStreamReader

En basklass för läsare av strömmande datakällor.

Datakällströmläsare ansvarar för att mata ut data från en strömmande datakälla. Implementera den här klassen och returnera en instans från DataSource.streamReader() för att göra en datakälla läsbar som en strömmande källa.

Syntax

from pyspark.sql.datasource import DataSourceStreamReader

class MyDataSourceStreamReader(DataSourceStreamReader):
    def initialOffset(self):
        ...

    def partitions(self, start, end):
        ...

    def read(self, partition):
        ...

Methods

Metod Beskrivning
initialOffset() Returnerar den första förskjutningen av den strömmande datakällan som en diktamen. En ny direktuppspelningsfråga börjar läsas från den här förskjutningen. Omstartade frågor återupptas från förskjutningen med kontrollpunkter i stället.
partitions(start, end) Returnerar en sekvens med InputPartition objekt som representerar data mellan start och end förskjutningar. Returnerar en tom sekvens om start är endlika med .
read(partition) Genererar data för en viss partition och returnerar en iterator med tupplar, rader eller PyArrow-objekt RecordBatch . Varje tuppeln eller raden konverteras till en rad i den slutliga dataramen. Den här metoden är abstrakt och måste implementeras.
commit(end) Informerar källan om att Spark har slutfört bearbetningen av alla data för förskjutningar som är mindre än eller lika med end. Spark begär endast förskjutningar som är större än end i framtiden.
stop() Stoppar källan och frigör alla resurser som den har allokerat. Anropas när strömningsfrågan avslutas.

Notes

  • read() är statisk och tillståndslös. Få inte åtkomst till föränderliga klassmedlemmar eller behåll minnesinternt tillstånd mellan olika anrop av read().
  • Alla partitionsvärden som returneras av partitions() måste vara valbara objekt.
  • Förskjutningar representeras som en diktat eller rekursiv diktering vars nycklar och värden är primitiva typer: heltal, sträng eller booleskt värde.

Exempel

Implementera en strömmande läsare som läser från en sekvens med indexerade poster:

from pyspark.sql.datasource import (
    DataSource,
    DataSourceStreamReader,
    InputPartition,
)

class MyDataSourceStreamReader(DataSourceStreamReader):
    def initialOffset(self):
        return {"index": 0}

    def latestOffset(self, start, limit):
        return {"index": start["index"] + 10}

    def partitions(self, start, end):
        return [
            InputPartition(i)
            for i in range(start["index"], end["index"])
        ]

    def read(self, partition):
        yield (partition.value, f"record-{partition.value}")

    def commit(self, end):
        print(f"Committed up to offset {end}")

    def stop(self):
        print("Stopping stream reader")