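Hypothesis
ActiveMQ relies on the JMSXGroupID property of a Message to group messages, so how is the grouping information maintained? My first guess was that it lives in some Map-like data structure. But if the grouping is very fine-grained, for example one group per order number, the broker would have to keep a huge number of group entries. Wouldn't that lead to an OOM?
Message grouping interface
After some digging, I found that ActiveMQ defines the message-grouping logic in the MessageGroupMap interface.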
/**
 * Represents a map of JMSXGroupID values to consumer IDs
 */
public interface MessageGroupMap {
    void put(String groupId, ConsumerId consumerId);
    ConsumerId get(String groupId);
    ConsumerId removeGroup(String groupId);
    MessageGroupSet removeConsumer(ConsumerId consumerId);
    void removeAll();
    /**
     * @return a map of group names and associated consumer Id
     */
    Map<String,String> getGroups();
    String getType();
    void setDestination(Destination destination);
}
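To make the contract of the interface concrete, here is a minimal sketch of an unbounded, HashMap-backed group map. It is not ActiveMQ code: the class name GroupOwnerMapSketch is made up for illustration, and consumer IDs are plain Strings instead of ActiveMQ's ConsumerId type. It also shows why the OOM worry is reasonable, since nothing here ever limits the number of entries.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for a MessageGroupMap implementation.
// Assumption: consumer IDs are plain Strings (ActiveMQ uses ConsumerId).
class GroupOwnerMapSketch {
    // groupId -> owning consumer; unbounded, so it grows with every new group
    private final Map<String, String> owners = new HashMap<>();

    // Record that a group is owned by a consumer.
    public synchronized void put(String groupId, String consumerId) {
        owners.put(groupId, consumerId);
    }

    // Look up the current owner of a group, or null if it has none.
    public synchronized String get(String groupId) {
        return owners.get(groupId);
    }

    // Drop a single group and return its previous owner (or null).
    public synchronized String removeGroup(String groupId) {
        return owners.remove(groupId);
    }

    // Drop every group owned by the consumer and return the released group IDs.
    public synchronized Set<String> removeConsumer(String consumerId) {
        Set<String> released = new HashSet<>();
        Iterator<Map.Entry<String, String>> it = owners.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> entry = it.next();
            if (entry.getValue().equals(consumerId)) {
                released.add(entry.getKey());
                it.remove();
            }
        }
        return released;
    }

    public synchronized int size() {
        return owners.size();
    }

    public static void main(String[] args) {
        GroupOwnerMapSketch map = new GroupOwnerMapSketch();
        map.put("order-1", "consumer-A");
        map.put("order-2", "consumer-B");
        map.put("order-3", "consumer-A");
        System.out.println(map.get("order-1"));
        System.out.println(map.removeConsumer("consumer-A").size());
        System.out.println(map.size());
    }
}
```

With one group per order number, a map like this gains one entry per order and never sheds any unless a consumer goes away or the group is removed explicitly.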
ActiveMQ ships with three implementations of the MessageGroupMap interface: CachedMessageGroupMap, MessageGroupHashBucket, and SimpleMessageGroupMap.

MessageGroupMap實(shí)現(xiàn)類
然后通過監(jiān)控MQ的堆內(nèi)存,發(fā)現(xiàn)實(shí)際上是使用了CachedMessageGroupMap

[Figure: ActiveMQ memory monitoring]
Let's look at the implementation of CachedMessageGroupMap:
/**
 * A simple implementation which tracks every individual GroupID value in a LRUCache
 */
public class CachedMessageGroupMap implements MessageGroupMap {
    private final LRUMap<String, ConsumerId> cache;
    private final int maximumCacheSize;
    Destination destination;

    CachedMessageGroupMap(int size){
        cache = new LRUMap<String, ConsumerId>(size) {
            @Override
            public boolean removeEldestEntry(final Map.Entry eldest) {
                boolean remove = super.removeEldestEntry(eldest);
                if (remove) {
                    if (destination != null) {
                        for (Subscription s : destination.getConsumers()) {
                            if (s.getConsumerInfo().getConsumerId().equals(eldest.getValue())) {
                                s.getConsumerInfo().decrementAssignedGroupCount(destination.getActiveMQDestination());
                                break;
                            }
                        }
                    }
                }
                return remove;
            }
        };
        maximumCacheSize = size;
    }

    public synchronized void put(String groupId, ConsumerId consumerId) {
        cache.put(groupId, consumerId);
    }

    public synchronized ConsumerId get(String groupId) {
        return cache.get(groupId);
    }

    public synchronized ConsumerId removeGroup(String groupId) {
        return cache.remove(groupId);
    }

    public synchronized MessageGroupSet removeConsumer(ConsumerId consumerId) {
        SimpleMessageGroupSet ownedGroups = new SimpleMessageGroupSet();
        Map<String,ConsumerId> map = new HashMap<String, ConsumerId>();
        map.putAll(cache);
        for (Iterator<String> iter = map.keySet().iterator(); iter.hasNext();) {
            String group = iter.next();
            ConsumerId owner = map.get(group);
            if (owner.equals(consumerId)) {
                ownedGroups.add(group);
            }
        }
        for (String group:ownedGroups.getUnderlyingSet()){
            cache.remove(group);
        }
        return ownedGroups;
    }

    @Override
    public synchronized void removeAll(){
        cache.clear();
        if (destination != null) {
            for (Subscription s : destination.getConsumers()) {
                s.getConsumerInfo().clearAssignedGroupCount(destination.getActiveMQDestination());
            }
        }
    }

    @Override
    public synchronized Map<String, String> getGroups() {
        Map<String,String> result = new HashMap<String,String>();
        for (Map.Entry<String,ConsumerId>entry: cache.entrySet()){
            result.put(entry.getKey(),entry.getValue().toString());
        }
        return result;
    }

    @Override
    public String getType() {
        return "cached";
    }

    public int getMaximumCacheSize(){
        return maximumCacheSize;
    }

    public String toString() {
        return "message groups: " + cache.size();
    }

    public void setDestination(Destination destination) {
        this.destination = destination;
    }
}
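The removeEldestEntry override in the constructor is the interesting part: when the cache evicts a group, the owning consumer's assigned-group count is decremented as well. The sketch below imitates that bookkeeping with JDK classes only. Everything in it is an assumption for illustration: the class name EvictionHookSketch, the String consumer IDs, and in particular the assign method that increments the counter, which in ActiveMQ happens outside the class shown above.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of "evict a group and fix up the owner's group count".
class EvictionHookSketch {
    // consumerId -> number of groups currently assigned to that consumer
    private final Map<String, Integer> assignedGroupCount = new HashMap<>();
    // groupId -> consumerId, kept in access order and bounded by maxGroups
    private final LinkedHashMap<String, String> cache;

    EvictionHookSketch(final int maxGroups) {
        cache = new LinkedHashMap<String, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                boolean remove = size() > maxGroups;
                if (remove) {
                    // The evicted group no longer counts for its former owner.
                    assignedGroupCount.merge(eldest.getValue(), -1, Integer::sum);
                }
                return remove;
            }
        };
    }

    // Assumed helper: assign a group to a consumer and keep the counters in step.
    synchronized void assign(String groupId, String consumerId) {
        String previous = cache.put(groupId, consumerId); // may evict the eldest group
        if (!consumerId.equals(previous)) {
            assignedGroupCount.merge(consumerId, 1, Integer::sum);
            if (previous != null) {
                assignedGroupCount.merge(previous, -1, Integer::sum);
            }
        }
    }

    synchronized String ownerOf(String groupId) {
        return cache.get(groupId);
    }

    synchronized int countFor(String consumerId) {
        return assignedGroupCount.getOrDefault(consumerId, 0);
    }

    public static void main(String[] args) {
        EvictionHookSketch sketch = new EvictionHookSketch(2);
        sketch.assign("order-1", "consumer-A");
        sketch.assign("order-2", "consumer-B");
        sketch.assign("order-3", "consumer-B"); // cache is full: order-1 is evicted
        System.out.println(sketch.ownerOf("order-1"));     // prints null
        System.out.println(sketch.countFor("consumer-A")); // prints 0
        System.out.println(sketch.countFor("consumer-B")); // prints 2
    }
}
```

An evicted group has no owner any more, so the next message carrying that group ID is treated like the first message of a new group.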
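The main thing to look at is the structure CachedMessageGroupMap stores its data in: LRUMap. It turns out that LRUMap is built on LinkedHashMap. Once the number of groups exceeds the configured maximum, the eldest entry is evicted, which in access-order mode is the least recently used group.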
public class LRUMap<K,V> extends LinkedHashMap<K,V>{
    protected static final float DEFAULT_LOAD_FACTOR = (float) 0.75;
    protected static final int DEFAULT_INITIAL_CAPACITY = 5000;
    private static final long serialVersionUID = -9179676638408888162L;
    private int maximumSize;

    public LRUMap(int maximumSize) {
        this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, true, maximumSize);
    }

    public LRUMap(int maximumSize, boolean accessOrder) {
        this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, accessOrder, maximumSize);
    }

    public LRUMap(int initialCapacity, float loadFactor, boolean accessOrder, int maximumSize) {
        super(initialCapacity, loadFactor, accessOrder);
        this.maximumSize = maximumSize;
    }

    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maximumSize;
    }
}
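The eviction behaviour is easy to try out in isolation. The following sketch uses the same LinkedHashMap technique (access order plus a removeEldestEntry override) with a tiny capacity. The class LruEvictionDemo, the boundedLru helper, and the group and consumer names are all invented for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical demo of LRU eviction built on LinkedHashMap.
class LruEvictionDemo {
    // Build an access-ordered map that holds at most maximumSize entries.
    static <K, V> Map<K, V> boundedLru(final int maximumSize) {
        return new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maximumSize;
            }
        };
    }

    // Returns the surviving group IDs, least recently used first.
    static List<String> demo() {
        Map<String, String> groups = boundedLru(3);
        groups.put("order-1", "consumer-A");
        groups.put("order-2", "consumer-B");
        groups.put("order-3", "consumer-A");
        groups.get("order-1");               // touch: order-1 becomes most recently used
        groups.put("order-4", "consumer-B"); // over capacity: order-2 is evicted
        return new ArrayList<>(groups.keySet());
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [order-3, order-1, order-4]
    }
}
```

Because the map is access-ordered, a group that keeps receiving messages stays in the cache, while a group that has gone quiet is the first to be dropped.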
end~~