Delen via


Spring Cloud Stream met Azure Service Bus

Dit artikel laat zien hoe u spring Cloud Stream Binder gebruikt om berichten te verzenden naar en te ontvangen van Service Bus queues en topics.

Azure biedt een asynchroon berichtenplatform met de naam Azure Service Bus ("Service Bus") dat is gebaseerd op de standaard Advanced Message Queueing Protocol 1.0 ("AMQP 1.0").. Service Bus kan worden gebruikt voor het hele scala aan ondersteunde Azure platforms.

Vereisten

Notitie

Als u uw account toegang wilt verlenen tot uw Azure Service Bus-resources, wijst u de rol Azure Service Bus Data Sender en Azure Service Bus Data Receiver toe aan het Microsoft Entra-account dat u momenteel gebruikt. Zie voor meer informatie over het verlenen van toegangsrollen Azure-rollen toewijzen met behulp van de Azure-portal en een toepassing authenticeren en autoriseren met Microsoft Entra ID om toegang te krijgen tot Azure Service Bus-entiteiten.

Belangrijk

Spring Boot versie 2.5 of hoger is vereist om de stappen in dit artikel uit te voeren.

Berichten verzenden en ontvangen van Azure Service Bus

Met een wachtrij of onderwerp voor Azure Service Bus kunt u berichten verzenden en ontvangen met Spring Cloud Azure Stream Binder-Service Bus.

Als u de Module Spring Cloud Azure Stream Binder Service Bus wilt installeren, voegt u de volgende afhankelijkheden toe aan uw bestand pom.xml:

  • De Spring Cloud Azure Stuklijst (BOM):

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-dependencies</artifactId>
          <version>7.1.0</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    

    Notitie

    Als u Spring Boot 4.0.x gebruikt, moet u de spring-cloud-azure-dependencies versie instellen op 7.1.0.

    Als u Spring Boot 3.5.x gebruikt, moet u de spring-cloud-azure-dependencies versie instellen op 6.1.0.

    Als u Spring Boot 3.1.x-3.5.x gebruikt, moet u de spring-cloud-azure-dependencies versie instellen op 5.25.0.

    Als u Spring Boot 2.x gebruikt, moet u de spring-cloud-azure-dependencies versie instellen op 4.20.0.

    Deze BOM (Bill of Material) moet worden geconfigureerd in de <dependencyManagement> sectie van uw pom.xml-bestand. Dit zorgt ervoor dat alle Spring Cloud-Azure afhankelijkheden dezelfde versie gebruiken.

    Zie Welke Versie van Spring Cloud Azure Moet Ik Gebruiken voor meer informatie over de versie die voor deze BOM wordt gebruikt.

  • Het Service Bus-artefact van de Spring Cloud Azure Stream Binder:

    <dependency>
         <groupId>com.azure.spring</groupId>
         <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
    </dependency>
    

De toepassing coderen

Gebruik de volgende stappen om uw toepassing te configureren voor het gebruik van een Service Bus wachtrij of onderwerp voor het verzenden en ontvangen van berichten.

  1. Configureer de Service Bus referenties in het configuratiebestand application.properties.

     spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE}
     spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false
     spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue
     spring.cloud.function.definition=consume;supply;
     spring.cloud.stream.poller.fixed-delay=60000 
     spring.cloud.stream.poller.initial-delay=0
    

    In de volgende tabel worden de velden in de configuratie beschreven:

    Veld Beschrijving
    spring.cloud.azure.servicebus.namespace Geef de naamruimte op die u hebt verkregen in uw Service Bus vanuit de Azure-portal.
    spring.cloud.stream.bindings.consume-in-0.destination Geef de Service Bus wachtrij of Service Bus onderwerp op dat u in deze zelfstudie hebt gebruikt.
    spring.cloud.stream.bindings.supply-out-0.destination Geef dezelfde waarde op als voor invoerbestemming.
    spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete Geef op of berichten automatisch moeten worden afgehandeld. Als deze optie is ingesteld als false, wordt er een berichtkop van Checkpointer toegevoegd om ontwikkelaars in staat te stellen berichten handmatig te vereffenen.
    spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type Geef het entiteitstype voor de uitvoerbinding op, dit kan queue of topic zijn.
    spring.cloud.function.definition Geef op welke functionele bean verbonden moet worden met de externe bestemmingen die door de bindingen worden geëxposeerd.
    spring.cloud.stream.poller.fixed-delay Geef een vaste vertraging op voor de standaard poller in milliseconden. De standaardwaarde is 1000 L. De aanbevolen waarde is 60000.
    spring.cloud.stream.poller.initial-delay Geef de initiële vertraging op voor periodieke triggers. De standaardwaarde is 0.
  2. Bewerk het opstartklassebestand om de volgende inhoud weer te geven.

    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;
    
    @SpringBootApplication
    public class ServiceBusQueueBinderApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class);
        private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer();
    
        public static void main(String[] args) {
            SpringApplication.run(ServiceBusQueueBinderApplication.class, args);
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supply() {
            return ()->many.asFlux()
                           .doOnNext(m->LOGGER.info("Manually sending message {}", m))
                           .doOnError(t->LOGGER.error("Error encountered", t));
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message->{
                Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
                LOGGER.info("New message received: '{}'", message.getPayload());
                checkpointer.success()
                            .doOnSuccess(s->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                            .doOnError(e->LOGGER.error("Error found", e))
                            .block();
            };
        }
    
        @Override
        public void run(String... args) {
            LOGGER.info("Going to add message {} to Sinks.Many.", "Hello World");
            many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST);
        }
    
    }
    

    Aanbeveling

    In deze handleiding zijn er geen verificatiebewerkingen in de configuraties of de code. Voor het maken van verbinding met Azure-services is echter verificatie vereist. Als u de verificatie wilt voltooien, moet u Azure Identiteit gebruiken. Spring Cloud Azure maakt gebruik van DefaultAzureCredential, die de Azure Identity-bibliotheek biedt om u te helpen referenties te verkrijgen zonder codewijzigingen.

    DefaultAzureCredential ondersteunt meerdere verificatiemethoden en bepaalt welke methode tijdens runtime moet worden gebruikt. Met deze aanpak kan uw app verschillende verificatiemethoden gebruiken in verschillende omgevingen (zoals lokale en productieomgevingen) zonder omgevingsspecifieke code te implementeren. Zie DefaultAzureCredential voor meer informatie.

    Als u de verificatie in lokale ontwikkelomgevingen wilt voltooien, kunt u Azure CLI, Visual Studio Code, PowerShell of andere methoden gebruiken. Zie Azure-verificatie in Java ontwikkelomgevingen voor meer informatie. Als u de verificatie in Azure hostingomgevingen wilt voltooien, raden we u aan om een door de gebruiker toegewezen beheerde identiteit te gebruiken. Zie Wat zijn beheerde identiteiten voor Azure resources?

  3. Start de toepassing. Berichten zoals het volgende voorbeeld worden in uw toepassingslogboek geplaatst:

    New message received: 'Hello World'
    Message 'Hello World' successfully checkpointed
    

Volgende stappen