Partager via


Spring Cloud Stream avec Azure Event Hubs

Ce tutoriel montre comment envoyer et recevoir des messages à l’aide de Azure Event Hubs et spring Cloud Stream Binder Eventhubs dans une application Spring Boot.

Prérequis

Remarque

Pour accorder à votre compte l'accès aux ressources, dans Azure Event Hubs, affectez le rôle Azure Event Hubs Data Receiver et Azure Event Hubs Data Sender au compte Microsoft Entra que vous utilisez actuellement. Ensuite, dans le compte Azure Storage, attribuez le rôle Storage Blob Data Contributor au compte Microsoft Entra que vous utilisez actuellement. Pour plus d’informations sur l’octroi de rôles d’accès, consultez Assigner des rôles Azure à l’aide du portail Azure et Autoriser l’accès aux ressources Event Hubs à l’aide de Microsoft Entra ID.

Important

La version 2.5 ou supérieure de Spring Boot est nécessaire pour réaliser les étapes de ce tutoriel.

Envoyer et recevoir des messages de Azure Event Hubs

Avec un compte Azure Storage et un hub d’événements Azure, vous pouvez envoyer et recevoir des messages à l’aide de Spring Cloud Azure Stream Binder Event Hubs.

Pour installer le module Spring Cloud Azure Stream Binder Event Hubs, ajoutez les dépendances suivantes à votre fichier pom.xml :

  • Le Bill of Materials (BOM) de Spring Cloud Azure :

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-dependencies</artifactId>
          <version>7.1.0</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    

    Remarque

    Si vous utilisez Spring Boot 4.0.x, veillez à mettre la spring-cloud-azure-dependencies version à 7.1.0.

    Si vous utilisez Spring Boot 3.5.x, assurez-vous de définir la version spring-cloud-azure-dependencies à 6.1.0.

    Si vous utilisez Spring Boot 3.1.x-3.5.x, veillez à définir la version spring-cloud-azure-dependencies5.25.0.

    Si vous utilisez Spring Boot 2.x, assurez-vous de définir la version spring-cloud-azure-dependencies sur 4.20.0.

    Cette liste de matériel (BOM) doit être configurée dans la section <dependencyManagement> de votre fichier pom.xml. Cela garantit que toutes les dépendances spring Cloud Azure utilisent la même version.

    Pour plus d’informations sur la version utilisée pour ce boM, consultez Which Version of Spring Cloud Azure Should I Use.

  • L'artefact Spring Cloud Azure Stream Binder Event Hubs :

    <dependency>
       <groupId>com.azure.spring</groupId>
       <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
    </dependency>
    

Coder l’application

