Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Importante
Esta característica está en versión preliminar pública.
En Databricks Runtime 15.3 y versiones posteriores, puede usar el tipo VARIANT para ingerir datos semiestructurados. En este artículo se describe el comportamiento y se proporciona patrones de ejemplo para la ingesta de datos desde el almacenamiento de objetos en la nube mediante Auto Loader y COPY INTO, la transmisión de registros desde Kafka, y comandos SQL para crear nuevas tablas con datos variantes o insertar nuevos registros usando el tipo variante. En la tabla siguiente se resumen los formatos de archivo admitidos y la compatibilidad con la versión de Databricks Runtime:
| Formato de archivo | Versión admitida de Databricks Runtime |
|---|---|
| JSON | 15.3 y versiones posteriores |
| XML | 16.4 y versiones posteriores |
| CSV | 16.4 y versiones posteriores |
Consulte Consulta de datos de variante.
Cree una tabla con una columna variable
VARIANT es un tipo SQL estándar en Databricks Runtime 15.3 y versiones posteriores y es compatible con tablas respaldadas por Delta Lake. Las tablas administradas en Azure Databricks usan Delta Lake de forma predeterminada, por lo que puede crear una tabla vacía con una sola VARIANT columna mediante la sintaxis siguiente.
CREATE TABLE table_name (variant_column VARIANT)
Como alternativa, puede usar una instrucción CTAS para crear una tabla con una columna variant. Use la PARSE_JSON función para analizar cadenas JSON o la FROM_XML función para analizar cadenas XML. En el ejemplo siguiente se crea una tabla con dos columnas.
- La
idcolumna se extrae de la cadena JSON como unSTRINGtipo. -
variant_columncontiene toda la cadena JSON codificada como unVARIANTtipo.
CREATE TABLE table_name AS
SELECT json_string:id AS id,
PARSE_JSON(json_string) variant_column
FROM source_data
Nota:
Databricks recomienda extraer campos consultados con frecuencia y almacenarlos como columnas no variantes para acelerar las consultas y optimizar el diseño de almacenamiento.
Las columnas VARIANT no se pueden usar para las claves de agrupación en clústeres, las particiones ni las claves de orden Z. No se puede usar el tipo de datos VARIANT para comparaciones, agrupación, ordenación y establecimiento de operaciones. Para más información, consulte las limitaciones.
Insertar datos mediante parse_json
Si la tabla de destino ya contiene una columna codificada como VARIANT, puede usar parse_json para insertar registros de cadena JSON como VARIANT. Por ejemplo, analice las cadenas JSON de la json_string columna e insértelas en table_name.
SQL
INSERT INTO table_name (variant_column)
SELECT PARSE_JSON(json_string)
FROM source_data
Pitón
from pyspark.sql.functions import col, parse_json
(spark.read
.table("source_data")
.select(parse_json(col("json_string")))
.write
.mode("append")
.saveAsTable("table_name")
)
Insertar datos mediante from_xml
Si la tabla de destino ya contiene una columna codificada como VARIANT, puede usar from_xml para insertar registros de cadena XML como VARIANT. Por ejemplo, analice las cadenas XML de la xml_string columna e insértelas en table_name.
SQL
INSERT INTO table_name (variant_column)
SELECT FROM_XML(xml_string, 'variant')
FROM source_data
Pitón
from pyspark.sql.functions import col, from_xml
(spark.read
.table("source_data")
.select(from_xml(col("xml_string"), "variant"))
.write
.mode("append")
.saveAsTable("table_name")
)
Insertar datos mediante from_csv
Si la tabla de destino ya contiene una columna codificada como VARIANT, puede usar from_csv para insertar registros de cadena CSV como VARIANT. Por ejemplo, analice los registros CSV de la csv_string columna e insértelos en table_name.
SQL
INSERT INTO table_name (variant_column)
SELECT FROM_CSV(csv_string, 'v variant').v
FROM source_data
Pitón
from pyspark.sql.functions import col, from_csv
(spark.read
.table("source_data")
.select(from_csv(col("csv_string"), "v variant").v)
.write
.mode("append")
.saveAsTable("table_name")
)
Ingesta de datos del almacenamiento de objetos en la nube como variante
El cargador automático se puede usar para cargar todos los datos de los orígenes de archivos admitidos como una sola VARIANT columna en una tabla de destino. Dado que VARIANT es flexible ante cambios de esquema y tipo y mantiene la distinción de mayúsculas y minúsculas y los valores NULL presentes en la fuente de datos, este patrón es robusto para la mayoría de los escenarios de ingesta con las siguientes advertencias:
- Los registros con formato incorrecto no se pueden codificar mediante el tipo
VARIANT. -
VARIANTel tipo solo puede contener registros de hasta 16 MB de tamaño.
Nota:
Variant trata los registros que son demasiado grandes de manera similar a los registros dañados. En el modo de procesamiento predeterminado PERMISSIVE, los registros demasiado grandes se capturan en el corruptRecordColumn.
Dado que todo el registro se registra como una sola VARIANT columna, no se produce ninguna evolución del esquema durante la ingesta y rescuedDataColumn no se admite. En el siguiente ejemplo se supone que la tabla de destino ya existe con una sola columna VARIANT.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("singleVariantColumn", "variant_column")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
También puede especificar VARIANT al definir un esquema o pasar schemaHints. Los datos del campo de origen al que se hace referencia deben contener un registro válido. En los ejemplos siguientes se muestra esta sintaxis.
# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("name STRING, address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("payload VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
Usar COPY INTO con variante
Databricks recomienda usar Auto Loader sobre COPY INTO cuando esté disponible.
COPY INTO admite la ingesta de todo el contenido de un origen de datos admitido como una sola columna. En el siguiente ejemplo se crea una nueva tabla con una sola columna VARIANT y, a continuación, se usa COPY INTO para ingerir registros de un origen de archivo JSON.
CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
FILES = ('file-name')
FORMAT_OPTIONS ('singleVariantColumn' = 'variant_column')
Transmisión de datos de Kafka como variante
Muchos flujos de Kafka codifican sus cargas con JSON. La ingesta de flujos de Kafka mediante VARIANT hace que estas cargas de trabajo sean sólidas para los cambios de esquema.
En el siguiente ejemplo se muestra cómo leer un origen de streaming de Kafka, convertir el key en un STRING y el value en VARIANT y escribir en una tabla de destino.
from pyspark.sql.functions import col, parse_json
(spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load()
.select(
col("key").cast("string"),
parse_json(col("value").cast("string"))
).writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
Pasos siguientes
- Consultar datos de variabilidad.
- Configure soporte de tipos variant.
- Más información sobre Auto Loader. Consulte ¿Qué es Auto Loader?.