Nacos源码分析-集群间临时实例数据的一致性同步
当Nacos集群部署时,临时实例数据在集群之间是如何进行同步的?
Nacos针对临时实例数据在集群之间的同步开发了Distro一致性协议,Distro一致性协议是弱一致性协议,用来保证Nacos注册中心的可用性,当临时实例注册到Nacos注册中心时,集群的实例数据并不是一致的,当通过Distro协议同步之后才最终达到一致性,所以Distro协议保存了Nacos注册中心的AP(可用性)。
Distro协议将数据分为多个blocks,每一个Nacos集群节点负责一个block的数据,当每一个block的数据的生成、删除、同步都由一个Nacos集群节点负责,每一个Nacos集群节点仅负责处理所有的实例数据的一部分。同时,每一个Nacso集群节点都会将数据同步给集群的其他节点。
这篇文章主要分析如下两个问题:
- Nacos 如何同步临时实例数据到集群其他节点?
- 新节点是如何进行实例数据的同步的?
Nacos 同步临时实例数据到集群其他节点
DistroProtocol类就是实现Distro协议的类。同步临时数据的方法如下:
//源码位置:com.alibaba.nacos.core.distributed.distro.DistroProtocol#sync
public void sync(DistroKey distroKey, DataOperation action, long delay) {
//遍历除了自身的所有集群节点
for (Member each : memberManager.allMembersWithoutSelf()) {
DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
each.getAddress());
//封装为 Distro延迟任务
DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
//获取延迟任务执行引擎,并将 Distro延迟任务添加到延迟任务执行引擎中
distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
}
}
}
sync方法遍历除了自身的所有集群节点,并为每一个节点封装Distro延迟任务DistroDelayTask,然后通过任务引擎持有者distroTaskEngineHolder获取延迟任务执行引擎,并将延迟任务添加到延迟任务引擎中。添加到任务到延迟任务引擎中的源码分析可以参考《Nacos源码分析-任务执行引擎的设计分析》。
distroTaskEngineHolder是任务引擎的持有者,源码比较简单:
public class DistroTaskEngineHolder {
//延迟任务执行引擎
private final DistroDelayTaskExecuteEngine delayTaskExecuteEngine = new DistroDelayTaskExecuteEngine();
//普通任务执行引擎
private final DistroExecuteTaskExecuteEngine executeWorkersManager = new DistroExecuteTaskExecuteEngine();
public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {
//设置默认的延迟任务处理器
DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder);
delayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor);
}
public DistroDelayTaskExecuteEngine getDelayTaskExecuteEngine() {
return delayTaskExecuteEngine;
}
public DistroExecuteTaskExecuteEngine getExecuteWorkersManager() {
return executeWorkersManager;
}
//延迟任务执行引擎注册处理器
public void registerNacosTaskProcessor(Object key, NacosTaskProcessor nacosTaskProcessor) {
this.delayTaskExecuteEngine.addProcessor(key, nacosTaskProcessor);
}
}
DistroTaskEngineHolder类包括延迟任务执行引擎和普通任务执行引擎,并且提供了获取这两个任务执行引擎的方法。在DistroTaskEngineHolder初始化时,创建默认的延迟任务处理器DistroDelayTaskProcessor。当将延迟任务添加到延迟任务执行引擎中,默认任务处理器DistroDelayTaskProcessor就是进行处理延迟任务了:
public boolean process(NacosTask task) {
//如果不是延迟任务,直接返回true
if (!(task instanceof DistroDelayTask)) {
return true;
}
//转换为延迟任务
DistroDelayTask distroDelayTask = (DistroDelayTask) task;
DistroKey distroKey = distroDelayTask.getDistroKey();
//如果是数据改变行为,则创建Distro数据改变同步任务,并且将该任务添加到普通任务执行器引擎中。
if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) {
DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
return true;
}
return false;
}
默认处理器DistroDelayTaskProcessor首先判断传进来的任务不是延迟任务,则直接返回ture。将传进来的任务转换为DistroDelayTask,如果是数据改变行为,则创建数据改变同步任务DistroSyncChangeTask,然后将任务添加到普通任务执行引擎中。DistroSyncChangeTask的run方法将会处理临时数据同步的任务:
public void run() {
Loggers.DISTRO.info("[DISTRO-START] {}", toString());
try {
String type = getDistroKey().getResourceType();
//获取需要同步的数据
DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());
//设置同步数据的类型
distroData.setType(DataOperation.CHANGE);
//进行临时实例数据同步
boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());
//如果同步失败,则进行重试
if (!result) {
handleFailedTask();
}
Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
} catch (Exception e) {
Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);
//如果同步发生异常,则进行重试同步
handleFailedTask();
}
}
DistroSyncChangeTask的run方法将获取需要同步的数据并设置同步数据的类型,然后进行临时实例数据的同步,如果同步失败或者通过过程中发生异常,则进行重试处理,重试就是将任务重新添加到任务执行引擎中,供下次进行重试处理延迟任务。
进行临时实例数据的同步由DistroHttpAgent类的syncData方法负责:
public boolean syncData(DistroData data, String targetServer) {
//如果集群不包含该节点,则直接返回ture
if (!memberManager.hasMember(targetServer)) {
return true;
}
//获取同步的内容,通过http请求进行同步
byte[] dataContent = data.getContent();
return NamingProxy.syncData(dataContent, data.getDistroKey().getTargetServer());
}
syncData方法判断如果集群不包含该节点,则直接返回ture,然后通过http请求将数据同步给其他节点。请求的url为/nacos/v1/ns/distro/datum。
当其他节点接收到同步临时实例数据的请求,其他节点是如何处理同步过来的数据呢?
DistroController类的onSyncDatum方法处理同步过来的临时实例数据,
public ResponseEntity onSyncDatum(@RequestBody Map<String, Datum<Instances>> dataMap) throws Exception {
//如果同步过来的数据为空,抛出异常
if (dataMap.isEmpty()) {
Loggers.DISTRO.error("[onSync] receive empty entity!");
throw new NacosException(NacosException.INVALID_PARAM, "receive empty entity!");
}
//遍历同步的临时实例数据
for (Map.Entry<String, Datum<Instances>> entry : dataMap.entrySet()) {
//如果是临时数据
if (KeyBuilder.matchEphemeralInstanceListKey(entry.getKey())) {
String namespaceId = KeyBuilder.getNamespace(entry.getKey());
String serviceName = KeyBuilder.getServiceName(entry.getKey());
//如果没有service,则创建新的服务
if (!serviceManager.containService(namespaceId, serviceName) && switchDomain
.isDefaultInstanceEphemeral()) {
serviceManager.createEmptyService(namespaceId, serviceName, true);
}
DistroHttpData distroHttpData = new DistroHttpData(createDistroKey(entry.getKey()), entry.getValue());
//将接收的数据进行处理
distroProtocol.onReceive(distroHttpData);
}
}
return ResponseEntity.ok("ok");
}
onSyncDatum方法首先判断同步过来的数据是否为空,如果为空则抛出异常。遍历同步过来的数据,判断是否是临时实例数据,如果是则判断该服务实例是否已经存在,不存在就创建新的服务实例,然后将接收的数据交给distroProtocol的onReceive的方法处理。
public boolean onReceive(DistroData distroData) {
String resourceType = distroData.getDistroKey().getResourceType();
//获取处理器
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
if (null == dataProcessor) {
Loggers.DISTRO.warn("[DISTRO] Can't find data process for received data {}", resourceType);
return false;
}
//处理同步的实例数据
return dataProcessor.processData(distroData);
}
onReceive方法首先根据资源类型找到处理实例数据的处理器,然后将数据交给DistroConsistencyServiceImpl处理器的processData方法处理数据。该方法如下:
public boolean processData(DistroData distroData) {
DistroHttpData distroHttpData = (DistroHttpData) distroData;
//进行反序列化
Datum<Instances> datum = (Datum<Instances>) distroHttpData.getDeserializedContent();
//将临时数据缓存到内容并进行通知
onPut(datum.key, datum.value);
return true;
}
processData方法将同步过来的数据进行反序列化,然后调用onPut方法进行临时数据缓存并进行通知,onPut方法的具体逻辑可以参考《Nacos源码分析-客户端如何感知服务变更》。到这里,Nacos同步临时实例数据到集群其他节点的源码分析已经完成了。接下来分析新节点是如何进行实例数据的同步的。
新节点同步实例数据
如果nacos集群中有新的节点加入,那么新节点就会从其他节点进行全量拉取数据。当DistroProtocol初始化时,调用startDistroTask方法进行全量拉取数据:
private void startDistroTask() {
if (EnvUtil.getStandaloneMode()) {
isInitialized = true;
return;
}
startVerifyTask();
//开始全量拉取数据
startLoadTask();
}
private void startLoadTask() {
DistroCallback loadCallback = new DistroCallback() {
@Override
public void onSuccess() {
isInitialized = true;
}
@Override
public void onFailed(Throwable throwable) {
isInitialized = false;
}
};
//提交全量拉取数据的任务
GlobalExecutor.submitLoadDataTask(
new DistroLoadDataTask(memberManager, distroComponentHolder, distroConfig, loadCallback));
}
startLoadTask方法开启了全量拉取数据的线程任务,DistroLoadDataTask是具体执行全量拉取数据的任务。run方法如下:
public void run() {
try {
load();
if (!checkCompleted()) {
GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());
} else {
loadCallback.onSuccess();
Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");
}
} catch (Exception e) {
loadCallback.onFailed(e);
Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);
}
}
run方法使用load方法加载从远程加载全量数据,如果检测到加载数据没有完成,则继续提交全量拉取数据的任务,否则进行任务的成功回调。如果加载数据发生了异常,则进行任务的失败回调。
private void load() throws Exception {
while (memberManager.allMembersWithoutSelf().isEmpty()) {
Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
TimeUnit.SECONDS.sleep(1);
}
while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
TimeUnit.SECONDS.sleep(1);
}
for (String each : distroComponentHolder.getDataStorageTypes()) {
if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
}
}
}
在服务启动的时候,是没有其他远程服务的地址的,如果服务地址都是空的,则进行等待,直到服务地址不为空。
接着判断数据存储类型是否为空,如果为空,则进行等待,直到服务地址不为空。
遍历所有的数据存储类型,判断loadCompletedMap是否存在数据存储类型和该类型的数据是否已经加载完成,如果没有则调用loadAllDataSnapshotFromRemote进行全量数据的加载:
private boolean loadAllDataSnapshotFromRemote(String resourceType) {
DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
if (null == transportAgent || null == dataProcessor) {
Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type {}, transportAgent: {}, dataProcessor: {}",
resourceType, transportAgent, dataProcessor);
return false;
}
//遍历所有的远程服务地址
for (Member each : memberManager.allMembersWithoutSelf()) {
try {
Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {}", resourceType, each.getAddress());
//通过http请求拉取远程服务的所有全量数据
DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
//处理拉取回来的全量数据
boolean result = dataProcessor.processSnapshot(distroData);
Loggers.DISTRO
.info("[DISTRO-INIT] load snapshot {} from {} result: {}", resourceType, each.getAddress(),
result);
if (result) {
return true;
}
} catch (Exception e) {
Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);
}
}
return false;
}
loadAllDataSnapshotFromRemote方法做了两件事:
- 通过http请求拉取远程服务的所有全量数据:拉取数据的接口为:/distro/v1/ns/distro/datums
- 处理拉取回来的全量数据
处理全量数据的方法为processData:
private boolean processData(byte[] data) throws Exception {
//如果data的数据长度为0
if (data.length > 0) {
//对拉取的数据进行反序列化
Map<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.class);
//对拉取回来的数据进行反序列化
for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
//添加到内存中
dataStore.put(entry.getKey(), entry.getValue());
//如果监听器没有包含该服务
if (!listeners.containsKey(entry.getKey())) {
// pretty sure the service not exist:
if (switchDomain.isDefaultInstanceEphemeral()) {
// create empty service
//创建新的service对象
Loggers.DISTRO.info("creating service {}", entry.getKey());
Service service = new Service();
String serviceName = KeyBuilder.getServiceName(entry.getKey());
String namespaceId = KeyBuilder.getNamespace(entry.getKey());
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(Constants.DEFAULT_GROUP);
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
// The Listener corresponding to the key value must not be empty
RecordListener listener = listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).peek();
if (Objects.isNull(listener)) {
return false;
}
listener.onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);
}
}
}
//遍历所有的数据,并通知监听器
for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
if (!listeners.containsKey(entry.getKey())) {
// Should not happen:
Loggers.DISTRO.warn("listener of {} not found.", entry.getKey());
continue;
}
try {
for (RecordListener listener : listeners.get(entry.getKey())) {
listener.onChange(entry.getKey(), entry.getValue().value);
}
} catch (Exception e) {
Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);
continue;
}
// Update data store if listener executed successfully:
dataStore.put(entry.getKey(), entry.getValue());
}
}
return true;
}
processData方法的逻辑:
- 对拉取回来的数据进行反序列化,并保存到内存中。
- 遍历全量数据,如果是临时实例数据,则创建新的service对象,并进行通知监听器数据改变了。
- 遍历全量数据,如果监听器中不包含该service,则跳过,然后遍历所有的监听器,通知监听器,数据已经发生改变了,并且最后更新缓存。
最后看看,远程服务是如何处理全量拉取数据的请求的:
@GetMapping("/datums")
public ResponseEntity getAllDatums() {
//获取临时实例数据
DistroData distroData = distroProtocol.onSnapshot(KeyBuilder.INSTANCE_LIST_KEY_PREFIX);
return ResponseEntity.ok(distroData.getContent());
}
public DistroData onSnapshot(String type) {
//根据类型获取数据存储器
DistroDataStorage distroDataStorage = distroComponentHolder.findDataStorage(type);
if (null == distroDataStorage) {
Loggers.DISTRO.warn("[DISTRO] Can't find data storage for received key {}", type);
return new DistroData(new DistroKey("snapshot", type), new byte[0]);
}
return distroDataStorage.getDatumSnapshot();
}
public DistroData getDatumSnapshot() {
//从内存中获取所有的缓存数据
Map<String, Datum> result = dataStore.getDataMap();
//进行序列化
byte[] dataContent = ApplicationUtils.getBean(Serializer.class).serialize(result);
DistroKey distroKey = new DistroKey("snapshot", KeyBuilder.INSTANCE_LIST_KEY_PREFIX);
return new DistroData(distroKey, dataContent);
}
全量数据拉取无非就是从内存中获取所有临时实例的数据,并且对数据进行序列化,然后返回给客户端。
原文链接https://zhuanlan.zhihu.com/p/565590169?utm_id=0
标题:Nacos源码分析-集群间临时实例数据的一致性同步
作者:michael
地址:https://blog.junxworks.cn/articles/2024/03/07/1709782628021.html