1.1 获取和调试RocketMQ的源代码

RocketMQ原先是阿里巴巴内部使用的消息中间件,于2017年提交到Apache基金会成为Apache基金会的顶级开源项目,GitHub代码库链接:https://github.com/apache/rocketmq.git。在Github网站上搜索RocketMQ,如图1-1所示。

图1-1 GitHub RocketMQ搜索界面

1.1.1 Eclipse获取RocketMQ源码

Step1:单击右键从菜单中选择import git,弹出如图1-2所示的对话框。

图1-2 Import对话框

Step2:点击Next按钮,弹出Projects from Git对话框,如图1-3所示。

图1-3 Import Projects from Git对话框

Step3:点击Next按钮,弹出Clone URI对话框,如图1-4所示。

图1-4 Import Projects from Git对话框

Step4:继续点击Next进入下一步,选择代码分支,如图1-5所示。

图1-5 Import Projects from Git对话框

Step5:选择所需要的分支后点击Next,进入代码存放目录选择,如图1-6所示。

图1-6 Import Projects from Git对话框

Step6:点击Next, Eclipse将从远程仓库下载代码,如图1-7所示。

图1-7 Import Projects from Git对话框

Step7:代码下载到指定目录后,默认选择Import existing projects(单分支),这里手动选择Import as general projects(多分支),点击Finish,成功导入,如图1-8所示。

图1-8 Import Projects from Git对话框

Step8:代码导入成功后,需要将项目转换成Maven项目,导入成功后的效果图,如图1-9所示。

图1-9 导入项目初始状态

Step9:单击右键从上下文菜单中选择rocketmq_new(文件下载目录名)→Configure→Convert and Detect Nested Projects转换成Maven项目,如图1-10所示。

图1-10 转换Maven项目

Step10:点击Finish执行Maven项目转换,完成RocketMQ的导入,如图1-11所示。

图1-11 转换Maven项目

转换过程中可能会弹出如图1-12所示提示框。

图1-12 转换Maven项目

解决办法有三种。

1)修改根pom.xml文件,找到如下条目,加上注释。

代码清单1-1 rocketmq根pom.xml文件

    <! --
        <plugin>
            <artifactId>maven-help-plugin</artifactId>
            <version>2.2</version>
            <executions>
                <execution>
                    <id>generate-effective-dependencies-pom</id>
                    <phase>generate-resources</phase>
                    <goals>
                        <goal>effective-pom</goal>
                    </goals>
                    <configuration>

                    <output>${project.build.directory}/effective-pom/effective-depende
                        ncies.xml</output>
                </configuration>
            </execution>
        </executions>
    </plugin>
    -->
    <! --
        <plugin>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.19.1</version>
            <configuration>
                <forkCount>1</forkCount>
                <reuseForks>true</reuseForks>
            </configuration>
        </plugin>
    -->

2)注释remoting模块下pom.xml文件中部分代码。

代码清单1-2 rocketmq根pom.xml文件

    <! --
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-tcnative</artifactId>
            <version>1.1.33.Fork22</version>
            <classifier>${os.detected.classifier}</classifier>
        </dependency>
    -->

3)右键一个项目,选择Maven→Update Project,如图1-13所示。

图1-13 更新Maven项目

1.1.2 Eclipse调试RocketMQ源码

本节将展示在Eclipse中启动NameServer、Broker,并运行消息发送与消息消费示例程序。

1.启动NameServer

Step1:展开namesrv模块,右键NamesrvStartup.java,移动到Debug As,选中Debug Configurations,弹出Debug Configurations对话框,如图1-14所示。

图1-14 选择Debug Configurations

Step2:选中Java Application条目并单击右键,选择New弹出Debug Configurations对话框,如图1-15所示。

图1-15 Debug Configurations, Create, manage, and run configurations

Step3:设置RocketMQ运行主目录。选择Environment选项卡,添加环境变量ROCKET_HOME。

Step4:在RocketMQ运行主目录中创建conf、logs、store三个文件夹,如图1-16所示。

图1-16 RocketMQ主目录

Step5:从RocketMQ distribution部署目录中将broker.conf、logback_broker.xml文件复制到conf目录中,logback_namesrv.xml文件则只需修改日志文件的目录,broker.conf文件内容如下所示。

