Dubbo服务暴露 Dubbo服务暴露的入口是ServiceBean
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public class ServiceBean <T > extends ServiceConfig <T > implements InitializingBean , DisposableBean , ApplicationContextAware , ApplicationListener <ContextRefreshedEvent >, BeanNameAware , ApplicationEventPublisherAware { public void afterPropertiesSet () { ... if (!isDelay()) { export(); } } @Override public void onApplicationEvent (ContextRefreshedEvent event) { if (isDelay() && !isExported() && !isUnexported()) { if (logger.isInfoEnabled()) { logger.info("The service ready on spring started. service: " + getInterface()); } export(); } } @Override public void export () { super .export(); publishExportEvent(); } }
ServiceBean
的启动入口有2个,最终都是调用父类ServiceConfig
的export
方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class ServiceConfig <T > extends AbstractServiceConfig { public synchronized void export () { ... if (delay != null && delay > 0 ) { delayExportExecutor.schedule(new Runnable() { @Override public void run () { doExport(); } }, delay, TimeUnit.MILLISECONDS); } else { doExport(); } } protected synchronized void doExport () { ... doExportUrls(); } private void doExportUrls () { List<URL> registryURLs = loadRegistries(true ); for (ProtocolConfig protocolConfig : protocols) { doExportUrlsFor1Protocol(protocolConfig, registryURLs); } } private void doExportUrlsFor1Protocol (ProtocolConfig protocolConfig, List<URL> registryURLs) { Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this ); Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } }
这里主要做了2件事情:
proxyFactory.getInvoker
,传入ref
、interfaceClass
、url
对象,返回Invoker
,其中ref
即具体的Service实例。Invoker
对下层分装了具体调用服务的细节。
protocol.export
,这里根据url实际调用的是RegistryProtocol
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class RegistryProtocol implements Protocol { @Override public <T> Exporter<T> export (final Invoker<T> originInvoker) throws RpcException { final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); URL registryUrl = getRegistryUrl(originInvoker); final Registry registry = getRegistry(originInvoker); final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker); boolean register = registeredProviderUrl.getParameter("register" , true ); ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); if (register) { register(registryUrl, registeredProviderUrl); ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true ); } final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl); } public void register (URL registryUrl, URL registedProviderUrl) { Registry registry = registryFactory.getRegistry(registryUrl); registry.register(registedProviderUrl); } }
RegistryProtocol
主要做的几件事情:
doLocalExport()
暴露本地服务,一般是调用的DubboProtocol
的export
方法,DubboProtocol
默认通过Netty
开启端口。这里不做详细展开。
register()
方法向注册中心注册服务。一般我们使用zookeepr注册中心。
registry.subscribe
向注册中心进行订阅 override 数据,这个以后再分析。
我们重点看ZookeeperRegistry
的register
方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class ZookeeperRegistry extends FailbackRegistry { public ZookeeperRegistry (URL url, ZookeeperTransporter zookeeperTransporter) { super (url); ... zkClient = zookeeperTransporter.connect(url); zkClient.addStateListener(new StateListener() { @Override public void stateChanged (int state) { if (state == RECONNECTED) { try { recover(); } catch (Exception e) { logger.error(e.getMessage(), e); } } } }); } @Override protected void doRegister (URL url) { try { zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true )); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 public abstract class FailbackRegistry extends AbstractRegistry { public FailbackRegistry (URL url) { super (url); this .retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD); this .retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() { @Override public void run () { try { retry(); } catch (Throwable t) { logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t); } } }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS); } @Override public void register (URL url) { super .register(url); failedRegistered.remove(url); failedUnregistered.remove(url); try { doRegister(url); } catch (Exception e) { Throwable t = e; boolean check = getUrl().getParameter(Constants.CHECK_KEY, true ) && url.getParameter(Constants.CHECK_KEY, true ) && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol()); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); } failedRegistered.add(url); } } }
ZookeeperRegistry
继承了FailbackRegistry
,FailbackRegistry
顾名思义,实现了failback策略,当请求失败时,会进行重试。构造函数里初始化了一个ScheduledExecutorService
,定时的调用retry()
。最后调用子类的doRegister()
来实现注册。
ZookeeperRegistry
通过zookeeperTransporter.connect()
初始化了ZookeeperClient
对象,在doRegister()
方法中调用zkClient.create()
来创建节点,默认创建临时节点。
1 2 3 4 5 6 7 8 9 10 public class CuratorZookeeperTransporter implements ZookeeperTransporter { @Override public ZookeeperClient connect (URL url) { return new CuratorZookeeperClient(url); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public class CuratorZookeeperClient extends AbstractZookeeperClient <CuratorWatcher > { private final CuratorFramework client; public CuratorZookeeperClient (URL url) { super (url); try { CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() .connectString(url.getBackupAddress()) .retryPolicy(new RetryNTimes(1 , 1000 )) .connectionTimeoutMs(5000 ); String authority = url.getAuthority(); if (authority != null && authority.length() > 0 ) { builder = builder.authorization("digest" , authority.getBytes()); } client = builder.build(); client.getConnectionStateListenable().addListener(new ConnectionStateListener() { @Override public void stateChanged (CuratorFramework client, ConnectionState state) { if (state == ConnectionState.LOST) { CuratorZookeeperClient.this .stateChanged(StateListener.DISCONNECTED); } else if (state == ConnectionState.CONNECTED) { CuratorZookeeperClient.this .stateChanged(StateListener.CONNECTED); } else if (state == ConnectionState.RECONNECTED) { CuratorZookeeperClient.this .stateChanged(StateListener.RECONNECTED); } } }); client.start(); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } @Override public void createPersistent (String path) { try { client.create().forPath(path); } catch (NodeExistsException e) { } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } @Override public void createEphemeral (String path) { try { client.create().withMode(CreateMode.EPHEMERAL).forPath(path); } catch (NodeExistsException e) { } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public abstract class AbstractZookeeperClient <TargetChildListener > implements ZookeeperClient { @Override public void create (String path, boolean ephemeral) { if (!ephemeral) { if (checkExists(path)) { return ; } } int i = path.lastIndexOf('/' ); if (i > 0 ) { create(path.substring(0 , i), false ); } if (ephemeral) { createEphemeral(path); } else { createPersistent(path); } } }
可以看到最终是通过CuratorFramework
和zookeeper进行通信的。
总结 最后整个服务暴露流程可以总结如下:
1 2 3 4 5 6 7 ServiceBean#export ->ServiceConfig#export ->RegistryProtocol#export ->DubboProtocol#export //暴露Dubbo服务 ->CuratorZookeeperClient#doRegister //注册中心 ->ZookeeperClient#create ->CuratorFramework#create