Dela via


DataSourceReader

En basklass för datakällans läsare.

Datakällläsare ansvarar för att mata ut data från en datakälla. Implementera den här klassen och returnera en instans från DataSource.reader() för att göra en datakälla läsbar.

Syntax

from pyspark.sql.datasource import DataSourceReader

class MyDataSourceReader(DataSourceReader):
    def read(self, partition):
        ...

Methods

Metod Beskrivning
pushFilters(filters) Anropas med listan över filter som kan skickas ned till datakällan. Returnerar ett iterbart filter som fortfarande måste utvärderas av Spark. Som standard returnerar alla filter, vilket indikerar att inga filter trycks ned. pushFilters() tillåts att ändra self. Objektet måste förbli picklable efter ändring. Ändringar som ska self visas för partitions() och read().
partitions() Returnerar en sekvens med InputPartition objekt som delar upp dataläsning i parallella uppgifter. Som standard returnerar en enskild partition. Åsidosätt för bättre prestanda vid läsning av stora datamängder. Alla partitionsvärden som returneras av partitions() måste vara valbara objekt.
read(partition) Genererar data för en viss partition och returnerar en iterator med tupplar, rader eller PyArrow-objekt RecordBatch . Varje tuppeln eller raden konverteras till en rad i den slutliga dataramen. Den här metoden är abstrakt och måste implementeras.

Exempel

Implementera en grundläggande läsare som returnerar rader från en lista med partitioner:

from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition

class MyDataSourceReader(DataSourceReader):
    def partitions(self):
        return [InputPartition(1), InputPartition(2), InputPartition(3)]

    def read(self, partition):
        yield (partition.value, 0)
        yield (partition.value, 1)

Returnera rader med PyArrow RecordBatch:

class MyDataSourceReader(DataSourceReader):
    def read(self, partition):
        import pyarrow as pa
        data = {
            "partition": [partition.value] * 2,
            "value": [0, 1]
        }
        table = pa.Table.from_pydict(data)
        for batch in table.to_batches():
            yield batch

Implementera filter-pushdown för att stödja EqualTo filter:

from pyspark.sql.datasource import DataSourceReader, EqualTo

class MyDataSourceReader(DataSourceReader):
    def __init__(self):
        self.filters = []

    def pushFilters(self, filters):
        for f in filters:
            if isinstance(f, EqualTo):
                self.filters.append(f)
            else:
                yield f

    def read(self, partition):
        ...