RabbitMQ in Detail
Order system: after a user places an order, the order system persists the order, writes a message to the message queue, and immediately returns "order placed" to the user.
Inventory system: subscribes to order messages, reads the order details, and performs the corresponding stock operations.
If the inventory system is down when an order is placed, order placement is unaffected: once the order system has written the message to the queue, it no longer cares about any subsequent processing. The order system and the inventory system are thus decoupled at the application level.
So a message queue is the classic producer/consumer model:
producers keep writing messages into the queue, and consumers keep taking messages out of it.
Because production and consumption are asynchronous, and each side only cares about sending or receiving messages, no business logic leaks across the boundary; producers and consumers are decoupled.
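The model can be sketched in plain Java, with a java.util.concurrent.BlockingQueue standing in for the broker (an illustrative in-JVM sketch, not RabbitMQ code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerSketch {
    // Producer and consumer share only the queue and never call each other:
    // this is the decoupling the message queue provides.
    static List<String> run() throws InterruptedException {
        BlockingQueue<String> broker = new ArrayBlockingQueue<>(10);
        List<String> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 3; i++) {
                    broker.put("order-" + i); // blocks if the broker buffer is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        for (int i = 1; i <= 3; i++) {
            consumed.add(broker.take()); // blocks until a message arrives
        }
        producer.join();
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // [order-1, order-2, order-3]
    }
}
```

If the consumer is slow or temporarily absent, the producer is unaffected up to the buffer capacity, mirroring the order/inventory example above.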
Key concepts
Broker: the message queue server itself
Virtual Host: a virtual host
Exchange: an exchange (router)
Queue: a message queue
Binding: a binding, which associates a queue with an exchange
Channel: a channel inside a connection
Connection: a network connection, e.g. a TCP connection
Publisher: a message producer; a client application that publishes messages to an exchange
Consumer: a message consumer; a client application that takes messages from a queue
Message: the message itself
Erlang download: https://dl.bintray.com/rabbitmq-erlang/rpm/erlang
socat download: http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
RabbitMQ download: https://www.rabbitmq.com/install-rpm.html#downloads
[root@localhost opt]# rpm -ivh erlang-21.3.8.16-1.el7.x86_64.rpm
[root@localhost opt]# rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
[root@localhost opt]# rpm -ivh rabbitmq-server-3.8.6-1.el7.noarch.rpm
[root@localhost opt]# rabbitmq-plugins enable rabbitmq_management
[root@localhost opt]# systemctl start rabbitmq-server.service
[root@localhost opt]# systemctl status rabbitmq-server.service
[root@localhost opt]# systemctl restart rabbitmq-server.service
[root@localhost opt]# systemctl stop rabbitmq-server.service
[root@localhost opt]# ps -ef | grep rabbitmq
Disable the firewall: systemctl stop firewalld
Open http://ip:15672 in a browser.
Default account/password: guest/guest. The guest user is not allowed to connect remotely by default.
Create a user
[root@localhost opt]# rabbitmqctl add_user laosun 123456
Assign a role to the user
[root@localhost opt]# rabbitmqctl set_user_tags laosun administrator
Grant the user permissions
[root@localhost opt]# rabbitmqctl set_permissions -p "/" laosun ".*" ".*" ".*"
List users and their roles
[root@localhost opt]# rabbitmqctl list_users
Change a user's password
[root@localhost opt]# rabbitmqctl change_password laosun 123123
The management UI
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
</dependency>
</dependencies>
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%n
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.File=rabbitmq.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %l %m%n
log4j.rootLogger=debug, stdout,file
package util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * @BelongsProject: lagou-rabbitmq
 * @Author: GuoAn.Sun
 * @Description: Utility class for obtaining a RabbitMQ connection
 */
public class ConnectionUtil {
    public static Connection getConnection() throws Exception {
        // 1. Create the connection factory
        ConnectionFactory factory = new ConnectionFactory();
        // 2. Set the MQ connection info on the factory (ip, port, vhost, username, password)
        factory.setHost("192.168.204.141");
        factory.setPort(5672);
        factory.setVirtualHost("/lagou");
        factory.setUsername("laosun");
        factory.setPassword("123123");
        // 3. Obtain a connection to MQ from the factory
        Connection connection = factory.newConnection();
        return connection;
    }
    public static void main(String[] args) throws Exception {
        Connection connection = getConnection();
        System.out.println("connection = " + connection);
        connection.close();
    }
}
There are five messaging models, which fall into two broad categories:
models 1 and 2 are point-to-point; models 3, 4 and 5 are publish/subscribe (one-to-many).
Point-to-point: the P2P (point-to-point) model involves three roles: the message queue, the sender, and the receiver.
Publish/subscribe: publisher (Pub) / subscriber (Sub).
Here is the introduction from the official tutorial:
Introduction
RabbitMQ is a message broker: it accepts and forwards messages. You can think about it as a post office: when you put the mail that you want posting in a post box, you can be sure that Mr. or Ms. Mailperson will eventually deliver the mail to your recipient. In this analogy, RabbitMQ is a post box, a post office and a postman.
RabbitMQ itself only accepts, stores and forwards messages; it does not process their content.
Like the post office: it is the recipient, not the post office, who deals with the letter.
package simplest;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.ConnectionUtil;
/**
 * @BelongsProject: lagou-rabbitmq
 * @Author: GuoAn.Sun
 * @Description: Message producer
 */
public class Sender {
    public static void main(String[] args) throws Exception {
        String msg = "laosun: Hello, RabbitMQ!";
        // 1. Obtain a connection
        Connection connection = ConnectionUtil.getConnection();
        // 2. Create a channel on the connection
        Channel channel = connection.createChannel();
        // 3. Declare the message queue (params 1-5)
        /*
            Param 1: queue name
            Param 2: whether messages in the queue are durable (persisted)
            Param 3: exclusive (the queue can only be used by this connection, not shared)
            Param 4: auto-delete (the queue is destroyed once its connection count drops to zero, even if it still holds messages)
            Param 5: extra queue arguments (null if none)
        */
        channel.queueDeclare("queue1", false, false, false, null);
        // 4. Publish a message to the queue (params 1-4)
        /*
            Param 1: exchange name; this is the simple (P2P) model with no exchange, so ""
            Param 2: target queue name
            Param 3: message properties (null if none)
            Param 4: message body (bytes only)
        */
        channel.basicPublish("", "queue1", null, msg.getBytes());
        System.out.println("Sent: " + msg);
        // 5. Release resources
        channel.close();
        connection.close();
    }
}
Start the producer, then check the queue in the management UI: there will be one message that is not yet consumed or acknowledged.
package simplest;
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;
/**
 * @BelongsProject: lagou-rabbitmq
 * @Author: GuoAn.Sun
 * @Description: Message receiver
 */
public class Recer {
    public static void main(String[] args) throws Exception {
        // 1. Obtain a connection
        Connection connection = ConnectionUtil.getConnection();
        // 2. Obtain a channel
        Channel channel = connection.createChannel();
        // 3. Consume messages from the channel
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override // delivery handler (consumer tag, envelope ("the parcel label"), message properties, body)
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // body is the message taken from the queue
                String s = new String(body);
                System.out.println("Received = " + s);
            }
        };
        // 4. Listen on the queue; true = automatic acknowledgement
        channel.basicConsume("queue1", true, consumer);
    }
}
Start the consumer, then check the queue in the management UI again: all messages have been consumed and acknowledged, and the count shows 0.
// false: manual acknowledgement
channel.basicConsume("queue1", false, consumer);
public class RecerByACK {
    public static void main(String[] args) throws Exception {
        // 1. Obtain a connection
        Connection connection = ConnectionUtil.getConnection();
        // 2. Obtain a channel
        final Channel channel = connection.createChannel();
        // 3. Consume messages from the channel
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override // delivery handler (consumer tag, envelope, message properties, body)
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // body is the message taken from the queue
                String s = new String(body);
                System.out.println("Received = " + s);
                // manual ack (delivery tag, whether to acknowledge multiple messages at once)
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 4. Listen on the queue; false = manual acknowledgement
        channel.basicConsume("queue1", false, consumer);
    }
}
public class MessageSender {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // Declare the queue (here the producer actually creates it) -- the "serving window" everyone will queue at
        channel.queueDeclare("test_work_queue", false, false, false, null);
        for (int i = 1; i <= 100; i++) {
            String msg = "lamb skewer --> " + i;
            channel.basicPublish("", "test_work_queue", null, msg.getBytes());
            System.out.println("Chef finished grilling: " + msg);
        }
        channel.close();
        connection.close();
    }
}
public class MessageReceiver1 {
    static int i = 1; // counts how many messages this consumer has handled
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        // Declare the queue (here the consumer is not creating the queue but attaching to it; the code is identical)
        channel.queueDeclare("test_work_queue", false, false, false, null);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body);
                System.out.println("[Customer 1] ate " + msg + "! Total eaten: [" + i++ + "]");
                // simulate some processing delay
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume("test_work_queue", false, consumer);
    }
}
public class MessageReceiver2 {
    static int i = 1; // counts how many messages this consumer has handled
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        // Declare the queue (here the consumer is not creating the queue but attaching to it; the code is identical)
        channel.queueDeclare("test_work_queue", false, false, false, null);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body);
                System.out.println("[Customer 2] ate " + msg + "! Total eaten: [" + i++ + "]");
                // simulate some processing delay
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume("test_work_queue", false, consumer);
    }
}
Fair dispatch
You may have noticed that dispatching still doesn't work exactly as we want. For example, with two workers, if all the odd-numbered messages are heavy and the even-numbered ones are light, one worker will be constantly busy while the other does hardly any work. RabbitMQ knows nothing about this and will keep dispatching messages evenly.
This happens because RabbitMQ dispatches a message as soon as it enters the queue. It doesn't look at the number of unacknowledged messages per consumer; it just blindly dispatches every n-th message to the n-th consumer.
To overcome this we can use the basicQos method with prefetchCount = 1. This tells RabbitMQ not to give a worker more than one message at a time. In other words, don't dispatch a new message to a worker until it has processed and acknowledged the previous one; instead, dispatch it to the next worker that is not busy.
// Declare the queue (consumer side; the declaration code is identical)
channel.queueDeclare("test_work_queue", false, false, false, null);
// Think of it as delivering parcels one at a time: deliver one, wait for the ack, then deliver the next; faster workers end up handling more
channel.basicQos(1);
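The effect of prefetchCount = 1 can be simulated without a broker. The sketch below (illustrative names and message costs) compares blind round-robin dispatch with dispatch to the first idle worker:

```java
import java.util.Arrays;

public class DispatchSketch {
    // Round-robin: message i always goes to worker i % n, regardless of load.
    static long[] roundRobin(long[] costs, int workers) {
        long[] busyUntil = new long[workers];
        for (int i = 0; i < costs.length; i++) {
            busyUntil[i % workers] += costs[i];
        }
        return busyUntil;
    }

    // prefetch=1: each message goes to the worker that is free first.
    static long[] fair(long[] costs, int workers) {
        long[] busyUntil = new long[workers];
        for (long cost : costs) {
            int idlest = 0;
            for (int w = 1; w < workers; w++) {
                if (busyUntil[w] < busyUntil[idlest]) idlest = w;
            }
            busyUntil[idlest] += cost;
        }
        return busyUntil;
    }

    public static void main(String[] args) {
        // Odd-numbered messages are heavy (cost 9), even-numbered ones light (cost 1).
        long[] costs = {9, 1, 9, 1, 9, 1, 9, 1};
        System.out.println(Arrays.toString(roundRobin(costs, 2))); // [36, 4]
        System.out.println(Arrays.toString(fair(costs, 2)));       // [20, 20]
    }
}
```

With round-robin one worker accumulates 36 time units of work while the other gets 4; with the prefetch=1 strategy the load balances out.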
From the official tutorial:
Publish/Subscribe
In the previous tutorial we created a work queue. The assumption behind a work queue is that each task is delivered to exactly one worker. In this part we'll do something completely different -- we'll deliver a message to multiple consumers. This pattern is known as "publish/subscribe".
To illustrate the pattern, we're going to build a simple logging system. It will consist of two programs -- the first will emit log messages and the second will receive and print them.
In our logging system every running copy of the receiver program will get the messages. That way we'll be able to run one receiver and direct the logs to disk; and at the same time we'll be able to run another receiver and see the logs on the screen.
Essentially, published log messages are going to be broadcast to all the receivers.
A real-life analogy: on Douyin or Kuaishou, many fans follow one creator; when the creator posts a video, every fan gets notified.
public class Sender {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange (name, type)
        // fanout: ignores routing keys entirely (every queue bound to the exchange receives every message sent to it)
        channel.exchangeDeclare("test_exchange_fanout", "fanout");
        String msg = "hello, everyone!";
        channel.basicPublish("test_exchange_fanout", "", null, msg.getBytes());
        System.out.println("Producer: " + msg);
        channel.close();
        connection.close();
    }
}
public class Recer1 {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare("test_exchange_fanout_queue_1", false, false, false, null);
        // Bind to the exchange ("follow" it)
        /*
            Param 1: queue name
            Param 2: exchange name
            Param 3: routing key (unused for fanout, so "")
        */
        channel.queueBind("test_exchange_fanout_queue_1", "test_exchange_fanout", "");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body);
                System.out.println("[Consumer 1] = " + s);
            }
        };
        // 4. Listen on the queue; true = automatic acknowledgement
        channel.basicConsume("test_exchange_fanout_queue_1", true, consumer);
    }
}
public class Sender {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange (name, type)
        // direct: routes messages by exact routing key
        channel.exchangeDeclare("test_exchange_direct", "direct");
        String msg = "User registered, [userid=S101]";
        channel.basicPublish("test_exchange_direct", "insert", null, msg.getBytes());
        System.out.println("[User system]: " + msg);
        channel.close();
        connection.close();
    }
}
public class Recer1 {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare("test_exchange_direct_queue_1", false, false, false, null);
        // Bind routing keys (insert, update and delete messages go to queue 1)
        channel.queueBind("test_exchange_direct_queue_1", "test_exchange_direct", "insert");
        channel.queueBind("test_exchange_direct_queue_1", "test_exchange_direct", "update");
        channel.queueBind("test_exchange_direct_queue_1", "test_exchange_direct", "delete");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body);
                System.out.println("[Consumer 1] = " + s);
            }
        };
        // 4. Listen on the queue; true = automatic acknowledgement
        channel.basicConsume("test_exchange_direct_queue_1", true, consumer);
    }
}
public class Recer2 {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare("test_exchange_direct_queue_2", false, false, false, null);
        // Bind routing key (select messages go to queue 2)
        channel.queueBind("test_exchange_direct_queue_2", "test_exchange_direct", "select");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body);
                System.out.println("[Consumer 2] = " + s);
            }
        };
        // 4. Listen on the queue; true = automatic acknowledgement
        channel.basicConsume("test_exchange_direct_queue_2", true, consumer);
    }
}
quick.orange.rabbit # Q1 Q2
lazy.orange.elephant # Q1 Q2
quick.orange.fox # Q1
lazy.brown.fox # Q2
lazy.pink.rabbit # Q2
quick.brown.fox # no match
orange # no match
quick.orange.male.rabbit # no match
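The results above follow from the topic-key rules: `*` matches exactly one dot-separated word and `#` matches zero or more words. The small matcher below (a sketch of the semantics, not RabbitMQ's implementation) reproduces the table for the official tutorial's bindings, Q1 = `*.orange.*`, Q2 = `*.*.rabbit` and `lazy.#`:

```java
public class TopicMatch {
    // Returns true if the routing key matches the binding pattern.
    // '*' matches exactly one word, '#' matches zero or more words.
    static boolean matches(String pattern, String key) {
        return match(pattern.split("\\."), 0, key.split("\\."), 0);
    }

    private static boolean match(String[] p, int pi, String[] k, int ki) {
        if (pi == p.length) return ki == k.length;
        if (p[pi].equals("#")) {
            // '#' absorbs zero or more words
            for (int skip = ki; skip <= k.length; skip++) {
                if (match(p, pi + 1, k, skip)) return true;
            }
            return false;
        }
        if (ki == k.length) return false;
        if (!p[pi].equals("*") && !p[pi].equals(k[ki])) return false;
        return match(p, pi + 1, k, ki + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("*.orange.*", "quick.orange.rabbit")); // true  -> Q1
        System.out.println(matches("*.*.rabbit", "quick.orange.rabbit")); // true  -> Q2
        System.out.println(matches("lazy.#", "lazy.pink.rabbit"));        // true  -> Q2
        System.out.println(matches("*.orange.*", "quick.orange.male.rabbit")); // false (four words)
    }
}
```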
public class Sender {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange (name, type)
        // topic: routes by wildcard pattern matching
        channel.exchangeDeclare("test_exchange_topic", "topic");
        String msg = "Product price cut";
        channel.basicPublish("test_exchange_topic", "product.price", null, msg.getBytes());
        System.out.println("[User system]: " + msg);
        channel.close();
        connection.close();
    }
}
public class Recer1 {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare("test_exchange_topic_queue_1", false, false, false, null);
        // Bind routing pattern (all user-related messages)
        channel.queueBind("test_exchange_topic_queue_1", "test_exchange_topic", "user.#");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body);
                System.out.println("[Consumer 1] = " + s);
            }
        };
        // 4. Listen on the queue; true = automatic acknowledgement
        channel.basicConsume("test_exchange_topic_queue_1", true, consumer);
    }
}
public class Recer2 {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare("test_exchange_topic_queue_2", false, false, false, null);
        // Bind routing patterns (all product- and order-related messages)
        channel.queueBind("test_exchange_topic_queue_2", "test_exchange_topic", "product.#");
        channel.queueBind("test_exchange_topic_queue_2", "test_exchange_topic", "order.#");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body);
                System.out.println("[Consumer 2] = " + s);
            }
        };
        // 4. Listen on the queue; true = automatic acknowledgement
        channel.basicConsume("test_exchange_topic_queue_2", true, consumer);
    }
}
public class Sender {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange (name, type, durable)
        channel.exchangeDeclare("test_exchange_topic", "topic", true);
        String msg = "Product price cut";
        // Publish the message (the third argument makes the message persistent)
        channel.basicPublish("test_exchange_topic", "product.price",
                MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
        System.out.println("[User system]: " + msg);
        channel.close();
        connection.close();
    }
}
public class Recer1 {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // Declare the queue (second argument true: durable)
        channel.queueDeclare("test_exchange_topic_queue_1", true, false, false, null);
        channel.queueBind("test_exchange_topic_queue_1", "test_exchange_topic", "user.#");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body);
                System.out.println("[Consumer 1] = " + s);
            }
        };
        channel.basicConsume("test_exchange_topic_queue_1", true, consumer);
    }
}
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
</dependency>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!-- 1. Configure the connection -->
<rabbit:connection-factory
id="connectionFactory"
host="192.168.154.128"
port="5672"
username="soldier"
password="123456"
virtual-host="/soldier"/>
<!-- 2. Configure the queue -->
<rabbit:queue name="test_spring_queue_1"/>
<!-- 3. Configure rabbitAdmin: manages queues and exchanges from Java code (create, bind and delete queues and exchanges, send messages, etc.) -->
<rabbit:admin connection-factory="connectionFactory"/>
<!-- 4. Configure a topic exchange and bind the queue to it -->
<rabbit:topic-exchange name="spring_topic_exchange">
    <rabbit:bindings>
        <rabbit:binding queue="test_spring_queue_1" pattern="msg.#"/>
    </rabbit:bindings>
</rabbit:topic-exchange>
<!-- 5. Configure the JSON message converter -->
<bean id="jsonMessageConverter"
      class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>
<!-- 6. Configure the RabbitTemplate (message producer) -->
<rabbit:template id="rabbitTemplate"
                 connection-factory="connectionFactory"
                 exchange="spring_topic_exchange"
                 message-converter="jsonMessageConverter"
/>
</beans>
public class Sender {
    public static void main(String[] args) {
        // 1. Create the Spring container
        ClassPathXmlApplicationContext context = new
                ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml");
        // 2. Get the RabbitTemplate from the container
        RabbitTemplate template = context.getBean(RabbitTemplate.class);
        // 3. Send the message
        Map<String, String> map = new HashMap<>();
        map.put("name", "大佬孙");
        map.put("email", "19998539@QQ.com");
        template.convertAndSend("msg.user", map);
        context.close();
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<!-- 1. Configure the connection -->
<rabbit:connection-factory
        id="connectionFactory"
        host="192.168.204.141"
        port="5672"
        username="laosun"
        password="123123"
        virtual-host="/lagou"
/>
<!-- 2. Configure the queue -->
<rabbit:queue name="test_spring_queue_1"/>
<!-- 3. Configure rabbitAdmin -->
<rabbit:admin connection-factory="connectionFactory"/>
<!-- 4. Spring IoC annotation scanning -->
<context:component-scan base-package="listener"/>
<!-- 5. Configure the listener -->
<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener ref="consumerListener" queue-names="test_spring_queue_1" />
</rabbit:listener-container>
</beans>
@Component
public class ConsumerListener implements MessageListener {
    // ObjectMapper is Jackson's most-used (de)serialization class, used here to parse JSON
    private static final ObjectMapper MAPPER = new ObjectMapper();
    public void onMessage(Message message) {
        try {
            // parse the message body as JSON
            JsonNode jsonNode = MAPPER.readTree(message.getBody());
            String name = jsonNode.get("name").asText();
            String email = jsonNode.get("email").asText();
            System.out.println("Received from queue: [" + name + "'s email is: " + email + "]");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
public class TestRunner {
    public static void main(String[] args) throws Exception {
        // obtain the container
        ClassPathXmlApplicationContext context = new
                ClassPathXmlApplicationContext("spring/spring-rabbitmq-consumer.xml");
        // keep the program running so the listener stays alive
        System.in.read();
    }
}
In real-world scenarios, some producers must be certain that a message was successfully delivered to the queue. How do we guarantee delivery?
public class Sender {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("test_transaction", "topic");
        channel.txSelect(); // start the transaction
        try {
            channel.basicPublish("test_transaction", "product.price", null, "Price cut 1".getBytes());
            // System.out.println(1/0); // simulate an exception!
            channel.basicPublish("test_transaction", "product.price", null, "Price cut 2".getBytes());
            System.out.println("All messages sent!");
            channel.txCommit(); // commit the transaction
        } catch (Exception e) {
            System.out.println("System error: all messages withdrawn!");
            channel.txRollback(); // roll back the transaction
            e.printStackTrace();
        } finally {
            channel.close();
            connection.close();
        }
    }
}
public class Recer {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("test_transaction_queue",false,false,false,null);
channel.queueBind("test_transaction_queue", "test_transaction", "product.#");
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("[Consumer] = " + s);
}
};
channel.basicConsume("test_transaction_queue", true,consumer);
}
}
Using standard AMQP 0-9-1, the only way to guarantee that a message isn't lost is by using transactions -- make the channel transactional then for each message or set of messages publish, commit. In this case, transactions are unnecessarily heavyweight and decrease throughput by a factor of 250. To remedy this, a confirmation mechanism was introduced. It mimics the consumer acknowledgements mechanism already present in the protocol.
Key takeaway: enabling transactions can cut throughput by a factor of up to 250.
<!-- 1. Configure the connection and enable publisher confirms: publisher-confirms="true" -->
<rabbit:connection-factory id="connectionFactory"
                           host="192.168.204.141"
                           port="5672"
                           username="laosun"
                           password="123123"
                           virtual-host="/lagou"
                           publisher-confirms="true"/>
<!-- 6. Configure the RabbitTemplate, adding the confirm callback handler: confirm-callback="msgSendConfirmCallback" -->
<rabbit:template id="rabbitTemplate"
                 connection-factory="connectionFactory"
                 exchange="spring_topic_exchange"
                 message-converter="jsonMessageConverter"
                 confirm-callback="msgSendConfirmCallback"/>
<!-- 7. The confirm callback handler -->
<bean id="msgSendConfirmCallback" class="confirm.MsgSendConfirmCallback"/>
package confirm;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.stereotype.Component;
/**
 * @BelongsProject: spring-rabbitmq-producer
 * @Author: GuoAn.Sun
 * @Description: Publisher confirm callback
 */
@Component
public class MsgSendConfirmCallback implements RabbitTemplate.ConfirmCallback {
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            System.out.println("Message confirmed!");
        } else {
            System.out.println("Message confirm failed...");
            // If this message absolutely must reach the queue (e.g. an order message), resend it:
            // either with bounded recursion (a fixed number of retries, never unlimited),
            // or with redis + a scheduled job (the JDK Timer, or a scheduler framework such as Quartz)
        }
    }
}
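The bounded-retry idea from the comment above can be sketched as follows; `Sender` here is a hypothetical interface standing in for a confirmed `rabbitTemplate` send, and the flaky sender in `main` is purely illustrative:

```java
public class BoundedRetry {
    // Hypothetical stand-in for a send that reports whether the broker confirmed it.
    interface Sender { boolean trySend(String msg); }

    // Retry at most maxAttempts times; returns the attempt number that succeeded,
    // or -1 if every attempt failed. The retry count is fixed, never unbounded.
    static int sendWithRetry(Sender sender, String msg, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (sender.trySend(msg)) return attempt;
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] failures = {2}; // fail the first two attempts, then succeed
        Sender flaky = msg -> failures[0]-- <= 0;
        System.out.println(sendWithRetry(flaky, "order", 5)); // 3
        System.out.println(sendWithRetry(m -> false, "order", 3)); // -1
    }
}
```

After the retries are exhausted, the message would be handed off to the fallback channel the comment mentions (e.g. stored in redis and retried by a scheduled job).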
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%n
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.File=rabbitmq.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %l %m%n
log4j.rootLogger=debug, stdout,file
public class Sender {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new
                ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml");
        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
        Map<String, String> map = new HashMap<String, String>();
        map.put("name", "吕布");
        map.put("email", "666@qq.com");
        // The first argument is the exchange name;
        // omit it to use the exchange configured in the Spring container;
        // pass a wrong name and the routing error triggers the confirm-failure path
        rabbitTemplate.convertAndSend("x", "msg.user", map);
        System.out.println("ok");
        context.close();
    }
}
for (int i = 1; i <= 10; i++) {
    rabbitTemplate.convertAndSend("msg.user", map);
    System.out.println("Message sent...");
}
<!-- 5. Configure the listener -->
<!-- prefetch="3": the number of messages delivered at a time. Tells RabbitMQ not to push more than N messages to one consumer at once; once N messages are unacked, that consumer blocks until some are acked -->
<!-- acknowledge="manual": manual acknowledgement -->
<rabbit:listener-container connection-factory="connectionFactory"
                           prefetch="3" acknowledge="manual">
    <rabbit:listener ref="consumerListener" queue-names="test_spring_queue_1" />
</rabbit:listener-container>
// AbstractAdaptableMessageListener is the abstract base class Spring uses to process a message once the container receives it
@Component
public class ConsumerListener extends AbstractAdaptableMessageListener {
    // ObjectMapper is Jackson's most-used (de)serialization class, used here to parse JSON
    private static final ObjectMapper MAPPER = new ObjectMapper();
    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            // String str = new String(message.getBody());
            // parse the message body as JSON
            JsonNode jsonNode = MAPPER.readTree(message.getBody());
            String name = jsonNode.get("name").asText();
            String email = jsonNode.get("email").asText();
            System.out.println("Received from queue: [" + name + "'s email is: " + email + "]");
            // acknowledge receipt (param 1, param 2)
            /*
                Param 1: the delivery tag, a monotonically increasing positive integer that uniquely identifies this delivery on this channel (delivery tags are scoped to the channel)
                Param 2: to reduce network traffic, manual acks can be batched; when true, all messages with a delivery tag less than or equal to the given value are acknowledged at once
            */
            long msgId = message.getMessageProperties().getDeliveryTag();
            channel.basicAck(msgId, true);
            Thread.sleep(3000);
            System.out.println("Sleeping three seconds before the next message");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
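The `multiple` flag of `basicAck` deserves a closer look: acknowledging delivery tag N with `multiple = true` confirms every outstanding tag up to and including N, while `false` confirms only that one tag. A small model of the channel's unacked set (a sketch of the semantics, not the client internals):

```java
import java.util.TreeSet;

public class AckModel {
    private final TreeSet<Long> unacked = new TreeSet<>();

    void deliver(long deliveryTag) { unacked.add(deliveryTag); }

    // Mirrors the semantics of Channel.basicAck: with multiple=true, all tags
    // <= deliveryTag are confirmed at once; with false, only that single tag.
    void basicAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            unacked.headSet(deliveryTag, true).clear();
        } else {
            unacked.remove(deliveryTag);
        }
    }

    int outstanding() { return unacked.size(); }

    public static void main(String[] args) {
        AckModel ch = new AckModel();
        for (long tag = 1; tag <= 5; tag++) ch.deliver(tag);
        ch.basicAck(3, true);  // confirms tags 1, 2 and 3 in one frame
        System.out.println(ch.outstanding()); // 2 (tags 4 and 5 remain)
        ch.basicAck(5, false); // confirms only tag 5
        System.out.println(ch.outstanding()); // 1 (tag 4 remains)
    }
}
```

This is why the listener above can ack with `true`: one ack frame covers everything delivered so far on that channel.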
Time To Live: how long a message may live, in milliseconds.
Within this window the message can be consumed normally; once it expires it is removed automatically (strictly speaking it becomes a "dead message" and is routed to the dead letter queue; it can no longer be consumed from the original queue).
RabbitMQ can set a TTL on both messages and queues.
<!-- 2. Reconfigure the queue, this time setting an expiry time on its messages -->
<rabbit:queue name="test_spring_queue_ttl" auto-declare="true">
    <rabbit:queue-arguments>
        <entry key="x-message-ttl" value-type="long" value="5000"></entry>
    </rabbit:queue-arguments>
</rabbit:queue>
<!-- 2. Configure the queue -->
<rabbit:queue name="test_spring_queue_ttl_2"/>
package test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.util.HashMap;
import java.util.Map;
/**
 * @BelongsProject: spring-rabbitmq-producer
 * @Author: GuoAn.Sun
 * @Description: Producer
 */
public class Sender2 {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new
                ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml");
        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
        // create the message properties object
        MessageProperties messageProperties = new MessageProperties();
        // set the per-message expiration
        messageProperties.setExpiration("6000");
        // create the message
        Message message = new Message("auto-deleted after 6 seconds".getBytes(),
                messageProperties);
        // send the message
        rabbitTemplate.convertAndSend("msg.user", message);
        System.out.println("Message sent...");
        context.close();
}
}
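Note that the queue declared earlier already carries `x-message-ttl = 5000` while this message sets `expiration = "6000"`; per the RabbitMQ TTL rules, when both a queue TTL and a per-message TTL are set, the lower of the two applies. A tiny helper (illustrative names; -1 means "not set") captures the rule:

```java
public class EffectiveTtl {
    // If both a queue TTL and a per-message TTL are set, the lower value wins;
    // -1 means the corresponding TTL is not set.
    static long effectiveTtl(long queueTtlMs, long messageTtlMs) {
        if (queueTtlMs < 0) return messageTtlMs;
        if (messageTtlMs < 0) return queueTtlMs;
        return Math.min(queueTtlMs, messageTtlMs);
    }

    public static void main(String[] args) {
        System.out.println(effectiveTtl(5000, 6000)); // 5000: the queue TTL wins here
        System.out.println(effectiveTtl(-1, 6000));   // 6000: only the message TTL is set
    }
}
```

So in the example above the message actually expires after 5 seconds, not 6.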
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!-- 1. Configure the connection -->
<rabbit:connection-factory
id="connectionFactory"
host="192.168.154.128"
port="5672"
username="soldier"
password="123456"
virtual-host="/soldier"
publisher-confirms="true"/>
<!-- 3. Configure rabbitAdmin: manages queues and exchanges from Java code (create, bind, delete, send messages, etc.) -->
<rabbit:admin connection-factory="connectionFactory"/>
<!-- 6. Configure the RabbitTemplate -->
<rabbit:template id="rabbitTemplate"
                 connection-factory="connectionFactory"
                 exchange="my_exchange"/>
<!-- ==================== dead letter setup ==================== -->
<!-- The dead letter queue -->
<rabbit:queue name="dlx_queue"/>
<!-- Direct dead letter exchange -->
<rabbit:direct-exchange name="dlx_exchange" >
    <rabbit:bindings>
        <rabbit:binding key="dlx_ttl" queue="dlx_queue"></rabbit:binding>
        <rabbit:binding key="dlx_max" queue="dlx_queue"></rabbit:binding>
    </rabbit:bindings>
</rabbit:direct-exchange>
<!-- Queue for testing message expiry -->
<rabbit:queue name="test_ttl_queue">
    <rabbit:queue-arguments>
        <!-- messages in this queue expire after 6 seconds -->
        <entry key="x-message-ttl" value-type="long" value="6000"/>
        <!-- expired messages are delivered to the dead letter exchange -->
        <entry key="x-dead-letter-exchange" value="dlx_exchange"/>
    </rabbit:queue-arguments>
</rabbit:queue>
<!-- Queue for testing the length limit -->
<rabbit:queue name="test_max_queue">
    <rabbit:queue-arguments>
        <!-- maximum queue length (this queue holds at most two messages) -->
        <entry key="x-max-length" value-type="long" value="2"/>
        <!-- when the limit is exceeded, the overflowing message is delivered to the dead letter exchange -->
        <entry key="x-dead-letter-exchange" value="dlx_exchange"/>
    </rabbit:queue-arguments>
</rabbit:queue>
<!-- Direct test exchange -->
<rabbit:direct-exchange name="my_exchange" >
    <rabbit:bindings>
        <rabbit:binding key="dlx_ttl" queue="test_ttl_queue"></rabbit:binding>
        <rabbit:binding key="dlx_max" queue="test_max_queue"></rabbit:binding>
    </rabbit:bindings>
</rabbit:direct-exchange>
</beans>
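The `x-max-length = 2` behaviour can be modelled without a broker. With RabbitMQ's default overflow behaviour (drop-head), once the limit is exceeded the message at the front of the queue, i.e. the oldest one, is the one that gets dead-lettered; the sketch below models that (message names are illustrative):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class MaxLengthModel {
    final Deque<String> queue = new ArrayDeque<>();
    final List<String> deadLettered = new ArrayList<>();
    final int maxLength;

    MaxLengthModel(int maxLength) { this.maxLength = maxLength; }

    // Default overflow behaviour ("drop-head"): when the queue is over its
    // limit, the message at the head (the oldest) is dead-lettered.
    void publish(String msg) {
        queue.addLast(msg);
        while (queue.size() > maxLength) {
            deadLettered.add(queue.removeFirst());
        }
    }

    public static void main(String[] args) {
        MaxLengthModel q = new MaxLengthModel(2);
        q.publish("length test 1");
        q.publish("length test 2");
        q.publish("length test 3");
        System.out.println(q.queue);        // [length test 2, length test 3]
        System.out.println(q.deadLettered); // [length test 1]
    }
}
```

So after the producer below sends three messages to `test_max_queue`, "length test 1" should show up in `dlx_queue`.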
public class SenderDLX {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new
                ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer-dlx.xml");
        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
        // rabbitTemplate.convertAndSend("dlx_ttl", "TTL test".getBytes());
        rabbitTemplate.convertAndSend("dlx_max", "length test 1".getBytes());
        rabbitTemplate.convertAndSend("dlx_max", "length test 2".getBytes());
        rabbitTemplate.convertAndSend("dlx_max", "length test 3".getBytes());
        System.out.println("Messages sent...");
        context.close();
    }
}
Reuse the timeout test from the dead letter queue example above; just set the timeout to the order-closing interval.
<!-- Listen on the dead letter queue -->
<rabbit:listener-container connection-factory="connectionFactory" prefetch="3" acknowledge="manual">
<rabbit:listener ref="consumerListener" queue-names="dlx_queue"/>
</rabbit:listener-container>
RabbitMQ has three modes, two of which are cluster modes. Details below:
Prerequisite: two Linux machines, each with RabbitMQ installed.
Server 1 (/etc/hosts):
127.0.0.1 A localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 A localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.204.141 A
192.168.204.142 B
Server 2 (/etc/hosts):
127.0.0.1 B localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 B localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.204.141 A
192.168.204.142 B
[root@A opt]# scp /var/lib/rabbitmq/.erlang.cookie 192.168.204.142:/var/lib/rabbitmq
After modifying the cookie file, reboot the server (reboot).
[root@A ~]# systemctl stop firewalld
[root@A ~]# systemctl start rabbitmq-server
[root@B ~]# rabbitmqctl stop_app
[root@B ~]# rabbitmqctl join_cluster rabbit@A
[root@B ~]# rabbitmqctl start_app
[root@B ~]# rabbitmqctl cluster_status
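By default the node joins as a disc node. `join_cluster` also accepts a `--ram` flag to join as a RAM node, which keeps metadata in memory only (a sketch; a cluster should always keep at least one disc node):

```shell
[root@B ~]# rabbitmqctl stop_app
[root@B ~]# rabbitmqctl join_cluster --ram rabbit@A
[root@B ~]# rabbitmqctl start_app
```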
After the cluster is built, the exchanges, queues and users created earlier belong to the old standalone node and cannot be used in the new cluster,
so re-create the users manually in the cluster (add them on any node; all nodes share them):
[root@A ~]# rabbitmqctl add_user soldier 123456
[root@A ~]# rabbitmqctl set_user_tags soldier administrator
[root@A ~]# rabbitmqctl set_permissions -p "/" soldier ".*" ".*" ".*"
Note: when a node leaves the cluster and reverts to a standalone node, its exchanges, queues, users and other data come back.
Set all queues as mirrored queues, i.e. each queue is replicated to the cluster nodes and the nodes stay consistent.
Syntax: set_policy {name} {pattern} {definition}
name: the policy name, user-defined
pattern: the queue matching pattern (a regular expression)
definition: the mirror definition, with three parts: ha-mode, ha-params, ha-sync-mode
ha-mode: the High Availability mode of the mirrored queue; valid values are all/exactly/nodes. The policy below uses all, i.e. replicate to every node, including newly added ones
all: mirror on every node in the cluster
exactly: mirror on a fixed number of nodes, given by ha-params
nodes: mirror on the named nodes, given by ha-params
ha-params: the parameters required by the chosen ha-mode
ha-sync-mode: how messages in the queue are synchronized; valid values are automatic and manual
[root@A ~]# rabbitmqctl set_policy xall "^" '{"ha-mode":"all"}'
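The other ha-mode values are set the same way; a sketch (the policy names xtwo and xnodes are arbitrary examples):

```shell
# Mirror every queue to exactly 2 nodes, synchronizing new mirrors automatically
[root@A ~]# rabbitmqctl set_policy xtwo "^" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
# Mirror every queue only to the named nodes
[root@A ~]# rabbitmqctl set_policy xnodes "^" '{"ha-mode":"nodes","ha-params":["rabbit@A","rabbit@B"]}'
```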
OSI (Open Systems Interconnection) divides network communication into 7 layers: physical, data link, network, transport, session, presentation and application.
Advantages of Nginx:
Advantages of HAProxy:
On performance HAProxy wins; on features and convenience Nginx wins.
For plain HTTP traffic HAProxy is more efficient than Nginx, so when there are no special requirements, HAProxy is the recommended HTTP load balancer;
for web applications, however, Nginx is recommended.
In short, choose according to the characteristics of your own scenario.
HAProxy下载:http://www.haproxy.org/download/1.8/src/haproxy-1.8.12.tar.gz
[root@localhost opt]# tar -zxvf haproxy-1.8.12.tar.gz
[root@localhost opt]# uname -r
3.10.0-514.6.2.el7.x86_64
Select the make parameters according to the kernel version:
[root@localhost opt]# cd haproxy-1.8.12
[root@localhost haproxy-1.8.12]# make TARGET=linux2628 PREFIX=/usr/local/haproxy
[root@localhost haproxy-1.8.12]# make install PREFIX=/usr/local/haproxy
[root@localhost haproxy-1.8.12]# /usr/local/haproxy/sbin/haproxy -v
[root@localhost haproxy-1.8.12]# cp /usr/local/haproxy/sbin/haproxy /usr/sbin/
[root@localhost haproxy-1.8.12]# cp ./examples/haproxy.init /etc/init.d/haproxy
[root@localhost haproxy-1.8.12]# chmod 755 /etc/init.d/haproxy
[root@localhost haproxy-1.8.12]# useradd -r haproxy
[root@localhost haproxy-1.8.12]# mkdir /etc/haproxy
[root@localhost haproxy-1.8.12]# vim /etc/haproxy/haproxy.cfg
# Global configuration
global
    # Logging
    log 127.0.0.1 local0 info
    # Working directory
    chroot /usr/local/haproxy
    # User and group
    user haproxy
    group haproxy
    # Process uid/gid
    uid 99
    gid 99
    # Run as a daemon
    daemon
    # Maximum number of connections
    maxconn 4096
# Default configuration
defaults
    # Use the global log settings
    log global
    # Default mode: mode {tcp|http|health}; tcp is layer 4, http is layer 7, health only returns OK
    mode tcp
    # Log category: tcplog
    option tcplog
    # Do not log null (health-check) connections
    option dontlognull
    # Consider a server unavailable after 3 failed retries
    retries 3
    # Maximum connections per process
    maxconn 2000
    # Connection timeout
    timeout connect 5s
    # Close client connections idle for 30 seconds
    timeout client 30s
    # Close server connections idle for 15 seconds
    timeout server 15s
# Listener binding
listen rabbitmq_cluster
    bind 192.168.154.128:5672
    # TCP mode
    mode tcp
    # Simple round-robin balancing
    balance roundrobin
    # RabbitMQ cluster nodes: check every 5 seconds; 2 successes mark a node up, 3 failures mark it down
    server A 192.168.154.128:5672 check inter 5000 rise 2 fall 3
    server B 192.168.154.129:5672 check inter 5000 rise 2 fall 3
# HAProxy monitoring page
listen monitor
    bind 192.168.154.128:8100
    mode http
    option httplog
    stats enable
    # Monitoring page: http://192.168.204.143:8100/monitor
    stats uri /monitor
    stats refresh 5s
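With the configuration in place, it can first be syntax-checked using haproxy's -c (check) flag (assuming the install path used above):

```shell
[root@localhost haproxy]# /usr/local/haproxy/sbin/haproxy -c -f /etc/haproxy/haproxy.cfg
```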
[root@localhost haproxy]# service haproxy start
To send messages from the application, just point it at the 143 server (the HAProxy host); nothing else changes.
All requests go to HAProxy, which load-balances them across the RabbitMQ servers.
This exposes one last problem: if the HAProxy server goes down, the RabbitMQ servers become unreachable. So HAProxy itself also needs a high-availability cluster.
[root@C ~]# yum install -y keepalived
[root@C ~]# rm -rf /etc/keepalived/keepalived.conf
[root@C ~]# vim /etc/keepalived/keepalived.conf
! Configuration File for keepalived
global_defs {
    router_id C ## Important: the hostname of this machine
}
vrrp_script chk_haproxy {
    script "/etc/keepalived/haproxy_check.sh" ## Path of the check script
    interval 2 ## Check interval in seconds
    weight -20 ## Subtract 20 from the priority when the check fails
}
vrrp_instance VI_1 {
    state MASTER ## Important: marks the master; change to BACKUP on the standby machine (143)
    interface ens33 ## Important: the network interface name (see ifconfig)
    virtual_router_id 66 ## Important: user-defined virtual router ID; must match on master and backup
    priority 100 ## Priority (0-254); the master's is usually higher than the backup's
    advert_int 1 ## Advertisement interval; must match on both nodes; default 1 second
    authentication { ## Authentication type and password; MASTER and BACKUP must use the same password to communicate
        auth_type PASS
        auth_pass 1111
    }
    track_script {
        chk_haproxy ## The script that checks HAProxy health
    }
    virtual_ipaddress { ## The "VIP"
        192.168.154.66/24 ## Important: the virtual IP (more than one may be listed); clients connect to MQ through this IP
    }
}
virtual_server 192.168.154.66 5672 { ## Detailed VIP configuration
    delay_loop 6 # Health-check interval in seconds
    lb_algo rr # LVS scheduling algorithm: rr|wrr|lc|wlc|lblc|sh|dh
    lb_kind NAT # Forwarding method: DR, NAT or TUN
    protocol TCP # Forwarding protocol: TCP or UDP, usually TCP
    real_server 192.168.154.128 5672 { ## The real IP of this machine
        weight 1 # Defaults to 1; 0 disables the server
    }
}
#!/bin/bash
# Count the running haproxy processes
COUNT=$(ps -C haproxy --no-header | wc -l)
if [ "$COUNT" -eq 0 ]; then
    # haproxy is down: try to restart it
    /usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg
    sleep 2
    # Still down? Kill keepalived so the backup node takes over the VIP
    if [ "$(ps -C haproxy --no-header | wc -l)" -eq 0 ]; then
        killall keepalived
    fi
fi
The heartbeat between Keepalived nodes cannot tell whether HAProxy itself is healthy, hence this script.
On the Keepalived master, the script checks whether HAProxy is running.
If the process does not exist, it tries to restart HAProxy and re-checks after 2 seconds; if HAProxy is still down, it kills the master Keepalived, the backup Keepalived detects the failure, takes over the VIP and continues serving.
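The restart-then-failover decision in the script can be isolated into a small function for illustration. check_haproxy and its three command-string parameters are hypothetical names, used here so the logic can be exercised without a real HAProxy or Keepalived:

```shell
#!/bin/bash
# Sketch of the health-check decision logic. The three arguments stand in for:
#   count_cmd   - prints the number of haproxy processes
#   restart_cmd - attempts to restart haproxy
#   kill_cmd    - gives up and kills keepalived so the backup takes over the VIP
check_haproxy() {
  local count_cmd="$1" restart_cmd="$2" kill_cmd="$3"
  if [ "$(eval "$count_cmd")" -eq 0 ]; then    # haproxy not running?
    eval "$restart_cmd"                         # try to bring it back
    if [ "$(eval "$count_cmd")" -eq 0 ]; then  # still down after the restart attempt?
      eval "$kill_cmd"                          # hand the VIP to the backup node
    fi
  fi
}
```

With a count command that always reports 0, the function falls through to the kill step; with a nonzero count it does nothing.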
[root@C etc]# chmod +x /etc/keepalived/haproxy_check.sh
[root@C etc]# systemctl stop firewalld
[root@C etc]# service keepalived start | stop | status | restart
[root@C etc]# ps -ef | grep haproxy
[root@C etc]# ps -ef | grep keepalived
[root@C etc]# ip a
[root@A ~]# curl 192.168.154.66:5672
AMQP ## the AMQP service answers normally, i.e. MQ is reachable through the VIP