Partager via


Spring Cloud Stream avec Azure Service Bus

Cet article montre comment utiliser Spring Cloud Stream Binder pour envoyer et recevoir des messages de Service Bus queues et topics.

Azure fournit une plateforme de messagerie asynchrone appelée Azure Service Bus (« Service Bus ») basée sur la norme Advanced Message Queueing Protocol 1.0 (« AMQP 1.0 »). Service Bus peut être utilisé sur l'ensemble des plateformes d'Azure prises en charge.

Prérequis

Remarque

Pour accorder à votre compte l'accès à vos ressources Azure Service Bus, affectez le rôle Azure Service Bus Data Sender et Azure Service Bus Data Receiver au compte Microsoft Entra que vous utilisez actuellement. Pour plus d’informations sur l’octroi de rôles d’accès, consultez Assigner des rôles Azure à l’aide du portail Azure et Authenticate et autoriser une application avec des Microsoft Entra ID pour accéder aux entités Azure Service Bus.

Importante

Spring Boot version 2.5 ou supérieure est nécessaire pour effectuer les étapes de cet article.

Envoyer et recevoir des messages de Azure Service Bus

Avec une file d’attente ou une rubrique pour Azure Service Bus, vous pouvez envoyer et recevoir des messages à l’aide de Spring Cloud Azure Stream Binder Service Bus.

Pour installer le module Spring Cloud Azure Stream Binder Service Bus, ajoutez les dépendances suivantes à votre fichier pom.xml :

  • Le Bill of Materials (BOM) de Spring Cloud Azure :

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-dependencies</artifactId>
          <version>7.1.0</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    

    Remarque

    Si vous utilisez Spring Boot 4.0.x, veillez à mettre la spring-cloud-azure-dependencies version à 7.1.0.

    Si vous utilisez Spring Boot 3.5.x, assurez-vous de définir la version spring-cloud-azure-dependencies à 6.1.0.

    Si vous utilisez Spring Boot 3.1.x-3.5.x, veillez à définir la version spring-cloud-azure-dependencies5.25.0.

    Si vous utilisez Spring Boot 2.x, assurez-vous de définir la version spring-cloud-azure-dependencies sur 4.20.0.

    Cette liste de matériel (BOM) doit être configurée dans la section <dependencyManagement> de votre fichier pom.xml. Cela garantit que toutes les dépendances spring Cloud Azure utilisent la même version.

    Pour plus d’informations sur la version utilisée pour ce boM, consultez Which Version of Spring Cloud Azure Should I Use.

  • Artefact du Spring Cloud Azure Stream Binder pour Service Bus

    <dependency>
         <groupId>com.azure.spring</groupId>
         <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
    </dependency>
    

Coder l’application

Procédez comme suit pour configurer votre application afin d’utiliser une file d’attente ou une rubrique Service Bus pour envoyer et recevoir des messages.

  1. Configurez les informations d’identification Service Bus dans le fichier de configuration application.properties.

     spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE}
     spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false
     spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue
     spring.cloud.function.definition=consume;supply;
     spring.cloud.stream.poller.fixed-delay=60000 
     spring.cloud.stream.poller.initial-delay=0
    

    Le tableau suivant décrit les champs de la configuration :

    Champ Descriptif
    spring.cloud.azure.servicebus.namespace Spécifiez l’espace de noms que vous avez obtenu dans votre Service Bus à partir du portail Azure.
    spring.cloud.stream.bindings.consume-in-0.destination Spécifiez la file d’attente ou le sujet Service Bus que vous avez utilisé dans ce didacticiel.
    spring.cloud.stream.bindings.supply-out-0.destination Spécifiez la même valeur que celle utilisée pour la destination d’entrée.
    spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete Indiquez si les messages doivent être décomptés automatiquement. S’il est défini comme false, un en-tête de message de Checkpointer est ajouté pour permettre aux développeurs de régler manuellement les messages.
    spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type Spécifiez le type d'entité pour la liaison de sortie, qui peut être queue ou topic.
    spring.cloud.function.definition Spécifiez le composant Bean fonctionnel à lier aux destinations externes qui sont exposées par les liaisons.
    spring.cloud.stream.poller.fixed-delay Spécifiez le délai fixe pour le poller par défaut en millisecondes. La valeur par défaut est 1000 L. 60000 est la valeur recommandée.
    spring.cloud.stream.poller.initial-delay Spécifiez le délai initial pour les déclencheurs périodiques. La valeur par défaut est 0.
  2. Modifiez le fichier de classe de démarrage pour afficher le contenu suivant.

    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;
    
    @SpringBootApplication
    public class ServiceBusQueueBinderApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class);
        private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer();
    
        public static void main(String[] args) {
            SpringApplication.run(ServiceBusQueueBinderApplication.class, args);
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supply() {
            return ()->many.asFlux()
                           .doOnNext(m->LOGGER.info("Manually sending message {}", m))
                           .doOnError(t->LOGGER.error("Error encountered", t));
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message->{
                Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
                LOGGER.info("New message received: '{}'", message.getPayload());
                checkpointer.success()
                            .doOnSuccess(s->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                            .doOnError(e->LOGGER.error("Error found", e))
                            .block();
            };
        }
    
        @Override
        public void run(String... args) {
            LOGGER.info("Going to add message {} to Sinks.Many.", "Hello World");
            many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST);
        }
    
    }
    

    Conseil

    Dans ce tutoriel, il n'y a pas d'opérations d'authentification dans les configurations ou le code. Toutefois, la connexion à Azure services nécessite une authentification. Pour terminer l’authentification, vous devez utiliser Azure Identity. Spring Cloud Azure utilise DefaultAzureCredential, que la bibliothèque Azure Identity fournit pour vous aider à obtenir des informations d’identification sans aucune modification du code.

    DefaultAzureCredential prend en charge plusieurs méthodes d’authentification et détermine quelle méthode doit être utilisée au moment de l’exécution. Cette approche permet à votre application d'utiliser différentes méthodes d'authentification dans différents environnements (tels que les environnements locaux et de production) sans implémenter de code spécifique à l'environnement. Pour plus d’informations, consultez DefaultAzureCredential.

    Pour terminer l’authentification dans les environnements de développement locaux, vous pouvez utiliser Azure CLI, Visual Studio Code, PowerShell ou d’autres méthodes. Pour plus d’informations, consultez l'authentification Azure dans les environnements de développement Java. Pour terminer l’authentification dans Azure environnements d’hébergement, nous vous recommandons d’utiliser l’identité managée affectée par l’utilisateur. Pour plus d’informations, consultez Qu’est-ce que les identités managées pour les ressources Azure ?

  3. Lancez l’application. Les messages tels que l'exemple suivant seront affichés dans le journal de votre application :

    New message received: 'Hello World'
    Message 'Hello World' successfully checkpointed
    

Étapes suivantes