大家好,我是指北君。
(資料圖片僅供參考)
今天指北君帶領大家接著學習RabbitMQ,了解RabbitMQ的五大通信模型之一的Work模型;接下來還會有關于RabbitMQ的系列教程,對你有幫助的話記得關注哦~
回顧上一篇文章中,簡單的介紹了一下RabbitMQ,以及安裝和hello world。
有的小伙伴留言說看不懂其中的方法參數,這里先解釋一下幾個基本的方法參數。
// 聲明隊列方法channel.queueDeclare(QUEUE_NAME, false, false, false, null);/** * param1:queue 隊列的名字 * param2:durable 是否持久化;比如現在發送到隊列里面的消息,如果沒有持久化,重啟這個隊列后數 據會丟失(false) true:重啟之后數據依然在 * param3:exclusive 是否排外(是否是當前連接的專屬隊列),排外的意思是: * 1:連接關閉之后 這個隊列是否自動刪除(false:不自動刪除) * 2:是否允許其他通道來進行訪問這個數據(false:不允許) * param4:autoDelete 是否自動刪除 * 就是當最后一個連接斷開的時候,是否自動刪除這個隊列(false:不刪除) * param5:arguments(map) 聲明隊列的時候,附帶的一些參數 */
// 發送數據到隊列channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, "第一個隊列消息...".getBytes());/** * param1:exchange 交換機 沒有就設置為 "" 值就可以了 * param2:routingKey 路由的key 現在沒有設置key,直接使用隊列的名字 * param3:BasicProperties 發送數據到隊列的時候,是否要帶一些參數。 * MessageProperties.PERSISTENT_TEXT_PLAIN表示沒有帶任何參數 * param4:body 向隊列中發送的消息數據 */Work模型
work模型稱為工作隊列或者競爭消費者模式,多個消費者消費的數據之和才是原來隊列中的所有數據,適用于流量的削峰。
演示寫個簡單的測試:
生產者
public class Producer { private static final String QUEUE_NAME = "queue_work_1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 100; i++) { channel.basicPublish("", QUEUE_NAME, null, ("work模型:" + i).getBytes()); } channel.close(); connection.close(); }}
消費者
// 消費者1public class Consumer { private static final String QUEUE_NAME = "queue_work_1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // channel.basicQos(0, 1, false); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(System.currentTimeMillis() + "消費者1接收到信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME, false, defaultConsumer); }}
// 消費者2public class Consumer2 { private static final String QUEUE_NAME = "queue_work_1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // channel.basicQos(0, 1, false); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(System.currentTimeMillis() + "消費者2接收到信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag(), false); // 這里加了個延遲,表示處理業務時間 try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } }; channel.basicConsume(QUEUE_NAME, false, defaultConsumer); }}
結果
可以看出來:100條消息,消費者之間是平分的,消費者1 幾乎是瞬間完成,消費者2 則是慢慢吞吞的運行完畢,消費者1大量時間處于空閑狀態,消費者2則一直忙碌。這顯然是不適用于實際開發中。
我們需要遵從一個原則,就是能者多勞,消費越快的人,消費的越多;
現在我們把消費者1和2的代碼中// channel.basicQos(0, 1, false);這行代碼取消注釋,再次運行;
現在的結果就比較符合能者多勞,雖然你干的多,但是工資是一樣的呀~
work模型的一個主要的方法是basicQos();這里也解釋一下其參數:
// 設置限流機制channel.basicQos(0, 1, false);/** * param1: prefetchSize,消息本身的大小 如果設置為0 那么表示對消息本身的大小不限制 * param2: prefetchCount,告訴rabbitmq不要一次性給消費者推送大于N個消息 * param3:global,是否將上面的設置應用于整個通道,false表示只應用于當前消費者 */小結
本文到這里就結束了,主要介紹了RabbitMQ通信模型中的work模型,適用于限流、削峰等應用場景。
X 關閉
X 關閉
- 15G資費不大降!三大運營商誰提供的5G網速最快?中國信通院給出答案
- 2聯想拯救者Y70發布最新預告:售價2970元起 迄今最便宜的驍龍8+旗艦
- 3亞馬遜開始大規模推廣掌紋支付技術 顧客可使用“揮手付”結賬
- 4現代和起亞上半年出口20萬輛新能源汽車同比增長30.6%
- 5如何讓居民5分鐘使用到各種設施?沙特“線性城市”來了
- 6AMD實現連續8個季度的增長 季度營收首次突破60億美元利潤更是翻倍
- 7轉轉集團發布2022年二季度手機行情報告:二手市場“飄香”
- 8充電寶100Wh等于多少毫安?鐵路旅客禁止、限制攜帶和托運物品目錄
- 9好消息!京東與騰訊續簽三年戰略合作協議 加強技術創新與供應鏈服務
- 10名創優品擬通過香港IPO全球發售4100萬股 全球發售所得款項有什么用處?
