觀察者模式

觀察者模式 - 發(fā)布訂閱模式

1. 什么是觀察者模式

  • 在對(duì)象間建立一個(gè)"一對(duì)多"的依賴. 當(dāng)一個(gè)對(duì)象的狀態(tài)發(fā)生變化, 其他依賴的所有對(duì)象都會(huì)自動(dòng)收到通知.
    被依賴的對(duì)象叫做被觀察者(Observable) , 依賴的對(duì)象叫做觀察者 (Observer). 同樣含義的稱呼有: pub-sub, 生產(chǎn)者消費(fèi)者, eventemitter-eventlistener
    觀察者模式根據(jù)場(chǎng)景不同有不同實(shí)現(xiàn):
  • 實(shí)現(xiàn)方式
    • 進(jìn)程內(nèi)的

      • 同步阻塞方式
      • 異步非阻塞模式 (Guava EventBus)
    • 跨進(jìn)程的
      rabbitmq

2. 經(jīng)典實(shí)現(xiàn): List[Observer]

場(chǎng)景: 用戶注冊(cè)后, 進(jìn)行消息推動(dòng)和用戶通知. 用戶注冊(cè)這個(gè)個(gè)動(dòng)作做為報(bào)觀察者, 消息推送和用戶通知作為觀察者, 調(diào)用者調(diào)用者

/**
 * 觀察者模式: 將觀察者和被觀察者解耦
 * 觀察者接口
 */
interface RegObserver {
    void handleRegSuccess(long userId);
}

/**
 * 負(fù)責(zé)推送消息的觀察者
 */
class RegPromotionObserver implements RegObserver {
    private PromotionService promotionService; // 依賴注入

    @Override
    public void handleRegSuccess(long userId) {
        promotionService.issueNewUserExperienceCash(userId);
    }
}

/**
 * 負(fù)責(zé)通知用戶的觀察者
 */
class RegNotificationObserver implements RegObserver {
    private NotificationService notificationService;

    @Override
    public void handleRegSuccess(long userId) {
        notificationService.sendInboxMessage(userId, "Welcome...");
    }
}

class UserController {
    private UserService userService; // 依賴注入
    private List<RegObserver> regObservers = new ArrayList<>();

    // 一次性設(shè)置好,之后也不可能動(dòng)態(tài)的修改
    public void setRegObservers(List<RegObserver> observers) {
        regObservers.addAll(observers);
    }

    public Long register(String telephone, String password) {
        //省略輸入?yún)?shù)的校驗(yàn)代碼
        //省略u(píng)serService.register()異常的try-catch代碼
        long userId = userService.register(telephone, password);  // 真實(shí)的注冊(cè)邏輯

        //todo 下面是同步阻塞方式實(shí)現(xiàn)觀察者和非觀察者, 如果在線程池中執(zhí)行, 就是異步非阻塞模式
        for (RegObserver observer : regObservers) {
            observer.handleRegSuccess(userId);
        }

        return userId;
    }
}


interface PromotionService {
    void issueNewUserExperienceCash(long userId);
}

interface NotificationService{
    void sendInboxMessage(long userId, String msg);
}

interface UserService {
    long register(String telephone, String password);
}

3. Guava EventBus

EventBus 翻譯為“事件總線”,它提供了實(shí)現(xiàn)觀察者模式的骨架代碼。利用 EventBus 框架實(shí)現(xiàn)的觀察者模式,跟從零開始編寫的觀察者模式相比,從大的流程上來(lái)說(shuō),實(shí)現(xiàn)思路大致一樣,都需要:

  • 定義 Observer 類, 但不需要定義 Observer 接口,任意類型的對(duì)象都可以注冊(cè)到 EventBus 中,通過(guò) @Subscribe 注解來(lái)標(biāo)明類中哪個(gè)函數(shù)可以接收被觀察者發(fā)送的消息。
  • 并且通過(guò) register() 函數(shù)注冊(cè) Observer,
  • 也都需要通過(guò)調(diào)用某個(gè)函數(shù)(比如,EventBus 中的 post() 函數(shù))來(lái)給 Observer 發(fā)送消息(在 EventBus 中消息被稱作事件 event)

