從零開始,設計一個高效能 API 閘道器

一、前言

最近在 github 上看了 soul 閘道器的設計,突然就來了興趣準備自己從零開始寫一個高效能的閘道器。經過兩週時間的開發,我的閘道器 ship-gate 核心功能基本都已完成,最大的缺陷就是前端功底太差沒有管理後臺 。

二、設計

2。1 技術選型

閘道器是所有請求的入口,所以要求有很高的吞吐量,為了實現這點可以使用請求非同步化來解決。目前一般有以下兩種方案:

Tomcat/Jetty+NIO+Servlet3

Servlet3 已經支援非同步,這種方案使用比較多,京東,有贊和 Zuul,都用的是這種方案。

Netty+NIO

Netty 為高併發而生,目前唯品會的閘道器使用這個策略,在唯品會的技術文章中在相同的情況下 Netty 是每秒 30w+的吞吐量,Tomcat 是 13w+,可以看出是有一定的差距的,但是 Netty 需要自己處理 HTTP 協議,這一塊比較麻煩。

後面發現 Soul 閘道器是基於 Spring WebFlux(底層 Netty)的,不用太關心 HTTP 協議的處理,於是決定也用 Spring WebFlux。

閘道器的第二個特點是具備可擴充套件性,比如 Netflix Zuul 有 preFilters,postFilters 等在不同的階段方便處理不同的業務,基於責任鏈模式將請求進行鏈式處理即可實現。

在微服務架構下,服務都會進行多例項部署來保證高可用,請求到達閘道器時,閘道器需要根據 URL 找到所有可用的例項,這時就需要服務註冊和發現功能,即註冊中心。

現在流行的註冊中心有 Apache 的 Zookeeper 和阿里的 Nacos 兩種(consul 有點小眾),因為之前寫 RPC 框架時已經用過了 Zookeeper,所以這次就選擇了 Nacos。

2。2 需求清單

首先要明確目標,即開發一個具備哪些特性的閘道器,總結下後如下:

自定義路由規則

可基於 version 的路由規則設定,路由物件包括 DEFAUL,HEADER 和 QUERY 三種,匹配方式包括=、regex、like 三種。

跨語言

HTTP 協議天生跨語言

高效能

Netty 本身就是一款高效能的通訊框架,同時 server 將一些路由規則等資料快取到 JVM 記憶體避免請求 admin 服務。

高可用

支援叢集模式防止單節點故障,無狀態。

灰度釋出

灰度釋出(又名金絲雀釋出)是指在黑與白之間,能夠平滑過渡的一種釋出方式。在其上可以進行 A/B testing,即讓一部分使用者繼續用產品特性 A,一部分使用者開始用產品特性 B,如果使用者對 B 沒有什麼反對意見,那麼逐步擴大範圍,把所有使用者都遷移到 B 上面來。透過特性一可以實現。

介面鑑權

基於責任鏈模式,使用者開發自己的鑑權外掛即可。

負載均衡

支援多種負載均衡演算法,如隨機,輪詢,加權輪詢等。利用 SPI 機制可以根據配置進行動態載入。

2。3 架構設計

在參考了一些優秀的閘道器 Zuul,Spring Cloud Gateway,Soul 後,將專案劃分為以下幾個模組。

從零開始,設計一個高效能 API 閘道器

它們之間的關係如圖:

從零開始,設計一個高效能 API 閘道器

閘道器設計

注意

:這張圖與實際實現有點出入,Nacos push 到本地快取的那個環節沒有實現,目前只有 ship-sever 定時輪詢 pull 的過程。ship-admin 從 Nacos 獲取註冊服務資訊的過程,也改成了 ServiceA 啟動時主動發生 HTTP 請求通知 ship-admin。

2。4 表結構設計

從零開始,設計一個高效能 API 閘道器

三、編碼

3。1 ship-client-spring-boot-starter

首先建立一個

spring-boot-starter

命名為

ship-client-spring-boot-starter

,其核心類

AutoRegisterListener

就是在專案啟動時做了兩件事:

服務資訊註冊到 Nacos 註冊中心

知 ship-admin 服務上線了並註冊下線 hook。

程式碼如下:

