java开发Dubbo负载均衡与集群容错示例详解_第1页
java开发Dubbo负载均衡与集群容错示例详解_第2页
java开发Dubbo负载均衡与集群容错示例详解_第3页
java开发Dubbo负载均衡与集群容错示例详解_第4页
java开发Dubbo负载均衡与集群容错示例详解_第5页
已阅读5页,还剩17页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

第java开发Dubbo负载均衡与集群容错示例详解目录负载均衡与集群容错Invoker服务目录RegistryDirectory获取Invoker列表监听注册中心刷新Invoker列表StaticDirectory服务路由ClusterFailoverClusterInvokerFailfastClusterInvokerFailsafeClusterInvokerFailbackClusterInvokerForkingClusterInvokerBroadcastClusterInvokerAbstractClusterInvoker小结负载均衡AbstractLoadBalanceRandomLoadBalanceLeastActiveLoadBalanceConsistentHashLoadBalance

RoundRobinLoadBalance总结

负载均衡与集群容错

Invoker

在Dubbo中Invoker就是一个具有调用功能的对象,在服务提供端就是实际的服务实现,只是将服务实现封装起来变成一个Invoker。

在服务消费端,从注册中心得到服务提供者的信息之后,将一条条信息封装为Invoker,这个Invoker就具备了远程调用的能力。

综上,Dubbo就是创建了一个统一的模型,将可调用(可执行体)的服务对象都统一封装为Invoker。

而ClusterInvoker就是将多个服务引入的Invoker封装起来,对外统一暴露一个Invoker,并且赋予这些Invoker集群容错的功能。

服务目录

服务目录,即Directory,实际上它就是多个Invoker的集合,服务提供端一般都会集群分布,同样的服务会有多个提供者,因此需要一个服务目录来统一存放它们,需要调用服务的时候便从这个服务目录中进行挑选。

同时服务目录还是实现了NotifyListener接口,当集群中新增了一台服务提供者或者下线了一台服务提供者,目录都会对服务提供者进行更新,新增或者删除对应的Invoker。

从上图中,可以看到用了一个抽象类AbstractDirectory来实现Directory接口,抽象类中运用到了模板方法模式,将一些公共方法和逻辑写好,作为一个骨架,然后具体实现由了两个子类来完成,两个子类分别为StaticDirectory和RegistryDirectory。

RegistryDirectory

RegistryDirectory实现了NotifyListener接口,可以监听注册中心的变化,当注册中心配置发生变化时,服务目录也可以收到变更通知,然后根据更新之后的配置刷新Invoker列表。

由此可知RegistryDirectory共有三个作用:

获取Invoker列表监听注册中心刷新Invoker列表

获取Invoker列表

RegistryDirectory实现了父类AbstractDirectory的抽象方法doList(),该方法可以得到Invoker列表

