当前位置: 首页 > 编程日记 > 正文

rocketmq 组监听_最全的RocketMQ学习指南,程序员必备的中间件技能

一、简介

RocketMq是阿里开发出来的一个消息中间件,后捐献给Apache。官网上是这样介绍的:

1017f700cb30d38e0938754870c0a5f6.png
Apache RocketMQ™ is a unified messaging engine, lightweight data processing platform.

RocketMQ是一个统一的处理消息引擎,轻量级的数据处理平台。

  • 低延迟,在高压下,1毫秒内的响应延迟超过99.6%。
  • 高可用,具有跟踪和审核功能
  • 万亿级消息容量保证
  • 自最新的4.1版本以来,使用新的开放式分布式消息传递和流媒体标准
  • 批量传输,多功能集成,以提高吞吐量
  • 如果空间足够,可以在不损失性能的情况下存盘

二、基本概念

1、 消息模型(Message Model)

RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。

2、 消息生产者(Producer)

负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。

3、 消息消费者(Consumer)

负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。

4、 主题(Topic)

表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。

5、 代理服务器(Broker Server)

消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

6、 名字服务(Name Server)

名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。

7、 拉取式消费(Pull Consumer)

Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。

8、 推动式消费(Push Consumer)

Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。

9、 生产者组(Producer Group)

同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。

10、 消费者组(Consumer Group)

同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。

11、 集群消费(Clustering)

集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。

12、 广播消费(Broadcasting)

广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。

13、 普通顺序消息(Normal Ordered Message)

普通顺序消费模式下,消费者通过同一个消费队列收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。

14、 严格顺序消息(Strictly Ordered Message)

严格顺序消息模式下,消费者收到的所有消息均是有顺序的。

15、 消息(Message)

消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。

16、 标签(Tag)

为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。

三、架构设计

1、技术架构

c2e7092c318724ed43aacb0ed3fa3e57.png

RocketMQ架构上主要分为四部分,如上图所示:

  • Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
  • Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
  • NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。
  • BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块。
  1. Remoting Module:整个Broker的实体,负责处理来自clients端的请求。
  2. Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息
  3. Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
  4. HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
  5. Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。

79dd0dae50e5aeda44eeb7f7347ead9d.png

2、部署架构

20d76599c297f72856b619c2921bd6ac.png

RocketMQ 网络部署特点

  • NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
  • Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。 注意:当前RocketMQ版本在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。
  • Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
  • Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取。

结合部署架构图,描述集群工作流程:

  • 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
  • Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
  • 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
  • Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
  • Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

四、示例

1、 基本样例

在基本样例中我们提供如下的功能场景:

  • 使用RocketMQ发送三种类型的消息:同步消息、异步消息和单向消息。其中前两种消息是可靠的,因为会有发送是否成功的应答。
  • 使用RocketMQ来消费接收到的消息。1.1 加入依赖:
    maven:
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.3.0</version>
</dependency>

gradle:

compile 'org.apache.rocketmq:rocketmq-client:4.3.0'

1.2 消息发送

1、Producer端发送同步消息 这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