代码清单1-3 broker.conf文件

        brokerClusterName=DefaultCluster
        brokerName=broker-a
        brokerId=0
        #nameServer地址,分号分割
        namesrvAddr=127.0.0.1:9876
        deleteWhen=04
        fileReservedTime=48
        brokerRole=ASYNC_MASTER
        flushDiskType=ASYNC_FLUSH
        #存储路径
        storePathRootDir=D:\\rocketmq\\store
        #commitLog存储路径
        storePathCommitLog=D:\\rocketmq\\store\\commitlog
        #消费队列存储路径
        storePathConsumeQueue=D:\\rocketmq\\store\\consumequeue
        #消息索引存储路径
        storePathIndex=D:\\rocketmq\\store\\index
        #checkpoint文件存储路径
        storeCheckpoint=D:\\rocketmq\\store\\checkpoint
        #abort文件存储路径
        abortFile=D:\\rocketmq\\store\\abort

Step6:在Eclipse Debug中运行NamesrvStartup,并输出“The Name Server boot success. Serializetype=JSON”。

2.启动Broker

Step1:展开broker模块,右键BrokerStartup.java,移动到Debug As,选中Debug Configurations,弹出如图1-17所示的对话框,选择arguments选项卡,配置-c属性指定broker配置文件路径。

图1-17 Debug Configurations, Create, manage, and run configurations

Step2:切换选项卡Environment,配置RocketMQ主目录,如图1-18所示。

图1-18 Debug Configurations, Create, manage, and run configurations

Step3:以Debug模式运行BrokerStartup.java,查看${ROCKET_HOME}/logs/broker. log文件,未报错则表示启动成功。

代码清单1-4 broker启动日志截图

        2018-03-22 20:47:29 INFO main - register broker to name server 127.0.0.1:9876 OK
        2018-03-22  20:47:29  INFO  main  -  The  broker[broker-a,  192.168.1.3:10911]  boot
    success. serializeType=JSON and name server is 127.0.0.1:9876
        2018-03-22  20:47:38  INFO  BrokerControllerScheduledThread1  -  dispatch  behind
    commit log 0 bytes
        2018-03-22  20:47:38  INFO  BrokerControllerScheduledThread1  -  Slave  fall  behind
    master: 0 bytes
        2018-03-22  20:47:39  INFO  BrokerControllerScheduledThread1  -  register  broker  to
    name server 127.0.0.1:9876 OK
        2018-03-22  20:48:09  INFO  BrokerControllerScheduledThread1  -  register  broker  to
    name server 127.0.0.1:9876 OK
        2018-03-22  20:48:37  INFO  BrokerControllerScheduledThread1  -  dispatch  behind
    commit log 0 bytes
        2018-03-22  20:48:37  INFO  BrokerControllerScheduledThread1  -  Slave  fall  behind
    master: 0 bytes
        2018-03-22  20:48:39  INFO  BrokerControllerScheduledThread1  -  register  broker  to
    name server 127.0.0.1:9876 OK
        2018-03-22  20:49:09  INFO  BrokerControllerScheduledThread1  -  register  broker  to
    name server 127.0.0.1:9876 OK

3.使用RocketMQ提供的实例验证消息发送与消息消费

Step1:修改org.apache.rocketmq.example.quickstart.Producer示例程序,设置消息生产者NameServer地址。

代码清单1-5 消息发送示例程序

        public class Producer {
            public static void main(String[] args) throws MQClientException,
                            InterruptedException {
                DefaultMQProducer producer = new
                            DefaultMQProducer("please_rename_unique_group_name");
                producer.setNamesrvAddr("127.0.0.1:9876");
                producer.start();
                for (int i = 0; i < 1; i++) {
                    try {
                        Message msg = new Message("TopicTest"/* Topic */, "TagA"/* Tag */,
                            ("Hello RocketMQ " + i).getBytes
                                  (RemotingHelper.DEFAULT_CHARSET)/* Message body */
                            );
                        SendResult sendResult = producer.send(msg);
                        System.out.printf("%s%n", sendResult);
                    } catch (Exception e) {
                        e.printStackTrace();
                        Thread.sleep(1000);
                    }
                }
                producer.shutdown();
        }
    }

