EventBus為第三方消息通信的框架,因為使用比Handler便利,廣受開發(fā)者喜愛,其底層實現還是利用的Handler,在其基礎上增加了注解,并根據注解在內部實現線程切換接收消息
EventBus使用只有簡單的三步:
- 注解方法
- 調用register方法注冊
- 調用post方法發(fā)送消息
一、EventBus源碼分析
源碼主要從兩個方法入手,就是register方法和post方法
獲取EventBus實例的getDefault方法就是一個單例:
public static EventBus getDefault() {
if (defaultInstance == null) {
synchronized (EventBus.class) {
if (defaultInstance == null) {
defaultInstance = new EventBus();
}
}
}
return defaultInstance;
}
此外,它還支持使用EventBusBuilder自定義構建實例,感興趣的自己查看下源碼
1.register
首先來看register方法的具體實現:
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
// 獲取被注解的方法
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
// 訂閱
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
其中會獲取注冊對象的類對象,并調用findSubscriberMethods方法獲取類中EventBus注解的方法,并將方法包裝成SubscriberMethod列表,存入以注冊對象的Class為key的METHOD_CACHE 這個Map中,findSubscriberMethods方法的調用鏈實現如下:
// (Class : 方法包裝類列表)的緩存
private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();
// 獲取類中EventBus注解的方法
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {// 獲取到緩存,直接返回
return subscriberMethods;
}
if (ignoreGeneratedIndex) {
// 利用反射獲取注解方法列表
subscriberMethods = findUsingReflection(subscriberClass);
} else {
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {// 將方法列表加入緩存后,返回
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
// 利用反射獲取注解方法列表
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
// 利用反射獲取當前類的方法
findUsingReflectionInSingleClass(findState);
// 向上查詢父類中被注解的方法
findState.moveToSuperclass();
}
// 將方法列表返回
return getMethodsAndRelease(findState);
}
// 真正利用反射獲取的方法
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// This is faster than getMethods, especially when subscribers are fat classes like Activities
// 獲取當前類的所有方法
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
for (Method method : methods) {
int modifiers = method.getModifiers();
// 如果是Public方法,并且不是抽象、靜態(tài)等方法
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1) {// EventBus只支持一個參數
// 獲取Subscribe注解
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
// 獲取入參的類對象
Class<?> eventType = parameterTypes[0];
if (findState.checkAdd(method, eventType)) {// 判斷下有沒有添加過
ThreadMode threadMode = subscribeAnnotation.threadMode();
// 包裝后,添加到方法列表中
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}
回到register方法中,最后調用了subscribe方法,入參為注冊對象和包裝方法類SubscriberMethod,其中又做了緩存:
- 將注冊對象和方法包裝類SubscriberMethod重新包裝成Subscription對象,并將入參類型為Key,包裝類Subscription列表為Value存入subscriptionsByEventType這個Map中
- 再將該訂閱方法的參數類型存入以注冊對象為key,以參數類型集合為value的typesBySubscriber這個Map中
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
// 入參參數的類對象
Class<?> eventType = subscriberMethod.eventType;
// 將方法包裝類對象subscriberMethod和注冊時傳入對象包裝為Subscription類
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
// 以入參參數的類對象為Key,Subscription列表為Value進行緩存
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {// 遍歷包裝類Subscription集合,根據優(yōu)先級插入新包裝的Subscription
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
// 獲取參數類型集合(EventBus的訂閱方法為:一個方法對應一個參數類型)
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
// 存入以注冊對象為key,參數類型集合為value的Map中
typesBySubscriber.put(subscriber, subscribedEvents);
}
// 參數類型列表添加該訂閱方法的參數類型
subscribedEvents.add(eventType);
if (subscriberMethod.sticky) {// 默認為false,暫不做研究
if (eventInheritance) {
// Existing sticky events of all subclasses of eventType have to be considered.
// Note: Iterating over all events may be inefficient with lots of sticky events,
// thus data structure should be changed to allow a more efficient lookup
// (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
總的來說,register方法做的事情就是利用反射獲取方法,往三個Map中存入對應的值,再來看下這三個Map:
METHOD_CACHE :以注冊對象的Class為key,以包裝方法類SubscriberMethod列表為value
subscriptionsByEventType:以訂閱方法的唯一入參參數:Class為key,以Subscription(包裝了注冊對象和SubscriberMethod(包裝了Method、注解信息等))列表為value
typesBySubscriber:以注冊對象為key,以訂閱方法的參數類型Class集合為value
簡單分析下這三個Map的作用:
當發(fā)送消息時,可以通過subscriptionsByEventType快速獲取到Subscription列表,進而進行方法的反射調用,反射需要對象和Method,這兩個Subscription都有
當調用unregister方法,取消訂閱時,通過typesBySubscriber可以快速獲取到訂閱方法的參數類型Class集合,通過遍歷參數類型Class集合,移除subscriptionsByEventType中以這些Class為key的Subscription集合,這樣注冊時的對象就不會內存泄漏了
至于METHOD_CACHE,存放著Class和Method的包裝類SubscriberMethod,以便通過Class快速獲取這個類的SubscriberMethod(Method、注解信息等)的列表,本來通過Class就可以獲取到Method集合,而Class加載后一般不會進行垃圾回收,所以不存在內存泄漏問題,必要時也可以通過clear方法清空這個Map

2.post
post方法的調用分析比較簡單,這邊直接跳到調用鏈后面的方法:postSingleEventForEventType,該方法根據訂閱方法入參的參數類型獲取到所有需要通知的方法,最后遍歷調用postToSubscription方法進行線程調度:
// 根據post傳入的對象,進行一對多消息分發(fā)
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
// 根據訂閱方法入參的參數類型獲取到所有需要通知的方法
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
// 消息分發(fā)
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
postToSubscription方法會根據注解信息,來進行線程調度,invokeSubscriber方法則會直接利用反射調用訂閱方法,而mainThreadPoster對象就是Handler,發(fā)送消息后,最終通過Handler的handleMessage方法中在主線程調用invokeSubscriber方法,backgroundPoster、asyncPoster則會通過線程池,在子線程中異步執(zhí)行
// 線程調度
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
// 反射調用方法
void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}

EventBus的核心設計模式是訂閱者模式,利用注解獲取到一個對象中所有的訂閱方法并緩存,最后利用反射調用這些方法
二、手寫實現EventBus
1.定義注解和線程模式
注解:
/**
* 該注解表示方法需要被EventBus訂閱
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Subscribe {
ThreadMode threadMode() default ThreadMode.POST;
}
線程模式:
public enum ThreadMode {
Main,//主線程執(zhí)行訂閱方法
POST,//默認
BACKGROUND//后臺線程訂閱執(zhí)行
}
2.定義方法包裝類:SubscriberMethod
/**
* 方法包裝類
*/
class SubscriberMethod {
final Method method;//方法
final ThreadMode threadMode;//線程模式
final Class<?> eventType;//方法的入參參數類型
public SubscriberMethod(Method method, ThreadMode threadMode, Class<?> eventType) {
this.method = method;
this.threadMode = threadMode;
this.eventType = eventType;
}
}
3.定義對象 方法包裝類:Subscription
/**
* 對象 方法包裝類
*/
class Subscription {
// 注冊對象
final Object subscriber;
// 方法包裝類
final SubscriberMethod subscriberMethod;
public Subscription(Object subscriber, SubscriberMethod subscriberMethod) {
this.subscriber = subscriber;
this.subscriberMethod = subscriberMethod;
}
}
4.定義EventBus類實現功能
public class EventBus {
private static final int BRIDGE = 0x40;
private static final int SYNTHETIC = 0x1000;
private static final int MODIFIERS_IGNORE = Modifier.ABSTRACT | Modifier.STATIC | BRIDGE | SYNTHETIC;
//以注冊對象的Class為key,以包裝方法類SubscriberMethod列表為value
private final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();
//以訂閱方法的唯一入參參數:Class為key,以Subscription(包裝了注冊對象和SubscriberMethod(包裝了Method、注解信息等))列表為value
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType = new HashMap<>();
//以注冊對象為key,以訂閱方法的參數類型Class集合為value
private final Map<Object, List<Class<?>>> typesBySubscriber = new HashMap<>();
//線程池
private final ExecutorService executorService;
private static final EventBus instance = new EventBus();
public static EventBus getInstance() {
return instance;
}
private EventBus() {
executorService = Executors.newCachedThreadPool();
}
/**
* 訂閱
*
* @param obj
*/
public void register(Object obj) {
Class<?> clz = obj.getClass();
//查找訂閱方法
List<SubscriberMethod> methods = findSubscriberMethod(clz);
//加入訂閱map
if (!methods.isEmpty()) {
synchronized (this) {
subscribe(obj, methods);
}
}
}
/**
* 將訂閱方法加入兩個map緩存
*
* @param obj
* @param methods
*/
private void subscribe(Object obj, List<SubscriberMethod> methods) {
//對象中訂閱方法的入參類型集合
Set<Class<?>> ObjEventTypes = new HashSet<>();
for (SubscriberMethod subscriberMethod : methods) {
//根據入參類型獲取Subscription列表
Class<?> eventType = subscriberMethod.eventType;
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
}
//包裝成Subscription存入
subscriptions.add(new Subscription(obj, subscriberMethod));
//記錄入參類型
ObjEventTypes.add(eventType);
}
// 將對象:入參類型列表存入map
if (typesBySubscriber.get(obj) == null) {
List<Class<?>> eventTypes = new ArrayList<>(ObjEventTypes);
typesBySubscriber.put(obj, eventTypes);
}
}
/**
* 根據class查找訂閱方法
*
* @param clz
* @return
*/
private List<SubscriberMethod> findSubscriberMethod(Class<?> clz) {
List<SubscriberMethod> methods = METHOD_CACHE.get(clz);
if (methods != null) {
return methods;
}
// 利用反射獲取
return getMethodsByReflect(clz);
}
/**
* 根據class反射獲取訂閱方法
*
* @param clz
* @return
*/
private List<SubscriberMethod> getMethodsByReflect(Class<?> clz) {
// 訂閱方法集合
List<SubscriberMethod> subscriberMethods = new ArrayList<>();
while (clz != null && clz != Object.class) {
Method[] declaredMethods = clz.getDeclaredMethods();
for (Method method : declaredMethods) {
Subscribe annotation = method.getAnnotation(Subscribe.class);
if (!Modifier.isPublic(method.getModifiers()) ||
(method.getModifiers() & MODIFIERS_IGNORE) != 0 || annotation == null)
continue;
//獲取線程模式
ThreadMode threadMode = annotation.threadMode();
//獲取入參參數類型
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length != 1) throw new RuntimeException("入參參數必須為一個");
Class<?> eventType = parameterTypes[0];
// 構造方法包裝類
SubscriberMethod subscriberMethod = new SubscriberMethod(method, threadMode, eventType);
//加入緩存
subscriberMethods.add(subscriberMethod);
}
clz = clz.getSuperclass();
}
if (subscriberMethods.isEmpty()) throw new RuntimeException("該類及其父類沒有任何沒有訂閱方法");
//加入緩存
METHOD_CACHE.put(clz, subscriberMethods);
return subscriberMethods;
}
/**
* 取消訂閱
*
* @param obj
*/
public synchronized void unregister(Object obj) {
//獲取該對象訂閱方法的所有參數類型
List<Class<?>> eventTypes = typesBySubscriber.get(obj);
if (eventTypes != null) {
for (Class<?> eventType : eventTypes) {
//獲取Subscription集合
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
for (int i = 0; i < subscriptions.size(); i++) {
if (subscriptions.get(i).subscriber == obj) {//判斷是不是該對象的方法
subscriptions.remove(i);//移除
i--;
}
}
}
// 移除
typesBySubscriber.remove(obj);
}
}
/**
* 發(fā)送消息
*
* @param event
*/
public void post(Object event) {
//獲取到訂閱方法
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(event.getClass());
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postToSubscription(event, subscription.subscriber, subscription.subscriberMethod);
}
}
}
/**
* 線程調度
*
* @param event
* @param subscriberMethod
*/
private void postToSubscription(Object event, Object subscriber, SubscriberMethod subscriberMethod) {
// 當前線程是否是主線程
boolean isMainThread = Looper.myLooper() == Looper.getMainLooper();
switch (subscriberMethod.threadMode) {
case Main:
if (isMainThread) {//主線程直接調用
invokeSubscriberMethod(event, subscriber, subscriberMethod);
} else {
// 偷懶
new Handler(Looper.getMainLooper()).post(new Runnable() {
@Override
public void run() {
invokeSubscriberMethod(event, subscriber, subscriberMethod);
}
});
}
break;
case POST:
invokeSubscriberMethod(event, subscriber, subscriberMethod);
break;
case BACKGROUND:
// 偷懶
getExecutorService().execute(new Runnable() {
@Override
public void run() {
invokeSubscriberMethod(event, subscriber, subscriberMethod);
}
});
break;
}
}
/**
* 反射調用訂閱方法
*
* @param event
* @param subscriber
* @param subscriberMethod
*/
private void invokeSubscriberMethod(Object event, Object subscriber, SubscriberMethod subscriberMethod) {
try {
subscriberMethod.method.invoke(subscriber, event);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
}
/**
* 獲取線程池
*
* @return ExecutorService
*/
public ExecutorService getExecutorService() {
return executorService;
}
}
最后寫個按鈕,測試下

Activity中的代碼如下:
public class MainActivity extends AppCompatActivity {
@Subscribe
public void onMessage(String msg) {
Log.i("aruba", "onMessage:" + msg + "Thread:" + Thread.currentThread().getName());
}
@Subscribe(threadMode = ThreadMode.BACKGROUND)
public void onMessageBackground(String msg) {
Log.i("aruba", "onMessageBackground:" + msg + "Thread:" + Thread.currentThread().getName());
}
public void onMessageNotSubscribe(String msg) {
Log.i("aruba", "onMessageNotSubscribe:" + msg);
}
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
EventBus.getInstance().register(this);
}
@Override
protected void onPause() {
super.onPause();
EventBus.getInstance().unregister(this);
}
public void sendMessage(View view) {
EventBus.getInstance().post("Hello EventBus");
}
}
結果:
