ActiveMQ的消息分組機(jī)制會(huì)造成OOM嗎?

猜想

AcitveMQ依賴Message中JMSXGroupID屬性來做消息分組,那分組信息會(huì)怎么維護(hù)呢?剛開始猜想是維護(hù)在類似Map的數(shù)據(jù)結(jié)構(gòu)中?但是如果消息分組很細(xì),例如按照訂單號(hào)來分組,勢(shì)必MQ需要維護(hù)大量的分組信息,那這樣不是會(huì)導(dǎo)致OOM?

消息分組接口

找了一下,最后發(fā)現(xiàn)ActiveMQ使用MessageGroupMap接口定義消息分組的相關(guān)邏輯

 * Represents a map of JMSXGroupID values to consumer IDs
 */
public interface MessageGroupMap {
    void put(String groupId, ConsumerId consumerId);
    ConsumerId get(String groupId);
    ConsumerId removeGroup(String groupId);
    MessageGroupSet removeConsumer(ConsumerId consumerId);
    void removeAll();
    /**
     * @return  a map of group names and associated consumer Id
     */
    Map<String,String> getGroups();
    String getType();
    void setDestination(Destination destination);
}

MQ 自身有三個(gè)MessageGroupMap接口的實(shí)現(xiàn)類,包括CachedMessageGroupMap,MessageGroupHashBucket,SimpleMessageGroupMap三個(gè)。


MessageGroupMap實(shí)現(xiàn)類

然后通過監(jiān)控MQ的堆內(nèi)存,發(fā)現(xiàn)實(shí)際上是使用了CachedMessageGroupMap


ActiveMQ-內(nèi)存監(jiān)控

看看CachedMessageGroupMap的實(shí)現(xiàn)

/**
 * A simple implementation which tracks every individual GroupID value in a LRUCache
 */
public class CachedMessageGroupMap implements MessageGroupMap {
    private final LRUMap<String, ConsumerId> cache;
    private final int maximumCacheSize;
    Destination destination;

    CachedMessageGroupMap(int size){
      cache = new LRUMap<String, ConsumerId>(size) {
          @Override
          public boolean removeEldestEntry(final Map.Entry eldest) {
              boolean remove = super.removeEldestEntry(eldest);
              if (remove) {
                  if (destination != null) {
                      for (Subscription s : destination.getConsumers()) {
                        if (s.getConsumerInfo().getConsumerId().equals(eldest.getValue())) {
                            s.getConsumerInfo().decrementAssignedGroupCount(destination.getActiveMQDestination());
                            break;
                          }
                      }
                  }
              }
              return remove;
          }
      };
      maximumCacheSize = size;
    }
    public synchronized void put(String groupId, ConsumerId consumerId) {
        cache.put(groupId, consumerId);
    }

    public synchronized ConsumerId get(String groupId) {
        return cache.get(groupId);
    }

    public synchronized ConsumerId removeGroup(String groupId) {
        return cache.remove(groupId);
    }

    public synchronized MessageGroupSet removeConsumer(ConsumerId consumerId) {
        SimpleMessageGroupSet ownedGroups = new SimpleMessageGroupSet();
        Map<String,ConsumerId> map = new HashMap<String, ConsumerId>();
        map.putAll(cache);
        for (Iterator<String> iter = map.keySet().iterator(); iter.hasNext();) {
            String group = iter.next();
            ConsumerId owner = map.get(group);
            if (owner.equals(consumerId)) {
                ownedGroups.add(group);
            }
        }
        for (String group:ownedGroups.getUnderlyingSet()){
            cache.remove(group);
        }
        return ownedGroups;
    }


    @Override
    public synchronized void removeAll(){
        cache.clear();
        if (destination != null) {
            for (Subscription s : destination.getConsumers()) {
                s.getConsumerInfo().clearAssignedGroupCount(destination.getActiveMQDestination());
            }
        }
    }

    @Override
    public synchronized Map<String, String> getGroups() {
        Map<String,String> result = new HashMap<String,String>();
        for (Map.Entry<String,ConsumerId>entry: cache.entrySet()){
            result.put(entry.getKey(),entry.getValue().toString());
        }
        return result;
    }

    @Override
    public String getType() {
        return "cached";
    }

    public int getMaximumCacheSize(){
        return maximumCacheSize;
    }

    public String toString() {
        return "message groups: " + cache.size();
    }

    public void setDestination(Destination destination) {
        this.destination = destination;
    }
}

主要關(guān)注點(diǎn)還是在CachedMessageGroupMap中存儲(chǔ)的結(jié)構(gòu)——LRUMap,原來是利用LinkedHashMap實(shí)現(xiàn)了一個(gè)LRUMap,如果分組信息過多,會(huì)剔除最老的分組信息。

public class LRUMap<K,V> extends LinkedHashMap<K,V>{

    protected static final float DEFAULT_LOAD_FACTOR = (float) 0.75;
    protected static final int DEFAULT_INITIAL_CAPACITY = 5000;
    private static final long serialVersionUID = -9179676638408888162L;

    private int maximumSize;

    public LRUMap(int maximumSize) {
        this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, true, maximumSize);
    }

    public LRUMap(int maximumSize, boolean accessOrder) {
        this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, accessOrder, maximumSize);
    }

    public LRUMap(int initialCapacity, float loadFactor, boolean accessOrder, int maximumSize) {
        super(initialCapacity, loadFactor, accessOrder);
        this.maximumSize = maximumSize;
    }

    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maximumSize;
    }
}

end~~

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容