Step2:运行该示例程序,查看运行结果,如果输出代码清单1-6所示结果则表示消息发送成功。

代码清单1-6 消息发送结果

        SendResult [sendStatus=SEND_OK, msgId=C0A8010325B46D06D69C70A211400000,
        offsetMsgId=C0A8010300002A9F0000000000000000, messageQueue=MessageQueue
        [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=0]

Step3:修改org.apache.rocketmq.example.quickstart.Consumer示例程序,设置消息消费者NameServer地址。

代码清单1-7 消息消费示例程序

        public class Consumer {
            public static void main(String[] args) throws InterruptedException,
                    MQClientException {
                DefaultMQPushConsumer consumer = new
                    DefaultMQPushConsumer("please_rename_unique_group_name_4");
                consumer.setNamesrvAddr("127.0.0.1:9876");
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                consumer.subscribe("TopicTest", "*");
                consumer.registerMessageListener(new MessageListenerConcurrently() {
                    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                            ConsumeConcurrentlyContext context) {
                        System.out.printf("%s Receive New Messages: %s %n",
                            Thread.currentThread().getName(), msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            System.out.printf("Consumer Started.%n");
            }
        }

Step4:运行消息消费者程序,如果输出如下所示则表示消息消费成功。

代码清单1-8 消息消费结果

        Consumer Started.
        ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=0,
        storeSize=178, queueOffset=0, sysFlag=0, bornTimestamp=1521723269443,
        bornHost=/192.168.1.3:57034, storeTimestamp=1521723269510,
        storeHost=/192.168.1.3:10911, msgId=C0A8010300002A9F0000000000000000,
        commitLogOffset=0, bodyCRC=613185359, reconsumeTimes=0,
        preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0,
        properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1521723841419,
        UNIQ_KEY=C0A8010325B46D06D69C70A211400000, WAIT=true, TAGS=TagA}, body=16]]]

消息发送与消息消费都成功,则说明RocketMQ调试环境已经成功搭建了,可以直接Debug源码,探知RocketMQ的实现奥秘了。

1.1.3 IntelliJ IDEA获取RocketMQ源码

Step1:在IntelliJ IDEA VCS菜单中选择Check from Version Control,再选择Git,如图1-19所示。

图1-19 Git clone对话框

Step2:在弹出的对话框中,URL输入RocketMQ源码地址,选择保存的本地路径,点击Clone,弹出Git Repository对话框,如图1-20所示。

图1-20 Git Repository对话框

状态栏有代码检出的进度,如图1-21所示。

图1-21 RocketMQ Clon进度条

Step3:检出完成会弹出提示框,选择Yes,如图1-22所示。

图1-22 Checkout From Version Control

Step4:在弹出框中选择Maven,点击Next,如图1-23所示。

图1-23 Import Project

Step5:勾选jdk8,点击Next,如图1-24所示。

图1-24 Import Project select profiles

Step6:下面2步都直接点击Next,如图1-25、图1-26和图1-27所示。

图1-25 Select maven projects to import

图1-26 Create a new Project

图1-27 Import RocketMQ

Step7:导入成功后,效果图如图1-28所示。

图1-28 RocketMQ项目结构

Step8:执行maven命令clean install,进行编译和下载依赖,下载完成后,可以看到控制台BUILD SUCCESS的提示信息,如图1-29所示。

图1-29 mvn clean install RocketMQ

1.1.4 IntelliJ IDEA调试RocketMQ源码

本节将展示在IntelliJ IDEA中启动NameServer、Broker,并运行消息发送与消息消费示例程序。

1.启动NameServer

Step1:展开namesrv模块,右键NamesrvStartup.java,移动到Debug As,选中Debug‘NamesrvStartup.java.main()',弹出如图1-30、图1-31所示的对话框。

图1-30 NamesrvStartup Debug

图1-31 NamesrvStartup Debug Configurations

Step2:点击Environment variables后面的按钮,弹出Environment variables对话框,如图1-32所示。

图1-32 Environment Variables列表

