Compartilhar via


DataSourceStreamReader

Uma classe base para leitores de fonte de dados de streaming.

Os leitores de fluxo de fonte de dados são responsáveis por gerar dados de uma fonte de dados de streaming. Implemente essa classe e retorne uma instância para tornar uma fonte de DataSource.streamReader() dados legível como uma fonte de streaming.

Sintaxe

from pyspark.sql.datasource import DataSourceStreamReader

class MyDataSourceStreamReader(DataSourceStreamReader):
    def initialOffset(self):
        ...

    def partitions(self, start, end):
        ...

    def read(self, partition):
        ...

Methods

Método Descrição
initialOffset() Retorna o deslocamento inicial da fonte de dados de streaming como um ditado. Uma nova consulta de streaming inicia a leitura desse deslocamento. Em vez disso, as consultas reiniciadas são retomadas do deslocamento com ponto de verificação.
partitions(start, end) Retorna uma sequência de InputPartition objetos que representam os dados entre start e end deslocamentos. Retorna uma sequência vazia se start for igual enda .
read(partition) Gera dados para uma determinada partição e retorna um iterador de tuplas, linhas ou objetos PyArrow RecordBatch . Cada tupla ou linha é convertida em uma linha no DataFrame final. Esse método é abstrato e deve ser implementado.
commit(end) Informa à fonte que o Spark concluiu o processamento de todos os dados para deslocamentos menores ou iguais a end. O Spark solicitará apenas deslocamentos maiores do que end no futuro.
stop() Interrompe a origem e libera todos os recursos alocados. Invocado quando a consulta de streaming é encerrada.

Observações

  • read() é estático e sem estado. Não acesse membros de classe mutáveis ou mantenha o estado na memória entre invocações diferentes de read().
  • Todos os valores de partição retornados por partitions() devem ser objetos picklable.
  • Os deslocamentos são representados como um ditado ou ditado recursivo cujas chaves e valores são tipos primitivos: inteiro, cadeia de caracteres ou booliano.

Exemplos

Implementar um leitor de streaming que lê de uma sequência de registros indexados:

from pyspark.sql.datasource import (
    DataSource,
    DataSourceStreamReader,
    InputPartition,
)

class MyDataSourceStreamReader(DataSourceStreamReader):
    def initialOffset(self):
        return {"index": 0}

    def latestOffset(self, start, limit):
        return {"index": start["index"] + 10}

    def partitions(self, start, end):
        return [
            InputPartition(i)
            for i in range(start["index"], end["index"])
        ]

    def read(self, partition):
        yield (partition.value, f"record-{partition.value}")

    def commit(self, end):
        print(f"Committed up to offset {end}")

    def stop(self):
        print("Stopping stream reader")