当前位置: 首页 > 编程日记 > 正文

[译] RabbitMQ tutorials (3) ---- 'Pub/Sub' (Javascript)

发布与订阅 (Publish/Subscribe)

在之前的章节中,我们创建了工作队列,之前的工作队列的假设是每个任务只被分发到一个worker。在这一节中,我们会做一些完全不一样的事--把一条消息发送给多个消费者,这个模式叫做“发布/订阅”(publish/subscribe)。

举个例子,我们要构建一个简易的日志系统。由两个程序组成---一个来发出日志消息,另一个接收并把消息显示出来。

在我们的日志系统当中,每一个正在运行的接收程序都会收到消息。这样,我们可以运行一个receiver并把log定向到磁盘,然后再跑一个receiver,看看它是否会在屏幕上显示日志。

事实上,被发布的消息会被广播到所有的receiver那里。

交换器(Exchanges)

在之前的引导中,我们从一个队列中做了收发的操作。是时候介绍在Rabbit中的全部的消息模型了。

让我们先快速地回顾一下之前学习的,

  • producer 是一个发送消息的应用

  • queue 是一个存储消息的buffer

  • consumer 是一个接收消息的应用

RabbitMQ中,消息模型的核心思想是生产者绝不会把消息直接发到队列。实际上,生产者通常不知道一条消息是否已经被发送到任意一个队列中。

生产者只能把消息发到交换器。交换器是个简单的东西。一方面接收从生产者那边来的消息,另一方面把他们push到队列中。交换器一定要知道当它们接收到消息之后要如何处理。是否要追加到一个特殊的队列?是否要追加到许多的队列?或者丢掉这条消息?这些规则被定义为交换类型。

exchanges

以下是可以使用的交换类型:direct, topic, header, fanout。我们介绍一下最后一个--fanout。让我们先创建一个fanout类型的交换器“logs”:

ch.assertExchange('logs', 'fanout', {durable: false})

fanout类型的交换器非常简单,我们可以从单单从名字上猜测,它就是把它接收到的消息广播给所有已知的队列。这也就是我们的logger所需要的。


列出所有的交换器(Listing exchanges)
你可以使用rabbitmqctl

$sudo rabbitmqctl list_exchanges
Listing exchanges ...
direct
amq.direct      direct
amq.fanout      fanout
amq.headers     headers
amq.match       headers
amq.rabbitmq.log        topic
amq.rabbitmq.trace      topic
amq.topic       topic
logs    fanout
...done.

在列表中,一些amq.*的交换器和一些默认的(未命名的),都是被默认创建的,但是可能是你用不到的

未命名的交换器(Nameless exchange)
在之前的章节中我们未提过交换器,但是我们仍然能够把消息传到队列中,这就是我们使用了默认的交换器,因为我们使用了空的字符串("")。
之前我们是这样发布一条消息的
ch.sendToQueue('hello', new Buffer('Hello World!'));
这里我们使用默认的或者未命名的交换器,如果第一个参数存在的话,消息会被路由到这个参数名的队列。


现在,我们可以使用我们定义好的交换器

ch.publish('logs', '', new Buffer('Hello World!'));

第二个参数为空的话代表我们不想把消息推到指定的队列,只是想发布到logs的交换器中。

临时队列 (Temporary queues)

你还记得我们之前用的声明过的队列(hello 和 task_queue)吗?。能够指明一个队列的名字对我们来说是重要的--我们需要把workers指到相同的队列。
当你想要分享给消费者和生产者队列的时候,给队列起一个名字很重要。

但着不是我们logger这个程序需要的,我们想监听所有的log消息,不是一部分log消息。同样的,我们对正在流动的消息也感兴趣(not in the old ones).我们需要完成两件事情:
第一,不管我们什么时候连接Rabbit,都需要一个新的,空的队列。我们可以创建一个随机的队列名字,或者让服务器为我们随机选择一个队列名字。
第二,不管我们什么时候断开与消费者的连接,队列需要自动销毁。

amqp.node的客户端中,当我们传入字符串的时候,可以创建一个带有名字的未持久化的队列

ch.assertQueue('', {exclusive: true});

这个方法返回一个带有随机名字的队列实例,比如amq.gen-JzTY20BRgKO-HjmUJj0wLg
当连接被断开的时候,这个队列会被销毁,因为我们在声明的时候{exclusive:true}

绑定 (Bindings)

binding

我们已经创建了一个fanout类型的交换器和一个队列,现在我们需要告诉交换器把消息发送给队列,队列与交换器之间的关系我们称之为绑定。
ch.bindQueue(queue_name, 'logs', '');
现在开始,logs的交换器为追加消息到我们的队列

