Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Importante
Esta documentación se ha retirado y es posible que no se actualice. Los productos, servicios o tecnologías mencionados en este contenido ya no se admiten. Consulte ¿Qué es Auto Loader?.
El conector ABS-AQS de Databricks proporciona un origen de archivos optimizado, que usa Azure Queue Storage (AQS), para encontrar nuevos archivos que se hayan escrito en un contenedor de Azure Blob Storage (ABS) sin necesidad de enumerar todos los archivos de forma repetida. Esto proporciona dos ventajas:
- Menor latencia: no es necesario enumerar las estructuras de directorios anidados en ABS, una operación lenta y que consume muchos recursos.
- Menores costos: no se realizan solicitudes de API más costosas LIST a ABS.
Nota:
El origen de ABS-AQS elimina los mensajes de la cola de AQS a medida que este consume los eventos. Si quiere hacer que otras canalizaciones consuman mensajes de esta cola, configure una cola de AQS independiente para el lector optimizado. Puede configurar varias suscripciones a Event Grid para publicar en distintas colas.
Utilice el origen de archivo de ABS-AQS
Para usar el origen de archivo de ABS-AQS, debe seguir los pasos siguientes:
Saque provecho de las suscripciones a Azure Event Grid y diríjalas a AQS para configurar las notificaciones de eventos de ABS. Consulte Reacción ante eventos de Blob Storage.
Especifique las opciones
fileFormatyqueueUrl, y un esquema. Por ejemplo:spark.readStream \ .format("abs-aqs") \ .option("fileFormat", "json") \ .option("queueName", ...) \ .option("connectionString", ...) \ .schema(...) \ .load()
Autenticación con Azure Queue Storage y Blob Storage
Para autenticarse con Azure Queue Storage y Blob Storage, use los tokens de Firma de Acceso Compartido (SAS) o las claves de la cuenta de almacenamiento. Debe proporcionar una cadena de conexión para la cuenta de almacenamiento donde está implementada su cola, esta debe contener ya sea el token SAS o las claves de acceso a su cuenta de almacenamiento. Para obtener más información sobre las cadenas de conexión, consulte Configuración de las cadenas de conexión de Azure Storage.
También deberá proporcionar acceso a los contenedores de Azure Blob Storage. Consulte Conexión a Azure Data Lake Storage y Blob Storage para obtener información sobre cómo configurar el acceso al contenedor de Azure Blob Storage.
Nota:
Databricks recomienda encarecidamente que utilice Administrar secretos para proporcionar las cadenas de conexión.
Configuración
| Opción | Tipo | Valor predeterminado | Descripción |
|---|---|---|---|
| permitirSobrescrituras | Booleano | true |
Indica si se debe volver a procesar un blob que ha sido sobrescrito. |
| connectionString | Cuerda | Ninguno (parámetro obligatorio) | Cadena de conexión para acceder a la cola. |
| fetchParallelism | Entero | 1 | Número de subprocesos que se utilizan al obtener mensajes desde el servicio de cola. |
| formato de archivo | Cuerda | Ninguno (parámetro obligatorio) | Formato de los archivos, como parquet, json, csv, text, etc. |
| ignoreFileDeletion | Booleano | false |
Si tiene configuraciones de ciclo de vida o elimina los archivos de origen manualmente, debe establecer esta opción en true. |
| maxFileAge | Entero | 604800 | Determina cuánto tiempo (en segundos) se almacenan las notificaciones de archivo como estado para evitar el procesamiento duplicado. |
| reescrituras de rutas | Cadena JSON. | "{}" |
Si utilizas puntos de montaje, puedes reescribir el prefijo de la ruta de acceso container@storageAccount/key con el punto de montaje. Solo se pueden reescribir los prefijos. Por ejemplo, para la configuración {"myContainer@myStorageAccount/path": "dbfs:/mnt/data-warehouse"}, la ruta de acceso wasbs://myContainer@myStorageAccount.blob.windows.core.net/path/2017/08/fileA.json se modifica adbfs:/mnt/data-warehouse/2017/08/fileA.json. |
| queueFetchInterval | Una cadena de tiempo, por ejemplo, 2m para 2 minutos. |
"5s" |
¿Qué tiempo de espera entre las extracciones si la cola está vacía? Azure cobra por solicitud de API a AQS. Por lo tanto, si los datos no llegan con frecuencia, este valor se puede establecer en una duración larga. Siempre que la cola no esté vacía, recuperaremos continuamente. Si se crean nuevos archivos cada 5 minutos, es posible que quiera establecer un valor de queueFetchInterval alto para reducir los costos de AQS. |
| queueName | Cuerda | Ninguno (parámetro obligatorio) | Nombre de la cola de AQS. |
Si observa una gran cantidad de mensajes en los registros de control del controlador que se parecen a Fetched 0 new events and 3 old events., donde tiende a observar muchos más eventos antiguos que nuevos, debe reducir el intervalo de activación de su transmisión.
Si está consumiendo archivos de una ubicación de Blob Storage donde espera que quizás algunos archivos se eliminen antes de que se puedan procesar, puede establecer la siguiente configuración para omitir el error y continuar con el procesamiento:
spark.sql("SET spark.sql.files.ignoreMissingFiles=true")
Preguntas más frecuentes
Si el valor de ignoreFileDeletion es False (valor predeterminado) y el objeto se ha eliminado, ¿se producirá un error en toda la canalización?
Sí, si recibimos un evento que indica que el archivo se eliminó, se producirá un error en toda la canalización.
¿Cómo debo establecer el valor de maxFileAge?
Azure Queue Storage proporciona semántica de entrega de mensajes al menos una vez, por lo que es necesario mantener el estado para la desduplicación. La configuración predeterminada de maxFileAge es 7 días, que es igual al TTL máximo de un mensaje en la cola.