[每日一句]
也许你度过了很糟糕的一天,但这并不代表你会因此度过糟糕的一生。
[背景介绍]
- 分布式系统的规模和复杂度不断增加,随着而来的是对分布式系统可用性的要求越来越高。在各种高可用设计模式中,【熔断、隔离、降级、限流】是经常被使用的。而相关的技术,Hystrix本身早已算不上什么新技术,但它却是最经典的技术体系!。
- Hystrix以实现熔断降级的设计,从而提高了系统的可用性。
- Hystrix是一个在调用端上,实现断路器模式,以及隔舱模式,通过避免级联故障,提高系统容错能力,从而实现高可用设计的一个Java服务组件库。
- *Hystrix实现了资源隔离机制
前提介绍
Hystrix的超时检测本质上通过启动单独线程去检测的,线程的执行的时间刚好就是任务超时的时间,本质上就是这么个简单的逻辑。
Hystrix超时后会抛出一个 HystrixTimeoutException的异常。
超时检测逻辑
Hystrix的超时包括注册过程和执行过程两个,注册过程如下:
- 执行lift(new HystrixObservableTimeoutOperator(_cmd))关联超时检测任务。
- 在HystrixObservableTimeoutOperator类中,new TimerListener()负责创建检测任务,HystrixTimer.getInstance().addTimerListener(listener)负责关联定时任务。
- 在HystrixObservableTimeoutOperator类中,addTimerListener通过java的定时任务服务scheduleAtFixedRate在延迟超时时间后执行。
Hystrix的超时执行过程如下:
- 在超时后执行listener.tick()方法后执行类TimerListener的tick方法
- 在TimerListener类的tick方法中执行timeoutRunnable.run()后执行HystrixContextRunnable的run方法
- 在HystrixContextRunnable类run方法中执行child.onError(new HystrixTimeoutException())实现超时。
- executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator(_cmd));
private static class HystrixObservableTimeoutOperator implements Operator {
final AbstractCommand originalCommand;
public HystrixObservableTimeoutOperator(final AbstractCommand originalCommand) {
this.originalCommand = originalCommand;
}
public Subscribersuper R> call(final Subscribersuper R> child) {
final CompositeSubscription s = new CompositeSubscription();
child.add(s);
final HystrixRequestContext hystrixRequestContext =
HystrixRequestContext.getContextForCurrentThread();
TimerListener listener = new TimerListener() {
public void tick() {
if(originalCommand.isCommandTimedOut
.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT,
originalCommand.commandKey);
s.unsubscribe();
final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(
originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {
public void run() {
child.onError(new HystrixTimeoutException());
}
});
timeoutRunnable.run();
}
}
public int getIntervalTimeInMilliseconds() {
return originalCommand.properties.executionTimeoutInMilliseconds().get();
}
};
final Reference tl = HystrixTimer.getInstance().addTimerListener(listener);
originalCommand.timeoutTimer.set(tl);
Subscriber parent = new Subscriber() {
public void onCompleted() {
if (isNotTimedOut()) {
tl.clear();
child.onCompleted();
}
}
public void one rror(Throwable e) {
if (isNotTimedOut()) {
tl.clear();
child.onError(e);
}
}
public void onNext(R v) {
if (isNotTimedOut()) {
child.onNext(v);
}
}
private boolean isNotTimedOut() {
return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED,
TimedOutStatus.COMPLETED);
}
};
s.add(parent);
return parent;
}
}
public Reference addTimerListener(final TimerListener listener) {
startThreadIfNeeded();
Runnable r = new Runnable() {
public void run() {
try {
listener.tick();
} catch (Exception e) {
logger.error("Failed while ticking TimerListener", e);
}
}
};
ScheduledFuture f = executor.get().getThreadPool().scheduleAtFixedRate(r,
listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(),
TimeUnit.MILLISECONDS);
return new TimerReference(listener, f);
}
public class HystrixContextRunnable implements Runnable {
private final Callable actual;
private final HystrixRequestContext parentThreadState;
public HystrixContextRunnable(Runnable actual) {
this(HystrixPlugins.getInstance().getConcurrencyStrategy(), actual);
}
public HystrixContextRunnable(HystrixConcurrencyStrategy concurrencyStrategy, final Runnable
actual) {
this(concurrencyStrategy, HystrixRequestContext.getContextForCurrentThread(), actual);
}
public HystrixContextRunnable(final HystrixConcurrencyStrategy concurrencyStrategy,
final HystrixRequestContext hystrixRequestContext, final Runnable actual) {
this.actual = concurrencyStrategy.wrapCallable(new Callable() {
public Void call() throws Exception {
actual.run();
return null;
}
});
this.parentThreadState = hystrixRequestContext;
}
public void run() {
HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
try {
HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
try {
actual.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
} finally {
HystrixRequestContext.setContextOnCurrentThread(existingState);
}
}
}
复制代码
分享资源
扫码关注发送:资源 获取以上资源