Listing bindings:

你可以列出已经存在的绑定关系,你应该猜到。rabbitmqctl list_bindings

整合(Putting it all together)

all

生产者的程序,用来发出log消息,和之前章节没有太多的不同,最重要的改变就是现在我们是把消息发布到我们的logs的交换器中,而不是之前的在未声明的情况下使用。发送的时候我们需要提供一个路由键,但是在fanout类型当中,这个可以忽略。下面是emit_log.js的代码

#!/usr/bin/env nodevar amqp = require('amqplib/callback_api');amqp.connect('amqp://localhost', function(err, conn) {conn.createChannel(function(err, ch) {var ex = 'logs';var msg = process.argv.slice(2).join(' ') || 'Hello World!';ch.assertExchange(ex, 'fanout', {durable: false});ch.publish(ex, '', new Buffer(msg));console.log(" [x] Sent %s", msg);});setTimeout(function() { conn.close(); process.exit(0) }, 500);
});

(emit_log.js 源码)

正如你所见,在与交换器建立连接之后。有一点很关键,向不存在的交换器发布消息是被禁止的。
如果仍然没有队列绑定交换器,消息会丢失。但是对我们来说还好,如果仍然没有消费者监听,我们可以安全地丢弃这些消息。

receive_logs.js的代码

#!/usr/bin/env nodevar amqp = require('amqplib/callback_api');amqp.connect('amqp://localhost', function(err, conn) {conn.createChannel(function(err, ch) {var ex = 'logs';ch.assertExchange(ex, 'fanout', {durable: false});ch.assertQueue('', {exclusive: true}, function(err, q) {console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue);ch.bindQueue(q.queue, ex, '');ch.consume(q.queue, function(msg) {console.log(" [x] %s", msg.content.toString());}, {noAck: true});});});
});

(receive_logs,js源码)

如果你想要保存log,你可以打开控制台输入

$ ./receive_logs.js > logs_from_rabbit.log

如果你想在屏幕上看到log,再打开一个控制台

$ ./receive_logs.js

当然,需要发出logs

$ ./emit_log.js

使用rabbitmqctl list_bindings,你可以确定刚才的代码确实创建了交换器和队列,有两个receive_logs.js的程序在运行。

$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
...done.

这个结果的简要解释:数据从logs交换器到两个服务器分配的队列。这也是我们想要的结果。

要如何监听一部分的消息?让我们移到下一章。

相关文章:

如何选取合适的前端动效方案?

一、原因 前端动画场景需求多对众多动画场景的技术实现方案选择上比较模糊 各动画方案的优劣及适用场景认识模糊现有动画库太多,不知道选哪个 主流动画库的适用场景认识模糊二、分类 动画用途 展示型的动画,类似于一张GIF图,或者一段视频交互…

【C++】algorithm具体操作记录