public class SyncProducer {public static void main(String[] args) throws Exception {// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// 设置NameServer的地址producer.setNamesrvAddr("localhost:9876");// 启动Producer实例producer.start();for (int i = 0; i < 100; i++) {// 创建消息,并指定Topic,Tag和消息体Message msg = new Message("TopicTest" /* Topic */,"TagA" /* Tag */,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);// 发送消息到一个BrokerSendResult sendResult = producer.send(msg);// 通过sendResult返回消息是否成功送达System.out.printf("%s%n", sendResult);}// 如果不再发送消息,关闭Producer实例。producer.shutdown();}
}

2、发送异步消息 异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。

public class AsyncProducer {public static void main(String[] args) throws Exception {// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// 设置NameServer的地址producer.setNamesrvAddr("localhost:9876");// 启动Producer实例producer.start();producer.setRetryTimesWhenSendAsyncFailed(0);int messageCount = 100;// 根据消息数量实例化倒计时计算器final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);for (int i = 0; i < messageCount; i++) {final int index = i;// 创建消息,并指定Topic,Tag和消息体Message msg = new Message("TopicTest","TagA","OrderID188","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));// SendCallback接收异步返回结果的回调producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.printf("%-10d OK %s %n", index,sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();}});}// 等待5scountDownLatch.await(5, TimeUnit.SECONDS);// 如果不再发送消息,关闭Producer实例。producer.shutdown();}
}

3、单向发送消息 这种方式主要用在不特别关心发送结果的场景,例如日志发送。

public class OnewayProducer {public static void main(String[] args) throws Exception{// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// 设置NameServer的地址producer.setNamesrvAddr("localhost:9876");// 启动Producer实例producer.start();for (int i = 0; i < 100; i++) {// 创建消息,并指定Topic,Tag和消息体Message msg = new Message("TopicTest" /* Topic */,"TagA" /* Tag */,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);// 发送单向消息,没有任何返回结果producer.sendOneway(msg);}// 如果不再发送消息,关闭Producer实例。producer.shutdown();}
}

1.3 消费消息

public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {// 实例化消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");// 设置NameServer的地址consumer.setNamesrvAddr("localhost:9876");// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息consumer.subscribe("TopicTest", "*");// 注册回调实现类来处理从broker拉取回来的消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);// 标记该消息已经被成功消费return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者实例consumer.start();System.out.printf("Consumer Started.%n");}
}

1.4 发送延时消息

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;public class ScheduledMessageProducer {public static void main(String[] args) throws Exception {// 实例化一个生产者来产生延时消息DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");// 启动生产者producer.start();int totalMessagesToSend = 100;for (int i = 0; i < totalMessagesToSend; i++) {Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)message.setDelayTimeLevel(3);// 发送消息producer.send(message);}// 关闭生产者producer.shutdown();}
}

1.5 批量消息

批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。

String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {producer.send(messages);
} catch (Exception e) {e.printStackTrace();//处理error
}

复杂度只有当你发送大批量时才会增长,你可能不确定它是否超过了大小限制(4MB)。这时候你最好把你的消息列表分割一下:

public class ListSplitter implements Iterator<List<Message>> { private final int SIZE_LIMIT = 1024 * 1024 * 4;private final List<Message> messages;private int currIndex;public ListSplitter(List<Message> messages) { this.messages = messages;}@Override public boolean hasNext() {return currIndex < messages.size(); }@Override public List<Message> next() { int startIndex = getStartIndex();int nextIndex = startIndex;int totalSize = 0;for (; nextIndex < messages.size(); nextIndex++) {Message message = messages.get(nextIndex); int tmpSize = calcMessageSize(message);if (tmpSize + totalSize > SIZE_LIMIT) {break; } else {totalSize += tmpSize; }}List<Message> subList = messages.subList(startIndex, nextIndex); currIndex = nextIndex;return subList;}private int getStartIndex() {Message currMessage = messages.get(currIndex); int tmpSize = calcMessageSize(currMessage); while(tmpSize > SIZE_LIMIT) {currIndex += 1;Message message = messages.get(curIndex); tmpSize = calcMessageSize(message);}return currIndex; }private int calcMessageSize(Message message) {int tmpSize = message.getTopic().length() + message.getBody().length(); Map<String, String> properties = message.getProperties();for (Map.Entry<String, String> entry : properties.entrySet()) {tmpSize += entry.getKey().length() + entry.getValue().length(); }tmpSize = tmpSize + 20; // 增加⽇日志的开销20字节return tmpSize; }
}
//把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {try {List<Message>  listItem = splitter.next();producer.send(listItem);} catch (Exception e) {e.printStackTrace();//处理error}
}

1.6 过滤消息

在大多数情况下,TAG是一个简单而有用的设计,其可以来选择您想要的消息。例如:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。在RocketMQ定义的语法下,可以实现一些简单的逻辑。下面是一个例子:

------------
| message  |
|----------|  a > 5 AND b = 'abc'
| a = 10   |  --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message  |
|----------|   a > 5 AND b = 'abc'
| a = 1    |  --------------------> Missed
| b = 'abc'|
| c = true |
------------

1 基本语法

RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。

数值比较,比如:>,>=,<,<=,BETWEEN,=;
字符比较,比如:=,<>,IN;
IS NULL 或者 IS NOT NULL;
逻辑符号 AND,OR,NOT;
常量支持类型为:数值,比如:123,3.1415;
字符,比如:'abc',必须用单引号包裹起来;
NULL,特殊的常量
布尔值,TRUE 或 FALSE

只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:

public void subscribe(finalString topic, final MessageSelector messageSelector)

2、生产者样例 发送消息时,你能通过putUserProperty来设置消息的属性

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
Message msg = new Message("TopicTest",tag,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 设置一些属性
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);producer.shutdown();

3、消费者样例 用MessageSelector.bySql来使用sql筛选消息

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// 只有订阅的消息有这个属性a, a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});
consumer.start();

五、集群部署

1 集群搭建

1.1 单Master模式

这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。

1)启动 NameServer

### 首先启动Name Server
$ nohup sh mqnamesrv &### 验证Name Server 是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

2)启动 Broker

### 启动Broker
$ nohup sh bin/mqbroker -n localhost:9876 &### 验证Name Server 是否启动成功,例如Broker的IP为:192.168.1.2,且名称为broker-a
$ tail -f ~/logs/rocketmqlogs/Broker.log 
The broker[broker-a, 192.169.1.2:10911] boot success...

1.2 多Master模式

一个集群无Slave,全是Master,例如2个Master或者3个Master,这种模式的优缺点如下:

