日期:2025/04/04 19:53来源:未知 人气:55
消息队列在实际应用中常用的使用场景 。异步处理, 应用解耦, 流量削锋和消息通讯 四个场景
异步处理
场景说明: 用户注册后, 需要发注册邮件和注册短信。
传统的做法有两种 1.串行的方式; 2.并行方式
(1) 串行方式: 将注册信息写入数据库成功后, 发送注册邮件, 再发送注册短信 。以 上三个任务全部完成后, 返回给客户端
(2) 并行方式: 将注册信息写入数据库成功后, 发送注册邮件的同时, 发送注册短 信 。以上三个任务完成后, 返回给客户端 。与串行的差别是, 并行的方式可以提高处理的时间
假设三个业务节点每个使用 50 毫秒钟, 不考虑网络等其他开销, 则串行方式的时间是 150 毫秒, 并行 的时间可能是 100 毫秒 。
因为 CPU 在单位时间内处理的请求数是一定的, 假设 CPU1 秒内吞吐量是 100 次 。则串行方式 1 秒内 CPU 可处理的请求量是 7 次 (1000/150) 。并行方式处理的请求量是 10 次 (1000/100)
小结: 如以上案例描述, 传统的方式系统的性能 (并发量, 吞吐量, 响应时间) 会有 瓶颈 。如何解决这个问题呢?
引入消息队列, 将不是必须的业务逻辑, 异步处理 。改造后的架构如下:
按照以上约定, 用户的响应时间相当于是注册信息写入数据库的时间, 也就是 50 毫 秒 。注册邮件, 发送短信写入消息队列后, 直接返回, 因此写入消息队列的速度很 快, 基本可以忽略, 因此用户的响应时间可能是 50 毫秒 。因此架构改变后, 系统的 吞吐量提高到每秒 20 QPS 。比串行提高了 3 倍, 比并行提高了两倍
应用解耦
场景说明: 用户下单后, 订单系统需要通知库存系统。传统的做法是, 订单系统调用库存系统的接口 。如下图
传统模式的缺点:
假如库存系统无法访问, 则订单减库存将失败, 从而导致订单失败
订单系统与库存系统耦合
如何解决以上问题呢? 引入应用消息队列后的方案, 如下图:
订单系统: 用户下单后, 订单系统完成持久化处理, 将消息写入消息队列, 返回用户订单下单成功
库存系统: 订阅下单的消息, 采用拉/推的方式, 获取下单信息, 库存系统根据下单信 息, 进行库存操作
假如: 在下单时库存系统不能正常使用 。也不影响正常下单, 因为下单后, 订单系统 写入消息队列就不再关心其他的后续操作了 。实现订单系统与库存系统的应用解耦
流量削锋
流量削锋也是消息队列中的常用场景, 一般在秒杀或团抢活动中使用广泛
应用场景: 秒杀活动, 一般会因为流量过大, 导致流量暴增, 应用挂掉 。为解决这个 问题, 一般需要在应用前端加入消息队列 。
可以控制活动的人数
可以缓解短时间内高流量压垮应用
用户的请求, 服务器接收后, 首先写入消息队列 。假如消息队列长度超过最大数量, 则直接抛弃用户请求或跳转到错误页面
秒杀业务根据消息队列中的请求信息, 再做后续处理
日志处理
日志处理是指将消息队列用在日志处理中, 比如 Kafka 的应用, 解决大量日志传输的 问题。
日志采集客户端, 负责日志数据采集, 定时写受写入 Kafka 队列
Kafka 消息队列, 负责日志数据的接收, 存储和转发
日志处理应用: 订阅并消费 kafka 队列中的日志数据
以下是新浪 kafka 日志处理应用案例:
(1)Kafka: 接收用户日志的消息队列
(2)Logstash: 做日志解析, 统一成 JSON 输出给 Elasticsearch
(3)Elasticsearch: 实时日志分析服务的核心技术, 一个 schemaless, 实时的数据存储 服务, 通过 index 组织数据, 兼具强大的搜索和统计功能
(4)Kibana: 基于 Elasticsearch 的数据可视化组件, 超强的数据可视化能力是众多公司 选择 ELK stack 的重要原因
消息通讯
消息通讯是指, 消息队列一般都内置了高效的通信机制, 因此也可以用在纯的消息通 讯 。比如实现点对点消息队列, 或者聊天室等
点对点通讯:
客户端 A 和客户端 B 使用同一队列, 进行消息通讯 。
聊天室通讯:
客户端 A, 客户端 B, 客户端 N 订阅同一主题, 进行消息发布和接收 。实现类似聊天 室效果 。
以上实际是消息队列的两种消息模式, 点对点或发布订阅模式 。模型为示意图, 供参考 。
消息队列中的若干消息如果是对同一个数据进行操作, 这些操作具有前后的关系, 必 须要按前后的顺序执行, 否则就会造成数据异常 。举例:
比如通过 mysql binlog 进行两个数据库的数据同步, 由于对数据库的数据操作是具有 顺序性的, 如果操作顺序搞反, 就会造成不可估量的错误 。比如数据库对一条数据依 次进行了 插入->更新->删除操作, 这个顺序必须是这样, 如果在同步过程中, 消息的 顺序变成了 删除->插入->更新, 那么原本应该被删除的数据, 就没有被删除, 造成数据的不一致问题 。
举例场景:
RabbitMQ: ①一个 queue, 有多个 consumer 去消费, 这样就会造成顺序的错误 , consumer 从 MQ 里面读取数据是有序的, 但是每个 consumer 的执行时间是不固定 的, 无法保证先读到消息的 consumer 一定先完成操作, 这样就会出现消息并没有按 照顺序执行, 造成数据顺序错误。
②一个 queue 对应一个 consumer, 但是 consumer 里面进行了多线程消费, 这样也 会造成消息消费顺序错误。
解决方案:
①拆分多个 queue, 每个 queue 一个 consumer, 就是多一些 queue 而已, 确实是麻烦点; 这样也会造成吞吐量下降, 可以在消费者内部采用多线程的方式取消费。
一个 queue 对应一个 consumer
②或者就一个 queue 但是对应一个 consumer, 然后这个 consumer 内部用内存队列做排队, 然后分发给底层不同的 worker 来处理
一个 queue 对应一个 consumer, 采用多线程
分布式事务: 不同的服务操作不同的数据源 (库或表), 保证数据一致性的问题。
解决: 采用 RabbitMQ 消息最终一致性的解决方案, 解决分布式事务问题 。 分布式事务场景:
1 、电商项目中的商品库和 ES 库数据同步问题 。
2 、电商项目中: 支付----!订单---!库存, 一系列操作, 进行状态更改等 。
在互联网应用中, 基本都会有用户注册的功能。在注册的同时, 我们会做出如下操 作:
收集用户录入信息, 保存到数据库向用户的手机或邮箱发送验证码等等 …
如果是传统的集中式架构, 实现这个功能非常简单: 开启一个本地事务, 往本地数据 库中插入一条用户数据, 发送验证码, 提交事物 。
但是在分布式架构中, 用户和发送验证码是两个独立的服务, 它们都有各自的数据 库, 那么就不能通过本地事物保证操作的原子性 。这时我们就需要用到 RabbitMQ (消 息队列) 来为我们实现这个需求。
在用户进行注册操作的时候, 我们为该操作创建一条消息, 当用户信息保存成功时, 把这条消息发送到消息队列 。验证码系统会监听消息, 一旦接受到消息, 就会给该用 户发送验证码 。
消息可靠性一般来说由 3 方面来保证:
1) 生产者
RabbitMQ 提供 transaction 事务和 confirm 模式来确保生产者不丢消息;
Transaction 事务机制就是说: 发送消息前, 开启事务 (channel.txSelect()) ,然后 发送消息, 如果发送过程中出现什么异常, 事务就会回滚
(channel.txRollback()) ,如果发送成功则提交事务
(channel.txCommit()), 然而, 这种方式有个缺点: 吞吐量下降 。
confirm 模式用的居多: 一旦 channel 进入 confirm 模式, 所有在该信道上发布的 消息都将会被指派一个唯一的 ID (从 1 开始), 一旦消息被投递到所有匹配的队列 之后;
rabbitMQ 就会发送一个 ACK 给生产者 (包含消息的唯一 ID), 这就使得生产者知 道消息已经正确到达目的队列了;
如果 rabbitMQ 没能处理该消息, 则会发送一个 Nack 消息给你, 可以进行重试操 作 。
2) 消息队列本身
可以进行消息持久化, 即使 rabbitMQ 挂了, 重启后也能恢复数据 如果要进行消息持久化, 那么需要对以下 3 种实体均配置持久化
a) Exchange
声明 exchange 时设置持久化 (durable = true) 并且不自动删除(autoDelete = false)
b) Queue
声明 queue 时设置持久化 (durable = true) 并且不自动删除(autoDelete = false)
c) message
发送消息时通过设置 deliveryMode=2 持久化消息
3) 消费者
消费者丢数据一般是因为采用了自动确认消息模式, 消费者在收到消息之后, 处理 消息之前, 会自动回复 RabbitMQ 已收到消息; 如果这时处理消息失败, 就会丢失该 消息; 改为手动确认消息即可! 手动确认模式下消费失败时, 不将其重新放入队列 (确认重试也不会成功的情形), 打印错误信息后, 通知相关人员, 人工介入处理 。
保证消息幂等性 。幂等性概念: 一个请求, 不管重复来多少次, 结果是不会改变的 。
RabbitMQ 、RocketMQ 、Kafka 等任何队列不保证消息不重复, 如果业务需要消息不重 复消费, 则需要消费端处理业务消息要保持幂等性
方式一: Redis 的 setNX() , 做消息 id 去重 java 版本目前不支持设置过期时间
//Redis 中操作, 判断是否已经操作过 TODO
boolean flag = jedis.setNX(key);
if(flag){
//消费
}else{
//忽略, 重复消费
}
方式二: redis 的 Incr 原子操作: key 自增, 大于 0 返回值大于 0 则说明消费过, (key 可以是消息的 md5 取值, 或者如果消息 id 设计合理直接用 id 做 key)
int num = jedis.incr(key);
if(num == 1){
//消费
}else{
//忽略, 重复消费
}
方式三: 数据库去重表
设计一个去重表, 某个字段使用 Message 的 key 做唯一索引, 因为存在唯一 索引, 所以重复消费会失败
CREATE TABLE message_record
( id
int(11) unsigned NOT NULL AUTO_INCREMENT, key
varchar(128) DEFAULT NULL, create_time
datetime DEFAULT NULL, PRIMARY KEY (id
), UNIQUE KEY key
(key
) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;