应用分享 | 分布式消息队列RocketMQ


应用名称: RocketMQ
简述: 分布式消息队列 RocketMQ。
NameServer源码: https://github.com/GLYASAI/rocketmq-namesrv.git
Broker源码: https://github.com/GLYASAI/rocketmq-broker.git
Console-ng源码: https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console
----------

一. 关于RocketMQ

上图是一个典型的消息中间件收发消息的模型,RocketMQ也是这样的设计,简单说来,RocketMQ具有以下特点:

  • 是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点。
  • Producer、Consumer、队列都可以分布式。
  • Producer向一些队列轮流发送消息,队列集合称为Topic,Consumer如果做广播消费,则一个 consumer实例消费这个Topic对应的所有队列,如果做集群消费,则多个Consumer实例平均消费这个topic对应的队列集合。
  • 能够保证严格的消息顺序
  • 提供丰富的消息拉取模式
  • 高效的订阅者水平扩展能力
  • 实时的消息订阅机制
  • 亿级消息堆积能力
  • 较少的依赖

二. 制作步骤

2.1 编写Dockerfile

这里以Broker作为例子, 进行说明:

1.  FROM openjdk:8-jdk-alpine
2.  LABEL maintainer=huangrh@goodrain.com
3.  
4.  ARG ROCKETMQ_VERSION=4.3.0
5.  ARG ROCKETMQ_HOME=/export/servers/rocketmq
6.  ARG BASE_URL=http://mirror.bit.edu.cn/apache/rocketmq
7. 
8.  RUN apk add --no-cache curl
9.  RUN mkdir -p ${ROCKETMQ_HOME} \
10.  && curl -fsSL -o /tmp/rocketmq.zip ${BASE_URL}/${ROCKETMQ_VERSION}/rocketmq-all-${ROCKETMQ_VERSION}-bin-release.zip \
11.  && unzip -o /tmp/rocketmq.zip -d ${ROCKETMQ_HOME} \
12.  && mv ${ROCKETMQ_HOME}/rocketmq-all*/* ${ROCKETMQ_HOME} \
13.  && rm -rf ${ROCKETMQ_HOME}/rocketmq-all* \
14.  && rm -f /tmp/rocketmq.zip \
15.  && rm -rf ${ROCKETMQ_HOME}/bin/runbroker.sh
16.
17. VOLUME ${ROCKETMQ_HOME}/data
18.
19. EXPOSE 10909 10911
20. 
21. COPY runbroker.sh ${ROCKETMQ_HOME}/bin/runbroker.sh
22. COPY memset.sh ${ROCKETMQ_HOME}/bin/memset.sh
23. COPY broker-entrypoint.sh ${ROCKETMQ_HOME}/bin/broker-entrypoint.sh
24. 
25. RUN chmod +x ${ROCKETMQ_HOME}/bin/runbroker.sh \
26.  && chmod +x ${ROCKETMQ_HOME}/bin/memset.sh \
27.  && chmod +x ${ROCKETMQ_HOME}/bin/broker-entrypoint.sh
28. 
29. WORKDIR ${ROCKETMQ_HOME}/bin
30. 
31. ENTRYPOINT ["./broker-entrypoint.sh"]

**1-6:**拉取基础镜像openjdk:8-jdk-alpine, 并用一些构建参数来设置默认值.
**8-15:**下载RocketMQ的二进制文件.
**17:**指定持久化目录为${ROCKETMQ_HOME}/data.
**19:**暴露端口10909和10911, 10909用于broker-master, 10911用于broker-slave.
**21-27:**将3个脚本(runbroker.sh, memset.sh, broker-entrypoint.sh, 请到github查看源码)拷贝到镜像中, 并设置为可执行文件.
29-31: 设置工作目录为${ROCKETMQ_HOME}/bin, 并执行启动脚本broker-entrypoint.sh.

脚本 memset.sh 是Rainbond为了设置JVM内存而制作的,通过将该脚本加入Broker的启动脚本(runbroker.sh),生成环境变量 ${JAVA_OPTS} 。这样RocketMQ启动时,JVM内存就会随着Rainbond应用设置的内存大小而改变,防止了内存泄露的风险。

因为Name Server读取的是Broker所在容器的IP, 我们无法通过容器的IP与Broker通信, 所以要使用broker-entrypoint.sh指定Broker的IP为Broker所在机器的内网IP.
注意, 只有在内网中, 才能与Broker通信.

2.2 创建Name Server

通过源码(源码地址)创建的方式, 创建Name Server:

Rainbond创建应用详细教程: 创建一个应用
如何制作一个可分享的云市应用?

高级设置-基本属性


环境变量ROCKETMQ_VERSION用于指定RocketMQ的版本.

高级设置-部署属性


Name Server需要的内在不多, 128M足够了.

2.3 创建Broker

通过源码(源码地址)创建的方式, 创建Broker:

高级设置-基本属性


指定Name Server的地址为127.0.0.1:9876.

添加服务依赖namesrv.

高级设置-部署属性


由于Broker是RocketMQ的核心, 大部分"重量级"的工作都是由Broker来完成的, 包括接收Producer发过来的消息, 处理Consumer的消费消息请求, 消息的持久化存储, 消息的HA机制以及服务端过滤功能等, 所以需要给Broker分配尽可能多的内存.

2.4 创建Console-ng

通过Docker镜像styletang/rocketmq-console-ng创建Console-ng:

高级设置-基本属性


开放对外服务.

设置JAVA_OPTS为-Drocketmq.namesrv.addr=127.0.0.1:9876 -com.rocketmq.sendMessageWithVIPChannel=false.

添加服务依赖namesrv.

2.5 验证

进入容器

发送和接收消息

> export NAMESRV_ADDR=localhost:9876
 > sh tools.sh org.apache.rocketmq.example.quickstart.Producer
 SendResult [sendStatus=SEND_OK, msgId= ...

 > sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
 ConsumeMessageThread_%d Receive New Messages: [MessageExt...

通过console-ng进行监控

可以看到, console-ng监控到了Broker的消息变化.

高可用

TODO

不错,很详细的分享