Dubbo服务暴露

Dubbo服务暴露的入口是ServiceBean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean,
ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware,
ApplicationEventPublisherAware{

public void afterPropertiesSet(){

...

if (!isDelay()) {
//判断是否延迟暴露
export();
}
}

@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (isDelay() && !isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
export();
}
}

@Override
public void export() {
super.export();
// Publish ServiceBeanExportedEvent
publishExportEvent();
}
}

ServiceBean的启动入口有2个,最终都是调用父类ServiceConfigexport方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class ServiceConfig<T> extends AbstractServiceConfig {
public synchronized void export() {

...
//延迟暴露或者立即暴露
if (delay != null && delay > 0) {
delayExportExecutor.schedule(new Runnable() {
@Override
public void run() {
doExport();
}
}, delay, TimeUnit.MILLISECONDS);
} else {
doExport();
}
}

protected synchronized void doExport() {
...
doExportUrls();
}

private void doExportUrls() {
List<URL> registryURLs = loadRegistries(true);
//循环处理多个协议
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
//doExportUrlsFor1Protocol里的代码很多,这里只展示关键的几行代码

Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}

这里主要做了2件事情:

  • proxyFactory.getInvoker,传入refinterfaceClassurl对象,返回Invoker,其中ref即具体的Service实例。Invoker对下层分装了具体调用服务的细节。
  • protocol.export,这里根据url实际调用的是RegistryProtocol
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class RegistryProtocol implements Protocol {
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 导出服务
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

// 获取注册中心 URL,以 zookeeper 注册中心为例,得到的示例 URL 如下:
// zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.17.48.52%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider
URL registryUrl = getRegistryUrl(originInvoker);

// 根据 URL 加载 Registry 实现类,比如 ZookeeperRegistry
final Registry registry = getRegistry(originInvoker);
// 获取已注册的服务提供者 URL,比如:
// dubbo://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello
final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);

//to judge to delay publish whether or not
boolean register = registeredProviderUrl.getParameter("register", true);

ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);

if (register) {
// 向注册中心注册服务
register(registryUrl, registeredProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}


final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
// 向注册中心进行订阅 override 数据
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
// 创建并返回 DestroyableExporter
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}

public void register(URL registryUrl, URL registedProviderUrl) {
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registedProviderUrl);
}
}

RegistryProtocol主要做的几件事情:

  • doLocalExport()暴露本地服务,一般是调用的DubboProtocolexport方法,DubboProtocol默认通过Netty开启端口。这里不做详细展开。
  • register()方法向注册中心注册服务。一般我们使用zookeepr注册中心。
  • registry.subscribe向注册中心进行订阅 override 数据,这个以后再分析。

我们重点看ZookeeperRegistryregister方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class ZookeeperRegistry extends FailbackRegistry {
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);

...

zkClient = zookeeperTransporter.connect(url);
zkClient.addStateListener(new StateListener() {
@Override
public void stateChanged(int state) {
//如果注册中心重连,所有已注册服务重新注册
if (state == RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
}

@Override
protected void doRegister(URL url) {
try {
//第二个参数默认为true,表示创建临时节点
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public abstract class FailbackRegistry extends AbstractRegistry {
public FailbackRegistry(URL url) {
super(url);
this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
// Check and connect to the registry
try {
retry();
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
}
}
}, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
}

@Override
public void register(URL url) {
super.register(url);
failedRegistered.remove(url);
failedUnregistered.remove(url);
try {
// Sending a registration request to the server side
doRegister(url);
} catch (Exception e) {
Throwable t = e;

// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}

// Record a failed registration request to a failed list, retry regularly
failedRegistered.add(url);
}
}
}

ZookeeperRegistry继承了FailbackRegistryFailbackRegistry顾名思义,实现了failback策略,当请求失败时,会进行重试。构造函数里初始化了一个ScheduledExecutorService,定时的调用retry()。最后调用子类的doRegister()来实现注册。

ZookeeperRegistry通过zookeeperTransporter.connect()初始化了ZookeeperClient对象,在doRegister()方法中调用zkClient.create()来创建节点,默认创建临时节点。

1
2
3
4
5
6
7
8
9
10
public class CuratorZookeeperTransporter implements ZookeeperTransporter {

@Override
public ZookeeperClient connect(URL url) {
return new CuratorZookeeperClient(url);
}

}


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatcher> {
private final CuratorFramework client;
public CuratorZookeeperClient(URL url) {
super(url);
try {
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(url.getBackupAddress())
.retryPolicy(new RetryNTimes(1, 1000))
.connectionTimeoutMs(5000);
String authority = url.getAuthority();
if (authority != null && authority.length() > 0) {
builder = builder.authorization("digest", authority.getBytes());
}
client = builder.build();
client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState state) {
if (state == ConnectionState.LOST) {
CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
} else if (state == ConnectionState.CONNECTED) {
CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
} else if (state == ConnectionState.RECONNECTED) {
CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
}
}
});
client.start();
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
@Override
public void createPersistent(String path) {
try {
client.create().forPath(path);
} catch (NodeExistsException e) {
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}

@Override
public void createEphemeral(String path) {
try {
client.create().withMode(CreateMode.EPHEMERAL).forPath(path);
} catch (NodeExistsException e) {
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public abstract class AbstractZookeeperClient<TargetChildListener> implements ZookeeperClient {
@Override
public void create(String path, boolean ephemeral) {
if (!ephemeral) {
if (checkExists(path)) {
return;
}
}
int i = path.lastIndexOf('/');
if (i > 0) {
create(path.substring(0, i), false);
}
if (ephemeral) {
createEphemeral(path);
} else {
createPersistent(path);
}
}
}

可以看到最终是通过CuratorFramework和zookeeper进行通信的。

总结

最后整个服务暴露流程可以总结如下:

1
2
3
4
5
6
7
ServiceBean#export
->ServiceConfig#export
->RegistryProtocol#export
->DubboProtocol#export //暴露Dubbo服务
->CuratorZookeeperClient#doRegister //注册中心
->ZookeeperClient#create
->CuratorFramework#create