Nacos源码分析-集群间临时实例数据的一致性同步

  |   0 评论   |   0 浏览

当Nacos集群部署时,临时实例数据在集群之间是如何进行同步的?

Nacos针对临时实例数据在集群之间的同步开发了Distro一致性协议,Distro一致性协议是弱一致性协议,用来保证Nacos注册中心的可用性,当临时实例注册到Nacos注册中心时,集群的实例数据并不是一致的,当通过Distro协议同步之后才最终达到一致性,所以Distro协议保存了Nacos注册中心的AP(可用性)。

Distro协议将数据分为多个blocks,每一个Nacos集群节点负责一个block的数据,当每一个block的数据的生成、删除、同步都由一个Nacos集群节点负责,每一个Nacos集群节点仅负责处理所有的实例数据的一部分。同时,每一个Nacso集群节点都会将数据同步给集群的其他节点。

这篇文章主要分析如下两个问题:

  • Nacos 如何同步临时实例数据到集群其他节点?
  • 新节点是如何进行实例数据的同步的?

Nacos 同步临时实例数据到集群其他节点

DistroProtocol类就是实现Distro协议的类。同步临时数据的方法如下:

//源码位置:com.alibaba.nacos.core.distributed.distro.DistroProtocol#sync
public void sync(DistroKey distroKey, DataOperation action, long delay) {
        //遍历除了自身的所有集群节点
        for (Member each : memberManager.allMembersWithoutSelf()) {
            DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                    each.getAddress());
            //封装为 Distro延迟任务
            DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
            //获取延迟任务执行引擎,并将 Distro延迟任务添加到延迟任务执行引擎中
            distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
            if (Loggers.DISTRO.isDebugEnabled()) {
                Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
            }
        }
}

sync方法遍历除了自身的所有集群节点,并为每一个节点封装Distro延迟任务DistroDelayTask,然后通过任务引擎持有者distroTaskEngineHolder获取延迟任务执行引擎,并将延迟任务添加到延迟任务引擎中。添加到任务到延迟任务引擎中的源码分析可以参考《Nacos源码分析-任务执行引擎的设计分析》

distroTaskEngineHolder是任务引擎的持有者,源码比较简单:

public class DistroTaskEngineHolder {

    //延迟任务执行引擎
    private final DistroDelayTaskExecuteEngine delayTaskExecuteEngine = new DistroDelayTaskExecuteEngine();

    //普通任务执行引擎
    private final DistroExecuteTaskExecuteEngine executeWorkersManager = new DistroExecuteTaskExecuteEngine();

    public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {
        //设置默认的延迟任务处理器
        DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder);
        delayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor);
    }

    public DistroDelayTaskExecuteEngine getDelayTaskExecuteEngine() {
        return delayTaskExecuteEngine;
    }

    public DistroExecuteTaskExecuteEngine getExecuteWorkersManager() {
        return executeWorkersManager;
    }

    //延迟任务执行引擎注册处理器
    public void registerNacosTaskProcessor(Object key, NacosTaskProcessor nacosTaskProcessor) {
        this.delayTaskExecuteEngine.addProcessor(key, nacosTaskProcessor);
    }
}

DistroTaskEngineHolder类包括延迟任务执行引擎和普通任务执行引擎,并且提供了获取这两个任务执行引擎的方法。在DistroTaskEngineHolder初始化时,创建默认的延迟任务处理器DistroDelayTaskProcessor。当将延迟任务添加到延迟任务执行引擎中,默认任务处理器DistroDelayTaskProcessor就是进行处理延迟任务了:

public boolean process(NacosTask task) {
    //如果不是延迟任务,直接返回true
    if (!(task instanceof DistroDelayTask)) {
        return true;
    }
    //转换为延迟任务
    DistroDelayTask distroDelayTask = (DistroDelayTask) task;
    DistroKey distroKey = distroDelayTask.getDistroKey();
    //如果是数据改变行为,则创建Distro数据改变同步任务,并且将该任务添加到普通任务执行器引擎中。
    if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) {
        DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
        distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
        return true;
    }
    return false;
}

