程序员

每日一学:你知道如何在 RabbitMQ 中实现 Work queues工作队列模

作者:admin 2021-07-04 我要评论

一、模式说明 Work Queues 与入门程序的简单模式相比多了一个或一些消费端多个消费端共同消费同一个队列中的消息。 应用场景 对于任务过重或任务较多情况使用工...

在说正事之前,我要推荐一个福利:你还在原价购买阿里云、腾讯云、华为云服务器吗?那太亏啦!来这里,新购、升级、续费都打折,能够为您省60%的钱呢!2核4G企业级云服务器低至69元/年,点击进去看看吧>>>)

一、模式说明


Work Queues 与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。

应用场景 :对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

二、代码

Work Queues 与入门程序的 简单模式 的代码是几乎一样的:可以完全复制,并复制多一个消费者进行多个消费者同时消费消息的测试。

①生产者

package com.itheima.rabbitmq.work; 

import com.itheima.rabbitmq.util.ConnectionUtil; 
import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.ConnectionFactory; 

public class Producer { 
	static final String QUEUE_NAME = "work_queue"; 
	public static void main(String[] args) throws Exception { 
	
		//创建连接 
		Connection connection = ConnectionUtil.getConnection(); 
		
		// 创建频道 
		Channel channel = connection.createChannel(); 
		
		// 声明(创建)队列 
		/**
		 * 参数1:队列名称 
		 * 参数2:是否定义持久化队列 
		 * 参数3:是否独占本次连接 
		 * 参数4:是否在不使用的时候自动删除队列 
		 * 参数5:队列其它参数 
		*/ 
		channel.queueDeclare(QUEUE_NAME, true, false, false, null); 
		for (int i = 1; i <= 30; i++) { 
			// 发送信息 
			String message = "你好;小兔子!work模式--" + i; 
			
			/**
			 * 参数1:交换机名称,如果没有指定则使用默认Default Exchage 
			 * 参数2:路由key,简单模式可以传递队列名称 
			 * 参数3:消息其它属性 
			 * 参数4:消息内容 
			*/ 
			channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 
			System.out.println("已发送消息:" + message); 
		}
		// 关闭资源 
		channel.close(); connection.close(); 
	} 
}

②消费者1

package com.itheima.rabbitmq.work; 

import com.itheima.rabbitmq.util.ConnectionUtil; 
import com.rabbitmq.client.*;
import java.io.IOException; 

public class Consumer1 { 
	public static void main(String[] args) throws Exception { 
		Connection connection = ConnectionUtil.getConnection(); 
		
		// 创建频道 
		Channel channel = connection.createChannel(); 
		
		// 声明(创建)队列 
		/**
		 * 参数1:队列名称 
		 * 参数2:是否定义持久化队列 
		 * 参数3:是否独占本次连接 
		 * 参数4:是否在不使用的时候自动删除队列 
		 * 参数5:队列其它参数 
		*/ 
		channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null); 
		
		//一次只能接收并处理一个消息 
		channel.basicQos(1); 
		
		//创建消费者;并设置消息处理 
		DefaultConsumer consumer = new DefaultConsumer(channel){ 
		
			@Override 
			/**
			 * consumerTag 消息者标签,在channel.basicConsume时候可以指定 
			 * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送) 
			 * properties 属性信息 
			 * body 消息 
			*/ 
			public void handleDelivery(String consumerTag, Envelope envelope, 
					AMQP.BasicProperties properties, byte[] body) throws IOException { 
				try {
				
					//路由key 
					System.out.println("路由key为:" + envelope.getRoutingKey()); 
					
					//交换机 
					System.out.println("交换机为:" + envelope.getExchange()); 
					
					//消息id 
					System.out.println("消息id为:" + envelope.getDeliveryTag()); 
					
					//收到的消息 
					System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8")); 
					Thread.sleep(1000); 
					
					//确认消息 
					channel.basicAck(envelope.getDeliveryTag(), false); 
				} 
				catch (InterruptedException e) { 
					e.printStackTrace(); 
				} 
			} 
		};
		//监听消息 
		/**
		 * 参数1:队列名称
		 * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认 
		 * 参数3:消息接收到后回调 
		*/ 
		channel.basicConsume(Producer.QUEUE_NAME, false, consumer); 
	} 
}

③消费者2

package com.itheima.rabbitmq.work; 

import com.itheima.rabbitmq.util.ConnectionUtil; 
import com.rabbitmq.client.*; 
import java.io.IOException; 

public class Consumer2 { 
	public static void main(String[] args) throws Exception { 
		Connection connection = ConnectionUtil.getConnection(); 
		
		// 创建频道 
		Channel channel = connection.createChannel(); 
		
		// 声明(创建)队列 
		/**
		 * 参数1:队列名称 
		 * 参数2:是否定义持久化队列 
		 * 参数3:是否独占本次连接 
		 * 参数4:是否在不使用的时候自动删除队列 
		 * 参数5:队列其它参数 
		*/ 
		channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null); 
		
		//一次只能接收并处理一个消息 
		channel.basicQos(1); 
		
		//创建消费者;并设置消息处理 
		DefaultConsumer consumer = new DefaultConsumer(channel){ 
		
			@Override 
			/**
			 * consumerTag 消息者标签,在channel.basicConsume时候可以指定 
			 * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送) 
			 * properties 属性信息 
			 * body 消息 
			*/ 
			public void handleDelivery(String consumerTag, Envelope envelope, 
					AMQP.BasicProperties properties, byte[] body) throws IOException { 
				try {
					//路由key 
					System.out.println("路由key为:" + envelope.getRoutingKey()); 
					
					//交换机 
					System.out.println("交换机为:" + envelope.getExchange()); 
					
					//消息id 
					System.out.println("消息id为:" + envelope.getDeliveryTag());
					
					//收到的消息 
					System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8")); 
					Thread.sleep(1000); 
					
					//确认消息 
					channel.basicAck(envelope.getDeliveryTag(), false); 
				} catch (InterruptedException e) { 
					e.printStackTrace(); 
				} 
			} 
		};
		//监听消息 
		/**
		 * 参数1:队列名称 
		 * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认 
		 * 参数3:消息接收到后回调 
		*/ 
		channel.basicConsume(Producer.QUEUE_NAME, false, consumer); 
	} 
}

三、测试

启动两个消费者,然后再启动生产者发送消息;到IDEA的两个消费者对应的控制台查看是否竞争性的接收到消息。


总结

在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。

;原文链接:https://blog.csdn.net/Java_Caiyo/article/details/115689824

版权声明:本文转载自网络,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。本站转载出于传播更多优秀技术知识之目的,如有侵权请联系QQ/微信:153890879删除

相关文章
  • 单链表全解(从零开始)——数据结构(C语

    单链表全解(从零开始)——数据结构(C语

  • 每天学一个jquery插件-做一个便签

    每天学一个jquery插件-做一个便签

  • 跟踪多个 Git 远程仓库

    跟踪多个 Git 远程仓库

  • ARM版Windows 10用户狂喜 微软全新补丁

    ARM版Windows 10用户狂喜 微软全新补丁

腾讯云代理商
海外云服务器