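Most rules for a Sentinel client are configured in the Dashboard. So how do the Sentinel client and the Dashboard communicate with each other?
The client periodically sends heartbeats to the Dashboard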
For a Sentinel client to communicate with the Dashboard, we need to add the following dependencies:
<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-transport-common</artifactId>
    <version>${project.version}</version>
</dependency>
<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-transport-simple-http</artifactId>
    <version>${project.version}</version>
</dependency>
In practice, adding sentinel-transport-simple-http alone is enough, because it already pulls in sentinel-transport-common. If you use spring-cloud-starter-alibaba-sentinel, both packages are brought in as well.
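Besides the dependency, the client has to know where the Dashboard is. The sketch below is one way to supply that through JVM system properties, assuming they are set before Sentinel initializes (passing them as -D flags is equivalent). The keys csp.sentinel.dashboard.server, csp.sentinel.api.port and csp.sentinel.heartbeat.interval.ms are Sentinel's transport property keys; the class name, the address localhost:8080 and the other values are placeholders chosen for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class SentinelTransportProps {

    /** Illustrative defaults; the address and values are placeholders. */
    static Map<String, String> defaults() {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("csp.sentinel.dashboard.server", "localhost:8080"); // Dashboard address
        props.put("csp.sentinel.api.port", "8719");                   // client command port
        props.put("csp.sentinel.heartbeat.interval.ms", "10000");     // heartbeat interval
        return props;
    }

    /** Sets each property unless it was already supplied, e.g. with -D on the command line. */
    static void apply(Map<String, String> props) {
        for (Map.Entry<String, String> e : props.entrySet()) {
            if (System.getProperty(e.getKey()) == null) {
                System.setProperty(e.getKey(), e.getValue());
            }
        }
    }

    public static void main(String[] args) {
        apply(defaults());
        System.out.println(System.getProperty("csp.sentinel.dashboard.server"));
    }
}
```

Values given on the command line win over these defaults, so the same build can point at different Dashboards.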
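sentinel-transport-common uses the SPI mechanism to register two implementations of the InitFunc interface:
- HeartbeatSenderInitFunc: sends heartbeats from the client to the Dashboard
- CommandCenterInitFunc: serves the various HTTP API requests
HeartbeatSenderInitFunc#init builds a scheduled thread pool and then sends heartbeats to the Dashboard at a fixed rate. The default interval is 10s, and the first heartbeat is sent after an initial delay of 5s.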
com.alibaba.csp.sentinel.transport.init.HeartbeatSenderInitFunc#init
private void initSchedulerIfNeeded() {
    if (pool == null) {
        pool = new ScheduledThreadPoolExecutor(2,
            new NamedThreadFactory("sentinel-heartbeat-send-task", true),
            new DiscardOldestPolicy());
    }
}

@Override
public void init() {
    HeartbeatSender sender = HeartbeatSenderProvider.getHeartbeatSender();
    if (sender == null) {
        RecordLog.warn("[HeartbeatSenderInitFunc] WARN: No HeartbeatSender loaded");
        return;
    }
    // Initialize the ScheduledThreadPoolExecutor
    initSchedulerIfNeeded();
    // Heartbeat interval; the sender default is 1000 * 10 ms
    long interval = retrieveInterval(sender);
    setIntervalIfNotExists(interval);
    // Schedule the periodic heartbeat task
    scheduleHeartbeatTask(sender, interval);
}

private boolean isValidHeartbeatInterval(Long interval) {
    return interval != null && interval > 0;
}

private void setIntervalIfNotExists(long interval) {
    SentinelConfig.setConfig(TransportConfig.HEARTBEAT_INTERVAL_MS, String.valueOf(interval));
}

long retrieveInterval(/*@NonNull*/ HeartbeatSender sender) {
    Long intervalInConfig = TransportConfig.getHeartbeatIntervalMs();
    if (isValidHeartbeatInterval(intervalInConfig)) {
        RecordLog.info("[HeartbeatSenderInitFunc] Using heartbeat interval "
            + "in Sentinel config property: " + intervalInConfig);
        return intervalInConfig;
    } else {
        long senderInterval = sender.intervalMs();
        RecordLog.info("[HeartbeatSenderInit] Heartbeat interval not configured in "
            + "config property or invalid, using sender default: " + senderInterval);
        return senderInterval;
    }
}

private void scheduleHeartbeatTask(/*@NonNull*/ final HeartbeatSender sender, /*@Valid*/ long interval) {
    pool.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                // Send one heartbeat
                sender.sendHeartbeat();
            } catch (Throwable e) {
                RecordLog.warn("[HeartbeatSender] Send heartbeat error", e);
            }
        }
    }, 5000, interval, TimeUnit.MILLISECONDS);
    RecordLog.info("[HeartbeatSenderInit] HeartbeatSender started: "
        + sender.getClass().getCanonicalName());
}
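The two ideas in this method, falling back to the sender's default interval and running a task at a fixed rate while catching every exception, can be tried in isolation. The following is a minimal sketch using only the JDK; the class HeartbeatScheduleSketch and its method names are hypothetical and not part of Sentinel.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class HeartbeatScheduleSketch {

    /** Prefers a positive configured interval; otherwise falls back to the sender's default. */
    static long resolveInterval(Long configuredMs, long senderDefaultMs) {
        if (configuredMs != null && configuredMs > 0) {
            return configuredMs;
        }
        return senderDefaultMs;
    }

    /** Schedules the task at a fixed rate and waits (up to 5s) until it has run `times` times. */
    static boolean runFixedRate(Runnable task, long initialDelayMs, long intervalMs, int times) {
        ScheduledExecutorService pool = new ScheduledThreadPoolExecutor(1);
        CountDownLatch done = new CountDownLatch(times);
        try {
            pool.scheduleAtFixedRate(() -> {
                try {
                    task.run();
                } catch (Throwable t) {
                    // An exception escaping the task would cancel all later runs.
                    System.err.println("heartbeat failed: " + t);
                } finally {
                    done.countDown();
                }
            }, initialDelayMs, intervalMs, TimeUnit.MILLISECONDS);
            return done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        long interval = resolveInterval(null, 10_000L);
        System.out.println("interval = " + interval + " ms");
        boolean ok = runFixedRate(() -> System.out.println("heartbeat"), 50, 100, 3);
        System.out.println("ran three times: " + ok);
    }
}
```

Catching Throwable inside the task matters because scheduleAtFixedRate suppresses all subsequent executions once a run throws.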
sendHeartbeat() performs the actual HTTP call.
com.alibaba.csp.sentinel.transport.heartbeat.SimpleHttpHeartbeatSender#sendHeartbeat
public boolean sendHeartbeat() throws Exception {
    if (TransportConfig.getRuntimePort() <= 0) {
        RecordLog.info("[SimpleHttpHeartbeatSender] Command server port not initialized, won't send heartbeat");
        return false;
    }
    // Resolve the Dashboard address, as set in the client configuration
    Endpoint addrInfo = getAvailableAddress();
    if (addrInfo == null) {
        return false;
    }
    SimpleHttpRequest request = new SimpleHttpRequest(addrInfo, TransportConfig.getHeartbeatApiPath());
    // Request parameters, mainly the current node's port (8719 by default)
    request.setParams(heartBeat.generateCurrentMessage());
    try {
        // Call the Dashboard
        SimpleHttpResponse response = httpClient.post(request);
        if (response.getStatusCode() == OK_STATUS) {
            return true;
        } else if (clientErrorCode(response.getStatusCode()) || serverErrorCode(response.getStatusCode())) {
            RecordLog.warn("[SimpleHttpHeartbeatSender] Failed to send heartbeat to " + addrInfo
                + ", http status code: " + response.getStatusCode());
        }
    } catch (Exception e) {
        RecordLog.warn("[SimpleHttpHeartbeatSender] Failed to send heartbeat to " + addrInfo, e);
    }
    return false;
}
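To make the request parameters more concrete, here is a rough sketch of how a heartbeat payload could be assembled and form-encoded. The body of generateCurrentMessage() is not shown above, so the field names used here (app, ip, port, version) and the class HeartbeatParamsSketch are assumptions for illustration only; the one detail taken from the text is that the payload carries the client's command port.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

class HeartbeatParamsSketch {

    /** Builds an illustrative heartbeat message; the field names are assumed, not Sentinel's. */
    static Map<String, String> currentMessage(String app, String ip, int commandPort) {
        Map<String, String> message = new LinkedHashMap<>();
        message.put("app", app);
        message.put("ip", ip);
        message.put("port", String.valueOf(commandPort));
        // Timestamp so the receiver can tell how fresh the heartbeat is.
        message.put("version", String.valueOf(System.currentTimeMillis()));
        return message;
    }

    /** Encodes the parameters as application/x-www-form-urlencoded. */
    static String encode(Map<String, String> params) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : params.entrySet()) {
            if (sb.length() > 0) {
                sb.append('&');
            }
            sb.append(enc(e.getKey())).append('=').append(enc(e.getValue()));
        }
        return sb.toString();
    }

    private static String enc(String s) {
        try {
            return URLEncoder.encode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    public static void main(String[] args) {
        System.out.println(encode(currentMessage("demo app", "127.0.0.1", 8719)));
    }
}
```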
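Once registration succeeds, the client shows up in the Dashboard's machine list menu.
The client starts an HTTP service
The client starts an HTTP service through the CommandCenterInitFunc class.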
com.alibaba.csp.sentinel.transport.init.CommandCenterInitFunc#init
public void init() throws Exception {
    CommandCenter commandCenter = CommandCenterProvider.getCommandCenter();
    if (commandCenter == null) {
        RecordLog.warn("[CommandCenterInitFunc] Cannot resolve CommandCenter");
        return;
    }
    /**
     * Register the CommandHandlers
     * @see com.alibaba.csp.sentinel.transport.command.SimpleHttpCommandCenter#beforeStart()
     */
    commandCenter.beforeStart();
    /**
     * Start the ServerSocket and listen for HTTP requests
     * @see com.alibaba.csp.sentinel.transport.command.SimpleHttpCommandCenter#start()
     */
    commandCenter.start();
    RecordLog.info("[CommandCenterInit] Starting command center: "
        + commandCenter.getClass().getCanonicalName());
}
SimpleHttpCommandCenter handles requests with plain blocking I/O (BIO, via ServerSocket).
public void beforeStart() throws Exception {
    // Register handlers
    // Load all CommandHandlers through SPI
    Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers();
    // Put the CommandHandlers into handlerMap
    registerCommands(handlers);
}

public void start() throws Exception {
    int nThreads = Runtime.getRuntime().availableProcessors();
    // bizExecutor handles the business requests
    this.bizExecutor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<Runnable>(10),
        new NamedThreadFactory("sentinel-command-center-service-executor"),
        new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                CommandCenterLog.info("EventTask rejected");
                throw new RejectedExecutionException();
            }
        });
    Runnable serverInitTask = new Runnable() {
        int port;

        {
            try {
                port = Integer.parseInt(TransportConfig.getPort());
            } catch (Exception e) {
                port = DEFAULT_PORT;
            }
        }

        @Override
        public void run() {
            boolean success = false;
            // Start the ServerSocket
            ServerSocket serverSocket = getServerSocketFromBasePort(port);
            if (serverSocket != null) {
                CommandCenterLog.info("[CommandCenter] Begin listening at port " + serverSocket.getLocalPort());
                socketReference = serverSocket;
                // ServerThread accepts incoming requests
                executor.submit(new ServerThread(serverSocket));
                success = true;
                port = serverSocket.getLocalPort();
            } else {
                CommandCenterLog.info("[CommandCenter] chooses port fail, http command center will not work");
            }
            if (!success) {
                port = PORT_UNINITIALIZED;
            }
            TransportConfig.setRuntimePort(port);
            executor.shutdown();
        }
    };
    // Initialize the ServerSocket on a separate thread
    new Thread(serverInitTask).start();
}
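The body of getServerSocketFromBasePort is not shown here. Its name, and the fact that the runtime port is read back from the socket afterwards, suggest that it falls back to other ports when the base port is busy. The following is a sketch of that idea under that assumption, not the library's implementation; PortFallbackSketch and its methods are hypothetical names.

```java
import java.io.IOException;
import java.net.ServerSocket;

class PortFallbackSketch {

    /**
     * Tries basePort, basePort + 1, ... for at most maxAttempts ports and
     * returns the first socket that binds, or null if none could be bound.
     */
    static ServerSocket bindFrom(int basePort, int maxAttempts) {
        for (int i = 0; i < maxAttempts; i++) {
            int candidate = basePort + i;
            if (candidate > 65535) {
                break; // ran out of valid port numbers
            }
            try {
                return new ServerSocket(candidate);
            } catch (IOException e) {
                // Port is taken or not bindable; try the next one.
            }
        }
        return null;
    }

    /** Closes a socket, ignoring errors; handy for cleanup in small demos. */
    static void closeQuietly(ServerSocket socket) {
        if (socket != null) {
            try {
                socket.close();
            } catch (IOException ignored) {
                // Nothing useful to do on close failure.
            }
        }
    }

    public static void main(String[] args) {
        ServerSocket first = bindFrom(8719, 10);
        ServerSocket second = bindFrom(8719, 10);
        System.out.println("first:  " + (first == null ? "none" : first.getLocalPort()));
        System.out.println("second: " + (second == null ? "none" : second.getLocalPort()));
        closeQuietly(first);
        closeQuietly(second);
    }
}
```

This is also why the client reports its actual runtime port in the heartbeat: when several clients share a host, each may end up on a different port than the one configured.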
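The Dashboard pulls rules from the client
The Dashboard pulls the client's rules through the endpoint http://localhost:8719/getRules?type=flow.
ServerThread only accepts connections; the actual request handling is handed off to the bizExecutor thread pool.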
com.alibaba.csp.sentinel.transport.command.SimpleHttpCommandCenter.ServerThread#run
public void run() {
    while (true) {
        Socket socket = null;
        try {
            // Accept a request
            socket = this.serverSocket.accept();
            setSocketSoTimeout(socket);
            HttpEventTask eventTask = new HttpEventTask(socket);
            // Handle the request asynchronously
            bizExecutor.submit(eventTask);
        } catch (Exception e) {
            CommandCenterLog.info("Server error", e);
            if (socket != null) {
                try {
                    socket.close();
                } catch (Exception e1) {
                    CommandCenterLog.info("Error when closing an opened socket", e1);
                }
            }
            try {
                // In case of infinite log.
                Thread.sleep(10);
            } catch (InterruptedException e1) {
                // Indicates the task should stop.
                break;
            }
        }
    }
}
HttpEventTask parses the request URL and uses it to look up a CommandHandler; the response itself is produced by that CommandHandler. For example, the URL here is /getRules, which maps to FetchActiveRuleCommandHandler.
com.alibaba.csp.sentinel.transport.command.http.HttpEventTask#run
public void run() {
    if (socket == null) {
        return;
    }
    PrintWriter printWriter = null;
    InputStream inputStream = null;
    try {
        long start = System.currentTimeMillis();
        inputStream = new BufferedInputStream(socket.getInputStream());
        OutputStream outputStream = socket.getOutputStream();
        printWriter = new PrintWriter(
            new OutputStreamWriter(outputStream, Charset.forName(SentinelConfig.charset())));
        // GET /getRules?type=flow HTTP/1.1
        String firstLine = readLine(inputStream);
        CommandCenterLog.info("[SimpleHttpCommandCenter] Socket income: " + firstLine
            + ", addr: " + socket.getInetAddress());
        CommandRequest request = processQueryString(firstLine);
        if (firstLine.length() > 4 && StringUtil.equalsIgnoreCase("POST", firstLine.substring(0, 4))) {
            // Deal with post method
            processPostRequest(inputStream, request);
        }
        // Validate the target command.
        // getRules
        String commandName = HttpCommandUtils.getTarget(request);
        if (StringUtil.isBlank(commandName)) {
            writeResponse(printWriter, StatusCode.BAD_REQUEST, INVALID_COMMAND_MESSAGE);
            return;
        }
        // Find the matching command handler.
        CommandHandler<?> commandHandler = SimpleHttpCommandCenter.getHandler(commandName);
        if (commandHandler != null) {
            /**
             * @see FetchActiveRuleCommandHandler#handle(com.alibaba.csp.sentinel.command.CommandRequest)
             */
            CommandResponse<?> response = commandHandler.handle(request);
            handleResponse(response, printWriter);
        } else {
            // No matching command handler.
            writeResponse(printWriter, StatusCode.BAD_REQUEST, "Unknown command `" + commandName + '`');
        }
        long cost = System.currentTimeMillis() - start;
        CommandCenterLog.info("[SimpleHttpCommandCenter] Deal a socket task: " + firstLine
            + ", address: " + socket.getInetAddress() + ", time cost: " + cost + " ms");
    } catch (RequestException e) {
        writeResponse(printWriter, e.getStatusCode(), e.getMessage());
    } catch (Throwable e) {
        CommandCenterLog.warn("[SimpleHttpCommandCenter] CommandCenter error", e);
        try {
            if (printWriter != null) {
                String errorMessage = SERVER_ERROR_MESSAGE;
                e.printStackTrace();
                if (!writtenHead) {
                    writeResponse(printWriter, StatusCode.INTERNAL_SERVER_ERROR, errorMessage);
                } else {
                    printWriter.println(errorMessage);
                }
                printWriter.flush();
            }
        } catch (Exception e1) {
            CommandCenterLog.warn("Failed to write error response", e1);
        }
    } finally {
        closeResource(inputStream);
        closeResource(printWriter);
        closeResource(socket);
    }
}
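The parse-then-dispatch flow above can be reduced to a few lines: take the first line of the HTTP request, split off the command name and query parameters, and look the command up in a handler map. The sketch below assumes a simplified handler type (a Function from parameters to a response string) and does not URL-decode values; RequestLineSketch and its members are hypothetical names, not Sentinel classes.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

class RequestLineSketch {
    final String target;               // command name, e.g. "getRules"
    final Map<String, String> params;  // query parameters, e.g. {type=flow}

    private RequestLineSketch(String target, Map<String, String> params) {
        this.target = target;
        this.params = params;
    }

    /** Parses a request line such as "GET /getRules?type=flow HTTP/1.1". */
    static RequestLineSketch parse(String firstLine) {
        String[] parts = firstLine.trim().split("\\s+");
        if (parts.length < 2) {
            throw new IllegalArgumentException("malformed request line: " + firstLine);
        }
        String uri = parts[1];
        int q = uri.indexOf('?');
        String path = q < 0 ? uri : uri.substring(0, q);
        String query = q < 0 ? "" : uri.substring(q + 1);
        String target = path.startsWith("/") ? path.substring(1) : path;
        Map<String, String> params = new LinkedHashMap<>();
        for (String pair : query.split("&")) {
            if (pair.isEmpty()) {
                continue;
            }
            int eq = pair.indexOf('=');
            if (eq < 0) {
                params.put(pair, "");
            } else {
                params.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return new RequestLineSketch(target, params);
    }

    /** Looks up the handler by command name and applies it, or reports an unknown command. */
    static String dispatch(String firstLine, Map<String, Function<Map<String, String>, String>> handlers) {
        RequestLineSketch req = parse(firstLine);
        Function<Map<String, String>, String> handler = handlers.get(req.target);
        if (handler == null) {
            return "Unknown command `" + req.target + '`';
        }
        return handler.apply(req.params);
    }

    public static void main(String[] args) {
        Map<String, Function<Map<String, String>, String>> handlers = new HashMap<>();
        handlers.put("getRules", p -> "rules of type " + p.get("type"));
        System.out.println(dispatch("GET /getRules?type=flow HTTP/1.1", handlers));
        System.out.println(dispatch("GET /nope HTTP/1.1", handlers));
    }
}
```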
FetchActiveRuleCommandHandler reads the rules from the RuleManager that matches the requested rule type.
com.alibaba.csp.sentinel.command.handler.FetchActiveRuleCommandHandler#handle
public CommandResponse<String> handle(CommandRequest request) {
    String type = request.getParam("type");
    if ("flow".equalsIgnoreCase(type)) {
        return CommandResponse.ofSuccess(JSON.toJSONString(FlowRuleManager.getRules()));
    } else if ("degrade".equalsIgnoreCase(type)) {
        return CommandResponse.ofSuccess(JSON.toJSONString(DegradeRuleManager.getRules()));
    } else if ("authority".equalsIgnoreCase(type)) {
        return CommandResponse.ofSuccess(JSON.toJSONString(AuthorityRuleManager.getRules()));
    } else if ("system".equalsIgnoreCase(type)) {
        return CommandResponse.ofSuccess(JSON.toJSONString(SystemRuleManager.getRules()));
    } else {
        return CommandResponse.ofFailure(new IllegalArgumentException("invalid type"));
    }
}
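The Dashboard pushes modified rules to the client
The Dashboard modifies the client's rules through the endpoint http://localhost:8719/setRules.
Based on the URL path setRules, the matching CommandHandler is ModifyRulesCommandHandler.
ModifyRulesCommandHandler finds the RuleManager for the given type and reloads all of its rules. Note that this is a full replacement rather than an incremental addition, so to delete all rules you also need to pass in an empty collection.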
com.alibaba.csp.sentinel.command.handler.ModifyRulesCommandHandler#handle
public CommandResponse<String> handle(CommandRequest request) {
    // XXX from 1.7.2, force to fail when fastjson is older than 1.2.12
    // We may need a better solution on this.
    if (VersionUtil.fromVersionString(JSON.VERSION) < FASTJSON_MINIMAL_VER) {
        // fastjson too old
        return CommandResponse.ofFailure(new RuntimeException("The \"fastjson-" + JSON.VERSION
            + "\" introduced in application is too old, you need fastjson-1.2.12 at least."));
    }
    String type = request.getParam("type");
    // rule data in get parameter
    String data = request.getParam("data");
    if (StringUtil.isNotEmpty(data)) {
        try {
            data = URLDecoder.decode(data, "utf-8");
        } catch (Exception e) {
            RecordLog.info("Decode rule data error", e);
            return CommandResponse.ofFailure(e, "decode rule data error");
        }
    }
    RecordLog.info("Receiving rule change (type: {}): {}", type, data);
    String result = "success";
    if (FLOW_RULE_TYPE.equalsIgnoreCase(type)) {
        List<FlowRule> flowRules = JSONArray.parseArray(data, FlowRule.class);
        // Reload all rules (full replacement)
        FlowRuleManager.loadRules(flowRules);
        if (!writeToDataSource(getFlowDataSource(), flowRules)) {
            result = WRITE_DS_FAILURE_MSG;
        }
        return CommandResponse.ofSuccess(result);
    } else if (AUTHORITY_RULE_TYPE.equalsIgnoreCase(type)) {
        List<AuthorityRule> rules = JSONArray.parseArray(data, AuthorityRule.class);
        AuthorityRuleManager.loadRules(rules);
        if (!writeToDataSource(getAuthorityDataSource(), rules)) {
            result = WRITE_DS_FAILURE_MSG;
        }
        return CommandResponse.ofSuccess(result);
    } else if (DEGRADE_RULE_TYPE.equalsIgnoreCase(type)) {
        List<DegradeRule> rules = JSONArray.parseArray(data, DegradeRule.class);
        DegradeRuleManager.loadRules(rules);
        if (!writeToDataSource(getDegradeDataSource(), rules)) {
            result = WRITE_DS_FAILURE_MSG;
        }
        return CommandResponse.ofSuccess(result);
    } else if (SYSTEM_RULE_TYPE.equalsIgnoreCase(type)) {
        List<SystemRule> rules = JSONArray.parseArray(data, SystemRule.class);
        SystemRuleManager.loadRules(rules);
        if (!writeToDataSource(getSystemSource(), rules)) {
            result = WRITE_DS_FAILURE_MSG;
        }
        return CommandResponse.ofSuccess(result);
    }
    return CommandResponse.ofFailure(new IllegalArgumentException("invalid type"));
}
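The full-replacement semantics are easy to get wrong from the caller's side, so here is a toy model of them. FullReplaceSketch is a hypothetical in-memory store with rules reduced to plain strings; it only illustrates that each load discards whatever was there before, and makes no claim about how the real RuleManager classes are implemented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class FullReplaceSketch {
    // The whole list is swapped on every load, never appended to.
    private volatile List<String> rules = Collections.emptyList();

    /** Replaces all current rules with a defensive copy of the given list. */
    void loadRules(List<String> newRules) {
        if (newRules == null) {
            // Design choice of this sketch only: treat null like an empty list.
            rules = Collections.emptyList();
        } else {
            rules = Collections.unmodifiableList(new ArrayList<>(newRules));
        }
    }

    List<String> getRules() {
        return rules;
    }

    public static void main(String[] args) {
        FullReplaceSketch store = new FullReplaceSketch();
        store.loadRules(Arrays.asList("ruleA", "ruleB"));
        store.loadRules(Arrays.asList("ruleC"));            // ruleA and ruleB are gone
        System.out.println(store.getRules());
        store.loadRules(Collections.<String>emptyList());   // deletes everything
        System.out.println(store.getRules());
    }
}
```

A caller that wants to add one rule therefore has to send the existing rules plus the new one, which is what pushing the complete rule list on every change amounts to.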
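At this point we can see that the Dashboard only provides a UI for operating on rule data and does not persist any of it. The rules live in the client's memory, so restarting the Dashboard does not lose rule data, but restarting the application does.
In the code above, every branch calls writeToDataSource() after reloading the rules. This method may provide the extension point we need to persist rule data.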
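As a rough illustration of what such an extension point could look like, the sketch below writes the rule JSON to a file each time rules are reloaded. The interface RuleSinkSketch is a hypothetical stand-in, not Sentinel's own data source type, and the behaviour of writeToSink (succeed when nothing is registered, report failure when the write throws) is an assumption made for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical persistence hook: receives the serialized rules after each reload. */
interface RuleSinkSketch {
    void write(String rulesJson);
}

class FileRuleSinkSketch implements RuleSinkSketch {
    private final Path file;

    FileRuleSinkSketch(Path file) {
        this.file = file;
    }

    /** Creates a sink backed by a temporary file, for demos and tests. */
    static FileRuleSinkSketch tempSink() {
        try {
            return new FileRuleSinkSketch(Files.createTempFile("rules", ".json"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void write(String rulesJson) {
        try {
            // Overwrites the file: persistence follows the same full-replacement model.
            Files.write(file, rulesJson.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    String read() {
        try {
            return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns false only when a registered sink fails to persist the rules. */
    static boolean writeToSink(RuleSinkSketch sink, String rulesJson) {
        if (sink == null) {
            return true; // nothing registered, nothing to persist
        }
        try {
            sink.write(rulesJson);
            return true;
        } catch (RuntimeException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        FileRuleSinkSketch sink = tempSink();
        System.out.println(writeToSink(sink, "[]") + " -> " + sink.read());
    }
}
```

On restart, the application would read the file back and load the rules itself, which closes the gap described above.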