Uma coleção distribuída de dados agrupados em colunas nomeadas.
Um DataFrame é equivalente a uma tabela relacional no Spark SQL e pode ser criado usando várias funções no SparkSession.
Importante
Um DataFrame não deve ser criado diretamente usando o construtor.
Dá suporte ao Spark Connect
Propriedades
| Propriedade |
Descrição |
sparkSession |
Retorna SparkSession que criou esse DataFrame. |
rdd |
Retorna o conteúdo como um RDD de Linha (somente modo clássico). |
na |
Retorna um DataFrameNaFunctions para lidar com valores ausentes. |
stat |
Retorna um DataFrameStatFunctions para funções de estatística. |
write |
Interface para salvar o conteúdo do DataFrame sem streaming no armazenamento externo. |
writeStream |
Interface para salvar o conteúdo do DataFrame de streaming no armazenamento externo. |
schema |
Retorna o esquema deste DataFrame como um StructType. |
dtypes |
Retorna todos os nomes de coluna e seus tipos de dados como uma lista. |
columns |
Recupera os nomes de todas as colunas no DataFrame como uma lista. |
storageLevel |
Obtenha o nível de armazenamento atual do DataFrame. |
isStreaming |
Retorna True se esse DataFrame contiver uma ou mais fontes que retornam dados continuamente à medida que chegam. |
executionInfo |
Retorna um objeto ExecutionInfo depois que a consulta foi executada. |
plot |
Retorna um PySparkPlotAccessor para plotar funções. |
Methods
Exibição e inspeção de dados
Visões temporárias
Seleção e projeção
Classificação e ordenação
Agregação e agrupamento
| Método |
Descrição |
groupBy(*cols) |
Agrupa o DataFrame pelas colunas especificadas para que a agregação possa ser executada nelas. |
rollup(*cols) |
Crie um rollup multidimensional para o DataFrame atual usando as colunas especificadas. |
cube(*cols) |
Crie um cubo multidimensional para o DataFrame atual usando as colunas especificadas. |
groupingSets(groupingSets, *cols) |
Crie agregação multidimensional para o DataFrame atual usando os conjuntos de agrupamento especificados. |
agg(*exprs) |
Agregar em todo o DataFrame sem grupos (abreviação para df.groupBy().agg()). |
observe(observation, *exprs) |
Defina as métricas (nomeadas) a serem observadas no DataFrame. |
Joins
Definir operações
| Método |
Descrição |
union(other) |
Retorne um novo DataFrame que contém a união de linhas neste e em outro DataFrame. |
unionByName(other, allowMissingColumns) |
Retorna um novo DataFrame que contém a união de linhas neste e em outro DataFrame. |
intersect(other) |
Retornar um novo DataFrame que contém linhas somente neste DataFrame e em outro DataFrame. |
intersectAll(other) |
Retorne um novo DataFrame que contém linhas neste DataFrame e em outro DataFrame, preservando duplicatas. |
subtract(other) |
Retorne um novo DataFrame que contém linhas neste DataFrame, mas não em outro DataFrame. |
exceptAll(other) |
Retorne um novo DataFrame que contém linhas neste DataFrame, mas não em outro DataFrame, preservando duplicatas. |
Deduplicação
| Método |
Descrição |
distinct() |
Retorna um novo DataFrame que contém as linhas distintas neste DataFrame. |
dropDuplicates(subset) |
Retorne um novo DataFrame com linhas duplicadas removidas, opcionalmente considerando apenas determinadas colunas. |
dropDuplicatesWithinWatermark(subset) |
Retorne um novo DataFrame com linhas duplicadas removidas, opcionalmente considerando apenas determinadas colunas, dentro da marca d'água. |
Amostragem e divisão
Partitioning
| Método |
Descrição |
coalesce(numPartitions) |
Retorna um novo DataFrame que tem exatamente partições numPartitions. |
repartition(numPartitions, *cols) |
Retorna um novo DataFrame particionado pelas expressões de particionamento fornecidas. |
repartitionByRange(numPartitions, *cols) |
Retorna um novo DataFrame particionado pelas expressões de particionamento fornecidas. |
repartitionById(numPartitions, partitionIdCol) |
Retorna um novo DataFrame particionado pela expressão de ID de partição fornecida. |
Remodelagem
Tratamento de dados ausentes
Funções estatísticas
Operações de esquema
| Método |
Descrição |
to(schema) |
Retorna um novo DataFrame em que cada linha é reconciliada para corresponder ao esquema especificado. |
alias(alias) |
Retorna um novo DataFrame com um conjunto de alias. |
Iteração
| Método |
Descrição |
foreach(f) |
Aplica a função f a todas as linhas deste DataFrame. |
foreachPartition(f) |
Aplica a função f a cada partição deste DataFrame. |
Cache e persistência
| Método |
Descrição |
cache() |
Persiste o DataFrame com o nível de armazenamento padrão (MEMORY_AND_DISK_DESER). |
persist(storageLevel) |
Define o nível de armazenamento para persistir o conteúdo do DataFrame entre operações. |
unpersist(blocking) |
Marca o DataFrame como não persistente e remove todos os blocos para ele da memória e do disco. |
Definindo o ponto de verificação
Operações de streaming
Dicas de otimização
Limites e deslocamentos
| Método |
Descrição |
limit(num) |
Limita a contagem de resultados ao número especificado. |
offset(num) |
Retorna um novo DataFrame ignorando as primeiras n linhas. |
Métodos de conversão
Gravação de dados
| Método |
Descrição |
writeTo(table) |
Crie um construtor de configuração de gravação para fontes v2. |
mergeInto(table, condition) |
Mescla um conjunto de atualizações, inserções e exclusões com base em uma tabela de origem em uma tabela de destino. |
Comparação de DataFrame
| Método |
Descrição |
sameSemantics(other) |
Retorna True quando os planos de consulta lógica dentro de ambos os DataFrames são iguais. |
semanticHash() |
Retorna um código hash do plano de consulta lógica em relação a esse DataFrame. |
| Método |
Descrição |
inputFiles() |
Retorna um instantâneo de melhor esforço dos arquivos que compõem esse DataFrame. |
Recursos avançados do SQL
| Método |
Descrição |
isLocal() |
Retorna True se os métodos collect e take podem ser executados localmente. |
asTable() |
Converte o DataFrame em um objeto TableArg, que pode ser usado como um argumento de tabela em um TVF. |
scalar() |
Retornar um objeto Column para uma subconsulta SCALAR que contém exatamente uma linha e uma coluna. |
exists() |
Retornar um objeto Column para uma Subconsulta EXISTS. |
Exemplos
Operações básicas do DataFrame
# Create a DataFrame
people = spark.createDataFrame([
{"deptId": 1, "age": 40, "name": "Hyukjin Kwon", "gender": "M", "salary": 50},
{"deptId": 1, "age": 50, "name": "Takuya Ueshin", "gender": "M", "salary": 100},
{"deptId": 2, "age": 60, "name": "Xinrong Meng", "gender": "F", "salary": 150},
{"deptId": 3, "age": 20, "name": "Haejoon Lee", "gender": "M", "salary": 200}
])
# Select columns
people.select("name", "age").show()
# Filter rows
people.filter(people.age > 30).show()
# Add a new column
people.withColumn("age_plus_10", people.age + 10).show()
Agregação e agrupamento
# Group by and aggregate
people.groupBy("gender").agg({"salary": "avg", "age": "max"}).show()
# Multiple aggregations
from pyspark.sql import functions as F
people.groupBy("deptId").agg(
F.avg("salary").alias("avg_salary"),
F.max("age").alias("max_age")
).show()
Joins
# Create another DataFrame
department = spark.createDataFrame([
{"id": 1, "name": "PySpark"},
{"id": 2, "name": "ML"},
{"id": 3, "name": "Spark SQL"}
])
# Join DataFrames
people.join(department, people.deptId == department.id).show()
# Chained operations
result = people.filter(people.age > 30) \\
.join(department, people.deptId == department.id) \\
.groupBy(department.name, "gender") \\
.agg({"salary": "avg", "age": "max"}) \\
.sort("max(age)")
result.show()