工程结构

此处先利用RocketMQ构建一个消息生产者体系,因此下面的结构图不涉及消费者体系;完整的工程结构还应包含消费者相关的部分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
.
├── build.gradle
├── mq-lib.iml
└── src
    └── main
        └── java
            └── cn
                └── diudiu
                    └── mq
                        ├── config
                        │   ├── DiuDiuMqConfig.java
                        │   └── DiuDiuMqProducerConfig.java
                        ├── message
                        │   ├── IMessageHandler.java
                        │   ├── Message.java
                        │   └── body
                        │       ├── LoginMessage.java
                        │       └── UserMessage.java
                        ├── producer
                        │   ├── IMessageCallBack.java
                        │   ├── IProducer.java
                        │   ├── DiuDiuMqProducer.java
                        │   └── DiuDiuSendResult.java
                        ├── rocketmq
                        │   └── producer
                        │       ├── DiuDiuDefaultMessageCallBack.java
                        │       └── DiuDiuRocketMqProducer.java
                        ├── selector
                        │   └── DiuDiuMessageQueueSelector.java
                        └── topic
                            ├── OrderTopic.java
                            └── UserTopic.java

依赖管理

在build.gradle中引入rocketMq的依赖。

1
2
3
4
// Declares the RocketMQ client library (Alibaba artifact, version 3.2.6) as a compile dependency.
// NOTE(review): the `compile` configuration was removed in Gradle 7; newer builds need
// `implementation` (or `api` with the java-library plugin) instead - confirm the Gradle version in use.
dependencies {
compile 'com.alibaba.rocketmq:rocketmq-client:3.2.6'
}

集群配置

MQ集群的配置:集群中的多个地址之间使用逗号分隔。后期可以考虑增加以list方式进行配置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Base configuration holding the addresses of the MQ cluster.
 */
public class DiuDiuMqConfig {
    /** Cluster addresses; several addresses may be given, separated by commas. */
    private String clusterAddresses;

    /**
     * @return the configured cluster addresses, or {@code null} if none have been set
     */
    public String getClusterAddresses() {
        return clusterAddresses;
    }

    /**
     * Sets the cluster addresses.
     *
     * <p>The setter returns nothing: the previous {@code String} return type had no
     * {@code return} statement and therefore did not compile.
     *
     * @param clusterAddresses the cluster addresses, comma-separated when there are several
     */
    public void setClusterAddresses(String clusterAddresses) {
        this.clusterAddresses = clusterAddresses;
    }
}

生产者

生产者配置

生产者配置类,需要指定生产者的名字,发送超时时间(默认3000ms),消息压缩(默认超过4k才压缩)以及大小限制(默认最大128k),发送失败重试次数(默认2次)等参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
pulic class DiuDiuMqProducerConfig extends DiuDiuMqConfig {
private static final int DEFAULT_SEND_MSG_TIMEOUT = 3000; // 默认发送消息超时时间

// 生产者名字
private String producerName;
// 发送超时时间
private int sendMsgTimeout = DEFAULT_SEND_MSG_TIMEOUT;
// 消息超过多大开始压缩,默认是4K
private int compressMsgBodyOverHowMuch = 1024 * 4;
// 发送失败重试次数
private int retryTimesWhenSendFailed = 2;
// 消息大小限制,默认128K
private int maxMessageSize = 1024 * 128;

// 省略getter和setter
// ……
}

生产消息

消息体

封装一个包含topic、key以及消息体Body的Message,主要用于发送消息的接口可以将消息组装好,调用IProducer.send接口发送即可。

为了方便消息传输,该类中增加了基于ProtostuffSerialize对消息体进行序列化的方法encoder。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
 * Envelope bundling a topic, a key and a typed body so that callers can assemble
 * a message once and hand it to a producer.
 *
 * @param <T> type of the message body
 */
public class Message<T extends Base> extends Base {

    private static final Log LOGGER = LogFactory.getLog(Message.class);

    private String topic;
    private String key;
    private T body;

    public Message() {
    }

    /** Serializers already created, keyed by body class and shared by all messages. */
    @SuppressWarnings("rawtypes")
    private static final Map<Class, Object> SERIALIZERS = new ConcurrentHashMap<>();

    // getters and setters omitted
    // ……

