Compartir a través de


udtf

Crea una función de tabla definida por el usuario (UDTF).

Syntax

import pyspark.sql.functions as sf

# As a decorator
@sf.udtf(returnType=<returnType>, useArrow=<useArrow>)
class FunctionClass:
    def eval(self, *args):
        # function body
        yield row_data

# As a function wrapper
sf.udtf(cls=<class>, returnType=<returnType>, useArrow=<useArrow>)

Parámetros

Parámetro Tipo Description
cls class Optional. Clase de controlador de funciones de tabla definidas por el usuario de Python.
returnType pyspark.sql.types.StructType o str Optional. Tipo de valor devuelto de la función de tabla definida por el usuario. El valor puede ser un objeto StructType o una cadena de tipo de estructura con formato DDL. Si es None, la clase de controlador debe proporcionar el analyze método estático.
useArrow bool Optional. Indica si se debe usar Arrow para optimizar las serializaciones (de). Cuando se establece en None, se usa la configuración de Spark "spark.sql.execution.pythonUDTF.arrow.enabled".

Examples

Ejemplo 1: Implementación básica de UDTF.

from pyspark.sql.functions import udtf

class TestUDTF:
    def eval(self, *args):
        yield "hello", "world"

test_udtf = udtf(TestUDTF, returnType="c1: string, c2: string")
test_udtf().show()
+-----+-----+
|   c1|   c2|
+-----+-----+
|hello|world|
+-----+-----+

Ejemplo 2: UDTF mediante la sintaxis del decorador.

from pyspark.sql.functions import udtf, lit

@udtf(returnType="c1: int, c2: int")
class PlusOne:
    def eval(self, x: int):
        yield x, x + 1

PlusOne(lit(1)).show()
+---+---+
| c1| c2|
+---+---+
|  1|  2|
+---+---+

Ejemplo 3: UDTF con el método estático de análisis.

from pyspark.sql.functions import udtf, lit
from pyspark.sql.types import StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult

@udtf
class TestUDTFWithAnalyze:
    @staticmethod
    def analyze(a: AnalyzeArgument, b: AnalyzeArgument) -> AnalyzeResult:
        return AnalyzeResult(StructType().add("a", a.dataType).add("b", b.dataType))

    def eval(self, a, b):
        yield a, b

TestUDTFWithAnalyze(lit(1), lit("x")).show()
+---+---+
|  a|  b|
+---+---+
|  1|  x|
+---+---+

Ejemplo 4: UDTF con argumentos de palabra clave.

from pyspark.sql.functions import udtf, lit
from pyspark.sql.types import StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult

@udtf
class TestUDTFWithKwargs:
    @staticmethod
    def analyze(
        a: AnalyzeArgument, b: AnalyzeArgument, **kwargs: AnalyzeArgument
    ) -> AnalyzeResult:
        return AnalyzeResult(
            StructType().add("a", a.dataType)
                .add("b", b.dataType)
                .add("x", kwargs["x"].dataType)
        )

    def eval(self, a, b, **kwargs):
        yield a, b, kwargs["x"]

TestUDTFWithKwargs(lit(1), x=lit("x"), b=lit("b")).show()
+---+---+---+
|  a|  b|  x|
+---+---+---+
|  1|  b|  x|
+---+---+---+

Ejemplo 5: UDTF registrado y llamado a través de SQL.

from pyspark.sql.functions import udtf, lit
from pyspark.sql.types import StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult

@udtf
class TestUDTFWithKwargs:
    @staticmethod
    def analyze(
        a: AnalyzeArgument, b: AnalyzeArgument, **kwargs: AnalyzeArgument
    ) -> AnalyzeResult:
        return AnalyzeResult(
            StructType().add("a", a.dataType)
                .add("b", b.dataType)
                .add("x", kwargs["x"].dataType)
        )

    def eval(self, a, b, **kwargs):
        yield a, b, kwargs["x"]

_ = spark.udtf.register("test_udtf", TestUDTFWithKwargs)
spark.sql("SELECT * FROM test_udtf(1, x => 'x', b => 'b')").show()
+---+---+---+
|  a|  b|  x|
+---+---+---+
|  1|  b|  x|
+---+---+---+

Ejemplo 6: UDTF con la optimización de flecha habilitada.

from pyspark.sql.functions import udtf, lit

@udtf(returnType="c1: int, c2: int", useArrow=True)
class ArrowPlusOne:
    def eval(self, x: int):
        yield x, x + 1

ArrowPlusOne(lit(1)).show()
+---+---+
| c1| c2|
+---+---+
|  1|  2|
+---+---+

Ejemplo 7: Crear un UDTF determinista.

from pyspark.sql.functions import udtf

class PlusOne:
    def eval(self, a: int):
        yield a + 1,

plus_one = udtf(PlusOne, returnType="r: int").asDeterministic()