默认处理器DistroDelayTaskProcessor首先判断传进来的任务不是延迟任务,则直接返回ture。将传进来的任务转换为DistroDelayTask,如果是数据改变行为,则创建数据改变同步任务DistroSyncChangeTask,然后将任务添加到普通任务执行引擎中。DistroSyncChangeTask的run方法将会处理临时数据同步的任务:

public void run() {
    Loggers.DISTRO.info("[DISTRO-START] {}", toString());
    try {
        String type = getDistroKey().getResourceType();
        //获取需要同步的数据
        DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());
        //设置同步数据的类型
        distroData.setType(DataOperation.CHANGE);
        //进行临时实例数据同步
        boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());
        //如果同步失败,则进行重试
        if (!result) {
            handleFailedTask();
        }
        Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
    } catch (Exception e) {
        Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);
        //如果同步发生异常,则进行重试同步
        handleFailedTask();
    }
}

DistroSyncChangeTask的run方法将获取需要同步的数据并设置同步数据的类型,然后进行临时实例数据的同步,如果同步失败或者通过过程中发生异常,则进行重试处理,重试就是将任务重新添加到任务执行引擎中,供下次进行重试处理延迟任务。

进行临时实例数据的同步由DistroHttpAgent类的syncData方法负责:

public boolean syncData(DistroData data, String targetServer) {
    //如果集群不包含该节点,则直接返回ture
    if (!memberManager.hasMember(targetServer)) {
        return true;
    }
    //获取同步的内容,通过http请求进行同步
    byte[] dataContent = data.getContent();
    return NamingProxy.syncData(dataContent, data.getDistroKey().getTargetServer());
}

syncData方法判断如果集群不包含该节点,则直接返回ture,然后通过http请求将数据同步给其他节点。请求的url为/nacos/v1/ns/distro/datum。

当其他节点接收到同步临时实例数据的请求,其他节点是如何处理同步过来的数据呢?

DistroController类的onSyncDatum方法处理同步过来的临时实例数据,

public ResponseEntity onSyncDatum(@RequestBody Map<String, Datum<Instances>> dataMap) throws Exception {
        //如果同步过来的数据为空,抛出异常
        if (dataMap.isEmpty()) {
            Loggers.DISTRO.error("[onSync] receive empty entity!");
            throw new NacosException(NacosException.INVALID_PARAM, "receive empty entity!");
        }
        //遍历同步的临时实例数据
        for (Map.Entry<String, Datum<Instances>> entry : dataMap.entrySet()) {
            //如果是临时数据
            if (KeyBuilder.matchEphemeralInstanceListKey(entry.getKey())) {
                String namespaceId = KeyBuilder.getNamespace(entry.getKey());
                String serviceName = KeyBuilder.getServiceName(entry.getKey());
                //如果没有service,则创建新的服务
                if (!serviceManager.containService(namespaceId, serviceName) && switchDomain
                        .isDefaultInstanceEphemeral()) {
                    serviceManager.createEmptyService(namespaceId, serviceName, true);
                }
                DistroHttpData distroHttpData = new DistroHttpData(createDistroKey(entry.getKey()), entry.getValue());
                //将接收的数据进行处理
                distroProtocol.onReceive(distroHttpData);
            }
        }
        return ResponseEntity.ok("ok");
}

onSyncDatum方法首先判断同步过来的数据是否为空,如果为空则抛出异常。遍历同步过来的数据,判断是否是临时实例数据,如果是则判断该服务实例是否已经存在,不存在就创建新的服务实例,然后将接收的数据交给distroProtocol的onReceive的方法处理。

public boolean onReceive(DistroData distroData) {
    String resourceType = distroData.getDistroKey().getResourceType();
    //获取处理器
    DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
    if (null == dataProcessor) {
        Loggers.DISTRO.warn("[DISTRO] Can't find data process for received data {}", resourceType);
        return false;
    }
    //处理同步的实例数据
    return dataProcessor.processData(distroData);
}