  • 优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
  • 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。

1)启动NameServer

NameServer需要先于Broker启动,且如果在生产环境使用,为了保证高可用,建议一般规模的集群启动3个NameServer,各节点的启动命令相同,如下:

### 首先启动Name Server
$ nohup sh mqnamesrv &### 验证Name Server 是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

2)启动Broker集群

### 在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties &### 在机器B,启动第二个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties &...

如上启动命令是在单个NameServer情况下使用的。对于多个NameServer的集群,Broker启动命令中-n后面的地址列表用分号隔开即可,例如 192.168.1.1:9876;192.161.2:9876。

1.3 多Master多Slave模式-异步复制

每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:

  • 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;
  • 缺点:Master宕机,磁盘损坏情况下会丢失少量消息。

1)启动NameServer

### 首先启动Name Server
$ nohup sh mqnamesrv &### 验证Name Server 是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

2)启动Broker集群

### 在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties &### 在机器B,启动第二个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties &### 在机器C,启动第一个Slave,例如NameServer的IP为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties &### 在机器D,启动第二个Slave,例如NameServer的IP为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties &

1.4 多Master多Slave模式-同步双写

每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:

  • 优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
  • 缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。

1)启动NameServer

### 首先启动Name Server
$ nohup sh mqnamesrv &### 验证Name Server 是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

2)启动Broker集群

### 在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties &### 在机器B,启动第二个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties &### 在机器C,启动第一个Slave,例如NameServer的IP为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties &### 在机器D,启动第二个Slave,例如NameServer的IP为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties &

以上Broker与Slave配对是通过指定相同的BrokerName参数来配对,Master的BrokerId必须是0,Slave的BrokerId必须是大于0的数。另外一个Master下面可以挂载多个Slave,同一Master下的多个Slave通过指定不同的BrokerId来区分。$ROCKETMQ_HOME指的RocketMQ安装目录,需要用户自己设置此环境变量。

2 mqadmin管理工具

注意:

  • 执行命令方法:./mqadmin {command} {args}
  • 几乎所有命令都需要配置-n表示NameServer地址,格式为ip:port
  • 几乎所有命令都可以通过-h获取帮助
  • 如果既有Broker地址(-b)配置项又有clusterName(-c)配置项,则优先以Broker地址执行命令,如果不配置Broker地址,则对集群中所有主机执行命令,只支持一个Broker地址。-b格式为ip:port,port默认是10911
  • 在tools下可以看到很多命令,但并不是所有命令都能使用,只有在MQAdminStartup中初始化的命令才能使用,你也可以修改这个类,增加或自定义命令
  • 由于版本更新问题,少部分命令可能未及时更新,遇到错误请直接阅读相关命令源码

六、FAQ

1、新的consumer从哪里消费?

  • 如果topic里面的消息是三天以内的,新consumer从所有消息的第一条开始消费
  • 如果有超过三天的消息,新consumer从往前递推第三天的那条消息开始
  • 如果是consumer重启,从它上次的位置继续

2、消费失败了怎么处理?

如果返回 ReconsumerLater,null或者抛出异常,这条消息会重试,最多16次

3、失败的消息怎么查询?

  • 根据topic和时间片查询
  • 根据topic和messageId查询
  • 根据topic和messageKey查询

4、消息只传递一次吗?

rocketMQ能保证所有的消息至少执行一次,在大部分情况下,消息不会重复消费,但不是绝对,要自己做幂等设计

5、每条消息保存多久?

3天

6、message body的大小限制?

256kb

7、怎么设置consumer线程数?

consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(20);

8、常见错误

  • 一个jvm程序只能有一个生产者/消费者实例;
  • fastjson版本不能太低;
  • topic要提前新建好,并保证有权限。

