当前位置: 首页 > 编程日记 > 正文

柯南君:看大数据时代下的IT架构(5)消息队列之RabbitMQ--案例(Work Queues起航)...

二、Work Queues(using the Java Client) 走起

 	在第上一个教程中我们写程序从一个命名队列发送和接收消息。在这一次我们将创建一个工作队列,将用于分发耗时的任务在多个工作者(worker)之间。
背后的主要思想工作队列(又名:任务队列)是为了避免立即做一个资源密集型任务,不得不等待它完成。相反,我们安排的任务要做。我们封装任务作为消息并将其发送到一个队列。工作进程在后台运行将流行的任务和最终执行的工作。当您运行许多worker的任务将在他们之间共享。这个概念是特别有用的web应用程序中处理复杂的任务是不可能在一个短的HTTP请求窗口。  为了避免等待一些占用大量资源、时间的操作。当我们把任务(Task)当作消息发送到队列中,一个运行在后台的工作者(worker)进程就会取出任务然后处理。当你运行多个工作者(workers),任务就会在它们之间共享。 

三、Preparation(预备)

	在RabbitMQ系列教程中,在前一章柯南君:看大数据时代下的IT架构(4)消息队列之RabbitMQ--案例(Helloword起航)
  • 我们稍微改一下send.java 代码(我们上一章的事例中有源代码),允许任意从命令文件发送消息,这个程序将我们的工作计划放在任务队列中,现在让我先做个类NewTask.java
String message = getMessage(argv); channel.basicPublish("", "hello", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");
  从命令行参数,得到一些帮助信息:
private static String getMessage(String[] strings){ if (strings.length < 1) return "Hello World!"; return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); }
我们的老的Recv.java程序也需要一些变化:
  • 它需要假的第二个消息体的每一个点的工作。
  • 它将从队列中发布消息并执行任务,所以我们称之为Worker.java:

while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); doWork(message); System.out.println(" [x] Done"); }

我们的假任务模拟执行时间

private static void doWork(String task) throws InterruptedException { for (char ch: task.toCharArray()) { if (ch == '.') Thread.sleep(1000); } }
编译它们 (工作目录中的jar文件):

$ javac -cp rabbitmq-client.jar NewTask.java Worker.java

四、Round-robin dispatching

	使用一个任务队列的优点之一是能够轻易平行的工作。如果我们建立一个积压的工作,我们可以添加更多的任务(worker),这样,很容易。
首先,让我们试着两个任务(worker)实例同时运行。他们都将从队列中获取消息,但究竟如何?让我们来看看。你需要三个主机开放。 两人将运行工计划。这些游戏机将我们两个消费者- C1和C2。
shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker[*] Waiting for messages. To exit press CTRL+C
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker[*] Waiting for messages. To exit press CTRL+C

     在第三个我们将发布新的任务。一旦你开始了消费者可以发布几条消息:

shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask First message.
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Second message..
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Third message...
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fourth message....
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fifth message.....
   让我们看看我们的workers都传输什么:
shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker[*] Waiting for messages. To exit press CTRL+C[x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....'
java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker[*] Waiting for messages. To exit press CTRL+C[x] Received 'Second message..' [x] Received 'Fourth message....'
默认情况下,RabbitMQ将每个消息发送到下一个消费者,在序列。平均每个消费者将获得相同数量的信息。这种方式称为循环分配消息。试试这三个或更多的worker。

五、Message acknowledgment

	做一个任务可能需要几秒钟。如果一个消费者开始漫长的任务而死,只有部分完成,你可能想知道到底发生什么事情了?
我们当前的代码情况下,一旦RabbitMQ向客户传递一个消息立即从内存中删除。在这种情况下,如果你kill(杀)了一个任务(worker), 我们将失去刚刚处理的消息。我们也会失去所有的消息被派往这个特殊的工人,但尚未处理。  但是我们不想失去任何任务。如果一个工作者(worker)死亡,我们想要交付的任务到另一个worker。 为了确保消息是从来都不会迷失的,RabbitMQ支持消息应答。发送ack(knowledgement)从消费者告诉RabbitMQ特定的消息已经收到, 处理和RabbitMQ是免费的,删除它。如果一个消费者停止没有发送ack,RabbitMQ会明白一个消息

QueueingConsumer consumer = new QueueingConsumer(channel); boolean autoAck = false; channel.basicConsume("hello", autoAck, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //... channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }

使用这段代码,我们可以确信,即使你杀了一个工作者(worker)使用CTRL + C处理消息时,没有将丢失。工作者(worker)死亡后不久所有未得到确认的消息将被发送。
  问题:
   消息响应
  • 我们不想丢失任何任务消息。如果一个工作者(worker)挂掉了,我们希望任务会重新发送给其他的工作者(worker)。
  • 为了防止消息丢失,RabbitMQ提供了消息[i]响应(acknowledgments)[/i]。消费者会通过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ就会释放并删除这条消息。
  • 如果消费者(consumer)挂掉了,没有发送响应,RabbitMQ就会认为消息没有被完全处理,然后重新发送给其他消费者(consumer)。这样,及时工作者(workers)偶尔的挂掉,也不会丢失消息。
  • 消息是没有超时这个概念的;当工作者与它断开连的时候,RabbitMQ会重新发送消息。这样在处理一个耗时非常长的消息任务的时候就不会出问题了。
  • 消息响应默认是开启的。之前的例子中我们可以使用no_ack=True标识把它关闭。是时候移除这个标识了,当工作者(worker)完成了任务,就发送一个响应。


六、Message durability

	我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是如果RabbitMQ服务器停止我们的任务仍将失去。
RabbitMQ退出或崩溃时它会忘记队列和消息,除非你告诉它不要。两件事必须确保消息不会丢失:我们需要两个队列和消息标记为耐用。
首先,我们需要确保RabbitMQ永远不会失去我们的队列。为了这样做,我们需要声明它经久耐用:  channel.queueDeclare("hello", durable, false, false, null); 尽管这个命令本身是正确的,它不会在我们目前的设置工作。这是因为我们已经定义了一个名为hello的队列不耐用。RabbitMQ不允许您重新定义现有队列具有不同参数并返回一个错误的任何程序,试图这样做。但有一个快速解决方案——让我们声明一个队列具有不同名称,例如task_queue:  channel.queueDeclare("task_queue", durable, false, false, null); 这queueDeclare改变需要应用于生产者和消费者的代码。在这一点上我们确信task_queue队列不会丢失,即使RabbitMQ重启。现在我们需要我们的消息标记为持久性——通过设置MessageProperties PERSISTENT_TEXT_PLAIN(实现BasicProperties)的价值。  channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());  问题: 消息标记为持久性并不能完全保证信息不会丢失。虽然它告诉RabbitMQ将消息保存到磁盘,还有一个短的时间窗口当RabbitMQ已经接受消息和尚未保存它。 RabbitMQ也不做fsync(2)为每个消息——它可能只是保存到缓存和不写入磁盘。持久性保证不强,但它是足够为我们简单的任务队列。 如果你需要一个更强的保证,那么你可以使用发布者证实。

boolean durable = true;

boolean durable = true;
import com.rabbitmq.client.MessageProperties;

Fair dispatch

	您可能已经注意到,调度仍然不会完全按照我们想要的工作。例如在两名工人(worker)的情况,当所有奇怪的消息是沉重的,甚至消息是光,一名工人将不停地忙,另一个几乎不做任何工作。RabbitMQ并不了解,仍将均匀调度信息。这仅仅是因为RabbitMQ分派消息进入队列的消息的时候。它不看看消费者未得到确认的消息的数量。只是盲目地分派每n个消息到n个消费者。
为了打败,我们可以使用basicQos prefetchCount = 1设置方法。这告诉RabbitMQ不给多个消息到一个工人。或者,换句话说,不要派遣工人的新消息,直到处理和承认了前一个。相反,它会分派到下一个工人,不是仍然很忙。
  channel.basicQos(prefetchCount);

int prefetchCount = 1;
问题:
如果所有的工人们正忙着,你的队列可以填满。你要留意,也许添加更多的工人,或有其他一些策略。


Putting it all together

	Final code of our NewTask.java class:
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = getMessage(argv); channel.basicPublish( "", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } //... } 

(NewTask.java source)

And our Worker.java:

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(TASK_QUEUE_NAME, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); doWork(message); System.out.println(" [x] Done" ); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } //... }

相关文章:

图像分析用 OpenCV 与 Skimage,哪一个更好?

作者 | 小白来源 | 小白学视觉这两种算法在它们可以检测到的和不能检测到的方面都有其起伏。OpenCV 是用 C 在后端进行编程的&#xff0c;并作为一个机器学习包&#xff0c;来分析 Python 中的图像模式。Skimage 也称为 Scikit-Image &#xff0c;是一个机器学习软件包&#xf…

NetBeans配置Xdebug

这篇文章已经更新&#xff0c;看 Windows环境配置xdebug调试PHP Windows环境 或者 NetBeans配置Xdebug 远程调试PHP Linux环境nebeans配置xdebug可以方便我们逐步的查看程序的运行情况对我们调试程序是非常有利的下面我就来介绍下配置的过程。先要安装xdebug&#xff0c;可以参…

[译] Don’t call me, I’ll call you:使用 Redux-Saga 管理 React 应用中的异步 action (上)...

原文地址&#xff1a;Don’t call me, I’ll call you: Side effects management with Redux-Saga (Part 1)原文作者&#xff1a;David Dvora译文出自&#xff1a;掘金翻译计划本文永久链接&#xff1a;github.com/xitu/gold-m…译者&#xff1a;jonjia校对者&#xff1a;smile…

CentOS下安装NetBeans集成开发环境

下载NetBeans以netbeans-7.0beta2-ml-javaee-linux.sh为例#sh netbeans-7.0beta2-ml-javaee-linux.sh之后进入安装界面&#xff08;接下来和windows下几乎一样不在举例&#xff09; 前提是要安装了Java 主要不要在本地远程用SecureCRT输入命令啊&#xff0c;要在Linux下用终端输…

我的Android进阶之旅------Android嵌入图像InsetDrawable的用法

面试题&#xff1a;为一个充满整个屏幕的LinearLayout布局指定背景图&#xff0c;是否可以让背景图不充满屏幕&#xff1f;请用代码描述实现过程。 解决此题&#xff0c;可以使用嵌入(Inset)图像资源来指定图像&#xff0c;然后像使用普通图像资源一样使用嵌入图像资源。 语法如…

沉痛悼念游戏开发大神毛星云

惟愿所有的“爆料”都是造谣&#xff0c;惟愿我们能够一起去创造并让大家都能玩到蕴藏着中国上下五千年本土文化的优质游戏大作&#xff0c;惟愿我们能等到你的好消息......让人难过的是&#xff0c;据银柿财经报道&#xff0c;针对近日“网传腾讯天美员工离世”的消息&#xf…

April Fools Contest 2018

这个比赛不正经&#xff0c;但是我可以一本正经的写代码啊 A. Quirky Quantifierstime limit per test2 secondsmemory limit per test64 megabytesinputstandard inputoutputstandard outputInputThe input contains a single integer a (10 ≤ a ≤ 999). OutputOutput 0…

如何查找僵尸进程并Kill之,杀不掉的要查看父进程并杀之

用ps和grep命令寻找僵尸进程#ps -A -ostat,ppid,pid,cmd | grep -e ^[Zz]命令注解&#xff1a;-A 参数列出所有进程-o 自定义输出字段 我们设定显示字段为 stat&#xff08;状态&#xff09;, ppid&#xff08;进程父id&#xff09;, pid(进程id)&#xff0c;cmd&#xff08;命…

PHP计划任务:如何使用Linux的Crontab执行PHP脚本(转)

我们的PHP程序有时候需要定时执行&#xff0c;我们可以使用ignore_user_abort函数或是在页面放置js让用户帮我们实现。但这两种方法都不太可靠&#xff0c;不稳定。我们可以借助Linux的Crontab工具来稳定可靠地触发PHP执行任务。下面介绍Crontab的两种方法。一、在Crontab中使用…

OpenAI 开放 GPT-3 微调功能,让开发者笑开了花

出品 | AI科技大本营&#xff08;ID:rgznai100&#xff09; 近日&#xff0c;OpenAI宣布&#xff0c;允许用户创建自定义版的 GPT-3。 OpenAI 表示&#xff0c;开发人员可以使用微调来创建针对其应用程序和服务中的特定内容量身定制的 GPT-3 模型&#xff0c;从而在任务和工作…

PHP----------php封装的一些简单实用的方法汇总

1、xml转换成array&#xff0c;格式不对的xml则返回false function xml_parser($str){ $xml_parser xml_parser_create(); if(!xml_parse($xml_parser,$str,true)){ xml_parser_free($xml_parser); return false; } else { …

PHP函数--var_dump

var_dump(PHP 3 > 3.0.5, PHP 4, PHP 5)var_dump -- 打印变量的相关信息描述void var_dump ( mixed expression [, mixed expression [, ...]] )此函数显示关于一个或多个表达式的结构信息&#xff0c;包括表达式的类型与值。数组将递归展开值&#xff0c;通过缩进显示其结构…

Mozilla公布WebVR API标准草案

随着信息技术的迅速发展&#xff0c;虚拟现实&#xff08;Virtual Reality&#xff0c;VR&#xff09;技术在近些年不断完善&#xff0c;其应用范围也变得十分广泛。为了搭建逼真的虚拟场景&#xff0c;VR技术一般都需要用到大量精美的图像和复杂的动作。因此&#xff0c;大部分…

不到 100 行 Python 代码教你做出精美炫酷的可视化大屏

作者 |俊欣来源 |关于数据分析与可视化“碳达峰、碳中和”是2021年政府在不断强调与非常重视的事儿&#xff0c;那什么是“碳达峰”、什么又是“碳中和”呢&#xff1f;这里小编来为大家科普一下&#xff0c;所谓的“碳达峰”指的是在某一时间点&#xff0c;二氧化碳的排放不再…

JavaScript实现冒泡排序

说明 对数组进行 冒泡排序 算是比较简单的&#xff0c;冒泡排序也是容易理解的一种排序算法了&#xff0c;在面试的时候&#xff0c;很可能就会问到。 实现原理 数组中有 n 个数&#xff0c;比较每相邻两个数&#xff0c;如果前者大于后者&#xff0c;就把两个数交换位置&#…

PHP--isset()和unset()函数的用法

isset(PHP 3, PHP 4, PHP 5 )isset -- 检测变量是否设置描述bool isset ( mixed var [, mixed var [, ...]])如果 var 存在则返回 TRUE&#xff0c;否则返回 FALSE。 如果已经使用 unset() 释放了一个变量之后&#xff0c;它将不再是 isset()。若使用 isset() 测试一个被设置成…

有关任意多条曲线的拟合度算法

为什么80%的码农都做不了架构师&#xff1f;>>> 在股市中&#xff0c;经常会遇到趋势的预判。所谓趋势&#xff0c;即相对而言的规律化的模式识别形态。形象来讲&#xff0c;就是个股的一段时间内的曲线分布状况。 那么&#xff0c;问题来了。 我们虽然可以在少量的…

从深度学习到深度森林方法(Python)

作者 |泳鱼来源 |算法进阶一、深度森林的介绍 目前深度神经网络&#xff08;DNN&#xff09;做得好的几乎都是涉及图像视频&#xff08;CV&#xff09;、自然语言处理&#xff08;NLP&#xff09;等的任务&#xff0c;都是典型的数值建模任务&#xff08;在表格数据tabular dat…

LHC大神问的矩阵转置问题

数学中线性代数中提到的矩阵转置&#xff0c;其实在我们的业务场景中也有需要的地方&#xff0c;比如LHC大神问到的这个问题 那么如何进行行列转换呢&#xff1f; 代码如下&#xff1a; <?php$arrayarray(部门1>array(费用1>100,费用2>200,费用3>300),部门2>…

不同机器互相调用WebService或者HTTP一定要telnet 测试

ping的通不一定就telnet的通 一定要#telnet 目标机器IP 目标机器端口如果一直是 Trying 目标IP那么不通如果是 Trying 目标IP Connection to 目标IP 说明通的

亮相百度WAVE SUMMIT+2021,Intel OpenVINO带来新气象

北京时间12月12日&#xff0c;百度WAVE SUMMIT2021深度学习开发者峰会在上海举办。这场属于AI的科技盛会之上&#xff0c;英特尔OpenVINO联手百度PaddlePaddle为开发者带来了一系列的技术内容&#xff0c;为开源生态构建持续合作&#xff0c;为产业进步提供新的动力。 OpenVIN…

精品德国软件 UltraShredder 文件粉碎机

出自德国的文件粉碎机&#xff0c;整合了回收站的相关操作&#xff0c;特点是兼容性好&#xff0c;支持9X以上的Win全系列&#xff08;不包括64位系统哦&#xff09;。该软件绿色免费&#xff0c;建议收藏于U盘^_^ 它和偶之前汉化的加密软件Omziff一样&#xff0c;来自XTort&am…

JavaEE 银联支付之手机控件支付-消费类交易

0. workflow app端request->后台封装参数->后台进行签名->请求银联平台->解析响应->响应需求信息 复制代码1. acp_sdk.properties ##############SDK配置文件&#xff08;证书方式签名&#xff09;################ # 说明&#xff1a; # 1. 使用时请删除后缀的…

php singleton()

common.php <?phpclass CC{private static $ins;public static function singleton(){if (!isset(self::$ins)){$c __CLASS__;self::$ins new $c;}return self::$ins;}public function EventResult($Id){return $Id;}}?>index.php <html><head><title…

2015 Multi-University Training Contest 2 1002 Buildings

Buildings Problems Link: http://acm.hdu.edu.cn/showproblem.php?pid5301 Mean: n*m列的网格&#xff0c;删除一个格子x,y&#xff0c;用矩形来填充矩阵。且矩形至少有一边是在矩阵的边缘上。 要使最大矩形的面积最小&#xff0c;求满足条件的矩形填充方式中面积最大的…

Meta 发布 Bean Machine 帮助衡量 AI 模型的不确定性

编译 | 禾木木 出品 | AI科技大本营&#xff08;ID:rgznai100&#xff09; Meta 近日宣布发布 Bean Machine&#xff0c;这是一种概率编程系统&#xff0c;表面上可以更轻松地表示和了解 AI 模型中的不确定性。 在早期测试版中&#xff0c;Bean Machine 可用于通过自动的“不确…

【跃迁之路】【425天】刻意练习系列184—SQL(2018.04.06)

(跃迁之路)专栏 叨叨两句 技术的精进不能只是简单的刷题&#xff0c;而应该是不断的“刻意”练习该系列改版后正式纳入【跃迁之路】专栏&#xff0c;持续更新刻意练习——MySQL 2018.04.02 题目描述 DROP TABLE IF EXISTS test1;CREATE TABLE test1 (id int(11) NOT NULL AUTO_…

安利一个超好用的 Pandas 数据挖掘分析神器

作者 |欣一来源 |Python爱好者集中营今天小编继续来给大家介绍一款用于做EDA(探索性数据分析)的利器&#xff0c;并且可以自动生成代码&#xff0c;帮助大家极大节省工作时间与提升工作效率的利器&#xff0c;叫做Bamboolib。大家可以将其理解为是Pandas的GUI扩展工具&#xff…

PHP魔术常量

PHP 向它运行的任何脚本提供了大量的预定义常量。不过很多常量都是由不同的扩展库定义的&#xff0c;只有在加载了这些扩展库时才会出现&#xff0c;或者动态加载后&#xff0c;或者在编译时已经包括进去了。 有七个魔术常量它们的值随着它们在代码中的位置改变而改变。例如 _…

vim 打开Linux下文件每一行后面都有^M的样式

由于服务器不是我一个人在操作&#xff0c;在修改apache配置文件时发现了一个很奇怪的问题&#xff0c;vim编辑打开配置文件发现后面都有一个^M的标记 虽然不会影响服务的运行&#xff0c;但总感觉不对劲&#xff0c;所以在此我尝试用替换的方式来设置它 :%s/\^M//g 虽然也成功…