本篇主要讲Dubbo服务是如何注册,导出并接受服务请求。
一、启动一个服务端Provider 1. 定义一个接口和实现 1 2 3 4 5 6 7 8 public interface UserService { void say (String message) ; } public class UserServiceImpl implements UserService { public void say (String message) { System.out.println("say:" + message); } }
2. 本地服务注册到zk 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public class Tester { @Test public void providerTest() { UserService userService = new UserServiceImpl (); ApplicationConfig app = new ApplicationConfig (); app .setName ("providerTest"); RegistryConfig registry = new RegistryConfig (); registry .setAddress ("zookeeper://127.0 .0 .1 :2181 "); ProtocolConfig protocol = new ProtocolConfig (); protocol .setName ("dubbo"); protocol .setPort (8012 ); protocol .setThreads (200 ); ServiceConfig <UserService > service = new ServiceConfig <UserService >(); service .setApplication (app); service .setRegistry (registry); service .setProtocol (protocol); service .setInterface (UserService .class); service .setRef (userService); service .setVersion ("1.0 .0 "); service .export (); } }
3. 分析原理 这里只是分析下大概原理,给各位童靴先带来带你感受,实际步骤后面分析源码时候再细说
在进行分析之前我们思考一下,当我们不使用RPC框架和SpringCloud的时候,如果我们要调用其他第三方的服务时候,我们会怎么处理呢?
通过下面这中方式每次调用时候构建一个HTTP的请求。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class Tester { public static void sayRequest(String message){ OkHttpClient client = new OkHttpClient (); Request request = new Request .Builder() .url("http://第三方服务的接口地址?message" +message) .get ().addHeader("Cache-Control" ,"no-cache" ) .build(); client.new Call (request).execute(); } public static void main(String []args){ sayRequest("你好" ) } }
使用后我们就可以像调用本地方法一样来调用远程接口了? 那么Dubbo是如何实现的呢? 其实就是在底层帮我们做了类似于http的通信 而通过api的方式屏蔽了底层。让我们直接将调用本地方法一样调用远程方法。
关键词一:通信协议 dubbo默认不是基于HTTP,而是基于dubbo自定义的协议。因为jdk自带的socket api不太友好,所以dubbo底层是使用netty类做通信的 说白了这个协议和http类似都是基于tcp协议从而进行封装,不同点就是数据格式不同。 如下我们自定义了一个协议来读取tcp连接中数据。
下面代码不是重点,重点知道协议就是,约定从tcp连接中读取数据的方式和方法。比如约定了读的第一个字节是协议类型,第二个是序列化类型,第三个是报文数据长度,第四个就是具体的报文数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 @Override public void doDecode(ChannelHandlerContext ctx, ByteBuf inByteBuf, List <Object > out) throws Exception { byte[] dataArr; if (!inByteBuf.isReadable()) { Channel channel = ctx.channel(); SocketAddress socketAddress = channel.remoteAddress(); channel.close(); System.err.println(">>>>>>>>>[" + socketAddress + "]客户端已主动断开连接...." ); return ; } int dataHeadSize = inByteBuf.readableBytes(); if (!isFullMessageHeader(dataHeadSize)) { return ; } inByteBuf.markReaderIndex(); byte protocolType = inByteBuf.readByte(); byte serializationType = inByteBuf.readByte(); int dataSize = inByteBuf.readInt(); if (!isFullMessage(inByteBuf, dataSize)) { inByteBuf.resetReaderIndex(); System.out.println(); System.err.println("######################数据不足已重置buffer######################" ); return ; } System.out.println(); System.err.println("######################数据完整######################" ); dataArr = new byte[dataSize]; inByteBuf.readBytes(dataArr, 0 , dataSize); SerializeEnum serializeEnum = SerializeEnum.ofByType(serializationType); Class<? extends Serialize> serialize = serializeEnum.getSerialize(); Serialize serializeNewInstance = serialize.newInstance(); Object deserialize = serializeNewInstance.deserialize(dataArr); out.add(deserialize); }
关键词二:封装通信 服务端
服务端将需要提供的接口实现方法封装起来
并启动一个Netty服务
同时将自己的地址注册到zk中
客户端
客户端通过将接口方法封装成URL
去请求zk,拿到真实的provider地址
具体调用时候去请求服务端的netty服务之星
二、提供服务流程 这里我们只先分析dubbo的源码,后面再说dubbo整合spring的原理。
1. 要提供服务的对象 1 2 3 4 5 6 7 8 public interface UserService { void say (String message) ; } public class UserServiceImpl implements UserService { public void say (String message) { System.out.println("say:" + message); } }
2. 创建一个应用 1 2 ApplicationConfig app = new ApplicationConfig() app.setName("providerTest" )
3. 指定注册中心 这里我们使用zookeeper作为注册中心1 2 RegistryConfig registry = new RegistryConfig() registry.setAddress("zookeeper://127.0.0.1:2181" )
4. 指定通信协议 1 2 3 4 ProtocolConfig protocol = new ProtocolConfig() protocol.setName("dubbo" ) protocol.setPort(8012 ) protocol.setThreads(200 )
5. 导出服务到zk 1 2 3 4 5 6 7 8 9 10 11 12 // 服务提供者暴露服务配置 ServiceConfig<UserService> service = new ServiceConfig<UserService>() ; // 此实例很重,封装了与注册中心的连接,请自行缓存,否则可能造成内存和连接泄漏 service.set Application(app) ; service.set Registry(registry) ; // 多个注册中心可以用set Registries() service.set Protocol(protocol) ; // 多个协议可以用set Protocols() service.set Interface(UserService.class) ; service.set Ref(userService) ; service.set Version("1.0.0") ; // 暴露及注册服务 //dubbo ://192.168.1.9 :8012 /code.UserService?anyhost=true &application=providerTest&dubbo=2.5.3&interface=code.UserService&methods=say&pid=46787&revision=1.0.0&side=provider&threads=200×tamp=1597048727957&version =1.0.0 service.export() ;
当这一步进行完后,我们会在zookeeper的控制台找到自己的服务地址。
通过url解码之后就是
dubbo://192.168.1.9:8012/code.UserService?anyhost=true&application=providerTest&dubbo=2.5.3&interface=code.UserService&methods=say&pid=46787&revision=1.0.0&side=provider&threads=200×tamp=1597048727957&version=1.0.0
三、源码分析 1. 服务注册 我们在看二流程中,可以看到前面的1234创建的步骤都是在5中使用的,说明1234其实都是数据的载体,具体如何使用是在5中来使用的。而5的对象是ServiceConfig
。所以说看源码的入口就从ServiceConfig.export()
开始。
ServiceConfig的export最终后调用doExport();
doExport方法会先检查然后在注册服务到Netty服务器和注册到zk1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 protected synchronized void doExport () { interfaceClass = Class.forName(interfaceName, true , Thread.currentThread() .getContextClassLoader()); checkInterfaceAndMethods(interfaceClass, methods); checkRef(); checkApplication(); checkRegistry(); checkProtocol(); appendProperties(this ); checkStubAndMock(interfaceClass); if (path == null || path.length() == 0 ) { path = interfaceName; } doExportUrls(); }
这一步会将服务在本地启动一个服务,同时将服务注册到注册中心中。
1 2 3 4 5 6 7 8 private void doExportUrls () { List<URL> registryURLs = loadRegistries(true ); for (ProtocolConfig protocolConfig : protocols) { doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }
根据doExportUrlsFor1Protocol的源码。我们发现dubbo中的所有模型都向Invoker来靠拢。
先创建一个Socket来验证下能不能连接上注册中心
然后根据协议信息,找到实现类。端口如果指定了就用指定的,没有指定就随机生成。默认是20880
获取服务版本号,首先查找MANIFEST.MF规范中的版本号。没有就用指定的版本号
通过反射生成Invoker对象
导出Invoker启动一个Netty服务DubboProtocol.openServer
注册到zk中RegistryProtocol.export
2. Netty服务接受服务 前面注册时候,我们说了在DubboProtocol中去创建服务的。那我们直接看这部分代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 public class DubboProtocol extends AbstractProtocol { private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { public Object reply(ExchangeChannel channel, Object message) throws RemotingException { if (message instanceof Invocation) { Invocation inv = (Invocation) message; Invoker<?> invoker = getInvoker(channel, inv); if (Boolean.TRUE.toString().equals(inv.getAttachments().get (IS_CALLBACK_SERVICE_INVOKE))){ String methodsStr = invoker.getUrl().getParameters().get ("methods" ); boolean hasMethod = false ; if (methodsStr == null || methodsStr.indexOf("," ) == -1 ){ hasMethod = inv.getMethodName().equals(methodsStr); } else { String [] methods = methodsStr.split ("," ); for (String method : methods){ if (inv.getMethodName().equals(method)){ hasMethod = true ; break ; } } } if (!hasMethod){ logger.warn(new IllegalStateException("The methodName " +inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :" +inv ); return null ; } } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); return invoker.invoke(inv); } throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); } private void openServer(URL url) { String key = url.getAddress(); boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true ); if (isServer) { ExchangeServer server = serverMap.get (key ); if (server == null ) { serverMap.put(key , createServer(url)); } else { server.reset(url); } } } private ExchangeServer createServer(URL url) { url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String .valueOf(Constants.DEFAULT_HEARTBEAT)); String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); if (str != null && str .length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str )) throw new RpcException("Unsupported server type: " + str + ", url: " + url); url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME); ExchangeServer server; try { server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str .length() > 0 ) { Set<String > supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains(str )) { throw new RpcException("Unsupported client type: " + str ); } } return server; } }
我们主要看服务端要的3个方法
ExchangeHandler逻辑处理器
创建服务openServer和createServer。
ExchangeHandler#ExchangeHandler 既然我们说了底层是Netty来实现的,那么又知道Netty是通信框架。那么我们来看下服务端的处理逻辑吧。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 class DubboProtocol{ private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { public Object reply(ExchangeChannel channel, Object message) throws RemotingException { if (message instanceof Invocation) { Invocation inv = (Invocation) message; Invoker<?> invoker = getInvoker(channel, inv); if (Boolean.TRUE.toString().equals(inv.getAttachments().get (IS_CALLBACK_SERVICE_INVOKE))){ String methodsStr = invoker.getUrl().getParameters().get ("methods" ); boolean hasMethod = false ; if (methodsStr == null || methodsStr.indexOf("," ) == -1 ){ hasMethod = inv.getMethodName().equals(methodsStr); } else { String [] methods = methodsStr.split ("," ); for (String method : methods){ if (inv.getMethodName().equals(method)){ hasMethod = true ; break ; } } } if (!hasMethod){ logger.warn(new IllegalStateException("The methodName " +inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :" +inv ); return null ; } } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); return invoker.invoke(inv); } throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); } } }
请看注释
3. 编码器和解码器 这里稍微说一点编码器,dubbo协议的编码器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public class NettyServer extends AbstractServer implements Server { @Override protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); ExecutorService boss = Executors.new CachedThreadPool (new NamedThreadFactory ("NettyServerBoss" , true )); ExecutorService worker = Executors.new CachedThreadPool (new NamedThreadFactory ("NettyServerWorker" , true )); ChannelFactory channelFactory = new NioServerSocketChannelFactory (boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); bootstrap = new ServerBootstrap (channelFactory); final NettyHandler nettyHandler = new NettyHandler (getUrl(), this ); channels = nettyHandler.getChannels(); bootstrap.setPipelineFactory(new ChannelPipelineFactory () { public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter (getCodec() ,getUrl(), NettyServer.this ); ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decoder" , adapter.getDecoder()); pipeline.addLast("encoder" , adapter.getEncoder()); pipeline.addLast("handler" , nettyHandler); return pipeline; } }); channel = bootstrap.bind(getBindAddress()); } }
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
主要看这个类Codec2
我们主要看服务端如何将tcp二进制数据转成dubbo里面的模型。
客户端: 数据DecodeableRpcInvocation -> 通过编码器转换成 -> 二进制数据
服务端: 二进制数据 -> 解码器 -> DecodeableRpcInvocation -> DubboProtocol#requestHandler处理
4. 序列化协议 java模型如何转二进制,就是序列化协议。我们所说的hession2协议就在这里用的。这里追求的是速度快,数据小。
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
四、Invoker 前面说了dubbo中的模型都想Invoker靠拢。其实说白了就是反射。
1. 生成Invoker对象 可以看到dubbo里面已经提供了,构建方法。我们先熟悉如何使用其API。然后把这些小的知识点慢慢的串起来就好了。
1 2 3 4 5 6 7 8 public class Tester { @Test public void buildInvokerTest() { JavassistProxyFactory factory = new JavassistProxyFactory(); UserService userService = new UserServiceImpl(); URL dubboUrl = URL.valueOf("test://" ); final Invoker<UserService> invoker = factory .getInvoker(userService, UserService.class , dubboUrl); }
2. 创建执行参数 Invoker是执行体,Invocation是执行参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public class Tester { public void buildInvokerTest() { final Invoker<UserService> invoker = factory.getInvoker(userService, UserService.class, dubboUrl); Invocation invocation = new Invocation() { public String getMethodName() { return "say" ; } public Class<?>[] getParameterTypes() { return new Class[]{String .class}; } public Object [] getArguments() { return new Object []{"hello" }; } public Map<String , String > getAttachments() { return null ; } public String getAttachment(String key ) { return null ; } public String getAttachment(String key , String defaultValue) { return null ; } public Invoker<?> getInvoker() { return null ; } }; System.out.println (invoker.invoke(invocation)); } }
3. 构建带有过滤器的Invoker 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 @Test public void linkedInvokerTest() { JavassistProxyFactory factory = new JavassistProxyFactory(); UserService userService = new UserServiceImpl(); URL dubboUrl = URL.valueOf("test://" ); final Invoker<UserService> invoker = factory.getInvoker(userService, UserService.class, dubboUrl); List<Filter> filters = new ArrayList(); Filter filter = new Filter() { public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { System.out.println ("-------执行过滤器-------" ); return invoker.invoke(invocation); } }; filters.add (filter ); Invoker<UserService> userServiceInvoker = buildInvokerChain(invoker, filters); userServiceInvoker.invoke(new Invocation() { public String getMethodName() { return "say" ; } public Class<?>[] getParameterTypes() { return new Class[]{String .class}; } public Object [] getArguments() { return new Object []{"hello 过滤器" }; } public Map<String , String > getAttachments() { return null ; } public String getAttachment(String key ) { return null ; } public String getAttachment(String key , String defaultValue) { return null ; } public Invoker<?> getInvoker() { return null ; } }); }
五、总结 知识点回顾
如何将对象方法生成Invoker
如何将Invoker注册到注册地中心
如何处理客户端的请求
二进制数据转java数据协议
协议中包含的序列化知识
最后求关注,求订阅,谢谢你的阅读!
下一篇会讲,dubbo如何与Spring进行整合。
分享
新浪微博
微信
QQ空间
QQ好友
豆瓣
有道云笔记
取消