RocketMQ源码分析之主从数据复制_第1页
RocketMQ源码分析之主从数据复制_第2页
RocketMQ源码分析之主从数据复制_第3页
RocketMQ源码分析之主从数据复制_第4页
RocketMQ源码分析之主从数据复制_第5页
已阅读5页,还剩50页未读 继续免费阅读

付费下载

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

RocketMQ源码分析之主从数据复制

前言

在RocketMQ主从架构中master和slave之间会进行数据同步,其中数据同步

包括元数据复制和commitlog复制,那么为什么同步的数据中不包括

consumequeue和indexFile呢?这里大家可以思考下:master节点上

consumequeue和indexFile是根据commitlog构建的,所以slave在同步完

commitlog后只需要根据commitlog构建consumequeue和indexFile即可。本

篇文章就来分析下master和slave之间是如何进行数据同步?

一、元数据复制

1.元数据复制入口在RocketMQ中的主从架构中,在启动slave节点的过程中

会启动一个定时任务,该定时任务的功能是从master节点获取元数据。具体如

下:

privatevoidhandleSlaveSynchronize(BrokerRclerole)

if(role==BrokerRole.SLAVE){

if(null!=slaveSyncFuture)

slaveSyncFuture.cancel(false);

this.slaveSynchronize.setMasterAddr(null);

slaveSyncFuture=this.scheduledExecutorSei

卜ice.scheduleAtFixedRate(newRunnable()

©Override

publicvoidrun()

try

BrokerController.this

.slaveSynchronize.syncAll();

catch(Throwablee)

log,error("Scheduled]

askSlaveSynchronizesyncAllerror.,f,e);

|},1000*3,1000*10,TimeUnit.MILLl|

}else

if(null!=slaveSyncFuture)

slaveSyncFuture.cancel(false);

this.slaveSynchronize.setMasterAddr(null);

)ublicvoidsyncAll()

this.syncTopicConfigO

this.syncConsumerOffset()

this.syncDelayOf^ZTTTTT^^^^^^^I

this.syncSubscriptionGroupConfig();

2.元数据复制都包含哪些内容?

从syncAHO方法可以看出元数据复制主要包含以下文件:

(1)topics.json:topic配置文件

(2)consumerOffset.json:consumer消费进度文件

(3)delayOffset.json:延迟消息拉取进度

(4)subscriptionGroup.json:consumerGroup酉己置文件

3.元数据同步流程

在RocketMQ中四种元数据文件同步的流程是一样的,这里以topics,json为例

来分析其流程。从上面syncAll()方法可知:topic配置文件的同步入口是

syncTopicConfigO方法,具体如下:

|privatevoidsyncTopicConfigO

|StringmasterAddrBak=this,master

|if(masterAddrBak!=null&&!masterAddrBak,equa|

|ls(brokerController.getBrokerAddr()))

I

|erOuterAPI().getAllTopicConf

