序
本文主要研究一下HystrixEventNotifier
HystrixEventNotifier
/**
* Abstract EventNotifier that allows receiving notifications for different events with default implementations.
* <p>
* See {@link HystrixPlugins} or the Hystrix GitHub Wiki for information on configuring plugins: <a
* >https://github.com/Netflix/Hystrix/wiki/Plugins</a>.
* <p>
* <b>Note on thread-safety and performance</b>
* <p>
* A single implementation of this class will be used globally so methods on this class will be invoked concurrently from multiple threads so all functionality must be thread-safe.
* <p>
* Methods are also invoked synchronously and will add to execution time of the commands so all behavior should be fast. If anything time-consuming is to be done it should be spawned asynchronously
* onto separate worker threads.
*/
public abstract class HystrixEventNotifier {
/**
* Called for every event fired.
* <p>
* <b>Default Implementation: </b> Does nothing
*
* @param eventType event type
* @param key event key
*/
public void markEvent(HystrixEventType eventType, HystrixCommandKey key) {
// do nothing
}
/**
* Called after a command is executed using thread isolation.
* <p>
* Will not get called if a command is rejected, short-circuited etc.
* <p>
* <b>Default Implementation: </b> Does nothing
*
* @param key
* {@link HystrixCommandKey} of command instance.
* @param isolationStrategy
* {@link ExecutionIsolationStrategy} the isolation strategy used by the command when executed
* @param duration
* time in milliseconds of executing <code>run()</code> method
* @param eventsDuringExecution
* {@code List<HystrixEventType>} of events occurred during execution.
*/
public void markCommandExecution(HystrixCommandKey key, ExecutionIsolationStrategy isolationStrategy, int duration, List<HystrixEventType> eventsDuringExecution) {
// do nothing
}
}
- 這個(gè)notifier是同步調(diào)用的,因此里頭方法的實(shí)現(xiàn)不能太耗時(shí),不然則會(huì)阻塞,如果方法太耗時(shí)則需要考慮異步到其他線程
- markEvent,在每個(gè)事件觸發(fā)的時(shí)候調(diào)用,markCommandExecution是在使用線程隔離方式的時(shí)候會(huì)調(diào)用
HystrixEventNotifierDefault
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/strategy/eventnotifier/HystrixEventNotifierDefault.java
/**
* Default implementations of {@link HystrixEventNotifier} that does nothing.
*
* @ExcludeFromJavadoc
*/
public class HystrixEventNotifierDefault extends HystrixEventNotifier {
private static HystrixEventNotifierDefault INSTANCE = new HystrixEventNotifierDefault();
private HystrixEventNotifierDefault() {
}
public static HystrixEventNotifier getInstance() {
return INSTANCE;
}
}
默認(rèn)實(shí)現(xiàn)不做任何操作
HystrixEventType
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/HystrixEventType.java
/**
* Various states/events that execution can result in or have tracked.
* <p>
* These are most often accessed via {@link HystrixRequestLog} or {@link HystrixCommand#getExecutionEvents()}.
*/
public enum HystrixEventType {
EMIT(false),
SUCCESS(true),
FAILURE(false),
TIMEOUT(false),
BAD_REQUEST(true),
SHORT_CIRCUITED(false),
THREAD_POOL_REJECTED(false),
SEMAPHORE_REJECTED(false),
FALLBACK_EMIT(false),
FALLBACK_SUCCESS(true),
FALLBACK_FAILURE(true),
FALLBACK_REJECTION(true),
FALLBACK_MISSING(true),
EXCEPTION_THROWN(false),
RESPONSE_FROM_CACHE(true),
CANCELLED(true),
COLLAPSED(false),
COMMAND_MAX_ACTIVE(false);
private final boolean isTerminal;
HystrixEventType(boolean isTerminal) {
this.isTerminal = isTerminal;
}
public boolean isTerminal() {
return isTerminal;
}
//......
/**
* List of events that throw an Exception to the caller
*/
public final static List<HystrixEventType> EXCEPTION_PRODUCING_EVENT_TYPES = new ArrayList<HystrixEventType>();
/**
* List of events that are terminal
*/
public final static List<HystrixEventType> TERMINAL_EVENT_TYPES = new ArrayList<HystrixEventType>();
static {
EXCEPTION_PRODUCING_EVENT_TYPES.add(BAD_REQUEST);
EXCEPTION_PRODUCING_EVENT_TYPES.add(FALLBACK_FAILURE);
EXCEPTION_PRODUCING_EVENT_TYPES.add(FALLBACK_MISSING);
EXCEPTION_PRODUCING_EVENT_TYPES.add(FALLBACK_REJECTION);
for (HystrixEventType eventType: HystrixEventType.values()) {
if (eventType.isTerminal()) {
TERMINAL_EVENT_TYPES.add(eventType);
}
}
}
//......
}
- 這里定了HystrixEvent的枚舉,然后還對(duì)這些事件進(jìn)行了分類,分為EXCEPTION_PRODUCING_EVENT_TYPES以及EXCEPTION_PRODUCING_EVENT_TYPES兩類
- EXCEPTION_PRODUCING_EVENT_TYPES包括BAD_REQUEST、FALLBACK_FAILURE、FALLBACK_MISSING、FALLBACK_REJECTION
- TERMINAL_EVENT_TYPES主要是根據(jù)event枚舉的isTerminal屬性來,包括SUCCESS、BAD_REQUEST、FALLBACK_SUCCESS、FALLBACK_FAILURE、FALLBACK_REJECTION、FALLBACK_MISSING、RESPONSE_FROM_CACHE、CANCELLED
markCommandExecution
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.java
/**
* This decorates "Hystrix" functionality around the run() Observable.
*
* @return R
*/
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
final Action1<R> markEmits = new Action1<R>() {
@Override
public void call(R r) {
if (shouldOutputOnNextEvents()) {
executionResult = executionResult.addEvent(HystrixEventType.EMIT);
eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
}
if (commandIsScalar()) {
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
circuitBreaker.markSuccess();
}
}
};
final Action0 markOnCompleted = new Action0() {
@Override
public void call() {
if (!commandIsScalar()) {
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
circuitBreaker.markSuccess();
}
}
};
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
circuitBreaker.markNonSuccess();
Exception e = getExceptionFromThrowable(t);
executionResult = executionResult.setExecutionException(e);
if (e instanceof RejectedExecutionException) {
return handleThreadPoolRejectionViaFallback(e);
} else if (t instanceof HystrixTimeoutException) {
return handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) {
return handleBadRequestByEmittingError(e);
} else {
/*
* Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
*/
if (e instanceof HystrixBadRequestException) {
eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
return Observable.error(e);
}
return handleFailureViaFallback(e);
}
}
};
final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
@Override
public void call(Notification<? super R> rNotification) {
setRequestContextIfNeeded(currentRequestContext);
}
};
Observable<R> execution;
if (properties.executionTimeoutEnabled().get()) {
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
- 這里在markEmits的action里頭如果是scalar command,會(huì)調(diào)用markCommandExecution
- 這里在markOnCompleted的action里頭,如果不是scalar command,則會(huì)調(diào)用markCommandExecution
小結(jié)
在HystrixCommand以及HystrixObservableCommand調(diào)用的時(shí)候,都會(huì)調(diào)用HystrixEventNotifier來發(fā)布事件,提供給開發(fā)者自定義實(shí)現(xiàn),來做指標(biāo)收集及監(jiān)控報(bào)警。