onReceive方法首先根据资源类型找到处理实例数据的处理器,然后将数据交给DistroConsistencyServiceImpl处理器的processData方法处理数据。该方法如下:

public boolean processData(DistroData distroData) {
    DistroHttpData distroHttpData = (DistroHttpData) distroData;
    //进行反序列化
    Datum<Instances> datum = (Datum<Instances>) distroHttpData.getDeserializedContent();
    //将临时数据缓存到内容并进行通知
    onPut(datum.key, datum.value);
    return true;
}

processData方法将同步过来的数据进行反序列化,然后调用onPut方法进行临时数据缓存并进行通知,onPut方法的具体逻辑可以参考《Nacos源码分析-客户端如何感知服务变更》。到这里,Nacos同步临时实例数据到集群其他节点的源码分析已经完成了。接下来分析新节点是如何进行实例数据的同步的。

新节点同步实例数据

如果nacos集群中有新的节点加入,那么新节点就会从其他节点进行全量拉取数据。当DistroProtocol初始化时,调用startDistroTask方法进行全量拉取数据:

private void startDistroTask() {
    if (EnvUtil.getStandaloneMode()) {
        isInitialized = true;
        return;
    }
    startVerifyTask();
    //开始全量拉取数据
    startLoadTask();
}

private void startLoadTask() {
        DistroCallback loadCallback = new DistroCallback() {
            @Override
            public void onSuccess() {
                isInitialized = true;
            }

            @Override
            public void onFailed(Throwable throwable) {
                isInitialized = false;
            }
        };
        //提交全量拉取数据的任务
        GlobalExecutor.submitLoadDataTask(
                new DistroLoadDataTask(memberManager, distroComponentHolder, distroConfig, loadCallback));
}

startLoadTask方法开启了全量拉取数据的线程任务,DistroLoadDataTask是具体执行全量拉取数据的任务。run方法如下:

public void run() {
    try {
        load();
        if (!checkCompleted()) {
            GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());
        } else {
            loadCallback.onSuccess();
            Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");
        }
    } catch (Exception e) {
        loadCallback.onFailed(e);
        Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);
    }
}

run方法使用load方法加载从远程加载全量数据,如果检测到加载数据没有完成,则继续提交全量拉取数据的任务,否则进行任务的成功回调。如果加载数据发生了异常,则进行任务的失败回调。

private void load() throws Exception {
    while (memberManager.allMembersWithoutSelf().isEmpty()) {
        Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
        TimeUnit.SECONDS.sleep(1);
    }
    while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
        Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
        TimeUnit.SECONDS.sleep(1);
    }
    for (String each : distroComponentHolder.getDataStorageTypes()) {
        if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
            loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
        }
    }
}

在服务启动的时候,是没有其他远程服务的地址的,如果服务地址都是空的,则进行等待,直到服务地址不为空。

接着判断数据存储类型是否为空,如果为空,则进行等待,直到服务地址不为空。

遍历所有的数据存储类型,判断loadCompletedMap是否存在数据存储类型和该类型的数据是否已经加载完成,如果没有则调用loadAllDataSnapshotFromRemote进行全量数据的加载:

private boolean loadAllDataSnapshotFromRemote(String resourceType) {
    DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
    DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
    if (null == transportAgent || null == dataProcessor) {
        Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type {}, transportAgent: {}, dataProcessor: {}",
                resourceType, transportAgent, dataProcessor);
        return false;
    }
    //遍历所有的远程服务地址
    for (Member each : memberManager.allMembersWithoutSelf()) {
        try {
            Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {}", resourceType, each.getAddress());
            //通过http请求拉取远程服务的所有全量数据
            DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
            //处理拉取回来的全量数据
            boolean result = dataProcessor.processSnapshot(distroData);
            Loggers.DISTRO
                    .info("[DISTRO-INIT] load snapshot {} from {} result: {}", resourceType, each.getAddress(),
                            result);
            if (result) {
                return true;
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);
        }
    }
    return false;
}

loadAllDataSnapshotFromRemote方法做了两件事:

  • 通过http请求拉取远程服务的所有全量数据:拉取数据的接口为:/distro/v1/ns/distro/datums
  • 处理拉取回来的全量数据