public class AutoRegisterListener implements ApplicationListener { private final static Logger LOGGER = LoggerFactory。getLogger(AutoRegisterListener。class); private volatile AtomicBoolean registered = new AtomicBoolean(false); private final ClientConfigProperties properties; @NacosInjected private NamingService namingService; @Autowired private RequestMappingHandlerMapping handlerMapping; private final ExecutorService pool; /** * url list to ignore */ private static List ignoreUrlList = new LinkedList<>(); static { ignoreUrlList。add(“/error”); } public AutoRegisterListener(ClientConfigProperties properties) { if (!check(properties)) { LOGGER。error(“client config port,contextPath,appName adminUrl and version can‘t be empty!”); throw new ShipException(“client config port,contextPath,appName adminUrl and version can’t be empty!”); } this。properties = properties; pool = new ThreadPoolExecutor(1, 4, 0, TimeUnit。SECONDS, new LinkedBlockingQueue<>()); } /** * check the ClientConfigProperties * * @param properties * @return */ private boolean check(ClientConfigProperties properties) { if (properties。getPort() == null || properties。getContextPath() == null || properties。getVersion() == null || properties。getAppName() == null || properties。getAdminUrl() == null) { return false; } return true; } @Override public void onApplicationEvent(ContextRefreshedEvent event) { if (!registered。compareAndSet(false, true)) { return; } doRegister(); registerShutDownHook(); } /** * send unregister request to admin when jvm shutdown */ private void registerShutDownHook() { final String url = “http://” + properties。getAdminUrl() + AdminConstants。UNREGISTER_PATH; final UnregisterAppDTO unregisterAppDTO = new UnregisterAppDTO(); unregisterAppDTO。setAppName(properties。getAppName()); unregisterAppDTO。setVersion(properties。getVersion()); unregisterAppDTO。setIp(IpUtil。getLocalIpAddress()); unregisterAppDTO。setPort(properties。getPort()); Runtime。getRuntime()。addShutdownHook(new Thread(() -> { OkhttpTool。doPost(url, unregisterAppDTO); LOGGER。info(“[{}:{}] unregister from ship-admin success!”, unregisterAppDTO。getAppName(), unregisterAppDTO。getVersion()); })); } /** * register all interface info to register center */ private void doRegister() { Instance instance = new Instance(); instance。setIp(IpUtil。getLocalIpAddress()); instance。setPort(properties。getPort()); instance。setEphemeral(true); Map metadataMap = new HashMap<>(); metadataMap。put(“version”, properties。getVersion()); metadataMap。put(“appName”, properties。getAppName()); instance。setMetadata(metadataMap); try { namingService。registerInstance(properties。getAppName(), NacosConstants。APP_GROUP_NAME, instance); } catch (NacosException e) { LOGGER。error(“register to nacos fail”, e); throw new ShipException(e。getErrCode(), e。getErrMsg()); } LOGGER。info(“register interface info to nacos success!”); // send register request to ship-admin String url = “http://” + properties。getAdminUrl() + AdminConstants。REGISTER_PATH; RegisterAppDTO registerAppDTO = buildRegisterAppDTO(instance); OkhttpTool。doPost(url, registerAppDTO); LOGGER。info(“register to ship-admin success!”); } private RegisterAppDTO buildRegisterAppDTO(Instance instance) { RegisterAppDTO registerAppDTO = new RegisterAppDTO(); registerAppDTO。setAppName(properties。getAppName()); registerAppDTO。setContextPath(properties。getContextPath()); registerAppDTO。setIp(instance。getIp()); registerAppDTO。setPort(instance。getPort()); registerAppDTO。setVersion(properties。getVersion()); return registerAppDTO; } }

3。2 ship-server

ship-sever 專案主要包括了兩個部分內容, 1。請求動態路由的主流程 2。本地快取資料和 ship-admin 及 nacos 同步,這部分在後面 3。3 再講。

ship-server 實現動態路由的原理是利用 WebFilter 攔截請求,然後將請求教給 plugin chain 去鏈式處理。

PluginFilter 根據 URL 解析出 appName,然後將啟用的 plugin 組裝成 plugin chain。

public class PluginFilter implements WebFilter { private ServerConfigProperties properties; public PluginFilter(ServerConfigProperties properties) { this。properties = properties; } @Override public Mono filter(ServerWebExchange exchange, WebFilterChain chain) { String appName = parseAppName(exchange); if (CollectionUtils。isEmpty(ServiceCache。getAllInstances(appName))) { throw new ShipException(ShipExceptionEnum。SERVICE_NOT_FIND); } PluginChain pluginChain = new PluginChain(properties, appName); pluginChain。addPlugin(new DynamicRoutePlugin(properties)); pluginChain。addPlugin(new AuthPlugin(properties)); return pluginChain。execute(exchange, pluginChain); } private String parseAppName(ServerWebExchange exchange) { RequestPath path = exchange。getRequest()。getPath(); String appName = path。value()。split(“/”)[1]; return appName; } }

PluginChain 繼承了 AbstractShipPlugin 並持有所有要執行的外掛。

/** * @Author: Ship * @Description: * @Date: Created in 2020/12/25 */ public class PluginChain extends AbstractShipPlugin { /** * the pos point to current plugin */ private int pos; /** * the plugins of chain */ private List plugins; private final String appName; public PluginChain(ServerConfigProperties properties, String appName) { super(properties); this。appName = appName; } /** * add enabled plugin to chain * * @param shipPlugin */ public void addPlugin(ShipPlugin shipPlugin) { if (plugins == null) { plugins = new ArrayList<>(); } if (!PluginCache。isEnabled(appName, shipPlugin。name())) { return; } plugins。add(shipPlugin); // order by the plugin‘s order plugins。sort(Comparator。comparing(ShipPlugin::order)); } @Override public Integer order() { return null; } @Override public String name() { return null; } @Override public Mono execute(ServerWebExchange exchange, PluginChain pluginChain) { if (pos == plugins。size()) { return exchange。getResponse()。setComplete(); } return pluginChain。plugins。get(pos++)。execute(exchange, pluginChain); } public String getAppName() { return appName; } }

AbstractShipPlugin 實現了 ShipPlugin 介面,並持有 ServerConfigProperties 配置物件。

public abstract class AbstractShipPlugin implements ShipPlugin { protected ServerConfigProperties properties; public AbstractShipPlugin(ServerConfigProperties properties) { this。properties = properties; } }

ShipPlugin 介面定義了所有外掛必須實現的三個方法

order()

name()

execute()

public interface ShipPlugin { /** * lower values have higher priority * * @return */ Integer order(); /** * return current plugin name * * @return */ String name(); Mono execute(ServerWebExchange exchange,PluginChain pluginChain); }

DynamicRoutePlugin 繼承了抽象類 AbstractShipPlugin,包含了動態路由的主要業務邏輯。

public class DynamicRoutePlugin extends AbstractShipPlugin { private final static Logger LOGGER = LoggerFactory。getLogger(DynamicRoutePlugin。class); private static WebClient webClient; private static final Gson gson = new GsonBuilder()。create(); static { HttpClient httpClient = HttpClient。create() 。tcpConfiguration(client -> client。doOnConnected(conn -> conn。addHandlerLast(new ReadTimeoutHandler(3)) 。addHandlerLast(new WriteTimeoutHandler(3))) 。option(ChannelOption。TCP_NODELAY, true) ); webClient = WebClient。builder()。clientConnector(new ReactorClientHttpConnector(httpClient)) 。build(); } public DynamicRoutePlugin(ServerConfigProperties properties) { super(properties); } @Override public Integer order() { return ShipPluginEnum。DYNAMIC_ROUTE。getOrder(); } @Override public String name() { return ShipPluginEnum。DYNAMIC_ROUTE。getName(); } @Override public Mono execute(ServerWebExchange exchange, PluginChain pluginChain) { String appName = pluginChain。getAppName(); ServiceInstance serviceInstance = chooseInstance(appName, exchange。getRequest()); // LOGGER。info(“selected instance is [{}]”, gson。toJson(serviceInstance)); // request service String url = buildUrl(exchange, serviceInstance); return forward(exchange, url); } /** * forward request to backend service * * @param exchange * @param url * @return */ private Mono forward(ServerWebExchange exchange, String url) { ServerHttpRequest request = exchange。getRequest(); ServerHttpResponse response = exchange。getResponse(); HttpMethod method = request。getMethod(); WebClient。RequestBodySpec requestBodySpec = webClient。method(method)。uri(url)。headers((headers) -> { headers。addAll(request。getHeaders()); }); WebClient。RequestHeadersSpec<?> reqHeadersSpec; if (requireHttpBody(method)) { reqHeadersSpec = requestBodySpec。body(BodyInserters。fromDataBuffers(request。getBody())); } else { reqHeadersSpec = requestBodySpec; } // nio->callback->nio return reqHeadersSpec。exchange()。timeout(Duration。ofMillis(properties。getTimeOutMillis())) 。onErrorResume(ex -> { return Mono。defer(() -> { String errorResultJson = “”; if (ex instanceof TimeoutException) { errorResultJson = “{\”code\“:5001,\”message\“:\”network timeout\“}”; } else { errorResultJson = “{\”code\“:5000,\”message\“:\”system error\“}”; } return ShipResponseUtil。doResponse(exchange, errorResultJson); })。then(Mono。empty()); })。flatMap(backendResponse -> { response。setStatusCode(backendResponse。statusCode()); response。getHeaders()。putAll(backendResponse。headers()。asHttpHeaders()); return response。writeWith(backendResponse。bodyToFlux(DataBuffer。class)); }); } /** * weather the http method need http body * * @param method * @return */ private boolean requireHttpBody(HttpMethod method) { if (method。equals(HttpMethod。POST) || method。equals(HttpMethod。PUT) || method。equals(HttpMethod。PATCH)) { return true; } return false; } private String buildUrl(ServerWebExchange exchange, ServiceInstance serviceInstance) { ServerHttpRequest request = exchange。getRequest(); String query = request。getURI()。getQuery(); String path = request。getPath()。value()。replaceFirst(“/” + serviceInstance。getAppName(), “”); String url = “http://” + serviceInstance。getIp() + “:” + serviceInstance。getPort() + path; if (!StringUtils。isEmpty(query)) { url = url + “?” + query; } return url; } /** * choose an ServiceInstance according to route rule config and load balancing algorithm * * @param appName * @param request * @return */ private ServiceInstance chooseInstance(String appName, ServerHttpRequest request) { List serviceInstances = ServiceCache。getAllInstances(appName); if (CollectionUtils。isEmpty(serviceInstances)) { LOGGER。error(“service instance of {} not find”, appName); throw new ShipException(ShipExceptionEnum。SERVICE_NOT_FIND); } String version = matchAppVersion(appName, request); if (StringUtils。isEmpty(version)) { throw new ShipException(“match app version error”); } // filter serviceInstances by version List instances = serviceInstances。stream()。filter(i -> i。getVersion()。equals(version))。collect(Collectors。toList()); //Select an instance based on the load balancing algorithm LoadBalance loadBalance = LoadBalanceFactory。getInstance(properties。getLoadBalance(), appName, version); ServiceInstance serviceInstance = loadBalance。chooseOne(instances); return serviceInstance; } private String matchAppVersion(String appName, ServerHttpRequest request) { List rules = RouteRuleCache。getRules(appName); rules。sort(Comparator。comparing(AppRuleDTO::getPriority)。reversed()); for (AppRuleDTO rule : rules) { if (match(rule, request)) { return rule。getVersion(); } } return null; } private boolean match(AppRuleDTO rule, ServerHttpRequest request) { String matchObject = rule。getMatchObject(); String matchKey = rule。getMatchKey(); String matchRule = rule。getMatchRule(); Byte matchMethod = rule。getMatchMethod(); if (MatchObjectEnum。DEFAULT。getCode()。equals(matchObject)) { return true; } else if (MatchObjectEnum。QUERY。getCode()。equals(matchObject)) { String param = request。getQueryParams()。getFirst(matchKey); if (!StringUtils。isEmpty(param)) { return StringTools。match(param, matchMethod, matchRule); } } else if (MatchObjectEnum。HEADER。getCode()。equals(matchObject)) { HttpHeaders headers = request。getHeaders(); String headerValue = headers。getFirst(matchKey); if (!StringUtils。isEmpty(headerValue)) { return StringTools。match(headerValue, matchMethod, matchRule); } } return false; } }

3。3 資料同步

app 資料同步

後臺服務(如訂單服務)啟動時,只將服務名,版本,ip 地址和埠號註冊到了 Nacos,並沒有例項的權重和啟用的外掛資訊怎麼辦?

一般線上的例項權重和外掛列表都是在管理介面配置,然後動態生效的,所以需要 ship-admin 定時更新例項的權重和外掛資訊到註冊中心。

對應程式碼 ship-admin 的 NacosSyncListener

@Configuration public class NacosSyncListener implements ApplicationListener { private static final Logger LOGGER = LoggerFactory。getLogger(NacosSyncListener。class); private static ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(1, new ShipThreadFactory(“nacos-sync”, true)。create()); @NacosInjected private NamingService namingService; @Value(“${nacos。discovery。server-addr}”) private String baseUrl; @Resource private AppService appService; @Override public void onApplicationEvent(ContextRefreshedEvent event) { if (event。getApplicationContext()。getParent() != null) { return; } String url = “http://” + baseUrl + NacosConstants。INSTANCE_UPDATE_PATH; scheduledPool。scheduleWithFixedDelay(new NacosSyncTask(namingService, url, appService), 0, 30L, TimeUnit。SECONDS); } class NacosSyncTask implements Runnable { private NamingService namingService; private String url; private AppService appService; private Gson gson = new GsonBuilder()。create(); public NacosSyncTask(NamingService namingService, String url, AppService appService) { this。namingService = namingService; this。url = url; this。appService = appService; } /** * Regular update weight,enabled plugins to nacos instance */ @Override public void run() { try { // get all app names ListView services = namingService。getServicesOfServer(1, Integer。MAX_VALUE, NacosConstants。APP_GROUP_NAME); if (CollectionUtils。isEmpty(services。getData())) { return; } List appNames = services。getData(); List appInfos = appService。getAppInfos(appNames); for (AppInfoDTO appInfo : appInfos) { if (CollectionUtils。isEmpty(appInfo。getInstances())) { continue; } for (ServiceInstance instance : appInfo。getInstances()) { Map queryMap = buildQueryMap(appInfo, instance); String resp = OkhttpTool。doPut(url, queryMap, “”); LOGGER。debug(“response :{}”, resp); } } } catch (Exception e) { LOGGER。error(“nacos sync task error”, e); } } private Map buildQueryMap(AppInfoDTO appInfo, ServiceInstance instance) { Map map = new HashMap<>(); map。put(“serviceName”, appInfo。getAppName()); map。put(“groupName”, NacosConstants。APP_GROUP_NAME); map。put(“ip”, instance。getIp()); map。put(“port”, instance。getPort()); map。put(“weight”, instance。getWeight()。doubleValue()); NacosMetadata metadata = new NacosMetadata(); metadata。setAppName(appInfo。getAppName()); metadata。setVersion(instance。getVersion()); metadata。setPlugins(String。join(“,”, appInfo。getEnabledPlugins())); map。put(“metadata”, StringTools。urlEncode(gson。toJson(metadata))); map。put(“ephemeral”, true); return map; } } }

ship-server 再定時從 Nacos 拉取 app 資料更新到本地 Map 快取。

@Configuration public class DataSyncTaskListener implements ApplicationListener { private static ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(1, new ShipThreadFactory(“service-sync”, true)。create()); @NacosInjected private NamingService namingService; @Autowired private ServerConfigProperties properties; @Override public void onApplicationEvent(ContextRefreshedEvent event) { if (event。getApplicationContext()。getParent() != null) { return; } scheduledPool。scheduleWithFixedDelay(new DataSyncTask(namingService) , 0L, properties。getCacheRefreshInterval(), TimeUnit。SECONDS); WebsocketSyncCacheServer websocketSyncCacheServer = new WebsocketSyncCacheServer(properties。getWebSocketPort()); websocketSyncCacheServer。start(); } class DataSyncTask implements Runnable { private NamingService namingService; public DataSyncTask(NamingService namingService) { this。namingService = namingService; } @Override public void run() { try { // get all app names ListView services = namingService。getServicesOfServer(1, Integer。MAX_VALUE, NacosConstants。APP_GROUP_NAME); if (CollectionUtils。isEmpty(services。getData())) { return; } List appNames = services。getData(); // get all instances for (String appName : appNames) { List instanceList = namingService。getAllInstances(appName, NacosConstants。APP_GROUP_NAME); if (CollectionUtils。isEmpty(instanceList)) { continue; } ServiceCache。add(appName, buildServiceInstances(instanceList)); List pluginNames = getEnabledPlugins(instanceList); PluginCache。add(appName, pluginNames); } ServiceCache。removeExpired(appNames); PluginCache。removeExpired(appNames); } catch (NacosException e) { e。printStackTrace(); } } private List getEnabledPlugins(List instanceList) { Instance instance = instanceList。get(0); Map metadata = instance。getMetadata(); // plugins: DynamicRoute,Auth String plugins = metadata。getOrDefault(“plugins”, ShipPluginEnum。DYNAMIC_ROUTE。getName()); return Arrays。stream(plugins。split(“,”))。collect(Collectors。toList()); } private List buildServiceInstances(List instanceList) { List list = new LinkedList<>(); instanceList。forEach(instance -> { Map metadata = instance。getMetadata(); ServiceInstance serviceInstance = new ServiceInstance(); serviceInstance。setAppName(metadata。get(“appName”)); serviceInstance。setIp(instance。getIp()); serviceInstance。setPort(instance。getPort()); serviceInstance。setVersion(metadata。get(“version”)); serviceInstance。setWeight((int) instance。getWeight()); list。add(serviceInstance); }); return list; } } }

路由規則資料同步

同時,如果使用者在管理後臺更新了路由規則,ship-admin 需要推送規則資料到 ship-server,這裡參考了 soul 閘道器的做法利用 websocket 在第一次建立連線後進行全量同步,此後路由規則發生變更就只作增量同步。

服務端 WebsocketSyncCacheServer:

/** * @Author: Ship * @Description: * @Date: Created in 2020/12/28 */ public class WebsocketSyncCacheServer extends WebSocketServer { private final static Logger LOGGER = LoggerFactory。getLogger(WebsocketSyncCacheServer。class); private Gson gson = new GsonBuilder()。create(); private MessageHandler messageHandler; public WebsocketSyncCacheServer(Integer port) { super(new InetSocketAddress(port)); this。messageHandler = new MessageHandler(); } @Override public void onOpen(WebSocket webSocket, ClientHandshake clientHandshake) { LOGGER。info(“server is open”); } @Override public void onClose(WebSocket webSocket, int i, String s, boolean b) { LOGGER。info(“websocket server close。。。”); } @Override public void onMessage(WebSocket webSocket, String message) { LOGGER。info(“websocket server receive message:\n[{}]”, message); this。messageHandler。handler(message); } @Override public void onError(WebSocket webSocket, Exception e) { } @Override public void onStart() { LOGGER。info(“websocket server start。。。”); } class MessageHandler { public void handler(String message) { RouteRuleOperationDTO operationDTO = gson。fromJson(message, RouteRuleOperationDTO。class); if (CollectionUtils。isEmpty(operationDTO。getRuleList())) { return; } Map> map = operationDTO。getRuleList() 。stream()。collect(Collectors。groupingBy(AppRuleDTO::getAppName)); if (OperationTypeEnum。INSERT。getCode()。equals(operationDTO。getOperationType()) || OperationTypeEnum。UPDATE。getCode()。equals(operationDTO。getOperationType())) { RouteRuleCache。add(map); } else if (OperationTypeEnum。DELETE。getCode()。equals(operationDTO。getOperationType())) { RouteRuleCache。remove(map); } } } }

客戶端 WebsocketSyncCacheClient:

@Component public class WebsocketSyncCacheClient { private final static Logger LOGGER = LoggerFactory。getLogger(WebsocketSyncCacheClient。class); private WebSocketClient client; private RuleService ruleService; private Gson gson = new GsonBuilder()。create(); public WebsocketSyncCacheClient(@Value(“${ship。server-web-socket-url}”) String serverWebSocketUrl, RuleService ruleService) { if (StringUtils。isEmpty(serverWebSocketUrl)) { throw new ShipException(ShipExceptionEnum。CONFIG_ERROR); } this。ruleService = ruleService; ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, new ShipThreadFactory(“websocket-connect”, true)。create()); try { client = new WebSocketClient(new URI(serverWebSocketUrl)) { @Override public void onOpen(ServerHandshake serverHandshake) { LOGGER。info(“client is open”); List list = ruleService。getEnabledRule(); String msg = gson。toJson(new RouteRuleOperationDTO(OperationTypeEnum。INSERT, list)); send(msg); } @Override public void onMessage(String s) { } @Override public void onClose(int i, String s, boolean b) { } @Override public void onError(Exception e) { LOGGER。error(“websocket client error”, e); } }; client。connectBlocking(); //使用排程執行緒池進行斷線重連,30秒進行一次 executor。scheduleAtFixedRate(() -> { if (client != null && client。isClosed()) { try { client。reconnectBlocking(); } catch (InterruptedException e) { LOGGER。error(“reconnect server fail”, e); } } }, 10, 30, TimeUnit。SECONDS); } catch (Exception e) { LOGGER。error(“websocket sync cache exception”, e); throw new ShipException(e。getMessage()); } } public void send(T t) { while (!client。getReadyState()。equals(ReadyState。OPEN)) { LOGGER。debug(“connecting 。。。please wait”); } client。send(gson。toJson(t)); } }

四、測試

4。1 動態路由測試

本地啟動 nacos ,

sh startup.sh -m standalone

啟動 ship-admin

本地啟動兩個 ship-example 例項。

例項 1 配置:

ship: http: app-name: order version: gray_1。0 context-path: /order port: 8081 admin-url: 127。0。0。1:9001 server: port: 8081 nacos: discovery: server-addr: 127。0。0。1:8848

例項 2 配置:

ship: http: app-name: order version: prod_1。0 context-path: /order port: 8082 admin-url: 127。0。0。1:9001 server: port: 8082 nacos: discovery: server-addr: 127。0。0。1:8848

在資料庫新增路由規則配置,該規則表示當 http header 中的 name=ship 時請求路由到 gray_1。0 版本的節點。

從零開始,設計一個高效能 API 閘道器

啟動 ship-server,看到以下日誌時則可以進行測試了。

2021-01-02 19:57:09。159 INFO 30413 ——- [SocketWorker-29] cn。sp。sync。WebsocketSyncCacheServer : websocket server receive message: [{“operationType”:“INSERT”,“ruleList”:[{“id”:1,“appId”:5,“appName”:“order”,“version”:“gray_1。0”,“matchObject”:“HEADER”,“matchKey”:“name”,“matchMethod”:1,“matchRule”:“ship”,“priority”:50}]}]

用 Postman 請求http://localhost:9000/order/user/add,POST方式,header設定name=ship,可以看到只有例項1有日誌顯示。

==========add user,version:gray_1。0

4。2 效能壓測

壓測環境:

MacBook Pro 13 英寸

處理器 2。3 GHz 四核 Intel Core i7

記憶體 16 GB 3733 MHz LPDDR4X

後端節點個數一個

壓測工具:wrk

壓測結果:20 個執行緒,500 個連線數,吞吐量大概每秒 9400 個請求。

從零開始,設計一個高效能 API 閘道器

壓測結果

五、總結

千里之行始於足下,開始以為寫一個閘道器會很難,但當你實際開始行動時就會發現其實沒那麼難,所以邁出第一步很重要。過程中也遇到了很多問題,還在 github 上給 soul 和 nacos 這兩個開源專案提了兩個 issue,後來發現是自己的問題,尷尬 。本文程式碼已全部上傳到 github:

https://github。com/2YSP/ship-gate

參考資料:

https://nacos。io/zh-cn/docs/quick-start。html

https://dromara。org/website/zh-cn/docs/soul/soul。html

https://docs。spring。io/spring-framework/docs/5。1。7。RELEASE/spring-framework-reference/web-reactive。html#webflux

https://github。com/TooTallNate/Java-WebSocket