Guava EventBus 框架跟經(jīng)典的觀察者模式的不同之處在于,當(dāng)我們調(diào)用 post() 函數(shù)發(fā)送消息的時(shí)候,不是只把消息發(fā)送給該消息類型對(duì)應(yīng)的觀察者,而是發(fā)送給消息類型可匹配的觀察者。所謂可匹配指的是,能接收的消息類型是發(fā)送消息(post 函數(shù)定義中的 event)類型的父類

/**
 * 調(diào)用者
 */
class UserControllerGuava {
    private UserService userService; // 依賴注入

    private EventBus eventBus;
    private static final int DEFAULT_EVENTBUS_THREAD_POOL_SIZE = 20;

    public UserControllerGuava() {
        //eventBus = new EventBus(); // 同步阻塞模式
        eventBus = new AsyncEventBus(Executors.newFixedThreadPool(DEFAULT_EVENTBUS_THREAD_POOL_SIZE)); // 異步非阻塞模式
    }

    public void setRegObservers(List<Object> observers) {
        for (Object observer : observers) {
            eventBus.register(observer);
        }
    }

    public Long register(String telephone, String password) {
        //省略輸入?yún)?shù)的校驗(yàn)代碼
        //省略u(píng)serService.register()異常的try-catch代碼
        long userId = userService.register(telephone, password);

        eventBus.post(userId);

        return userId;
    }
}

/**
 * 觀察者一
 */
class RegPromotionObserverGuava {
    private PromotionService promotionService; // 依賴注入

    @Subscribe
    public void handleRegSuccess(Long userId) {
        promotionService.issueNewUserExperienceCash(userId);
    }
}

/**
 * 觀察者二
 */
class RegNotificationObserverGuava {
    private NotificationService notificationService;

    @Subscribe
    public void handleRegSuccess(Long userId) {
        notificationService.sendInboxMessage(userId, "...");
    }
}

4. 自己實(shí)現(xiàn)一個(gè) EventBUs

整個(gè)小框架的代碼實(shí)現(xiàn)包括 5 個(gè)類:EventBus、AsyncEventBus、Subscribe、ObserverAction、ObserverRegistry。接下來(lái),我們依次來(lái)看下這 5 個(gè)類。

  • @Subscribe: 是一個(gè)注解,用于標(biāo)明觀察者中的哪個(gè)函數(shù)可以接收消息。
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@Beta
public @interface Subscribe {}
  • ObserverAction: 表示對(duì)觀察者方法的調(diào)用 (target 表示觀察者類,method 表示方法。)
public class ObserverAction {
    private Object target;
    private Method method;

    public ObserverAction(Object target, Method method) {
        this.target = Preconditions.checkNotNull(target);
        this.method = method;
        this.method.setAccessible(true);
    }

    public void execute(Object event) { // event是method方法的參數(shù)
        try {
            method.invoke(target, event);
        } catch (InvocationTargetException | IllegalAccessException e) {
            e.printStackTrace();
        }
    }
}
  • ObserverRegistryObserverRegistry 類就是前面講到的 Observer 注冊(cè)表,記錄被觀察的事件類型和 OberserverAction 映射
    這是一個(gè)復(fù)雜的類, 包括注冊(cè)監(jiān)聽者, 查找監(jiān)聽事件類型和其父類類型的 action,
public class ObserverRegistry {
    private ConcurrentMap<Class<?>, CopyOnWriteArraySet<ObserverAction>> registry = new ConcurrentHashMap<>();