处理全量数据的方法为processData:

private boolean processData(byte[] data) throws Exception {
    //如果data的数据长度为0
    if (data.length > 0) {
        //对拉取的数据进行反序列化
        Map<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.class);

        //对拉取回来的数据进行反序列化
        for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
            //添加到内存中
            dataStore.put(entry.getKey(), entry.getValue());
            //如果监听器没有包含该服务
            if (!listeners.containsKey(entry.getKey())) {
                // pretty sure the service not exist:
                if (switchDomain.isDefaultInstanceEphemeral()) {
                    // create empty service
                    //创建新的service对象
                    Loggers.DISTRO.info("creating service {}", entry.getKey());
                    Service service = new Service();
                    String serviceName = KeyBuilder.getServiceName(entry.getKey());
                    String namespaceId = KeyBuilder.getNamespace(entry.getKey());
                    service.setName(serviceName);
                    service.setNamespaceId(namespaceId);
                    service.setGroupName(Constants.DEFAULT_GROUP);
                    // now validate the service. if failed, exception will be thrown
                    service.setLastModifiedMillis(System.currentTimeMillis());
                    service.recalculateChecksum();

                    // The Listener corresponding to the key value must not be empty
                    RecordListener listener = listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).peek();
                    if (Objects.isNull(listener)) {
                        return false;
                    }
                    listener.onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);
                }
            }
        }

        //遍历所有的数据,并通知监听器
        for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {

            if (!listeners.containsKey(entry.getKey())) {
                // Should not happen:
                Loggers.DISTRO.warn("listener of {} not found.", entry.getKey());
                continue;
            }

            try {
                for (RecordListener listener : listeners.get(entry.getKey())) {
                    listener.onChange(entry.getKey(), entry.getValue().value);
                }
            } catch (Exception e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);
                continue;
            }

            // Update data store if listener executed successfully:
            dataStore.put(entry.getKey(), entry.getValue());
        }
    }
    return true;
}

processData方法的逻辑:

  • 对拉取回来的数据进行反序列化,并保存到内存中。
  • 遍历全量数据,如果是临时实例数据,则创建新的service对象,并进行通知监听器数据改变了。
  • 遍历全量数据,如果监听器中不包含该service,则跳过,然后遍历所有的监听器,通知监听器,数据已经发生改变了,并且最后更新缓存。

最后看看,远程服务是如何处理全量拉取数据的请求的:

@GetMapping("/datums")
public ResponseEntity getAllDatums() {
        //获取临时实例数据
        DistroData distroData = distroProtocol.onSnapshot(KeyBuilder.INSTANCE_LIST_KEY_PREFIX);
        return ResponseEntity.ok(distroData.getContent());
}
public DistroData onSnapshot(String type) {
        //根据类型获取数据存储器
        DistroDataStorage distroDataStorage = distroComponentHolder.findDataStorage(type);
        if (null == distroDataStorage) {
            Loggers.DISTRO.warn("[DISTRO] Can't find data storage for received key {}", type);
            return new DistroData(new DistroKey("snapshot", type), new byte[0]);
        }
        return distroDataStorage.getDatumSnapshot();
}
public DistroData getDatumSnapshot() {
        //从内存中获取所有的缓存数据
        Map<String, Datum> result = dataStore.getDataMap();
        //进行序列化
        byte[] dataContent = ApplicationUtils.getBean(Serializer.class).serialize(result);
        DistroKey distroKey = new DistroKey("snapshot", KeyBuilder.INSTANCE_LIST_KEY_PREFIX);
        return new DistroData(distroKey, dataContent);
}

全量数据拉取无非就是从内存中获取所有临时实例的数据,并且对数据进行序列化,然后返回给客户端。

原文链接https://zhuanlan.zhihu.com/p/565590169?utm_id=0


标题:Nacos源码分析-集群间临时实例数据的一致性同步
作者:michael
地址:https://blog.junxworks.cn/articles/2024/03/07/1709782628021.html