起因
由于未来我们的系统都是由一个统一的用户管理系统来管理用户数据的,所以各个子系统中不需要也不可以随意的新增或修改用户的个人信息,鉴于此,我考虑通过MQ的方式,实现在用户系统中添加、删除或修改用户信息时同时将这条操作记录发送给该用户拥有权限的子系统中去。
有图更好理解:
既然说到MQ,则需要简单的介绍并了解一下MQ了,下文摘自百度百科:
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。其中较为成熟的MQ产品有IBM WEBSPHERE MQ等等。
经过各种文章的对比特性以及我们使用的业务场景等综合分析,最后选择了RabbitMQ作为我们的消息队列,至于对比的文章也都是百度的,有兴趣的可以自行查找。
教程
RabbitMQ的基础教程我就不写了,也是跟着往上的前辈分享边学边看的,在这里我分享一下:
首先是RabbitMQ的安装:
其次是RabbitMQ的基础:
然后学习RabbitMQ与Springboot的集成:
最后学习一下消息消费机制在Springboot下的实现:
学习Spring Boot:(二十六)使用 RabbitMQ 消息队列(消息确认ack,rpc)
业务
学习完上面的教程后,基本上对于RabbitMQ的基础操作就算是了解了,现在我们来结合我们的业务,考虑一下具体的实现内容和实现方式。
我们加入消息队列最核心的目的就是要做一个基础的数据同步:
当用户管理系统中的用户有增加、删除、修改的时候,我们可以通过消息队列将该次操作发给对应的子系统,子系统接到该操作指令后,同步的维护子系统的用户表即可。
既然核心目的知道了,那么实现内容就是分别要在用户管理系统中加入一个MQ的消息发送端、在各个子系统中加入消息的接收端,接收端接收到消息后执行对应的数据库操作,那么这里其实还隐藏着一个问题,随着子系统越来越多,如果不降接收端封装成公用模块的话,将会面临的是灾难性的维护成本,所以还必须考虑将接收端封装成Maven私服的包,并维护其版本。
实现
首先我们要定义一个接口,用来规范数据库的常用操作:
1 | /** |
之后我们要定义一个父类,这个父类用来规范操作流程,并连接RabbitMQ的相关设置:
1 | /** |
我在这里讲消息规范为带消息头和消息体的类型,中间用 “|” 进行分割,消息头主要放的是操作形式,如:add、update和delete,消息体放的就是用户信息的bean转化的json串了。
举例来说:add|{“username”:”jason”,”password”:”123456”}
一旦消息格式确定了,那么就需要定义一个工具类来处理这种消息类型:
1 | /** |
为了规范由于消息不规范带来的错误,我也设置了一个基础的异常类:
1 | /** |
以及公用数据类,主要放置的是命令头、异常信息代码和信息体以及消息队列的参数设置:
1 | /** |
然后是消息队列的基础设置:
1 | /** |
这里可以发现我用到了自定义属性,目的是子系统引用这个封装包后,可以自定义自己的队列名称、交换器名称和路由键名称,做到灵活处理各子系统的差异性(注意:下面注释的是子系统需要分别设置的属性):
1 | spring.rabbitmq.host=192.168.3.134 |
当然,为了将json串转化为对象,子系统本地还需要自定义一个Bean来接收数据,我这里只是提供了一个测试的Bean:
1 | /** |
辅助工作基本上都做完了,下面是如何使用了,根据我的设计,其实非常简单了,只需要继承AbsMessageRecv类病实现IsqlOperation接口,并传入对应的泛型类就可以了,请参看代码(这个类是在子系统中实现的,前提是要将消息队列接收端引入到你的项目中去):
1 | public class MessageRecvImpl extends AbsMessageRecv implements IsqlOperation<TestMQUser> { |
整个逻辑就是:
由于AbsMessageRecv父类中的container.setMessageListener(this);的设置,所以接收到的消息会最先进入到父类中ChannelAwareMessageListener接口的实现方法onMessage中,在该方法中我又调用了本类的receive(Message message)方法对数据进行处理,首先会默认的调用MqSplitUtil工具类将数据拆分中消息头和消息体,然后子系统中的MessageRecvImpl类由于继承了AbsMessageRecv并重写了receive(Message message)方法,便可以在这个方法内部实现相关的消息头判断逻辑,并根据IsqlOperation接口中定义的方法来实现对应的消息体入库操作。
总结
通过对MQ接收端的封装,我们可以快速的引入该封装架构,使子系统快速的建立起数据接收功能。