Partilhar via


DataSourceReader

Uma classe base para leitores de fontes de dados.

Os leitores de fontes de dados são responsáveis por enviar dados a partir de uma fonte de dados. Implemente esta classe e retorne uma instância de DataSource.reader() para tornar uma fonte de dados legível.

Sintaxe

from pyspark.sql.datasource import DataSourceReader

class MyDataSourceReader(DataSourceReader):
    def read(self, partition):
        ...

Methods

Método Descrição
pushFilters(filters) Chamado com a lista de filtros que podem ser enviados para a fonte de dados. Devolve um iterável de filtros que ainda precisam de ser avaliados pelo Spark. Por defeito, devolve todos os filtros, indicando que nenhum filtro é pressionado para baixo. pushFilters() é permitido modificar self. O objeto deve permanecer picklable após a modificação. As alterações a self são visíveis para partitions() e read().
partitions() Devolve uma sequência de InputPartition objetos que divide a leitura de dados em tarefas paralelas. Por defeito, devolve uma única partição. Override para melhor desempenho ao ler grandes conjuntos de dados. Todos os valores de partição devolvidos por partitions() devem ser objetos selecionáveis.
read(partition) Gera dados para uma dada partição e devolve um iterador de tuplas, linhas ou objetos PyArrow RecordBatch . Cada tupla ou linha é convertida numa linha no DataFrame final. Este método é abstrato e deve ser implementado.

Exemplos

Implemente um leitor básico que devolve linhas de uma lista de partições:

from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition

class MyDataSourceReader(DataSourceReader):
    def partitions(self):
        return [InputPartition(1), InputPartition(2), InputPartition(3)]

    def read(self, partition):
        yield (partition.value, 0)
        yield (partition.value, 1)

Devolve as linhas usando o PyArrow RecordBatch:

class MyDataSourceReader(DataSourceReader):
    def read(self, partition):
        import pyarrow as pa
        data = {
            "partition": [partition.value] * 2,
            "value": [0, 1]
        }
        table = pa.Table.from_pydict(data)
        for batch in table.to_batches():
            yield batch

Implemente o pushdown do filtro para suportar EqualTo filtros:

from pyspark.sql.datasource import DataSourceReader, EqualTo

class MyDataSourceReader(DataSourceReader):
    def __init__(self):
        self.filters = []

    def pushFilters(self, filters):
        for f in filters:
            if isinstance(f, EqualTo):
                self.filters.append(f)
            else:
                yield f

    def read(self, partition):
        ...