Compartilhar via


DataSourceReader

Uma classe base para leitores de fonte de dados.

Os leitores de fonte de dados são responsáveis por gerar dados de uma fonte de dados. Implemente essa classe e retorne uma instância para tornar uma fonte de DataSource.reader() dados legível.

Sintaxe

from pyspark.sql.datasource import DataSourceReader

class MyDataSourceReader(DataSourceReader):
    def read(self, partition):
        ...

Methods

Método Descrição
pushFilters(filters) Chamado com a lista de filtros que podem ser enviados por push para a fonte de dados. Retorna um iterável de filtros que ainda precisam ser avaliados pelo Spark. Por padrão, retorna todos os filtros, indicando que nenhum filtro é enviado por push para baixo. pushFilters() é permitido modificar self. O objeto deve permanecer picklable após a modificação. As alterações para self as quais são visíveis partitions() e read().
partitions() Retorna uma sequência de objetos que dividem a leitura de InputPartition dados em tarefas paralelas. Por padrão, retorna uma única partição. Substitua por um melhor desempenho ao ler grandes conjuntos de dados. Todos os valores de partição retornados por partitions() devem ser objetos picklable.
read(partition) Gera dados para uma determinada partição e retorna um iterador de tuplas, linhas ou objetos PyArrow RecordBatch . Cada tupla ou linha é convertida em uma linha no DataFrame final. Esse método é abstrato e deve ser implementado.

Exemplos

Implemente um leitor básico que retorna linhas de uma lista de partições:

from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition

class MyDataSourceReader(DataSourceReader):
    def partitions(self):
        return [InputPartition(1), InputPartition(2), InputPartition(3)]

    def read(self, partition):
        yield (partition.value, 0)
        yield (partition.value, 1)

Retornar linhas usando PyArrow RecordBatch:

class MyDataSourceReader(DataSourceReader):
    def read(self, partition):
        import pyarrow as pa
        data = {
            "partition": [partition.value] * 2,
            "value": [0, 1]
        }
        table = pa.Table.from_pydict(data)
        for batch in table.to_batches():
            yield batch

Implemente o pushdown de filtro para dar suporte EqualTo a filtros:

from pyspark.sql.datasource import DataSourceReader, EqualTo

class MyDataSourceReader(DataSourceReader):
    def __init__(self):
        self.filters = []

    def pushFilters(self, filters):
        for f in filters:
            if isinstance(f, EqualTo):
                self.filters.append(f)
            else:
                yield f

    def read(self, partition):
        ...