一、JMS的API結(jié)構(gòu)
二、一個(gè)JMS應(yīng)用的基本步驟
1:創(chuàng)建一個(gè)JMS connection factory
2:通過connection factory來創(chuàng)建JMS connection
3:啟動(dòng)JMS connection
4:通過connection創(chuàng)建JMS session
5:創(chuàng)建JMS destination
6:創(chuàng)建JMS producer,或者創(chuàng)建JMS message,并設(shè)置destination
7:創(chuàng)建JMS consumer,或者是注冊一個(gè)JMS message listener
8:發(fā)送或者接受JMS message(s)
9:關(guān)閉所有的JMS資源(connection, session, producer, consumer等)
三、topic
3.1 關(guān)于持久化和非持久化消息
持久化消息:
這是ActiveMQ的默認(rèn)傳送模式,此模式保證這些消息只被傳送一次和成功使用一次。對于這些消息,可靠性是優(yōu)先考慮的因素。
可靠性的另一個(gè)重要方面是確保持久性消息傳送至目標(biāo)后,消息服務(wù)在向消費(fèi)者傳送它們之前不會(huì)丟失這些消息。
這意味著在持久性消息傳送至目標(biāo)時(shí),消息服務(wù)將其放入持久性數(shù)據(jù)存儲。如果消息服務(wù)由于某種原因?qū)е率?,它可以恢?fù)
此消息并將此消息傳送至相應(yīng)的消費(fèi)者。雖然這樣增加了消息傳送的開銷,但卻增加了可靠性。
非持久化消息:
保證這些消息最多被傳送一次。對于這些消息,可靠性并非主要的考慮因素。 此模式并不要求持久性的數(shù)據(jù)存儲,也不保證
消息服務(wù)由于某種原因?qū)е率『笙⒉粫?huì)丟失。 有兩種方法指定傳送模式:
1.使用setDeliveryMode 方法,這樣所有的消息都采用此傳送模式; 如:
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
2.使用send 方法為每一條消息設(shè)置傳送模式
3.2 非持久的Topic消息
3.2.1 消息的發(fā)送
基本跟前面發(fā)送隊(duì)列信息是一樣的,只是把創(chuàng)建Destination的地方,由創(chuàng)建隊(duì)列替換成創(chuàng)建Topic,例如:
Destination destination = session.createTopic("my-topic");
3.2.1 消息的接收
1:必須要接收方在線,然后客戶端再發(fā)送信息,接收方才能接收到消息
2:同樣把創(chuàng)建Destination的地方,由創(chuàng)建隊(duì)列替換成創(chuàng)建Topic,例如:
Destination destination = session.createTopic("my-topic");
3:由于不知道客戶端發(fā)送多少信息,因此改成while循環(huán)的方式了,例如:
Message message = consumer.receive();
while(message!=null) {
TextMessage txtMsg = (TextMessage)message;
System.out.println("收到消 息:" + txtMsg.getText());
message = consumer.receive(1000L);
}
3.3 持久的Topic消息
3.3.1 消息的發(fā)送****
/**持久的*/
public void test2() throws Exception {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("liuy","123456","tcp://192.168.91.8:61616");
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic("my-topic");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 設(shè)置DeliveryMode.PERSISTENT模式
connection.start();
for (int i = 0; i < 3; i++) {
TextMessage message = session.createTextMessage("message22--" + i);
producer.send(message);
}
session.commit();
session.close();
connection.close();
}
1:要用持久化訂閱,發(fā)送消息者要用DeliveryMode.PERSISTENT模式發(fā)現(xiàn),在連接之前設(shè)定
2:一定要設(shè)置完成后,再start這個(gè)connection
3.3.2 消息的接收
public void test2() throws Exception {
ConnectionFactory cf = new ActiveMQConnectionFactory("liuy","123456","tcp://192.168.91.8:61616");
Connection connection = cf.createConnection();
connection.setClientID("cc1");
final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("my-topic");
TopicSubscriber ts = session.createDurableSubscriber(topic, "T1");
connection.start();
Message message = ts.receive();
while(message!=null) {
TextMessage txtMsg = (TextMessage)message;
System.out.println("收到消息:" + txtMsg.getText());
message = ts.receive(1000L);
}
session.commit();
session.close();
connection.close();
}
1:需要在連接上設(shè)置消費(fèi)者id,用來識別消費(fèi)者
2:需要?jiǎng)?chuàng)建TopicSubscriber來訂閱
3:要設(shè)置好了過后再start 這個(gè) connection
4:一定要先運(yùn)行一次,等于向消息服務(wù)中間件注冊這個(gè)消費(fèi)者,然后再運(yùn)行客戶端發(fā)送信息,這個(gè)時(shí)候,無論消費(fèi)者是否在線,
都會(huì)接收到,不在線的話,下次連接的時(shí)候,會(huì)把沒有收過的消息都接收下來。
歡迎工作一到五年的Java工程師朋友們加入Java高并發(fā)QQ群: 957734884,群內(nèi)提供免費(fèi)的Java架構(gòu)學(xué)習(xí)資料(里面有高可用、高并發(fā)、高性能及分布式、Jvm性能調(diào)優(yōu)、Spring源碼,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個(gè)知識點(diǎn)的架構(gòu)資料)合理利用自己每一分每一秒的時(shí)間來學(xué)習(xí)提升自己,不要再用"沒有時(shí)間“來掩飾自己思想上的懶惰!趁年輕,使勁拼,給未來的自己一個(gè)交代!