9、为什么是RocketMQ?

根据我们的研究,随着队列和虚拟主题的增加,ActiveMQ IO模块达到了一个瓶颈。我们试图通过节流、断路器或降质来解决这个问题,但效果并不理想。所以我们开始关注流行的消息传递解决方案Kafka。不幸的是,Kafka不能满足我们的要求,特别是在低延迟和高可靠性方面。

在这种情况下,我们决定发明一个新的消息传递引擎来处理更广泛的用例集,从传统的发布/订阅场景到高容量实时零损失容忍事务系统。

下表展示了RocketMQ、ActiveMQ和Kafka(根据awesome java,Apache最流行的消息传递解决方案)之间的比较:

6daab662931365a69b91afa9f29fd86e.png

欢迎关注公众号:丰极,更多技术学习分享。

参考网址:

http://rocketmq.apache.org/

http://rocketmq.apache.org/docs/quick-start/

相关文章:

【刷题】BZOJ 4516 [Sdoi2016]生成魔咒

Description 魔咒串由许多魔咒字符组成&#xff0c;魔咒字符可以用数字表示。例如可以将魔咒字符 1、2 拼凑起来形成一个魔咒串 [1,2]。 一个魔咒串 S 的非空字串被称为魔咒串 S 的生成魔咒。 例如 S[1,2,1] 时&#xff0c;它的生成魔咒有 [1]、[2]、[1,2]、[2,1]、[1,2,1] 五种…

jQuery的文档操作方法

jQuery 文档操作方法 这些方法对于 XML 文档和 HTML 文档均是适用的&#xff0c;除了&#xff1a;html()。 方法描述addClass()向匹配的元素添加指定的类名。after()在匹配的元素之后插入内容。append()向匹配元素集合中的每个元素结尾插入由参数指定的内容。appendTo()向目标结…

原 EOS智能合约开发入门

链客&#xff0c;专为开发者而生&#xff0c;有问必答&#xff01; 此文章来自区块链技术社区&#xff0c;未经允许拒绝转载。 EOS智能合约的开发需要使用llvm和abigen来生成abi文件。 为此eos提供了一个 名为eosiocpp的工具。 在这篇文章中&#xff0c;我们介绍如何使用这个工…

python安装虚拟环境virtualenv

虚拟环境 虚拟环境是一个将不同项目所需求的依赖分别放在独立的地方的一个工具&#xff0c;它给这些工程创建虚拟的Python环境。它解决了“项目X依赖于版本1.x&#xff0c;而项目Y需要项目4.x”的两难问题&#xff0c;而且使你的全局site-packages目录保持干净和可管理。 比如&…

可变分区存储管理实验报告总结_操作系统实验报告-可变分区存储管理方式的内存分配回收...

一&#xff0e;实验目的(1)深入了解可变分区存储管理方式的内存分配回收的实现。二&#xff0e;实验内容编写程序完成可变分区存储管理方式的内存分配回收&#xff0c;要求有内存空间分配表&#xff0c;并采用最优适应算法完成内存的分配与回收。三&#xff0e;实验原理在可变分…

ubuntu/linuxmint如何添加和删除PPA源

【添加】 1、sudo add-apt-repository ppa:user/ppa-name 2、sudo apt-get update (然后再安装软件sudo apt-get install <package-name>或更新软件sudo apt-get upgrade) 【删除】 1、cd /etc/apt/source.list.d/ 2、sudo rm <ppa-name> 转载于:https://www.cnblo…

了解EOS看这一篇就够了一、团队二、技术三、项目进度四、争议和风险五、展望

链客&#xff0c;专为开发者而生&#xff0c;有问必答&#xff01; 此文章来自区块链技术社区&#xff0c;未经允许拒绝转载。 无论是混迹于币圈、链圈还是矿圈&#xff0c;对BTC&#xff08;比特币&#xff09;、ETH&#xff08;以太坊&#xff09;、EOS这三大主流币一定不会…

qt 多个模型如何显示在表格中_Qt MOOC系列教程 第五章第四节:QML中的C++模型

我们已经多次讨论过如何创建自己的模型来表示QML中的数据&#xff0c;并且在上一节中我们看到了QStandardItemModel的基本示例。通常&#xff0c;出于性能和功能方面的原因&#xff0c;需要从一开始就要实现自己的模型。QAbstactItemModel类为项目模型类提供了抽象接口&#xf…

ubuntu终端基础命令

