java实现简易版简易版dubbo_第1页
java实现简易版简易版dubbo_第2页
java实现简易版简易版dubbo_第3页
java实现简易版简易版dubbo_第4页
java实现简易版简易版dubbo_第5页
已阅读5页,还剩7页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

第java实现简易版简易版dubbo@Override

publicvoidpostProcessBeanFactory(ConfigurableListableBeanFactorybeanFactory)throwsBeansException{

privateInterfaceConfigtransform(Serviceref){

InterfaceConfiginterfaceConfig=newInterfaceConfig();

interfaceConfig.setGroup(ref.group());

interfaceConfig.setVersion(ref.version());

interfaceConfig.setTimeout(ref.timeout());

returninterfaceConfig;

@Override

publicvoidsetBeanClassLoader(ClassLoaderclassLoader){

this.classLoader=classLoader;

4.2.2解析注解参数,注册服务到zookeeper

如上,我们在ServiceBean中,依赖标记有@Service的bean,同时将其曝光到本地,开启netty端口监听,注册到zk,具体如下,见afterPropertiesSet方法:

packagecessor;

importcom.jessin.practice.dubbo.config.InterfaceConfig;

importcom.jessin.practice.dubbo.config.MiniDubboProperties;

importcom.jessin.practice.dubbo.exporter.DubboExporter;

importty.NettyManager;

importty.NettyServer;

importcom.jessin.practice.dubbo.registry.CuratorZookeeperClient;

importcom.jessin.practice.dubbo.registry.RegistryManager;

importcom.jessin.practice.dubbo.utils.NetUtils;

importlombok.extern.slf4j.Slf4j;

importorg.springframework.beans.factory.DisposableBean;

importorg.springframework.beans.factory.InitializingBean;

*@Author:jessin

*@Date:19-11-27下午10:31

@Slf4j

publicclassServiceBeanimplementsInitializingBean,DisposableBean{

privateNettyServernettyServer;

*zk地址

privateCuratorZookeeperClientcuratorZookeeperClient;

privateObjectref;

privateMiniDubboPropertiesminiDubboProperties;

privateInterfaceConfiginterfaceConfig;

publicMiniDubboPropertiesgetMiniDubboProperties(){

returnminiDubboProperties;

publicvoidsetMiniDubboProperties(MiniDubboPropertiesminiDubboProperties){

this.miniDubboProperties=miniDubboProperties;

publicInterfaceConfiggetInterfaceConfig(){

returninterfaceConfig;

publicvoidsetInterfaceConfig(InterfaceConfiginterfaceConfig){

erfaceConfig=interfaceConfig;

publicObjectgetRef(){

returnref;

publicvoidsetRef(Objectref){

this.ref=ref;

@Override

publicvoidafterPropertiesSet()throwsException{

Class[]interfaces=ref.getClass().getInterfaces();

if(interfaces.length=0){

thrownewIllegalStateException(ref.getClass().getName()+"未实现接口");

//todo目前只能实现一个接口

StringclazzName=interfaces[0].getName();

("曝光key:{},ref:{}",clazzName,ref);

//暴露服务todo版本

DubboExporter.exportService(clazzName,ref);

//先开启,再注册

//判断协议

if("dubbo".equals(miniDubboProperties.getProtocol())){

//开启nettyserver

nettyServer=NettyManager.getNettyServer(miniDubboProperties.getPort());

}else{

thrownewRuntimeException("unknowncommunicateprotocol:"+miniDubboProperties.getProtocol());

//判断什么类型的注册中心

curatorZookeeperClient=RegistryManager.getCuratorZookeeperClient(miniDubboProperties.getRegistry());

StringproviderPath="/miniDubbo/"+interfaceConfig.getGroup()+"/"+clazzName+"/providers"+"/"+NetUtils.getServerIp()+":"+miniDubboProperties.getPort();

//注册zk,提炼register方法

curatorZookeeperClient.create(providerPath,true);

@Override

publicvoiddestroy()throwsException{

curatorZookeeperClient.doClose();

nettyServer.close();

4.2.3开启nettyserver,接收请求

在接受到consumer请求后,解码,然后根据类名、方法名,找到对应的曝光服务,进行反射调用,将方法返回结果和请求id原样写出去,返回给客户端。具体如下:

packagety;

importcom.alibaba.fastjson.JSON;

importcom.alibaba.fastjson.JSONObject;

importcom.jessin.practice.dubbo.exception.DubboException;

importcom.jessin.practice.dubbo.exporter.DubboExporter;

importcom.jessin.practice.dubbo.invoker.RpcInvocation;

importcom.jessin.practice.dubbo.transport.Request;

importcom.jessin.practice.dubbo.transport.Response;

importcom.jessin.practice.dubbo.utils.ArgDeserializerUtils;

importty.bootstrap.ServerBootstrap;

importty.channel.ChannelDuplexHandler;

importty.channel.ChannelFuture;

importty.channel.ChannelHandler;

importty.channel.ChannelHandlerContext;

importty.channel.ChannelInitializer;

importty.channel.ChannelOption;

importty.channel.ChannelPromise;

importty.channel.EventLoopGroup;

importty.channel.nio.NioEventLoopGroup;

importty.channel.socket.SocketChannel;

importty.channel.socket.nio.NioServerSocketChannel;

importjava.lang.reflect.Method;

importlombok.extern.slf4j.Slf4j;

*@Author:jessin

*@Date:19-11-27下午7:40

@Slf4j

publicclassNettyServer{

//todo底层会启动2*cpu个数的NioEventLoop,轮询注册到对应的NioEventLoop运行

privateEventLoopGroupboss=newNioEventLoopGroup();

privateEventLoopGroupworker=newNioEventLoopGroup();

//全局复用,是否需要考虑可共享?

privateServerHandlerserverHandler=newServerHandler();

privateintport;

publicNettyServer(intport){

ServerBootstrapserverBootstrap=newServerBootstrap();

//boss线程池用于accept到达的请求,worker线程池对到达的请求进行读写

//child表示对到达的请求起作用,没有child表示对ServerSocketChannel起作用

//服务端用NioServerSocketChannel

ChannelFuturechannelFuture;

this.port=port;

try{

serverBootstrap.group(boss,worker)

.channel(NioServerSocketChannel.class)

.childOption(ChannelOption.SO_KEEPALIVE,true)

//todooption最终设置到jdkseverchannel上

.option(ChannelOption.SO_BACKLOG,1024)

.childHandler(newChannelInitializerSocketChannel(){

@Override

protectedvoidinitChannel(SocketChannelch)throwsException{

//对到达的请求进行读写操作,责任链模式,ChannelPipeline

ch.pipeline()

.addLast(newBaseDecoder())

.addLast(newBaseEncoder())

.addLast(serverHandler);

//todobind时,会新建NioServerSocketChannel,并注册到NioEventLoop.selector中

//todo底层转换为pipeline.bind(),最终调用serverSocketChannel.bind(socketAddress,128);

channelFuture=serverBootstrap.bind(port);

//下面会阻塞

channelFuture.sync();

("服务器绑定端口:{}成功",port);

//TODO关闭时调用,客户端也得关闭

//channelFuture.channel().closeFuture().sync();

}catch(Exceptione){

thrownewRuntimeException("bindporterror:"+port,e);

*允许注册到多个客户端SocketChannel中

@ChannelHandler.Sharable

classServerHandlerextendsChannelDuplexHandler{

@Override

publicvoidchannelActive(ChannelHandlerContextctx)throwsException{

("客户端:{}和服务端建立连接成功",ctx.channel().remoteAddress());

@Override

publicvoidchannelRead(ChannelHandlerContextctx,Objectmsg)throwsException{

//这里是String类型,已经解码了

Requestrequest=JSONObject.parseObject((String)msg,Request.class);

("收到请求消息:{}",msg);

RpcInvocationrpcInvocation=request.getRpcInvocation();

Objectobj=DubboExporter.getService(rpcInvocation);

if(obj==null){

thrownewIllegalStateException("服务端未曝光接口:"+request);

Responseresponse=newResponse();

response.setId(request.getId());

try{

("开始反射调用:{}",msg);

//todo这里最好用线程池实现,不然会阻塞NioEventLoop

Methodmethod=obj.getClass().getMethod(rpcInvocation.getMethodName(),rpcInvocation.getParameterType());

Object[]originArgs=ArgDeserializerUtils.parseArgs(method,rpcInvocation.getParameterType(),rpcInvocation.getArgs());

("入参:{}",originArgs);

ObjectresponseData=method.invoke(obj,originArgs);

response.setResult(responseData);

("调用实例:{},方法:{},返回结果:{}",

obj,method,response);

}catch(Exceptione){

log.error("调用dubbo异常:{}",rpcInvocation,e);

response.setException(true);

response.setResult(newDubboException("服务端调用接口异常",e));

//TODO通过原来客户端通道发送出去

//这里会走编码吗?,必须写成String,或者改下Encoder

ctx.writeAndFlush(JSON.toJSONString(response));

@Override

publicvoidchannelInactive(ChannelHandlerContextctx)throwsException{

("收到客户端退出的消息");

ctx.close();

@Override

publicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause)throwsException{

log.error("IO出错了...",cause);

@Override

publicvoidwrite(ChannelHandlerContextctx,Objectmsg,ChannelPromisepromise)throwsException{

("发起写请求:{}",msg);

//TODO写的一般都有这个?

super.write(ctx,msg,promise);

*dubboshutdownhook

publicvoidclose(){

//TODO这里是否有问题??

("关闭端口:{}",port);

boss.shutdownGracefully();

worker.shutdownGracefully();

4.3自动化配置实现

这里根据yaml中配置的开关,自动开启consumer/provider配置,需要注意的是,由于注入的@Service处理器是容器级别的后处理器,需要使用静态方法进行注入,避免过早初始化自动配置类,而且不能@autowirte自动化属性,需要通过方法获取Environment,因为这个时候自动化属性类还没有对应的后处理器对其进行处理,拿到的属性是空的,需要自己做bind。

最后在Resource目录下,META-INF/spring.factories下,配置自动启动即可:

packagecom.jessin.practice.dubbo.config;

importcessor.ReferenceBeanPostProcessor;

importcessor.Service;

importcessor.ServiceBeanPostProcessor;

importlombok.extern.slf4j.Slf4j;

importorg.springframework.boot.autoconfigure.condition.AnyNestedCondition;

importorg.springframework.boot.autoconfigure.condition.ConditionalOnClass;

importorg.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;

importorg.springframework.boot.autoconfigure.condition.ConditionalOnProperty;

importperties.EnableConfigurationProperties;

importperties.bind.Bindable;

importperties.bind.Binder;

importorg.springframework.context.annotation.Bean;

importorg.springframework.context.annotation.Conditional;

importorg.springframework.context.annotation.Configuration;

importorg.springframework.core.env.Environment;

*todo自己调用自己,以及url支持,资源销毁

*@Author:jessin

*@Date:2025/10/269:27PM

@Configuration

@ConditionalOnClass(Service.class)

@EnableConfigurationProperties(MiniDubboProperties.class)

@Slf4j

publicclassMiniDubboAutoConfiguration{

static{

FastJsonConfig.config();

*由于BeanFactoryPostProcessor是提前获取的,这个时候CommonAnnotationBeanPostProcessor还没注册到beanFactory中,

*serviceBeanPostProcessor注入的属性为空

//@Autowired

//privateMiniDubboPropertiesminiDubboProperties;

//publicMiniDubboAutoConfiguration(){

//("initMiniDubboAutoConfiguration");

*由于这个Bean是BeanFactoryPostProcessor,提前获取时,

*ConfigurationProperties的ConfigurationPropertiesBindingPostProcessor还没注入到beanFactory中,

*所以MiniDubboProperties属性没法注入

*这里通过environment构造

*@paramenvironment

*@return

@Bean

@Conditional(ServerCondition.class)

@ConditionalOnMissingBean

publicstaticServiceBeanPostProcessorserviceBeanPostProcessor(Environmentenvironment){

MiniDubboPropertiesminiDubboProperties=getMiniDubboProperties(environment);

returnnewServiceBeanPostProcessor(miniDubboProperties);

staticclassServerConditionextendsAnyN

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论