publicListInvokerTdoList(Invocationinvocation){

if(this.forbidden){

thrownewRpcException(....);

}else{

ListInvokerTinvokers=null;

MapString,ListInvokerTlocalMethodInvokerMap=this.methodInvokerMap;//获取方法调用名和Invoker的映射表

if(localMethodInvokerMap!=nulllocalMethodInvokerMap.size()0){

StringmethodName=RpcUtils.getMethodName(invocation);

Object[]args=RpcUtils.getArguments(invocation);

//以下就是根据方法名和方法参数获取可调用的Invoker

if(args!=nullargs.length0args[0]!=null(args[0]instanceofString||args[0].getClass().isEnum())){

invokers=(List)localMethodInvokerMap.get(methodName+"."+args[0]);

if(invokers==null){

invokers=(List)localMethodInvokerMap.get(methodName);

if(invokers==null){

invokers=(List)localMethodInvokerMap.get("*");

if(invokers==null){

IteratorListInvokerTiterator=localMethodInvokerMap.values().iterator();

if(iterator.hasNext()){

invokers=(List)iterator.next();

return(List)(invokers==nullnewArrayList(0):invokers);

监听注册中心

通过实现NotifyListener接口可以感知注册中心的数据变更。

RegistryDirectory定义了三个集合invokerUrlsrouterUrlsconfiguratorUrls分别处理对应的配置然后转化成对象。

publicsynchronizedvoidnotify(ListURLurls){

ListURLinvokerUrls=newArrayList();

ListURLrouterUrls=newArrayList();

ListURLconfiguratorUrls=newArrayList();

Iteratori$=urls.iterator();

while(true){

while(true){

while(i$.hasNext()){

//....根据urls填充上述三个列表

if(configuratorUrls!=null!configuratorUrls.isEmpty()){

this.configurators=toConfigurators(configuratorUrls);//根据urls转化为configurators配置

ListlocalConfigurators;

if(routerUrls!=null!routerUrls.isEmpty()){

localConfigurators=this.toRouters(routerUrls);

if(localConfigurators!=null){

this.setRouters(localConfigurators);//根据urls转化为routers配置

localConfigurators=this.configurators;

this.overrideDirectoryUrl=this.directoryUrl;

Configuratorconfigurator;

if(localConfigurators!=null!localConfigurators.isEmpty()){

for(Iteratori$=localConfigurators.iterator();i$.hasNext();this.overrideDirectoryUrl=configurator.configure(this.overrideDirectoryUrl)){

configurator=(Configurator)i$.next();

this.refreshInvoker(invokerUrls);//根据invokerUrls刷新invoker列表

return;

刷新Invoker列表

privatevoidrefreshInvoker(ListURLinvokerUrls){

//如果invokerUrls只有一个URL并且协议是empty,那么清除所有invoker

if(invokerUrls!=nullinvokerUrls.size()==1invokerUrls.get(0)!=null"empty".equals(((URL)invokerUrls.get(0)).getProtocol())){

this.forbidden=true;

this.methodInvokerMap=null;

this.destroyAllInvokers();

}else{

this.forbidden=false;

MapString,InvokerToldUrlInvokerMap=this.urlInvokerMap;//获取旧的Invoker列表

if(invokerUrls.isEmpty()this.cachedInvokerUrls!=null){

invokerUrls.addAll(this.cachedInvokerUrls);

}else{

this.cachedInvokerUrls=newHashSet();

this.cachedInvokerUrls.addAll(invokerUrls);

if(invokerUrls.isEmpty()){

return;

//根据URL生成InvokerMap

MapString,InvokerTnewUrlInvokerMap=this.toInvokers(invokerUrls);

//根据新的InvokerMap生成方法名和Invoker列表对应的Map

MapString,ListInvokerTnewMethodInvokerMap=this.toMethodInvokers(newUrlInvokerMap);

if(newUrlInvokerMap==null||newUrlInvokerMap.size()==0){

logger.error(newIllegalStateException("urlstoinvokerserror.invokerUrls.size:"+invokerUrls.size()+",invoker.size:0.urls:"+invokerUrls.toString()));

return;

this.methodInvokerMap=this.multiGroupthis.toMergeMethodInvokerMap(newMethodInvokerMap):newMethodInvokerMap;

this.urlInvokerMap=newUrlInvokerMap;

try{

this.destroyUnusedInvokers(oldUrlInvokerMap,newUrlInvokerMap);//销毁无效的Invoker

}catch(Exceptionvar6){

logger.warn("destroyUnusedInvokerserror.",var6);

上述操作就是根据invokerUrls数量以及协议头是否为empty来判断是否禁用所有invokers,如果不禁用的话将invokerUrls转化为Invoker,并且得到url,Invoker的映射关系。

再进一步进行转化,得到methodName,List的映射关系,再将同一组的Invoker进行合并,将合并结果赋值给methodInvokerMap,这个methodInvokerMap就是在doList中使用到的Map。

最后刷新InvokerMap,销毁无效的Invoker。

StaticDirectory

StaticDirectory是静态目录,所有Invoker是固定的不会删减的,并且所有Invoker由构造器来传入。

内部逻辑也相当简单,只定义了一个列表用于存储Invokers。实现父类的方法也只是将这些Invokers原封不动地返回。

privatefinalListInvokerTinvokers;

protectedListInvokerTdoList(Invocationinvocation)throwsRpcException{

returnthis.invokers;

服务路由

服务路由规定了服务消费者可以调用哪些服务提供者,Dubbo常用的是条件路由ConditionRouter。

条件路由由两个条件组成,格式为[服务消费者匹配条件]=[服务提供者匹配条件],例如5=9规定了只有IP为5的服务消费者才可以访问IP为9的服务提供者,不可以调用其他的服务。

路由一样是通过RegistryDirectory中的notify()更新的,在调用toMethodInvokers()的时候会进行服务器级别的路由和方法级别的路由。

Cluster

在前面的流程中我们已经通过Directory获取了服务目录,并且通过路由获取了一个或多个Invoker,但是对于服务消费者还是需要进行选择,筛选出一个Invoker进行调用。

Dubbo默认的Cluster实现有多种,如下:

FailoverClusterFailfastClusterFailsafeClusterFailbackClusterBroadcastClusterAvailableCluster

每个Cluster内部返回的都是xxxClusterInvoker,例如FailoverCluster:

publicclassFailoverClusterimplementsCluster{

publicstaticfinalStringNAME="failover";

publicFailoverCluster(){

publicTInvokerTjoin(DirectoryTdirectory)throwsRpcException{

returnnewFailoverClusterInvoker(directory);

FailoverClusterInvoker

FailoverClusterInvoker实现的功能是失败调用(有重试次数)自动切换。

publicResultdoInvoke(Invocationinvocation,ListInvokerTinvokers,LoadBalanceloadbalance)throwsRpcException{

ListInvokerTcopyinvokers=invokers;

this.checkInvokers(invokers,invocation);

//重试次数

intlen=this.getUrl().getMethodParameter(invocation.getMethodName(),"retries",2)+1;

if(len=0){

len=1;

RpcExceptionle=null;

ListInvokerTinvoked=newArrayList(invokers.size());

SetStringproviders=newHashSet(len);

//根据重试次数循环调用

for(inti=0;ilen;++i){

if(i0){

this.checkWhetherDestroyed();

copyinvokers=this.list(invocation);

this.checkInvokers(copyinvokers,invocation);

//负载均衡筛选出一个Invoker作本次调用

InvokerTinvoker=this.select(loadbalance,invocation,copyinvokers,invoked);

//将使用过的Invoker保存起来,下次重试时做过滤用

invoked.add(invoker);

//记录到上下文中

RpcContext.getContext().setInvokers(invoked);

try{

//发起调用

Resultresult=invoker.invoke(invocation);

if(le!=nulllogger.isWarnEnabled()){

logger.warn("....");

Resultvar12=result;

returnvar12;

}catch(RpcExceptionvar17){//catch异常继续下次循环重试

if(var17.isBiz()){

throwvar17;

le=var17;

}catch(Throwablevar18){

le=newRpcException(var18.getMessage(),var18);

}finally{

providers.add(invoker.getUrl().getAddress());

thrownewRpcException(....);

上述方法中,首先获取重试次数len,根据重试次数进行循环调用,调用发生异常会被catch住,然后重新调用。

每次循环会通过负载均衡选出一个Invoker,然后利用这个Invoker进行远程调用,每次选出的Invoker会记录下来,在下次调用的select()中会将使用上次调用的Invoker进行重试,如果上一次没有调用或者上次调用的Invoker下线了,那么会重新进行负载均衡进行选择。

FailfastClusterInvoker

FailfastClusterInvoker只会进行一次远程调用,如果失败后立马抛出异常。

publicResultdoInvoke(Invocationinvocation,ListInvokerTinvokers,LoadBalanceloadbalance)throwsRpcException{

this.checkInvokers(invokers,invocation);

Invokerinvoker=this.select(loadbalance,invocation,invokers,(List)null);//负载均衡选择Invoker

try{

returninvoker.invoke(invocation);//发起远程调用

}catch(Throwablevar6){//失败调用直接将错误抛出

if(var6instanceofRpcException((RpcException)var6).isBiz()){

throw(RpcException)var6;

}else{

thrownewRpcException(....);

FailsafeClusterInvoker

FailsafeClusterInvoker是一种安全失败的cluster,调用发生错误仅仅是记录一下日志,然后就返回了空结果。

publicResultdoInvoke(Invocationinvocation,ListInvokerTinvokers,LoadBalanceloadbalance)throwsRpcException{

try{

this.checkInvokers(invokers,invocation);

//负载均衡选出Invoker后直接进行调用

InvokerTinvoker=this.select(loadbalance,invocation,invokers,(List)null);

returninvoker.invoke(invocation);

}catch(Throwablevar5){//调用错误只是打印日志

logger.error("Failsafeignoreexception:"+var5.getMessage(),var5);

returnnewRpcResult();

FailbackClusterInvoker

FailbackClusterInvoker调用失败后,会记录下本次调用,然后返回一个空结果给服务消费者,并且会通过一个定时任务对失败的调用进行重试。适用于执行消息通知等最大努力场景。

protectedResultdoInvoke(Invocationinvocation,ListInvokerTinvokers,LoadBalanceloadbalance)throwsRpcException{

try{

this.checkInvokers(invokers,invocation);

//负载均衡选出Invoker

InvokerTinvoker=this.select(loadbalance,invocation,invokers,(List)null);

//执行调用,执行成功返回调用结果

returninvoker.invoke(invocation);

}catch(Throwablevar5){

//调用失败

logger.error("....");

//记录下本次失败调用

this.addFailed(invocation,this);

//返回空结果

returnnewRpcResult();

privatevoidaddFailed(Invocationinvocation,AbstractClusterInvokerrouter){

if(this.retryFuture==null){

synchronized(this){

//如果未创建重试本次调用的定时任务

if(this.retryFuture==null){

//创建定时任务

this.retryFuture=this.scheduledExecutorService.scheduleWithFixedDelay(newRunnable(){

publicvoidrun(){

try{

//定时进行重试

FailbackClusterInvoker.this.retryFailed();

}catch(Throwablevar2){

FailbackClusterInvoker.logger.error("....",var2);

},5000L,5000L,TimeUnit.MILLISECONDS);

//将invocation和router存入map

this.failed.put(invocation,router);

voidretryFailed(){

if(this.failed.size()!=0){

Iteratori$=(newHashMap(this.failed)).entrySet().iterator();

while(i$.hasNext()){

EntryInvocation,AbstractClusterInvokerentry=(Entry)i$.next();

Invocationinvocation=(Invocation)entry.getKey();

Invokerinvoker=(Invoker)entry.getValue();

try{

//进行重试调用

invoker.invoke(invocation);

//调用成功未产生异常则移除本次失败调用的记录,销毁定时任务

this.failed.remove(invocation);

}catch(Throwablevar6){

logger.error("....",var6);

逻辑比较简单,大致就是当调用错误时返回空结果,并记录下本次失败调用到failedinvocation,router中,并且会创建一个定时任务定时地去调用failed中记录的失败调用,如果调用成功了就从failed中移除这个调用。

ForkingClusterInvoker

ForkingClusterInvoker运行时,会将所有Invoker都放入线程池中并发调用,只要有一个Invoker调用成功了就返回结果,doInvoker方法立即停止运行。

适用于对实时性比较高的读写操作。

publicResultdoInvoke(finalInvocationinvocation,ListInvokerTinvokers,LoadBalanceloadbalance)throwsRpcException{

Resultvar19;

try{

this.checkInvokers(invokers,invocation);

intforks=this.getUrl().getParameter("forks",2);

inttimeout=this.getUrl().getParameter("timeout",1000);

finalObjectselected;

if(forks0forksinvokers.size()){

selected=newArrayList();

for(inti=0;iforks;++i){

InvokerTinvoker=this.select(loadbalance,invocation,invokers,(List)selected);

if(!((List)selected).contains(invoker)){

//选择好的Invoker放入这个selected列表

((List)selected).add(invoker);

}else{

selected=invokers;

RpcContext.getContext().setInvokers((List)selected);

finalAtomicIntegercount=newAtomicInteger();

//阻塞队列

finalBlockingQueueObjectref=newLinkedBlockingQueue();

Iteratori$=((List)selected).iterator();

while(i$.hasNext()){

finalInvokerTinvoker=(Invoker)i$.next();

this.executor.execute(newRunnable(){

publicvoidrun(){

try{

Resultresult=invoker.invoke(invocation);

ref.offer(result);

}catch(Throwablevar3){

intvalue=count.incrementAndGet();

if(value=((List)selected).size()){//等待所有调用都产生异常才入队

ref.offer(var3);

try{

//阻塞获取结果

Objectret=ref.poll((long)timeout,TimeUnit.MILLISECONDS);

if(retinstanceofThrowable){

Throwablee=(Throwable)ret;

thrownewRpcException(....);

var19=(Result)ret;

}catch(InterruptedExceptionvar14){

thrownewRpcException(....);

}finally{

RpcContext.getContext().clearAttachments();

returnvar19;

BroadcastClusterInvoker

BroadcastClusterInvoker运行时会将所有Invoker逐个调用,在最后判断中如果有一个调用产生错误,则抛出异常。

适用于通知所有提供者更新缓存或日志等本地资源的场景。

publicResultdoInvoke(Invocationinvocation,ListInvokerTinvokers,LoadBalanceloadbalance)throwsRpcException{

this.checkInvokers(invokers,invocation);

RpcContext.getContext().setInvokers(invokers);

RpcExceptionexception=null;

Resultresult=null;

Iteratori$=invokers.iterator();

while(i$.hasNext()){

Invokerinvoker=(Invoker)i$.next();

try{

result=invoker.invoke(invocation);

}catch(RpcExceptionvar9){

exception=var9;

logger.warn(var9.getMessage(),var9);

}catch(Throwablevar10){

exception=newRpcException(var10.getMessage(),var10);

logger.warn(var10.getMessage(),var10);

//如果调用过程中发生过错误抛出异常

if(exception!=null){

throwexception;

}else{

//返回调用结果

returnresult;

AbstractClusterInvoker

AbstractClusterInvoker是上述所有类的父类,内部结构较为简单。AvailableCluster内部返回结果就是AvailableClusterInvoker。

publicclassAvailableClusterimplementsCluster{

publicstaticfinalStringNAME="available";

publicAvailableCluster(){

publicTInvokerTjoin(DirectoryTdirectory)throwsRpcException{

returnnewAbstractClusterInvokerT(directory){

publicResultdoInvoke(Invocationinvocation,ListInvokerTinvokers,LoadBalanceloadbalance)throwsRpcException{

Iteratori$=invokers.iterator();

Invokerinvoker;

do{//循环判断:哪个invoker能用就调用哪个

if(!i$.hasNext()){

thrownewRpcException("Noprovideravailablein"+invokers);

invoker=(Invoker)i$.next();

}while(!invoker.isAvailable());

returninvoker.invoke(invocation);

小结

上述中有很多种集群的实现,各适用于不同的场景,加了Cluster这个中间层,向服务消费者屏蔽了集群调用的细节,并且支持不同场景使用不同的模式。

负载均衡

Dubbo中的负载均衡,即LoadBalance,服务提供者一般都是集群分布,所以需要Dubbo选择出合适的服务提供者来给服务消费者调用。

Dubbo中提供了多种负载均衡算法:

RandomLoadBalanceLeastActiveLoadBalanceConsistentHashLoadBalanceRoundRobinLoadBalance

AbstractLoadBalance

实现类都继承了于这个类,该类实现了LoadBalance,使用模板方法模式,将一些公用的逻辑封装好,而具体的实现由子类自定义。

publicTInvokerTselect(ListInvokerTinvokers,URLurl,Invocationinvocation){

if(invokers!=null!invokers.isEmpty()){

//子类实现

returninvokers.size()==1(Invoker)invokers.get(0):this.doSelect(invokers,url,invocation);

}else{

returnnull;

protectedabstractTInvokerTdoSelect(ListInvokerTvar1,URLvar2,Invocationvar3);

服务刚启动需要预热,不能突然让服务负载过高,需要进行服务的降权。

protectedintgetWeight(Invokerinvoker,Invocationinvocation){

intweight=invoker.getUrl().getMethodParameter(invocation.getMethodName(),"weight",100);//获得权重

if(weight0){

longtimestamp=invoker.getUrl().getParameter("remote.timestamp",0L);//启动时间

if(timestamp0L){

intuptime=(int)(System.currentTimeMillis()-timestamp);//计算已启动时长

intwarmup=invoker.getUrl().getParameter("warmup",600000);

if(uptime0uptimewarmup){

weight=calculateWarmupWeight(uptime,warmup,weig

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论