本文基于
shenyu-2.6.1版本进行源码分析,官网的介绍请参考 数据同步原理 。
Admin管理端
以新增插件的流程来理解下整体的流程

接收数据
- PluginController.createPlugin()
进入PluginController类中的createPlugin()方法,它负责数据的校验,添加或更新数据,返回结果信息。
@Validated
@RequiredArgsConstructor
@RestController
@RequestMapping("/plugin")
public class PluginController {
@PostMapping("")
@RequiresPermissions("system:plugin:add")
public ShenyuAdminResult createPlugin(@Valid @ModelAttribute final PluginDTO pluginDTO) {
// 调用pluginService.createOrUpdate 进行处理逻辑
return ShenyuAdminResult.success(pluginService.createOrUpdate(pluginDTO));
}
// ......
}
处理数据
- PluginServiceImpl.createOrUpdate() -> PluginServiceImpl.create()
在PluginServiceImpl类中通过create()方法完成数据的转换,保存到数据库,发布事件。
@RequiredArgsConstructor
@Service
public class PluginServiceImpl implements SelectorService {
// 事件发布对象 pluginEventPublisher
private final PluginEventPublisher pluginEventPublisher;
private String create(final PluginDTO pluginDTO) {
// 判断有没有对应的插件
Assert.isNull(pluginMapper.nameExisted(pluginDTO.getName()), AdminConstants.PLUGIN_NAME_IS_EXIST);
// 自定义的插件jar
if (!Objects.isNull(pluginDTO.getFile())) {
Assert.isTrue(checkFile(Base64.decode(pluginDTO.getFile())), AdminConstants.THE_PLUGIN_JAR_FILE_IS_NOT_CORRECT_OR_EXCEEDS_16_MB);
}
// 创建plugin对象
PluginDO pluginDO = PluginDO.buildPluginDO(pluginDTO);
// 插入对象到数据库
if (pluginMapper.insertSelective(pluginDO) > 0) {
// 插件新增成功,则发布创建事件
// publish create event. init plugin data
pluginEventPublisher.onCreated(pluginDO);
}
return ShenyuResultMessage.CREATE_SUCCESS;
}
// ......
}
在PluginServiceImpl类完成数据的持久化操作,即保存数据到数据库,并通过 pluginEventPublisher 进行发布事件。
pluginEventPublisher.onCreateed方法的逻辑是:发布变更的事件。
@Override
public void onCreated(final PluginDO plugin) {
// 发布DataChangeEvent事件:事件分组(插件、选择器、规则)、事件类型(创建、删除、更新)、变更的数据
publisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, DataEventTypeEnum.CREATE,
Collections.singletonList(PluginTransfer.INSTANCE.mapToData(plugin))));
// 发布PluginCreatedEvent
publish(new PluginCreatedEvent(plugin, SessionUtil.visitorName()));
}
发布变更数据通过publisher.publishEvent()完成,这个publisher对象是一个ApplicationEventPublisher类,这个类的全限定名是org.springframework.context.ApplicationEventPublisher。看到这儿,我们知道了发布数据是通过Spring相关的功能来完成的。
关于
ApplicationEventPublisher:当有状态发生变化时,发布者调用
ApplicationEventPublisher的publishEvent方法发布一个事件,Spring容器广播事件给所有观察者,调用观察者的onApplicationEvent方法把事件对象传递给观察者。调用publishEvent方法有两种途径,一种是实现接口由容器注入ApplicationEventPublisher对象然后调用其方法,另一种是直接调用容器的方法,两种方法发布事件没有太大区别。
ApplicationEventPublisher:发布事件;ApplicationEvent:Spring事件,记录事件源、时间和数据;ApplicationListener:事件监听者,观察者;
在Spring的事件发布机制中,有三个对象,
一个是发布事件的ApplicationEventPublisher,在ShenYu中通过构造器注入了一个eventPublisher。
另一个对象是ApplicationEvent,在ShenYu中通过DataChangedEvent继承了它,表示事件对象。
public class DataChangedEvent extends ApplicationEvent {
//......
}
最后一个是 ApplicationListener,在ShenYu中通过DataChangedEventDispatcher类实现了该接口,作为事件的监听者,负责处理事件对象。
@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
//......
}
分发数据
- DataChangedEventDispatcher.onApplicationEvent()
当事件发布完成后,会自动进入到DataChangedEventDispatcher类中的onApplicationEvent()方法,进行事件处理。
@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
/**
* 有数据变更时,调用此方法
* @param event
*/
@Override
@SuppressWarnings("unchecked")
public void onApplicationEvent(final DataChangedEvent event) {
// 遍历数据变更监听器(这里只会注册ApolloDataChangedListener)
for (DataChangedListener listener : listeners) {
// 依据不同的分组类型进行转发
switch (event.getGroupKey()) {
case APP_AUTH: // 认证信息
listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
break;
case PLUGIN: // 插件事件
// 调用注册的listener对象
listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
break;
case RULE: // 规则事件
listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
break;
case SELECTOR: // 选择器事件
listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
break;
case META_DATA: // 元数据事件
listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
break;
case PROXY_SELECTOR: // 代理选择器事件
listener.onProxySelectorChanged((List<ProxySelectorData>) event.getSource(), event.getEventType());
break;
case DISCOVER_UPSTREAM: // 注册发现下游列表事件
listener.onDiscoveryUpstreamChanged((List<DiscoverySyncData>) event.getSource(), event.getEventType());
applicationContext.getBean(LoadServiceDocEntry.class).loadDocOnUpstreamChanged((List<DiscoverySyncData>) event.getSource(), event.getEventType());
break;
default:
throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
}
}
}
}
当有数据变更时,调用onApplicationEvent方法,然后遍历所有数据变更监听器,判断是哪种数据类型,交给相应的数据监听器进行处理。
ShenYu将所有数据进行了分组,一共会有以下种:认证信息、插件信息、规则信息、选择器信息、元数据、代理选择器、发现下游事件。
这里的数据变更监听器(DataChangedListener),就是数据同步策略的抽象,由特定的实现来处理,而不同的监听由不同的实现来处理,当前分析的是Apollo来
监听,所以这里只关注 ApolloDataChangedListener。
// 继承AbstractNodeDataChangedListener
public class ApolloDataChangedListener extends AbstractNodeDataChangedListener {
}
ApolloDataChangedListener 继承了 AbstractNodeDataChangedListener 类,该类主要是以key作为存储方式的基类,如apollo、nacos等,其他的如zookeeper、
consul、etcd 等是以path的方式进行分层级来查找的。
// 以key作为查找存储方式的基类
public abstract class AbstractNodeDataChangedListener implements DataChangedListener {
protected AbstractNodeDataChangedListener(final ChangeData changeData) {
this.changeData = changeData;
}
}
AbstractNodeDataChangedListener 接收 ChangeData作为参数,该对象定义了存储于Apollo中的各个数据的key命名,存储于Apollo中的数据包括以下数据:
- 插件(plugin)
- 选择器(selector)
- 规则(rule)
- 授权(auth)
- 元数据(meta)
- 代理选择器(proxy.selector)
- 下游列表(discovery)
这些信息由ApolloDataChangedListener构造器指定:
public class ApolloDataChangedListener extends AbstractNodeDataChangedListener {
public ApolloDataChangedListener(final ApolloClient apolloClient) {
// 配置几类分组数据的前缀
super(new ChangeData(ApolloPathConstants.PLUGIN_DATA_ID,
ApolloPathConstants.SELECTOR_DATA_ID,
ApolloPathConstants.RULE_DATA_ID,
ApolloPathConstants.AUTH_DATA_ID,
ApolloPathConstants.META_DATA_ID,
ApolloPathConstants.PROXY_SELECTOR_DATA_ID,
ApolloPathConstants.DISCOVERY_DATA_ID));
// 操作apollo的对象
this.apolloClient = apolloClient;
}
}
DataChangedListener 定义了以下几个方法:
// 数据变更监听器
public interface DataChangedListener {
// 授权信息变更时调用
default void onAppAuthChanged(List<AppAuthData> changed, DataEventTypeEnum eventType) {
}
// 插件信息变更时调用
default void onPluginChanged(List<PluginData> changed, DataEventTypeEnum eventType) {
}
// 选择器信息变更时调用
default void onSelectorChanged(List<SelectorData> changed, DataEventTypeEnum eventType) {
}
// 元数据信息变更时调用
default void onMetaDataChanged(List<MetaData> changed, DataEventTypeEnum eventType) {
}
// 规则信息变更时调用
default void onRuleChanged(List<RuleData> changed, DataEventTypeEnum eventType) {
}
// 代理选择器变更时调用
default void onProxySelectorChanged(List<ProxySelectorData> changed, DataEventTypeEnum eventType) {
}
// 发现下游信息变更时调用
default void onDiscoveryUpstreamChanged(List<DiscoverySyncData> changed, DataEventTypeEnum eventType) {
}
}
由 DataChangedEventDispatcher处理插件时,调用方法 listener.onPluginChanged, 接下来分析 下对象的逻辑,实现由AbstractNodeDataChangedListener处理:
public abstract class AbstractNodeDataChangedListener implements DataChangedListener {
@Override
public void onPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
// 配置前缀为plugin.
final String configKeyPrefix = changeData.getPluginDataId() + DefaultNodeConstants.JOIN_POINT;
this.onCommonChanged(configKeyPrefix, changed, eventType, PluginData::getName, PluginData.class);
LOG.debug("[DataChangedListener] PluginChanged {}", configKeyPrefix);
}
}
首先构建配置数据的key前缀为:plugin., 再调用onCommonChanged统一处理:
private <T> void onCommonChanged(final String configKeyPrefix, final List<T> changedList,
final DataEventTypeEnum eventType, final Function<? super T, ? extends String> mapperToKey,
final Class<T> tClass) {
// Avoiding concurrent operations on list nodes
final ReentrantLock reentrantLock = listSaveLockMap.computeIfAbsent(configKeyPrefix, key -> new ReentrantLock());
try {
reentrantLock.lock();
// 当前传入的插件列表
final List<String> changeNames = changedList.stream().map(mapperToKey).collect(Collectors.toList());
switch (eventType) {
// 删除操作
case DELETE:
// 按 plugin.${pluginName} 进行删除
changedList.stream().map(mapperToKey).forEach(removeKey -> {
delConfig(configKeyPrefix + removeKey);
});
// 从plugin.list中移除对应的插件名称
// plugin.list 记录下了目前启用的列表
delChangedData(configKeyPrefix, changeNames);
break;
case REFRESH:
case MYSELF:
// 重载逻辑
// 获取plugin.list中的所有插件列表
final List<String> configDataNames = this.getConfigDataNames(configKeyPrefix);
// 依次更新当前调整的每个插件
changedList.forEach(changedData -> {
// 发布配置
publishConfig(configKeyPrefix + mapperToKey.apply(changedData), changedData);
});
// 目前存储的列表中,如果数据比当前传入的多,则删除多余的数据
if (configDataNames != null && configDataNames.size() > changedList.size()) {
// 踢除当前加载的数据
configDataNames.removeAll(changeNames);
// 逐个删除已经取消的数据
configDataNames.forEach(this::delConfig);
}
// 重新更新列表数据
publishConfig(configKeyPrefix + DefaultNodeConstants.LIST_STR, changeNames);
break;
default:
// 新增或是更新
changedList.forEach(changedData -> {
publishConfig(configKeyPrefix + mapperToKey.apply(changedData), changedData);
});
// 将新加的插件更新
putChangeData(configKeyPrefix, changeNames);
break;
}
} catch (Exception e) {
LOG.error("AbstractNodeDataChangedListener onCommonMultiChanged error ", e);
} finally {
reentrantLock.unlock();
}
}
在以上逻辑,其实包含全量重载(REFRESH、MYSELF)与增量(DELETE、UPDATE、CREATE)的处理
在插件中主要包含两个节点:
plugin.list当前生效的插件列表plugin.${plugin.name}具体插件的详细信息 最后,将这两个节点对应的数据写入Apollo。