Step3:点击“+”号,在Name输入框中输入ROCKETMQ_HOME, Value输入源码的保存路径。点击OK,回到Debug Configurations界面,再点击OK,如图1-33所示。

图1-33 增加Rocket home环境变量

Step4:在RocketMQ运行主目录中创建conf、logs、store三个文件夹。

Step5:从RocketMQ distribution部署目录中将broker.conf、logback_broker.xml文件复制到conf目录中,logback_namesrv.xml文件,只需修改日志文件的目录,broker.conf文件目录内容代码清单1-9所示。

代码清单1-9 broker.conf文件

          brokerClusterName=DefaultCluster
          brokerName=broker-a
          brokerId=0
          #nameServer地址,分号分割
          namesrvAddr=127.0.0.1:9876
          deleteWhen=04
          fileReservedTime=48
          brokerRole=ASYNC_MASTER
          flushDiskType=ASYNC_FLUSH
          #存储路径
          storePathRootDir=D:\\rocketmq\\store
          #commitLog存储路径
          storePathCommitLog=D:\\rocketmq\\store\\commitlog
          #消费队列存储路径
          storePathConsumeQueue=D:\\rocketmq\\store\\consumequeue
          #消息索引存储路径
          storePathIndex=D:\\rocketmq\\store\\index
          #checkpoint文件存储路径
          storeCheckpoint=D:\\rocketmq\\store\\checkpoint
          #abort文件存储路径
          abortFile=D:\\rocketmq\\store\\abort

图1-34 设置环境变量

Step6:在IntelliJ IDEA Debug中运行NamesrvStartup,并输出“The Name Server boot success. Serializetype=JSON”。

2.启动Broker

Step1:展开broker模块,右键BrokerStartup.java执行,会提示需要配置ROCKETMQ_HOME。在idea右上角选中Debug Configu rations,在弹出的对话框中选择arguments选项卡,配置-c属性指定broker配置文件路径,如图1-34所示。

Step2:切换选项卡Environment,配置RocketMQ主目录和broker配置文件,如图1-35所示。

图1-35 运行或调试运行时的环境设置

Step3:以Debug模式运行Broker Startup.java,查看${ROCKET_HOME}/logs/broker. log文件,未报错则表示启动成功,如代码清单1-10所示。

代码清单1-10 broker启动日志截图

            2018-06-15 17:14:27 INFO PullRequestHoldService - PullRequestHoldService
        service started
            2018-06-15 17:14:28 INFO main - register broker to name server 127.0.0.1:9876
        OK
            2018-06-15 17:14:28 INFO main - The broker[broker-a, 192.168.41.1:10911] boot
        success. serializeType=JSON and name server is 127.0.0.1:9876
            2018-06-15 17:14:37 INFO BrokerControllerScheduledThread1- dispatch behind
        commit log 0 bytes
            2018-06-15 17:14:37 INFO BrokerControllerScheduledThread1- Slave fall behind
        master: 534 bytes
            2018-06-15 17:14:38 INFO BrokerControllerScheduledThread1- register broker to
        name server 127.0.0.1:9876 OK
            2018-06-15 17:14:41 INFO ClientManageThread_1- new consumer connected, group:
        please_rename_unique_group_name_4 CONSUME_PASSIVELY CLUSTERING channel:
        ClientChannelInfo [channel=[id: 0x5babb0b1, L:/192.168.41.1:10911-
        R:/192.168.41.1:50635], clientId=192.168.41.1@15140, language=JAVA,
        version=253, lastUpdateTimestamp=1529054081078]
            2018-06-15 17:14:41 INFO ClientManageThread_1- subscription changed, add new
        topic, group: please_rename_unique_group_name_4 SubscriptionData
        [classFilterMode=false, topic=%RETRY%please_rename_unique_group_name_4,
        subString=*, tagsSet=[], codeSet=[], subVersion=1529053720311,
        expressionType=null]
            2018-06-15 17:14:41 INFO ClientManageThread_1- subscription changed, add new
        topic, group: please_rename_unique_group_name_4 SubscriptionData
        [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[],
        subVersion=1529053720326, expressionType=null]
            2018-06-15 17:14:41 INFO ClientManageThread_1- registerConsumer info changed
        ConsumerData [groupName=please_rename_unique_group_name_4,
        consumeType=CONSUME_PASSIVELY, messageModel=CLUSTERING,
        consumeFromWhere=CONSUME_FROM_FIRST_OFFSET, unitMode=false,
        subscriptionDataSet=[SubscriptionData [classFilterMode=false,
        topic=%RETRY%please_rename_unique_group_name_4, subString=*, tagsSet=[],
        codeSet=[], subVersion=1529053720311, expressionType=null], SubscriptionData
        [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[],
        subVersion=1529053720326, expressionType=null]]] 192.168.41.1:50635
            2018-06-15 17:14:41 INFO ClientManageThread_1- new producer connected, group:
        CLIENT_INNER_PRODUCER channel: ClientChannelInfo [channel=[id: 0x5babb0b1,
        L:/192.168.41.1:10911- R:/192.168.41.1:50635], clientId=192.168.41.1@15140,
        language=JAVA, version=253, lastUpdateTimestamp=1529054081079]

