RabbitMQ部分API詳解(含demo)
demo配置文件rabbitmq.properties
host=192.168.221.138
port=5672
username=guest
password=guest
讀取配置文件內(nèi)容
public class RabbitConfig {
public static String host ;
public static int port ;
public static String username;
public static String password ;
static{
try {
ClassLoader classLoader=RabbitConfig.class.getClassLoader();
InputStream is=classLoader.getResourceAsStream("rabbitmq.properties");
Properties properties=new Properties();
properties.load(is);
host = properties.getProperty("host");
port = Integer.parseInt(properties.getProperty("port"));
username =properties.getProperty("username");
password =properties.getProperty("password");
}catch (IOException e) {
e.printStackTrace();
}
}
}
創(chuàng)建連接工廠
public class RabbitUtil {
public static Connection GetRabbitConnection() {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(RabbitConfig.host);
factory.setPort(RabbitConfig.port);
factory.setUsername(RabbitConfig.username);
factory.setPassword(RabbitConfig.password);
Connection connection = null;
try {
connection = factory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
return connection;
}
}
創(chuàng)建provider
public class FanoutProvider {
public static void main(String[] args) throws Exception {
// 創(chuàng)建一個連接
Connection connection = RabbitUtil.GetRabbitConnection();
// 創(chuàng)建通道
Channel channel = connection.createChannel();
//聲明exchange
//exchange:exchange名稱
//type:exchange類型
//durable:exchange是否持久化(不代表消息持久化)
//autoDelete:已經(jīng)沒有消費者了,服務(wù)器是否可以刪除該Exchange
//exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,Map<String, Object> arguments)
channel.exchangeDeclare("zx_fanout", BuiltinExchangeType.FANOUT, true,false,null);
//聲明queue
//queue:queue名稱
//durable:queue是否持久化
//exclusive:是否為當(dāng)前連接的專用隊列,在連接斷開后,會自動刪除該隊列
//autodelete:當(dāng)沒有任何消費者使用時,自動刪除該隊列
//queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)
channel.queueDeclare("zx_queue_f1", true, false, false, null);
channel.queueDeclare("zx_queue_f2", true, false, false, null);
channel.queueDeclare("zx_queue_f3", true, false, false, null);
//queue:queue名稱
//exchange:exchange名稱
//routingKey:路由鍵;用來綁定queue和exchange
//queueBind(String queue, String exchange, String routingKey)
channel.queueBind("zx_queue_f1","zx_fanout","");
channel.queueBind("zx_queue_f2","zx_fanout","");
channel.queueBind("zx_queue_f3","zx_fanout","");
//開啟confirm機制
// (即rabbitmq-server收到生產(chǎn)端的消息,會給生產(chǎn)端發(fā)送一個確認,如果沒法送確認,生產(chǎn)端重新發(fā)送消息到server中)
channel.confirmSelect();
for (int i = 1 ; i <= 100 ;i++){
String message = "fanout:廣播的第 "+ i +" 條消息";
//exchange
//routingKey:路由鍵
//mandatory:true=如果exchange根據(jù)自身類型和消息routeKey無法找到一個符合條件的queue,那么會調(diào)用basic.return方法將消息返還給生產(chǎn)者。false=出現(xiàn)上述情形broker會直接將消息扔掉
//immediate:true=如果exchange在將消息route到queue(s)時發(fā)現(xiàn)對應(yīng)的queue上沒有消費者,那么這條消息不會放入隊列中。false=當(dāng)與消息routeKey關(guān)聯(lián)的所有queue(一個或多個)都沒有消費者時,該消息會通過basic.return方法返還給生產(chǎn)者。
//BasicProperties:消息的基本屬性,例如路由頭等(MessageProperties.PERSISTENT_TEXT_PLAIN:消息的持久化)
//basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
channel.basicPublish("zx_fanout","",false,false,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("utf-8"));
System.out.println(message);
}
// 關(guān)閉連接
channel.close();
connection.close();
}
}
創(chuàng)建consumer
public class Consumer1 {
public static void main(String[] args) throws Exception {
// 創(chuàng)建一個連接
Connection conn = RabbitUtil.GetRabbitConnection();
// 創(chuàng)建通道
Channel channel = conn.createChannel();
//聲明exchange
//exchange:exchange名稱
//type:exchange類型
//durable:exchange是否持久化(不代表消息持久化)
//autoDelete:已經(jīng)沒有消費者了,服務(wù)器是否可以刪除該Exchange
//exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,Map<String, Object> arguments)
channel.exchangeDeclare("zx_fanout", BuiltinExchangeType.FANOUT,true,false,null);
//聲明queue
//queue:queue名稱
//durable:queue是否持久化
//exclusive:是否為當(dāng)前連接的專用隊列,在連接斷開后,會自動刪除該隊列
//autodelete:當(dāng)沒有任何消費者使用時,自動刪除該隊列
//queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)
channel.queueDeclare("zx_queue_f1", true, false, false, null);
//queue:queue名稱
//exchange:exchange名稱
//routingKey:路由鍵;用來綁定queue和exchange
//queueBind(String queue, String exchange, String routingKey)
channel.queueBind("zx_queue_f1","zx_fanout","");
//uack的消息最多有幾條
channel.basicQos(200);
//消息消費
//queue:綁定隊列名稱
//autoAck:是否自動ack,如果不自動ack,需要使用channel.ack、channel.nack、channel.basicReject 進行消息應(yīng)答
//basicConsume(String queue, boolean autoAck, Consumer callback)
channel.basicConsume("zx_queue_f1",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
String message = new String(body,"utf-8");
System.out.println(Consumer1.class.getSimpleName()+"--接收消息--"+message);
} catch (UnsupportedEncodingException e) {
//deliveryTag:該消息的唯一ID
//multiple:是否批量. true:將一次性拒絕所有小于deliveryTag的消息。
//requeue:被拒絕的是否重新入隊列
//channel.basicNack(long deliveryTag, boolean multiple, boolean requeue);
channel.basicNack(envelope.getDeliveryTag(),false,true);
//deliveryTag:該消息的index
//requeue:被拒絕的是否重新入隊列
//channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
//channel.basicNack 與 channel.basicReject 的區(qū)別在于basicNack可以拒絕多條消息,而basicReject一次只能拒絕一條消息
} finally {
//basicAck(long deliveryTag, boolean multiple)
//deliveryTag:該消息的唯一ID
//multiple:是否批量.true:將一次性ack所有小于deliveryTag的消息。
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
});
}
}
demo的github地址