Dubbo服务的发布主要完成在ZK的注册和在Netty的暴露两大步骤,本文旨在深入整个服务暴露流程,看看Dubbo到底做哪些事情,体会Dubbo扩展机制在流程中的运用。Dubbo的服务发布流程时序图如下所示(引自官网)

这里的Actor可以理解成使用方,在一般应用中就是Spring容器了。看看ServiceBean(ServiceConfig子类)的onApplicationEvent方法:

1
2
3
4
5
6
7
8
9
10
public void onApplicationEvent(ApplicationEvent event) {
if (ContextRefreshedEvent.class.getName().equals(event.getClass().getName())) {
if (isDelay() && ! isExported() && ! isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
export();
}
}
}

onApplicationEvent是Spring扩展接口ApplicationListener的方法,这里可以看作Spring容器和Dubbo容器的衔接点,在Spring容器启动后开始调用export方法进行服务暴露。

这里比较重要的是ServiceConfig的私有属性ref,其实质就是我们需要暴露的服务实例,ref是通过Spring在实例化bean过程中注入到ServiceConfig实例中的,具体就是通过反射调用了setRef方法,这是Spring的常规注入方式,这里我们不再详谈。

接下来进入export方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public synchronized void export() {
// 略过...
if (delay != null && delay > 0) {
Thread thread = new Thread(new Runnable() {
public void run() {
try {
Thread.sleep(delay);
} catch (Throwable e) {
}
doExport();
}
});
thread.setDaemon(true);
thread.setName("DelayExportServiceThread");
thread.start();
} else {
doExport();
}
}

这里主要是判断下是否需要延迟暴露,若是则启动一个守护线程在指定延迟后暴露服务,否则主线程继续进行服务暴露。

接下来的doExport方法主要是进行一些前置条件或参数的检查以及补齐,这里不再列出。

在doExportUrls方法中,主要进行注册服务的遍历,以在所有协议上暴露服务,因为Dubbo本身支持多协议的暴露。一般情况下我们使用ZK进行服务的注册和管理。

1
2
3
4
5
6
private void doExportUrls() {
List<URL> registryURLs = loadRegistries(true);
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}

在doExportUrlsFor1Protocol方法中,前半部分主要是组装URL,我们知道Dubbo是基于URL总线在层间进行消息传递的。最后根据scope作用域服务暴露类型的判断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
}

这里很重要的是Invoker实例,作为Dubbo的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起invoke调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。

Invoker实例从proxyFactory获取,而proxyFactory在这里实际是个适配器,其生成逻辑参见dubbo扩展机制。源码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ProxyFactory$Adpative implements com.alibaba.dubbo.rpc.ProxyFactory {
public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker {
if (arg0 == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("proxy", "javassist");
if(extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getProxy(arg0);
}
public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws java.lang.Object {
if (arg2 == null)
throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg2;
String extName = url.getParameter("proxy", "javassist");
if(extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getInvoker(arg0, arg1, arg2);
}
}

可以看到,在getInvoker方法中,会优先获取proxy扩展,否则默认获取javassist扩展。一般情况下,我们未主动扩展配置代理工厂的话,使用JavassistProxyFactory。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class JavassistProxyFactory extends AbstractProxyFactory {
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper类不能正确处理带$的类名
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}

注意到这里的入参包括ref服务实例和其接口类型,因为需要对服务进行代理封装,最终是生成一个AbstractProxyInvoker实例,其doInvoker方法成为服务调用的入口。以下是具体的封装过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static Wrapper getWrapper(Class<?> c) {
while( ClassGenerator.isDynamicClass(c) ) // can not wrapper on dynamic class.
c = c.getSuperclass();
if( c == Object.class )
return OBJECT_WRAPPER;
Wrapper ret = WRAPPER_MAP.get(c);
if( ret == null )
{
ret = makeWrapper(c);
WRAPPER_MAP.put(c,ret);
}
return ret;
}

具体的makeWrapper方法时利用javassist技术动态构造Wapper类型并创建实例,源码较长这里不再列出,以下是Wapper类型的invokeMethod方法源码(注意是javasssit语法形式):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
indi.cesc.inno.learn.dubbo.HelloService w;
try{
w = ((indi.cesc.inno.learn.dubbo.HelloService)$1);
}catch(Throwable e){
throw new IllegalArgumentException(e);
}
try{
if( "sayHello".equals( $2 ) && $3.length == 1 ) {
return ($w)w.sayHello((indi.cesc.inno.learn.dubbo.HelloRequest)$4[0]); // 真实方法调用
}
} catch(Throwable e) {
throw new java.lang.reflect.InvocationTargetException(e);
}
throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method \""+$2+"\" in class indi.cesc.inno.learn.dubbo.HelloService.");
}

这里就将真实服务加入到整体调用链条之中,后续再将Invoker往上层传递,打通整个链条。

回到doExportUrlsFor1Protocol方法中,我们继续服务的暴露流程。protocol实例调用export方法进入后续流程。这里的protocol类型实际依旧是个适配器,export方法源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker {
if (arg0 == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}

注意invoker的url不是服务暴露的url,而是协议注册的url,因此url里面的协议是registry。尝试获取名为registry的Protocol扩展,但进入ExtensionLoader后被拦截,实际拿到了其封装类ProtocolFilterWrapper,其负责组装过滤器链。类似的,后续将拿到ProtocolListenerWrapper实例,负责组装监听器链。通过分析代码发现,封装器实际上过滤到了注册协议,并没有执行过滤器或监听器链条的组装。

1
2
3
4
5
6
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}

我们将直接进入进入RegistryProtocol的export方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
//registry provider
final Registry registry = getRegistry(originInvoker);
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
registry.register(registedProviderUrl);
// 订阅override数据
// FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//保证每次export都返回一个新的exporter实例
return new Exporter<T>() {
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}
public void unexport() {
try {
exporter.unexport();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
registry.unregister(registedProviderUrl);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
overrideListeners.remove(overrideSubscribeUrl);
registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
};
}

doLocalExport方法将进行在netty暴露并监听服务的后续逻辑,而之后的代码主要是在ZK进行服务的注册,并注册一个监听器OverrideListener,用于在注册路径变更时重新注册服务。ZK注册的逻辑我们不再深入,这里继续进入doLocalExport方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker){
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return (ExporterChangeableWrapper<T>) exporter;
}

