Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Uma classe base para streaming de leitores de fontes de dados.
Os leitores de fluxo de fonte de dados são responsáveis por emitir dados a partir de uma fonte de dados em streaming. Implemente esta classe e retorne uma instância de DataSource.streamReader() para tornar uma fonte de dados legível como fonte de streaming.
Sintaxe
from pyspark.sql.datasource import DataSourceStreamReader
class MyDataSourceStreamReader(DataSourceStreamReader):
def initialOffset(self):
...
def partitions(self, start, end):
...
def read(self, partition):
...
Methods
| Método | Descrição |
|---|---|
initialOffset() |
Devolve o deslocamento inicial da fonte de dados em streaming como um dict. Uma nova consulta de streaming começa a ler a partir deste deslocamento. As consultas reiniciadas retomam a partir do deslocamento checkpoint. |
partitions(start, end) |
Devolve uma sequência de InputPartition objetos que representam os dados entre start e end os deslocamentos. Devolve uma sequência vazia se start for endigual a . |
read(partition) |
Gera dados para uma dada partição e devolve um iterador de tuplas, linhas ou objetos PyArrow RecordBatch . Cada tupla ou linha é convertida numa linha no DataFrame final. Este método é abstrato e deve ser implementado. |
commit(end) |
Informa a fonte que o Spark completou o processamento de todos os dados para deslocamentos menores ou iguais a end. A Spark só irá pedir compensações superiores às end futuras. |
stop() |
Para a fonte e liberta quaisquer recursos que tenha alocado. Invocado quando a consulta de streaming termina. |
Notes
-
read()é estático e sem estado. Não aceda a membros da classe mutável nem mantenha estado em memória entre diferentes invocações deread(). - Todos os valores de partição devolvidos por
partitions()devem ser objetos selecionáveis. - Os deslocamentos são representados como um dict ou dict recursivo cujas chaves e valores são tipos primitivos: inteiro, string ou booleano.
Exemplos
Implemente um leitor de streaming que leia a partir de uma sequência de registos indexados:
from pyspark.sql.datasource import (
DataSource,
DataSourceStreamReader,
InputPartition,
)
class MyDataSourceStreamReader(DataSourceStreamReader):
def initialOffset(self):
return {"index": 0}
def latestOffset(self, start, limit):
return {"index": start["index"] + 10}
def partitions(self, start, end):
return [
InputPartition(i)
for i in range(start["index"], end["index"])
]
def read(self, partition):
yield (partition.value, f"record-{partition.value}")
def commit(self, end):
print(f"Committed up to offset {end}")
def stop(self):
print("Stopping stream reader")