    /**
     * Returns the serializer for the runtime class of the body, creating and caching
     * it on first use.
     *
     * <p>{@code computeIfAbsent} makes the lookup-or-create step atomic; a separate
     * {@code containsKey}/{@code put} pair lets concurrent callers each create
     * their own serializer for the same class.
     *
     * @throws NullPointerException if the body has not been set
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private ProtostuffSerialize<T> getSerialize() {
        Class bodyType = this.body.getClass();
        Object cached = SERIALIZERS.computeIfAbsent(bodyType, type -> ProtostuffSerialize.getInstance(type));
        // Safe by construction: the entry for bodyType was created for exactly that class.
        return (ProtostuffSerialize<T>) cached;
    }

    /**
     * Serializes the body with the serializer matching its runtime class.
     *
     * @return the encoded body
     * @throws NullPointerException if the body has not been set
     */
    public byte[] encoder() {
        return getSerialize().encoder(body);
    }
}

接口定义

消息发送方式分为两类:有序发送(sendByOrder)和随机发送(send)。这样做的目的是为了满足消息消费时可能需要有序消费或随机消费的需求。

关于消息有序消费请看下面的有序消息队列。

清单:IProducer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
 * Contract for publishing messages, either as an assembled {@code Message} or as
 * raw bytes addressed by topic and key.
 */
public interface IProducer {

    // ---- assembled messages ----

    /** Sends a message. */
    void send(Message message);

    /** Sends a message and names the callback to be invoked asynchronously. */
    void send(Message message, IMessageCallBack callBack);

    /**
     * Sends a message in order.
     *
     * @param sequence ordering value for the message (presumably used to pick the
     *                 target queue - confirm against the implementation)
     */
    void sendByOrder(Integer sequence, Message message);

    // ---- raw bytes ----

    /** Sends raw bytes directly. */
    void send(String topic, String key, byte[] message);

    /** Sends raw bytes and names the callback to be invoked asynchronously. */
    void send(String topic, String key, byte[] message, IMessageCallBack callBack);

    /**
     * Sends raw bytes in order.
     *
     * @param sequence ordering value for the message (presumably used to pick the
     *                 target queue - confirm against the implementation)
     */
    void sendByOrder(Integer sequence, String topic, String key, byte[] message);
}

实现生产

