Dela via


Datakälla

En basklass för datakällor.

Den här klassen representerar en anpassad datakälla som gör det möjligt att läsa från och/eller skriva till den. Datakällan innehåller metoder för att skapa läsare och skrivare för att läsa respektive skriva data. Minst en av metoderna reader() eller writer() måste implementeras av någon underklass för att göra datakällan antingen läsbar eller skrivbar (eller båda).

När du har implementerat det här gränssnittet kan du läsa in datakällan med hjälp av spark.read.format(...).load() och spara data med hjälp av df.write.format(...).save().

Syntax

from pyspark.sql.datasource import DataSource

class MyDataSource(DataSource):
    @classmethod
    def name(cls):
        return "my_data_source"

Parameters

Parameter Type Beskrivning
options Dict En skiftlägeskänslig ordlista som representerar alternativen för den här datakällan.

Methods

Metod Beskrivning
name() Returnerar en sträng som representerar formatnamnet för den här datakällan. Som standard returnerar klassnamnet. Åsidosätt för att ange ett anpassat kort namn.
schema() Returnerar schemat för datakällan som en StructType eller DDL-sträng. Om det inte implementeras och inget schema tillhandahålls av användaren utlöses ett undantag.
reader(schema) Returnerar en DataSourceReader instans för att läsa data. Krävs för läsbara datakällor.
writer(schema, overwrite) Returnerar en DataSourceWriter instans för att skriva data. Krävs för skrivbara datakällor.
streamWriter(schema, overwrite) Returnerar en DataSourceStreamWriter instans för att skriva data till en direktuppspelningsmottagare. Krävs för skrivbara strömmande datakällor.
simpleStreamReader(schema) Returnerar en SimpleDataSourceStreamReader instans för att läsa strömmande data. Används endast när streamReader() inte har implementerats.
streamReader(schema) Returnerar en DataSourceStreamReader instans för att läsa strömmande data. Prioriterar över simpleStreamReader().

Exempel

Definiera och registrera en anpassad läsbar datakälla:

from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition

class MyDataSource(DataSource):
    @classmethod
    def name(cls):
        return "my_data_source"

    def schema(self):
        return "a INT, b STRING"

    def reader(self, schema):
        return MyDataSourceReader(schema)

class MyDataSourceReader(DataSourceReader):
    def read(self, partition):
        yield (1, "hello")
        yield (2, "world")

spark.dataSource.register(MyDataSource)
df = spark.read.format("my_data_source").load()
df.show()

Definiera en datakälla med ett StructType schema:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

class MyDataSource(DataSource):
    def schema(self):
        return StructType().add("a", "int").add("b", "string")