2.2 NameServer启动流程

从源码的角度窥探一下NameServer启动流程,重点关注NameServer相关启动参数。

NameServer启动类:org.apache.rocketmq.namesrv.NamesrvStartup。

Step1:首先来解析配置文件,需要填充NameServerConfig、NettyServerConfig属性值。

代码清单2-1 NameServer加载配置文件

        final NamesrvConfig namesrvConfig = new NamesrvConfig();
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        nettyServerConfig.setListenPort(9876);
        if (commandLine.hasOption('c')) {
            String file = commandLine.getOptionValue('c');
            if (file ! = null) {
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                properties = new Properties();
                properties.load(in);
                MixAll.properties2Object(properties, namesrvConfig);
                MixAll.properties2Object(properties, nettyServerConfig);
                namesrvConfig.setConfigStorePath(file);
                System.out.printf("load config properties file OK, " + file + "%n");
                in.close();
            }
        }
        if (commandLine.hasOption('p')) {
            MixAll.printObjectProperties(null, namesrvConfig);
            MixAll.printObjectProperties(null, nettyServerConfig);
            System.exit(0);
        }
        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine),
                    namesrvConfig);

从代码我们可以知道先创建NameServerConfig(NameServer业务参数)、NettyServer-Config(NameServer网络参数),然后在解析启动时把指定的配置文件或启动命令中的选项值,填充到nameServerConfig, nettyServerConfig对象。参数来源有如下两种方式。

1)-c configFile通过-c命令指定配置文件的路径。

2)使用“--属性名 属性值”,例如 --listenPort 9876。

代码清单2-2 NameServerConfig属性

        private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,
                System.getenv(MixAll.ROCKETMQ_HOME_ENV));
        private String kvConfigPath = System.getProperty("user.home") + File.separator
                + "namesrv" + File.separator + "kvConfig.json";
        private String configStorePath = System.getProperty("user.home") +
            File.separator + "namesrv" + File.separator +
                    "namesrv.properties";
        private String productEnvName = "center";
        private boolean clusterTest = false;
        private boolean orderMessageEnable = false;

□ rocketmqhome: rocketmq主目录,可以通过-Drocketmq.home.dir=path或通过设置环境变量ROCKETMQ_HOME来配置RocketMQ的主目录。

□ kvConfigPath:NameServer存储KV配置属性的持久化路径。

□ configStorePath:nameServer默认配置文件路径,不生效。nameServer启动时如果要通过配置文件配置NameServer启动属性的话,请使用-c选项。

□ orderMessageEnable:是否支持顺序消息,默认是不支持。

代码清单2-3 NettyServerConfig属性

    private int listenPort = 8888;
    private int serverWorkerThreads = 8;
    private int serverCallbackExecutorThreads = 0;
    private int serverSelectorThreads = 3;
    private int serverOnewaySemaphoreValue = 256;
    private int serverAsyncSemaphoreValue = 64;
    private int serverChannelMaxIdleTimeSeconds = 120;
    private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
    private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
    private boolean serverPooledByteBufAllocatorEnable = true;
    private boolean useEpollNativeSelector = false;

□ listenPort:NameServer监听端口,该值默认会被初始化为9876。

□ serverWorkerThreads:Netty业务线程池线程个数。

□ serverCallbackExecutorThreads:Netty public任务线程池线程个数,Netty网络设计,根据业务类型会创建不同的线程池,比如处理消息发送、消息消费、心跳检测等。如果该业务类型(RequestCode)未注册线程池,则由public线程池执行。

□ serverSelectorThreads:IO线程池线程个数,主要是NameServer、Broker端解析请求、返回相应的线程个数,这类线程主要是处理网络请求的,解析请求包,然后转发到各个业务线程池完成具体的业务操作,然后将结果再返回调用方。

□ serverOnewaySemaphoreValue:send oneway消息请求并发度(Broker端参数)。

□ serverAsyncSemaphoreValue:异步消息发送最大并发度(Broker端参数)。

□ serverChannelMaxIdleTimeSeconds:网络连接最大空闲时间,默认120s。如果连接空闲时间超过该参数设置的值,连接将被关闭。

□ serverSocketSndBufSize:网络socket发送缓存区大小,默认64k。

□ serverSocketRcvBufSize:网络socket接收缓存区大小,默认64k。

□ serverPooledByteBufAllocatorEnable:ByteBuffer是否开启缓存,建议开启。

□ useEpollNativeSelector:是否启用Epoll IO模型,Linux环境建议开启。

小技巧

在启动NameServer时,可以先使用./mqnameserver -c configFile -p打印当前加载的配置属性。

Step2:根据启动属性创建NamesrvController实例,并初始化该实例,NameServerController实例为NameServer核心控制器。

代码清单2-4 NamesrvController#Initialize代码片段

        public boolean initialize() {
            this.kvConfigManager.load();
            this.remotingServer = new NettyRemotingServer(this.nettyServerConfig,
                    this.brokerHousekeepingService);
            this.remotingExecutor =
                Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(),
                new ThreadFactoryImpl("RemotingExecutorThread_"));
            this.registerProcessor();
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    public void run() {
                        NamesrvController.this.routeInfoManager.scanNotActiveBroker();
                    }
                }, 5, 10, TimeUnit.SECONDS);

              this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    public void run() {
                        NamesrvController.this.kvConfigManager.printAllPeriodically();
                }
            }, 1, 10, TimeUnit.MINUTES);

            return true;
        }

加载KV配置,创建NettyServer网络处理对象,然后开启两个定时任务,在RocketMQ中此类定时任务统称为心跳检测。

□ 定时任务1:NameServer每隔10s扫描一次Broker,移除处于不激活状态的Broker。

□ 定时任务2:nameServer每隔10分钟打印一次KV配置。

Step3:注册JVM钩子函数并启动服务器,以便监听Broker、消息生产者的网络请求。

代码清单2-5 注册JVM钩子函数代码

        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new
            Callable<Void>() {
                public Void call() throws Exception {
                    controller.shutdown();
                    return null;
                }
            }));
            controller.start();

这里主要是向读者展示一种常用的编程技巧,如果代码中使用了线程池,一种优雅停机的方式就是注册一个JVM钩子函数,在JVM进程关闭之前,先将线程池关闭,及时释放资源。