Springboot2.0中封装RabbitMQ到Maven私服

起因

由于未来我们的系统都是由一个统一的用户管理系统来管理用户数据的,所以各个子系统中不需要也不可以随意的新增或修改用户的个人信息,鉴于此,我考虑通过MQ的方式,实现在用户系统中添加、删除或修改用户信息时同时将这条操作记录发送给该用户拥有权限的子系统中去。

有图更好理解:

logo

既然说到MQ,则需要简单的介绍并了解一下MQ了,下文摘自百度百科:

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。其中较为成熟的MQ产品有IBM WEBSPHERE MQ等等。

经过各种文章的对比特性以及我们使用的业务场景等综合分析,最后选择了RabbitMQ作为我们的消息队列,至于对比的文章也都是百度的,有兴趣的可以自行查找。

教程

RabbitMQ的基础教程我就不写了,也是跟着往上的前辈分享边学边看的,在这里我分享一下:

首先是RabbitMQ的安装:

windows下 安装 rabbitMQ 及操作常用命令

Rabbitmq——用户管理

其次是RabbitMQ的基础:

【译】RabbitMQ教程一

【译】RabbitMQ教程二

【译】RabbitMQ教程三

【译】RabbitMQ教程四

【译】RabbitMQ教程五

【译】RabbitMQ教程六

然后学习RabbitMQ与Springboot的集成:

springboot(集成篇):RabbitMQ集成详解

spring boot实战(第十二篇)整合RabbitMQ

最后学习一下消息消费机制在Springboot下的实现:

学习Spring Boot:(二十六)使用 RabbitMQ 消息队列(消息确认ack,rpc)

业务

学习完上面的教程后,基本上对于RabbitMQ的基础操作就算是了解了,现在我们来结合我们的业务,考虑一下具体的实现内容和实现方式。

我们加入消息队列最核心的目的就是要做一个基础的数据同步:

当用户管理系统中的用户有增加、删除、修改的时候,我们可以通过消息队列将该次操作发给对应的子系统,子系统接到该操作指令后,同步的维护子系统的用户表即可。

既然核心目的知道了,那么实现内容就是分别要在用户管理系统中加入一个MQ的消息发送端、在各个子系统中加入消息的接收端,接收端接收到消息后执行对应的数据库操作,那么这里其实还隐藏着一个问题,随着子系统越来越多,如果不降接收端封装成公用模块的话,将会面临的是灾难性的维护成本,所以还必须考虑将接收端封装成Maven私服的包,并维护其版本。

实现

首先我们要定义一个接口,用来规范数据库的常用操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* @author 贾真
* 数据库操作规范的常用接口定义
* @param <T>
*/

public interface IsqlOperation<T> {
/**
* 接到数据添加用户
* @param t
*/

void addUser(T t);

/**
* 接到数据修改用户
* @param t
*/

void updateUser(T t);

/**
* 接到数据删除用户
* @param t
*/

void deleteUser(T t);

}

之后我们要定义一个父类,这个父类用来规范操作流程,并连接RabbitMQ的相关设置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
* @author 贾真
* 这个是MQ接收类的父类
*
* */

public class AbsMessageRecv implements ChannelAwareMessageListener {
protected static final Logger log = LoggerFactory.getLogger(AbsMessageRecv.class);
protected static final Gson gson =new Gson();
@Value("${mq.queuename}")
private String queuename;
public String commandOrder;
public String commandBody;

public String getCommandBody(String command){
return MqSplitUtil.SplitCommandBody(command);
}

public String getCommandOrder(String command){
return MqSplitUtil.SplitCommandOrder(command);
}


@Override
public void onMessage(Message message, Channel channel) throws Exception {
try {
receive(message);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (IOException e){
e.printStackTrace();
}
}

public void receive(Message message)throws Exception {
String messageString=new String(message.getBody());
log.info(">>>>>>> receive: " + messageString);
commandOrder=this.getCommandOrder(messageString);
commandBody=this.getCommandBody(messageString);
}

@Bean
public SimpleMessageListenerContainer SimpleContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queuename);
container.setMessageListener(this);
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(Constant.MAX_CONCURRENT_CONSUMERS);
container.setConcurrentConsumers(Constant.CONCURRENT_CONSUMERS);
// 设置为手动,默认为 AUTO,如果设置了手动应答 basicack,就要设置manual
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return container;
}
}

我在这里讲消息规范为带消息头和消息体的类型,中间用 “|” 进行分割,消息头主要放的是操作形式,如:add、update和delete,消息体放的就是用户信息的bean转化的json串了。

举例来说:add|{“username”:”jason”,”password”:”123456”}

一旦消息格式确定了,那么就需要定义一个工具类来处理这种消息类型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/**
* @author 贾真
* 负责处理MQ发送过来的消息
* 其格式为add|{json对象}
* 消息头为add/update/delete分别代表新增、修改和删除操作
* 消息体为json对象 代表的是某个用户的用户信息
*/

