博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【本人秃顶程序员】使用Spring Cloud Stream和RabbitMQ实现事件驱动的微服务
阅读量:6337 次
发布时间:2019-06-22

本文共 6838 字,大约阅读时间需要 22 分钟。

hot3.png

←←←←←←←←←←←← 快!点关注

让我们展示如何使用Spring Cloud Stream来设计事件驱动的微服务。首先,Spring Cloud Stream首先有什么好处?因为Spring AMPQ提供了访问AMPQ工件所需的一切。如果您不熟悉Spring AMPQ,请查看此repo,其中包含许多有用的示例。那么为什么要使用Spring Cloud Stream ......?

Spring Cloud Stream概念

  • Spring Cloud Stream通过Binder概念将使用过的消息代理与Spring Integration消息通道绑定在一起。支持RabbitMQ和Kafka。
  • Spring Cloud Stream将基础架构配置从代码中分离为属性文件。这意味着即使您更改了底层代理,您的Spring Integration代码也将是相同的!

示例中的Spring Cloud Stream概念(RabbitMQ)

让我们有一个名为streamInput的交换,它有两个队列streamInput.cities和streamInput.persons。现在让我们将这两个队列插入两个消息通道citiesChannel和personsChannel来消费来自它的传入消息。使用Spring AMPQ,您需要创建SimpleMessageListenerContainer并在代码中连接基础结构。但这有很多样板代码。使用Spring Cloud Stream,您可以将AMPQ配置分离到属性文件:

spring.cloud.stream.bindings.citiesChannel.destination=streamInput  spring.cloud.stream.bindings.citiesChannel.group=cities  spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.durableSubscription=true  spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.bindingRoutingKey=citiesspring.cloud.stream.bindings.personsChannel.destination=streamInput  spring.cloud.stream.bindings.personsChannel.group=persons  spring.cloud.stream.rabbit.bindings.personsChannel.consumer.durableSubscription=true  spring.cloud.stream.rabbit.bindings.personsChannel.consumer.bindingRoutingKey=persons

配置详细信息

在类路径上使用RabbitMQ Binder,每个目标都映射到TopicExchange。在示例中,我创建了名为streamInput的TopicExchange, 并将其附加到两个消息通道citiesChannel和personsChannel。

spring.cloud.stream.bindings.citiesChannel.destination = streamInput   spring.cloud.stream.bindings.personsChannel.destination = streamInput

现在您需要了解RabbitMQ绑定器的灵感来自Kafka,队列的消费者被分组到消费者组中,其中只有一个消费者将获得消息。这是有道理的,因为您可以轻松扩展消费者。

因此,让我们创建两个队列streamInput.persons和streamInput.cities并将它们附加到streamInput TopicExchange和提到的消息通道

# This will create queue "streamInput.cities" connected to message channel citiesChannel where input messages will land.  spring.cloud.stream.bindings.citiesChannel.group=cities # Durable subscription, of course.  spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.durableSubscription=true # AMPQ binding to exchange (previous spring.cloud.stream.bindings.
.destination settings). # Only messages with routingKey = 'cities' will land here. spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.bindingRoutingKey=cities spring.cloud.stream.bindings.personsChannel.group=persons spring.cloud.stream.rabbit.bindings.personsChannel.consumer.durableSubscription=true spring.cloud.stream.rabbit.bindings.personsChannel.consumer.bindingRoutingKey=persons

连接属性到Spring Integration

好的,到目前为止我创建了两个队列。StreamInput.cities绑定到citiesChannel。StreamInput.persons绑定到peopleChannel。

<destination>.<group>是Spring Cloud Stream约定的队列命名,现在让我们将它连接到Spring Integration:

package com.example.spring.cloud.configuration;import org.springframework.cloud.stream.annotation.Input;  import org.springframework.messaging.SubscribableChannel;/**   \* Created by tomask79 on 30.03.17.   */  public interface SinkRabbitAPI {    String INPUT_CITIES = "citiesChannel";    String INPUT_PERSONS = "personsChannel";    @Input(SinkRabbitAPI.INPUT_CITIES)      SubscribableChannel citiesChannel();    @Input(SinkRabbitAPI.INPUT_PERSONS)      SubscribableChannel personsChannel();  }

Spring Boot启动时加载这个属性

package com.example.spring.cloud;import com.example.spring.cloud.configuration.SinkRabbitAPI;  import com.example.spring.cloud.configuration.SourceRabbitAPI;  import org.springframework.boot.SpringApplication;  import org.springframework.boot.autoconfigure.SpringBootApplication;  import org.springframework.cloud.stream.annotation.EnableBinding;  import org.springframework.scheduling.annotation.EnableScheduling;@SpringBootApplication  @EnableBinding({SinkRabbitAPI.class})  public class StreamingApplication {    public static void main(String\[\] args) {          SpringApplication.run(StreamingApplication.class, args);      }  }

在此之后,我们可以创建消费者从绑定的消息通道中的队列接收消息:

