Nacos Source Code Analysis: How Clients Detect Service Changes


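As the earlier article "Nacos Source Reading: Service Registration Analysis" showed, service registration eventually calls the method below to synchronize data. This synchronization covers two concerns. First, how data stays consistent across different Nacos clients, in other words, how a Nacos client detects service changes. Second, how data is synchronized across a Nacos cluster. This article covers the first point; the second is analyzed in a separate article. The registration path ends in the code below, which is where the earlier article stopped. This article continues with the put method of consistencyService: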

// Source location: com.alibaba.nacos.naming.core.ServiceManager#addInstance
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
            throws NacosException {
        // code omitted
        synchronized (service) {
            // code omitted
            // data consistency synchronization
            consistencyService.put(key, instances);
        }
}

The put method of consistencyService performs the data synchronization. It is implemented by DelegateConsistencyServiceImpl:

// Code location: com.alibaba.nacos.naming.consistency.DelegateConsistencyServiceImpl#put
public void put(String key, Record value) throws NacosException {
        mapConsistencyService(key).put(key, value);
}

private ConsistencyService mapConsistencyService(String key) {
        return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
}
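As a minimal illustration of this routing, the sketch below assumes the decision depends only on the key prefix com.alibaba.nacos.naming.iplist.ephemeral. for ephemeral instance lists. ConsistencyRouter and Target are hypothetical names, not Nacos classes.

```java
// Hypothetical, simplified re-creation of the key-based routing in
// DelegateConsistencyServiceImpl#mapConsistencyService; not the Nacos class itself.
class ConsistencyRouter {
    // Assumed prefix for ephemeral instance-list keys, as quoted in the article.
    static final String EPHEMERAL_PREFIX = "com.alibaba.nacos.naming.iplist.ephemeral.";

    enum Target { EPHEMERAL, PERSISTENT }

    // Ephemeral keys go to ephemeralConsistencyService, everything else to persistentConsistencyService.
    static Target route(String key) {
        return key.startsWith(EPHEMERAL_PREFIX) ? Target.EPHEMERAL : Target.PERSISTENT;
    }
}
```

Keeping the decision in the key itself means the delegate needs no extra lookup: the ephemeral flag is already encoded when the key is built.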

The mapConsistencyService method of DelegateConsistencyServiceImpl selects a consistency service based on the key. If the Instance is ephemeral, i.e. the key starts with com.alibaba.nacos.naming.iplist.ephemeral., ephemeralConsistencyService is chosen; otherwise persistentConsistencyService is chosen. ephemeralConsistencyService synchronizes ephemeral Instances, and persistentConsistencyService synchronizes persistent Instances. The put method of ephemeralConsistencyService is:

// Code location: com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#put
public void put(String key, Record value) throws NacosException {
    // save the instance in memory and enqueue a service-change notification
    onPut(key, value);
    // data consistency synchronization across the cluster
    distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
            globalConfig.getTaskDispatchPeriod() / 2);
}

onPut: saves the instance in memory and adds a service-change notification to the queue.

distroProtocol.sync: synchronizes data across the cluster.

Only onPut is analyzed here. Its code is:

public void onPut(String key, Record value) {
        // if the instance is ephemeral, store it in memory
        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
            Datum<Instances> datum = new Datum<>();
            datum.value = (Instances) value;
            datum.key = key;
            datum.timestamp.incrementAndGet();
            dataStore.put(key, datum);
        }
        // if no listener is registered for this key, return immediately
        if (!listeners.containsKey(key)) {
            return;
        }
        // add a data-change task to the queue
        notifier.addTask(key, DataOperation.CHANGE);
}

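onPut first checks whether the instance is ephemeral. If it is, the data is wrapped in a Datum (its timestamp is incremented) and stored in a Map through DataStore. If no listener is registered for the key, onPut returns; otherwise addTask adds the data change to the queue. addTask skips a CHANGE when the same key already has one waiting in the queue (tracked by the services map):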

public void addTask(String datumKey, DataOperation action) {

   if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
         return;
   }
   if (action == DataOperation.CHANGE) {
      services.put(datumKey, StringUtils.EMPTY);
   }
   tasks.offer(Pair.with(datumKey, action));
}
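The de-duplication in addTask can be sketched in isolation. This is a simplified, hypothetical re-creation (DedupTaskQueue is not a Nacos class): SimpleEntry stands in for org.javatuples.Pair, the queue capacity is arbitrary, and putIfAbsent replaces the original containsKey/put pair so that the check is atomic.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the de-duplicating task queue behind Notifier#addTask.
class DedupTaskQueue {
    enum Op { CHANGE, DELETE }

    // Keys that already have a CHANGE task waiting (the role of the "services" map).
    private final Map<String, String> pending = new ConcurrentHashMap<>();
    private final BlockingQueue<Map.Entry<String, Op>> tasks = new ArrayBlockingQueue<>(1024);

    void addTask(String key, Op op) {
        // A queued CHANGE re-reads the latest value when handled,
        // so a second CHANGE for the same key adds nothing.
        if (op == Op.CHANGE && pending.putIfAbsent(key, "") != null) {
            return;
        }
        tasks.offer(new SimpleEntry<>(key, op));
    }

    // Non-blocking for demonstration; the real Notifier thread blocks in tasks.take().
    Map.Entry<String, Op> pollTask() {
        Map.Entry<String, Op> task = tasks.poll();
        if (task != null) {
            pending.remove(task.getKey()); // handle() also clears the marker first
        }
        return task;
    }

    int size() {
        return tasks.size();
    }
}
```

Because a waiting CHANGE always reads the current Datum when it is handled, dropping duplicates loses no information while keeping the queue short.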

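tasks is a BlockingQueue<Pair<String, DataOperation>>. At this point the service registration itself is finished, but how the change reaches Nacos clients has not been covered yet. The change was put on the queue by the addTask method of the Notifier class. Notifier implements Runnable, and once DistroConsistencyServiceImpl has been initialized, its init method starts the Notifier thread: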

@PostConstruct
public void init() {
   GlobalExecutor.submitDistroNotifyTask(notifier);
}

The run method of Notifier:

public void run() {
    Loggers.DISTRO.info("distro notifier started");

    for (; ; ) {
        try {
            Pair<String, DataOperation> pair = tasks.take();
            handle(pair);
        } catch (Throwable e) {
            Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
        }
    }
}

run keeps taking service-change keys from the tasks queue and hands each one to handle:

private void handle(Pair<String, DataOperation> pair) {
    try {
        String datumKey = pair.getValue0();
        DataOperation action = pair.getValue1();

        services.remove(datumKey);

        int count = 0;

        if (!listeners.containsKey(datumKey)) {
            return;
        }
        // iterate over the listeners
        for (RecordListener listener : listeners.get(datumKey)) {

            count++;

            try {
                // on a service change, call the listener's onChange method
                if (action == DataOperation.CHANGE) {
                    listener.onChange(datumKey, dataStore.get(datumKey).value);
                    continue;
                }

                if (action == DataOperation.DELETE) {
                    listener.onDelete(datumKey);
                    continue;
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
            }
        }

        // code omitted
    } catch (Throwable e) {
        Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
    }
}
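The dispatch in handle boils down to a map from key to listeners. The sketch below reproduces that shape under simplifying assumptions: KeyListener and ListenerDispatcher are hypothetical stand-ins for RecordListener and the Notifier, and values live in a plain map instead of DataStore.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for RecordListener.
interface KeyListener<T> {
    void onChange(String key, T value) throws Exception;
    void onDelete(String key) throws Exception;
}

// Hypothetical sketch of the listener dispatch in Notifier#handle.
class ListenerDispatcher<T> {
    enum Op { CHANGE, DELETE }

    private final Map<String, List<KeyListener<T>>> listeners = new ConcurrentHashMap<>();
    private final Map<String, T> dataStore = new ConcurrentHashMap<>();

    void listen(String key, KeyListener<T> listener) {
        listeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    void put(String key, T value) {
        dataStore.put(key, value);
    }

    // Notifies every listener of the key and returns how many were called.
    int handle(String key, Op op) {
        List<KeyListener<T>> registered = listeners.get(key);
        if (registered == null) {
            return 0; // nobody subscribed to this key
        }
        int count = 0;
        for (KeyListener<T> listener : registered) {
            count++;
            try {
                if (op == Op.CHANGE) {
                    listener.onChange(key, dataStore.get(key)); // latest value at handling time
                } else {
                    listener.onDelete(key);
                }
            } catch (Exception e) {
                // one failing listener must not stop the others
                System.err.println("listener failed for " + key + ": " + e);
            }
        }
        return count;
    }
}
```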

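handle iterates over the listeners registered for the key. When action is DataOperation.CHANGE it calls listener.onChange, and when action is DataOperation.DELETE it calls listener.onDelete. For a service change, the listener actually invoked is Service, whose onChange method is: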

// Code location: com.alibaba.nacos.naming.core.Service#onChange
public void onChange(String key, Instances value) throws Exception {

        Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
        // iterate over all instances
        for (Instance instance : value.getInstanceList()) {

            if (instance == null) {
                // Reject this abnormal instance list:
                throw new RuntimeException("got null instance " + key);
            }
            // clamp the instance weight into the allowed range
            if (instance.getWeight() > 10000.0D) {
                instance.setWeight(10000.0D);
            }

            if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
                instance.setWeight(0.01D);
            }
        }
        // update the instances
        updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
        // recalculate the checksum
        recalculateChecksum();
}
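The weight adjustment in onChange is a simple clamp. Pulled out as a pure function (WeightNormalizer is a hypothetical name), it looks like this; zero and negative weights pass through unchanged, just as in the code above.

```java
// Hypothetical helper reproducing the weight clamp in Service#onChange.
class WeightNormalizer {
    static final double MAX_WEIGHT = 10000.0D;
    static final double MIN_POSITIVE_WEIGHT = 0.01D;

    static double normalize(double weight) {
        if (weight > MAX_WEIGHT) {
            return MAX_WEIGHT;          // cap very large weights
        }
        if (weight < MIN_POSITIVE_WEIGHT && weight > 0.0D) {
            return MIN_POSITIVE_WEIGHT; // lift tiny positive weights to the floor
        }
        return weight;                  // in-range, zero and negative values are unchanged
    }
}
```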

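onChange iterates over all instances and clamps each instance's weight, then calls updateIPs to update the instances, and finally calls recalculateChecksum to recompute the checksum. Inside updateIPs, the following code publishes the service change: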

// Code location: com.alibaba.nacos.naming.core.Service#updateIPs
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
    // code omitted
    getPushService().serviceChanged(this);
    // code omitted
}

public void serviceChanged(Service service) {
        // merge some change events to reduce the push frequency:
        // if futureMap already holds a pending push for this service, return immediately
        if (futureMap
                .containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) {
            return;
        }
        // publish the service change event
        this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));
}

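PushService publishes service changes: its serviceChanged method calls the publishEvent method of Spring's ApplicationContext. publishEvent triggers PushService's onApplicationEvent method, which performs the actual push to clients; that method is analyzed shortly. First, look at PushService itself. When the PushService class is initialized, its static initializer runs: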

static {
        try {
            // create the UDP socket
            udpSocket = new DatagramSocket();
            // Receiver handles ack responses from Nacos clients
            Receiver receiver = new Receiver();
            // start the Receiver thread
            Thread inThread = new Thread(receiver);
            inThread.setDaemon(true);
            inThread.setName("com.alibaba.nacos.naming.push.receiver");
            inThread.start();
            // periodically remove zombie clients
            GlobalExecutor.scheduleRetransmitter(() -> {
                try {
                    removeClientIfZombie();
                } catch (Throwable e) {
                    Loggers.PUSH.warn("[NACOS-PUSH] failed to remove client zombie");
                }
            }, 0, 20, TimeUnit.SECONDS);

        } catch (SocketException e) {
            Loggers.SRV_LOG.error("[NACOS-PUSH] failed to init push service");
        }
}

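When PushService is initialized, it creates a UDP socket for communication between the server and clients. Receiver processes responses from Nacos clients: when the server pushes data to a client, the client replies with an ack. Finally, a scheduled task runs every 20 seconds to remove zombie clients.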

With the UDP socket in place, we can look at PushService's onApplicationEvent method:

public void onApplicationEvent(ServiceChangeEvent event) {

        // get the service information
        Service service = event.getService();
        String serviceName = service.getName();
        String namespaceId = service.getNamespaceId();
        // schedule a delayed UDP push (runs after 1000 ms)
        Future future = GlobalExecutor.scheduleUdpSender(() -> {
            try {
                Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
                // get the Nacos clients subscribed to this service
                ConcurrentMap<String, PushClient> clients = clientMap
                        .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
                // if there are no clients, return
                if (MapUtils.isEmpty(clients)) {
                    return;
                }

                Map<String, Object> cache = new HashMap<>(16);
                long lastRefTime = System.nanoTime();
                // iterate over all clients that need the push
                for (PushClient client : clients.values()) {
                    // skip and remove zombie clients
                    if (client.zombie()) {
                        Loggers.PUSH.debug("client is zombie: " + client.toString());
                        clients.remove(client.toString());
                        Loggers.PUSH.debug("client is zombie: " + client.toString());
                        continue;
                    }

                    Receiver.AckEntry ackEntry;
                    Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());

                    String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                    byte[] compressData = null;
                    Map<String, Object> data = null;

                    if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                        org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                        compressData = (byte[]) (pair.getValue0());
                        data = (Map<String, Object>) pair.getValue1();

                        Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
                    }
                    // build the ackEntry
                    if (compressData != null) {
                        ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                    } else {
                        ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                        if (ackEntry != null) {
                            cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
                        }
                    }

                    // logging, code omitted

                    // UDP push
                    udpPush(ackEntry);
                }
            } catch (Exception e) {

            } finally {
                futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
            }

        }, 1000, TimeUnit.MILLISECONDS);
        futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);

}

onApplicationEvent works as follows:

  • Iterate over all Nacos clients that need the push.
  • Build an AckEntry for each client; the AckEntry wraps the data to push.
  • Push the AckEntry to the Nacos client.
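Together, serviceChanged and onApplicationEvent act as a debouncer: the first change for a service schedules a push 1000 ms later and records it in futureMap, later changes return early while that entry exists, and the finally block removes the entry once the push has run. The sketch below is a hypothetical re-creation of that idea (PushDebouncer and the pushes counter are illustrative names); the UDP push is replaced by a counter, and computeIfAbsent is used so the check and the scheduling happen atomically.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: while a delayed push for a service is pending, further changes are merged into it.
class PushDebouncer {
    private final Map<String, Future<?>> futureMap = new ConcurrentHashMap<>();
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "push-debouncer");
        t.setDaemon(true);
        return t;
    });
    final AtomicInteger pushes = new AtomicInteger();

    void serviceChanged(String fullServiceName, long delayMillis) {
        // Nacos uses separate containsKey / put calls; computeIfAbsent makes check-and-schedule atomic here.
        futureMap.computeIfAbsent(fullServiceName, name -> executor.schedule(() -> {
            try {
                pushes.incrementAndGet(); // stand-in for the UDP push to every subscribed client
            } finally {
                futureMap.remove(name);   // the next change may schedule a new push
            }
        }, delayMillis, TimeUnit.MILLISECONDS));
    }

    void shutdown() {
        executor.shutdown();
    }
}
```

A burst of registrations within the delay window therefore costs one push per subscribed client instead of one per change.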

udpPush sends the data to a Nacos client:

private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
        if (ackEntry == null) {
            Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
            return null;
        }
        // if the retry count exceeds MAX_RETRY_TIMES (1), drop the cached packet and timestamp
        if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
            Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
            ackMap.remove(ackEntry.key);
            udpSendTimeMap.remove(ackEntry.key);
            failedPush += 1;
            return ackEntry;
        }

        try {
            // count total pushes (only the first attempt for a key)
            if (!ackMap.containsKey(ackEntry.key)) {
                totalPush++;
            }
            // remember the packet until it is acked
            ackMap.put(ackEntry.key, ackEntry);
            // record the UDP send timestamp
            udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());

            Loggers.PUSH.info("send udp packet: " + ackEntry.key);
            // send over UDP
            udpSocket.send(ackEntry.origin);

            ackEntry.increaseRetryTime();
            // schedule a retransmission in case no ack arrives
            GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry),
                    TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);

            return ackEntry;
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.data,
                    ackEntry.origin.getAddress().getHostAddress(), e);
            ackMap.remove(ackEntry.key);
            udpSendTimeMap.remove(ackEntry.key);
            failedPush += 1;

            return null;
        }
}

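udpPush is what actually sends data to a Nacos client. Its logic is:

  • If the retry count exceeds the maximum (MAX_RETRY_TIMES, i.e. more than one retry), remove the packet from ackMap and its send timestamp from udpSendTimeMap, and give up.
  • Otherwise, before sending, store the packet in ackMap and the send time in udpSendTimeMap, then send the packet to the Nacos client.
  • A Retransmitter is scheduled to run after ACK_TIMEOUT_NANOS; if no ack has arrived by then (the key is still in ackMap), it calls udpPush again to resend the packet.
  • If sending throws an exception, remove the packet and the send timestamp from ackMap and udpSendTimeMap.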
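The send/ack/retry bookkeeping can be exercised without a socket by injecting the sender. The following single-threaded sketch is a simplification built on assumptions (AckTracker, Entry and onTimeout are hypothetical names): it keeps MAX_RETRY_TIMES at 1 as described above, models the Retransmitter as an explicit onTimeout call that resends only while the key is still in ackMap, and omits udpSendTimeMap.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical, single-threaded sketch of the ack/retry bookkeeping in PushService#udpPush.
class AckTracker {
    static final int MAX_RETRY_TIMES = 1;

    static final class Entry {
        final String key;
        int retryTimes;

        Entry(String key) {
            this.key = key;
        }
    }

    final Map<String, Entry> ackMap = new ConcurrentHashMap<>();
    int totalPush;
    int failedPush;
    private final Consumer<String> sender; // stands in for udpSocket.send

    AckTracker(Consumer<String> sender) {
        this.sender = sender;
    }

    // First send, and every resend triggered by onTimeout.
    void push(Entry e) {
        if (e.retryTimes > MAX_RETRY_TIMES) {
            ackMap.remove(e.key); // give up on this packet
            failedPush++;
            return;
        }
        if (!ackMap.containsKey(e.key)) {
            totalPush++;          // count only the first attempt
        }
        ackMap.put(e.key, e);
        sender.accept(e.key);
        e.retryTimes++;
        // the real code schedules a Retransmitter here after ACK_TIMEOUT_NANOS
    }

    // Retransmit timer fired: resend only if the client has not acked yet.
    void onTimeout(Entry e) {
        if (ackMap.containsKey(e.key)) {
            push(e);
        }
    }

    // Receiver got the client's ack: stop retrying.
    void onAck(String key) {
        ackMap.remove(key);
    }
}
```

With MAX_RETRY_TIMES at 1, a client that never acks receives the packet twice (the original send plus one retry) before the server gives up and counts a failed push.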

After the Nacos server sends the packet, how does the Nacos client handle it? When NacosNamingService is initialized, it creates a HostReactor, and HostReactor holds a PushReceiver, which processes the data received from the Nacos server. The PushReceiver constructor is:

public PushReceiver(HostReactor hostReactor) {
        try {
            this.hostReactor = hostReactor;
            // get the UDP port from the push.receiver.udp.port environment variable
            String udpPort = getPushReceiverUdpPort();
            if (StringUtils.isEmpty(udpPort)) {
                this.udpSocket = new DatagramSocket();
            } else {
                this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort)));
            }
            // create a single-thread executor that runs this receiver
            this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setDaemon(true);
                    thread.setName("com.alibaba.nacos.naming.push.receiver");
                    return thread;
                }
            });

            this.executorService.execute(this);
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] init udp socket failed", e);
        }
}

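When PushReceiver is initialized, it reads the UDP port from the push.receiver.udp.port environment variable; if it is not set, the socket binds to any free port. It creates a DatagramSocket to receive data sent by the Nacos server, and a thread pool to run the receive loop. The run method of PushReceiver processes incoming packets: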

public void run() {
        while (!closed) {
            try {

                // byte[] is initialized with 0 full filled by default
                // buffer that receives the packet data
                byte[] buffer = new byte[UDP_MSS];
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
                // receive a packet (blocks until one arrives)
                udpSocket.receive(packet);

                String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
                NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
                // deserialize the received JSON into a PushPacket object
                PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
                String ack;
                // build the ack response
                if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {

                    // process the pushed service data
                    hostReactor.processServiceJson(pushPacket.data);

                    // send ack to server
                    ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"
                            + "\"\"}";
                } else if ("dump".equals(pushPacket.type)) {
                    // dump data to server
                    ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"
                            + "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(hostReactor.getServiceInfoMap()))
                            + "\"}";
                } else {
                    // do nothing send ack only
                    ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                            + "\", \"data\":" + "\"\"}";
                }
                // send the ack back to the Nacos server
                udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
                        packet.getSocketAddress()));
            } catch (Exception e) {
                if (closed) {
                    return;
                }
                NAMING_LOGGER.error("[NA] error while receiving push data", e);
            }
        }
}
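
run works as follows:

  • Allocate a buffer and receive a packet into it.
  • Decompress the data if needed, convert it into a PushPacket, and call processServiceJson to process the service data.
  • Build an ack and send it back to the Nacos server.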
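The ack type depends only on the packet type. A hypothetical helper (AckBuilder is not a Nacos class) that builds essentially the same JSON strings is sketched below (whitespace may differ slightly from the original). It assumes the dump payload has already been serialized and escaped by the caller, which the real code does with JacksonUtils.toJson and StringUtils.escapeJavaScript.

```java
// Hypothetical helper mirroring the ack construction in PushReceiver#run.
class AckBuilder {
    static String buildAck(String packetType, long lastRefTime, String escapedDump) {
        String ackType;
        String data = "";
        if ("dom".equals(packetType) || "service".equals(packetType)) {
            ackType = "push-ack";    // service data was applied by the client
        } else if ("dump".equals(packetType)) {
            ackType = "dump-ack";    // server asked for the client's cached service map
            data = escapedDump;
        } else {
            ackType = "unknown-ack"; // still ack so the server stops retransmitting
        }
        return "{\"type\": \"" + ackType + "\", \"lastRefTime\":\"" + lastRefTime
                + "\", \"data\":\"" + data + "\"}";
    }
}
```

Acking even unknown packet types matters because, on the server side, an unacked packet stays in ackMap and is retransmitted.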

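processServiceJson converts the data into a ServiceInfo and checks whether the service information has changed: a newly registered service, updated instances, and removed instances all count as changes. If the service has changed, every subscriber of the InstancesChangeEvent event is notified through the event notification mechanism. processServiceJson itself is not analyzed in detail here; this is only an outline of how it works.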
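As a rough idea of the change detection, the sketch below assumes instances are keyed by "ip:port" and reduced to a string for comparison. InstanceDiff is a hypothetical name; the real processServiceJson works on full Instance objects, and an empty old map stands in here for a service that has no cached ServiceInfo yet.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of diffing cached instances against a pushed instance list.
class InstanceDiff {
    final Set<String> added = new HashSet<>();
    final Set<String> removed = new HashSet<>();
    final Set<String> modified = new HashSet<>();

    // Maps are keyed by "ip:port"; values summarize the instance (weight, health, metadata...).
    static InstanceDiff compute(Map<String, String> oldHosts, Map<String, String> newHosts) {
        InstanceDiff d = new InstanceDiff();
        for (Map.Entry<String, String> e : newHosts.entrySet()) {
            String old = oldHosts.get(e.getKey());
            if (old == null) {
                d.added.add(e.getKey());      // new instance
            } else if (!old.equals(e.getValue())) {
                d.modified.add(e.getKey());   // same address, different content
            }
        }
        for (String key : oldHosts.keySet()) {
            if (!newHosts.containsKey(key)) {
                d.removed.add(key);           // instance disappeared
            }
        }
        return d;
    }

    // Any difference would trigger an InstancesChangeEvent.
    boolean changed() {
        return !added.isEmpty() || !removed.isEmpty() || !modified.isEmpty();
    }
}
```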

Original article: https://zhuanlan.zhihu.com/p/556259644


Title: Nacos Source Code Analysis: How Clients Detect Service Changes
Author: michael
URL: https://blog.junxworks.cn/articles/2024/03/08/1709866945487.html