Spring Cloud 微服务入门教程(七):Spring Cloud Stream 消息队驱动式的微服务
2020年02月26日 17:30:00 · 本文共 5,333 字阅读时间约 19分钟 · 4,076 次浏览上一节的《Spring Cloud 微服务入门教程(六):Spring Cloud BUS 消息总线实现配置中心动态更新配置文件》已经安装了RabbitMQ消息队列,并实现了SpringCloudBus消息总线,本节介绍Spring Cloud Stream 消息队驱动式的微服务。可以使用RabbitMQ、Apache Kafka等,用于微服务之间的异步消息传递和接收。
我们先规划一下打算怎么做,直接上代码可能会有点难理解,我们要实现的是DemoClient给DemoService发送一条消息放入消息队列中,然后DemoService接收消息并且给DemoClient回复一条消息。我说一下我个人对这个Spring Cloud Stream的理解,消息被分为很多个频道,你可以接收某个频道,也可以对某个频道发送消息,所以你需要知道频道的名称,我就统一定义在统一接口中心里,这个统一接口中心是我自己设计的架构,并不是微服务的。在上一节我们已经配置过RabbitMQ,所以配置RabbitMQ的部分不再赘述。
修改apicenter、demoservice、democlient微服务的pom文件,增加spring-cloud-starter-stream-rabbit依赖,例如:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cloud</artifactId>
<groupId>net.renfei</groupId>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>net.renfei</groupId>
<artifactId>apicenter</artifactId>
<version>1.0.0</version>
<name>APICenter</name>
<description>接口中心</description>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
</project>
修改远程Git上的配置文件
修改远程Git上的application.yml,包括democlient和demoservice的配置,增加:
spring:
stream:
bindings:
demoServiceMQ:
group: demo
content-type: application/json
demoClientMQ:
group: demo
content-type: application/json
bindings后面的是我们的频道名称,这个是自定义的,再后面的group是消费组,消费组防止消息被重复消费,微服务可能会启动多个实例组,保证每个组中只有一个成员会收到该消息,content-type是告诉框架我们要把对象作为json格式保存,这样方便我们在消息队列中查看对象的内容,调试起来会很方便。
统一接口中心新增频道名
在apicenter中新增一个net.renfei.apicenter.message.MQChannel的interface,用来规范和暴露所有微服务的频道名称:
package net.renfei.apicenter.message;
/**
* 消息队列频道名称
*
* @author RenFei
*/
public interface MQChannel {
String DEMOSERVICE = "demoServiceMQ";
String DEMOCLIENT = "demoClientMQ";
}
消息接收端
在demoservice模块中新增net.renfei.demoservice.message.DemoServiceMessageClient作为接收客户端:
package net.renfei.demoservice.message;
import net.renfei.apicenter.message.MQChannel;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface DemoServiceMessageClient {
@Input(MQChannel.DEMOSERVICE)
SubscribableChannel input();
}
在demoservice模块中新增net.renfei.demoservice.message.DemoClientMessageClient作为发送客户端:
package net.renfei.demoservice.message;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface DemoClientMessageClient {
@Output
MessageChannel output();
}
在demoservice模块中新增net.renfei.demoservice.message.DemoServiceReceiver作为消息监听者:
package net.renfei.demoservice.message;
import lombok.extern.slf4j.Slf4j;
import net.renfei.apicenter.message.MQChannel;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;
/**
* 服务端监听消息队列
*
* @author RenFei
*/
@Slf4j
@Component
@EnableBinding({DemoServiceMessageClient.class, DemoClientMessageClient.class})
public class DemoServiceReceiver {
@StreamListener(MQChannel.DEMOSERVICE)
@SendTo(MQChannel.DEMOCLIENT)
public String process(Object message) {
log.info("Messages received by the DemoService:{}", message);
return "This is DemoServiceReceiver's reply";
}
}
消息发送端
在democlient中新增net.renfei.democlient.message.DemoClientMessageClient作为消息接收客户端:
package net.renfei.democlient.message;
import net.renfei.apicenter.message.MQChannel;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface DemoClientMessageClient {
@Input(MQChannel.DEMOCLIENT)
SubscribableChannel input();
}
在democlient中新增net.renfei.democlient.message.DemoServiceMessageClient作为消息发送端:
package net.renfei.democlient.message;
import net.renfei.apicenter.message.MQChannel;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
* DemoService的MQ频道客户端
*
* @author RenFei
*/
public interface DemoServiceMessageClient {
@Output(MQChannel.DEMOSERVICE)
MessageChannel output();
}
在democlient中新增net.renfei.democlient.controller.SendMessageController作为消息发送的触发入口:
package net.renfei.democlient.controller;
import net.renfei.democlient.message.DemoServiceMessageClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class SendMessageController {
@Autowired
private DemoServiceMessageClient demoServiceMessageClient;
@GetMapping("/sendMessage")
public void sendMessage(){
demoServiceMessageClient.output().send(
MessageBuilder.withPayload("This is a message from democlient").build()
);
}
}
运行测试
先启动注册中心eureka,然后启动配置中心config,再启动demoservice服务,最后启动democlient,访问我们新建的DemoClientMessageClient,触发消息发送,演示系统的地址是:http://localhost:18081/sendMessage
总结
代码已经陈述完了,做一下总结,@Input SubscribableChannel是订阅频道,用于接收消息;@Output MessageChannel是用来发送消息,这样应用之间可以解耦合降低依赖,比较经典的场景是发送短信,核心的业务不需要等待短信接口的结果,直接给短信服务发送一个消息以后就去干别的事了,短信服务接收到消息以后逐一执行发送短信的任务。
版权声明:本文为博主「任霏」原创文章,遵循 CC BY-NC-SA 4.0 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.renfei.net/posts/1003327
相关推荐
猜你还喜欢这些内容,不妨试试阅读一下以下内容均由网友提交发布,版权与真实性无法查证,请自行辨别。
- 前后端分离项目接口数据加密的秘钥交换逻辑(RSA、AES)
- OmniGraffle 激活/破解 密钥/密匙/Key/License
- Parallels Desktop For Mac 16.0.1.48911 破解版 [TNT]
- Redis 未授权访问漏洞分析 cleanfda 脚本复现漏洞挖矿
- CleanMyMac X 破解版 [TNT] 4.6.0
- OmniPlan 激活/破解 密钥/密匙/Key/License
- 人大金仓 KingbaseES V8 R3 安装包、驱动包和 License 下载地址
- Sound Control 破解版 2.4.2
- Parallels Desktop For Mac 15.1.4.47270 破解版 [TNT]
- Parallels Desktop For Mac 16.0.0.48916 破解版 [TNT]
- 博客完全迁移上阿里云,我所使用的阿里云架构
- 微软确认Windows 10存在bug 部分电脑升级后被冻结
- 大佬们在说的AQS,到底啥是个AQS(AbstractQueuedSynchronizer)同步队列
- 比特币(BTC)钱包客户端区块链数据同步慢,区块链数据离线下载
- Java中说的CAS(compare and swap)是个啥
- 小心免费主题!那些WordPress主题后门,一招拥有管理员权限
- 强烈谴责[wamae.win]恶意反向代理我站并篡改我站网页
- 讨论下Java中的volatile和JMM(Java Memory Model)Java内存模型
- 新版个人网站 NEILREN4J 上线并开源程序源码
- 我站近期遭受到恶意不友好访问攻击公告