1. 启动终端的快捷键: ctr alt t2. 终端字体放大: ctrshift3. 终端字体放大: ctr-4. ls : 查看当前目录的文件信息 4.1 ls 路径&#xff1a; 查看指定目录的信息 4.1. pwd: 查看目录所在的路径5. touch: 创建文件 5.1 touch 1.txt 2.txt 创建多个文件6. mkdir: 创建…

js中操作数组的一些方法

增 push 在数组的末尾添加一个或多个元素&#xff0c;并返回新的长度。 array.push(1,2,3.........) unshift 在数组的开头添加一个或多个元素&#xff0c;并返回新的长度。 array.unshift(1,2,3......) splice 在制定位置添加一个活多个元素&#xff0c;splice(s…

a标签怎么传参_jsp页面中怎么利用a标签的href进行传递参数以及需要注意的地方...

jsp页面中&#xff1a;这是正确写法。需要注意的地方&#xff1a;1、传递的参数是数字2、传递的参数是字符串注意多了个单引号后台直接用request.getParameter("productIdStr"); 接收就可以了。此处也有要注意的地方&#xff1a;接收后要进行判空&#xff0c;否则会报…

转】windows下使用批处理脚本实现多个版本的JDK切换

原博文出自于:  http://www.cnblogs.com/xdp-gacl/p/5209386.html      感谢&#xff01; 一.JDK版本切换批处理脚本 我们平时在window上做开发的时候&#xff0c;可能需要同时开发两个甚至多个项目&#xff0c;有时不同的项目对JDK的版本要求有区别&#xff0c;这时候…

mysql引擎介绍

1.myisam存储引擎&#xff1a;不支持事务&#xff0c;也不支持外键&#xff0c;优势是访问速度快&#xff0c;对事务完整性没有要求或者以select&#xff0c;insert为主的应用基本上可以用这个引擎来创建表。 2.innodb存储引擎&#xff1a;innodb引擎提供了具有提交&#xff0c…

为什么2100万个BTC发行总量少了0.0231?

链客&#xff0c;专为开发者而生&#xff0c;有问必答&#xff01; 此文章来自区块链技术社区&#xff0c;未经允许拒绝转载。 许多人只是听说比特币的总数为2100万个&#xff0c;但不知道这个数字的准确来源。实际上&#xff0c;2100万个只是一个近似数&#xff0c;精确的数…

Navicat连接数据库成功,新建查询时提示错误“Cannot create file ……”

Navicat连接数据库成功&#xff0c;新建查询时提示错误“Cannot create file ……” 原因:编辑连接{高级}<设置位置>被修改&#xff0c;该oci.dll不正确 解决方案&#xff1a;删除该连接信息&#xff0c;重新新建。编辑连接{高级}<设置位置>会自动生成&#xff0c;…

[深入React] 2.综述

在开始本教程前&#xff0c;请先查看官方示例&#xff1a;https://github.com/facebook/react/archive/master.zip 里的 examples 目录。 学习react是一个循序渐进的过程&#xff0c;虽然它概念较少&#xff0c;但在思想上和jQuery相差甚远。我在学的时候也是边开发边查官方文档…

element select 自动展开_原生js 让select下拉框自动展开 可用size 属性来代替展开动作...

找遍全网都没一个比较好的例子&#xff01;&#xff01;&#xff01;什么获得焦点事件&#xff0c;或者添加一个点击事件都不能使下拉框自动展开最后用标签的siz属性定义和用法size 属性规定下拉列表中可见选项的数目。如果 size 属性的值大于 1&#xff0c;但是小于列表中选项…

一文看懂怎样用 Python 创建比特币交易

链客&#xff0c;专为开发者而生&#xff0c;有问必答&#xff01; 此文章来自区块链技术社区&#xff0c;未经允许拒绝转载。 比特币价格的上上下下&#xff0c;始终撩动着每一个人无比关切的小心脏。从去年初的 800 美元左右&#xff0c;飞涨到去年底到 19783.21 美元最高点…

[转]mysql 数据类型

原文地址:https://github.com/jaywcjlove/handbook/blob/master/MySQL/MySQL%E6%95%B0%E6%8D%AE%E7%B1%BB%E5%9E%8B.md MySQL数据类型 数字类型 整数: tinyint、smallint、mediumint、int、bigint浮点数: float、double、real、decimal日期和时间: date、time、datetime、times…

dev treeview控件_在Winform开发框架中使用DevExpress的TreeList和TreeListLookupEdit控件

