




版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
第java实现简易版简易版dubbo@Override
publicvoidpostProcessBeanFactory(ConfigurableListableBeanFactorybeanFactory)throwsBeansException{
privateInterfaceConfigtransform(Serviceref){
InterfaceConfiginterfaceConfig=newInterfaceConfig();
interfaceConfig.setGroup(ref.group());
interfaceConfig.setVersion(ref.version());
interfaceConfig.setTimeout(ref.timeout());
returninterfaceConfig;
@Override
publicvoidsetBeanClassLoader(ClassLoaderclassLoader){
this.classLoader=classLoader;
4.2.2解析注解参数,注册服务到zookeeper
如上,我们在ServiceBean中,依赖标记有@Service的bean,同时将其曝光到本地,开启netty端口监听,注册到zk,具体如下,见afterPropertiesSet方法:
packagecessor;
importcom.jessin.practice.dubbo.config.InterfaceConfig;
importcom.jessin.practice.dubbo.config.MiniDubboProperties;
importcom.jessin.practice.dubbo.exporter.DubboExporter;
importty.NettyManager;
importty.NettyServer;
importcom.jessin.practice.dubbo.registry.CuratorZookeeperClient;
importcom.jessin.practice.dubbo.registry.RegistryManager;
importcom.jessin.practice.dubbo.utils.NetUtils;
importlombok.extern.slf4j.Slf4j;
importorg.springframework.beans.factory.DisposableBean;
importorg.springframework.beans.factory.InitializingBean;
*@Author:jessin
*@Date:19-11-27下午10:31
@Slf4j
publicclassServiceBeanimplementsInitializingBean,DisposableBean{
privateNettyServernettyServer;
*zk地址
privateCuratorZookeeperClientcuratorZookeeperClient;
privateObjectref;
privateMiniDubboPropertiesminiDubboProperties;
privateInterfaceConfiginterfaceConfig;
publicMiniDubboPropertiesgetMiniDubboProperties(){
returnminiDubboProperties;
publicvoidsetMiniDubboProperties(MiniDubboPropertiesminiDubboProperties){
this.miniDubboProperties=miniDubboProperties;
publicInterfaceConfiggetInterfaceConfig(){
returninterfaceConfig;
publicvoidsetInterfaceConfig(InterfaceConfiginterfaceConfig){
erfaceConfig=interfaceConfig;
publicObjectgetRef(){
returnref;
publicvoidsetRef(Objectref){
this.ref=ref;
@Override
publicvoidafterPropertiesSet()throwsException{
Class[]interfaces=ref.getClass().getInterfaces();
if(interfaces.length=0){
thrownewIllegalStateException(ref.getClass().getName()+"未实现接口");
//todo目前只能实现一个接口
StringclazzName=interfaces[0].getName();
("曝光key:{},ref:{}",clazzName,ref);
//暴露服务todo版本
DubboExporter.exportService(clazzName,ref);
//先开启,再注册
//判断协议
if("dubbo".equals(miniDubboProperties.getProtocol())){
//开启nettyserver
nettyServer=NettyManager.getNettyServer(miniDubboProperties.getPort());
}else{
thrownewRuntimeException("unknowncommunicateprotocol:"+miniDubboProperties.getProtocol());
//判断什么类型的注册中心
curatorZookeeperClient=RegistryManager.getCuratorZookeeperClient(miniDubboProperties.getRegistry());
StringproviderPath="/miniDubbo/"+interfaceConfig.getGroup()+"/"+clazzName+"/providers"+"/"+NetUtils.getServerIp()+":"+miniDubboProperties.getPort();
//注册zk,提炼register方法
curatorZookeeperClient.create(providerPath,true);
@Override
publicvoiddestroy()throwsException{
curatorZookeeperClient.doClose();
nettyServer.close();
4.2.3开启nettyserver,接收请求
在接受到consumer请求后,解码,然后根据类名、方法名,找到对应的曝光服务,进行反射调用,将方法返回结果和请求id原样写出去,返回给客户端。具体如下:
packagety;
importcom.alibaba.fastjson.JSON;
importcom.alibaba.fastjson.JSONObject;
importcom.jessin.practice.dubbo.exception.DubboException;
importcom.jessin.practice.dubbo.exporter.DubboExporter;
importcom.jessin.practice.dubbo.invoker.RpcInvocation;
importcom.jessin.practice.dubbo.transport.Request;
importcom.jessin.practice.dubbo.transport.Response;
importcom.jessin.practice.dubbo.utils.ArgDeserializerUtils;
importty.bootstrap.ServerBootstrap;
importty.channel.ChannelDuplexHandler;
importty.channel.ChannelFuture;
importty.channel.ChannelHandler;
importty.channel.ChannelHandlerContext;
importty.channel.ChannelInitializer;
importty.channel.ChannelOption;
importty.channel.ChannelPromise;
importty.channel.EventLoopGroup;
importty.channel.nio.NioEventLoopGroup;
importty.channel.socket.SocketChannel;
importty.channel.socket.nio.NioServerSocketChannel;
importjava.lang.reflect.Method;
importlombok.extern.slf4j.Slf4j;
*@Author:jessin
*@Date:19-11-27下午7:40
@Slf4j
publicclassNettyServer{
//todo底层会启动2*cpu个数的NioEventLoop,轮询注册到对应的NioEventLoop运行
privateEventLoopGroupboss=newNioEventLoopGroup();
privateEventLoopGroupworker=newNioEventLoopGroup();
//全局复用,是否需要考虑可共享?
privateServerHandlerserverHandler=newServerHandler();
privateintport;
publicNettyServer(intport){
ServerBootstrapserverBootstrap=newServerBootstrap();
//boss线程池用于accept到达的请求,worker线程池对到达的请求进行读写
//child表示对到达的请求起作用,没有child表示对ServerSocketChannel起作用
//服务端用NioServerSocketChannel
ChannelFuturechannelFuture;
this.port=port;
try{
serverBootstrap.group(boss,worker)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.SO_KEEPALIVE,true)
//todooption最终设置到jdkseverchannel上
.option(ChannelOption.SO_BACKLOG,1024)
.childHandler(newChannelInitializerSocketChannel(){
@Override
protectedvoidinitChannel(SocketChannelch)throwsException{
//对到达的请求进行读写操作,责任链模式,ChannelPipeline
ch.pipeline()
.addLast(newBaseDecoder())
.addLast(newBaseEncoder())
.addLast(serverHandler);
//todobind时,会新建NioServerSocketChannel,并注册到NioEventLoop.selector中
//todo底层转换为pipeline.bind(),最终调用serverSocketChannel.bind(socketAddress,128);
channelFuture=serverBootstrap.bind(port);
//下面会阻塞
channelFuture.sync();
("服务器绑定端口:{}成功",port);
//TODO关闭时调用,客户端也得关闭
//channelFuture.channel().closeFuture().sync();
}catch(Exceptione){
thrownewRuntimeException("bindporterror:"+port,e);
*允许注册到多个客户端SocketChannel中
@ChannelHandler.Sharable
classServerHandlerextendsChannelDuplexHandler{
@Override
publicvoidchannelActive(ChannelHandlerContextctx)throwsException{
("客户端:{}和服务端建立连接成功",ctx.channel().remoteAddress());
@Override
publicvoidchannelRead(ChannelHandlerContextctx,Objectmsg)throwsException{
//这里是String类型,已经解码了
Requestrequest=JSONObject.parseObject((String)msg,Request.class);
("收到请求消息:{}",msg);
RpcInvocationrpcInvocation=request.getRpcInvocation();
Objectobj=DubboExporter.getService(rpcInvocation);
if(obj==null){
thrownewIllegalStateException("服务端未曝光接口:"+request);
Responseresponse=newResponse();
response.setId(request.getId());
try{
("开始反射调用:{}",msg);
//todo这里最好用线程池实现,不然会阻塞NioEventLoop
Methodmethod=obj.getClass().getMethod(rpcInvocation.getMethodName(),rpcInvocation.getParameterType());
Object[]originArgs=ArgDeserializerUtils.parseArgs(method,rpcInvocation.getParameterType(),rpcInvocation.getArgs());
("入参:{}",originArgs);
ObjectresponseData=method.invoke(obj,originArgs);
response.setResult(responseData);
("调用实例:{},方法:{},返回结果:{}",
obj,method,response);
}catch(Exceptione){
log.error("调用dubbo异常:{}",rpcInvocation,e);
response.setException(true);
response.setResult(newDubboException("服务端调用接口异常",e));
//TODO通过原来客户端通道发送出去
//这里会走编码吗?,必须写成String,或者改下Encoder
ctx.writeAndFlush(JSON.toJSONString(response));
@Override
publicvoidchannelInactive(ChannelHandlerContextctx)throwsException{
("收到客户端退出的消息");
ctx.close();
@Override
publicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause)throwsException{
log.error("IO出错了...",cause);
@Override
publicvoidwrite(ChannelHandlerContextctx,Objectmsg,ChannelPromisepromise)throwsException{
("发起写请求:{}",msg);
//TODO写的一般都有这个?
super.write(ctx,msg,promise);
*dubboshutdownhook
publicvoidclose(){
//TODO这里是否有问题??
("关闭端口:{}",port);
boss.shutdownGracefully();
worker.shutdownGracefully();
4.3自动化配置实现
这里根据yaml中配置的开关,自动开启consumer/provider配置,需要注意的是,由于注入的@Service处理器是容器级别的后处理器,需要使用静态方法进行注入,避免过早初始化自动配置类,而且不能@autowirte自动化属性,需要通过方法获取Environment,因为这个时候自动化属性类还没有对应的后处理器对其进行处理,拿到的属性是空的,需要自己做bind。
最后在Resource目录下,META-INF/spring.factories下,配置自动启动即可:
packagecom.jessin.practice.dubbo.config;
importcessor.ReferenceBeanPostProcessor;
importcessor.Service;
importcessor.ServiceBeanPostProcessor;
importlombok.extern.slf4j.Slf4j;
importorg.springframework.boot.autoconfigure.condition.AnyNestedCondition;
importorg.springframework.boot.autoconfigure.condition.ConditionalOnClass;
importorg.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
importorg.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
importperties.EnableConfigurationProperties;
importperties.bind.Bindable;
importperties.bind.Binder;
importorg.springframework.context.annotation.Bean;
importorg.springframework.context.annotation.Conditional;
importorg.springframework.context.annotation.Configuration;
importorg.springframework.core.env.Environment;
*todo自己调用自己,以及url支持,资源销毁
*@Author:jessin
*@Date:2025/10/269:27PM
@Configuration
@ConditionalOnClass(Service.class)
@EnableConfigurationProperties(MiniDubboProperties.class)
@Slf4j
publicclassMiniDubboAutoConfiguration{
static{
FastJsonConfig.config();
*由于BeanFactoryPostProcessor是提前获取的,这个时候CommonAnnotationBeanPostProcessor还没注册到beanFactory中,
*serviceBeanPostProcessor注入的属性为空
//@Autowired
//privateMiniDubboPropertiesminiDubboProperties;
//publicMiniDubboAutoConfiguration(){
//("initMiniDubboAutoConfiguration");
*由于这个Bean是BeanFactoryPostProcessor,提前获取时,
*ConfigurationProperties的ConfigurationPropertiesBindingPostProcessor还没注入到beanFactory中,
*所以MiniDubboProperties属性没法注入
*这里通过environment构造
*@paramenvironment
*@return
@Bean
@Conditional(ServerCondition.class)
@ConditionalOnMissingBean
publicstaticServiceBeanPostProcessorserviceBeanPostProcessor(Environmentenvironment){
MiniDubboPropertiesminiDubboProperties=getMiniDubboProperties(environment);
returnnewServiceBeanPostProcessor(miniDubboProperties);
staticclassServerConditionextendsAnyN
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 2025年广播媒体融合中的新媒体平台内容制作与分发策略报告
- 2025年酒酿制行业深度研究报告
- 2025年装配模项目可行性研究报告
- 2025年花园用石材行业深度研究报告
- 2025年中级会计实务考试关键注意事项及试题
- 2025年社交媒体平台文化内容传播的社交媒体内容创新与效果评估
- 企业财务绩效指标的试题及答案
- 锻炼思维的市政试题及答案汇编
- 财务管理学生必读试题及答案
- 2025年财务管理考试真题回顾及试题与答案
- 合伙经营煤炭协议书
- 医生入职考试试题及答案
- 学校食堂安全风险管控清单
- 安徽省C20教育联盟2025年九年级中考“功夫”卷(一)数学(原卷版+解析版)
- 家校社协同育人促进学生核心素养发展的实践研究范文
- 磷矿反浮选操作规程
- 中华人民共和国医疗器械注册申请表
- 医院胸痛中心应知应会
- 1000道二年级数学口算练习题
- 收到个税手续费返还奖励给办税人员的文件-财税实操
- 《晨会的重要性》课件
评论
0/150
提交评论