public class MqSplitUtil {
private static final Logger log = LoggerFactory.getLogger(MqSplitUtil.class);
private MqSplitUtil(){
}

/**
* 传输命令的通用校验规则
* @param command
*/

private static void checkCommand(String command){
if(command==null||"".equals(command)){
log.error(Constant.MQEXCEPTION_101_INFO+"Command值:"+command);
throw new MQException(Constant.MQEXCEPTION_101_CODE,Constant.MQEXCEPTION_101_INFO);
}
if(!command.contains("|")){
log.error(Constant.MQEXCEPTION_102_INFO+"Command值:"+command);
throw new MQException(Constant.MQEXCEPTION_102_CODE,Constant.MQEXCEPTION_102_INFO);
}
}
/**
* 获取MQ消息的消息体
* @param command
* @return
*/

public static String SplitCommandBody(String command){
checkCommand(command);
return command.substring(command.indexOf("|")+1, command.length());
}
/**
* 获取MQ消息的消息头
* @param command
* @return
*/

public static String SplitCommandOrder(String command){
checkCommand(command);
return command.substring(0,command.indexOf("|"));
}
public static void main(String[] args){
System.out.println(SplitCommandOrder("add|{adb@|dddasdcc}"));
System.out.println(SplitCommandBody("aaa|{adb@|dddasdcc}"));
/* Gson gson=new Gson();
User user =new User();
user.setName("张三");
user.setPassword("123456");
user.setSex("男");
System.out.println(gson.toJson(user));*/

}

}

为了规范由于消息不规范带来的错误,我也设置了一个基础的异常类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
/**
* 自定义异常类(继承运行时异常)
* @author 贾真
*/

public class MQException extends RuntimeException{

private static final long serialVersionUID = 1L;

/**
* 错误编码
*/

private String errorCode;

/**
* 消息是否为属性文件中的Key
*/

private boolean propertiesKey = true;

/**
* 构造一个基本异常.
*
* @param message
* 信息描述
*/

public MQException(String message)
{

super(message);
}

/**
* 构造一个基本异常.
*
* @param errorCode
* 错误编码
* @param message
* 信息描述
*/

public MQException(String errorCode, String message)
{

this(errorCode, message, true);
}

/**
* 构造一个基本异常.
*
* @param errorCode
* 错误编码
* @param message
* 信息描述
*/

public MQException(String errorCode, String message, Throwable cause)
{

this(errorCode, message, cause, true);
}

/**
* 构造一个基本异常.
*
* @param errorCode
* 错误编码
* @param message
* 信息描述
* @param propertiesKey
* 消息是否为属性文件中的Key
*/

public MQException(String errorCode, String message, boolean propertiesKey)
{

super(message);
this.setErrorCode(errorCode);
this.setPropertiesKey(propertiesKey);
}

/**
* 构造一个基本异常.
*
* @param errorCode
* 错误编码
* @param message
* 信息描述
*/

public MQException(String errorCode, String message, Throwable cause, boolean propertiesKey)
{

super(message, cause);
this.setErrorCode(errorCode);
this.setPropertiesKey(propertiesKey);
}

/**
* 构造一个基本异常.
*
* @param message
* 信息描述
* @param cause
* 根异常类(可以存入任何异常)
*/

public MQException(String message, Throwable cause)
{

super(message, cause);
}

public String getErrorCode()
{

return errorCode;
}

public void setErrorCode(String errorCode)
{

this.errorCode = errorCode;
}

public boolean isPropertiesKey()
{

return propertiesKey;
}

public void setPropertiesKey(boolean propertiesKey)
{

this.propertiesKey = propertiesKey;
}

}

以及公用数据类,主要放置的是命令头、异常信息代码和信息体以及消息队列的参数设置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
* @author 贾真
*/

public class Constant {
/**
* 操作命令头
*/

public static final String COMMAND_ADD="add";
public static final String COMMAND_UPDATE="update";
public static final String COMMAND_DELETE="delete";

/**
* 异常信息
*/

public static final String MQEXCEPTION_101_CODE="101";
public static final String MQEXCEPTION_101_INFO="MQ数据为空,请查验!";
public static final String MQEXCEPTION_102_CODE="102";
public static final String MQEXCEPTION_102_INFO="MQ数据格式不包含关键符号,请查验!";
public static final String MQEXCEPTION_103_CODE="103";
public static final String MQEXCEPTION_103_INFO="找不到对应的命令头,请检查发送数据!";


/**
* 消息队列最大消费数和默认消费数
*/

public static final int MAX_CONCURRENT_CONSUMERS=1;
public static final int CONCURRENT_CONSUMERS=1;


/**
* 队列是否持久化
*/

public static final boolean QUEUE_DURABLE = true;
/**
* 仅创建者可以使用的私有队列,断开后自动删除
*/

public static final boolean QUEUE_EXCLUSIVE = false;
/**
* 当所有消费客户端连接断开后,是否自动删除队列
*/

public static final boolean QUEUE_AUTO_DELETE = false;

/**
* 交换器是否持久化
*/

public static final boolean EXCHANGE_DURABLE = true;

/**
* 当所有消费客户端连接断开后,是否自动删除队列
*/

public static final boolean EXCHANGE_AUTO_DELETE = false;


}

