Partilhar via


DataSourceStreamReader

Uma classe base para streaming de leitores de fontes de dados.

Os leitores de fluxo de fonte de dados são responsáveis por emitir dados a partir de uma fonte de dados em streaming. Implemente esta classe e retorne uma instância de DataSource.streamReader() para tornar uma fonte de dados legível como fonte de streaming.

Sintaxe

from pyspark.sql.datasource import DataSourceStreamReader

class MyDataSourceStreamReader(DataSourceStreamReader):
    def initialOffset(self):
        ...

    def partitions(self, start, end):
        ...

    def read(self, partition):
        ...

Methods

Método Descrição
initialOffset() Devolve o deslocamento inicial da fonte de dados em streaming como um dict. Uma nova consulta de streaming começa a ler a partir deste deslocamento. As consultas reiniciadas retomam a partir do deslocamento checkpoint.
partitions(start, end) Devolve uma sequência de InputPartition objetos que representam os dados entre start e end os deslocamentos. Devolve uma sequência vazia se start for endigual a .
read(partition) Gera dados para uma dada partição e devolve um iterador de tuplas, linhas ou objetos PyArrow RecordBatch . Cada tupla ou linha é convertida numa linha no DataFrame final. Este método é abstrato e deve ser implementado.
commit(end) Informa a fonte que o Spark completou o processamento de todos os dados para deslocamentos menores ou iguais a end. A Spark só irá pedir compensações superiores às end futuras.
stop() Para a fonte e liberta quaisquer recursos que tenha alocado. Invocado quando a consulta de streaming termina.

Notes

  • read() é estático e sem estado. Não aceda a membros da classe mutável nem mantenha estado em memória entre diferentes invocações de read().
  • Todos os valores de partição devolvidos por partitions() devem ser objetos selecionáveis.
  • Os deslocamentos são representados como um dict ou dict recursivo cujas chaves e valores são tipos primitivos: inteiro, string ou booleano.

Exemplos

Implemente um leitor de streaming que leia a partir de uma sequência de registos indexados:

from pyspark.sql.datasource import (
    DataSource,
    DataSourceStreamReader,
    InputPartition,
)

class MyDataSourceStreamReader(DataSourceStreamReader):
    def initialOffset(self):
        return {"index": 0}

    def latestOffset(self, start, limit):
        return {"index": start["index"] + 10}

    def partitions(self, start, end):
        return [
            InputPartition(i)
            for i in range(start["index"], end["index"])
        ]

    def read(self, partition):
        yield (partition.value, f"record-{partition.value}")

    def commit(self, end):
        print(f"Committed up to offset {end}")

    def stop(self):
        print("Stopping stream reader")