java kafka 集群消费_kafka集群搭建和使用Java写kafka生产者消费者
转自:http://chengjianxiaoxue.iteye.com/blog/2190488
1 kafka集群搭建
1.zookeeper集群 搭建在110, 111,112
2.kafka使用3个节点110, 111,112修改配置文件config/server.properties
broker.id=110host.name=192.168.1.110log.dirs=/usr/local/kafka_2.10-0.8.2.0/logs
复制到其他两个节点,然后修改对应节点上的config/server.pro3.启动,在三个节点分别执行
bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &
4创建主题
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test5查看主题详细
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test--topic test
Topic:test PartitionCount:3 ReplicationFactor:3Configs:
Topic: test Partition:0 Leader: 110 Replicas: 110,111,112 Isr: 110,111,112Topic: test Partition:1 Leader: 111 Replicas: 111,112,110 Isr: 111,112,110Topic: test Partition:2 Leader: 112 Replicas: 112,110,111 Isr: 112,110,111
6去zk上看kafka集群
[zk: localhost:2181(CONNECTED) 5] ls /[admin, zookeeper, consumers, config, controller, zk-fifo, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 6] ls /brokers ---->查看注册在zk内的kafka
[topics, ids]
[zk: localhost:2181(CONNECTED) 7] ls /brokers/ids
[112, 110, 111]
[zk: localhost:2181(CONNECTED) 8] ls /brokers/ids/112[]
[zk: localhost:2181(CONNECTED) 9] ls /brokers/topics
[test]
[zk: localhost:2181(CONNECTED) 10] ls /brokers/topics/test
[partitions]
[zk: localhost:2181(CONNECTED) 11] ls /brokers/topics/test/partitions
[2, 1, 0]
[zk: localhost:2181(CONNECTED) 12]
2 kafka java调用:
2.1 java端生产数据, kafka集群消费数据:
1 创建maven工程,pom.xml中增加如下:
org.apache.kafka
kafka_2.10
0.8.2.0
2 java代码: 向主题test内写入数据
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;
public class kafkaProducer extends Thread{
private String topic;
public kafkaProducer(String topic){
super();
this.topic = topic;
}
@Override
public void run() {
Producer producer = createProducer();
int i=0;
while(true){
producer.send(new KeyedMessage(topic, "message: " + i++));
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private Producer createProducer() {
Properties properties = new Properties();
properties.put("zookeeper.connect", "192.168.1.110:2181,192.168.1.111:2181,192.168.1.112:2181");//声明zk
properties.put("serializer.class", StringEncoder.class.getName());
properties.put("metadata.broker.list", "192.168.1.110:9092,192.168.1.111:9093,192.168.1.112:9094");// 声明kafka broker
return new Producer(new ProducerConfig(properties));
}
public static void main(String[] args) {
new kafkaProducer("test").start();// 使用kafka集群中创建好的主题 test
}
}
3 kafka集群中消费主题test的数据:
[root@h2master kafka]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginnin
4 启动java代码,然后在看集群消费的数据如下:
message: 0
message: 1
message: 2
message: 3
message: 4
message: 5
message: 6
message: 7
message: 8
message: 9
message: 10
message: 11
message: 12
message: 13
message: 14
message: 15
message: 16
message: 17
message: 18
message: 19
message: 20
message: 21
3 kafka 使用Java写消费者,这样 先运行kafkaProducer ,在运行kafkaConsumer,即可得到生产者的数据:
importjava.util.HashMap;importjava.util.List;importjava.util.Map;importjava.util.Properties;importkafka.consumer.Consumer;importkafka.consumer.ConsumerConfig;importkafka.consumer.ConsumerIterator;importkafka.consumer.KafkaStream;importkafka.javaapi.consumer.ConsumerConnector;/*** 接收数据
* 接收到: message: 10
接收到: message: 11
接收到: message: 12
接收到: message: 13
接收到: message: 14
*@authorzm
**/
public class kafkaConsumer extendsThread{privateString topic;publickafkaConsumer(String topic){super();this.topic =topic;
}
@Overridepublic voidrun() {
ConsumerConnector consumer=createConsumer();
Map topicCountMap = new HashMap();
topicCountMap.put(topic,1); //一次从主题中获取一个数据
Map>> messageStreams =consumer.createMessageStreams(topicCountMap);
KafkaStream stream = messageStreams.get(topic).get(0);//获取每次接收到的这个数据
ConsumerIterator iterator =stream.iterator();while(iterator.hasNext()){
String message= newString(iterator.next().message());
System.out.println("接收到: " +message);
}
}privateConsumerConnector createConsumer() {
Properties properties= newProperties();
properties.put("zookeeper.connect", "192.168.1.110:2181,192.168.1.111:2181,192.168.1.112:2181");//声明zk
properties.put("group.id", "group1");//必须要使用别的组名称, 如果生产者和消费者都在同一组,则不能访问同一组内的topic数据
return Consumer.createJavaConsumerConnector(newConsumerConfig(properties));
}public static voidmain(String[] args) {new kafkaConsumer("test").start();//使用kafka集群中创建好的主题 test
}
}
相关文章:

Java之替换“\n”符号
开发平台:Android 4.1.2 在去除字符串中的换行符(\n)的时候,写成str.replace("\\n", "")才能正确执行。str.replace("\n","") ,str.replaceAll("\\n",""),str.repla…

ThinkPHP项目笔记之登录,注册,安全退出篇
1.先说注册 a.准备好注册页面,register.html,当然一般有,姓名,邮箱,地址等常用的。 b."不要相信用户提交的一切数据",安全,安全是第一位的。所以要做判断,客户端要做基本判断,为了防止…

c语言第八次作业
1.选择法排序。输入一个正整数n(1<n<10),再输入n个整数,将他们从大到小排序后输出。试写出相应程序。 #include<stdio.h>int main (void){int i,index,k,n,t;int a[10];printf("输入数据的个数n:");scanf("%d",&n);…

java线程池的工作原理_Java 线程池的介绍以及工作原理
在什么情况下使用线程池?1.单个任务处理的时间比较短2.将需处理的任务的数量大使用线程池的好处:1. 降低资源消耗: 通过重复利用已创建的线程降低线程创建和销毁造成的消耗。2. 提高响应速度: 当任务到达时,…

.NET 程序设计实验 含记事本通讯录代码
实验一 .NET 程序设计基本流程 【实验内容】 一、控制台、Windows 应用程序、ASP.NET 程序开发流程 1、熟悉开发平台 2、分别开发控制台、Windows 应用程序、ASP.NET 程序下显示“Hello world!”的应用程序, 掌握新建、基本输入输出、程序流程、程序调试…

复制构造函数(拷贝构造函数)
也许很多C的初学者都知道什么是构造函数,但是对复制构造函数(copy constructor)却还很陌生。对于我来说,在写代码的时候能用得上复制构造函数的机会并不多,不过这并不说明复制构造函数没什么用,其实复制构造…
VMware上实现LVS负载均衡(NAT)
本文LVS的实现方式採用NAT模式。关于NAT的拓扑图请參照我的上一篇文章。本文纯粹实验。NAT在生产环境中不推荐使用。原因是Load Balancereasy成为瓶颈! 1.VMware9上安装CentOS-6.5-x86_64-minimal版 2.安装完毕后将其hostname设置为LVS-master hostname LVS-master …

java se13安装教程_在Linux发行版中安装Java 13/OpenJDK 13的方法
本文介绍在Linux发行版Ubuntu 18.04/16.04、Debian 10/9、CentOS 7/8、Fedora 31/30/29中安装Java 13/OpenJDK 13、Java SE Development Kit 13的方法。在Ubuntu 18.04/16.04、Debian 10/9、CentOS 7/8、Fedora 31/30/29中安装OpenJDK 13访问JDK 13版本页面以下载最新的版本&am…

Java api 入门教程 之 JAVA的IO处理
IO是输入和输出的简称,在实际的使用时,输入和输出是有方向的。就像现实中两个人之间借钱一样,例如A借钱给B,相对于A来说是借出,而相对于B来说则是借入。所以在程序中提到输入和输出时,也需要区分清楚是相对…

如何编辑PDF文件,PDF编辑器如何使用
如何编辑PDF呢?其实大多数人都不知道该如何下手,部分人会选择将PDF文件转换成Word然后进行编辑,其实这种方法比较麻烦,大大拉低了我们的工作效率。如果想要提高工作效率更加快速的编辑PDF文件,就可以选择迅捷PDF编辑器…

hdu 4366 Card Collector (容斥原理)
http://acm.hdu.edu.cn/showproblem.php?pid4336题意:有 n 张卡片 ,每张卡片出现的 概率 是 pi 每包至多有 一张卡片 ,也有可能没有 卡片 。求 需要买多少包 才能集齐 n 张卡片 ,求包数的 期望 。题解 : 容斥原理…

java语言二维数组转置_java实现二维数组转置的方法示例
本文实例讲述了java实现二维数组转置的方法。分享给大家供大家参考,具体如下:这里在文件中创建Test2、Exchange、Out三个类在Exchange类中编写exchange()方法,在方法中创建两个数组arraryA、arraryB,arraryB[j][i]arraryA[i][j]实…

使用Apache cxf 和Spring在Tomcat下发布Webservice指南
转载 http://blog.csdn.net/zhangzhaokun/article/details/4750021 最近学习了如何使用apache cxf和Spring发布webservice,虽然网上的资料很多,但是没有一个文档可以让读者按照操作步骤来实现完整的发布流程,都需要多篇文件杂合在一起&#x…

srcache_nginx redis 构建缓存系统应用一例
为什么80%的码农都做不了架构师?>>> srcache_nginx模块相关参数介绍,可以参见 《memc_nginxsrcache_nginxmemcached构建透明的动态页面缓存》。 redis是一种高效的key-value存储。 下面举一例应用,看配置: upstream r…

mysql 删除 修改密码_Mysql数据库root密码忘记了,如何在不删除Mysql的情况下修改密码...
1.cmd中使用 net stop mysql 命令停掉正在运行的mysql 数据库。2.在本地中复制Mysql数据库的安装路径一直到bin路径下。3.到cmd执行 "pushd 步骤2复制路径" 的命令,就会到Mysql数据库安装的bin路径下。4.紧接著执行 mysqld --skip-grant-tables 命令…

通用权限管理系统组件 (GPM - General Permissions Manager) 权限管理以前我们都是自己开发,可是到下一个系统又不适用,又改,加上人员流动大,管理很混乱...
为什么80%的码农都做不了架构师?>>> 权限管理以前我们都是自己开发,可是到下一个系统又不适用,又改,加上人员流动大,管理很混乱 Ψ吉日嘎拉 采用通用权限管理系统,这些烦恼就少了很多了&#x…

【小白的CFD之旅】16 流程
那天听了小牛师兄关于CFD应用的四种境界的说法后,小白发现自己连第一种境界都算不上,自己对于CFD还只是停留在做了少数几个案例的基础上,可以说是对其一无所知。不过小白不是那种遇到挫折就退缩的人,他决定沿着黄师姐的方法从软件…

XWiki 4.3 正式版发布
XWiki 4.3 正式版发布了,工作空间、扩展管理器、分发向导和 REST API 做了很多改进,改进了翻译和新的体验的 Solr 搜索。 XWiki是一个由Java编写的基于LGPL协议发布的开源wiki和应用平台。它的开发平台特性允许创建协作式Web应用,同时也提供了…

新建异常并处理java_java – 动态创建异常的工厂模式
我创建了Exception xml并动态创建并抛出异常.com.package.CheckedExceptionChecked Exception Messagecom.package.UnCheckedExceptionUnChecked Exception Message我根据异常键使用反射动态创建异常对象.public static void throwException(final String key) throws CheckedE…

React navtive
http://www.linuxidc.com/Linux/2015-09/123239.htm 转载于:https://www.cnblogs.com/chenzhenfj/p/5203685.html

c# Pdf 转换图片
1,引入 dll itextsharp.dll、 PDFView.dll、 把 gsdll32.dll 拷贝在项目 bin目录下 ,注意:它不能 直接引用 直接上代码: 1 /// <summary>2 /// 将PDF 相应的页转换为图片3 /// </summary>4 …

Entity Framework 约定
约定,类似于接口,是一个规范和规则,使用Code First 定义约定来配置模型和规则。在这里约定只是记本规则,我们可以通过Data Annotaion或者Fluent API来进一步配置模型。约定的形式有如下几种: 类型发现约定主键约定关系…

java用构造方法定义book类_JAVA基础学习之路(三)类定义及构造方法
类的定义及使用一,类的定义classBook {//定义一个类intprice;//定义一个属性intnum;public static int getMonney(int price, intnum) {//定义一个方法return price*num;}}public classtest2 {public static voidmain(String args[]) {Book monney newBook();//声明…

Linux下如何查看文档的内容
查看文档内容的命令有:cat tac head nl tail more less odcat命令显示文档的全部内容,当文档较大的时候只显示最后的部分,所以cat命令适合查看内容较少的文档。可加选项-n显示行数(此时空白行也会显示行号)。-b空白行则不显示行号。tac与cat显…

Java中getResourceAsStream的用法
Java中getResourceAsStream的用法 首先,Java中的getResourceAsStream有以下几种:1. Class.getResourceAsStream(String path) : path 不以’/开头时默认是从此类所在的包下取资源,以’/开头则是从 ClassPath根下获取。其只是通过p…

Asp.Net MVC3 简单入门详解过滤器Filter
为什么80%的码农都做不了架构师?>>> 前言 在开发大项目的时候总会有相关的AOP面向切面编程的组件,而MVC(特指:Asp.Net MVC,以下皆同)项目中不想让MVC开发人员去关心和写类似身份验证࿰…

mysql 锁语句_mysql-笔记 事务 锁 语句
Start Transaction 或 begin [work] 开始一个事务,开始一个事务,引起其他未提交的事务提交,引起表锁释放commit 提交事务,永久修改rollback 回滚事务,撤消修改set autocommit 在当前会话状态下 启用或不启用 autocommi…

【收藏】Java多线程/并发编程大合集
(一)、【Java并发编程】并发编程大合集-兰亭风雨 【Java并发编程】实现多线程的两种方法 【Java并发编程】线程的中断 【Java并发编程】正确挂起、恢复、终止线程 【Java并发编程】守护线程和线程阻塞 【Java并发编程】Volatile关键字(上&…

《深入理解Android:Wi-Fi,NFC和GPS》章节连载[节选]--第六章 深入理解wi-Fi Simple Configuration...
为什么80%的码农都做不了架构师?>>> 首先感谢各位兄弟姐妹们的耐心等待。本书预计在4月上市发售。从今天开始,我将在博客中连载此书的一些内容。注意,此处连载的是未经出版社编辑的原始稿件,所以样子会有些非专业。 …

iOS 的本地化使用和创建过程
在使用本地化语言之前,来看看本地化语言文件内容的结构(这里我以Chinese为例):"Cancel""取消";"OK""确定";"Tip""信息提示";"Login Faild""登陆失败…