版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
RabbitMQ七种队列模式应用场景案例分析(通俗易懂)七种模式介绍与应用场景简单模式(HelloWorld)做最简单的事情,一个生产者对应一个消费者,RabbitMQ相当于一个消息代理,负责将A的消息转发给B应用场景:
将发送的电子邮件放到消息队列,然后邮件服务在队列中获取邮件并发送给收件人工作队列模式(Workqueues)在多个消费者之间分配任务(竞争的消费者模式),一个生产者对应多个消费者,一般适用于执行资源密集型任务,单个消费者处理不过来,需要多个消费者进行处理应用场景:
一个订单的处理需要10s,有多个订单可以同时放到消息队列,然后让多个消费者同时处理,这样就是并行了,而不是单个消费者的串行情况订阅模式(Publish/Subscribe)一次向许多消费者发送消息,一个生产者发送的消息会被多个消费者获取,也就是将消息将广播到所有的消费者中。应用场景:
更新商品库存后需要通知多个缓存和多个数据库,这里的结构应该是:一个fanout类型交换机扇出两个个消息队列,分别为缓存消息队列、数据库消息队列一个缓存消息队列对应着多个缓存消费者一个数据库消息队列对应着多个数据库消费者路由模式(Routing)有选择地(Routingkey)接收消息,发送消息到交换机并且要指定路由key,消费者将队列绑定到交换机时需要指定路由key,仅消费指定路由key的消息应用场景:
如在商品库存中增加了1台iphone12,iphone12促销活动消费者指定routingkey为iphone12,只有此促销活动会接收到消息,其它促销活动不关心也不会消费此routingkey的消息主题模式(Topics)根据主题(Topics)来接收消息,将路由key和某模式进行匹配,此时队列需要绑定在一个模式上,#匹配一个词或多个词,*只匹配一个词。应用场景:
同上,iphone促销活动可以接收主题为iphone的消息,如iphone12、iphone13等远程过程调用(RPC)如果我们需要在远程计算机上运行功能并等待结果就可以使用RPC,具体流程可以看图。应用场景:需要等待接口返回数据,如订单支付发布者确认(PublisherConfirms)与发布者进行可靠的发布确认,发布者确认是RabbitMQ扩展,可以实现可靠的发布。在通道上启用发布者确认后,RabbitMQ将异步确认发送者发布的消息,这意味着它们已在服务器端处理。应用场景:
对于消息可靠性要求较高,比如钱包扣款代码演示代码中没有对后面两种模式演示,有兴趣可以自己研究简单模式import
com.rabbitmq.client.Channel;
import
com.rabbitmq.client.Connection;
import
com.rabbitmq.client.ConnectionFactory;
import
java.io.IOException;
import
java.util.concurrent.TimeoutException;
public
class
Sender
{
private
final
static
String
QUEUE_NAME
=
"simple_queue";
public
static
void
main(String[]
args)
throws
IOException,
TimeoutException
{
ConnectionFactory
factory
=
new
ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
Connection
connection
=
factory.newConnection();
Channel
channel
=
connection.createChannel();
//
声明队列
//queue:队列名
//durable:是否持久化
//exclusive:是否排外
即只允许该channel访问该队列
一般等于true的话用于一个队列只能有一个消费者来消费的场景
//autoDelete:是否自动删除
消费完删除
//arguments:其他属性
channel.queueDeclare(QUEUE_NAME,
false,
false,
false,
null);
//消息内容
String
message
=
"simplest
mode
message";
channel.basicPublish("",
QUEUE_NAME,
null,
message.getBytes());
System.out.println("[x]Sent
'"
+
message
+
"'");
//最后关闭通关和连接
channel.close();
connection.close();
}
}import
com.rabbitmq.client.Channel;
import
com.rabbitmq.client.Connection;
import
com.rabbitmq.client.ConnectionFactory;
import
com.rabbitmq.client.DeliverCallback;
import
java.io.IOException;
import
java.util.concurrent.TimeoutException;
public
class
Receiver
{
private
final
static
String
QUEUE_NAME
=
"simplest_queue";
public
static
void
main(String[]
args)
throws
IOException,
InterruptedException,
TimeoutException
{
//
获取连接
ConnectionFactory
factory
=
new
ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
Connection
connection
=
factory.newConnection();
Channel
channel
=
connection.createChannel();
channel.queueDeclare(QUEUE_NAME,
false,
false,
false,
null);
DeliverCallback
deliverCallback
=
(consumerTag,
delivery)
->
{
String
message
=
new
String(delivery.getBody(),
"UTF-8");
System.out.println("
[x]
Received
'"
+
delivery.getEnvelope().getRoutingKey()
+
"':'"
+
message
+
"'");
};
channel.basicConsume(QUEUE_NAME,
true,
deliverCallback,
consumerTag
->
{
});
}
}工作队列模式搜索公众号后端架构师后台回复“架构整洁”,获取一份惊喜礼包。import
com.rabbitmq.client.Channel;
import
com.rabbitmq.client.Connection;
import
com.rabbitmq.client.ConnectionFactory;
import
com.rabbitmq.client.DeliverCallback;
import
java.io.IOException;
import
java.util.concurrent.TimeoutException;
public
class
Receiver1
{
private
final
static
String
QUEUE_NAME
=
"queue_work";
public
static
void
main(String[]
args)
throws
IOException,
InterruptedException,
TimeoutException
{
ConnectionFactory
factory
=
new
ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
Connection
connection
=
factory.newConnection();
Channel
channel
=
connection.createChannel();
channel.queueDeclare(QUEUE_NAME,
false,
false,
false,
null);
//
同一时刻服务器只会发送一条消息给消费者
channel.basicQos(1);
DeliverCallback
deliverCallback
=
(consumerTag,
delivery)
->
{
String
message
=
new
String(delivery.getBody(),
"UTF-8");
System.out.println("
[x]
Received
'"
+
delivery.getEnvelope().getRoutingKey()
+
"':'"
+
message
+
"'");
};
channel.basicConsume(QUEUE_NAME,
true,
deliverCallback,
consumerTag
->
{
});
}
}import
com.rabbitmq.client.Channel;
import
com.rabbitmq.client.Connection;
import
com.rabbitmq.client.ConnectionFactory;
import
com.rabbitmq.client.DeliverCallback;
import
java.io.IOException;
import
java.util.concurrent.TimeoutException;
public
class
Receiver2
{
private
final
static
String
QUEUE_NAME
=
"queue_work";
public
static
void
main(String[]
args)
throws
IOException,
InterruptedException,
TimeoutException
{
ConnectionFactory
factory
=
new
ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
Connection
connection
=
factory.newConnection();
Channel
channel
=
connection.createChannel();
channel.queueDeclare(QUEUE_NAME,
false,
false,
false,
null);
//
同一时刻服务器只会发送一条消息给消费者
channel.basicQos(1);
DeliverCallback
deliverCallback
=
(consumerTag,
delivery)
->
{
String
message
=
new
String(delivery.getBody(),
"UTF-8");
System.out.println("
[x]
Received
'"
+
delivery.getEnvelope().getRoutingKey()
+
"':'"
+
message
+
"'");
};
channel.basicConsume(QUEUE_NAME,
true,
deliverCallback,
consumerTag
->
{
});
}
}import
com.rabbitmq.client.Channel;
import
com.rabbitmq.client.Connection;
import
com.rabbitmq.client.ConnectionFactory;
import
java.io.IOException;
import
java.util.concurrent.TimeoutException;
public
class
Sender
{
private
final
static
String
QUEUE_NAME
=
"queue_work";
public
static
void
main(String[]
args)
throws
IOException,
InterruptedException,
TimeoutException
{
ConnectionFactory
factory
=
new
ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
Connection
connection
=
factory.newConnection();
Channel
channel
=
connection.createChannel();
//
声明队列
channel.queueDeclare(QUEUE_NAME,
false,
false,
false,
null);
for
(int
i
=
0;
i
<
100;
i++)
{
String
message
=
"work
mode
message"
+
i;
channel.basicPublish("",
QUEUE_NAME,
null,
message.getBytes());
System.out.println("[x]
Sent
'"
+
message
+
"'");
Thread.sleep(i
*
10);
}
channel.close();
connection.close();
}
}发布订阅模式import
com.rabbitmq.client.Channel;
import
com.rabbitmq.client.Connection;
import
com.rabbitmq.client.ConnectionFactory;
import
com.rabbitmq.client.DeliverCallback;
public
class
Receive1
{
private
static
final
String
EXCHANGE_NAME
=
"logs";
public
static
void
main(String[]
argv)
throws
Exception
{
ConnectionFactory
factory
=
new
ConnectionFactory();
factory.setHost("localhost");
Connection
connection
=
factory.newConnection();
Channel
channel
=
connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,
"fanout");
String
queueName
=
channel.queueDeclare().getQueue();
channel.queueBind(queueName,
EXCHANGE_NAME,
"");
System.out.println("
[*]
Waiting
for
messages.
To
exit
press
CTRL+C");
//
订阅消息的回调函数
DeliverCallback
deliverCallback
=
(consumerTag,
delivery)
->
{
String
message
=
new
String(delivery.getBody(),
"UTF-8");
System.out.println("
[x]
Received
'"
+
message
+
"'");
};
//
消费者,有消息时出发订阅回调函数
channel.basicConsume(queueName,
true,
deliverCallback,
consumerTag
->
{
});
}
}import
com.rabbitmq.client.Channel;
import
com.rabbitmq.client.Connection;
import
com.rabbitmq.client.ConnectionFactory;
import
com.rabbitmq.client.DeliverCallback;
public
class
Receive2
{
private
static
final
String
EXCHANGE_NAME
=
"logs";
public
static
void
main(String[]
argv)
throws
Exception
{
ConnectionFactory
factory
=
new
ConnectionFactory();
factory.setHost("localhost");
Connection
connection
=
factory.newConnection();
Channel
channel
=
connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,
"fanout");
String
queueName
=
channel.queueDeclare().getQueue();
channel.queueBind(queueName,
EXCHANGE_NAME,
"");
System.out.println("
[*]
Waiting
for
messages.
To
exit
press
CTRL+C");
//
订阅消息的回调函数
DeliverCallback
deliverCallback
=
(consumerTag,
delivery)
->
{
String
message
=
new
String(delivery.getBody(),
"UTF-8");
System.out.println("
[x]
Received2
'"
+
message
+
"'");
};
//
消费者,有消息时出发订阅回调函数
channel.basicConsume(queueName,
true,
deliverCallback,
consumerTag
->
{
});
}
}import
com.rabbitmq.client.Channel;
import
com.rabbitmq.client.Connection;
import
com.rabbitmq.client.ConnectionFactory;
public
class
Sender
{
private
static
final
String
EXCHANGE_NAME
=
"logs";
public
static
void
main(String[]
argv)
throws
Exception
{
ConnectionFactory
factory
=
new
ConnectionFactory();
factory.setHost("localhost");
Connection
connection
=
factory.newConnection();
Channel
channel
=
connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,
"fanout");
String
message
=
"publish
subscribe
message";
channel.basicPublish(EXCHANGE_NAME,
"",
null,
message.getBytes("UTF-8"));
System.out.println("
[x]
Sent
'"
+
message
+
"'");
channel.close();
connection.close();
}
}路由模式import
com.rabbitmq.client.Channel;
import
com.rabbitmq.client.Connection;
import
com.rabbitmq.client.ConnectionFactory;
import
com.rabbitmq.client.DeliverCallback;
import
java.io.IOException;
import
java.util.concurrent.TimeoutException;
public
class
Receiver1
{
private
final
static
String
QUEUE_NAME
=
"queue_routing";
private
final
static
String
EXCHANGE_NAME
=
"exchange_direct";
public
static
void
main(String[]
args)
throws
IOException,
TimeoutException
{
ConnectionFactory
factory
=
new
ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
Connection
connection
=
factory.newConnection();
Channel
channel
=
connection.createChannel();
channel.queueDeclare(QUEUE_NAME,
false,
false,
false,
null);
//
指定路由的key,接收key和key2
channel.queueBind(QUEUE_NAME,
EXCHANGE_NAME,
"key");
channel.queueBind(QUEUE_NAME,
EXCHANGE_NAME,
"key2");
channel.basicQos(1);
DeliverCallback
deliverCallback
=
(consumerTag,
delivery)
->
{
String
message
=
new
String(delivery.getBody(),
"UTF-8");
System.out.println("
[x]
Received
'"
+
delivery.getEnvelope().getRoutingKey()
+
"':'"
+
message
+
"'");
};
channel.basicConsume(QUEUE_NAME,
true,
deliverCallback,
consumerTag
->
{
});
}
}import
com.rabbitmq.client.Channel;
import
com.rabbitmq.client.Connection;
import
com.rabbitmq.client.ConnectionFactory;
import
com.rabbitmq.client.DeliverCallback;
import
java.io.IOException;
import
java.util.concurrent.TimeoutException;
public
class
Receiver2
{
private
final
static
String
QUEUE_NAME
=
"queue_routing2";
private
final
static
String
EXCHANGE_NAME
=
"exchange_direct";
public
static
void
main(String[]
args)
throws
IOException,
TimeoutException
{
ConnectionFactory
factory
=
new
ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
Connection
connection
=
factory.newConnection();
Channel
channel
=
connection.createChannel();
channel.queueDeclare(QUEUE_NAME,
false,
false,
false,
null);
//
仅接收key2
channel.queueBind(QUEUE_NAME,
EXCHANGE_NAME,
"key2");
channel.basicQos(1);
DeliverCallback
deliverCallback
=
(consumerTag,
delivery)
->
{
String
message
=
new
String(delivery.getBody(),
"UTF-8");
System.out.println("
[x]
Received
'"
+
delivery.getEnvelope().getRoutingKey()
+
"':'"
+
message
+
"'");
};
channel.basicConsume(QUEUE_NAME,
true,
deliverCallback,
consumerTag
->
{
});
}
}搜索公众号后端架构师后台回复“面试”,获取一份惊喜礼包。import
com.rabbitmq.client.Channel;
import
com.rabbitmq.client.Connection;
import
com.rabbitmq.client.ConnectionFactory;
import
java.io.IOException;
import
java.util.concurrent.TimeoutException;
public
class
Sender
{
private
final
static
String
EXCHANGE_NAME
=
"exchange_direct";
private
final
static
String
EXCHANGE_TYPE
=
"direct";
public
static
void
main(String[]
args)
throws
IOException,
TimeoutException
{
ConnectionFactory
factory
=
new
ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
Connection
connection
=
factory.newConnection();
Channel
channel
=
connection.createChannel();
//
交换机声明
channel.exchangeDeclare(EXCHANGE_NAME,
EXCHANGE_TYPE);
//
只有routingKey相同的才会消费
String
message
=
"routing
mode
message";
channel.basicPublish(EXCHANGE_NAME,
"key2",
null,
message.getBytes());
System.out.println("[x]
Sent
'"
+
message
+
"'");
//
channel.basicPublish(EXCHANGE_NAME,
"key",
null,
message.getBytes());
//
System.out.println("[x]
Sent
'"
+
message
+
"'");
channel.close();
connection.close();
}
}主题模式import
com.rabbitmq.client.Channel;
import
com.rabbitmq.client.Connection;
import
com.rabbitmq.client.ConnectionFactory;
import
com.rabbitmq.client.DeliverCallback;
import
java.io.IOException;
import
java.util.concurrent.TimeoutException;
public
class
Receiver1
{
private
final
static
String
QUEUE_NAME
=
"queue_topic";
private
final
static
String
EXCHANGE_NAME
=
"exchange_topic";
public
static
void
main(String[]
args)
throws
IOException,
InterruptedException,
TimeoutException
{
ConnectionFactory
factory
=
new
ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
Connection
connection
=
factory.newConnection();
Channel
channel
=
connection.createChannel();
channel.queueDeclare(QUEUE_NAME,
false,
false,
false,
null);
//
可以接收key.1
channel.queueBind(QUEUE_NAME,
EXCHANGE_NAME,
"key.*");
channel.basicQos(1);
DeliverCallback
deliverCallback
=
(consumerTag,
delivery)
->
{
String
message
=
new
String(delivery.getBody(),
"UTF-8");
System.out.println("
[x]
Received
'"
+
delivery.getEnvelope().getRoutingKey()
+
"':'"
+
message
+
"'");
};
channel.basicConsume(QUEUE_NAME,
true,
deliverCallback,
consumerTag
->
{
});
}
}import
com.rabbitmq.client.Channel;
import
com.rabbitmq.client.Connection;
import
com.rabbitmq.client.ConnectionFactory;
import
com.rabbitmq.client.DeliverCallback;
import
java.io.IOException;
import
java.util.concurrent.TimeoutException;
public
class
Receiver2
{
private
final
static
String
QUEUE_NAME
=
"queue_topic2";
private
final
static
String
EXCHANGE_NAME
=
"exchange_topic";
private
final
static
String
EXCHANGE_TYPE
=
"topic";
public
static
void
main(String[]
args)
throws
IOException,
InterruptedException,
TimeoutException
{
ConnectionFactory
factory
=
new
ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
Connection
connection
=
factory.newConnection();
Channel
channel
=
connection.createChannel();
channel.queueDeclare(QUEUE_NAME,
false,
false,
false,
null);
//
*号代表单个单词,可以接收key.1
channel.queueBind(QUEUE_NAME,
EXCHANGE_NAME,
"*.*");
//
#号代表多个单词,可以接收key.1.2
channel.queueBind(QUEUE_NAME,
EXCHANGE_NAME,
"*.#");
channel.basicQos(1);
DeliverCallback
deliverCallback
=
(consumerTag,
delivery)
->
{
String
message
=
new
String(delivery.getBody(),
"UTF-8
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 商业街运营策划方案(2篇)
- 人力资源公司宣传方案(2篇)
- 化学(人教版)九上同步讲练测:3.3 元素(教师版)
- 学校打通消防“生命通道”推进工作实施方案
- 2024-2034年中国醚后碳四(C4)市场评估分析及发展前景调研战略研究报告
- 2024-2034年中国速溶茶行业供需趋势及投资风险研究报告
- 2024-2034年中国轨道交通PIS系统市场深度分析及投资战略咨询报告
- 2024-2034年中国超声医用显示器行业发展趋势预测及投资战略规划分析报告
- 2024-2034年中国计算机软件程序编制行业市场深度分析及投资战略规划建议报告
- 2024-2034年中国螺丝机市场运行态势及行业发展前景预测报告
- 超星尔雅学习通《国际金融》2020章节测试含答案(上)
- 储粮化学药剂管理和使用规范
- 轮胎配方及原料介绍
- 上一年度的资金来源及使用情况、公益活动和非营利活动的明细情况上课讲义
- 我的初中数学教学故事
- 中国石油大学毕业生登记表
- 国内己内酰胺及尼龙6生产现状
- 三国立志传2秘籍
- 个人征信查询授权书模板
- 青年教师培训
- 手术室各种无菌巾的标准规格
评论
0/150
提交评论