Nacos Source Code Analysis: How Clients Detect Service Changes
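The earlier article "Nacos Source Code Reading: Service Registration Analysis" (《nacos源码阅读-服务注册分析》) showed that registration ends by calling the method below to synchronize data consistently. This synchronization has two aspects. First, how data is synchronized to the different Nacos clients, in other words how a Nacos client detects a service change. Second, how data is synchronized across the Nacos cluster. This article covers the first aspect; the second is analyzed in other articles. Service registration eventually reaches the code below, which is where the earlier article stopped. We continue from the put method of consistencyService:
// Source location: com.alibaba.nacos.naming.core.ServiceManager#addInstance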
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
// Code omitted
synchronized (service) {
// Code omitted
// Consistency synchronization of the data
consistencyService.put(key, instances);
}
}
The put method of consistencyService performs the consistency synchronization. The call lands in the DelegateConsistencyServiceImpl implementation:
// Code location: com.alibaba.nacos.naming.consistency.DelegateConsistencyServiceImpl#put
public void put(String key, Record value) throws NacosException {
mapConsistencyService(key).put(key, value);
}
private ConsistencyService mapConsistencyService(String key) {
return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
}
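As a rough illustration of this prefix-based routing, here is a minimal sketch. KeyValueSync, RecordingSync and DelegateSyncSketch are hypothetical stand-ins, not Nacos classes, and the plain startsWith check is an assumption about what KeyBuilder.matchEphemeralKey does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a consistency service; not the Nacos interface.
interface KeyValueSync {
    void put(String key, String value);
}

// Records the keys it receives so the routing decision can be observed.
class RecordingSync implements KeyValueSync {
    final List<String> keys = new ArrayList<>();

    @Override
    public void put(String key, String value) {
        keys.add(key);
    }
}

class DelegateSyncSketch implements KeyValueSync {
    // Prefix used for ephemeral instance-list keys.
    static final String EPHEMERAL_PREFIX = "com.alibaba.nacos.naming.iplist.ephemeral.";

    private final KeyValueSync ephemeral;
    private final KeyValueSync persistent;

    DelegateSyncSketch(KeyValueSync ephemeral, KeyValueSync persistent) {
        this.ephemeral = ephemeral;
        this.persistent = persistent;
    }

    // Choose the backend from the key alone (assumed to be a simple prefix test).
    private KeyValueSync map(String key) {
        return key.startsWith(EPHEMERAL_PREFIX) ? ephemeral : persistent;
    }

    @Override
    public void put(String key, String value) {
        map(key).put(key, value);
    }
}
```

The caller only ever sees one put method; whether a key goes down the ephemeral or the persistent path is decided entirely by how the key was built.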
The mapConsistencyService method of DelegateConsistencyServiceImpl picks a consistency service based on the key. When the Instance is ephemeral, that is, when the key starts with com.alibaba.nacos.naming.iplist.ephemeral., it selects ephemeralConsistencyService; otherwise it selects persistentConsistencyService. ephemeralConsistencyService synchronizes ephemeral Instances, and persistentConsistencyService synchronizes persistent Instances. The put method of ephemeralConsistencyService is as follows:
// Code location: com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#put
public void put(String key, Record value) throws NacosException {
// Save the instances in memory and add a service-change notification to the queue
onPut(key, value);
// Synchronize the data across the cluster
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
globalConfig.getTaskDispatchPeriod() / 2);
}
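onPut saves the instances in memory and adds a service-change notification to the queue.
distroProtocol.sync synchronizes the data across the cluster.
Only onPut is analyzed here. Its code is as follows: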
public void onPut(String key, Record value) {
// If the key is an ephemeral instance-list key, save the data in memory
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
dataStore.put(key, datum);
}
// If no listener is registered for the key, return immediately
if (!listeners.containsKey(key)) {
return;
}
// Add a data-change task to the queue
notifier.addTask(key, DataOperation.CHANGE);
}
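onPut first checks whether the key belongs to an ephemeral instance list. If so, it wraps the data in a Datum and stores it in the Map held by DataStore. addTask then adds the data change to the queue. A CHANGE task whose key is already pending in services is skipped, so repeated changes to the same key are merged: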
public void addTask(String datumKey, DataOperation action) {
if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
return;
}
if (action == DataOperation.CHANGE) {
services.put(datumKey, StringUtils.EMPTY);
}
tasks.offer(Pair.with(datumKey, action));
}
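The deduplicating queue in addTask can be sketched in isolation as follows. DedupTaskQueueSketch and its members are hypothetical names for illustration. Unlike the code above, the sketch uses putIfAbsent so that the check and the insert happen atomically, which is a swapped-in choice rather than what Nacos does.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of a change queue that merges repeated CHANGE tasks per key.
class DedupTaskQueueSketch {
    enum Action { CHANGE, DELETE }

    static final class Task {
        final String key;
        final Action action;

        Task(String key, Action action) {
            this.key = key;
            this.action = action;
        }
    }

    // Keys that already have a CHANGE task waiting in the queue.
    private final ConcurrentMap<String, String> pending = new ConcurrentHashMap<>();
    private final BlockingQueue<Task> tasks = new LinkedBlockingQueue<>();

    void addTask(String key, Action action) {
        // A second CHANGE for a key that is still queued adds no information, so skip it.
        if (action == Action.CHANGE && pending.putIfAbsent(key, "") != null) {
            return;
        }
        tasks.offer(new Task(key, action));
    }

    // Blocks until a task is available, then clears the pending mark for its key.
    Task takeNext() {
        try {
            Task task = tasks.take();
            pending.remove(task.key);
            return task;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a task", e);
        }
    }

    int size() {
        return tasks.size();
    }
}
```

Once a task has been taken off the queue, the next CHANGE for the same key is accepted again, so no change is lost; bursts are simply collapsed into one notification.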
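tasks is a BlockingQueue<Pair<String, DataOperation>>. At this point service registration is actually finished, but we have not yet reached the synchronization of data to Nacos clients. The service change is added to the queue by the addTask method of the Notifier class. Notifier implements Runnable, and once DistroConsistencyServiceImpl has been initialized, its init method starts the Notifier thread: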
@PostConstruct
public void init() {
GlobalExecutor.submitDistroNotifyTask(notifier);
}
The run method of Notifier is as follows:
public void run() {
Loggers.DISTRO.info("distro notifier started");
for (; ; ) {
try {
Pair<String, DataOperation> pair = tasks.take();
handle(pair);
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
run keeps taking service-change keys from the tasks queue and passes each one to the handle method:
private void handle(Pair<String, DataOperation> pair) {
try {
String datumKey = pair.getValue0();
DataOperation action = pair.getValue1();
services.remove(datumKey);
int count = 0;
if (!listeners.containsKey(datumKey)) {
return;
}
// Iterate over the listeners
for (RecordListener listener : listeners.get(datumKey)) {
count++;
try {
// For a service change, call the listener's onChange method
if (action == DataOperation.CHANGE) {
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}
if (action == DataOperation.DELETE) {
listener.onDelete(datumKey);
continue;
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
}
}
// Code omitted
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
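handle iterates over the listeners registered for the key. When action is DataOperation.CHANGE it calls listener.onChange; when action is DataOperation.DELETE it calls listener.onDelete. For a service change, the method actually invoked is Service's onChange:
// Code location: com.alibaba.nacos.naming.core.Service#onChange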
public void onChange(String key, Instances value) throws Exception {
Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
// Iterate over all instances
for (Instance instance : value.getInstanceList()) {
if (instance == null) {
// Reject this abnormal instance list:
throw new RuntimeException("got null instance " + key);
}
// Clamp the instance weight
if (instance.getWeight() > 10000.0D) {
instance.setWeight(10000.0D);
}
if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
instance.setWeight(0.01D);
}
}
// Update the instances
updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
// Recalculate the checksum
recalculateChecksum();
}
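The weight adjustment inside onChange is easy to misread, so here it is isolated as a pure function. WeightClampSketch is a hypothetical helper written for this note; the thresholds are the ones in the code above.

```java
// Hypothetical helper that mirrors the two weight checks in onChange.
class WeightClampSketch {
    static final double MAX_WEIGHT = 10000.0D;
    static final double MIN_POSITIVE_WEIGHT = 0.01D;

    static double clamp(double weight) {
        if (weight > MAX_WEIGHT) {
            return MAX_WEIGHT;
        }
        // Only tiny positive weights are raised; zero and negative values pass through.
        if (weight < MIN_POSITIVE_WEIGHT && weight > 0.0D) {
            return MIN_POSITIVE_WEIGHT;
        }
        return weight;
    }
}
```

Note that a weight of exactly 0 is left untouched, so an instance can still be configured to receive no traffic.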
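onChange iterates over all instances and clamps each weight: values above 10000 are capped at 10000, and values between 0 and 0.01 are raised to 0.01. It then calls updateIPs to update all instances and finally recalculateChecksum to recompute the checksum. Inside updateIPs, the following code publishes the service change:
// Code location: com.alibaba.nacos.naming.core.Service#updateIPs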
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
// Code omitted
getPushService().serviceChanged(this);
// Code omitted
}
public void serviceChanged(Service service) {
// merge some change events to reduce the push frequency:
// If futureMap already contains this service, a push is pending, so return immediately
if (futureMap
.containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) {
return;
}
// Publish the service change event
this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));
}
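The merging done through futureMap amounts to "at most one pending push per service". A minimal sketch of that idea follows. MergedPushSketch and DelayedExecutor are hypothetical names, the timer is injected so the behaviour can be driven by hand, and a concurrent Set replaces the map of futures, so this illustrates the pattern rather than Nacos's implementation.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical sketch: merge bursts of change events into one delayed push per service.
class MergedPushSketch {
    // Stand-in for a scheduler; a real one could wrap ScheduledExecutorService.schedule.
    interface DelayedExecutor {
        void schedule(Runnable task, long delayMillis);
    }

    private final Set<String> pending = ConcurrentHashMap.newKeySet();
    private final DelayedExecutor executor;
    private final Consumer<String> pusher;
    private final long delayMillis;

    MergedPushSketch(DelayedExecutor executor, Consumer<String> pusher, long delayMillis) {
        this.executor = executor;
        this.pusher = pusher;
        this.delayMillis = delayMillis;
    }

    void serviceChanged(String serviceKey) {
        // A push for this service is already scheduled, so this change rides along with it.
        if (!pending.add(serviceKey)) {
            return;
        }
        executor.schedule(() -> {
            try {
                pusher.accept(serviceKey);
            } finally {
                // Clear the mark even if the push fails, so later changes are not blocked.
                pending.remove(serviceKey);
            }
        }, delayMillis);
    }
}
```

Because the push reads the current state when it finally runs, every change that arrived during the delay is covered by that single push.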
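PushService publishes service changes. Its serviceChanged method calls publishEvent on Spring's ApplicationContext to publish the change. publishEvent triggers PushService's onApplicationEvent method, which completes the final notification. That method is analyzed a little later; first, a look at the PushService class itself. When PushService is initialized, its static block runs: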
static {
try {
// Create the UDP socket
udpSocket = new DatagramSocket();
// Handles responses from Nacos clients
Receiver receiver = new Receiver();
// Start the Receiver task to process responses from Nacos clients
Thread inThread = new Thread(receiver);
inThread.setDaemon(true);
inThread.setName("com.alibaba.nacos.naming.push.receiver");
inThread.start();
// Clean up zombie clients
GlobalExecutor.scheduleRetransmitter(() -> {
try {
removeClientIfZombie();
} catch (Throwable e) {
Loggers.PUSH.warn("[NACOS-PUSH] failed to remove client zombie");
}
}, 0, 20, TimeUnit.SECONDS);
} catch (SocketException e) {
Loggers.SRV_LOG.error("[NACOS-PUSH] failed to init push service");
}
}
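When PushService is initialized it creates a UDP socket for communication between server and clients. Receiver handles the responses received from Nacos clients: when the Nacos server pushes data to a client, the client replies with an ack. Finally, a scheduled task is started that removes zombie clients every 20 seconds.
With the UDP socket in place, we can analyze the onApplicationEvent method of PushService: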
public void onApplicationEvent(ServiceChangeEvent event) {
// Get the service information
Service service = event.getService();
String serviceName = service.getName();
String namespaceId = service.getNamespaceId();
// Schedule the UDP push (it runs after the 1000 ms delay given below)
Future future = GlobalExecutor.scheduleUdpSender(() -> {
try {
Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
// Get the Nacos clients subscribed to this service
ConcurrentMap<String, PushClient> clients = clientMap
.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
// If there are no clients, return immediately
if (MapUtils.isEmpty(clients)) {
return;
}
Map<String, Object> cache = new HashMap<>(16);
long lastRefTime = System.nanoTime();
// Iterate over all clients that need the push
for (PushClient client : clients.values()) {
// If the client is a zombie, remove it
if (client.zombie()) {
Loggers.PUSH.debug("client is zombie: " + client.toString());
clients.remove(client.toString());
Loggers.PUSH.debug("client is zombie: " + client.toString());
continue;
}
Receiver.AckEntry ackEntry;
Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
byte[] compressData = null;
Map<String, Object> data = null;
if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
compressData = (byte[]) (pair.getValue0());
data = (Map<String, Object>) pair.getValue1();
Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
}
// Build the ackEntry
if (compressData != null) {
ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
} else {
ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
if (ackEntry != null) {
cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
}
}
// Logging code omitted
// Push over UDP
udpPush(ackEntry);
}
} catch (Exception e) {
} finally {
futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
}
}, 1000, TimeUnit.MILLISECONDS);
futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
}
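The logic of onApplicationEvent is:
- Iterate over all Nacos clients that need the push
- Build the AckEntry for each client; AckEntry wraps the data to push
- Push the AckEntry to the Nacos client
udpPush pushes the data to the Nacos client: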
private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
if (ackEntry == null) {
Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
return null;
}
// If the retry count exceeds MAX_RETRY_TIMES (more than 1), remove the related cache entries
if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
ackMap.remove(ackEntry.key);
udpSendTimeMap.remove(ackEntry.key);
failedPush += 1;
return ackEntry;
}
try {
// Count toward the total pushes only if this key is not already awaiting an ack
if (!ackMap.containsKey(ackEntry.key)) {
totalPush++;
}
// Store the packet being pushed
ackMap.put(ackEntry.key, ackEntry);
// Record the UDP send timestamp
udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());
Loggers.PUSH.info("send udp packet: " + ackEntry.key);
// Send over UDP
udpSocket.send(ackEntry.origin);
ackEntry.increaseRetryTime();
// Schedule the retransmission
GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry),
TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);
return ackEntry;
} catch (Exception e) {
Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.data,
ackEntry.origin.getAddress().getHostAddress(), e);
ackMap.remove(ackEntry.key);
udpSendTimeMap.remove(ackEntry.key);
failedPush += 1;
return null;
}
}
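udpPush is what actually sends the data to the Nacos client. Its logic is:
- If the retry count exceeds the maximum, remove the packet from ackMap and the send timestamp from udpSendTimeMap.
- Otherwise, before sending, store the packet in ackMap and the send time in udpSendTimeMap, then send the packet to the Nacos client.
- The scheduled retransmission calls udpPush again to resend the packet to the Nacos client.
- If sending throws an exception, remove the packet and the send timestamp from ackMap and udpSendTimeMap.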
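The bookkeeping in these steps can be sketched without any networking. AckRetrySketch is a hypothetical class, the sender callback stands in for udpSocket.send, and onAckTimeout models what a retransmit timer would do, on the assumption that it only resends entries still waiting in ackMap. The value MAX_RETRY_TIMES = 1 is taken from the "more than 1" remark in the code comment and is likewise an assumption.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical sketch of push-with-ack bookkeeping; not the Nacos PushService.
class AckRetrySketch {
    static final int MAX_RETRY_TIMES = 1; // assumed value

    static final class AckEntry {
        final String key;
        final byte[] payload;
        int retryTimes;

        AckEntry(String key, byte[] payload) {
            this.key = key;
            this.payload = payload;
        }
    }

    // Packets that were sent and have not been acknowledged yet.
    final Map<String, AckEntry> ackMap = new ConcurrentHashMap<>();
    private final Consumer<byte[]> sender; // stands in for udpSocket.send
    int sent;
    int failed;

    AckRetrySketch(Consumer<byte[]> sender) {
        this.sender = sender;
    }

    // Returns true if the packet was sent, false if it was dropped.
    boolean push(AckEntry entry) {
        if (entry.retryTimes > MAX_RETRY_TIMES) {
            ackMap.remove(entry.key);
            failed++;
            return false;
        }
        ackMap.put(entry.key, entry);
        sender.accept(entry.payload);
        sent++;
        entry.retryTimes++;
        return true;
    }

    // The client acknowledged the packet, so nothing is left to retransmit.
    void onAck(String key) {
        ackMap.remove(key);
    }

    // What a retransmit timer would do when it fires (assumed behaviour).
    void onAckTimeout(String key) {
        AckEntry waiting = ackMap.get(key);
        if (waiting != null) {
            push(waiting);
        }
    }
}
```

With these rules an unacknowledged packet is sent twice in total (the original send plus one retry) before it is counted as failed, while an acknowledged packet is never resent.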
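The Nacos server sends packets to the Nacos client, so how does the client handle the data it receives? When NacosNamingService is initialized it initializes HostReactor, which holds a PushReceiver. PushReceiver handles the data received from the Nacos server. Its initialization code is as follows: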
public PushReceiver(HostReactor hostReactor) {
try {
this.hostReactor = hostReactor;
// Get the UDP port from the push.receiver.udp.port environment variable
String udpPort = getPushReceiverUdpPort();
if (StringUtils.isEmpty(udpPort)) {
this.udpSocket = new DatagramSocket();
} else {
this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort)));
}
// Create the thread pool that runs this object
this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.push.receiver");
return thread;
}
});
this.executorService.execute(this);
} catch (Exception e) {
NAMING_LOGGER.error("[NA] init udp socket failed", e);
}
}
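On initialization, PushReceiver reads the UDP port from the push.receiver.udp.port environment variable and creates a DatagramSocket to receive data sent by the Nacos server. It also creates a thread pool that runs the receive loop. The run method of PushReceiver handles the incoming packets: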
public void run() {
while (!closed) {
try {
// byte[] is initialized with 0 full filled by default
// buffer holds the received data
byte[] buffer = new byte[UDP_MSS];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
// Receive data
udpSocket.receive(packet);
String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
// Convert the received data into a PushPacket object
PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
String ack;
// Build the ack response
if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
// Process the packet
hostReactor.processServiceJson(pushPacket.data);
// send ack to server
ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"
+ "\"\"}";
} else if ("dump".equals(pushPacket.type)) {
// dump data to server
ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"
+ "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(hostReactor.getServiceInfoMap()))
+ "\"}";
} else {
// do nothing send ack only
ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
+ "\", \"data\":" + "\"\"}";
}
// Send the ack response to the Nacos server
udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
packet.getSocketAddress()));
} catch (Exception e) {
if (closed) {
return;
}
NAMING_LOGGER.error("[NA] error while receiving push data", e);
}
}
}
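The steps in run are:
- Create a buffer to receive the packet.
- Convert the received data into a PushPacket and call processServiceJson to process it.
- Build the ack and send it back to the Nacos server.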
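To make the receive-then-ack exchange concrete, here is a small loopback sketch. PushAckSketch is a hypothetical class. For brevity the "push" payload is just the type string rather than compressed JSON, and the dump-ack leaves data empty instead of embedding the service map, so this shows the shape of the exchange rather than the real wire format.

```java
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the client-side ack; not the Nacos PushReceiver.
class PushAckSketch {
    // Builds an ack in the same general shape as the strings assembled in run().
    static String buildAck(String pushType, long lastRefTime) {
        String ackType;
        if ("dom".equals(pushType) || "service".equals(pushType)) {
            ackType = "push-ack";
        } else if ("dump".equals(pushType)) {
            ackType = "dump-ack"; // data is left empty in this sketch
        } else {
            ackType = "unknown-ack";
        }
        return "{\"type\": \"" + ackType + "\", \"lastRefTime\":\"" + lastRefTime + "\", \"data\":\"\"}";
    }

    // Loopback round trip: the "server" sends a push, the "client" answers with an ack.
    static String roundTrip(String pushType, long lastRefTime) throws Exception {
        try (DatagramSocket server = new DatagramSocket(0, InetAddress.getLoopbackAddress());
             DatagramSocket client = new DatagramSocket(0, InetAddress.getLoopbackAddress())) {
            server.setSoTimeout(2000); // avoid blocking forever if a packet is lost
            client.setSoTimeout(2000);

            byte[] push = pushType.getBytes(StandardCharsets.UTF_8);
            server.send(new DatagramPacket(push, push.length, client.getLocalSocketAddress()));

            DatagramPacket in = new DatagramPacket(new byte[1024], 1024);
            client.receive(in);
            String type = new String(in.getData(), in.getOffset(), in.getLength(), StandardCharsets.UTF_8);

            // Reply to whatever address the push came from, as run() does.
            byte[] ack = buildAck(type, lastRefTime).getBytes(StandardCharsets.UTF_8);
            client.send(new DatagramPacket(ack, ack.length, in.getSocketAddress()));

            DatagramPacket reply = new DatagramPacket(new byte[1024], 1024);
            server.receive(reply);
            return new String(reply.getData(), reply.getOffset(), reply.getLength(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip("service", 42L));
    }
}
```

Echoing lastRefTime back is what lets the server match an ack to the packet it is waiting on.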
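processServiceJson converts the data into a ServiceInfo and checks whether the service information has changed. A service counts as changed when it is newly registered, updated, or removed. If the service has changed, the notification mechanism informs every subscriber of the InstancesChangeEvent event. processServiceJson is not analyzed in detail here; this is only an outline of how it works.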
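Since processServiceJson is not walked through here, the following is only a guess at the kind of comparison involved: diffing the previous and the new instance sets to decide whether anything changed. ServiceDiffSketch, the "ip:port" keys and the string-valued instances are all hypothetical and are not taken from the Nacos client.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical change detection between two snapshots of a service's instances.
class ServiceDiffSketch {
    static final class Diff {
        final Set<String> added = new TreeSet<>();
        final Set<String> removed = new TreeSet<>();
        final Set<String> modified = new TreeSet<>();

        boolean changed() {
            return !(added.isEmpty() && removed.isEmpty() && modified.isEmpty());
        }
    }

    // Keys are instance identifiers such as "ip:port"; values are serialized instances.
    static Diff diff(Map<String, String> oldHosts, Map<String, String> newHosts) {
        Diff result = new Diff();
        for (Map.Entry<String, String> entry : newHosts.entrySet()) {
            String previous = oldHosts.get(entry.getKey());
            if (previous == null) {
                result.added.add(entry.getKey());
            } else if (!previous.equals(entry.getValue())) {
                result.modified.add(entry.getKey());
            }
        }
        for (String key : oldHosts.keySet()) {
            if (!newHosts.containsKey(key)) {
                result.removed.add(key);
            }
        }
        return result;
    }
}
```

A subscriber notification would then be published only when changed() returns true, which keeps identical pushes from waking up listeners.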
Original link: https://zhuanlan.zhihu.com/p/556259644
Title: Nacos源码分析-客户端如何感知服务变更 (Nacos Source Code Analysis: How Clients Detect Service Changes)
Author: michael
Address: https://blog.junxworks.cn/articles/2024/03/08/1709866945487.html