|if(!this.brokerController.getTopic||

|onfigManager().getDataVersion()

|,equals(topicWrapper.getDataV|

^ConfigManager().getDataVersion()

I

|apper.getDataVersion());

this.brokerController.getTopi

|this.brokerController.getTopi|

|.putAll(topicWrapper.|

|this.brokerController.getTopi|

|cConfigManager().persist();

log.error(wSyncTopicConfigExceptior

首先slave会通过getAlITopicConfig方法以同步调用的方式向master发送

RequestCode.GET_ALL_TOPIC_CONFIG请求来获取topic配置文件信息。

ublicTopicConfigSerializeWrappergetAlITopicConfig(

finalStringaddr)throwsRemotingConnectExceptio

RemotingSendRequestException,

RemotingTimeoutException,InterruptedException,MQB|

rokerException{

RemotingCommandrequest=RemotingConimand.createRe

questCommand(RequestCode.GET_ALL_TOPIC_CONFIG,null);

RemotingCommandresponse=this.remotingClient.inv

okeSync(MixAll.brokerVIPChannel(true,addr),request,3000);

assertresponse!=null;

switch(response.getCode()){

caseResponseCode.SUCCESS:

returnTopicConfigSerializeWrapper.d

ecode(response.getBody(),TopicConfigSerializeWrapper.class);

default:

break;

thrownewMQBrokerException(response.getCode(),re

ponse.getRemark(),addr);

master在收到slave端的请求后会在AdminBrokerProcessor中进行处理,具

体是调用getAllTopicConfig方法来处理,其处理过程就是将master端的

topicConfigTable和dataVersion编码成json字符串并返回给slave。

[privateRemotingCommandgetAllTopicConfig(ChannelHandlerContextct|

[x,RemotingCommandrequest)

|finalRemotingCommandresponse=RemotingCommand.|

|createResponseCommand(GetAllTopicConfigResponseHeader.class)

|Stringcontent=this.brokerController.getTopicCon|

|figManager().encode();

(content!=&&content,length()0)

try

response.setBody(content.getBytes(Mi

All.DEFAULT_CHARSET));

}catch(UnsupportedEncodingExceptione)

log.error(MH,e);

response.setCode(ResponseCode.SYSTEM

|response.setRemark("UnsupportedEncodi

|ngException“+e);

__________________________________returriresponse;

}else

|log,error("Notopicinthisbroker,clie|

|nt:{}",ctx.channel().remoteAddress());

|response.setRemark("Notopicinthisbrok|

('I।il!':i-「)5!、”、:

response.setCode(ResponseCode.SUCCESS);

response.setRemark(null)

returnresponse;

slave在收到master返回的数据后会先判断本地的dateVersion与master返

回的是否一样,如果不一样则会进行以下操作:

(1)更新slave的dataVersion

(2)清空slave端的topicConfigTable并将master返回的数据写入

(3)将topic配置进行持久化

最后用下图来总结下整个流程:

RocketMQ其余的元数据同步过程与上图一样只是发送的请求类型不一样,在阅

读源码时我有注意到一个问题:在同步topic配置文件时采用的是VIP通道

(即连接的是master的10909端口),而在同步其余三种元数据时采用的是

10911端口,那么问题就是其余三种元数据在同步时为什么采用的是10911而

不是10909?我在GitHub上开了一个issue,如果大家有兴趣可以一起讨论。

这里我认为所有的元数据同步应该都使用10909端口,所以在GitHub提了一个

pr来修复该问题。

二、commitlog复制

commitlog复制相关服务是如何被启动的呢?broker在启动过程中会启动

DefaultMessageStore,在启动DefaultMessageStore的过程中会判断broker

是否启用了Dledger,如果没有启动则会启动HAService,具体如下:

)ublicvoidstart()throwsException

lock=lockFile.getChannel(),tryLock(0,1,false)

f(lock==null||lock.isSharedO|!lock.isValidO){

thrownewRuntimeException(MLockfailed,MQalreadystarted");

lockFile.getChannel().write(ByteBuffer.wrap("lock”.getBytes()));

longmaxPhysicalPosInLogicQueue=commitLog.getMinOffset()

for(ConcurrentMapVInteger,ConsumeQueue>maps:this.consumeQueue|

Table,values

for(ConsumeQueuelogic:maps,values。)

if(logic.getMaxPhysicOffset()>maxPhysicalPosInLogicQueue){|

maxPhysicalPosInLogicQueue=logic.getMaxPhysicOffset();

if(maxPhysicalPosInLogicQueue<0){|

maxPhysicaIPos1nLogicQueue=();H|

if(!messageStoreConfig.isEnableDLegerCommitLog()){

this.haService.start();

this.handleScheduleMessageService(messageStoreConfig.getBrokerR

)le());

this,flushConsumeQueueService.start();

this.storeStatsService.start();

this.addScheduleTask();

this,shutdown=false;

master和slave之间commitlog复制的整个过程如下:

1.启动master并监听slave的连接

2.启动slave,建立与master的连接

3.slave向master发送待拉取数据的物理偏移量

4.master根据待拉取数据的物理偏移量打包数据并发给slave

5.slave同步master发送的数据并唤醒reputMessageService服务构建

consumequeue和indexFile

下面详细分析master与slave的交互及commitlog同步过程

1.启动master并监听slave连接

master的启动过程如下,这里与master相关的是acceptSocketService和

groupTransferService,其中groupTransferService与commitlog的同步复制

相关,后面会详细说明。acceptSocketService主要负责master端监听slave

连接。

[publicvoidstart()throwsException

|this.acceptSocketService.beginAccept();

|this.acceptSocketService.s+qr+C

//groupTransferService与commitlog同步

this.groupTransferService.start();

this.haClient.start();

publicvoidbeginAccept()throwsException

this.serverSocketChannel=ServerSocketChannel.open()

this,selector二RemotingUtil.openSelector()

this.serverSocketChannel.socket(),setReuseAddress(true)

this.serverSocketChannel.socket().bind(this,socketAddressListen

B

this.serverSocketChannel.register(this.selector,SelectionKey.Of

(2)acceptSocketService.start()该函数的具体流程如下:

publicvoidrun()

10g.info(this.getServiceName()+“servicestarted");

while(!this.isStopped。)

try{

Set<SelectionKey>selected=this,selector.selectedKeys();

|for(SelectionKeyk:selected)

|Socketchannelsc=((ServerSocketChannel)k.channel()).a|

|HAService.log.info("HAServicereceivenewconnection,

|+sc.socket().getRemoteSocketAddress());

HAConnectionconn=newHAConnection(HAService.this,

corm,start();

HAService.this.addConnection(conn)

log,error("newHAConnectionexception”,e);

sc.close();

log.warn("Unexpectedopsinselect"+k.readyOps());

selected,clear();

log.error(this.getServiceNameO+“servicehasexception.n,e)

log.info(this.getServiceName()+“serviceend");

上面过程中有个重要的过程就是启动了HAConnection,HAConnection表示的是

master与slave之间的网络连接,在HAConnection中有两个重要的对象分别

是readSocketService和writeSocketService,其中readSocketService是

master读取实现类,writeSocketService是master写实现类,具体如下:

rnblicvoidstart()

this.readSocketService.start();

this.writeSocketService.start();

2.启动slave,建立与master连接及向master发送待拉取数据的物理偏移量

slave启动过程主要启动的是HAClient,具体如下:

[publievoidrun()

|log,info(this.getServiceName()+"servicestarted")

|if(this.connectMaster())

|booleanresult=this,reportSlaveMaxOffset(this.currentRep|

this,selector,select(1000)

booleanok=this.processReadEvent();

:♦J.(【)"'」心"「()

MMMMW

[10nginterval

|HAService.this.getDefaultMessageStore(),getSystemClock()[

|-this.

|if(interval〉HAService.this.getDefaultMessageStore(),getMe|

bsageStoreConfig()

|.getHaHousekeepinglnterval

|log,warn("HAC1ient,housekeeping,foundthisconnection],+|

this.masterAddress

+”]expired,"+interval);

this.closeMaster();

log.warn(nHAClient,masternotresponsesometime,soclose

connection");

}else{

this.waitForRunning(1000*5);

}catch(Exceptione){

log.warn(this.getServiceName()+“servicehasexception.”,e);

this,waitForRunning(1000*5);

log,info(this.getServiceName()+“serviceend");

这里需要详细看几个比较重要的点:

(1)slave连接master完成slave连接master的函数是connectMaster,该

函数主要完成以下操作:

・获取master的地址并连接,这里需要注意在开始时将通道设置为阻塞

的,在connect完成后又将连接通道设置为非阻塞的

[publicstaticSocketChannelconnect(SocketAddressremote,finalinttimeo|

|SocketCharmelsc二mil1;

|sc二SocketCharmel.open();

|sc.configureBlocking(true);

|sc.socket().setSoLinger(fa卜a-1)

|sc.socket(),setTcpNoDelay(true);

|sc.socket().connect(remote,timeoutMillis)

Isc.configureBlocking(false);

|returnsc;

|}catch(Exceptione)

|sc.close();

|}catch([OExceptionel)

returnnull;

•注册OP_READ事件

•获取slave端commitlog的最大物理偏移量并缓存在

currentReportedOffset

•更新lastWriteTimestamp,lastWriteTimestamp的作用是用来计算

master和slave之间同步的时间间隔

privatebooleanconnectMaster()throwsClosedChannelException

|if(null■二二socketChannel)

|Stringaddr二this.masterAddress.get();

|SocketAddresssocketAddress=RemotingLti1.string2SocketAddre|

|ss(addr);

|if(socketAddress!=null)

|二;..、()(-k(—;udR(——〔iLL)nn(、(「〔(s()(「kc——s);■

|if(this,socketchannel!=null)

|this.socketChannel.register(this,selector,SelectionKey.0|

pREAD);

|this.currentReportedOffset=HAService.this.defaultMessageStore|

this,lastWriteTimestamp=System,current!imeMiHis();

returnthis,socketchannel!=null;

(2)isTimeToReportOffset()isTimeToReportOffset()方法的作用是判断是

否向master汇报slave端commitlog的最大物理偏移量,判断依据是计算当前

时间与lastWriteTimestamp的时间间隔,如果该时间间隔大于

haSendHeartbeatInterval(默认是5秒,可以在配置文件中进行修改)

)rivatebooleanisTimeToReportOffset(){

longinterval二

HAService.this.defaultMessageStore.getSystemClockQ.now()一thi

.lastWriteTimestamp;

booleanneedHeart=interval>HAService.this.defaultMessageStore.

IgetMessageStoreConfig()

|,getHaSendHeartbeatInterval();

(3)reportSlaveMaxOffset(this.currentReportedOffset)

reportSlaveMaxOffset方法的作用是向master汇报slave端commitlog最大

物理偏移量,也就是将currentReportedOffset汇报给master,这里可以发现

是将currentReportedOffset存储在reportOffset中(reportOffset是8个

字节),然后将reportOffset发送给master,这里在向master发送时有个循

环,猜测该循环与通道是非阻塞性质有关,加上循环可以确保reportOffset发

送完成,发送完成后修改lastWriteTimestamp,最后如果reportOffset没有

空余则返回true

xrivatebooleanreportSlaveMaxOffset(finallongmaxOffset)

this.reportOffset.position(0);

this.reportOffset.],益

this.reportOffset.putLong(maxOffset);

this.reportOffset.position(0);

this.reportOffset.1,丁;十

r(inti=0;i<3&&this.reportOffset.hasRemainingQ;i++)

try

this.socketChannel.write(this.reportOffset);

}catch(lOExceptione){

log,error(this.getServiceName()

+nreportSlaveMaxOffsetthis.socketChannel.writeexception11,

e);

returnfalse;

lastWriteTimestamp=HAService.this.defaultMessageStore.getSyste|

|mClock().now();

return!this.reportOffset.hasRemainingO;

(4)reportSlaveMaxOffsetPlus()reportSlaveMaxOffsetPlus方法的作用是

判断slave端的当前commitlog的最大物理偏移量是否有增长,如果有则更新

reportSlaveMaxOffset并调用reportSlaveMaxOffset向master汇报

rivatebooleanreportSlaveMaxOffsetPlus()

booleanresult=true;

|10ngcurrentPhyOffset=HAService.this.defaultMessageStore.getMax|

if(currentPhyOffset>this.currentReportedOffset)

result=this.reportSlaveMaxOffset(this.currentReportedOffset);

if(!result)

this.closeMaster();

log.error("HAClient,reportSlaveMaxOffseterror,"+this,curr

entReportedOffset);

returnresult;

3.master根据待拉取数据的物理偏移量打包数据并发给slave

3.1master读取数据

从上面可知master读取slave发送数据是由HAConnection的

readSocketService对象的processReadEvent方法完成的,具体如下:

(1)首先判断byteBufferRead是否还有剩余空间,byteBufferRead的作用是

存储master读取的数据,如果没有空间则对byteBufferRead进行flip操作,

同时将processPosition重置为0,processPosition表示处理

byteBufferRead的位置

(2)读取通道中的数据并存储到byteBufferRead

(3)判断byteBufferRead的position与processPosition之间的差值是否大

于等于8,之所以进行判断是因为slave向master汇报commitlog的最大物理

偏移量占了8个字节,所以如果大于等于8则表示有一个完整的数据可以进行

处理。在条件满足的情况下会进行以下操作:

・计算出byteBufferRead距离当前位置最近的位置,具体见下图:假设下

图中每个单元格代表8个字节,position是(24,32)之间的任意一个位

置,现在要计算的是距离当前位置最近的一个完整的数据的结束位置,

具体算法是:this.byteBufferRead.position()-

(this.byteBufferRead.position()%8),计算出结束的位置后用当前

位置减去8就是开始的位置

•读取[pos-8,pos]之间的数据并存储在readOffset

•将processPosition移动到pos位置

(4)将上面读取到的待拉取数据的物理偏移量存储在slaveAckOffset(5)

判断slaveRequestOffset是否小于0,如果小于0则更新为本次slave待拉取

数据的物理偏移量(slaveAckOffset存储的是slave端已经拉取完成的物理偏

移量,slaveRequestOffset存储的是slave端请求拉取的数据的物理偏移量)

rivatebooleanprocessReadEvent()

if(!this.byteBufferRead.hasRemainingQ)

・P!,fl,'pssin>i;i川-:0:

|while(this.byteBufferRead.hasRemainingO)

|intreadSize=this.socketChannel.read(this.byteBufferRead);.

|readSizeZeroT;二c・

|this.lastReadTimestamp=HAConnection.this.haService.getDef

|aultMessageStore().getSystemClock().now();

|if((this.byteBufferRead.position。-this.processPosition)

|intpos二this.byteBufferRead.position()一(this.byteBuffei

longreadOffset=this.byteBufferRead.getLong(pos-8);

this.processPosition=pos;

|HAConnection.this.slaveAckOffset二readOffset

|if(HAConnection.this.slaveRequestOffset<0)

|HAConnection.this.slaveRequestOffset=readOffset

Ilog.info(,,slave[>t+HAConnection.this.clientAddr+w]req|

|HAConnection.this.haService.notifyTransferSome(HAConnect

if(++readSizeZeroTimes>=3)

log,error("readsocket1"+HAConnection.this.clientAddr+”]

returnfalse;

}catch(lOExceptione){

10g.error("processReadEventexception",e);

returnfalse;

returntrue;

3.2master向slave写数据

接着来看下master读取完待拉取数据后的操作,这里就到

writeSocketService了,其主要完成的master向slave写数据的功能。其实

现都在run方法中,具体如下:

(1)首先判断slaveRequestOffset是否等于T,如果等于-1则表示master

还没有收到slave端待拉取数据的请求。这里需要注意slaveRequestOffset是

master在收到slave端汇报的待拉取数据物理偏移量是更新的,即

readSocketService的processReadEvent方法中。

(2)判断nextTransferFromWhere是否等于T,nextTransferFromWhere表示

master下次向slave同步数据的物理偏移量,如果它的值为T则表示是第一次

进行数据同步。如果slaveRequestOffset等于0则从当前commitlog中最后一

个文件开始进行数据传输。如果slaveRequestOffset不等于0则

nextTransferFromWhere被赋值为slaveRequestOffset,即从slave请求的位

置开始传输数据。

(3)lastWriteOver表示的是上次数据传输是否完成,如果上次数据传输已经

完成并且当前距离上次最后写入时间的间隔大于haSendHeartbeatInterval

(默认是5秒可在配置文件中进行配置)则会向slave发送一个12字节的数据

包,其中前8个字节用来存储nextTransferFromWhere,后4个字节存储的值

为0。如果上次数据传输没有完成则会继续传输数据然后再判断是否传输完

成,如果还是没有传输完成则结束本次事件处理,等到下次事件处理时继续传

输上次没有传输完成的数据。

(4)如果上次数据已经传输完成,则会根据nextTransferFromWhere获取该偏

移量之后所有的数据,如果在该偏移量之后没有数据则会等待100毫秒,如果

数据不为空,首先会判断数据的长度是否大于haTransferBatchSize(默认是

32KB)从这里可以看出slave很有可能会收到master传输的不完整的消息。接

着会用变量thisOffset记录本次数据传输开始的偏移量,然后更新

nextTransferFromWhere,并对selectResult的ByteBuffer进行limit操作限

制本次传输数据的大小,最后将selectResult赋给

selectMappedBufferResulto在进行数据传输前会先在byteBufferHeader中记

录本次数据传输的开始位置thisOffset和传输的数据大小。

(5)调用transferDataO方法进行数据传输,在进行数据传输时会先传输

byteBufferHeader,然后传输selectMappedBufferResulto数据传输完成后会

对selectMappedBufferResult释放。

这里我们稍微总结下,master向slave传输的数据包实际上分为两种:

・不包含消息的数据包这类数据包共12个字节,其中前8个字节用来存

储master向slave同步数据的起始偏移量,后4个字节存储的是消息的

长度,这里存储的值为0

・包含消息的数据包这类数据包共分为三个部分,其中前8个字节用来存

储本次向slave传输数据的起始偏移量,后四个字节用来存储本次传输

的消息长度,最后一个部分占用size字节表示的是消息

publicvoidrun(){

HAConnection.log.info(this,getServiceName()+"servicestarted11)

while(!this.isStoppedO)

this.selector,select(1000);

if(-1==HAConnection.this.slaveRequestOffset)

nue;

|if(-1■二二this.nextTransferFromWhere)

|if(0=HAConnection.this.slaveRequestOffset)

|longmasterOffset=HAConnection.this.haService.getDefault|

MessageStore().getCommitLogO.getMaxOffset();

|masterOffset

|master0ffse

|一(masterOffset%HAConnection.this.haService.getDefau|

[tMessageStore().getMessageStoreConfig()^^^^^^^^^^^^^^^^^^^^[

|.getMappedFileSizeCommitLogO);

1

~"""""」.7;「4);N"

this.nextTransferFromWhere=masterOffset;

this.nextTransferFromWhere=HAConnection.this.slaveReque|

stOffset;

|log,info("mastertransferdatafrom"+this.nextTransferFrom|

/here+"toslave]"+HAConnection.this.cl"n+八乂小

+“],andslaverequest"+HAConnection.this.slaveRequestOf

fset);

|longinterval

|HAConnection.this.haService.getDefaultMessageStore(),get]

|if(interval>HAConnection.this.haService.getDefaultMessage|

[Store().getMessageStoreConf

|.getHaSendHeartbeatInterval())

|this.byteBufferHeader.position(0);

.bylel»un,('rll('<ici(?r.IimiI(hc^idrrSize):

this.byteBufferHeader.putLong(this.nextTransferFromWhere

B

this.byteBufferHeader.putlnt(O)

this.lastWriteOver=this.transferDataQ;

nue;

this.lastWriteOver二this.transferDataQ;

inue;

|SelectMappedBufferResultselectResult=

|HAConnection.this.haService.getDefaultMessageStoreQtgetCo]

^nmitLogData(this.nextTransferFromWhere);

|intsize二selectResult.getSize()

|if(size>HAConnection.this.haService.getDefaultMessageStor|

,().getMessageStoreConfig().getllaTransferBatchSize())第

size二HAConnection.this.haService.getDefaultMessageStore|

().getMessageStoreConfig().getHaTransferBatchSize();

longthisOffset二this.nextTransferFromWhere;^^^^^^^^J

this.nextTransferFromWhere+=

selectResult.getByteBuffer().limit仁;”)

this.selectMappedBufferResult=selectResult;^^^^^^^^^J

this.byteBufferHeader.position(0);

this.byteBufferHeader.limit(headerSize);

this.byteBufferHeader.putLong(thisOffset)

this.byteBufferHeader.putlnt(size);

this.lastWriteOver二this.transferDataQ;

HAConnection.this.haService.getWaitNotifyObject(),allWaitF

}catch(Exception。){|

HAConnection.log.error(this.getServiceName()+“servicehase

xception.n,e);

break;

HAConnection.this.haService.getWaitNotifyObject().removeFromWai

tingThreadTable();

if(this.selectMappedBufferResult!=null)

this.selectMappedBufferResult.release。;

this.makeStop();

readSocketService.makeStop();

haService.removeConnection(HAConnection.this);

SelectionKeysk二this.socketChannel.keyFor(this,selector);

if(sk!=null)

sk.cancel();

this,selector.close();

this.socketChannel.close();

}catch(lOExceptione)

HAConnection.10g.error,'",e);

HAConnection.log.info(this^getServiceName()+"serviceend");

4.slave读取master发送的数据包

slave处理读事件的方法是HAClient中的processReadEvent方法,

byteBufferRead是slave端的读缓冲区。processReadEvent方法的逻辑是首先

判断byteBufferRead是否还有剩余空间,如果还有剩余空间则将channel中的

数据读取到byteBufferRead中,然后调用dispatchReadRequest方法处理读取

的数据。

•rivatebooleanprocessReadEvent(){

intreadSizeZeroTimes二0;

while(this.byteBufferRead.hasRemainingO)

intreadSize=this,socketchannel,read(this.byteBufferRead);

readSizeZeroTimes=0;

booleanresult二this.dispatchReadRequest()

10g.error("HAClient,dispatchReadRequesterror")

returnfalse;

if(++readSizeZeroTimes>=3)

log.info("HAClient,processReadEventreadsocket<0");

}catch(lOExceptione)

log,info("HAClient,processReadEventreadsocketexception”,(

B

returnfalse;

returntrue;

这里需要重点分析dispatchReadRequest方法:

(1)用readSocketPos记录byteBufferRead当前的position

(2)计算byteBufferRead中当前position与dispatchPosition

(dispatchPosition指向byteBufferRead中已经处理的位置的指针)差值

diff

(3)判断diff是否大于等于12字节,这里之所有判断与12字节的关系是因

为master向slave发送的数据包前12字节包含了传输的数据的起始物理偏移

量以及传输的数据的长度。如果条件成立则从byteBufferRead中分别读取传输

的数据的起始物理偏移量和传输的数据的长度并记录在masterPhyOffset和

bodySize中。

(4)从当前节点获取commitlog的最大物理偏移量slavePhyOffset,并判断

slavePhyOffset与masterPhyOffset是否相等,正常情况下两者应该是相等

的,如果不相等则输出error信息。

(5)判断diff是否大于等于msgHeaderSize+bodySize,如果条件成立表示

当前有一个完整的数据包,然后执行以下操作:

•将byteBufferRead的position设置为dispatchPosition+

msgHeaderSize,也就是数据包中数据开始的位置

•读取数据包中的消息并存储在bodyData中

•调用appendToCommitLog方法完成数据追加操作

•将byteBufferRead的position重新设置到readSocketPos

•将dispatchPosition向前移动msgHeaderSize+bodySize

•调用reportSlaveMaxOffsetPlus方法判断slave端commitlog的是否有

追加,如果有新的增加则向master汇报currentReportedOffset

privatebooleandispatchReadRequest()

finalintmsgHeaderSize=8+4;”,

intreadSocketPos二this.byteBufferRead.position。

while(true)

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论