当前位置: 首页 > 编程日记 > 正文

RabbitMQ 实战(四)消费者 ack 以及 生产者 confirms

2019独角兽企业重金招聘Python工程师标准>>> hot3.png

这篇文章主要讲 RabbitMQ 中 消费者 ack 以及 生产者 confirms。

如上图,生产者把消息发送到 RabbitMQ,然后 RabbitMQ 再把消息投递到消费者。

生产者和 RabbitMQ,以及 RabbitMQ 和消费者都是通过 TCP 连接,但是他们之间是通过信道(Channel)传递数据的。多个线程共享一个连接,但是每个线程拥有独自的信道。

消费者 ack

  • 问题:怎么保证 RabbitMQ 投递的消息被成功投递到了消费者?

    RabbitMQ 投递的消息,刚投递一半,产生了网络抖动,就有可能到不了消费者。

  • 解决办法:

    RabbitMQ 对消费者说:“如果你成功接收到了消息,给我说确认收到了,不然我就当你没有收到,我还会重新投递”

在 RabbitMQ 中,有两种 acknowledgement 模式。

自动 acknowledgement 模式

这也称作发后即忘模式

在这种模式下,RabbitMQ 投递了消息,在投递成功之前,如果消费者的 TCP 连接 或者 channel 关闭了,这条消息就会丢失。

会有丢失消息问题。

手动 acknowledgement 模式

在这种模式下,RabbitMQ 投递了消息,在投递成功之前,如果消费者的 TCP 连接 或者 channel 关闭了,导致这条消息没有被 acked,RabbitMQ 会自动把当前消息重新入队,再次投递。

会有重复投递消息的问题,所以消费者得准备好处理重复消息的问题,就是所谓的:幂等性。

为了启用 手动 ack 模式,消费者需要实现 ChannelAwareMessageListener 接口。

@Component
public class Consumer implements ChannelAwareMessageListener {@Autowiredprivate MessageConverter messageConverter;@Overridepublic void onMessage(Message message, Channel channel) throws Exception {MessageProperties messageProperties = message.getMessageProperties();// 代表投递的标识符,唯一标识了当前信道上的投递,通过 deliveryTag ,消费者就可以告诉 RabbitMQ 确认收到了当前消息,见下面的方法long deliveryTag = messageProperties.getDeliveryTag();// 如果是重复投递的消息,redelivered 为 trueBoolean redelivered = messageProperties.getRedelivered();// 获取生产者发送的原始消息Object originalMessage = messageConverter.fromMessage(message);Console.log("consume message = {} , deliveryTag = {} , redelivered = {}", originalMessage, deliveryTag, redelivered);// 代表消费者确认收到当前消息,第二个参数表示一次是否 ack 多条消息channel.basicAck(deliveryTag, false);// 代表消费者拒绝一条或者多条消息,第二个参数表示一次是否拒绝多条消息,第三个参数表示是否把当前消息重新入队
//        channel.basicNack(deliveryTag, false, false);// 代表消费者拒绝当前消息,第二个参数表示是否把当前消息重新入队
//        channel.basicReject(deliveryTag,false);}
}
  • channel.basicAck

    代表消费者确认收到当前消息,语义上表示消费者成功处理了当前消息。

  • channel.basicNack

    代表消费者拒绝一条或者多条消息。basicNack 算是 basicReject 的一个扩展,因为 basicReject 不能一次拒绝多条消息。

  • channel.basicReject

    代表消费者拒绝这条消息,语义上表示消费者没有处理当前消息。

    对于 basicNack 和 basicReject ,如果参数 boolean requeue 传入 false,消息还是会从队列里面删除。这三个方法只是语义上的不同。

  • deliveryTag

    deliveryTag 是 64 bit long 值,从 1 开始,不停的递增 1。不同的 channel 有独立的 deliveryTag。比如有两个消费者,你会发现,都是从 1 开始递增,互不影响。

由于上面创建的消费者,没有指明监听那个队列,所以还需要创建一个 MessageListenerContainer

@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, ChannelAwareMessageListener listener) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);// 指定消费者container.setMessageListener(listener);// 指定监听的队列container.setQueueNames(QUEUE_NAME);// 设置消费者的 ack 模式为手动确认模式container.setAcknowledgeMode(AcknowledgeMode.MANUAL);container.setPrefetchCount(300);return container;
}

这样就开启了消费者手动 ack 模式。

注意

如果开启了消费者手动 ack 模式,但是又没有调用手动确认方法(比如:channel.basicAck),那问题就大了,RabbitMQ 会在当前 channel 上一直阻塞,等待消费者 ack。

