Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
En basklass för datakällans läsare.
Datakällläsare ansvarar för att mata ut data från en datakälla. Implementera den här klassen och returnera en instans från DataSource.reader() för att göra en datakälla läsbar.
Syntax
from pyspark.sql.datasource import DataSourceReader
class MyDataSourceReader(DataSourceReader):
def read(self, partition):
...
Methods
| Metod | Beskrivning |
|---|---|
pushFilters(filters) |
Anropas med listan över filter som kan skickas ned till datakällan. Returnerar ett iterbart filter som fortfarande måste utvärderas av Spark. Som standard returnerar alla filter, vilket indikerar att inga filter trycks ned.
pushFilters() tillåts att ändra self. Objektet måste förbli picklable efter ändring. Ändringar som ska self visas för partitions() och read(). |
partitions() |
Returnerar en sekvens med InputPartition objekt som delar upp dataläsning i parallella uppgifter. Som standard returnerar en enskild partition. Åsidosätt för bättre prestanda vid läsning av stora datamängder. Alla partitionsvärden som returneras av partitions() måste vara valbara objekt. |
read(partition) |
Genererar data för en viss partition och returnerar en iterator med tupplar, rader eller PyArrow-objekt RecordBatch . Varje tuppeln eller raden konverteras till en rad i den slutliga dataramen. Den här metoden är abstrakt och måste implementeras. |
Exempel
Implementera en grundläggande läsare som returnerar rader från en lista med partitioner:
from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition
class MyDataSourceReader(DataSourceReader):
def partitions(self):
return [InputPartition(1), InputPartition(2), InputPartition(3)]
def read(self, partition):
yield (partition.value, 0)
yield (partition.value, 1)
Returnera rader med PyArrow RecordBatch:
class MyDataSourceReader(DataSourceReader):
def read(self, partition):
import pyarrow as pa
data = {
"partition": [partition.value] * 2,
"value": [0, 1]
}
table = pa.Table.from_pydict(data)
for batch in table.to_batches():
yield batch
Implementera filter-pushdown för att stödja EqualTo filter:
from pyspark.sql.datasource import DataSourceReader, EqualTo
class MyDataSourceReader(DataSourceReader):
def __init__(self):
self.filters = []
def pushFilters(self, filters):
for f in filters:
if isinstance(f, EqualTo):
self.filters.append(f)
else:
yield f
def read(self, partition):
...