3.使用RocketMQ提供的实例验证消息发送与消息消费

Step1:修改org.apache.rocketmq.example.quickstart.Producer示例程序,设置消息生产者NameServer地址。

代码清单1-11 消息发送示例程序

        public class Producer {
            public static void main(String[] args) throws MQClientException,
                            InterruptedException {
                DefaultMQProducer producer = new
                            DefaultMQProducer("please_rename_unique_group_name");
                producer.setNamesrvAddr("127.0.0.1:9876");
                producer.start();
                for (int i = 0; i < 1; i++) {
                    try {
                        Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */,
                            ("Hello RocketMQ " + i).getBytes
                                  (RemotingHelper.DEFAULT_CHARSET)/* Message body */
                            );
                        SendResult sendResult = producer.send(msg);
                        System.out.printf("%s%n", sendResult);
                    } catch (Exception e) {
                        e.printStackTrace();
                        Thread.sleep(1000);
        }
    }
    producer.shutdown();
}
}

Step2:运行该示例程序,查看运行结果,如果输出如下所示则表示消息发送成功。

代码清单1-12 消息发送结果

        SendResult [sendStatus=SEND_OK, msgId=C0A8006606EC18B4AAC24BC584450000,
        offsetMsgId=C0A8290100002A9F00000000000000B2, messageQueue=MessageQueue
        [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=0]

Step3:修改org.apache.rocketmq.example.quickstart.Consumer示例程序,设置消息消费者NameServer地址。

代码清单1-13 消息消费示例程序

        public class Consumer {
            public static void main(String[] args) throws InterruptedException,
                        MQClientException {
                DefaultMQPushConsumer consumer = new
                        DefaultMQPushConsumer("please_rename_unique_group_name_4");
                consumer.setNamesrvAddr("127.0.0.1:9876");
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                consumer.subscribe("TopicTest", "*");
                consumer.registerMessageListener(new MessageListenerConcurrently() {
                    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                            ConsumeConcurrentlyContext context) {
                        System.out.printf("%s Receive New Messages: %s %n",
                            Thread.currentThread().getName(), msgs);
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                });
                consumer.start();
                System.out.printf("Consumer Started.%n");
            }
        }

Step4:运行消息消费者程序,如果输出如下所示表示消息消费成功。

代码清单1-14 消息消费结果

        Consumer Started.
        ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=0,
        storeSize=178, queueOffset=1, sysFlag=0, bornTimestamp=1529053736201,
        bornHost=/192.168.41.1:50331, storeTimestamp=1529053736210,
        storeHost=/192.168.41.1:10911, msgId=C0A8290100002A9F0000000000000164,
        commitLogOffset=356, bodyCRC=613185359, reconsumeTimes=0,
        preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0,
        properties={MIN_OFFSET=0, MAX_OFFSET=2, CONSUME_START_TIME=1529053736226,
        UNIQ_KEY=C0A800662C8C18B4AAC24BC70D080000, WAIT=true, TAGS=TagA}, body=16]]]

消息发送与消息消费都成功,则说明RocketMQ调试环境已成功搭建,可以通过Debug调试源码,探知RocketMQ的实现奥秘了。