Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Apache Kafka es una plataforma de streaming distribuida para la creación de canalizaciones de streaming de datos en tiempo real que mueve los datos de forma confiable entre aplicaciones o sistemas. Kafka Connect es una herramienta para realizar streaming de datos de forma escalable y confiable entre Apache Kafka y otros sistemas de datos. El Kusto Kafka Sink actúa como conector desde Kafka y no requiere el uso de código. Descargue el archivo jar del conector de salida desde el repositorio Git o desde Confluent Connector Hub.
En este artículo se muestra cómo ingerir datos con Kafka mediante una configuración de Docker autocontenida para simplificar el clúster de Kafka y la configuración del clúster del conector de Kafka.
Para más información, consulte el repositorio Git del conector y las especificaciones de la versión.
Requisitos previos
- Suscripción a Azure. Cree una cuenta de Azure gratuita.
- Un clúster y una base de datos de Azure Data Explorer con las directivas de retención y caché predeterminadas.
- CLI de Azure.
- Docker y Docker Compose.
Creación de una entidad de servicio de Microsoft Entra
La entidad de servicio de Microsoft Entra se puede crear mediante el portal de Azure o de forma programada, como en el ejemplo siguiente.
Esta entidad de servicio es la identidad utilizada por el conector para escribir datos en la tabla de Kusto. Tú otorgas permisos para que esta entidad de servicio acceda a los recursos de Kusto.
Inicie sesión en su suscripción de Azure a través de la CLI de Azure. A continuación, realice la autenticación en el explorador.
az loginElija la suscripción para hospedar el principal. Este paso es necesario si tiene varias suscripciones.
az account set --subscription YOUR_SUBSCRIPTION_GUIDCree el principal de servicio. En este ejemplo, el principal de servicio se llama
my-service-principal.az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}En los datos JSON devueltos, copie los valores
appId,passwordytenantpara usarlos posteriormente.{ "appId": "00001111-aaaa-2222-bbbb-3333cccc4444", "displayName": "my-service-principal", "name": "my-service-principal", "password": "00001111-aaaa-2222-bbbb-3333cccc4444", "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444" }
Ha creado su aplicación de Microsoft Entra y principal de servicio.
Creación de una tabla de destino
Desde el entorno de consulta, cree una tabla llamada
Stormscon el siguiente comando:.create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)Cree el mapeo de tabla
Storms_CSV_Mappingcorrespondiente para los datos ingeridos con el siguiente comando:.create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'Establezca una directiva de ingesta por lotes en la tabla para una latencia de ingesta configurable en espera.
Sugerencia
La directiva de procesamiento por lotes de ingesta es un optimizador de rendimiento e incluye tres parámetros. Si se cumple la primera condición, se desencadena la ingesta en la tabla.
.alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'Utiliza la entidad de servicio de la sección Creación de una entidad de servicio de Microsoft Entra para conceder permisos para trabajar en la base de datos.
.add database YOUR_DATABASE_NAME admins ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
Ejecutar el laboratorio
El siguiente laboratorio está diseñado para ofrecerte la experiencia de empezar a crear datos, configurar el conector de Kafka y transmitir estos datos. Después, puede examinar los datos ingeridos.
Clonación del repositorio Git
Clone el repositorio Git del laboratorio.
Cree un directorio local en el equipo.
mkdir ~/kafka-kusto-hol cd ~/kafka-kusto-holClone el repositorio.
cd ~/kafka-kusto-hol git clone https://github.com/Azure/azure-kusto-labs cd azure-kusto-labs/kafka-integration/dockerized-quickstart
Contenido del repositorio clonado
Ejecute el siguiente comando para enumerar el contenido del repositorio clonado:
cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree
El resultado de esta búsqueda es:
├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│ └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
├── Dockerfile
├── StormEvents.csv
├── go.mod
├── go.sum
├── kafka
│ └── kafka.go
└── main.go
Revisión de los archivos del repositorio clonado
En las secciones siguientes se explican las partes importantes de los archivos del árbol de archivos.
adx-sink-config.json
Este archivo contiene el archivo de propiedades del receptor de Kusto donde se actualizan detalles de configuración específicos:
{
"name": "storm",
"config": {
"connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
"flush.size.bytes": 10000,
"flush.interval.ms": 10000,
"tasks.max": 1,
"topics": "storm-events",
"kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
"aad.auth.authority": "<enter tenant ID>",
"aad.auth.appid": "<enter application ID>",
"aad.auth.appkey": "<enter client secret>",
"kusto.ingestion.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
"kusto.query.url": "https://<name of cluster>.<region>.kusto.windows.net",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}
Reemplaza los valores de los siguientes atributos según su configuración: aad.auth.authority, aad.auth.appid, aad.auth.appkey, kusto.tables.topics.mapping (nombre de la base de datos), kusto.ingestion.url y kusto.query.url.
Connector/Dockerfile
Este archivo tiene los comandos para generar la imagen de Docker para la instancia del conector. Incluye la descarga del conector desde el directorio de versión del repositorio Git.
Directorio de Storm-events-producer
Este directorio tiene un programa de Go que lee un archivo "StormEvents.csv" local y publica los datos en un tema de Kafka.
docker-compose.yaml
version: "2"
services:
zookeeper:
image: debezium/zookeeper:1.2
ports:
- 2181:2181
kafka:
image: debezium/kafka:1.2
ports:
- 9092:9092
links:
- zookeeper
depends_on:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
kusto-connect:
build:
context: ./connector
args:
KUSTO_KAFKA_SINK_VERSION: 1.0.1
ports:
- 8083:8083
links:
- kafka
depends_on:
- kafka
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=adx
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
events-producer:
build:
context: ./storm-events-producer
links:
- kafka
depends_on:
- kafka
environment:
- KAFKA_BOOTSTRAP_SERVER=kafka:9092
- KAFKA_TOPIC=storm-events
- SOURCE_FILE=StormEvents.csv
Inicio de los contenedores
Inicie los contenedores en un terminal:
docker-compose upLa aplicación del productor comenzará a enviar eventos al tema
storm-events. Debería ver registros similares a los siguientes:.... events-producer_1 | sent message to partition 0 offset 0 events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public events-producer_1 | events-producer_1 | sent message to partition 0 offset 1 events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer ....Para comprobar los registros, ejecute el siguiente comando en un terminal independiente:
docker-compose logs -f | grep kusto-connect
Inicio del conector
Use una llamada REST de Kafka Connect para iniciar el conector.
En un terminal independiente, ejecute la tarea de sink con el siguiente comando:
curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectorsPara comprobar el estado, ejecute el siguiente comando en un terminal independiente:
curl http://localhost:8083/connectors/storm/status
El conector inicia la colocación en cola de los procesos de ingesta.
Nota:
Si tiene problemas con el conector de registros, cree una incidencia.
Identidad administrada
De forma predeterminada, el conector de Kafka usa el método de aplicación para la autenticación durante la ingesta. Para autenticarse mediante la identidad administrada:
Asigne a su clúster una identidad administrada y conceda permisos de lectura a la cuenta de almacenamiento. Para obtener más información, consulte Ingesta de datos mediante la autenticación de identidad administrada.
En el archivo adx-sink-config.json, establezca el
aad.auth.strategyenmanaged_identityy asegúrese de queaad.auth.appidse establece en el identificador de cliente de identidad administrada (aplicación).Utilice un token de servicio de metadatos de instancia privada en lugar de la entidad de servicio de Microsoft Entra.
Nota:
Al usar la identidad administrada, el conector recupera automáticamente el appId y el tenant del contexto de llamada, por lo que no necesitas proporcionar una contraseña.
Consulta y revisión de los datos
Confirmación de la ingesta de datos
Una vez que los datos hayan llegado a la tabla
Storms, confirma la transferencia de datos; para ello, comprueba el recuento de filas:Storms | countConfirme que no hay errores en el proceso de ingesta:
.show ingestion failuresUna vez que aparezcan los datos, pruebe a realizar algunas consultas.
Consultar los datos
Para ver todos los registros, ejecute la siguiente consulta:
Storms | take 10Use
whereyprojectpara filtrar datos específicos:Storms | where EventType == 'Drought' and State == 'TEXAS' | project StartTime, EndTime, Source, EventIdUse el operador
summarize:Storms | summarize event_count=count() by State | where event_count > 10 | project State, event_count | render columnchart
Para ver más ejemplos de consultas e instrucciones, consulte cómo escribir consultas en KQL y la documentación del lenguaje de consulta Kusto.
Restablecer
Para restablecer, realice los pasos siguientes:
- Detener los contenedores (
docker-compose down -v) - Eliminar (
drop table Storms) - Volver a crear la tabla
Storms - Volver a crear la asignación de tabla
- Reiniciar los contenedores (
docker-compose up)
Limpieza de recursos
Para eliminar los recursos de Azure Data Explorer, use az kusto cluster delete (kusto extension) o az kusto database delete (kusto extension):
az kusto cluster delete --name "<cluster name>" --resource-group "<resource group name>"
az kusto database delete --cluster-name "<cluster name>" --database-name "<database name>" --resource-group "<resource group name>"
También puede eliminar el clúster y la base de datos a través de Azure Portal. Para más información, consulte Eliminación de un clúster de Azure Data Explorer y Eliminación de una base de datos en Azure Data Explorer.
Ajuste del conector Sink de Kafka
Ajuste el conector Sink de Kafka para que funcione con la política de agrupamiento de ingesta:
- Ajuste el límite de tamaño
flush.size.bytesdel receptor de Kafka a partir de 1 MB, aumentando en incrementos de 10 MB o 100 MB. - Cuando se utiliza un Kafka Sink, los datos se procesan dos veces. En el lado del conector, los datos se agregan según la configuración de vaciado y en el del servicio según la directiva de procesamiento por lotes. Si el tiempo de procesamiento por lotes es demasiado corto, el conector y el servicio no pueden ingerir datos y se debe aumentar el tiempo de procesamiento por lotes. Establezca el tamaño del procesamiento por lotes en 1 GB y aumente o disminuya en incrementos de 100 MB según sea necesario. Por ejemplo, si el tamaño de vaciado es de 1 MB y el tamaño de la directiva de procesamiento por lotes es de 100 MB, el conector receptor de Kafka agrega datos a un lote de 100 MB. Después, el servicio ingiere ese lote. Si el tiempo de la directiva de procesamiento por lotes es de 20 segundos y el conector de Kafka Sink transfiere 50 MB en un período de 20 segundos, el servicio ingerirá un lote de 50 MB.
- Puede escalar agregando instancias y particiones de Kafka. Aumente
tasks.maxhasta alcanzar el número de particiones. Cree una partición si tiene suficientes datos para generar un blob del tamaño de la opciónflush.size.bytes. Si el blob es más pequeño, el lote se procesa cuando se alcanza el límite de tiempo, por lo que la partición no dispone de suficiente rendimiento. Un gran número de particiones significa una mayor sobrecarga de procesamiento.
Contenido relacionado
- Más información sobre Arquitecturas de macrodatos.
- Obtenga información sobre la Ingesta de datos de ejemplo con formato JSON en Azure Data Explorer.
- Más información con los laboratorios de Kafka: