En distribuerad samling data grupperade i namngivna kolumner.
En DataFrame motsvarar en relationstabell i Spark SQL och kan skapas med hjälp av olika funktioner i SparkSession.
Viktigt!
En DataFrame ska inte skapas direkt med konstruktorn.
Stöder Spark Connect
Egenskaper
| Fastighet |
Beskrivning |
sparkSession |
Returnerar SparkSession som skapade dataramen. |
rdd |
Returnerar innehållet som en RDD för rad (endast klassiskt läge). |
na |
Returnerar en DataFrameNaFunctions för hantering av saknade värden. |
stat |
Returnerar en DataFrameStatFunctions för statistikfunktioner. |
write |
Gränssnitt för att spara innehållet i dataramen som inte strömmas ut till extern lagring. |
writeStream |
Gränssnitt för att spara innehållet i strömmande DataFrame i extern lagring. |
schema |
Returnerar schemat för dataramen som en StructType. |
dtypes |
Returnerar alla kolumnnamn och deras datatyper som en lista. |
columns |
Hämtar namnen på alla kolumner i DataFrame som en lista. |
storageLevel |
Hämta dataramens aktuella lagringsnivå. |
isStreaming |
Returnerar Sant om dataramen innehåller en eller flera källor som kontinuerligt returnerar data när de tas emot. |
executionInfo |
Returnerar ett ExecutionInfo-objekt efter att frågan har körts. |
plot |
Returnerar en PySparkPlotAccessor för ritningsfunktioner. |
Methods
Datavisning och inspektion
Temporära vyer
Markering och projektion
Sortering och sortering
Sammansättning och gruppering
| Metod |
Beskrivning |
groupBy(*cols) |
Grupperar DataFrame efter de angivna kolumnerna så att aggregering kan utföras på dem. |
rollup(*cols) |
Skapa en flerdimensionell sammanslagning för den aktuella DataFrame med hjälp av de angivna kolumnerna. |
cube(*cols) |
Skapa en flerdimensionell kub för den aktuella DataFrame med hjälp av de angivna kolumnerna. |
groupingSets(groupingSets, *cols) |
Skapa flerdimensionell aggregering för den aktuella DataFrame med hjälp av de angivna grupperingsuppsättningarna. |
agg(*exprs) |
Aggregera på hela DataFrame utan grupper (förkortning för df.groupBy().agg()). |
observe(observation, *exprs) |
Definiera (namngivna) mått som ska observeras på DataFrame. |
Ansluter sig
Ange åtgärder
| Metod |
Beskrivning |
union(other) |
Returnera en ny DataFrame som innehåller en union av rader i den här och en annan DataFrame. |
unionByName(other, allowMissingColumns) |
Returnerar en ny DataFrame som innehåller en union av rader i den här och en annan DataFrame. |
intersect(other) |
Returnera en ny DataFrame som endast innehåller rader i både den här dataramen och en annan DataFrame. |
intersectAll(other) |
Returnera en ny DataFrame som innehåller rader i både den här dataramen och en annan DataFrame samtidigt som dubbletter bevaras. |
subtract(other) |
Returnera en ny DataFrame som innehåller rader i den här dataramen men inte i en annan DataFrame. |
exceptAll(other) |
Returnera en ny DataFrame som innehåller rader i den här dataramen men inte i en annan DataFrame samtidigt som dubbletter bevaras. |
Deduplication
| Metod |
Beskrivning |
distinct() |
Returnerar en ny DataFrame som innehåller de distinkta raderna i den här dataramen. |
dropDuplicates(subset) |
Returnera en ny DataFrame med duplicerade rader borttagna, om du bara vill överväga vissa kolumner. |
dropDuplicatesWithinWatermark(subset) |
Returnera en ny DataFrame med duplicerade rader borttagna, om du bara vill överväga vissa kolumner, inom vattenstämpeln. |
Sampling och delning
Partitionering
| Metod |
Beskrivning |
coalesce(numPartitions) |
Returnerar en ny DataFrame som har exakt numPartitions-partitioner. |
repartition(numPartitions, *cols) |
Returnerar en ny DataFrame som partitionerats av de angivna partitioneringsuttrycken. |
repartitionByRange(numPartitions, *cols) |
Returnerar en ny DataFrame som partitionerats av de angivna partitioneringsuttrycken. |
repartitionById(numPartitions, partitionIdCol) |
Returnerar en ny DataFrame som partitionerats av det angivna partitions-ID-uttrycket. |
Omforma
Datahantering saknas
Statistiska funktioner
Schemaåtgärder
| Metod |
Beskrivning |
to(schema) |
Returnerar en ny DataFrame där varje rad är avstämd för att matcha det angivna schemat. |
alias(alias) |
Returnerar en ny DataFrame med en aliasuppsättning. |
Iteration
| Metod |
Beskrivning |
foreach(f) |
Tillämpar f-funktionen på alla rader i den här dataramen. |
foreachPartition(f) |
Tillämpar f-funktionen på varje partition i dataramen. |
Cachelagring och beständighet
| Metod |
Beskrivning |
cache() |
Bevarar DataFrame med standardlagringsnivån (MEMORY_AND_DISK_DESER). |
persist(storageLevel) |
Anger lagringsnivån så att innehållet i DataFrame bevaras mellan olika åtgärder. |
unpersist(blocking) |
Markerar DataFrame som icke-beständig och tar bort alla block för den från minne och disk. |
Kontrollpunkter
Strömningsåtgärder
Optimeringstips
Gränser och förskjutningar
| Metod |
Beskrivning |
limit(num) |
Begränsar resultatantalet till det angivna talet. |
offset(num) |
Returnerar en ny DataFrame genom att hoppa över de första n raderna. |
Konverteringsmetoder
Att skriva in data
| Metod |
Beskrivning |
writeTo(table) |
Skapa en skrivkonfigurationsbyggare för v2-källor. |
mergeInto(table, condition) |
Sammanfogar en uppsättning uppdateringar, infogningar och borttagningar baserat på en källtabell till en måltabell. |
Jämförelse av DataFrame
| Metod |
Beskrivning |
sameSemantics(other) |
Returnerar Sant när de logiska frågeplanerna i båda DataFrames är lika med. |
semanticHash() |
Returnerar en hashkod för den logiska frågeplanen mot dataramen. |
| Metod |
Beskrivning |
inputFiles() |
Returnerar en ögonblicksbild av de filer som utgör dataramen. |
Avancerade SQL-funktioner
| Metod |
Beskrivning |
isLocal() |
Returnerar True om metoderna collect and take kan köras lokalt. |
asTable() |
Konverterar DataFrame till ett TableArg-objekt, som kan användas som ett tabellargument i en TVF. |
scalar() |
Returnera ett kolumnobjekt för en SCALAR-underfråga som innehåller exakt en rad och en kolumn. |
exists() |
Returnera ett kolumnobjekt för en EXISTS-underfråga. |
Exempel
Grundläggande DataFrame-åtgärder
# Create a DataFrame
people = spark.createDataFrame([
{"deptId": 1, "age": 40, "name": "Hyukjin Kwon", "gender": "M", "salary": 50},
{"deptId": 1, "age": 50, "name": "Takuya Ueshin", "gender": "M", "salary": 100},
{"deptId": 2, "age": 60, "name": "Xinrong Meng", "gender": "F", "salary": 150},
{"deptId": 3, "age": 20, "name": "Haejoon Lee", "gender": "M", "salary": 200}
])
# Select columns
people.select("name", "age").show()
# Filter rows
people.filter(people.age > 30).show()
# Add a new column
people.withColumn("age_plus_10", people.age + 10).show()
Sammansättning och gruppering
# Group by and aggregate
people.groupBy("gender").agg({"salary": "avg", "age": "max"}).show()
# Multiple aggregations
from pyspark.sql import functions as F
people.groupBy("deptId").agg(
F.avg("salary").alias("avg_salary"),
F.max("age").alias("max_age")
).show()
Ansluter sig
# Create another DataFrame
department = spark.createDataFrame([
{"id": 1, "name": "PySpark"},
{"id": 2, "name": "ML"},
{"id": 3, "name": "Spark SQL"}
])
# Join DataFrames
people.join(department, people.deptId == department.id).show()
# Chained operations
result = people.filter(people.age > 30) \\
.join(department, people.deptId == department.id) \\
.groupBy(department.name, "gender") \\
.agg({"salary": "avg", "age": "max"}) \\
.sort("max(age)")
result.show()