生产者 confirms

  • 问题:怎么保证生产者发送的消息被 RabbitMQ 成功接收?

    生产者发送的消息,刚发送一半,产生了网络抖动,就有可能到不了 RabbitMQ。

  • 解决办法:

    生产者对 RabbitMQ 说:“如果你成功接收到了消息,给我说确认收到了,不然我就当你没有收到”

自定义消息元数据

/*** 自定义消息元数据*/
@NoArgsConstructor
@Data
public class RabbitMetaMessage implements Serializable{/*** 是否是 returnCallback*/private boolean returnCallback;/*** 承载原始消息数据数据*/private Object payload;public RabbitMetaMessage(Object payload) {this.payload = payload;}
}
  • returnCallback 标记当前消息是否触发了 returnCallback(后面会解释)
  • payload 保存原始消息数据

生产者

先把消息存储到 redis,再发送到 rabbitmq

@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate RedisTemplate redisTemplate;@Autowiredprivate DefaultKeyGenerator keyGenerator;@GetMapping("/sendMessage")public Object sendMessage() {new Thread(() -> {HashOperations hashOperations = redisTemplate.opsForHash();for (int i = 0; i < 1; i++) {String id = keyGenerator.generateKey() + "";String value = "message " + i;RabbitMetaMessage rabbitMetaMessage = new RabbitMetaMessage(value);// 先把消息存储到 redishashOperations.put(RedisConfig.RETRY_KEY, id, rabbitMetaMessage);Console.log("send message = {}", value);// 再发送到 rabbitmqrabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, value, (message) -> {message.getMessageProperties().setMessageId(id);return message;}, new CorrelationData(id));}}).start();return "ok";}}

配置 ConnectionFactory

@Bean
public ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory("192.168.238.132", 5672);connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 设置 生产者 confirmsconnectionFactory.setPublisherConfirms(true);// 设置 生产者 ReturnsconnectionFactory.setPublisherReturns(true);return connectionFactory;
}

配置 RabbitTemplate

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);// 必须设置为 true,不然当 发送到交换器成功,但是没有匹配的队列,不会触发 ReturnCallback 回调// 而且 ReturnCallback 比 ConfirmCallback 先回调,意思就是 ReturnCallback 执行完了才会执行 ConfirmCallbackrabbitTemplate.setMandatory(true);// 设置 ConfirmCallback 回调rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {Console.log("ConfirmCallback , correlationData = {} , ack = {} , cause = {} ", correlationData, ack, cause);// 如果发送到交换器都没有成功(比如说删除了交换器),ack 返回值为 false// 如果发送到交换器成功,但是没有匹配的队列(比如说取消了绑定),ack 返回值为还是 true (这是一个坑,需要注意)if (ack) {String messageId = correlationData.getId();RabbitMetaMessage rabbitMetaMessage = (RabbitMetaMessage) redisTemplate.opsForHash().get(RedisConfig.RETRY_KEY, messageId);Console.log("rabbitMetaMessage = {}", rabbitMetaMessage);if (!rabbitMetaMessage.isReturnCallback()) {// 到这一步才能完全保证消息成功发送到了 rabbitmq// 删除 redis 里面的消息redisTemplate.opsForHash().delete(RedisConfig.RETRY_KEY, messageId);}}});// 设置 ReturnCallback 回调// 如果发送到交换器成功,但是没有匹配的队列,就会触发这个回调rabbitTemplate.setReturnCallback((message, replyCode, replyText,exchange, routingKey) -> {Console.log("ReturnCallback unroutable messages, message = {} , replyCode = {} , replyText = {} , exchange = {} , routingKey = {} ", message, replyCode, replyText, exchange, routingKey);// 从 redis 取出消息,设置 returnCallback 设置为 trueString messageId = message.getMessageProperties().getMessageId();RabbitMetaMessage rabbitMetaMessage = (RabbitMetaMessage) redisTemplate.opsForHash().get(RedisConfig.RETRY_KEY, messageId);rabbitMetaMessage.setReturnCallback(true);redisTemplate.opsForHash().put(RedisConfig.RETRY_KEY, messageId, rabbitMetaMessage);});return rabbitTemplate;
}

ReturnCallback 回调

必须 rabbitTemplate.setMandatory(true),不然当 发送到交换器成功,但是没有匹配的队列,不会触发 ReturnCallback 回调。而且 ReturnCallback 比 ConfirmCallback 先回调。

如何模拟 发送到交换器成功,但是没有匹配的队列,先把项目启动,然后再把队列解绑,再发送消息,就会触发 ReturnCallback 回调,而且发现消息也丢失了,没有到任何队列。

这样就解绑了。

运行项目,然后打开浏览器,输入 http://localhost:9999/sendMessage

控制台打出如下日志

这样就触发了 ReturnCallback 回调 ,从 redis 取出消息,设置 returnCallback 设置为 true。你会发现 ConfirmCallback 的 ack 返回值还是 true。

ConfirmCallback 回调

这里有个需要注意的地方,如果发送到交换器成功,但是没有匹配的队列(比如说取消了绑定),ack 返回值为还是 true (这是一个坑,需要注意,就像上面那种情况!!!)。所以不能单靠这个来判断消息真的发送成功了。这个时候会先触发 ReturnCallback 回调,我们把 returnCallback 设置为 true,所以还得判断 returnCallback 是否为 true,如果为 ture,表示消息发送不成功,false 才能完全保证消息成功发送到了 rabbitmq。

如何模拟 ack 返回值为 false,先把项目启动,然后再把交换器删除,就会发现 ConfirmCallback 的 ack 为 false。

运行项目,然后打开浏览器,输入 http://localhost:9999/sendMessage

控制台打出如下日志

你会发现 ConfirmCallback 的 ack 返回值才是 false。

注意

不能单单依靠 ConfirmCallback 的 ack 返回值为 true,就断定当前消息发送成功了。

源码地址

  • GitHub

参考资料

Consumer Acknowledgements and Publisher Confirms

结语

由于本人知识和能力有限,文中如有没说清楚的地方,希望大家能在评论区指出,以帮助我将博文写得更好。

转载于:https://my.oschina.net/u/3523423/blog/1620885

相关文章:

【imx6】/dev中fb和video的对应关系

imx6q关于fb和video的设备信息 设备节点 rootmyzr:/unit_tests# ls /dev/fb* -l lrwxrwxrwx 1 root root 3 Jan 1 1970 /dev/fb -> fb0 crw-rw---- 1 root video 29, 0 Jan 1 1970 /dev/fb0 crw-rw---- 1 root video 29, 1 Jan 1 1970 /dev/fb1 crw-rw---- 1 r…

flash绘图API:恋上你的CD

早上&#xff0c;我无意间碰撞到一个女孩&#xff0c;那时候&#xff0c;她匆匆忙地走了。从她的口袋里面掉下了一本陈旧的书&#xff0c;在哪里我看到她藏在书中的那封陈旧的信和cd。我好奇打开它&#xff0c;一边听着她那张cd&#xff0c;一边看她的写的信&#xff0c;忽然间…

【Ubuntu】ubuntu工具 记录shell终端的内容到文件中:script

###用法 $ script -h Usage: script [options] [file] Options: -a, --append append the output -c, --command run command rather than interactive shell -r, --return return exit code of the child process -f, --flush run flush after each write –force use outpu…

弃Java、Swift于不顾,为何选Python?

作者 | JACE HARR译者 | 姜松浩转载自 CSDN&#xff08;ID&#xff1a;CSDNNews&#xff09;以下为译文&#xff1a;刚入行的程序员总是询问他们应该从哪种语言开始&#xff0c;我告诉他们&#xff0c;他们应该首先学习 Python。以下是使用 Python 开始自学编程去探险的一些原因…

iOS事件处理,看我就够了~

该文章属于<简书 — 刘小壮>原创&#xff0c;转载请注明&#xff1a; <简书 — 刘小壮> https://www.jianshu.com/p/b0884faae603 好久没写博客了&#xff0c;前后算起来刚好有一年了。这期间博客也不是一直没变化&#xff0c;细心的同学应该能发现&#xff0c;我一…

ISO9000机房管理办法

1 总则<?xml:namespace prefix o ns "urn:schemas-microsoft-com:office:office" />1.1制定目的(1) 规范公司机房管理以及网管相关工作。1.2适用范围公司网络机房以及资讯组人员。1.3权责单位(1) 资讯组负责本办法制定、修改、废止之起草工作。(2) 总…

1400小时开源语音数据集,你想要都在这儿

整理 | 一一出品 | AI科技大本营&#xff08;ID:rgznai100&#xff09;3 月 1 日&#xff0c;由 Mozilla 基金会发起的 Common Voice 项目&#xff0c;发布新版语音识别数据集&#xff0c;包括来自 42000 名贡献者&#xff0c;超过 1400 小时的语音样本数据&#xff0c;涵盖包括…

【VirtualBox】VirtualBox使用现有的虚拟盘文件(如VHD)创建虚拟机时,报错:打开虚拟硬盘失败,“UUID already exist”的解决方法

###0、问题描述 使用现有的虚拟盘文件&#xff08;如VHD&#xff09;创建虚拟机时&#xff0c;报错&#xff1a;打开虚拟硬盘失败&#xff0c;“UUID already exists”的错误。 ###1、参考博客 https://www.cnblogs.com/xqzt/p/5053338.html https://jingyan.baidu.com/articl…

JDK10 EA版特性速览

今天收到一封邮件组的邮件&#xff0c;是关于JDK 10 First Release Candidate的&#xff0c; JDK10 b43版将作为第一个JDK10的RC版。 b43版特性包括&#xff1a; 286: Local-Variable Type Inference296: Consolidate the JDK Forest into a Single Repository304: Garbage-Col…

linux主机常用管理命令

1.杀掉MYSQL SHELL ps aux|grep mysql|grep -v grep|awk {print $2}|xargs kill -9 2.删除当前目录下0字节的文件 find -type f -size 0 -exec rm -rf {} \; 3.匹配当data里包含"donald"&#xff0c;输出第4列 awk /donald/ {print $4} data 扩展1&#xff1a; awk /…

【Qt】新安装的虚拟机,使用QtCreator第一次编译时报错:g++: Command not found

1、问题描述 新安装的虚拟机&#xff0c;使用QtCreator第一次编译时报错&#xff1a;g: Command not found &#xff08;或着报&#xff0c;make执行失败之类的错误&#xff09; 2、原因分析 新安装的虚拟机中&#xff0c;没有g。一般默认情况是&#xff0c;只安装了gcc 3…

爬一爬那些年你硬盘存过的“老师”

作者 | PayneLi 转载自Python全家桶&#xff08;ID: python-0321&#xff09; 人工智能的现状及今后发展趋势如何&#xff1f; https://edu.csdn.net/topic/ai30?utm_sourcecsdn_bw 最近在Github发现一个基于google浏览器的爬虫项目&#xff0c;此项目是由美国大神2018年开源…

python 打印调用栈

import tracebackdef BBQ():traceback.print_stack() 引入 traceback 包&#xff0c;在某个函数中执行 traceback.print_stack()。 转载于:https://www.cnblogs.com/yourstars/p/8448471.html

(转)修改ETM,用Ogre实现《天龙八部》地形与部分场景详解(附源码)

本文主要讲的是《天龙八部》游戏的地形和一部分场景的具体实现&#xff0c;使用C, Ogre1.6&#xff0c;我摸索了段时间&#xff0c;可能方法用的并不是最好的&#xff0c;但好歹实现了。文章可能讲得有点罗嗦&#xff0c;很多简单的东西都讲了。我是修改了ETM&#xff08;Edita…

【Qt】错误GL/gl.h: No such file or directory的解决方法(以及cannot find -lGL解决方法)

1、问题描述 QtCreator第一次编译时&#xff0c;报错GL/gl.h: No such file or directory 错误信息如下&#xff1a; /home/Qt5.6.3/5.6.3/gcc_64/include/QtGui/qopengl.h:136: error: GL/gl.h: No such file or directory include <GL/gl.h> ^2、原因分析 说明系统里…

java并发之同步辅助类CyclicBarrier和CountDownLatch

CyclicBarrier 的字面意思是可循环使用&#xff08;Cyclic&#xff09;的屏障&#xff08;Barrier&#xff09;。它要做的事情是&#xff0c;让一组线程到达一个屏障&#xff08;也可以叫同步点&#xff09;时被阻塞&#xff0c;直到最后一个线程到达屏障时&#xff0c;屏障才会…

投稿近2000,NAACL 2019接收率仅为22.6%|附录取论文名单

整理 | 若名 出品 | AI科技大本营&#xff08;ID:rgznai100&#xff09; 人工智能的现状及今后发展趋势如何&#xff1f; https://edu.csdn.net/topic/ai30?utm_sourcecsdn_bw 最近真是学术界公布论文产出结果的火热时期&#xff0c;距离计算机视觉领域的顶级盛会 CVPR 2019…

Lucene.Net无障碍学习和使用:索引篇

项目中可能需要再次用到Lucene.Net&#xff0c;利用空闲时间写了个demo&#xff0c;主要涉及到索引的创建、删除、更新和一个简单查询。在本文示例中&#xff0c;Lucene.Net的版本是2.4.0&#xff0c;某些类和方法与最新版本或者较旧的版本有较多不同&#xff0c;希望您阅读顺利…

【ubuntu】vim中鼠标选中时变成 可视模式,不能复制的解决方法

1、问题描述 配置好vim后&#xff0c;打开一个文件&#xff0c;鼠标选中文本时&#xff0c;选中的内容变成可视模式。 可视模式&#xff0c;不能将选中内容复制到剪切板 2、解决方法 在用户根目录下&#xff0c;打开 .vimrc ~$ vi .vimrc 将set mousea 删除或注释掉 3、我的…

Ruby11 拾遗

Agenda LoopExpressionFile Read/WriteDebugProcess & ThreadLoop while a 10 while a > 0puts aa - 1 enduntil a 100until a 0puts aa - 1 endloop a 10loop dobreak if a < 0puts aa - 1 end循环控制 breaknextbreak for x in 1..10break if x 5puts x endne…

CVPR2019|微软、中科大开源基于深度高分辨表示学习的姿态估计算法

作者 | 周强&#xff08;CV君&#xff09; 来源 | 我爱计算机视觉&#xff08;公众号id&#xff1a;aicvml&#xff09; 如何挑战百万年薪的人工智能&#xff01; https://edu.csdn.net/topic/ai30?utm_sourcecsdn_bw 昨天arXiv出现了好几篇被CVPR 2019接收的论文。 其中来自…

本地连接受限制或无法连接怎么办?

一个非常常见的问题&#xff0c;就是我们家用电脑安装宽带后&#xff0c;任务栏上的“本地连接”图标有一个的叹号。查看状态&#xff1a;“受限制或无连接”&#xff0c;点“修复”却无法修复&#xff0c;显示无法获取IP地址&#xff0c;获得私网地址&#xff01;但ADSL又可以…

糟心!苹果无人车裁员190人,程序员却首当其冲

整理 | 琥珀 出品 | AI科技大本营&#xff08;ID:rgznai100&#xff09; 60s测试&#xff1a;你是否适合转型人工智能&#xff1f; https://edu.csdn.net/topic/ai30?utm_sourcecxrs_bw 仅过了一个多月&#xff0c;苹果公司自动驾驶部门裁员的真相便已浮出水面&#xff0c;该…

【工具】Internet Download Manager( IDM )抓取站点

软件说明&#xff1a; 扒网站的好东西&#xff0c;免费&#xff0c;支持中文 下载地址&#xff08;官网&#xff09; http://www.internetdownloadmanager.com/ 使用方法 https://jingyan.baidu.com/article/a681b0de0be10b3b1943465d.html

2010-12-30

I want to invest in stocks to make a quick buck.我想炒股&#xff0c;尽快赚点儿钱花&#xff11;、Which company are you going to invest in?那你打算买什么股&#xff1f;&#xff12;、Dont put all your eggs in one basket造成不要孤注一掷。&#xff13;、You need…

PFLD:简单、快速、超高精度人脸特征点检测算法

作者 | 周强&#xff08;CV君&#xff09; 来源 | 我爱计算机视觉&#xff08;公众号id&#xff1a;aicvml&#xff09; 60s测试&#xff1a;你是否适合转型人工智能&#xff1f; https://edu.csdn.net/topic/ai30?utm_sourcecxrs_bw 什么样的算法才是好算法&#xff1f; 真…

Cookie实现记住密码、自动登录

前端代码 <form id"form" action"xxx" method"post"><div><input type"text" name"account" id"account" placeholder"账号"><input type"text" name"pwd" i…

【Ubuntu】VirtualBox+ubuntu中显示摄像头

1、下载插件 https://www.virtualbox.org/wiki/Downloads 2、修改下载的插件的后缀 将后缀名改为vbox-extpack 如下载的插件为Oracle_VM_VirtualBox_Extension_Pack-5.2.14.txt&#xff0c;改为Oracle_VM_VirtualBox_Extension_Pack-5.2.14.vbox-extpack 3、安装插件 点击…

以SIGSEGV为例详解信号处理(与栈回溯)

以SIGSEGV为例详解信号处理(与栈回溯) 信号是内核提供的向用户态进程发送信息的机制, 常见的有使用SIGUSR1唤醒用户进程执行子程序或发生段错误时使用SIGSEGV保存用户错误现场. 本文以SIGSEGV为例, 详细分析信号使用方法, 内核信号的发送与接收机制. 1. 信号处理例程 以下是一…

十个jQuery图片画廊插件推荐

2019独角兽企业重金招聘Python工程师标准>>> jQuery的画廊插件可以将分组图像和多媒体资料转成类似Flash的图像或照片。当幻灯片已经成为网站的重要组成部分&#xff0c;jQuery的重要性不能被忽视。下面为你介绍了10个最有美感&#xff0c;创新性和创造性的jQuery图…