这里将再次调用protocol适配器,不过和之前的差异在于入参invoker的url是Dubbo服务暴露的url,其协议名为dubbo,因此我们最终将调用DubboProtocol。不过,在此之前,需要先通过封装器ProtocolFilterWrapper和ProtocolListenerWrapper构造过滤器链和监听器链。这块内容参见dubbo过滤器链

之后,我们进入DubboProtocol的export方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispaching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice){
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){
if (logger.isWarnEnabled()){
logger.warn(new IllegalStateException("consumer [" +url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
openServer(url);
return exporter;
}

这里主要处理stub服务,stub的原理参见dubbo官方文档。这里比较迷惑的是stubServiceMethodsMap是一个私有属性,在DubboProtocol中就不存在对其的读操作。鉴于不是一个重要功能,我们不再纠结于此。随后进入openServer方法,需要创建一个ExchangeServer。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private ExchangeServer createServer(URL url) {
//默认开启server关闭时发送readonly事件
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
//默认开启heartbeat
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}

这里补充了一些url参数,然后创建一个ExchangeServer。注意参数中有个ExchangeHandler,其主要实现了一个reply方法,作为服务调用中的一处响应方法,其内部获取到invoker实例,触发调用invoke方法。这里实际上就是将Invoker封装转换成了ExchangeHandler,进行后续的传递。接下来进入Exchangers的bind方法,通过ExtensionLoader获取到HeaderExchanger,执行器bind方法:

1
2
3
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

这里又进行了两次包装,分别包装成HeaderExchangeHandler和DecodeHandler。前者名为头交换,但实际代码中并未有相关操作,感觉名不副实;后者主要是处理消息的解码。我们继续进入Transporters的bind方法,类似的,将获取一个适配器类型的Transporter扩展,其bind方法源码如下:

1
2
3
4
5
6
7
8
9
public com.alibaba.dubbo.remoting.Server bind(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.common.URL {
if (arg0 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg0;
String extName = url.getParameter("server", url.getParameter("transporter", "netty"));
if(extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([server, transporter])");
com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName);
return extension.bind(arg0, arg1);
}

这里扩展名获取的优先级是server、transporter、netty。默认情况下,我们会获取到nettyNettyTransporter,netty也是Dubbo默认的网络传输框架。

1
2
3
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}

在NettyTransporter中将新建一个NettyServer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public NettyServer(URL url, ChannelHandler handler) throws RemotingException{
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
String host = url.getParameter(Constants.ANYHOST_KEY, false)
|| NetUtils.isInvalidLocalHost(getUrl().getHost())
? NetUtils.ANYHOST : getUrl().getHost();
bindAddress = new InetSocketAddress(host, getUrl().getPort());
this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
try {
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
if (handler instanceof WrappedChannelHandler ){
executor = ((WrappedChannelHandler)handler).getExecutor();
}
}

构造好连接相关参数后,将调用doOpen方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
// https://issues.jboss.org/browse/NETTY-365
// https://issues.jboss.org/browse/NETTY-379
// final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
/*int idleTimeout = getIdleTimeout();
if (idleTimeout > 10000) {
pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
}*/
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// bind
channel = bootstrap.bind(getBindAddress());
}

这里终于到达netty的操作层面,主要是构建ServerBootstrap并启动netty服务。可以看到,我们的ExchangeHandler经过层层封装和传递后最终作为参数传递给了ChannelPipeline,其将作为一个消息接受的监听服务存在。

后记

本文以类似记流水账的形式梳理了Dubbo服务注册和暴露的整体流程,讲的并不深入,但对于理清整个工作流程是有帮助的。后面的文章我们将继续Dubbo服务端的解析,看下Dubbo服务接收、处理、回复请求的整体流程。