DevExpress提供的树形列表控件TreeList和树形下拉列表控件TreeListLookupEdit都是非常强大的一个控件&#xff0c;它和我们传统Winform的TreeView控件使用上有所不同&#xff0c;我一般在Winform开发中根据情况混合使用这些控件&#xff0c;不过整体来看&#xff0c;基于DevExp…

util包下的Date与sql包下的Date之间的转换

Java中的时间类型 java.sql包下给出三个与数据库相关的日期时间类型&#xff0c;分别是&#xff1a; Date&#xff1a;表示日期&#xff0c;只有年月日&#xff0c;没有时分秒。会丢失时间&#xff1b; Time&#xff1a;表示时间&#xff0c;只有时分秒&#xff0c;没有年月日。…

【MySQL解惑笔记】忘记MySQL数据库密码

破解MySQL密码 一、MySQL5.7.5之前 只要有系统root密码就可以破解&#xff1a; [roothost-131 ~]# vim /etc/my.cnf //在配置文件中加入如下内容 [mysqld] skip-grant-tables[roothost-131 ~]# systemctl restart mysqld //重启…

接口自动化测试框架

现在市面上做接口测试的工具很多&#xff0c;比如Postman&#xff0c;soapUI, JMeter, Python unittest等等&#xff0c;各种不同的测试工具拥有不同的特色。但市面上的接口测试工具都存在一个问题就是无法完全吻合的去适用没一个项目&#xff0c;比如数据的处理&#xff0c;加…

arcpy实现空间查询_布隆过滤!Python实现亿级数据集中元素快速查找

前段时间在做数据碰撞分析时&#xff0c;遇到一个在数亿级的int型数据集中查找30万个特定int值是否存在的需求&#xff0c;当时尝试了几种方式通过分片&#xff0c;然后做增量分析HashMap这两种方式第一种太慢&#xff0c;即使后面进一步实现了分布式计算&#xff0c;可仍然无法…

比特币如何实现—《区块链历史链条》2

链客&#xff0c;专为开发者而生&#xff0c;有问必答&#xff01; 此文章来自区块链技术社区&#xff0c;未经允许拒绝转载。 11比特币为什么还没有挖完 比特币系统靠调节难度系数保证比特币不被太快挖完。每10分钟&#xff0c;全网矿工共同计算一道难题&#xff0c;竞争记账…

centos7 系统下搭建 lnmp 环境

目录 目录概述准备工作开始编译安装1. 安装 Nginx1. 解压2. 环境准备3. 编译过程4. Nginx 服务2. 安装 MySQL1. 解压2. 环境准备3. 安装 CMake 编译器&#xff1a;4. 编译过程5. 初始化数据库6. MySQL 服务3. 安装 PHP1. 安装依赖包2. 编译安装3. 配置 PHP4. 整合 LNMP1 编辑 N…

dp uva1218

题目链接 一共有三种状态&#xff1a; 1、d[u][0]&#xff1a;u是服务器&#xff0c;每个子结点可以是也可以不是。 2、d[u][1]&#xff1a;u不是服务器&#xff0c;但u的父亲是&#xff0c;u的子结点都不是服务器。 3、d[u][2]&#xff1a;u和u的父亲都不是服务器&#xff0c;…

浏览器安全检查己通过_百度主动推送三项合一功能

百度主动推送三项合一功能作者&#xff1a;68喜功能模块&#xff1a;搜索关键词记录推送熊掌号当天推送熊掌号历史推送普通主动推送*///错误显示屏蔽error_reporting(E_ERROR | E_WARNING | E_PARSE);require ./common.inc.php; //引入公用函数$starid 1; //初始ID 开$limit…

EOS账户权限

链客&#xff0c;专为开发者而生&#xff0c;有问必答&#xff01; 此文章来自区块链技术社区&#xff0c;未经允许拒绝转载。 账户和权限 钱包 账户 授权和权限 其他 默认账户配置(单个签名) 多签名账户和自定义权限 帐户是存储在区块链中的人类可读标识符。 每个交易都根据…

怎样在表格中选出同一类_3分钟教会你如何将不同表格中的数据关联在一起

原标题&#xff1a; 3分钟教会你如何将不同表格中的数据关联在一起版权声明&#xff1a;本文为博主原创文章&#xff0c;未经博主允许不得转载。智能输入超级表格 微视频关键词&#xff1a; 智能输入 关联不同表格 逻辑输入「超级表格微视频」第六期提升效率、 增强免疫力&…