RocketMQ六nameserve

文章来源:一氧化碳中毒   发布时间:2021-6-20 19:23:42   点击数:
  ios开发求职招聘微信群 http://www.ktyx.com.cn/lehuo/baike/20210206/778.html

走过路过不要错过

点击蓝字      1.zk存在大量的集群间通信;      2.zk是一个比较重的组件,而本身就作为消息中间的mq,则最好不好另外再依赖其他组件;(个人感觉)      3.zk对于数据的固化能力比较弱,配置往往受限于zk的数据格式;

总体来说,可能就是rocketmq想要做的功能在zk上不太好做,或者做起来也费劲,或者太重,索性就不要搞了。自己搞一个完全定制化的好了。事实上,rocketmq的nameserver也实现得相当简单轻量。这也是设计者的初衷吧。

2.nameserver的启动流程解析

一般地,一个框架级别的服务启动,还是有些复杂的,那样的话,我们懒得去看其具体过程。但前面说了,nameserver实现得非常轻量级,所以,其启动也就相当简单。所以,我们可以快速一览其过程。   整个nameserver的启动类是org.apache.rocketmq.namesrv.NamesrvStartup,工作过程大致如下:

//入口mainpublicstaticvoidmain(String[]args){main0(args);}publicstaticNamesrvControllermain0(String[]args){try{//创建本服务的核心控制器,解析各种配置参数,默认值之类的NamesrvControllercontroller=createNamesrvController(args);//开启服务,如打开start(controller);Stringtip="TheNameServerbootsuccess.serializeType="+RemotingCommand.getSerializeTypeConfigInThisServer();log.info(tip);System.out.printf("%s%n",tip);returncontroller;}catch(Throwablee){e.printStackTrace();System.exit(-1);}returnnull;}

所以整个启动过程,基本就是一个Controller搞定了,你说不简单吗?额,也许不一定!整个创建Controller的过程就是解析参数的过程,有兴趣可以打开如下代码看看:

publicstaticNamesrvControllercreateNamesrvController(String[]args)throwsIOException,JoranException{System.setProperty(RemotingCommand.REMOTING_VERSION_KEY,Integer.toString(MQVersion.CURRENT_VERSION));//PackageConflictDetect.detectFastjson();Optionsoptions=ServerUtil.buildCommandlineOptions(newOptions());      1.配置信息kv的操作;      2.broker上下线管理操作;      3.topic路由信息管理服务;

各自实现当然是按照业务处理,本无需多说,但为了解概要,我们还是挑一个重点来说说吧:broker的上线处理注册:

//为保持前沿起见,咱们以高版本服务展开思路(即版本大于3.0.11)publicRemotingCommandregisterBrokerWithFilterServer(ChannelHandlerContextctx,RemotingCommandrequest)throwsRemotingCommandException{finalRemotingCommandresponse=RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);finalRegisterBrokerResponseHeaderresponseHeader=(RegisterBrokerResponseHeader)response.readCustomHeader();finalRegisterBrokerRequestHeaderrequestHeader=(RegisterBrokerRequestHeader)request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);if(!checksum(ctx,request,requestHeader)){response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("crc32notmatch");returnresponse;}RegisterBrokerBodyregisterBrokerBody=newRegisterBrokerBody();if(request.getBody()!=null){try{registerBrokerBody=RegisterBrokerBody.decode(request.getBody(),requestHeader.isCompressed());}catch(Exceptione){thrownewRemotingCommandException("FailedtodecodeRegisterBrokerBody",e);}}else{registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(newAtomicLong(0));registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0);}//重点实现:registerBrokerRegisterBrokerResultresult=this.namesrvController.getRouteInfoManager().registerBroker(requestHeader.getClusterName(),requestHeader.getBrokerAddr(),requestHeader.getBrokerName(),requestHeader.getBrokerId(),requestHeader.getHaServerAddr(),registerBrokerBody.getTopicConfigSerializeWrapper(),registerBrokerBody.getFilterServerList(),ctx.channel());responseHeader.setHaServerAddr(result.getHaServerAddr());responseHeader.setMasterAddr(result.getMasterAddr());byte[]jsonValue=this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);response.setBody(jsonValue);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);returnresponse;}//org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBrokerpublicRegisterBrokerResultregisterBroker(finalStringclusterName,finalStringbrokerAddr,finalStringbrokerName,finallongbrokerId,finalStringhaServerAddr,finalTopicConfigSerializeWrappertopicConfigWrapper,finalListStringfilterServerList,finalChannelchannel){RegisterBrokerResultresult=newRegisterBrokerResult();try{try{//上锁更新各表数据this.lock.writeLock().lockInterruptibly();//集群名表SetStringbrokerNames=this.clusterAddrTable.get(clusterName);if(null==brokerNames){brokerNames=newHashSetString();this.clusterAddrTable.put(clusterName,brokerNames);}brokerNames.add(brokerName);booleanregisterFirst=false;//broker详细信息表BrokerDatabrokerData=this.brokerAddrTable.get(brokerName);if(null==brokerData){registerFirst=true;brokerData=newBrokerData(clusterName,brokerName,newHashMapLong,String());this.brokerAddrTable.put(brokerName,brokerData);}MapLong,StringbrokerAddrsMap=brokerData.getBrokerAddrs();//Switchslavetomaster:firstremove1,IP:PORTinnamesrv,thenadd0,IP:PORT//ThesameIP:PORTmustonlyhaveonerecordinbrokerAddrTableIteratorEntryLong,Stringit=brokerAddrsMap.entrySet().iterator();while(it.hasNext()){EntryLong,Stringitem=it.next();if(null!=brokerAddrbrokerAddr.equals(item.getValue())brokerId!=item.getKey()){it.remove();}}StringoldAddr=brokerData.getBrokerAddrs().put(brokerId,brokerAddr);registerFirst=registerFirst

(null==oldAddr);if(null!=topicConfigWrapperMixAll.MASTER_ID==brokerId){if(this.isBrokerTopicConfigChanged(brokerAddr,topicConfigWrapper.getDataVersion())

registerFirst){//首次注册或者topic变更,则更新topic信息ConcurrentMapString,TopicConfigtcTable=topicConfigWrapper.getTopicConfigTable();if(tcTable!=null){for(Map.EntryString,TopicConfigentry:tcTable.entrySet()){this.createAndUpdateQueueData(brokerName,entry.getValue());}}}}//存活的broker信息表BrokerLiveInfoprevBrokerLiveInfo=this.brokerLiveTable.put(brokerAddr,newBrokerLiveInfo(System.currentTimeMillis(),topicConfigWrapper.getDataVersion(),channel,haServerAddr));if(null==prevBrokerLiveInfo){log.info("newbrokerregistered,{}HAServer:{}",brokerAddr,haServerAddr);}if(filterServerList!=null){if(filterServerList.isEmpty()){this.filterServerTable.remove(brokerAddr);}else{this.filterServerTable.put(brokerAddr,filterServerList);}}//slave节点注册需绑定masterAddr返回if(MixAll.MASTER_ID!=brokerId){StringmasterAddr=brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);if(masterAddr!=null){BrokerLiveInfobrokerLiveInfo=this.brokerLiveTable.get(masterAddr);if(brokerLiveInfo!=null){result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());result.setMasterAddr(masterAddr);}}}}finally{this.lock.writeLock().unlock();}}catch(Exceptione){log.error("registerBrokerException",e);}returnresult;}

好吧,是不是很抽象。没关系,能知道大概意思就行了。大体上就是broker上线了,nameserver需要知道这些事,要把这信息加入到各表项中,以备将来使用。具体理解我们应该要从业务性质出发才能透彻。反正就和咱们平时写业务代码并无二致。

4.topic存储位置策略

nameserver除了有注册broker的核心作用外,还有一个非常核心的作用就是,为各消费者或生产者提供各topic信息所在位置。这个位置决定了数据如何存储以及如何访问问题,只要这个决策出问题,则整个集群的可靠性就无法保证了。所以,这个点需要我们深入理解下。

在kafka中,其存储策略是和shard强相关的,一个topic分配了多少shard就决定了它可以存储到几个机器节点上,即kafka是以shard作为粒度分配存储的。

但rocketmq中则不太一样,类似的概念有:topic是最外层的存储,而messageQueue则是内一层的存储,它是否是按照topic存储或者按照msgQueue存在呢?实际上,在官方文档中,已经描述清楚了:Broker在实际部署过程中对应一台服务器,每个Broker可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的Broker。MessageQueue用于存储消息的物理地址,每个Topic中的消息地址存储于多个MessageQueue中。

即rocketmq中是以messagequeue作为最细粒度的存储的,实际上这基本无悬念,因为分布式存储需要。(试想以topic为存储粒度会带来多少问题就知道了)

那么,它又是如何划分哪个messagequeue存储在哪里的呢?

//RequestCode.GET_ROUTEINFO_BY_TOPICpublicRemotingCommandgetRouteInfoByTopic(ChannelHandlerContextctx,RemotingCommandrequest)throwsRemotingCommandException{finalRemotingCommandresponse=RemotingCommand.createResponseCommand(null);finalGetRouteInfoRequestHeaderrequestHeader=(GetRouteInfoRequestHeader)request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);//获取topic路由信息TopicRouteDatatopicRouteData=this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());if(topicRouteData!=null){//顺序消费配置if(this.namesrvController.getNamesrvConfig().isOrderMessageEnable()){StringorderTopicConf=this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,requestHeader.getTopic());topicRouteData.setOrderTopicConf(orderTopicConf);}byte[]content=topicRouteData.encode();response.setBody(content);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);returnresponse;}response.setCode(ResponseCode.TOPIC_NOT_EXIST);response.setRemark("Notopicrouteinfoinnameserverforthetopic:"+requestHeader.getTopic()+FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));returnresponse;}//org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#pickupTopicRouteDatapublicTopicRouteDatapickupTopicRouteData(finalStringtopic){TopicRouteDatatopicRouteData=newTopicRouteData();booleanfoundQueueData=false;booleanfoundBrokerData=false;SetStringbrokerNameSet=newHashSetString();ListBrokerDatabrokerDataList=newLinkedListBrokerData();topicRouteData.setBrokerDatas(brokerDataList);HashMapString,ListStringfilterServerMap=newHashMapString,ListString();topicRouteData.setFilterServerTable(filterServerMap);try{try{this.lock.readLock().lockInterruptibly();//获取所有topic的messageQueue信息ListQueueDataqueueDataList=this.topicQueueTable.get(topic);if(queueDataList!=null){topicRouteData.setQueueDatas(queueDataList);foundQueueData=true;IteratorQueueDatait=queueDataList.iterator();while(it.hasNext()){QueueDataqd=it.next();brokerNameSet.add(qd.getBrokerName());}//根据brokerName,查找broker信息,如果没找到说明该broker可能已经下线,不能算在路由信息内for(StringbrokerName:brokerNameSet){BrokerDatabrokerData=this.brokerAddrTable.get(brokerName);if(null!=brokerData){BrokerDatabrokerDataClone=newBrokerData(brokerData.getCluster(),brokerData.getBrokerName(),(HashMapLong,String)brokerData.getBrokerAddrs().clone());brokerDataList.add(brokerDataClone);//只要找到一个broker就可以进行路由处理foundBrokerData=true;for(finalStringbrokerAddr:brokerDataClone.getBrokerAddrs().values()){ListStringfilterServerList=this.filterServerTable.get(brokerAddr);filterServerMap.put(brokerAddr,filterServerList);}}}}}finally{this.lock.readLock().unlock();}}catch(Exceptione){log.error("pickupTopicRouteDataException",e);}log.debug("pickupTopicRouteData{}{}",topic,topicRouteData);//只有队列信息和broker信息都找到时,整个路由信息才可返回if(foundBrokerDatafoundQueueData){returntopicRouteData;}returnnull;}//QueueData作为路由信息的重要组成部分,其数据结构如下publicclassQueueDataimplementsComparableQueueData{privateStringbrokerName;privateintreadQueueNums;privateintwriteQueueNums;privateintperm;privateinttopicSynFlag;...}//brokerData数据结构如下publicclassBrokerDataimplementsComparableBrokerData{privateStringcluster;privateStringbrokerName;privateHashMapLong/*brokerId*/,String/*brokeraddress*/brokerAddrs;...}

ok,从上面的实现中,我们可以看到,查找路由信息,是根据topic进行查找的。而topic信息保存在topicQueueTable中。这里有个重要点是,整个路由查找过程,居然和queueId是无关的,那么它又是如何定位queueId所在的位置呢?另外,这个topicQueTable里的数据又是何时维护的呢?

首先,对于topicQueueTable的维护,是在broker注册和解注册时维护的,这很好理解。

//也就前面看到的broker为master节点时的createAndUpdateQueueData()privatevoidcreateAndUpdateQueueData(finalStringbrokerName,finalTopicConfigtopicConfig){QueueDataqueueData=newQueueData();queueData.setBrokerName(brokerName);queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());queueData.setReadQueueNums(topicConfig.getReadQueueNums());queueData.setPerm(topicConfig.getPerm());queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());ListQueueDataqueueDataList=this.topicQueueTable.get(topicConfig.getTopicName());//topic的首个brokerif(null==queueDataList){queueDataList=newLinkedListQueueData();queueDataList.add(queueData);this.topicQueueTable.put(topicConfig.getTopicName(),queueDataList);log.info("newtopicregistered,{}{}",topicConfig.getTopicName(),queueData);}else{booleanaddNewOne=true;IteratorQueueDatait=queueDataList.iterator();//添加一个brokerwhile(it.hasNext()){QueueDataqd=it.next();if(qd.getBrokerName().equals(brokerName)){if(qd.equals(queueData)){addNewOne=false;}else{log.info("topicchanged,{}OLD:{}NEW:{}",topicConfig.getTopicName(),qd,queueData);it.remove();}}}if(addNewOne){queueDataList.add(queueData);}}}

但针对queueId又是何时进行处理的呢?看起来nameserver不得而知。

事实上,数据发送到哪个broker或从哪个broker上进行数据消费,是由各客户端根据策略决定的。比如在producer中是这样处理的:

//org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImplprivateSendResultsendDefaultImpl(Messagemsg,finalCommunicationMode

转载请注明:http://www.lwblm.com/bytj/12023.html
  • 上一篇文章:
  • 下一篇文章: 没有了