深入详解高性能消息队列中间件 RabbitMQ
目录
在进行系统设计的时候,各个模块、服务器之间为了实现数据的交互,通常是建立连接通过发送消息来进行。如果将他们一一建立连接,就会出现链路太多,每一条链路都必须感知对端等问题。此场景下消息将非常混乱,后期维护也将非常痛苦。为了解决这个问题,精简系统,引入RabbitMq。各相关模块不在相互发送消息,而将消息都发送给RabbitMQ,由RabbitMQ负责将消息传递出去。
那么,什么是RabbitMQ?RabbitMQ又是如何实现这些功能的呢?
2、什么是 RabbitMQ ?
在讲RabbitMQ之前,需要先了解一下AMQP的概念。
AMQP,即Advanced Message Queuing Protocol(高级消息队列协议),是一个提供统一消息服务的应用层标准高级消息队列协议。AMQP是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件传递消息,不受客户端/中间件不同产品、不同开发语言等条件的限制。该协议是一种二进制协议,提供客户端应用于消息中间件之间异步、安全、高效的交互。相对于我们常见的REST API,AMQP更容易实现,可以降低开销,同时灵活性高,可以轻松的添加负载平衡和高可用性的功能,并保证消息传递,在性能上AMQP协议也相对更好一些。
RabbitMQ是AMQP的一个开源实现,服务器端用Erlang语言编写,用于在分布式系统中存储转发消息,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、 ActionScript、XMPP、STOMP等,支持AJAX。 MQ(Messages Queue)是一种应用程序与应用程的通信方法。RabbitMQ相当于生产者与消费者的模式,消息发送端(生产者)将消息写入消息队列,消息接收端(消费者)从消息队列中取出消息、消费消息;而消息的发送端无需知道消息接受端的存在,反之亦然。
3、RabbitMQ 优势
RabbitMQ主要有以下几个优势:
- 可靠性(Reliablity):使用了一些机制来保证可靠性,比如持久化、传输确认、发布确认。
- 灵活的路由(Flexible Routing):在消息进入队列之前,通过Exchange来路由消息。对于典型的路由功能,Rabbit已经提供了一些内置的Exchange来实现。针对更复杂的路由功能,可以将多个Exchange绑定在一起,也通过插件机制实现自己的Exchange。
- 消息集群(Clustering):多个RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
- 高可用(Highly Avaliable Queues):队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
- 多种协议(Multi-protocol):支持多种消息队列协议,如STOMP、MQTT等。
- 多种语言客户端(Many Clients):几乎支持所有常用语言,比如Java、.NET、Ruby等。
- 管理界面(Management UI):提供了易用的用户界面,使得用户可以监控和管理消息Broker的许多方面。
- 跟踪机制(Tracing):如果消息异常,RabbitMQ提供了消息的跟踪机制,使用者可以找出发生了什么。
- 插件机制(Plugin System):提供了许多插件,来从多方面进行扩展,也可以编辑自己的插件。
4、RabbitMQ整体架构剖析
在详细介绍RabbitMQ之前,先介绍几个重要的概念:
- Queue:消息队列
- Exchange:交换机,它会按照路由规则来投递消息
- Routing key:路由关键字,exchange会根据它来进行消息投递
- Bind:绑定了queue和exchange,根据路由规则将消息会投递到对应的消息队列中去。
- Producer:消息生产者
- Consumer:消息的消费者
RabbitMQ的整体架构图如下所示:
P(Producer,消息生产者)负责发送,C(Consumer,消息消费者)负责消费消息。其中交换机exchange、队列Queue的定义、exchange与Queue的绑定既可以放在发送端,也可以放在消费端,但是不管放在何处定义,要在使用前定义,否则会出错。本文统一将exchange放在生产者端来定义,而将queue的定义,queue与exchange的绑定放在消费端来处理。另外,为了防止第一次使用exchange是在消费端,可以在消费端也同时定义exchange。本文不考虑这种情况,默认在消费端使用exchange的时候已定义过。
4.1、发送消息流程
P端发送消息的基本过程是:
1)连接服务器;
2)声明exchange,并设置其相关属性;
3)将消息发送到exchange。
其中,exchange有3种类型:fanout、routing、topic:
1)fanout不处理路由键,为空即可,只要简单的将队列绑定到交换机上,那么发送到交换机上的消息都会被转发到与该交换机绑定的所有队列上。
2)Routing处理路由键,需要将一个队列绑定到交换机上,要求消息与一个特定的路由键完全匹配。
3)Topic将路由键与某模式进行匹配,此时队列需要绑定到一个模式上。匹配的规格是”#”匹配一个或多个词,”*”匹配一个词。
4.2、消费消息流程
C消费消息的基本过程是:
1)连接服务器;
2)声明队列queue及其属性(持久化、无消费者时是否自动删除队列等等);
3)设置routingkey,并且通过routingkey将queue与exchange绑定到一起;
4)等待消息,消费消息。
其中,queue可以设置的属性有:Exclusive、auto_delete、durable。
1)Exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。
2)Auto_delete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
3)Durable:服务器重启后,队列不会丢失。
对上述的exchange、queue、binding的一个例子:
Mq.queue_bind(“QueueTest”, “ExchangeTest”, “Test”)
这个绑定的意思是:任何发送到交换机ExchangeTest的具有路由键Test的消息都会被路由到名为QueueTest的队列中。
5、RabbitMQ 应用
一般平台的消息大致分为两种类型:notif和req-ack-notif。对应于rabbitmq正好有两种模型:publish/subscribe和rpc。下面根据实际应用来讲解这两个模型。
5.1、广播
假设应用服务器收到了一条消息A,需要广播给其他多个业务服务器。按照图一中rabbitmq的基本结构我们应该能想到两种方式:
Method1
Method2
上述两种方法哪一种能实现我们的目的?答案是Method1,如果采用Method2的话,queue会将消息依次分发给两个消费端,例如客户端C1收到消息1,3,5…,客户端C2收到消息2,4,6…。
虽然此种方法不能实现我们的目的,但在此处插入一点,及每条消息的处理量可能而且几乎肯定是不同的,所以有时会出现客户端C1处理完了N条消息,但客户端C2一条还没处理完,为了解决这个问题,rabbitmq提供了公平调度的概念即Fair dispatch:Rabbitmq不会在同一时间给工作者分配多个任务,只有在工作者完成任务之后,才会再次接收到任务。
回到刚才讨论的地方,我们已经确立了使用Method1来完成该功能,现在根据该方法进行一些简单的编码验证(注:验证语言为python)。publish/subscribe模型之P客户端代码如下:
import pika #建立连接 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() #声明交换机 channel.exchange_declare(exchange='exchangeTest', type='fanout') #发送消息 channel.basic_publish(exchange='exchangeTest', routing_key='', body='Hello World!') connection.close()
publish/subscribe模型之C客户端代码:
import pika #建立连接 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() #创建queue channel.queue_declare(queue=’QueueTest’) #绑定 channel.queue_bind(exchange=’exchangeTest’, queue=’QueueTest’) def callback(ch, method, properties, body): print “ [x] Received %r” %(body, ) channel.basic_consume(callback, queue =’QueueTest’, no_ack=True) channel.start_consuming()
AMQP支持在一个TCP连接上启用多个MQ通信channel,每个channel都可以被应用作为通信流, 被分配了一个整数标识,自动由Connection()类的.channel()方法维护。每个AMQP程序至少要有一个连接和一个channel。
5.2、RPC
对于大部分消息我们不仅仅是通知,更多的是需要对方在接收到消息后给我们回复的。此时,
我们就需要rabbitmq提供的RPC模型,如下图所示:
RPC模型与广播模型相比,最大的区别是消费者客户端在接收到消息的时候,需要给发送者P回复消息。而同样的,消息生产者P也不仅仅是做为发送端了,他还需要接收来自消费端C回复的消息。
由P到C我们知道直接将Queue1绑定到exchange上就OK了,那么C回复消息的时候通过什么回给P呢?为此,rabbitmq在P发送消息的时候,提供设置回调队列及关联ID,C在给P回复消息的时候,通过回调队列即可。提供关联ID的目的是即使P端收到Queue2的消息,也要验证Correlation_Id是否匹配,不匹配的话,直接忽略。
使用如下的代码进行验证(注:验证语言为python),RPC模型之P端的代码如下:
import pika class Center(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() #定义接收返回消息的队列,此处为一随机生成的队列 result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue #等待接收消息 self.channel.basic_consume(self.on_response, no_ack=True,queue=self.callback_queue) #定义接收到返回消息的处理方法 def on_response(self, ch, method, props, body): self.response = body def request(self, n): self.response = None #发送计算请求 self.channel.basic_publish(exchange='', routing_key='compute_queue', properties=pika.BasicProperties (reply_to = self.callback_queue,), body=str(n)) #接收返回的数据 while self.response is None: self.connection.process_data_events() return int(self.response) center = Center() response = center.request(30) print " [.] Got %r" % (response,)
RPC模型之C端代码:
import pika class Center(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() #定义接收返回消息的队列,此处为一随机生成的队列 result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue #等待接收消息 self.channel.basic_consume(self.on_response, no_ack=True,queue=self.callback_queue) #定义接收到返回消息的处理方法 def on_response(self, ch, method, props, body): self.response = body def request(self, n): self.response = None #发送计算请求 self.channel.basic_publish(exchange='', routing_key='compute_queue', properties=pika.BasicProperties (reply_to = self.callback_queue,),body=str(n)) #接收返回的数据 while self.response is None: self.connection.process_data_events() return int(self.response) center = Center() response = center.request(30) print " [.] Got %r" % (response,)
相关文章:

ModuleNotFoundError: No module named ‘config‘
当你在Python代码中看到类似"ModuleNotFoundError: No module named 'config'"的错误消息时,意味着你尝试导入一个名为'config'的模块,但Python无法找到该模块

【云原生基础】了解云原生,什么是云原生?
云原生(Cloud-Native)是一种软件开发和部署方法论,旨在利用云计算、容器化、微服务架构和持续交付等现代技术和最佳实践,以构建、部署和运行可伸缩、高可用、弹性和易于管理的应用程序。云原生应用程序旨在充分发挥云计算的潜力,以更好地满足快速发展的数字业务需求。容器化:云原生应用程序通常使用容器技术(如Docker)进行封装,使应用程序及其依赖项可以在不同环境中一致运行。容器化提供了隔离、可重复部署和快速部署的好处。

【微服务】mysql + elasticsearch数据双写设计与实现
在很多电商网站中,对商品的搜索要求很高,主要体现在页面快速响应搜索结果。这就对服务端接口响应速度提出了很高的要求。

Python异常处理:try、except、else 和 finally 的使用指南
Python异常处理:try、except、else 和 finally 的使用指南

HarmonyOS 数据管理与应用数据持久化(二)
文章浏览阅读31次。关系型数据库基于 SQLite 组件,适用于存储包含复杂关系数据的场景,比如一个班级的学生信息,需要包括姓名、学号、各科成绩等,又或者公司的雇员信息,需要包括姓名、工号、职位等,由于数据之间有较强的对应关系,复杂程度比键值型数据更高,此时需要使用关系型数据库来持久化保存数据。

HarmonyOS数据管理与应用数据持久化(一)
文章浏览阅读599次。一.

Ubuntu系统下怎么安装Docker(linux安装docker教程)
更新系统软件包 在安装 Docker 前,首先需要更新系统软件包,确保系统上的软件都是最新的版本

python实战讲解之使用Python批量发送个性化邮件
通过上述Python脚本,我们可以批量发送个性化的邮件。我们首先设置发件人邮箱和密码,然后指定SMTP服务器和端口号。接下来,我们读取包含员工信息的Excel文件,并获取唯一的员工姓名列表和对应的邮箱地址。然后,我们遍历员工数据,并为每个员工创建邮件,附带相应的附件。最后,我们通过SMTP服务器发送邮件,并在发送完成后删除生成的员工数据文件。

什么是加密?浅谈MD5加密
文章浏览阅读108次。加密解密是一种通过使用密码算法对信息进行转换,以使其在传输或存储过程中变得不可读或难以理解,从而保护信息的安全性和隐私性的过程。加密是将明文(原始文本)转换为密文(加密后的文本)的过程。在加密过程中,使用密钥和特定的算法来改变原始文本的形式,使其在未经授权的情况下无法理解。只有持有正确密钥的人才解密是将密文转换回明文的过程。数字摘要算法(Digital Digest Algorithm)是一种加密算法,用于将任意长度的数据转换为固定长度的摘要(也称为哈希值)。

Python自动化处理Excel数据
需求描述:数据格式如下所示,需要分离出2023年7月1号之后的数据明细 数据核对与处理:从Excel文件中提取特定日期后的签收数据 1. 引言 在实际数据处理和分析过程中,经常会遇到需要从大量数据中提取出特定日期范围内的信息的需求。本文将介绍如何使用Python的pandas库来处理Excel文件,

Linux 环境下 安装 Elasticsearch 7.13.2
文章浏览阅读42次。借公司的 centos 7 服务器,搭建一个 Es,正好熟悉熟悉 Linux 下的安装流程。
基于 MySQL 多通道主主复制的机房容灾方案
文章中介绍了多种 MySQL 高可用技术,并介绍了根据自身需求选择多通道主主复制技术的过程和注意事项。

C#利用一段字节序列构建一个数组对象
《.NET中的数组在内存中如何布局? 》介绍了一个.NET下针对数组对象的内存布局。既然我们知道了内存布局,我们自然可以按照这个布局规则创建一段字节序列来表示一个数组对象,就像《以纯二进制的形式在内存中绘制一个对象》构建一个普通的对象,以及《你知道.NET的字符串在内存中是如何存储的吗?》构建一个字

【卷积神经网络】YOLO 算法原理
文章浏览阅读128次。在计算机视觉领域中,目标检测(Object Detection)是一个具有挑战性且重要的新兴研究方向。目标检测不仅要预测图片中是否包含待检测的目标,还需要在图片中指出它们的位置。2015年,Joseph Redmon, Santosh Divvala 等人提出第一个 YOLO 模型,该模型具有实时性高、支持多物体检测的特点,已成为目标检测领域热门的研究算法。本文主要介绍 YOLO 算法及其基本原理。

C#,数值计算——分类与推理Svmlinkernel的计算方法与源程序
文章浏览阅读27次。C#,数值计算——分类与推理Svmlinkernel的计算方法与源程序

MyBatis-Plus 实战教程四 idea插件
文章浏览阅读269次。在未引入分页插件的情况下,MybatisPlus是不支持分页功能的,IService和BaseMapper中的分页方法都无法正常起效。其中缺少的仅仅是分页条件,而分页条件不仅仅用户分页查询需要,以后其它业务也都有分页查询的需求。在刚才的代码中,从PageQuery到MybatisPlus的Page之间转换的过程还是比较麻烦的。这里用到了分页参数,Page,即可以支持分页参数,也可以支持排序参数。在查询出分页结果后,数据的非空校验,数据的vo转换都是模板代码,编写起来很麻烦。

测开不得不会的python之类(class)的定义和使用
文章浏览阅读147次。我们经常讲‘物以类聚,人以群分’,这里的类指相同属性的一类东西,而python中的类(class)也可以这么理解为相同属性的一些函数聚在一起,而通过__init__()初始化方法可以传入不同参数得到不同的实例对象,进而每个不同实例对象的一些属性值会不同。比如很多书籍经常讲到的例子Car类,他们属性可以有颜色,品牌,性价比等。

STM32F3系列 ADC采样单端采样模式(基于LL库)
STM32F3系列 ADC 单端采样(基于LL库) 芯片型号:STM32f303RBT6 开发软件:MDK5 & CubeMX & VS Code 目录 目录STM32F3系列 ADC 单端采样(基于LL库)目录引言1 基础知识1.1ADC转换基本流程1.2 时钟树1.3 关键参数1

【剑指offer|图解|双指针】移除元素 + 合并两个有序数组
文章浏览阅读186次,点赞23次,收藏17次。本文结合两道题目对顺表的考察方向有的初步了解,每个题都有详细的图文讲解,更多精彩内容等你来阅读哦!

鸿蒙ArkUI-X跨端应用开发,一套代码构建多平台应用
文章浏览阅读164次。ArkUI是一套构建分布式应用的声明式UI开发框架。它具备简洁自然的UI信息语法、丰富的UI组件、多维的状态管理,以及实时界面预览等相关能力,帮助您提升应用开发效率,并能在多种设备上实现生动而流畅的用户体验。详情可参考ArkUI框架介绍ArkUI-X进一步将ArkUI扩展到了多个OS平台:目前支持OpenHarmony、HarmonyOS、Android、 iOS,后续会逐步增加更多平台支持。开发者基于一套主代码,就可以构建支持多平台的精美、高性能应用。

【Java集合】了解集合的框架体系结构及常用实现类,从入门到精通!
前言 通过Java基础的学习,我们掌握了主要的Java语言基本的语法,同时了解学习了Java语言的核心-面向对象编程思想。 从集合框架开始,也就是进入了java这些基础知识及面向对象思想进入实际应用编码的过程,通过jdk中集合这部分代码的阅读学习,就能发现这一点。 本计划在这篇中把框架体系和一些集合

MySQL中的 增 删 查 改(CRUD)
文章浏览阅读1.8k次,点赞60次,收藏56次。insert into 表名 value(数据,数据),.......;可以单行,多行插入。为查询结果的列取别名select 表达式/列名as 别名 from 表名;去重:DISTINCTselect distinct 单列/多列 from 表名;会去除查询结果中的重复项(只保留一项)select 条件查询的执行顺序遍历表中的每个记录把当前记录的值带入条件,根据条件进行筛选如果这条记录满足条件,保留并进行列上的表达式的计算如果有 order by 会在所有行都被获取到之后(表

BETWEEN操作符:选取介于两个值之间的数据范围内的值
文章浏览阅读246次,点赞6次,收藏5次。BETWEEN操作符:选取介于两个值之间的数据范围内的值。这些值可以是数值、文本或者日期。

【Java 基础篇】Java网络编程:文件下载详解
文件下载是指从一个网络服务器或远程位置传输文件到本地计算机或设备的过程。这些文件可以是文本文件、图像、音频、视频或任何其他类型的数据文件。文件下载在Web浏览器、移动应用程序和桌面应用程序中都是常见的操作。在Java中,您可以使用各种网络协议(如HTTP、FTP、SFTP等)来执行文件下载操作。下面我们将以HTTP协议为例,介绍如何使用Java进行文件下载。本文介绍了如何使用Java进行文件下载,以及文件下载的一些进阶功能和注意事项。

【Java 基础篇】Java网络编程:下载进度监控实现详解
下载进度监控是一种用户界面元素或功能,用于显示文件下载的实时进度。通常以百分比的形式显示已下载的数据量与总数据量的比例,让用户知道下载的进展情况。实现下载进度监控的关键是获取已下载数据的大小并将其与总数据大小进行比较,然后将结果以可视化的方式呈现给用户。下载进度监控是提高文件下载体验的重要组成部分。通过合理设置Range头部请求,并在用户界面上显示下载进度,您可以实现有效的下载进度监控功能。此外,要注意异常处理和性能,以提供更好的用户体验。

【Java 基础篇】Java Lambda表达式详解
Lambda表达式,也称为闭包,是一种匿名函数,它可以传递到方法作为参数,并且可以在方法中使用。它是Java 8引入的一个新特性,用于简化代码的编写,特别是在使用函数式接口时。匿名性:Lambda表达式没有显式的名称,因此可以被当做一种匿名函数使用。简洁性:Lambda表达式可以大大减少代码的冗余,使代码更加简洁。传递性:Lambda表达式可以作为参数传递给方法,从而实现更灵活的代码组织。除了Java标准库中的函数式接口,您还可以定义自己的函数式接口,以适应特定的需求。// 结果为8。

【Java 基础篇】Java 接口组成与更新详解
在Java编程中,接口(interface)是一种非常重要的概念。它允许类定义一组抽象方法,这些方法可以在不同的类中实现。接口在Java中起到了重要的角色,被广泛应用于代码的组织和设计中。本文将详细解释Java接口的组成和最新的更新,包括默认方法、静态方法、私有方法以及接口的使用场景。

【MySQL从删库到跑路 | 基础第二篇】——谈谈SQL中的DML语句
详细介绍SQL中的DML语句(增加、修改、删除操作)。

【MySQL基础|第三篇】--- 详谈SQL中的DQL语句
详解MySQL中SQL的基础查询、条件查询、聚合函数、分组查询、排序查询、分页查询。

【MySQL函数篇】—— 字符串函数(超详细)
详细介绍MySQL函数中的字符串函数。