import com.example.spring.cloud.configuration.SinkRabbitAPI;  import com.example.spring.cloud.configuration.SourceRabbitAPI;  import org.springframework.cloud.stream.annotation.StreamListener;  import org.springframework.integration.support.MessageBuilder;  import org.springframework.messaging.MessageChannel;  import org.springframework.messaging.handler.annotation.SendTo;  import org.springframework.stereotype.Service;import javax.annotation.Resource;/**   \* Created by tomask79 on 30.03.17.   */  @Service  public class ProcessingAMPQEndpoint {    @StreamListener(SinkRabbitAPI.INPUT_CITIES)      public void processCity(final String city) {          System.out.println("Trying to process input city: "+city);      }    @StreamListener(SinkRabbitAPI.INPUT_PERSONS)      public void processPersons(final String person) {          System.out.println("Trying to process input person: "+person);      }  }

RabbitMQ绑定器和代理配置

Spring Cloud Stream如何知道在哪里寻找消息中间件?如果在类路径中找到RabbitMQ绑定器,则使用默认RabbitMQ主机(localhost)和端口(5672)连接到RabbitMQ服务器。如果您的消息中间件配置在不同端口,则需要配置属性:

spring:    cloud:      stream:        bindings:          ...        binders:            rabbitbinder:              type: rabbit              environment:                spring:                  rabbitmq:                    host: rabbitmq                    port: 5672                    username: XXX                    password: XXX

测试消息消费

  • 安装并运行RabbitMQ代理
  • rabbitmq.git
  • mvn clean install
  • java -jar target / streaming-0.0.1-SNAPSHOT.jar
  • 现在使用路由键'cities'或'persons'在streamInput Exchange上发布消息...输出应该是:
Started StreamingApplication in 6.513 seconds (JVM running for 6.92)   Trying to process input city: sdjfjljksdflkjsdflkjsdfsfd  Trying to process input person: sdjfjljksdflkjsdflkjsdfsfd

使用Spring Cloud Stream重新传递消息

您通常希望在进入DLX交换之前再次尝试接收消息。首先,让我们配置Spring Cloud Stream尝试重新发送失败消息的次数:

spring.cloud.stream.bindings.personsChannel.consumer.maxAttempts = 6

这意味着如果从streamInput.persons队列接收的消息出错,那么Spring Cloud Stream将尝试重新发送六次。让我们试试,首先让我们修改接收端点以模拟接收崩溃:

 @StreamListener(SinkRabbitAPI.INPUT_PERSONS)      public void processPersons(final String person) {          System.out.println("Trying to process input person: "+person);          throw new RuntimeException();      }

如果我现在尝试使用人员路由键将某些内容发布到streamInput交换中,那么这应该是输出:

Trying to process input person: sfsdfsdfsd  Trying to process input person: sfsdfsdfsd  Trying to process input person: sfsdfsdfsd  Trying to process input person: sfsdfsdfsd  Trying to process input person: sfsdfsdfsd  Trying to process input person: sfsdfsdfsd   Retry Policy Exhausted          at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover  (RejectAndDontRequeueRecoverer.java:45) ~\[spring-rabbit-1.7.0.RELEASE.jar! /:na\]          at org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterc       

建议将Spring Cloud Stream 用于事件驱动的MicroServices,因为它可以节省时间,而且您不需要为Java中的AMPQ基础架构编写样板代码。

写在最后:

秃顶程序员的不易,看到这里,点了关注吧!

点关注,不迷路,持续更新!!!

转载于:https://my.oschina.net/u/4065580/blog/3007153

你可能感兴趣的文章
子域名信息收集攻略
查看>>
[Android]开发数独游戏思路分析过程
查看>>
SpreadJS 类Excel表格控件 - V12 新特性详解
查看>>
理解并取证:IPv6与IPv4在报文结构上的区别
查看>>
EOS主网上线只是开始,如何运营决定未来
查看>>
不用Visual Studio,5分钟轻松实现一张报表
查看>>
(译)如何使用cocos2d和box2d来制作一个Breakout游戏:第一部分
查看>>
计算机图形学(一) 图形系统综述
查看>>
持续集成(CI)- 几种测试的区别(摘录)
查看>>
多用户虚拟Web3D环境Deep MatrixIP9 1.04发布
查看>>
求高手,求解释
查看>>
[MSSQL]NTILE另类分页有么有?!
查看>>
winform datagridview 通过弹出小窗口来隐藏列 和冻结窗口
查看>>
Jquery闪烁提示特效
查看>>
最佳6款用于移动网站开发的 jQuery 图片滑块插件
查看>>
C++ String
查看>>
获取系统托盘图标的坐标及文本
查看>>
log4j Test
查看>>
HDU 1255 覆盖的面积(矩形面积交)
查看>>
SQL数据库无法附加,提示 MDF" 已压缩,但未驻留在只读数据库或文件组中。必须将此文件解压缩。...
查看>>