find寻找特定元素位置 int main(char argc, int* argv[]) {vector<int> intVec { 0,1,1,1,1,2,3,4,5,6,7,8,9 };if (pos ! intVec.end())cout << "The value 5 exists,and its position is " <<distance(intVec.begin(), pos) 1 << endl;…

图像 DFT 尺寸转换

const int nRows srcImage.rows; const int nCols srcImage.cols; std::cout << "srcImage row:" << nRows << std::endl; std::cout << "srcImage col:" << nCols << std::endl; // 获取DFT尺寸 int cRo…

[python]目录及文件操作

Python OS模块和shutil模块 获取路径# 获取当前路径 pwd os.getcwd()# 获取上级路径 a_pwd os.path.abspath(os.path.dirname(os.getcwd())) a_pwd os.path.abspath(os.path.join(os.getcwd(), ..))# 获取上上级路径 aa_pwd os.path.abspath(os.path.join(os.getcwd(), ../…

【C++】关联容器学习记录

STL六大组件关系 Containe通过Allocator取得数据存储空间&#xff0c;Algorithm通过Iterator存取Container&#xff0c;Functor内容可以协助Algorithm完成不同的策略变化&#xff0c;Adaptor可以修饰或者套接Functor。 关联容器特性 1. 关联容器定义 顺序容器支持高效的关键…

图像 DFT 变换

// 通道组建立&#xff0c;cv::Mat groupMats[] {cv::Mat_<float>(sizeConvMat),cv::Mat::zeros(sizeConvMat.size(), CV_32F)};cv::Mat mergeMat;// 通道合并merge(groupMats,2,mergeMat);// DFT变换dft(mergeMat, mergeMat);// 分离通道split(mergeMat, groupMats);/…

MySQL innodb_autoinc_lock_mode 详解

innodb_autoinc_lock_mode这个参数控制着在向有auto_increment 列的表插入数据时&#xff0c;相关锁的行为&#xff1b; 通过对它的设置可以达到性能与安全(主从的数据一致性)的平衡 【0】我们先对insert做一下分类 首先insert大致上可以分成三类&#xff1a;     1、simpl…

小程序代理加盟实现月入1800到50K

他出身寒门&#xff0c;没钱没资源&#xff0c;搬过砖利用身边的健身达人赚过钱&#xff0c;毕业了院长看他很努力还安排过维修多媒体的工作&#xff0c;拒绝院长好意的他&#xff0c;月收入从1800到1万&#xff0c;再到赚到人生首个5万。 只用了一年&#xff0c;他到底是凭什么…

CCF201503-4 网络延时(100分)

试题编号&#xff1a; 201503-4 试题名称&#xff1a; 网络延时 时间限制&#xff1a; 1.0s 内存限制&#xff1a; 256.0MB 问题描述&#xff1a; 问题描述给定一个公司的网络&#xff0c;由n台交换机和m台终端电脑组成&#xff0c;交换机与交换机、交换机与电脑之间使用网络连…

【Smart_Point】动态内存与智能指针

动态内存 动态内存使用的三种原因 程序不知道自己需要多少对象程序不知道所需对象的准确类型程序需要在多个对线之间共享数据 文章目录动态内存动态内存使用的三种原因实例1&#xff1a; Exercise 12.2:Write your own version of the StrBlob class including the const ver…

JS中的null和undefined,undefined为啥用void 0代替?

起因 某天,在看某位同学的js代码,代码中发现了一个奇怪的东西 void 0,虽然第一眼看不懂这是什么东西,但是根据上下文,这里应该是想判断是否等于undefined,为什么要这样写的,有什么渊源吗?顺便就把undefined和null都拿出来复习了一下. 介绍 undefined和null是js中类型七种数据类…

【Smart_Point】动态内存与智能指针实战:文本查询程序(设计set,map,智能指针的应用)

文章目录Cpp读入结构性数组文本查询程序文本查询程序本版1Cpp读入结构性数组 #include<sstream> #include<iostream> #include<string>std::vector<cv::Point2f> point_calibartion_position;std::string filename "C:/Users/Administrator/Des…

我眼中的DevOps(转)

过去一年以来&#xff0c;一批来自欧美的、不墨守陈规的系统管理员和开发人员一直在谈论一个新概念&#xff1a;DevOps。DevOps 就是开发&#xff08;Development&#xff09;和运维&#xff08;Operations&#xff09;这两个领域的合并。&#xff08;如果没错的话&#xff0c;…

【阿圆实验】Consul HA 高可用方案

一、建立Consul Cluster环境 利用Consul提供的服务实现服务的注册与发现&#xff0c;需要建立Consul Cluster。在Consul方案中&#xff0c;每个提供服务的节点上都要部署和运行Consul的agent&#xff0c;所有运行Consul agent节点的集合构成Consul Cluster。Consul agent有两种…

【C++】拷贝,赋值与构造

拷贝&#xff0c;赋值与构造 文章目录拷贝&#xff0c;赋值与构造1. 拷贝构造函数/合成拷贝构造函数&#xff08;copy constructor&#xff09;2. 拷贝赋值运算符3. 析构函数1. 拷贝构造函数/合成拷贝构造函数&#xff08;copy constructor&#xff09; 1.1 定义&#xff1a;复…

Java 内存查看与分析

2019独角兽企业重金招聘Python工程师标准>>> 1&#xff1a;gc日志输出 在jvm启动参数中加入 -XX:PrintGC -XX:PrintGCDetails -XX:PrintGCTimestamps -XX:PrintGCApplicationStopedTime&#xff0c;jvm将会按照这些参数顺序输出gc概要信息&#xff0c;详细信息&…

玩转Vuejs--核心原理

一、摘要&#xff1a; Vuejs是一款前端MVVM框架&#xff0c;利用Vuejs、webpack以及周边一系列生态工具我们可以快速的构建起一个前端应用&#xff0c;网上对于Vue的分析大都是基于各个模块&#xff0c;理解起来不够顺畅&#xff0c;本文将从整个执行过程出发&#xff0c;讲一下…

【C++】拷贝控制与资源管理

1. 拷贝控制与资源管理 管理类外资源的类必须定义拷贝控制成员。如P447中所见&#xff0c;这种类需要通过析构函数来释放对象所分配的资源。一旦一个类需要析构函数&#xff0c;那么几乎可确定它也需要一个拷贝构造函数和一个拷贝赋值函数。 明确拷贝语义&#xff1a;可以定义…

leangoo V5.4.2版上线

本次更新增加了“卡片编辑面板内显示成员、截止日期、工作量”的功能。同时我们也进行了大量的功能优化&#xff0c;以下是此次更新详情&#xff1a; 1. 新增“卡片编辑面板内显示成员、截止日期、工作量”功能 本次更新后 &#xff0c;您在卡片编辑面板内添加成员&#xff0c;…

差分边缘检测实现

#include <opencv2/core/core.hpp> #include <opencv2/highgui/highgui.hpp> #include <opencv2/opencv.hpp> using namespace cv; // 图像差分操作 void diffOperation(const cv::Mat srcImage, cv::Mat& edgeXImage,cv::Mat& edgeYImage) {cv::Mat…

2.1:CGPROGRAM

文章著作权归作者所有。转载请联系作者&#xff0c;并在文中注明出处&#xff0c;给出原文链接。 本系列原更新于作者的github博客&#xff0c;这里给出链接。 前言 经过前面两个章节的铺垫&#xff0c;我们对渲染以及Unity Shaderlab相关的知识已经有了大概的认识&#xff0c;…

【OpenCV】OpenCV中积分图函数与应用

OpenCV中积分图函数与应用 参考资料 opencv 查找integral&#xff0c;目前网上大部分的资料来自于opencv https://docs.opencv.org/master/d7/d1b/group__imgproc__misc.html#gadeaf38d7701d7ad371278d663c50c77dhttps://blog.csdn.net/jia20003/article/details/52710751ht…

django学习笔记【003】创建第一个带有model的app

【1】python应用程序要连接mysql有多个驱动程序可供选择&#xff1a; 1、MySQLdb 这个只支持python2.x 所以在这里就不说了&#xff1b; 2、mysqlclient 下载地址   https://pypi.python.org/pypi/mysqlclient/1.3.9 3、MySQL Connector/python 这个是mysql官方主推的mysql驱…

图像非极大值抑制 Sobel 边缘实现

bool SobelVerEdge(cv::Mat srcImage, cv::Mat& resultImage) {CV_Assert(srcImage.channels() 1);srcImage.convertTo(srcImage, CV_32FC1);// 水平方向的 Sobel 算子cv::Mat sobelx (cv::Mat_<float>(3,3) << -0.125, 0, 0.125,-0.25, 0, 0.25,-0.125, 0, …

第四次作业 (日期和jieba库的运用)

设计题1&#xff1a; 设计一个本月份日历&#xff0c;输出格式如下&#xff1a; 要求&#xff1a; 1.初始化start_day&#xff0c;end_day两个日期 from datetime import datetime start_daydatetime(2019,4,1) end_daydatetime(2019,4,30) 其它时间数据生成要用datetime或date…

【C++】LINK类型错误分析记录

LINK类型错误 情况1&#xff1a; 根据生成路径&#xff0c;查找是否成功生成静态库/动态库&#xff0c;一般在./bin文件中。 情况2&#xff1a; 是否在CMakeLists中target_link_libraries中添加链接静态库操作 情况3&#xff1a; 是都存在类模板&#xff0c;需要实例化&a…

eBay宣布发布全新的购买和销售APIs

eBay最近宣布发布两款全新的购买和销售APIs。这些APIs旨在促进eBay产品在第三方应用程序中的更好集成。eBay于10月19日在他们的博客上发表了几篇文章&#xff0c;不仅详细介绍了这些全新的购买和销售APIs提供的功能&#xff0c;而且还详细地总结了他们公司从SOAP&#xff08;简…

Sobel 边缘实现

#include <opencv2/core/core.hpp> #include <opencv2/highgui/highgui.hpp> #include "opencv2/imgproc/imgproc.hpp" #include <iostream> using namespace cv; // 非极大值抑制实现sobel竖直细化边缘 bool SobelVerEdge(cv::Mat srcImage, cv::…

vue下实现textarea类似密码框的功能之探索input输入框keyup,keydown,input事件的触发顺序...

项目中引入element的input框组件&#xff0c;触发事件必须要加上.native <el-input placeholder"请输入" type"textarea" v-model"valueText" keyup.native"keyUp(valueText,$event)" keydown.native"keyDown($event)" …

【C++】动态内存管理/move/以及移动构造与移动赋值运算符

文章目录1 .对象移动与右值引用 实际应用过程中遇到的问题及其解决方案c中临时变量不能作为非const的引用参数2. 动态内存管理类3. 对象移动与右值引用4. 移动构造与移动复制运算符1 .对象移动与右值引用 实际应用过程中遇到的问题及其解决方案 问题描述&#xff1a; bool Cr…