Nacos源码 (3) 注册中心
本文将从一个服务注册示例入手,通过阅读客户端、服务端源码,分析服务注册、服务发现原理。
使用的2.0.2的版本。
客户端
创建NacosNamingService对象
NacosNamingService nacosNamingService = new NacosNamingService(NACOS_HOST);
NacosNamingService提供两个构造方法:
public NacosNamingService(String serverList) throws NacosException {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverList);
init(properties);
}
public NacosNamingService(Properties properties) throws NacosException {
init(properties);
}
第二个方法的properties的key在PropertyKeyConst常量类可以找到,如:
- namespace
- username
- password
- serverAddr
- clusterName
- 其他
构造方法中会初始化一些参数和组件:
-
初始化namespace参数
-
创建InstancesChangeNotifier对象,它实现了Subscriber接口,监听InstancesChangeEvent事件
public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> { // key使用serviceName + groupName + clusters组合而成 // value是监听器集合 private final Map<String, ConcurrentHashSet<EventListener>> listenerMap; // 锁 private final Object lock = new Object();
-
向NotifyCenter注册InstancesChangeEvent事件,注册之前创建的InstancesChangeNotifier对象监听服务实例变化
NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384); NotifyCenter.registerSubscriber(changeNotifier); // NotifyCenter维护着EventPublisher集,Subscriber会被注册到EventPublisher上 // EventPublisher提供publish方法向Event队列推送事件 // EventPublisher是一个Thread类,run方法从Event队列取事件通知Subscriber来处理
-
创建NamingClientProxyDelegate对象,用于与服务端通信,它是一个代理,内部使用其他的NamingClientProxy实现:
- NamingHttpClientProxy
- NamingGrpcClientProxy - 默认使用该实现类,其中有healthCheck检测服务端是否健康,服务端直接响应成功无操作
服务注册
NacosNamingService nacosNamingService = new NacosNamingService(NACOS_HOST);
nacosNamingService.registerInstance(ORDER_SERVICE, "192.168.0.100", 9999);
提供多个重载的registerInstance方法,最终使用这个方法:
public void registerInstance(String serviceName, String groupName, String ip, int port, String clusterName)
throws NacosException {
Instance instance = new Instance();
instance.setIp(ip);
instance.setPort(port);
instance.setWeight(1.0);
instance.setClusterName(clusterName);
registerInstance(serviceName, groupName, instance);
}
public void registerInstance(String serviceName, String groupName, Instance instance)
throws NacosException {
// 此处clientProxy是NamingClientProxyDelegate对象
clientProxy.registerService(serviceName, groupName, instance);
}
NamingClientProxyDelegate的registerService方法会选择一个具体的NamingClientProxy对象与服务端通信,默认使用NamingGrpcClientProxy对象。
NamingGrpcClientProxy的registerService方法构建InstanceRequest请求对象,之后使用RpcClient对象发送请求并接收响应。
RpcClient内部通过GrpcConnection对象使用GRPC来访问服务端。
内部的GRPC代码是使用protoc和protobuf-maven-plugin生成的,通信细节此处不做介绍。
服务下线
nacosNamingService.deregisterInstance(ORDER_SERVICE, "192.168.0.100", 9999);
deregisterInstance服务下线:
public void deregisterInstance(String serviceName,
String groupName,
String ip,
int port,
String clusterName) throws NacosException {
Instance instance = new Instance();
instance.setIp(ip);
instance.setPort(port);
instance.setClusterName(clusterName);
deregisterInstance(serviceName, groupName, instance);
}
public void deregisterInstance(String serviceName,
String groupName,
Instance instance) throws NacosException {
clientProxy.deregisterService(serviceName, groupName, instance);
}
查询实例
示例代码:
NacosNamingService namingService = new NacosNamingService(NACOS_HOST);
List<Instance> instances = namingService.getAllInstances(ORDER_SERVICE, true);
System.out.printf(">> instance count=%d\n", instances.size());
for (Instance instance : instances) {
System.out.printf(">> serviceName=%s, id=%s, cluster=%s, ip=%s, port=%s\n",
instance.getServiceName(), instance.getInstanceId(),
instance.getClusterName(), instance.getIp(), instance.getPort());
}
提供了几个重载的getAllInstances方法,最重要的参数就是subscribe,当为true时,会向服务端发送订阅请求,之后一直从ServiceInfoHolder中获取服务实例信息,而不再向服务端发送查询请求。
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
String clusterString = StringUtils.join(clusters, ",");
if (subscribe) {
serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
if (null == serviceInfo) {
// 订阅请求
serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
}
} else {
// 查询请求
serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
}
List<Instance> list;
if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
return new ArrayList<Instance>();
}
return list;
}
服务订阅
示例代码:
NacosNamingService namingService = new NacosNamingService(NACOS_HOST);
namingService.subscribe(ORDER_SERVICE, new EventListener() {
@Override
public void onEvent(Event event) {
NamingEvent e = (NamingEvent) event;
System.out.println("serviceName=" + e.getServiceName());
List<Instance> instances = e.getInstances();
System.out.printf(">> instance count=%d\n", instances.size());
for (Instance instance : instances) {
System.out.printf(">> serviceName=%s, id=%s, cluster=%s, ip=%s, port=%s\n",
instance.getServiceName(), instance.getInstanceId(),
instance.getClusterName(), instance.getIp(), instance.getPort());
}
}
});
TimeUnit.SECONDS.sleep(1200);
subscribe方法:
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
throws NacosException {
String clusterString = StringUtils.join(clusters, ",");
// 将listener保存到listenerMap中
changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
// 发送订阅请求
clientProxy.subscribe(serviceName, groupName, clusterString);
}
实例变化的方法调用栈:
当收到服务端的实例变化事件时,会触发grpc层的观察者监听:
public void onMessage(RespT message) {
if (firstResponseReceived && !streamingResponse) {
throw Status.INTERNAL
.withDescription("More than one responses received for unary or client-streaming call")
.asRuntimeException();
}
firstResponseReceived = true;
// 调用观察者
observer.onNext(message);
if (streamingResponse && adapter.autoFlowControlEnabled) {
// Request delivery of the next inbound message.
adapter.request(1);
}
}
此处的observer是在创建rpc连接的时候注册的:
private StreamObserver<Payload> bindRequestStream(
final BiRequestStreamGrpc.BiRequestStreamStub streamStub,
final GrpcConnection grpcConn) {
return streamStub.requestBiStream(new StreamObserver<Payload>() {
@Override
public void onNext(Payload payload) {
try {
Object parseBody = GrpcUtils.parse(payload);
final Request request = (Request) parseBody;
if (request != null) {
try {
// 调用ServerRequestHandler处理请求
Response response = handleServerRequest(request);
if (response != null) {
response.setRequestId(request.getRequestId());
sendResponse(response);
}
// ...
NamingPushRequestHandler的处理逻辑:
public Response requestReply(Request request) {
if (request instanceof NotifySubscriberRequest) {
NotifySubscriberRequest notifyResponse = (NotifySubscriberRequest) request;
serviceInfoHolder.processServiceInfo(notifyResponse.getServiceInfo());
return new NotifySubscriberResponse();
}
return null;
}
serviceInfoHolder.processServiceInfo方法:
public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
String serviceKey = serviceInfo.getKey();
if (serviceKey == null) {
return null;
}
ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
if (isEmptyOrErrorPush(serviceInfo)) {
//empty or error push, just ignore
return oldService;
}
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
boolean changed = isChangedServiceInfo(oldService, serviceInfo);
if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
}
MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
if (changed) {
// 推送一个InstancesChangeEvent事件
NotifyCenter.publishEvent(new InstancesChangeEvent(
serviceInfo.getName(), serviceInfo.getGroupName(),
serviceInfo.getClusters(), serviceInfo.getHosts()));
DiskCache.write(serviceInfo, cacheDir);
}
return serviceInfo;
}
推送一个InstancesChangeEvent事件:
-
NotifyCenter维护着一个EventPublisher集,当有事件时,会选择一个目标EventPublisher
-
通过publish方法将事件保存到一个Event队列
public boolean publish(Event event) { checkIsStart(); boolean success = this.queue.offer(event); if (!success) { // 当队列操作失败时,直接使用当前线程处理事件 receiveEvent(event); return true; } return true; }
-
EventPublisher是一个线程,在NotifyCenter初始化时启动。run方法会从Event队列取事件,使用receiveEvent(event)进行处理
-
receiveEvent方法查找所有的Subscriber,其中就有最初创建的InstancesChangeNotifier,调用订阅者onEvent方法
服务端
服务注册
InstanceRequestHandler处理器
注册中心的rpc处理器在com.alibaba.nacos.naming.remote.rpc.handler包,处理服务注册和下线的处理器是InstanceRequestHandler类:
public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {
private final EphemeralClientOperationServiceImpl clientOperationService;
public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) {
this.clientOperationService = clientOperationService;
}
@Secured(action = ActionTypes.WRITE, parser = NamingResourceParser.class)
public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
Service service = Service
.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
switch (request.getType()) {
// 服务注册
case NamingRemoteConstants.REGISTER_INSTANCE:
return registerInstance(service, request, meta);
// 服务下线
case NamingRemoteConstants.DE_REGISTER_INSTANCE:
return deregisterInstance(service, request, meta);
default:
throw new NacosException(NacosException.INVALID_PARAM,
String.format("Unsupported request type %s", request.getType()));
}
}
// 服务注册
private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) {
clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
}
// 服务下线
private InstanceResponse deregisterInstance(
Service service, InstanceRequest request, RequestMeta meta) {
clientOperationService.deregisterInstance(service, request.getInstance(), meta.getConnectionId());
return new InstanceResponse(NamingRemoteConstants.DE_REGISTER_INSTANCE);
}
}
服务注册核心流程
public void registerInstance(Service service, Instance instance, String clientId) {
Service singleton = ServiceManager.getInstance().getSingleton(service);
Client client = clientManager.getClient(clientId);
InstancePublishInfo instanceInfo = getPublishInfo(instance);
// Add a new instance for service for current client
// 1. 给当前客户端绑定service -> instance关系
// 2. 推送一个ClientChangedEvent事件
client.addServiceInstance(singleton, instanceInfo);
client.setLastUpdatedTime();
// 推送ClientRegisterServiceEvent和InstanceMetadataEvent事件
NotifyCenter.publishEvent(
new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
NotifyCenter.publishEvent(
new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}
- 给当前客户端绑定service -> instance关系
- 推送一个ClientChangedEvent事件
- 推送ClientRegisterServiceEvent事件
- 推送InstanceMetadataEvent事件
事件处理流程
ClientChangedEvent事件:Client changed event. Happened when Client add or remove service. 会由DistroClientDataProcessor进行处理,同步客户端数据到所有服务节点。
ClientRegisterServiceEvent事件:Client register service event. 由ClientServiceIndexesManager进行处理,ClientServiceIndexesManager类维护clientId与service的注册关系和订阅关系。另外该处理器会推送一个ServiceChangedEvent事件。
InstanceMetadataEvent事件:实例元数据事件。由NamingMetadataManager进行处理,NamingMetadataManager管理客户端注册的服务和实例元数据信息。InstanceMetadataEvent事件会触发该处理器的实例过期判断。
ServiceChangedEvent事件:Service data changed event. 有两个处理器:
- NamingSubscriberServiceV2Impl - 触发回调服务订阅者任务
- DoubleWriteEventListener - 触发将服务信息同步到其他nacos节点任务
服务下线
服务下线核心流程
public void deregisterInstance(Service service, Instance instance, String clientId) {
Service singleton = ServiceManager.getInstance().getSingleton(service);
Client client = clientManager.getClient(clientId);
// Remove service instance from client
// 1. 解除当前客户端的service -> instance关系
// 2. 推送一个ClientChangedEvent事件
InstancePublishInfo removedInstance = client.removeServiceInstance(singleton);
client.setLastUpdatedTime();
// 推送ClientDeregisterServiceEvent和InstanceMetadataEvent事件
if (null != removedInstance) {
NotifyCenter.publishEvent(
new ClientOperationEvent.ClientDeregisterServiceEvent(singleton, clientId));
NotifyCenter.publishEvent(
new MetadataEvent.InstanceMetadataEvent(singleton, removedInstance.getMetadataId(), true));
}
}
- 解除当前客户端的service -> instance关系
- 推送一个ClientChangedEvent事件
- 推送ClientDeregisterServiceEvent事件
- 推送InstanceMetadataEvent事件
事件处理流程
基本与服务注册流程相同。
ClientChangedEvent事件:Client changed event. Happened when Client add or remove service. 会由DistroClientDataProcessor进行处理,同步客户端数据到所有服务节点。
ClientDeregisterServiceEvent事件:Client deregister service event. 由ClientServiceIndexesManager进行处理,ClientServiceIndexesManager类维护clientId与service的注册关系和订阅关系。另外该处理器会推送一个ServiceChangedEvent事件。
InstanceMetadataEvent事件:实例元数据事件。由NamingMetadataManager进行处理,NamingMetadataManager管理客户端注册的服务和实例元数据信息。InstanceMetadataEvent事件会触发该处理器的实例过期判断。
ServiceChangedEvent事件:Service data changed event. 有两个处理器:
- NamingSubscriberServiceV2Impl - 触发回调服务订阅者任务
- DoubleWriteEventListener - 触发将服务信息同步到其他nacos节点任务
服务实例心跳
- 客户端会周期性的发送healthCheck请求
- 服务端每次收到客户端请求时都会更新对应connection的活跃时间戳
- 服务端也会周期性的检查客户端connection的活跃时间戳和客户端IP连接数,当超过一定的时间不活跃,服务端会发一个检测请求给客户端,当连接数超过阈值时将重置多余的连接
客户端healthCheck请求
客户端会周期性发送healthCheck请求,默认每5秒执行一次,在RpcClient中:
clientEventExecutor.submit(new Runnable() {
@Override
public void run() {
while (true) {
try {
if (isShutdown()) {
break;
}
ReconnectContext reconnectContext = reconnectionSignal
.poll(keepAliveTime, TimeUnit.MILLISECONDS);
if (reconnectContext == null) {
//check alive time.
if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) {
boolean isHealthy = healthCheck();
if (!isHealthy) {
if (currentConnection == null) {
continue;
}
RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get();
if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) {
break;
}
// ...
healthCheck健康检查:
private boolean healthCheck() {
HealthCheckRequest healthCheckRequest = new HealthCheckRequest();
try {
Response response = this.currentConnection.request(healthCheckRequest, 3000L);
return response != null && response.isSuccess();
} catch (NacosException e) {
//ignore
}
return false;
}
如果检查失败,将重新建立连接。
服务端记录connection活跃时间戳
服务端每次收到客户端请求时都会更新对应connection的活跃时间戳。
服务端使用GrpcRequestAcceptor作为业务层请求Acceptor入口,这个类会将GRPC的请求转为业务层请求,并转发到对应的RequestHandler处理器。
在其request方法中,会刷新对应connection的活跃时间戳:
Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get());
RequestMeta requestMeta = new RequestMeta();
requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get());
requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
requestMeta.setLabels(connection.getMetaInfo().getLabels());
// 刷新connection的活跃时间戳
connectionManager.refreshActiveTime(requestMeta.getConnectionId());
Response response = requestHandler.handleRequest(request, requestMeta);
Payload payloadResponse = GrpcUtils.convert(response);
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
服务端connection活跃检查
服务端周期性检查客户端connection的活跃时间戳和客户端IP连接数,当超过一定的时间不活跃,服务端会发一个检测请求给客户端,当连接数超过阈值时将重置多余的连接。
服务端使用ConnectionManager管理连接:
Map<String, Connection> connections = new ConcurrentHashMap<String, Connection>();
在启动时,会创建周期性任务检查connections的活跃状态,默认每3秒执行一次,以下为代码片段:
// 检查长时间不活跃的连接和超过最大连接数的连接
for (Map.Entry<String, Connection> entry : entries) {
Connection client = entry.getValue();
String clientIp = client.getMetaInfo().getClientIp();
AtomicInteger integer = expelForIp.get(clientIp);
if (integer != null && integer.intValue() > 0) {
integer.decrementAndGet();
expelClient.add(client.getMetaInfo().getConnectionId());
expelCount--;
} else if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME) {
outDatedConnections.add(client.getMetaInfo().getConnectionId());
}
}
// ...
// 重置超过最大连接数的连接
for (String expelledClientId : expelClient) {
try {
Connection connection = getConnection(expelledClientId);
if (connection != null) {
ConnectResetRequest connectResetRequest = new ConnectResetRequest();
connectResetRequest.setServerIp(serverIp);
connectResetRequest.setServerPort(serverPort);
connection.asyncRequest(connectResetRequest, null);
}
} catch (ConnectionAlreadyClosedException e) {
unregister(expelledClientId);
} catch (Exception e) {
}
}
// ...
if (CollectionUtils.isNotEmpty(outDatedConnections)) {
Set<String> successConnections = new HashSet<>();
final CountDownLatch latch = new CountDownLatch(outDatedConnections.size());
for (String outDateConnectionId : outDatedConnections) {
try {
Connection connection = getConnection(outDateConnectionId);
if (connection != null) {
// 给客户端发检测请求
ClientDetectionRequest clientDetectionRequest = new ClientDetectionRequest();
connection.asyncRequest(clientDetectionRequest, new RequestCallBack() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public long getTimeout() {
return 1000L;
}
@Override
public void onResponse(Response response) {
latch.countDown();
if (response != null && response.isSuccess()) {
connection.freshActiveTime();
successConnections.add(outDateConnectionId);
}
}
@Override
public void onException(Throwable e) {
latch.countDown();
}
});
} else {
latch.countDown();
}
} catch (ConnectionAlreadyClosedException e) {
latch.countDown();
} catch (Exception e) {
latch.countDown();
}
}
latch.await(3000L, TimeUnit.MILLISECONDS);
// 移除失败的已断开连接
for (String outDateConnectionId : outDatedConnections) {
if (!successConnections.contains(outDateConnectionId)) {
unregister(outDateConnectionId);
}
}
}
客户端断开连接
业务处理流程
GRPC连接层检测到连接断开之后,会触发GrpcServer的transportTerminated事件:
public void transportTerminated(Attributes transportAttrs) {
String connectionId = null;
try {
connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
} catch (Exception e) {
// Ignore
}
if (StringUtils.isNotBlank(connectionId)) {
// 使用ConnectionManager移除连接
connectionManager.unregister(connectionId);
}
}
ConnectionManager移除连接:
public synchronized void unregister(String connectionId) {
// 从Connection集移除连接
Connection remove = this.connections.remove(connectionId);
if (remove != null) {
String clientIp = remove.getMetaInfo().clientIp;
AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
// IP连接数--
if (atomicInteger != null) {
int count = atomicInteger.decrementAndGet();
if (count <= 0) {
connectionForClientIp.remove(clientIp);
}
}
remove.close();
// 通知ClientManager层断开连接
clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
}
}
ConnectionBasedClientManager的clientDisconnected方法:
public boolean clientDisconnected(String clientId) {
ConnectionBasedClient client = clients.remove(clientId);
if (null == client) {
return true;
}
client.release();
// 推送一个ClientDisconnectEvent事件
NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
return true;
}
事件处理流程
ClientDisconnectEvent事件:Client disconnect event. Happened when Client disconnect with server.
- ClientServiceIndexesManager - 维护注册和订阅关系
- DistroClientDataProcessor - 同步客户端数据到所有服务节点
- NamingMetadataManager - 维护客户端注册的服务和实例元数据信息
查询实例
ServiceQueryRequestHandler处理器
ServiceQueryRequestHandler类负责客户端的服务实例查询请求:
public class ServiceQueryRequestHandler extends RequestHandler<ServiceQueryRequest, QueryServiceResponse> {
private final ServiceStorage serviceStorage;
private final NamingMetadataManager metadataManager;
public ServiceQueryRequestHandler(ServiceStorage serviceStorage,
NamingMetadataManager metadataManager) {
this.serviceStorage = serviceStorage;
this.metadataManager = metadataManager;
}
@Override
@Secured(action = ActionTypes.READ, parser = NamingResourceParser.class)
public QueryServiceResponse handle(
ServiceQueryRequest request, RequestMeta meta) throws NacosException {
String namespaceId = request.getNamespace();
String groupName = request.getGroupName();
String serviceName = request.getServiceName();
Service service = Service.newService(namespaceId, groupName, serviceName);
String cluster = null == request.getCluster() ? "" : request.getCluster();
boolean healthyOnly = request.isHealthyOnly();
// ServiceInfo封装服务基本信息和其实例集合
ServiceInfo result = serviceStorage.getData(service);
ServiceMetadata serviceMetadata = metadataManager.getServiceMetadata(service).orElse(null);
result = ServiceUtil
.selectInstancesWithHealthyProtection(result, serviceMetadata, cluster, healthyOnly, true);
return QueryServiceResponse.buildSuccessResponse(result);
}
}
查询服务实例:
public ServiceInfo getData(Service service) {
// 如果缓存里面有服务信息则直接从缓存查找
return serviceDataIndexes.containsKey(service) ? serviceDataIndexes.get(service) : getPushData(service);
}
public ServiceInfo getPushData(Service service) {
ServiceInfo result = emptyServiceInfo(service);
if (!ServiceManager.getInstance().containSingleton(service)) {
return result;
}
// 从ClientServiceIndexesManager查找
result.setHosts(getAllInstancesFromIndex(service));
serviceDataIndexes.put(service, result);
return result;
}
private List<Instance> getAllInstancesFromIndex(Service service) {
Set<Instance> result = new HashSet<>();
Set<String> clusters = new HashSet<>();
// 从ClientServiceIndexesManager查找service绑定的client集
for (String each : serviceIndexesManager.getAllClientsRegisteredService(service)) {
// 查找该client注册的实例信息
Optional<InstancePublishInfo> instancePublishInfo = getInstanceInfo(each, service);
if (instancePublishInfo.isPresent()) {
Instance instance = parseInstance(service, instancePublishInfo.get());
result.add(instance);
clusters.add(instance.getClusterName());
}
}
// cache clusters of this service
serviceClusterIndex.put(service, clusters);
return new LinkedList<>(result);
}
private Optional<InstancePublishInfo> getInstanceInfo(String clientId, Service service) {
// 获取到client对象
Client client = clientManager.getClient(clientId);
if (null == client) {
return Optional.empty();
}
// 查找该client指定service注册的实例信息
// AbstractClient使用Map<Service, InstancePublishInfo>结构保存
// 前文介绍过在服务注册时会使用client.addServiceInstance方法添加注册信息
return Optional.ofNullable(client.getInstancePublishInfo(service));
}
前文介绍过ClientServiceIndexesManager类维护clientId与service的注册关系和订阅关系。
服务订阅
SubscribeServiceRequestHandler处理器
SubscribeServiceRequestHandler类负责客户端的服务订阅请求:
public class SubscribeServiceRequestHandler extends
RequestHandler<SubscribeServiceRequest, SubscribeServiceResponse> {
private final ServiceStorage serviceStorage;
private final NamingMetadataManager metadataManager;
private final EphemeralClientOperationServiceImpl clientOperationService;
public SubscribeServiceRequestHandler(ServiceStorage serviceStorage,
NamingMetadataManager metadataManager,
EphemeralClientOperationServiceImpl clientOperationService) {
this.serviceStorage = serviceStorage;
this.metadataManager = metadataManager;
this.clientOperationService = clientOperationService;
}
@Secured(action = ActionTypes.READ, parser = NamingResourceParser.class)
public SubscribeServiceResponse handle(
SubscribeServiceRequest request, RequestMeta meta) throws NacosException {
String namespaceId = request.getNamespace();
String serviceName = request.getServiceName();
String groupName = request.getGroupName();
String app = request.getHeader("app", "unknown");
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
Service service = Service.newService(namespaceId, groupName, serviceName, true);
// 封装Subscriber对象:客户端IP、版本、命名空间等
Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app,
meta.getClientIp(), namespaceId, groupedServiceName, 0, request.getClusters());
ServiceInfo serviceInfo = handleClusterData(serviceStorage.getData(service),
metadataManager.getServiceMetadata(service).orElse(null),
subscriber);
if (request.isSubscribe()) {
// 服务订阅
clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());
} else {
// 取消订阅
clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());
}
return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
}
private ServiceInfo handleClusterData(
ServiceInfo data, ServiceMetadata metadata, Subscriber subscriber) {
return StringUtils.isBlank(subscriber.getCluster()) ? data
: ServiceUtil.selectInstancesWithHealthyProtection(data, metadata, subscriber.getCluster());
}
}
服务订阅核心流程
public void subscribeService(Service service, Subscriber subscriber, String clientId) {
Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);
Client client = clientManager.getClient(clientId);
// 为该client绑定service -> subscriber关系
client.addServiceSubscriber(singleton, subscriber);
client.setLastUpdatedTime();
// 推送一个ClientSubscribeServiceEvent事件
NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));
}
事件处理流程
ClientSubscribeServiceEvent事件:Client subscribe service event. 由ClientServiceIndexesManager进行处理,ClientServiceIndexesManager类维护clientId与service的注册关系和订阅关系。
private void addSubscriberIndexes(Service service, String clientId) {
subscriberIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
// Only first time add need notify event.
if (subscriberIndexes.get(service).add(clientId)) {
// 推送一个ServiceSubscribedEvent事件
NotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service, clientId));
}
}
ServiceSubscribedEvent事件:Service is subscribed by one client event. NamingSubscriberServiceV2Impl进行处理。
public void onEvent(Event event) {
if (event instanceof ServiceEvent.ServiceChangedEvent) {
// If service changed, push to all subscribers.
ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
Service service = serviceChangedEvent.getService();
delayTaskEngine.addTask(
service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
} else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
// 触发一次订阅者回调,把被订阅的服务的信息推送给订阅者
ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;
Service service = subscribedEvent.getService();
delayTaskEngine.addTask(
service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(),
subscribedEvent.getClientId()));
}
}
取消服务订阅
public void unsubscribeService(Service service, Subscriber subscriber, String clientId) {
Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);
Client client = clientManager.getClient(clientId);
client.removeServiceSubscriber(singleton);
client.setLastUpdatedTime();
NotifyCenter.publishEvent(new ClientOperationEvent.ClientUnsubscribeServiceEvent(singleton, clientId));
}
推送一个ClientUnsubscribeServiceEvent事件,还是使用ClientServiceIndexesManager来处理,移除订阅关系。