    /** 注冊(cè)的事件動(dòng)作 */
    public void register(Object observer) {
        Map<Class<?>, Collection<ObserverAction>> observerActions = findAllObserverActions(observer);
        for (Map.Entry<Class<?>, Collection<ObserverAction>> entry : observerActions.entrySet()) {
            Class<?> eventType = entry.getKey();
            Collection<ObserverAction> eventActions = entry.getValue();
            CopyOnWriteArraySet<ObserverAction> registeredEventActions = registry.get(eventType);
            if (registeredEventActions == null) {
                registry.putIfAbsent(eventType, new CopyOnWriteArraySet<>());
                registeredEventActions = registry.get(eventType);
            }
            registeredEventActions.addAll(eventActions);
        }
    }

    /** 在緩存 map 中查找該監(jiān)聽對(duì)象和該監(jiān)聽對(duì)象的父類 */
    public List<ObserverAction> getMatchedObserverActions(Object event) {
        List<ObserverAction> matchedObservers = new ArrayList<>();
        Class<?> postedEventType = event.getClass();
        for (Map.Entry<Class<?>, CopyOnWriteArraySet<ObserverAction>> entry : registry.entrySet()) {
            // map 中對(duì)應(yīng)的事件類型
            Class<?> eventType = entry.getKey();
            Collection<ObserverAction> eventActions = entry.getValue();
            if (postedEventType.isAssignableFrom(eventType)) {
                matchedObservers.addAll(eventActions);
            }
        }
        return matchedObservers;
    }

    /** 在 @Subscribe 注解方法只有一個(gè)參數(shù), 表示被監(jiān)聽的對(duì)象. 監(jiān)聽列表 */
    private Map<Class<?>, Collection<ObserverAction>> findAllObserverActions(Object observer) {
        Map<Class<?>, Collection<ObserverAction>> observerActions = new HashMap<>();
        Class<?> clazz = observer.getClass();
        for (Method method : getAnnotatedMethods(clazz)) {
            Class<?>[] parameterTypes = method.getParameterTypes();
            Class<?> eventType = parameterTypes[0];
            if (!observerActions.containsKey(eventType)) {
                observerActions.put(eventType, new ArrayList<>());
            }
            observerActions.get(eventType).add(new ObserverAction(observer, method));
        }
        return observerActions;
    }

    /** 獲取某個(gè)類下面被 @Subscribe 注解的方法 */
    private List<Method> getAnnotatedMethods(Class<?> clazz) {
        List<Method> annotatedMethods = new ArrayList<>();
        for (Method method : clazz.getDeclaredMethods()) {
            if (method.isAnnotationPresent(Subscribe.class)) {
                Class<?>[] parameterTypes = method.getParameterTypes();
                // 校驗(yàn)參數(shù)個(gè)數(shù)只有一個(gè)
                Preconditions.checkArgument(parameterTypes.length == 1,
                        "Method %s has @Subscribe annotation but has %s parameters."
                                + "Subscriber methods must have exactly 1 parameter.",
                        method, parameterTypes.length);
                annotatedMethods.add(method);
            }
        }
        return annotatedMethods;
    }
}
  • EventBus: 同步事件總線. 對(duì)事件的調(diào)用方

public class EventBus {
  private Executor executor;
  private ObserverRegistry registry = new ObserverRegistry();

  public EventBus() {
    this(MoreExecutors.directExecutor());
  }

  protected EventBus(Executor executor) {
    this.executor = executor;
  }

  public void register(Object object) {
    registry.register(object);
  }

  public void post(Object event) {
    List<ObserverAction> observerActions = registry.getMatchedObserverActions(event);
    for (ObserverAction observerAction : observerActions) {
      executor.execute(new Runnable() {
        @Override
        public void run() {
          observerAction.execute(event);
        }
      });
    }
  }
}
  • AsyncEventBus: 異步事件總線. 需要傳入一個(gè)線程池

public class AsyncEventBus extends EventBus {
  public AsyncEventBus(Executor executor) {
    super(executor);
  }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容