清单:DiuDiuMqProducer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
 * {@link IProducer} implementation that carries its own configuration (it extends
 * {@link DiuDiuMqProducerConfig}) and forwards every send to a
 * {@link DiuDiuRocketMqProducer}.
 *
 * <p>Call {@link #init()} after the configuration properties are set and
 * {@link #destroy()} when the producer is no longer needed.
 */
public class DiuDiuMqProducer extends DiuDiuMqProducerConfig implements IProducer {
    private final DiuDiuRocketMqProducer delegate = new DiuDiuRocketMqProducer();

    @Override
    public void send(Message message) {
        delegate.send(message);
    }

    // Named sendByOrder to match IProducer; the former name sendOrder overrode nothing.
    @Override
    public void sendByOrder(Integer sequence, Message message) {
        delegate.send(sequence, message);
    }

    @Override
    public void send(String topic, String key, byte[] message) {
        delegate.send(topic, key, message);
    }

    @Override
    public void sendByOrder(Integer sequence, String topic, String key, byte[] message) {
        delegate.send(sequence, topic, key, message);
    }

    @Override
    public void send(String topic, String key, byte[] message, IMessageCallBack callBack) {
        delegate.send(topic, key, message, callBack);
    }

    @Override
    public void send(Message message, IMessageCallBack callBack) {
        delegate.send(message, callBack);
    }

    /**
     * @deprecated use {@link #sendByOrder(Integer, Message)}; kept for callers of the old name
     */
    @Deprecated
    public void sendOrder(Integer sequence, Message message) {
        sendByOrder(sequence, message);
    }

    /**
     * @deprecated use {@link #sendByOrder(Integer, String, String, byte[])}; kept for
     *             callers of the old name
     */
    @Deprecated
    public void sendOrder(Integer sequence, String topic, String key, byte[] message) {
        sendByOrder(sequence, topic, key, message);
    }

    /** Initializes the underlying producer with this object's configuration. */
    public void init() {
        delegate.init(this);
    }

    /** Releases the underlying producer. */
    public void destroy() {
        delegate.destroy();
    }

    /**
     * @deprecated misspelled; use {@link #destroy()}. Kept so existing references to the
     *             old name keep working.
     */
    @Deprecated
    public void destory() {
        destroy();
    }
}

清单:DiuDiuRocketMqProducer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
public class DiuDiuRocketMqProducer {
private Logger logger = LoggerFactory.getLogger(DiuDiuRocketMqProducer.class);

public DefaultMQProducer producer;
private DiuDiuDefaultMessageCallBack callBack = new DiuDiuDefaultMessageCallBack();
private DiuDiuMessageQueueSelector selector = new DiuDiuMessageQueueSelector();


public DiuDiuRocketMqProducer() {

}

public void send (Message message) {

}

public void send(String topic, String key, byte[] message) {
Message msg = new Message(); // com.alibaba.rocketmq.common.message.Message

msg.setBody(message);
msg.setTopic(topic);
msg.setKey(key);
String uniqueId = "1234444"; // 根据请求上下文生成唯一码
DiuDiuSendResult diuDiuSendResult = new DiuDiuSendResult(topic, uniqueId);
try {
logger.debug("发送消息:topic " + topic + ", key " + key);
producer.send(msg, new SendCallBack() {
@Override
public void onSuccess(SendResult sendResult) {
// 发送成功后,将结果通过回调函数进行确认处理
diuDiuSendResult.setMsgId(sendResult.getMsgId());
diuDiuSendResult.setMessageQueue(diuDiuSendResult.getMessageQueue());
diuDiuSendResult.setQueueOffset(sendResult.getQueueOffset());
callBack.onSendSuccess(diuDiuSendResult);
}

@Override
public void onException(Throwable e) {
diuDiuSendResult.setException(e);
callBack.onSendFaild(diuDiuSendResult);
}
});
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public void send(String topic, String key, byte[] message, final IMessageCallBack messageCallBack) {
Message msg = new Message(); // com.alibaba.rocketmq.common.message.Message

msg.setBody(message);
msg.setTopic(topic);
msg.setKey(key);
String uniqueId = "1234444"; // 根据请求上下文生成唯一码
DiuDiuSendResult diuDiuSendResult = new DiuDiuSendResult(topic, uniqueId);
try {
logger.debug("发送消息:topic " + topic + ", key " + key);
producer.send(msg, new SendCallBack() {
@Override
public void onSuccess(SendResult sendResult) {
// 发送成功后,将结果通过回调函数进行确认处理
diuDiuSendResult.setMsgId(sendResult.getMsgId());
diuDiuSendResult.setMessageQueue(diuDiuSendResult.getMessageQueue());
diuDiuSendResult.setQueueOffset(sendResult.getQueueOffset());
messageCallBack.onSendSuccess(diuDiuSendResult);
}

@Override
public void onException(Throwable e) {
diuDiuSendResult.setException(e);
messageCallBack.onSendFaild(diuDiuSendResult);
}
});
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public void send(Integer sequence, DiuDiuMessageQueueSelector selector, String topic, String key, byte[] message) {
Message msg = new Message(); // com.alibaba.rocketmq.common.message.Message

msg.setBody(message);
msg.setTopic(topic);
msg.setKey(key);

String uniqueId = "1234444"; // 根据请求上下文生成唯一码
DiuDiuSendResult diuDiuSendResult = new DiuDiuSendResult(topic, uniqueId);

try {
logger.debug("发送消息:topic " + topic + ", key " + key);
producer.send(msg, selector, sequence, new SendCallBack() {
@Override
public void onSuccess(SendResult sendResult) {
// 发送成功后,将结果通过回调函数进行确认处理
diuDiuSendResult.setMsgId(sendResult.getMsgId());
diuDiuSendResult.setMessageQueue(diuDiuSendResult.getMessageQueue());
diuDiuSendResult.setQueueOffset(sendResult.getQueueOffset());
messageCallBack.onSendSuccess(diuDiuSendResult);
}

@Override
public void onException(Throwable e) {
diuDiuSendResult.setException(e);
messageCallBack.onSendFaild(diuDiuSendResult);
}
});
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public void send(Message message) { // Message 是自定义,不是rocket的Message
send(message.getTopic(), message.getKey(), message.encoder());
}

public void send(Message message, IMessageCallBack callBack) { // Message 是自定义,不是rocket的Message
send(message.getTopic(), message.getKey(), message.encoder(), callBack);
}

public void send(Integer sequence, String topic, String key, byte[] message) {
send(sequence, selector, topic, key, message);
}

public void send(Integer sequence, Message message) {
send(sequence, selector, message.getTopic(), message.getKey(), message.encoder());
}

public void init(DiuDiuMqProducerConfig config) {
producer = new DefaultMQProducer();
producer.setNamesrvAddr(config.getClusterAddresses());
producer.setProducerGroup(config.getProducerName());
producer.setInstanceName(config.getProducerName() + new Random(100).nextInt());
try {
producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}

public void destroy() {
producer.shutdown();
}
}

发送消息回调处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * Default {@link IMessageCallBack}: writes the outcome of a send to the interface
 * log and, for successful sends, to the MQ log as well.
 */
public class DiuDiuDefaultMessageCallBack implements IMessageCallBack {
    Logger interfaceLogger = LoggerFactory.getLogger(LoggerName.INTERFACE);
    Logger mqLogger = LoggerFactory.getLogger(LoggerName.MQ);

    /**
     * Logs a successful send.
     *
     * <p>The unique id is written through a {@code {}} placeholder; passing it as an
     * argument to a message without placeholders made the logger discard it.
     */
    @Override
    public void onSendSuccess(DiuDiuSendResult diuDiuSendResult) {
        String loggerMsg = "send message successfully, " +
                "topic " + diuDiuSendResult.getTopic() + " msgId " + diuDiuSendResult.getMsgId()
                + " queue " + diuDiuSendResult.getMessageQueue()
                + " offset " + diuDiuSendResult.getQueueOffset();
        interfaceLogger.info("{} uniqueId {}", loggerMsg, diuDiuSendResult.getUniqueId());
        // The msgId is already part of loggerMsg, so no extra argument is needed here.
        mqLogger.info("{}", loggerMsg);
    }

    /**
     * Logs a failed send at error level, including the topic, the unique id and, when
     * one was recorded, the exception with its stack trace.
     */
    @Override
    public void onSendFaild(DiuDiuSendResult diuDiuSendResult) {
        Throwable cause = diuDiuSendResult.getException();
        String reason = cause == null ? "unknown" : cause.getMessage();
        // A trailing Throwable beyond the placeholders is logged by SLF4J with its stack trace.
        interfaceLogger.error("send message faild, cause by {} topic {} uniqueId {}",
                reason, diuDiuSendResult.getTopic(), diuDiuSendResult.getUniqueId(), cause);
    }
}

DiuDiuSendResult

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
 * Outcome of one send attempt: which message was sent (unique id, topic), where it
 * landed (message id, queue, offset) and, for a failure, the exception.
 * Topic and unique id are supplied at construction.
 */
public class DiuDiuSendResult {
// unique id associated with the message
private String uniqueId;
// topic the message belongs to
private String topic;
// msgId of the message
private String msgId;
// queue information
private MessageQueue messageQueue;
// offset within the queue
private long queueOffset;
// exception recorded when the send failed
private Throwable exception;

// Creates a result for the given topic and unique id; the other fields keep their defaults.
public DiuDiuSendResult(String topic, String uniqueId) {
this.topic = topic;
this.uniqueId = uniqueId;
}
// getters and setters omitted
// ……
}

有序消息队列

1
2
3
4
5
6
7
8
9
/**
 * Queue selector that maps an integer sequence onto one of the available queues, so
 * that equal sequences always select the same position in the queue list.
 */
public class DiuDiuMessageQueueSelector implements MessageQueueSelector {
    /**
     * Picks the queue at index {@code arg mod mqs.size()}.
     *
     * @param mqs candidate queues
     * @param msg message being sent (not consulted)
     * @param arg the sequence; must be an {@link Integer}
     * @return the selected queue
     */
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) arg;
        // floorMod keeps the index in [0, size) for negative sequences too, where the
        // % operator would produce a negative index and make get() throw.
        int index = Math.floorMod(id, mqs.size());
        return mqs.get(index);
    }
}