Uma coleção distribuída de dados agrupados em colunas nomeadas.
Um DataFrame é equivalente a uma tabela relacional no Spark SQL e pode ser criado usando várias funções no SparkSession.
Importante
Um DataFrame não deve ser criado diretamente usando o construtor.
Suporta Spark Connect
Propriedades
| Propriedade |
Descrição |
sparkSession |
Devolve o SparkSession que criou este DataFrame. |
rdd |
Devolve o conteúdo como RDD de Row (apenas modo Clássico). |
na |
Devolve um DataFrameNaFunctions para lidar com valores em falta. |
stat |
Devolve uma DataFrameStatFunctions para funções estatísticas. |
write |
Interface para guardar o conteúdo do DataFrame não em streaming para armazenamento externo. |
writeStream |
Interface para guardar o conteúdo do DataFrame em streaming para armazenamento externo. |
schema |
Devolve o esquema deste DataFrame como um StructType. |
dtypes |
Devolve todos os nomes das colunas e os seus tipos de dados como uma lista. |
columns |
Recupera os nomes de todas as colunas do DataFrame como uma lista. |
storageLevel |
Obtenha o nível de armazenamento atual do DataFrame. |
isStreaming |
Retorna Verdadeiro se este DataFrame contiver uma ou mais fontes que retornam continuamente dados à medida que chegam. |
executionInfo |
Devolve um objeto ExecutionInfo após a execução da consulta. |
plot |
Devolve um PySparkPlotAccessor para funções de plotamento. |
Methods
Visualização e inspeção de dados
Vistas temporárias
Seleção e projeção
Ordenação e ordenação
Agregação e agrupamento
| Método |
Descrição |
groupBy(*cols) |
Agrupa o DataFrame pelas colunas especificadas para que a agregação possa ser realizada sobre elas. |
rollup(*cols) |
Crie um rollup multidimensional para o DataFrame atual usando as colunas especificadas. |
cube(*cols) |
Crie um cubo multidimensional para o DataFrame atual usando as colunas especificadas. |
groupingSets(groupingSets, *cols) |
Crie agregação multidimensional para o DataFrame atual usando os conjuntos de agrupamento especificados. |
agg(*exprs) |
Agregar em todo o DataFrame sem grupos (abreviação de df.groupBy().agg()). |
observe(observation, *exprs) |
Defina métricas (nomeadas) a observar no DataFrame. |
Joins
Definir operações
| Método |
Descrição |
union(other) |
Devolva um novo DataFrame contendo a união de linhas neste e noutro DataFrame. |
unionByName(other, allowMissingColumns) |
Devolve um novo DataFrame contendo a união de linhas neste e noutro DataFrame. |
intersect(other) |
Devolva um novo DataFrame contendo apenas linhas tanto neste DataFrame como noutro DataFrame. |
intersectAll(other) |
Devolva um novo DataFrame contendo linhas tanto neste DataFrame como noutro DataFrame, preservando duplicados. |
subtract(other) |
Devolva um novo DataFrame contendo linhas neste DataFrame mas não noutro DataFrame. |
exceptAll(other) |
Devolva uma nova DataFrame contendo linhas nesse DataFrame mas não noutra DataFrame, preservando duplicados. |
Deduplication
| Método |
Descrição |
distinct() |
Devolve um novo DataFrame contendo as linhas distintas deste DataFrame. |
dropDuplicates(subset) |
Devolva um novo DataFrame com as linhas duplicadas removidas, considerando opcionalmente apenas certas colunas. |
dropDuplicatesWithinWatermark(subset) |
Devolva um novo DataFrame com linhas duplicadas removidas, opcionalmente apenas considerando certas colunas, dentro da marca de água. |
Amostragem e divisão
Partitioning
| Método |
Descrição |
coalesce(numPartitions) |
Devolve um novo DataFrame que tem exatamente as partições numPartitions. |
repartition(numPartitions, *cols) |
Devolve uma nova DataFrame particionada pelas expressões de partição dadas. |
repartitionByRange(numPartitions, *cols) |
Devolve uma nova DataFrame particionada pelas expressões de partição dadas. |
repartitionById(numPartitions, partitionIdCol) |
Devolve uma nova DataFrame particionada pela expressão de ID de partição dada. |
Remodelação
Tratamento de dados em falta
Funções estatísticas
Operações de esquema
| Método |
Descrição |
to(schema) |
Devolve um novo DataFrame onde cada linha é reconciliada para corresponder ao esquema especificado. |
alias(alias) |
Devolve um novo DataFrame com um alias definido. |
Iteration
| Método |
Descrição |
foreach(f) |
Aplica a função f a todas as linhas deste DataFrame. |
foreachPartition(f) |
Aplica a função f a cada partição deste DataFrame. |
Cache e persistência
| Método |
Descrição |
cache() |
Persiste o DataFrame com o nível de armazenamento predefinido (MEMORY_AND_DISK_DESER). |
persist(storageLevel) |
Define o nível de armazenamento para persistir o conteúdo do DataFrame entre operações. |
unpersist(blocking) |
Marca o DataFrame como não persistente e remove todos os blocos da memória e do disco. |
Pontos de verificação
Operações de streaming
Dicas de otimização
Limites e deslocamentos
| Método |
Descrição |
limit(num) |
Limita a contagem de resultados ao número especificado. |
offset(num) |
Devolve um novo DataFrame ao saltar as primeiras n linhas. |
Métodos de conversão
Gravando dados
| Método |
Descrição |
writeTo(table) |
Cria um construtor de configuração de escrita para fontes v2. |
mergeInto(table, condition) |
Funde um conjunto de atualizações, inserções e eliminações baseadas numa tabela de origem numa tabela alvo. |
Comparação de DataFrame
| Método |
Descrição |
sameSemantics(other) |
Retorna Verdadeiro quando os planos lógicos de consulta dentro de ambos os DataFrames são iguais. |
semanticHash() |
Devolve um código hash do plano lógico de consulta contra este DataFrame. |
| Método |
Descrição |
inputFiles() |
Devolve um instantâneo de melhor esforço dos ficheiros que compõem este DataFrame. |
Funcionalidades avançadas de SQL
| Método |
Descrição |
isLocal() |
Retornos Verdadeiro se os métodos collect e take puderem ser executados localmente. |
asTable() |
Converte o DataFrame num objeto TableArg, que pode ser usado como argumento de tabela numa TVF. |
scalar() |
Devolva um objeto Coluna para uma Subconsulta SCALAR contendo exatamente uma linha e uma coluna. |
exists() |
Devolva um objeto Coluna para uma Subconsulta EXISTS. |
Exemplos
Operações básicas de DataFrame
# Create a DataFrame
people = spark.createDataFrame([
{"deptId": 1, "age": 40, "name": "Hyukjin Kwon", "gender": "M", "salary": 50},
{"deptId": 1, "age": 50, "name": "Takuya Ueshin", "gender": "M", "salary": 100},
{"deptId": 2, "age": 60, "name": "Xinrong Meng", "gender": "F", "salary": 150},
{"deptId": 3, "age": 20, "name": "Haejoon Lee", "gender": "M", "salary": 200}
])
# Select columns
people.select("name", "age").show()
# Filter rows
people.filter(people.age > 30).show()
# Add a new column
people.withColumn("age_plus_10", people.age + 10).show()
Agregação e agrupamento
# Group by and aggregate
people.groupBy("gender").agg({"salary": "avg", "age": "max"}).show()
# Multiple aggregations
from pyspark.sql import functions as F
people.groupBy("deptId").agg(
F.avg("salary").alias("avg_salary"),
F.max("age").alias("max_age")
).show()
Joins
# Create another DataFrame
department = spark.createDataFrame([
{"id": 1, "name": "PySpark"},
{"id": 2, "name": "ML"},
{"id": 3, "name": "Spark SQL"}
])
# Join DataFrames
people.join(department, people.deptId == department.id).show()
# Chained operations
result = people.filter(people.age > 30) \\
.join(department, people.deptId == department.id) \\
.groupBy(department.name, "gender") \\
.agg({"salary": "avg", "age": "max"}) \\
.sort("max(age)")
result.show()