Suivez les étapes suivantes pour configurer votre application pour produire et consommer des messages à l’aide de Azure Event Hubs.

  1. Configurez les informations d'identification du hub d'événements en ajoutant les propriétés suivantes à votre fichier application.properties.

     spring.cloud.azure.eventhubs.namespace=${AZURE_EVENTHUBS_NAMESPACE}
     spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=${AZURE_STORAGE_ACCOUNT_NAME}
     spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=${AZURE_STORAGE_CONTAINER_NAME}
     spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_EVENTHUB_NAME}
     spring.cloud.stream.bindings.consume-in-0.group=${AZURE_EVENTHUB_CONSUMER_GROUP}
     spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_EVENTHUB_NAME}
     spring.cloud.stream.eventhubs.bindings.consume-in-0.consumer.checkpoint.mode=MANUAL
     spring.cloud.function.definition=consume;supply;
     spring.cloud.stream.poller.initial-delay=0
     spring.cloud.stream.poller.fixed-delay=1000
    

    Le tableau suivant décrit les champs de la configuration :

    Champ Descriptif
    spring.cloud.azure.eventhubs.namespace Spécifiez l’espace de noms que vous avez obtenu dans votre event Hub à partir du portail Azure.
    spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name Spécifiez le compte de stockage que vous avez créé dans ce tutoriel.
    spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Spécifiez le conteneur de votre compte de stockage.
    spring.cloud.stream.bindings.consume-in-0.destination Indiquez le hub d'événements utilisé dans ce guide.
    spring.cloud.stream.bindings.consume-in-0.group Spécifiez les groupes de consommateurs de votre instance Event Hubs.
    spring.cloud.stream.bindings.supply-out-0.destination Indiquez le même hub d'événements que celui utilisé dans ce guide.
    spring.cloud.stream.eventhubs.bindings.consume-in-0.consumer.checkpoint.mode Spécifiez MANUAL.
    spring.cloud.function.definition Spécifiez le composant Bean fonctionnel à lier aux destinations externes qui sont exposées par les liaisons.
    spring.cloud.stream.poller.initial-delay Spécifiez le délai initial pour les déclencheurs périodiques. La valeur par défaut est 0.
    spring.cloud.stream.poller.fixed-delay Spécifiez le délai fixe pour le poller par défaut en millisecondes. La valeur par défaut est 1000 L.
  2. Modifiez le fichier de classe de démarrage pour afficher le contenu suivant.

    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import com.azure.spring.messaging.eventhubs.support.EventHubsHeaders;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;
    
    @SpringBootApplication
    public class EventHubBinderApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(EventHubBinderApplication.class);
        private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer();
    
        public static void main(String[] args) {
            SpringApplication.run(EventHubBinderApplication.class, args);
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supply() {
            return ()->many.asFlux()
                           .doOnNext(m->LOGGER.info("Manually sending message {}", m))
                           .doOnError(t->LOGGER.error("Error encountered", t));
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message->{
                Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued "
                        +"time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
                );
                checkpointer.success()
                            .doOnSuccess(success->LOGGER.info("Message '{}' successfully checkpointed",
                                message.getPayload()))
                            .doOnError(error->LOGGER.error("Exception found", error))
                            .block();
            };
        }
    
        @Override
        public void run(String... args) {
            LOGGER.info("Going to add message {} to sendMessage.", "Hello World");
            many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST);
        }
    
    }
    

    Conseil

    Dans ce tutoriel, il n'y a pas d'opérations d'authentification dans les configurations ou le code. Toutefois, la connexion à Azure services nécessite une authentification. Pour terminer l’authentification, vous devez utiliser Azure Identity. Spring Cloud Azure utilise DefaultAzureCredential, que la bibliothèque Azure Identity fournit pour vous aider à obtenir des informations d’identification sans aucune modification du code.

    DefaultAzureCredential prend en charge plusieurs méthodes d’authentification et détermine quelle méthode doit être utilisée au moment de l’exécution. Cette approche permet à votre application d'utiliser différentes méthodes d'authentification dans différents environnements (tels que les environnements locaux et de production) sans implémenter de code spécifique à l'environnement. Pour plus d’informations, consultez DefaultAzureCredential.

    Pour terminer l’authentification dans les environnements de développement locaux, vous pouvez utiliser Azure CLI, Visual Studio Code, PowerShell ou d’autres méthodes. Pour plus d’informations, consultez l'authentification Azure dans les environnements de développement Java. Pour terminer l’authentification dans Azure environnements d’hébergement, nous vous recommandons d’utiliser l’identité managée affectée par l’utilisateur. Pour plus d’informations, consultez Qu’est-ce que les identités managées pour les ressources Azure ?

  3. Lancez l’application. Les messages de ce type apparaîtront dans le journal de votre application, comme illustré ci-dessous :

    New message received: 'Hello World', partition key: 107207233, sequence number: 458, offset: 94256, enqueued time: 2023-02-17T08:27:59.641Z
    Message 'Hello World!' successfully checkpointed
    

Déployer sur Azure Spring Apps

Maintenant que l'application Spring Boot fonctionne localement, il est temps de la mettre en production. Azure Spring Apps facilite le déploiement d’applications Spring Boot sur Azure sans aucune modification du code. Le service gère l’infrastructure des applications Spring, ce qui permet aux développeurs de se concentrer sur leur code. Azure Spring Apps fournit une gestion du cycle de vie à l’aide de la supervision et des diagnostics complets, de la gestion de la configuration, de la découverte de services, de l’intégration CI/CD, des déploiements bleu-vert, etc. Pour déployer votre application sur Azure Spring Apps, consultez Deploy your first application to Azure Spring Apps.

Étapes suivantes