然后是消息队列的基础设置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* @author 贾真
*/

@Configuration
@Component
public class RabbitConfig {
/**
* 队列名称
*/

@Value("${mq.queuename}")
private String queuename;
/**
* 交换器名称
*/

@Value("${mq.exchangename}")
private String exchangename;
/**
* 路由键
*/

@Value("${mq.routingkey}")
private String routingkey;
@Bean
public Queue queue() {
return new Queue(queuename, Constant.QUEUE_DURABLE, Constant.QUEUE_EXCLUSIVE, Constant.QUEUE_AUTO_DELETE);
}

@Bean
public TopicExchange exchange() {
return new TopicExchange(exchangename, Constant.EXCHANGE_DURABLE, Constant.EXCHANGE_AUTO_DELETE);

}
@Bean
public Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(routingkey);
}

}

这里可以发现我用到了自定义属性,目的是子系统引用这个封装包后,可以自定义自己的队列名称、交换器名称和路由键名称,做到灵活处理各子系统的差异性(注意:下面注释的是子系统需要分别设置的属性):

1
2
3
4
5
6
7
8
9
10
11
12
spring.rabbitmq.host=192.168.3.134
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=dhcc_mq
# 开启确认消息是否到达交换器,需要设置true
spring.rabbitmq.publisher-confirms=true
# 开启确认消息是否到达队列,需要设置true
spring.rabbitmq.publisher-returns=true

#mq.queuename=com.example.recv1
#mq.exchangename=demo-exchanges1
#mq.routingkey=route-key1

当然,为了将json串转化为对象,子系统本地还需要自定义一个Bean来接收数据,我这里只是提供了一个测试的Bean:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* @author 贾真
*/

public class TestMQUser {

String name;
String password;
String sex;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getPassword() {
return password;
}

public void setPassword(String password) {
this.password = password;
}

public String getSex() {
return sex;
}

public void setSex(String sex) {
this.sex = sex;
}
}

辅助工作基本上都做完了,下面是如何使用了,根据我的设计,其实非常简单了,只需要继承AbsMessageRecv类病实现IsqlOperation接口,并传入对应的泛型类就可以了,请参看代码(这个类是在子系统中实现的,前提是要将消息队列接收端引入到你的项目中去):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class MessageRecvImpl extends AbsMessageRecv implements IsqlOperation<TestMQUser> {
protected static final Logger log = LoggerFactory.getLogger(MessageRecvImpl.class);
@Override
public void receive(Message message) throws Exception {
super.receive(message);
switch (commandOrder){
case Constant.COMMAND_ADD:
addUser(gson.fromJson(commandBody,TestMQUser.class));
break;
case Constant.COMMAND_UPDATE:
updateUser(gson.fromJson(commandBody,TestMQUser.class));
break;
case Constant.COMMAND_DELETE:
deleteUser(gson.fromJson(commandBody,TestMQUser.class));
break;
default:
log.error(Constant.MQEXCEPTION_103_INFO+"Command Body值:"+commandBody);
throw new MQException(Constant.MQEXCEPTION_103_CODE,Constant.MQEXCEPTION_103_INFO);
}
}
@Override
public void addUser(TestMQUser user) {
log.info("新增用户:"+user.getName());
}

@Override
public void updateUser(TestMQUser user) {
log.info("修改用户:"+user.getName());
}

@Override
public void deleteUser(TestMQUser user) {
log.info("删除用户:"+user.getName());
}

}

整个逻辑就是:

由于AbsMessageRecv父类中的container.setMessageListener(this);的设置,所以接收到的消息会最先进入到父类中ChannelAwareMessageListener接口的实现方法onMessage中,在该方法中我又调用了本类的receive(Message message)方法对数据进行处理,首先会默认的调用MqSplitUtil工具类将数据拆分中消息头和消息体,然后子系统中的MessageRecvImpl类由于继承了AbsMessageRecv并重写了receive(Message message)方法,便可以在这个方法内部实现相关的消息头判断逻辑,并根据IsqlOperation接口中定义的方法来实现对应的消息体入库操作。

总结

通过对MQ接收端的封装,我们可以快速的引入该封装架构,使子系统快速的建立起数据接收功能。

热评文章