Code Analysis For Dubbo Plugin
Apache ShenYu is an asynchronous, high-performance, cross-language, responsive
API
gateway.
The Apache ShenYu
gateway uses the dubbo
plugin to make calls to the dubbo
service. You can see the official documentation Dubbo Quick Start to learn how to use the plugin.
This article is based on
shenyu-2.4.3
version for source code analysis, please refer to Dubbo Service Access for the introduction of the official website.
#
1. Service RegistrationTake the example provided on the official website shenyu-examples-dubbo. Suppose your dubbo
service is defined as follows (spring-dubbo.xml
).
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://code.alibabatech.com/schema/dubbo" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://code.alibabatech.com/schema/dubbo https://code.alibabatech.com/schema/dubbo/dubbo.xsd">
<dubbo:application name="test-dubbo-service"/> <dubbo:registry address="${dubbo.registry.address}"/> <dubbo:protocol name="dubbo" port="20888"/>
<dubbo:service timeout="10000" interface="org.apache.shenyu.examples.dubbo.api.service.DubboTestService" ref="dubboTestService"/>
</beans>
Declare the application service name, register the center address, use the dubbo
protocol, declare the service interface, and the corresponding interface implementation class.
/** * DubboTestServiceImpl. */@Service("dubboTestService")public class DubboTestServiceImpl implements DubboTestService { @Override @ShenyuDubboClient(path = "/findById", desc = "Query by Id") public DubboTest findById(final String id) { return new DubboTest(id, "hello world shenyu Apache, findById"); }
//......}
In the interface implementation class, use the annotation @ShenyuDubboClient
to register the service with shenyu-admin
. The role of this annotation and its rationale will be analyzed later.
The configuration information in the configuration file application.yml
.
server: port: 8011 address: 0.0.0.0 servlet: context-path: /spring: main: allow-bean-definition-overriding: truedubbo: registry: address: zookeeper://localhost:2181 # dubbo registry shenyu: register: registerType: http serverLists: http://localhost:9095 props: username: admin password: 123456 client: dubbo: props: contextPath: /dubbo appName: dubbo
In the configuration file, declare the registry address used by dubbo
. The dubbo
service registers with shenyu-admin
, using the method http
, and the registration address is http://localhost:9095
.
See Application Client Access for more information on the use of the registration method.
#
1.1 Declaration of registration interfaceUse the annotation @ShenyuDubboClient
to register the service to the gateway. The simple demo
is as follows.
// dubbo sevice@Service("dubboTestService")public class DubboTestServiceImpl implements DubboTestService { @Override @ShenyuDubboClient(path = "/findById", desc = "Query by Id") // need to be registered method public DubboTest findById(final String id) { return new DubboTest(id, "hello world shenyu Apache, findById"); }
//......}
annotation definition:
/** * Works on classes and methods */@Retention(RetentionPolicy.RUNTIME)@Target({ElementType.TYPE, ElementType.METHOD})@Inheritedpublic @interface ShenyuDubboClient { //path String path(); //rule name String ruleName() default ""; //desc String desc() default "";
//enabled boolean enabled() default true;}
#
1.2 Scan annotation informationAnnotation scanning is done through the ApacheDubboServiceBeanListener
, which implements the ApplicationListener<ContextRefreshedEvent>
interface and starts executing the event handler method when a context refresh event occurs during the Spring
container startup onApplicationEvent()
.
During constructor instantiation.
- Read property configuration
- Start the thread pool
- Start the registry for registering with
shenyu-admin
public class ApacheDubboServiceBeanListener implements ApplicationListener<ContextRefreshedEvent> {
// ......
//Constructor public ApacheDubboServiceBeanListener(final PropertiesConfig clientConfig, final ShenyuClientRegisterRepository shenyuClientRegisterRepository) { //1.Read property configuration Properties props = clientConfig.getProps(); String contextPath = props.getProperty(ShenyuClientConstants.CONTEXT_PATH); String appName = props.getProperty(ShenyuClientConstants.APP_NAME); if (StringUtils.isBlank(contextPath)) { throw new ShenyuClientIllegalArgumentException("apache dubbo client must config the contextPath or appName"); } this.contextPath = contextPath; this.appName = appName; this.host = props.getProperty(ShenyuClientConstants.HOST); this.port = props.getProperty(ShenyuClientConstants.PORT); //2.Start the thread pool executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("shenyu-apache-dubbo-client-thread-pool-%d").build()); //3.Start the registry for registering with `shenyu-admin` publisher.start(shenyuClientRegisterRepository); }
/** * Context refresh event, execute method logic */ @Override public void onApplicationEvent(final ContextRefreshedEvent contextRefreshedEvent) { //...... }
- ApacheDubboServiceBeanListener#onApplicationEvent()
Rewritten method logic: read Dubbo
service ServiceBean
, build metadata object and URI
object, and register it with shenyu-admin
.
@Override public void onApplicationEvent(final ContextRefreshedEvent contextRefreshedEvent) { //read ServiceBean Map<String, ServiceBean> serviceBean = contextRefreshedEvent.getApplicationContext().getBeansOfType(ServiceBean.class); if (serviceBean.isEmpty()) { return; } //The method is guaranteed to be executed only once if (!registered.compareAndSet(false, true)) { return; } //handle metadata for (Map.Entry<String, ServiceBean> entry : serviceBean.entrySet()) { handler(entry.getValue()); } //handle URI serviceBean.values().stream().findFirst().ifPresent(bean -> { publisher.publishEvent(buildURIRegisterDTO(bean)); }); }
handler()
In the
handler()
method, read all methods from theserviceBean
, determine if there is aShenyuDubboClient
annotation on the method, build a metadata object if it exists, and register the method withshenyu-admin
through the registry.
private void handler(final ServiceBean<?> serviceBean) { //get proxy Object refProxy = serviceBean.getRef(); //get class Class<?> clazz = refProxy.getClass(); if (AopUtils.isAopProxy(refProxy)) { clazz = AopUtils.getTargetClass(refProxy); } //all methods Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(clazz); for (Method method : methods) { //read ShenyuDubboClient annotation ShenyuDubboClient shenyuDubboClient = method.getAnnotation(ShenyuDubboClient.class); if (Objects.nonNull(shenyuDubboClient)) { //build meatdata and registry publisher.publishEvent(buildMetaDataDTO(serviceBean, shenyuDubboClient, method)); } } }
buildMetaDataDTO()
Constructs a metadata object where the necessary information for method registration is constructed and subsequently used for selector or rule matching.
private MetaDataRegisterDTO buildMetaDataDTO(final ServiceBean<?> serviceBean, final ShenyuDubboClient shenyuDubboClient, final Method method) { //app name String appName = buildAppName(serviceBean); //path String path = contextPath + shenyuDubboClient.path(); //desc String desc = shenyuDubboClient.desc(); //service name String serviceName = serviceBean.getInterface(); //rule name String configRuleName = shenyuDubboClient.ruleName(); String ruleName = ("".equals(configRuleName)) ? path : configRuleName; //method name String methodName = method.getName(); //parameter Types Class<?>[] parameterTypesClazz = method.getParameterTypes(); String parameterTypes = Arrays.stream(parameterTypesClazz).map(Class::getName).collect(Collectors.joining(",")); return MetaDataRegisterDTO.builder() .appName(appName) .serviceName(serviceName) .methodName(methodName) .contextPath(contextPath) .host(buildHost()) .port(buildPort(serviceBean)) .path(path) .ruleName(ruleName) .pathDesc(desc) .parameterTypes(parameterTypes) .rpcExt(buildRpcExt(serviceBean)) //dubbo ext .rpcType(RpcTypeEnum.DUBBO.getName()) .enabled(shenyuDubboClient.enabled()) .build(); }
buildRpcExt()
dubbo
ext information.private String buildRpcExt(final ServiceBean serviceBean) { DubboRpcExt build = DubboRpcExt.builder() .group(StringUtils.isNotEmpty(serviceBean.getGroup()) ? serviceBean.getGroup() : "")//group .version(StringUtils.isNotEmpty(serviceBean.getVersion()) ? serviceBean.getVersion() : "")//version .loadbalance(StringUtils.isNotEmpty(serviceBean.getLoadbalance()) ? serviceBean.getLoadbalance() : Constants.DEFAULT_LOADBALANCE)//load balance .retries(Objects.isNull(serviceBean.getRetries()) ? Constants.DEFAULT_RETRIES : serviceBean.getRetries())//retry .timeout(Objects.isNull(serviceBean.getTimeout()) ? Constants.DEFAULT_CONNECT_TIMEOUT : serviceBean.getTimeout())//time .sent(Objects.isNull(serviceBean.getSent()) ? Constants.DEFAULT_SENT : serviceBean.getSent())//sent .cluster(StringUtils.isNotEmpty(serviceBean.getCluster()) ? serviceBean.getCluster() : Constants.DEFAULT_CLUSTER)//cluster .url("") .build(); return GsonUtils.getInstance().toJson(build); }
buildURIRegisterDTO()
Construct
URI
objects to register information about the service itself, which can be subsequently used for service probing live.
private URIRegisterDTO buildURIRegisterDTO(final ServiceBean serviceBean) { return URIRegisterDTO.builder() .contextPath(this.contextPath) //context path .appName(buildAppName(serviceBean))//app name .rpcType(RpcTypeEnum.DUBBO.getName())//dubbo .host(buildHost()) //host .port(buildPort(serviceBean))//port .build(); }
The specific registration logic is implemented by the registration center, please refer to Client Access Principles .
//To the registration center, post registration events publisher.publishEvent();
#
1.3 Processing registration informationThe metadata and URI
data registered by the client through the registry are processed at the shenyu-admin
end, which is responsible for storing to the database and synchronizing to the shenyu
gateway. The client-side registration processing logic of the Dubbo
plugin is in the ShenyuClientRegisterDubboServiceImpl
. The inheritance relationship is as follows.
- ShenyuClientRegisterService: client registration service, top-level interface.
- FallbackShenyuClientRegisterService: registration failure, provides retry operation.
- AbstractShenyuClientRegisterServiceImpl: abstract class, implements part of the public registration logic.
- ShenyuClientRegisterDubboServiceImpl: implementation of the
Dubbo
plugin registration.
#
1.3.1 Registration Serviceorg.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#register()
The metadata
MetaDataRegisterDTO
object registered by the client through the registry is picked up and dropped in theregister()
method ofshenyu-admin
.
@Override public String register(final MetaDataRegisterDTO dto) { //1. register selector String selectorHandler = selectorHandler(dto); String selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler); //2. register rule String ruleHandler = ruleHandler(); RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler); ruleService.registerDefault(ruleDTO); //3. register metadata registerMetadata(dto); //4. register contextPath String contextPath = dto.getContextPath(); if (StringUtils.isNotEmpty(contextPath)) { registerContextPath(dto); } return ShenyuResultMessage.SUCCESS; }
#
1.3.1.1 Register Selector- org.apache.shenyu.admin.service.impl.SelectorServiceImpl#registerDefault()
Construct contextPath
, find if the selector information exists, if it does, return id
; if it doesn't, create the default selector information.
@Override public String registerDefault(final MetaDataRegisterDTO dto, final String pluginName, final String selectorHandler) { // build contextPath String contextPath = ContextPathUtils.buildContextPath(dto.getContextPath(), dto.getAppName()); // Find if selector information exists by name SelectorDO selectorDO = findByNameAndPluginName(contextPath, pluginName); if (Objects.isNull(selectorDO)) { // Create a default selector message if it does not exist return registerSelector(contextPath, pluginName, selectorHandler); } return selectorDO.getId(); }
Default selector information
Construct the default selector information and its conditional properties here.
//register selector private String registerSelector(final String contextPath, final String pluginName, final String selectorHandler) { //build selector SelectorDTO selectorDTO = buildSelectorDTO(contextPath, pluginMapper.selectByName(pluginName).getId()); selectorDTO.setHandle(selectorHandler); //register default selector return registerDefault(selectorDTO); } //build selector private SelectorDTO buildSelectorDTO(final String contextPath, final String pluginId) { //build default SelectorDTO selectorDTO = buildDefaultSelectorDTO(contextPath); selectorDTO.setPluginId(pluginId); //build the conditional properties of the default selector selectorDTO.setSelectorConditions(buildDefaultSelectorConditionDTO(contextPath)); return selectorDTO; }
- Build default selector
private SelectorDTO buildDefaultSelectorDTO(final String name) { return SelectorDTO.builder() .name(name) // name .type(SelectorTypeEnum.CUSTOM_FLOW.getCode()) // default type cutom .matchMode(MatchModeEnum.AND.getCode()) //default match mode .enabled(Boolean.TRUE) //enable .loged(Boolean.TRUE) //log .continued(Boolean.TRUE) .sort(1) .build();}
- Build default selector conditional properties
private List<SelectorConditionDTO> buildDefaultSelectorConditionDTO(final String contextPath) { SelectorConditionDTO selectorConditionDTO = new SelectorConditionDTO(); selectorConditionDTO.setParamType(ParamTypeEnum.URI.getName()); // default URI selectorConditionDTO.setParamName("/"); selectorConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); // default match selectorConditionDTO.setParamValue(contextPath + AdminConstants.URI_SUFFIX); return Collections.singletonList(selectorConditionDTO);}
- Register default selector
@Overridepublic String registerDefault(final SelectorDTO selectorDTO) { //selector information SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO); //selector conditional properties List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions(); if (StringUtils.isEmpty(selectorDTO.getId())) { // insert selector information into the database selectorMapper.insertSelective(selectorDO); // inserting selector conditional properties to the database selectorConditionDTOs.forEach(selectorConditionDTO -> { selectorConditionDTO.setSelectorId(selectorDO.getId()); selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO)); }); } // Publish synchronization events to synchronize selection information and its conditional attributes to the gateway publishEvent(selectorDO, selectorConditionDTOs); return selectorDO.getId();}
#
1.3.1.2 Registration RulesIn the second step of registering the service, start building the default rules and then register the rules.
@Override public String register(final MetaDataRegisterDTO dto) { //1. handle selector //...... //2. handle rule String ruleHandler = ruleHandler(); // build default rule RuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler); // register rule ruleService.registerDefault(ruleDTO); //3. reigster metadata //...... //4. register ContextPath //...... return ShenyuResultMessage.SUCCESS; }
- 默认规则处理属性
@Override protected String ruleHandler() { // default rule return new DubboRuleHandle().toJson(); }
Dubbo
plugin default rule handling properties.
public class DubboRuleHandle implements RuleHandle {
/** * dubbo version. */ private String version;
/** * group. */ private String group;
/** * retry. */ private Integer retries = 0;
/** * loadbalance:RANDOM */ private String loadbalance = LoadBalanceEnum.RANDOM.getName();
/** * timeout default 3000 */ private long timeout = Constants.TIME_OUT;}
- build default rule
// build default rule private RuleDTO buildRpcDefaultRuleDTO(final String selectorId, final MetaDataRegisterDTO metaDataDTO, final String ruleHandler) { return buildRuleDTO(selectorId, ruleHandler, metaDataDTO.getRuleName(), metaDataDTO.getPath()); } // build default rule private RuleDTO buildRuleDTO(final String selectorId, final String ruleHandler, final String ruleName, final String path) { RuleDTO ruleDTO = RuleDTO.builder() .selectorId(selectorId) .name(ruleName) .matchMode(MatchModeEnum.AND.getCode()) .enabled(Boolean.TRUE) .loged(Boolean.TRUE) .sort(1) .handle(ruleHandler) .build(); RuleConditionDTO ruleConditionDTO = RuleConditionDTO.builder() .paramType(ParamTypeEnum.URI.getName()) .paramName("/") .paramValue(path) .build(); if (path.indexOf("*") > 1) { ruleConditionDTO.setOperator(OperatorEnum.MATCH.getAlias()); } else { ruleConditionDTO.setOperator(OperatorEnum.EQ.getAlias()); } ruleDTO.setRuleConditions(Collections.singletonList(ruleConditionDTO)); return ruleDTO; }
- org.apache.shenyu.admin.service.impl.RuleServiceImpl#registerDefault()
Registration rules: insert records to the database and publish events to the gateway for data synchronization.
@Override public String registerDefault(final RuleDTO ruleDTO) { RuleDO exist = ruleMapper.findBySelectorIdAndName(ruleDTO.getSelectorId(), ruleDTO.getName()); if (Objects.nonNull(exist)) { return ""; }
RuleDO ruleDO = RuleDO.buildRuleDO(ruleDTO); List<RuleConditionDTO> ruleConditions = ruleDTO.getRuleConditions(); if (StringUtils.isEmpty(ruleDTO.getId())) { // insert rule information into the database ruleMapper.insertSelective(ruleDO); //insert rule body conditional attributes into the database ruleConditions.forEach(ruleConditionDTO -> { ruleConditionDTO.setRuleId(ruleDO.getId()); ruleConditionMapper.insertSelective(RuleConditionDO.buildRuleConditionDO(ruleConditionDTO)); }); } // Publish events to the gateway for data synchronization publishEvent(ruleDO, ruleConditions); return ruleDO.getId(); }
#
1.3.1.3 Register MetadataMetadata is mainly used for RPC
service calls.
@Override public String register(final MetaDataRegisterDTO dto) { //1. register selector //...... //2. register rule //...... //3. register metadata registerMetadata(dto); //4. register ContextPath //...... return ShenyuResultMessage.SUCCESS; }
org.apache.shenyu.admin.service.register.ShenyuClientRegisterDubboServiceImpl#registerMetadata()
Insert or update metadata and then publish sync events to the gateway.
@Override protected void registerMetadata(final MetaDataRegisterDTO dto) { // get metaDataService MetaDataService metaDataService = getMetaDataService(); MetaDataDO exist = metaDataService.findByPath(dto.getPath()); //insert or update metadata metaDataService.saveOrUpdateMetaData(exist, dto); }
@Override public void saveOrUpdateMetaData(final MetaDataDO exist, final MetaDataRegisterDTO metaDataDTO) { DataEventTypeEnum eventType; // DTO->DO MetaDataDO metaDataDO = MetaDataTransfer.INSTANCE.mapRegisterDTOToEntity(metaDataDTO); // insert data if (Objects.isNull(exist)) { Timestamp currentTime = new Timestamp(System.currentTimeMillis()); metaDataDO.setId(UUIDUtils.getInstance().generateShortUuid()); metaDataDO.setDateCreated(currentTime); metaDataDO.setDateUpdated(currentTime); metaDataMapper.insert(metaDataDO); eventType = DataEventTypeEnum.CREATE; } else { // update metaDataDO.setId(exist.getId()); metaDataMapper.update(metaDataDO); eventType = DataEventTypeEnum.UPDATE; } // Publish sync events to gateway eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.META_DATA, eventType, Collections.singletonList(MetaDataTransfer.INSTANCE.mapToData(metaDataDO)))); }
#
1.3.2 Register URI- org.apache.shenyu.admin.service.register.FallbackShenyuClientRegisterService#registerURI()
The server side receives the URI
information registered by the client and processes it.
@Override public String registerURI(final String selectorName, final List<URIRegisterDTO> uriList) { String result; String key = key(selectorName); try { this.removeFallBack(key); // register URI result = this.doRegisterURI(selectorName, uriList); logger.info("Register success: {},{}", selectorName, uriList); } catch (Exception ex) { logger.warn("Register exception: cause:{}", ex.getMessage()); result = ""; // Retry after registration failure this.addFallback(key, new FallbackHolder(selectorName, uriList)); } return result; }
- org.apache.shenyu.admin.service.register.AbstractShenyuClientRegisterServiceImpl#doRegisterURI()
Get a valid URI
from the URI
registered by the client, update the corresponding selector handle
property, and send a selector update event to the gateway.
@Override public String doRegisterURI(final String selectorName, final List<URIRegisterDTO> uriList) { //check if (CollectionUtils.isEmpty(uriList)) { return ""; } SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType())); if (Objects.isNull(selectorDO)) { throw new ShenyuException("doRegister Failed to execute,wait to retry."); } // gte valid URI List<URIRegisterDTO> validUriList = uriList.stream().filter(dto -> Objects.nonNull(dto.getPort()) && StringUtils.isNotBlank(dto.getHost())).collect(Collectors.toList()); // build handle String handler = buildHandle(validUriList, selectorDO); if (handler != null) { selectorDO.setHandle(handler); SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType())); selectorData.setHandle(handler); // Update the handle property of the selector to the database selectorService.updateSelective(selectorDO); // Send selector update events to the gateway eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData))); } return ShenyuResultMessage.SUCCESS; }
The source code analysis on service registration is completed as well as the analysis flow chart is as follows.
The next step is to analyze how the dubbo
plugin initiates calls to the http
service based on this information.
#
2. Service InvocationThe dubbo
plugin is the core processing plugin used by the ShenYu
gateway to convert http
requests into the dubbo protocol
and invoke the dubbo
service.
Take the case provided by the official website Quick Start with Dubbo as an example, a dubbo
service is registered with shenyu-admin
through the registry, and then requested through the ShenYu
gateway proxy, the request is as follows.
GET http://localhost:9195/dubbo/findById?id=100Accept: application/json
The class inheritance relationship in the Dubbo
plugin is as follows.
- ShenyuPlugin: top-level interface, defining interface methods.
- AbstractShenyuPlugin: abstract class that implements plugin common logic.
- AbstractDubboPlugin: dubbo plugin abstract class, implementing
dubbo
common logic. - ApacheDubboPlugin: ApacheDubbo plugin.
ShenYu Gateway supports ApacheDubbo and AlibabaDubbo\
#
2.1 Receive requestsAfter passing the ShenYu
gateway proxy, the request entry is ShenyuWebHandler
, which implements the org.springframework.web.server.WebHandler
interface.
public final class ShenyuWebHandler implements WebHandler, ApplicationListener<SortPluginEvent> { //...... /** * hanlde request */ @Override public Mono<Void> handle(@NonNull final ServerWebExchange exchange) { // execute default plugin chain Mono<Void> execute = new DefaultShenyuPluginChain(plugins).execute(exchange); if (scheduled) { return execute.subscribeOn(scheduler); } return execute; } private static class DefaultShenyuPluginChain implements ShenyuPluginChain {
private int index;
private final List<ShenyuPlugin> plugins;
DefaultShenyuPluginChain(final List<ShenyuPlugin> plugins) { this.plugins = plugins; }
/** * execute. */ @Override public Mono<Void> execute(final ServerWebExchange exchange) { return Mono.defer(() -> { if (this.index < plugins.size()) { // get plugin ShenyuPlugin plugin = plugins.get(this.index++); boolean skip = plugin.skip(exchange); if (skip) { // next return this.execute(exchange); } // execute return plugin.execute(exchange, this); } return Mono.empty(); }); } }}
#
2.2 Match Rule- org.apache.shenyu.plugin.base.AbstractShenyuPlugin#execute()
Execute the matching logic for selectors and rules in the execute()
method.
- Matching selectors.
- Matching rules.
- Execute the plugin.
@Override public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) { // plugin name String pluginName = named(); // plugin data PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName); if (pluginData != null && pluginData.getEnabled()) { // selector data final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName); if (CollectionUtils.isEmpty(selectors)) { return handleSelectorIfNull(pluginName, exchange, chain); } // match selector SelectorData selectorData = matchSelector(exchange, selectors); if (Objects.isNull(selectorData)) { return handleSelectorIfNull(pluginName, exchange, chain); } selectorLog(selectorData, pluginName); // rule data List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId()); if (CollectionUtils.isEmpty(rules)) { return handleRuleIfNull(pluginName, exchange, chain); } // match rule RuleData rule; if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) { //get last rule = rules.get(rules.size() - 1); } else { rule = matchRule(exchange, rules); } if (Objects.isNull(rule)) { return handleRuleIfNull(pluginName, exchange, chain); } ruleLog(rule, pluginName); // execute return doExecute(exchange, chain, selectorData, rule); } return chain.execute(exchange); }
#
2.3 Execute GlobalPlugin- org.apache.shenyu.plugin.global.GlobalPlugin#execute()
GlobalPlugin
is a global plugin that constructs contextual information in the execute()
method.
public class GlobalPlugin implements ShenyuPlugin { // shenyu context private final ShenyuContextBuilder builder; //...... @Override public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) { // build context information to be passed into the exchange ShenyuContext shenyuContext = builder.build(exchange); exchange.getAttributes().put(Constants.CONTEXT, shenyuContext); return chain.execute(exchange); } //......}
- org.apache.shenyu.plugin.global.DefaultShenyuContextBuilder#build()
Build the default context information.
public class DefaultShenyuContextBuilder implements ShenyuContextBuilder { //...... @Override public ShenyuContext build(final ServerWebExchange exchange) { //build data Pair<String, MetaData> buildData = buildData(exchange); //wrap ShenyuContext return decoratorMap.get(buildData.getLeft()).decorator(buildDefaultContext(exchange.getRequest()), buildData.getRight()); } private Pair<String, MetaData> buildData(final ServerWebExchange exchange) { //...... //get the metadata according to the requested uri MetaData metaData = MetaDataCache.getInstance().obtain(request.getURI().getPath()); if (Objects.nonNull(metaData) && Boolean.TRUE.equals(metaData.getEnabled())) { exchange.getAttributes().put(Constants.META_DATA, metaData); return Pair.of(metaData.getRpcType(), metaData); } else { return Pair.of(RpcTypeEnum.HTTP.getName(), new MetaData()); } } //set the default context information private ShenyuContext buildDefaultContext(final ServerHttpRequest request) { String appKey = request.getHeaders().getFirst(Constants.APP_KEY); String sign = request.getHeaders().getFirst(Constants.SIGN); String timestamp = request.getHeaders().getFirst(Constants.TIMESTAMP); ShenyuContext shenyuContext = new ShenyuContext(); String path = request.getURI().getPath(); shenyuContext.setPath(path); shenyuContext.setAppKey(appKey); shenyuContext.setSign(sign); shenyuContext.setTimestamp(timestamp); shenyuContext.setStartDateTime(LocalDateTime.now()); Optional.ofNullable(request.getMethod()).ifPresent(httpMethod -> shenyuContext.setHttpMethod(httpMethod.name())); return shenyuContext; } }
- org.apache.shenyu.plugin.dubbo.common.context.DubboShenyuContextDecorator#decorator()
wrap ShenyuContext
:
public class DubboShenyuContextDecorator implements ShenyuContextDecorator { @Override public ShenyuContext decorator(final ShenyuContext shenyuContext, final MetaData metaData) { shenyuContext.setModule(metaData.getAppName()); shenyuContext.setMethod(metaData.getServiceName()); shenyuContext.setContextPath(metaData.getContextPath()); shenyuContext.setRpcType(RpcTypeEnum.DUBBO.getName()); return shenyuContext; } @Override public String rpcType() { return RpcTypeEnum.DUBBO.getName(); }}
#
2.4 Execute RpcParamTransformPluginThe RpcParamTransformPlugin
is responsible for reading the parameters from the http
request, saving them in the exchange
and passing them to the rpc
service.
- org.apache.shenyu.plugin.base.RpcParamTransformPlugin#execute()
In the execute()
method, the core logic of the plugin is executed: get the request information from exchange
and process the parameters according to the form of content passed in by the request.
public class RpcParamTransformPlugin implements ShenyuPlugin {
@Override public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) { //get request information from exchange ServerHttpRequest request = exchange.getRequest(); ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT); if (Objects.nonNull(shenyuContext)) { // APPLICATION_JSON MediaType mediaType = request.getHeaders().getContentType(); if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) { return body(exchange, request, chain); } // APPLICATION_FORM_URLENCODED if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType)) { return formData(exchange, request, chain); } //query return query(exchange, request, chain); } return chain.execute(exchange); } //APPLICATION_JSON private Mono<Void> body(final ServerWebExchange exchange, final ServerHttpRequest serverHttpRequest, final ShenyuPluginChain chain) { return Mono.from(DataBufferUtils.join(serverHttpRequest.getBody()) .flatMap(body -> { exchange.getAttributes().put(Constants.PARAM_TRANSFORM, resolveBodyFromRequest(body));//解析body,保存到exchange中 return chain.execute(exchange); })); } // APPLICATION_FORM_URLENCODED private Mono<Void> formData(final ServerWebExchange exchange, final ServerHttpRequest serverHttpRequest, final ShenyuPluginChain chain) { return Mono.from(DataBufferUtils.join(serverHttpRequest.getBody()) .flatMap(map -> { String param = resolveBodyFromRequest(map); LinkedMultiValueMap<String, String> linkedMultiValueMap; try { linkedMultiValueMap = BodyParamUtils.buildBodyParams(URLDecoder.decode(param, StandardCharsets.UTF_8.name())); //格式化数据 } catch (UnsupportedEncodingException e) { return Mono.error(e); } exchange.getAttributes().put(Constants.PARAM_TRANSFORM, HttpParamConverter.toMap(() -> linkedMultiValueMap));// 保存到exchange中 return chain.execute(exchange); })); } //query private Mono<Void> query(final ServerWebExchange exchange, final ServerHttpRequest serverHttpRequest, final ShenyuPluginChain chain) { exchange.getAttributes().put(Constants.PARAM_TRANSFORM, HttpParamConverter.ofString(() -> serverHttpRequest.getURI().getQuery()));//保存到exchange中 return chain.execute(exchange); } //...... }
#
2.5 Execute DubboPlugin- org.apache.shenyu.plugin.dubbo.common.AbstractDubboPlugin#doExecute()
In the doExecute()
method, the main purpose is to check the metadata and parameters.
public abstract class AbstractDubboPlugin extends AbstractShenyuPlugin { @Override public Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) { //param String param = exchange.getAttribute(Constants.PARAM_TRANSFORM); //context ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT); assert shenyuContext != null; //metaData MetaData metaData = exchange.getAttribute(Constants.META_DATA); //check metaData if (!checkMetaData(metaData)) { LOG.error(" path is : {}, meta data have error : {}", shenyuContext.getPath(), metaData); exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR); Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.META_DATA_ERROR, null); return WebFluxResultUtils.result(exchange, error); } //check if (Objects.nonNull(metaData) && StringUtils.isNoneBlank(metaData.getParameterTypes()) && StringUtils.isBlank(param)) { exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR); Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.DUBBO_HAVE_BODY_PARAM, null); return WebFluxResultUtils.result(exchange, error); } //set rpcContext this.rpcContext(exchange); //dubbo invoke return this.doDubboInvoker(exchange, chain, selector, rule, metaData, param); }}
- org.apache.shenyu.plugin.apache.dubbo.ApacheDubboPlugin#doDubboInvoker()
Set special context information in the doDubboInvoker()
method, and then start the dubbo generalization call.
public class ApacheDubboPlugin extends AbstractDubboPlugin { @Override protected Mono<Void> doDubboInvoker(final ServerWebExchange exchange, final ShenyuPluginChain chain, final SelectorData selector, final RuleData rule, final MetaData metaData, final String param) { //set the current selector and rule information, and request address for dubbo graying support RpcContext.getContext().setAttachment(Constants.DUBBO_SELECTOR_ID, selector.getId()); RpcContext.getContext().setAttachment(Constants.DUBBO_RULE_ID, rule.getId()); RpcContext.getContext().setAttachment(Constants.DUBBO_REMOTE_ADDRESS, Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress()); //dubbo generic invoker final Mono<Object> result = dubboProxyService.genericInvoker(param, metaData, exchange); //execute next plugin in chain return result.then(chain.execute(exchange)); }}
- org.apache.shenyu.plugin.apache.dubbo.proxy.ApacheDubboProxyService#genericInvoker()
genericInvoker()
method.
- Gets the
ReferenceConfig
object. - Gets the generalization service
GenericService
object. - Constructs the request parameter
pair
object. - Initiates an asynchronous generalization invocation.
public class ApacheDubboProxyService { //......
/** * Generic invoker object. */ public Mono<Object> genericInvoker(final String body, final MetaData metaData, final ServerWebExchange exchange) throws ShenyuException { //1.Get the ReferenceConfig object ReferenceConfig<GenericService> reference = ApacheDubboConfigCache.getInstance().get(metaData.getPath());
if (Objects.isNull(reference) || StringUtils.isEmpty(reference.getInterface())) { //Failure of the current cache information ApacheDubboConfigCache.getInstance().invalidate(metaData.getPath()); //Reinitialization with metadata reference = ApacheDubboConfigCache.getInstance().initRef(metaData); } //2.Get the GenericService object of the generalization service GenericService genericService = reference.get(); //3.Constructing the request parameter pair object Pair<String[], Object[]> pair; if (StringUtils.isBlank(metaData.getParameterTypes()) || ParamCheckUtils.dubboBodyIsEmpty(body)) { pair = new ImmutablePair<>(new String[]{}, new Object[]{}); } else { pair = dubboParamResolveService.buildParameter(body, metaData.getParameterTypes()); } //4.Initiating asynchronous generalization calls return Mono.fromFuture(invokeAsync(genericService, metaData.getMethodName(), pair.getLeft(), pair.getRight()).thenApply(ret -> { //handle result if (Objects.isNull(ret)) { ret = Constants.DUBBO_RPC_RESULT_EMPTY; } exchange.getAttributes().put(Constants.RPC_RESULT, ret); exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName()); return ret; })).onErrorMap(exception -> exception instanceof GenericException ? new ShenyuException(((GenericException) exception).getExceptionMessage()) : new ShenyuException(exception));//处理异常 } //Generalized calls, asynchronous operations private CompletableFuture<Object> invokeAsync(final GenericService genericService, final String method, final String[] parameterTypes, final Object[] args) throws GenericException { genericService.$invoke(method, parameterTypes, args); Object resultFromFuture = RpcContext.getContext().getFuture(); return resultFromFuture instanceof CompletableFuture ? (CompletableFuture<Object>) resultFromFuture : CompletableFuture.completedFuture(resultFromFuture); }}
Calling the dubbo
service at the gateway can be achieved by generalizing the call.
The ReferenceConfig
object is the key object to support generalization calls , and its initialization operation is done during data synchronization. There are two parts of data involved here, one is the synchronized plugin handler
information and the other is the synchronized plugin metadata information.
- org.apache.shenyu.plugin.dubbo.common.handler.AbstractDubboPluginDataHandler#handlerPlugin()
When the plugin data is updated, the data synchronization module synchronizes the data from shenyu-admin
to the gateway. The initialization operation is performed in handlerPlugin()
.
public abstract class AbstractDubboPluginDataHandler implements PluginDataHandler { //...... //Initializing the configuration cache protected abstract void initConfigCache(DubboRegisterConfig dubboRegisterConfig);
@Override public void handlerPlugin(final PluginData pluginData) { if (Objects.nonNull(pluginData) && Boolean.TRUE.equals(pluginData.getEnabled())) { //Data deserialization DubboRegisterConfig dubboRegisterConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(), DubboRegisterConfig.class); DubboRegisterConfig exist = Singleton.INST.get(DubboRegisterConfig.class); if (Objects.isNull(dubboRegisterConfig)) { return; } if (Objects.isNull(exist) || !dubboRegisterConfig.equals(exist)) { // Perform initialization operations this.initConfigCache(dubboRegisterConfig); } Singleton.INST.single(DubboRegisterConfig.class, dubboRegisterConfig); } } //......}
- org.apache.shenyu.plugin.apache.dubbo.handler.ApacheDubboPluginDataHandler#initConfigCache()
Perform initialization operations.
public class ApacheDubboPluginDataHandler extends AbstractDubboPluginDataHandler {
@Override protected void initConfigCache(final DubboRegisterConfig dubboRegisterConfig) { //perform initialization operations ApacheDubboConfigCache.getInstance().init(dubboRegisterConfig); //cached results before failure ApacheDubboConfigCache.getInstance().invalidateAll(); }}
- org.apache.shenyu.plugin.apache.dubbo.cache.ApacheDubboConfigCache#init()
In the initialization, set registryConfig
and consumerConfig
.
public final class ApacheDubboConfigCache extends DubboConfigCache { //...... /** * init */ public void init(final DubboRegisterConfig dubboRegisterConfig) { //ApplicationConfig if (Objects.isNull(applicationConfig)) { applicationConfig = new ApplicationConfig("shenyu_proxy"); } //When the protocol or address changes, you need to update the registryConfig if (needUpdateRegistryConfig(dubboRegisterConfig)) { RegistryConfig registryConfigTemp = new RegistryConfig(); registryConfigTemp.setProtocol(dubboRegisterConfig.getProtocol()); registryConfigTemp.setId("shenyu_proxy"); registryConfigTemp.setRegister(false); registryConfigTemp.setAddress(dubboRegisterConfig.getRegister()); Optional.ofNullable(dubboRegisterConfig.getGroup()).ifPresent(registryConfigTemp::setGroup); registryConfig = registryConfigTemp; } //ConsumerConfig if (Objects.isNull(consumerConfig)) { consumerConfig = ApplicationModel.getConfigManager().getDefaultConsumer().orElseGet(() -> { ConsumerConfig consumerConfig = new ConsumerConfig(); consumerConfig.refresh(); return consumerConfig; }); //ConsumerConfig Optional.ofNullable(dubboRegisterConfig.getThreadpool()).ifPresent(consumerConfig::setThreadpool); Optional.ofNullable(dubboRegisterConfig.getCorethreads()).ifPresent(consumerConfig::setCorethreads); Optional.ofNullable(dubboRegisterConfig.getThreads()).ifPresent(consumerConfig::setThreads); Optional.ofNullable(dubboRegisterConfig.getQueues()).ifPresent(consumerConfig::setQueues); } } //Does the registration configuration need to be updated private boolean needUpdateRegistryConfig(final DubboRegisterConfig dubboRegisterConfig) { if (Objects.isNull(registryConfig)) { return true; } return !Objects.equals(dubboRegisterConfig.getProtocol(), registryConfig.getProtocol()) || !Objects.equals(dubboRegisterConfig.getRegister(), registryConfig.getAddress()) || !Objects.equals(dubboRegisterConfig.getProtocol(), registryConfig.getProtocol()); }
//......}
- org.apache.shenyu.plugin.apache.dubbo.subscriber.ApacheDubboMetaDataSubscriber#onSubscribe()
When the metadata is updated, the data synchronization module synchronizes the data from shenyu-admin
to the gateway. The metadata update operation is performed in the onSubscribe()
method.
public class ApacheDubboMetaDataSubscriber implements MetaDataSubscriber { //local memory cache private static final ConcurrentMap<String, MetaData> META_DATA = Maps.newConcurrentMap();
//update metaData public void onSubscribe(final MetaData metaData) { // dubbo if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) { //Whether the corresponding metadata exists MetaData exist = META_DATA.get(metaData.getPath()); if (Objects.isNull(exist) || Objects.isNull(ApacheDubboConfigCache.getInstance().get(metaData.getPath()))) { // initRef ApacheDubboConfigCache.getInstance().initRef(metaData); } else { // The corresponding metadata has undergone an update operation if (!Objects.equals(metaData.getServiceName(), exist.getServiceName()) || !Objects.equals(metaData.getRpcExt(), exist.getRpcExt()) || !Objects.equals(metaData.getParameterTypes(), exist.getParameterTypes()) || !Objects.equals(metaData.getMethodName(), exist.getMethodName())) { //Build ReferenceConfig again based on the latest metadata ApacheDubboConfigCache.getInstance().build(metaData); } } //local memory cache META_DATA.put(metaData.getPath(), metaData); } }
//dalete public void unSubscribe(final MetaData metaData) { if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) { //使ReferenceConfig失效 ApacheDubboConfigCache.getInstance().invalidate(metaData.getPath()); META_DATA.remove(metaData.getPath()); } }}
- org.apache.shenyu.plugin.apache.dubbo.cache.ApacheDubboConfigCache#initRef()
Build ReferenceConfig
objects from metaData
.
public final class ApacheDubboConfigCache extends DubboConfigCache { //...... public ReferenceConfig<GenericService> initRef(final MetaData metaData) { try { //First try to get it from the cache, and return it directly if it exists ReferenceConfig<GenericService> referenceConfig = cache.get(metaData.getPath()); if (StringUtils.isNoneBlank(referenceConfig.getInterface())) { return referenceConfig; } } catch (ExecutionException e) { LOG.error("init dubbo ref exception", e); } //build if not exist return build(metaData); }
/** * Build reference config. */ @SuppressWarnings("deprecation") public ReferenceConfig<GenericService> build(final MetaData metaData) { if (Objects.isNull(applicationConfig) || Objects.isNull(registryConfig)) { return new ReferenceConfig<>(); } ReferenceConfig<GenericService> reference = new ReferenceConfig<>(); //ReferenceConfig reference.setGeneric("true"); //generic invoke reference.setAsync(true);//async
reference.setApplication(applicationConfig);//applicationConfig reference.setRegistry(registryConfig);//registryConfig reference.setConsumer(consumerConfig);//consumerConfig reference.setInterface(metaData.getServiceName());//serviceName reference.setProtocol("dubbo");//dubbo reference.setCheck(false); reference.setLoadbalance("gray");//gray
Map<String, String> parameters = new HashMap<>(2); parameters.put("dispatcher", "direct"); reference.setParameters(parameters);
String rpcExt = metaData.getRpcExt();//rpc ext param DubboParam dubboParam = parserToDubboParam(rpcExt); if (Objects.nonNull(dubboParam)) { if (StringUtils.isNoneBlank(dubboParam.getVersion())) { reference.setVersion(dubboParam.getVersion());//version } if (StringUtils.isNoneBlank(dubboParam.getGroup())) { reference.setGroup(dubboParam.getGroup());//group } if (StringUtils.isNoneBlank(dubboParam.getUrl())) { reference.setUrl(dubboParam.getUrl());//url } if (StringUtils.isNoneBlank(dubboParam.getCluster())) { reference.setCluster(dubboParam.getCluster()); } Optional.ofNullable(dubboParam.getTimeout()).ifPresent(reference::setTimeout);//timeout Optional.ofNullable(dubboParam.getRetries()).ifPresent(reference::setRetries);//retires Optional.ofNullable(dubboParam.getSent()).ifPresent(reference::setSent);//Whether to ack async-sent } try { //get GenericService Object obj = reference.get(); if (Objects.nonNull(obj)) { LOG.info("init apache dubbo reference success there meteData is :{}", metaData); //cache reference cache.put(metaData.getPath(), reference); } } catch (Exception e) { LOG.error("init apache dubbo reference exception", e); } return reference; } //...... }
#
2.6 Execute ResponsePlugin- org.apache.shenyu.plugin.response.ResponsePlugin#execute()
The response results are handled by the ResponsePlugin
plugin.
@Override public Mono<Void> execute(final ServerWebExchange exchange, final ShenyuPluginChain chain) { ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT); assert shenyuContext != null; // handle results according to rpc type return writerMap.get(shenyuContext.getRpcType()).writeWith(exchange, chain); }
The processing type is determined by MessageWriter
and the class inheritance relationship is as follows.
- MessageWriter: interface, defining message processing methods.
- NettyClientMessageWriter: processing of
Netty
call results. - RPCMessageWriter: processing the results of
RPC
calls. - WebClientMessageWriter: processing the results of
WebClient
calls.
Dubbo
service call, the processing result is RPCMessageWriter
of course.
- org.apache.shenyu.plugin.response.strategy.RPCMessageWriter#writeWith()
Process the response results in the writeWith()
method.
public class RPCMessageWriter implements MessageWriter {
@Override public Mono<Void> writeWith(final ServerWebExchange exchange, final ShenyuPluginChain chain) { return chain.execute(exchange).then(Mono.defer(() -> { Object result = exchange.getAttribute(Constants.RPC_RESULT); //result if (Objects.isNull(result)) { Object error = ShenyuResultWrap.error(exchange, ShenyuResultEnum.SERVICE_RESULT_ERROR, null); return WebFluxResultUtils.result(exchange, error); } return WebFluxResultUtils.result(exchange, result); })); }}
At this point in the analysis, the source code analysis of the Dubbo
plugin is complete, and the analysis flow chart is as follows.
#
3. SummaryThe source code analysis in this article starts from Dubbo
service registration to Dubbo
plug-in service calls. The Dubbo
plugin is mainly used to handle the conversion of http
requests to the dubbo
protocol, and the main logic is implemented through generalized calls.