Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Anteckning
Den här artikeln beskriver Databricks Connect för Databricks Runtime 13.3 och senare.
Databricks Connect för Python stöder användardefinierade funktioner (UDF). När en DataFrame-åtgärd som innehåller UDF:er körs serialiseras UDF:erna av Databricks Connect och skickas till servern som en del av begäran.
Information om UDF:er för Databricks Connect för Scala finns i Användardefinierade funktioner i Databricks Connect för Scala.
Anteckning
Eftersom den användardefinierade funktionen serialiseras och deserialiseras måste Python-versionen av klienten matcha Python-versionen på Azure Databricks-beräkningen. Information om vilka versioner som stöds finns i -versionsstödmatrisen.
Definiera en UDF
Om du vill skapa en UDF i Databricks Connect för Python använder du någon av följande funktioner som stöds:
- Användardefinierade funktioner i PySpark
- PySpark-strömningsfunktioner
Följande Python konfigurerar till exempel en enkel UDF som kvadrerar värdena i en kolumn.
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession
@udf(returnType=IntegerType())
def double(x):
return x * x
spark = DatabricksSession.builder.getOrCreate()
df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))
df.show()
Hantera UDF-beroenden
Viktigt!
Den här funktionen är i offentlig förhandsversion och kräver Databricks Connect för Python 16.4 eller senare och ett kluster som kör Databricks Runtime 16.4 eller senare. Om du vill använda den här funktionen aktiverar du förhandsversionen av utökade Python-UDF:er i Unity Catalog på din arbetsyta.
Databricks Connect har stöd för att ange Python-beroenden som krävs för UDF:er. Dessa beroenden installeras inom Databricks-datorresurser som en del av UDF:s Python-miljö.
Med den här funktionen kan användare ange beroenden som UDF behöver utöver de paket som tillhandahålls i basmiljön. Den kan också användas för att installera en annan version av paketet än vad som tillhandahålls i basmiljön.
Beroendenheter kan installeras från följande källor:
- PyPI-paket
- PyPI-paket kan anges enligt PEP 508, till exempel
dice,pyjokes<1ellersimplejson==3.19.*.
- PyPI-paket kan anges enligt PEP 508, till exempel
- Paket som lagras i Unity Catalog-volymer
- Både byggda distributioner (
.whl) och källdistributioner (.tar.gz) stöds. - Unity Catalog-volympaket kan anges som
dbfs:<path>, till exempeldbfs:/Volumes/users/someone@example.com/wheels/my_private_dep-3.20.2-py3-none-any.whlellerdbfs:/Volumes/users/someone@example.com/tars/my_private_dep-4.0.0.tar.gz. - Användaren måste beviljas
READ_FILEbehörighet för filen på volymen re:[UC]. Om du beviljar den här behörigheten till alla kontoanvändare aktiveras detta automatiskt för nya användare.
- Både byggda distributioner (
- Lokala paket, mappar och Python-filer
- Lokala byggda distributioner (
.whl), källdistributioner (.tar.gz), mappar och Python-filer kan anges somlocal:<path>, till exempel:local:/path/to/my_private_dep-3.20.2-py3-none-any.whl,local:/path/to/my_private_dep-4.0.0.tar.gz,local:/path/to/my_folder,local:/path/to/my_file.py. - Både absoluta och relativa sökvägar stöds, till exempel:
local:/path/to/my_file.pyellerlocal:./path/to/my_file.py.
- Lokala byggda distributioner (
Om du vill inkludera anpassade beroenden i din UDF anger du dem i en miljö med och withDependenciesanvänder sedan den miljön för att skapa en Spark-session. Beroendena installeras på din Databricks-beräkning och kommer att vara tillgängliga i alla UDF:er som använder den här Spark-sessionen.
Följande kod deklarerar PyPI-paketet dice som ett beroende:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
Eller för att ange ett beroende av ett hjul i en volym:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep-3.20.2-py3-none-any.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
Beteende i Databricks anteckningsböcker och jobb
I anteckningsböcker och jobb måste UDF-beroenden installeras direkt i REPL. Databricks Connect verifierar REPL Python-miljön genom att kontrollera att alla angivna beroenden redan är installerade och utlöser ett undantag om några inte är installerade. Validering av notebook-miljö körs för både PyPI- och Unity Catalog-volymberoenden, men inte för lokala beroenden.
Begränsningar
- Stöd för UDF-beroenden för
pyspark.sql.streaming.DataStreamWriter.foreachkräver Databricks Connect för Python 18.0 eller senare och ett kluster som kör Databricks Runtime 18.0 eller senare. - Stöd för UDF-beroenden för
pyspark.sql.streaming.DataStreamWriter.foreachBatchkräver Databricks Connect för Python 18.0 eller senare och ett kluster som kör Databricks Runtime 18.0 eller senare. Funktionen stöds inte i serverlös miljö. - Stöd för UDF-beroenden för lokala paket, mappar och Python-filer kräver Databricks Connect för Python 18.1 eller senare och ett kluster som kör Databricks Runtime 18.1 eller senare.
- UDF-beroenden stöds inte för pandas-sammansättnings-UDF:er över fönsterfunktioner.
- Unity Catalog-volympaket och lokala paket måste paketeras enligt standardspecifikationerna för Python-paketering från PEP-427 eller senare för hjulbyggda distributioner och PEP-241 eller senare för tar-källdistributioner. Mer information om Python-paketeringsstandarder finns i PyPA-dokumentationen.
Exempel
I följande exempel definieras PyPI- och volymberoenden i en miljö, en session med den miljön skapas och sedan definieras och anropas UDF:er som använder dessa beroenden:
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd
pypi_deps = ["pyjokes>=0.8,<1"]
volumes_deps = [
# Example library from: https://pypi.org/project/dice/#files
"dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0.tar.gz",
]
local_deps = [
# Example library from: https://pypi.org/project/simplejson/#files
"local:./test/simplejson-3.20.2-py3-none-any.whl",
]
env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps).withDependencies(local_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
# UDFs
@udf(returnType=StringType())
def get_joke():
from pyjokes import get_joke
return get_joke()
@udf(returnType=IntegerType())
def double_and_json_parse(x):
import simplejson
return simplejson.loads(simplejson.dumps(x * 2))
@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
import dice
return a * b + dice.roll(f"1d10")[0]
df = spark.range(1, 10)
df = df.withColumns({
"joke": get_joke(),
"doubled": double_and_json_parse(col("id")),
"mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()
Automatisk hantering av UDF-beroenden
Viktigt!
Den här funktionen är i offentlig förhandsversion och kräver Databricks Connect för Python 18.1 eller senare, Python 3.12 på den lokala datorn och ett kluster som kör Databricks Runtime 18.1 eller senare. Om du vill använda den här funktionen aktiverar du förhandsversionen av utökade Python-UDF:er i Unity Catalog på din arbetsyta.
Databricks Connect-API withAutoDependencies() :et möjliggör automatisk identifiering och uppladdning av lokala moduler och offentliga PyPI-beroenden som används i importinstruktionerna i dina UDF:er. Det tar bort behovet av att manuellt ange beroenden.
Följande kod aktiverar automatisk beroendehantering:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
Metoden withAutoDependencies() accepterar följande parametrar:
-
upload_local: När de är inställda påTrueidentifieras, paketeras och laddas lokala moduler som importeras i UDF-sandbox-miljön automatiskt till UDF-sandbox-miljön. -
use_index: När det är inställt påTrue, identifieras och installeras de offentliga PyPI-beroenden som används i dina UDF:er automatiskt på Azure Databricks-beräkning. Identifieringsprocessen använder de installerade paketen på den lokala datorn för att fastställa versioner, vilket säkerställer konsekvens mellan din lokala miljö och fjärrkörningsmiljön.
Begränsningar
- Dynamiska importer (till exempel
importlib.import_module("foo")) stöds inte. - Namnområdespaket (till exempel
azure.eventhubochgoogle.cloud.aiplatform) stöds inte. - Beroenden som installeras med direkt-URL-referenser stöds inte. Detta inkluderar de som är installerade från lokala wheel-filer.
- Beroenden installerade från privata paketregister stöds inte. Paket som är installerade på det här sättet kan inte skiljas från paket som installerats från den offentliga PyPI:en.
- Beroendeidentifiering fungerar inte i ett Python-gränssnitt. Endast Python-skript, IPython Shell och Jupyter Notebooks stöds.
Exempel
I följande exempel visas automatisk beroendehantering med både lokala moduler och PyPI-paket. Det här exemplet kräver att du har installerat simplejson och dice (använder pip install simplejson dice).
Skapa först lokala hjälpmoduler:
# my_helper.py
def double(x):
return 2 * x
# my_json.py
import simplejson
def loads(x):
return simplejson.loads(x)
def dumps(x):
return simplejson.dumps(x)
I huvudskriptet importerar du sedan dessa moduler och använder dem i UDF:er:
# main.py
import dice as dc
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType, FloatType
import my_json
from my_helper import double
env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
@udf(returnType=IntegerType())
def double_and_json_parse(x):
return my_json.loads(my_json.dumps(double(x)))
@udf(returnType=FloatType())
def sum_and_add_noise(x, y):
return x + y + (dc.roll("d6")[0] / 6)
df = spark.range(1, 10)
df = df.withColumns({
"doubled": double_and_json_parse(col("id")),
"summed_with_noise": sum_and_add_noise(col("id"), col("doubled")),
})
df.show()
Loggning / Skogsavverkning
Om du vill mata ut identifierade beroenden anger du SPARK_CONNECT_LOG_LEVEL miljövariabeln till info eller debug. Du kan också konfigurera Python-loggningsramverket:
import logging
logging.basicConfig(level=logging.INFO)
De relevanta loggarna genereras av modulen databricks.connect.auto_dependencies , till exempel:
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_json
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_helper
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: simplejson for module simplejson
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: dice for module dice
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_json
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_helper
INFO:databricks.connect.auto_dependencies.hook:Updated simplejson with auto-detected version ==3.20.2
INFO:databricks.connect.auto_dependencies.hook:Updated dice with auto-detected version ==4.0.0
Python-basmiljö
UDF:er körs på Databricks-beräkningen och inte på klienten. Den grundläggande Python-miljö där UDF:er körs beror på Databricks-beräkningen.
För kluster är python-basmiljön Python-miljön för Databricks Runtime-versionen som körs i klustret. Python-versionen och listan över paket i den här basmiljön finns under avsnitten Systemmiljö och Installerade Python-bibliotek i Viktig information om Databricks Runtime.
För serverlös beräkning motsvarar den grundläggande Python-miljön den serverlösa miljöversionen enligt följande tabell. Databricks Connect-versioner som inte finns med i den här tabellen stöder inte serverlösa ännu eller har nått supportens slut. Se versionstödmatrisen och föråldrade Databricks Connect-versioner.
| Databricks Connect-version | UDF-serverlös miljö |
|---|---|
| 18.0, Python 3.12 | Version 5 |
| 17.2 till 17.3, Python 3.12 | Version 4 |
| 16.4.1 till under 17, Python 3.12 | Version 3 |
| 15.4.10 till under 16, Python 3.12 | Version 3 |
| 15.4.10 till under 16, Python 3.11 | Version 2 |