当前位置: 首页 > 编程日记 > 正文

java kafka 集群消费_kafka集群搭建和使用Java写kafka生产者消费者

转自:http://chengjianxiaoxue.iteye.com/blog/2190488

1 kafka集群搭建

1.zookeeper集群 搭建在110, 111,112

2.kafka使用3个节点110, 111,112修改配置文件config/server.properties

broker.id=110host.name=192.168.1.110log.dirs=/usr/local/kafka_2.10-0.8.2.0/logs

复制到其他两个节点,然后修改对应节点上的config/server.pro3.启动,在三个节点分别执行

bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &

4创建主题

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test5查看主题详细

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test--topic test

Topic:test PartitionCount:3 ReplicationFactor:3Configs:

Topic: test Partition:0 Leader: 110 Replicas: 110,111,112 Isr: 110,111,112Topic: test Partition:1 Leader: 111 Replicas: 111,112,110 Isr: 111,112,110Topic: test Partition:2 Leader: 112 Replicas: 112,110,111 Isr: 112,110,111

6去zk上看kafka集群

[zk: localhost:2181(CONNECTED) 5] ls /[admin, zookeeper, consumers, config, controller, zk-fifo, storm, brokers, controller_epoch]

[zk: localhost:2181(CONNECTED) 6] ls /brokers ---->查看注册在zk内的kafka

[topics, ids]

[zk: localhost:2181(CONNECTED) 7] ls /brokers/ids

[112, 110, 111]

[zk: localhost:2181(CONNECTED) 8] ls /brokers/ids/112[]

[zk: localhost:2181(CONNECTED) 9] ls /brokers/topics

[test]

[zk: localhost:2181(CONNECTED) 10] ls /brokers/topics/test

[partitions]

[zk: localhost:2181(CONNECTED) 11] ls /brokers/topics/test/partitions

[2, 1, 0]

[zk: localhost:2181(CONNECTED) 12]

2  kafka java调用:

2.1 java端生产数据, kafka集群消费数据:

1 创建maven工程,pom.xml中增加如下:

org.apache.kafka

kafka_2.10

0.8.2.0

2 java代码: 向主题test内写入数据

import java.util.Properties;

import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;

import kafka.serializer.StringEncoder;

public class kafkaProducer extends Thread{

private String topic;

public kafkaProducer(String topic){

super();

this.topic = topic;

}

@Override

public void run() {

Producer producer = createProducer();

int i=0;

while(true){

producer.send(new KeyedMessage(topic, "message: " + i++));

try {

TimeUnit.SECONDS.sleep(1);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

private Producer createProducer() {

Properties properties = new Properties();

properties.put("zookeeper.connect", "192.168.1.110:2181,192.168.1.111:2181,192.168.1.112:2181");//声明zk

properties.put("serializer.class", StringEncoder.class.getName());

properties.put("metadata.broker.list", "192.168.1.110:9092,192.168.1.111:9093,192.168.1.112:9094");// 声明kafka broker

return new Producer(new ProducerConfig(properties));

}

public static void main(String[] args) {

new kafkaProducer("test").start();// 使用kafka集群中创建好的主题 test

}

}

3 kafka集群中消费主题test的数据:

[root@h2master kafka]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginnin

4 启动java代码,然后在看集群消费的数据如下:

message: 0

message: 1

message: 2

message: 3

message: 4

message: 5

message: 6

message: 7

message: 8

message: 9

message: 10

message: 11

message: 12

message: 13

message: 14

message: 15

message: 16

message: 17

message: 18

message: 19

message: 20

message: 21

3 kafka 使用Java写消费者,这样 先运行kafkaProducer ,在运行kafkaConsumer,即可得到生产者的数据:

importjava.util.HashMap;importjava.util.List;importjava.util.Map;importjava.util.Properties;importkafka.consumer.Consumer;importkafka.consumer.ConsumerConfig;importkafka.consumer.ConsumerIterator;importkafka.consumer.KafkaStream;importkafka.javaapi.consumer.ConsumerConnector;/*** 接收数据

* 接收到: message: 10

接收到: message: 11

接收到: message: 12

接收到: message: 13

接收到: message: 14

*@authorzm

**/

public class kafkaConsumer extendsThread{privateString topic;publickafkaConsumer(String topic){super();this.topic =topic;

}

@Overridepublic voidrun() {

ConsumerConnector consumer=createConsumer();

Map topicCountMap = new HashMap();

topicCountMap.put(topic,1); //一次从主题中获取一个数据

Map>> messageStreams =consumer.createMessageStreams(topicCountMap);

KafkaStream stream = messageStreams.get(topic).get(0);//获取每次接收到的这个数据

ConsumerIterator iterator =stream.iterator();while(iterator.hasNext()){

String message= newString(iterator.next().message());

System.out.println("接收到: " +message);

}

}privateConsumerConnector createConsumer() {

Properties properties= newProperties();

properties.put("zookeeper.connect", "192.168.1.110:2181,192.168.1.111:2181,192.168.1.112:2181");//声明zk

properties.put("group.id", "group1");//必须要使用别的组名称, 如果生产者和消费者都在同一组,则不能访问同一组内的topic数据

return Consumer.createJavaConsumerConnector(newConsumerConfig(properties));

}public static voidmain(String[] args) {new kafkaConsumer("test").start();//使用kafka集群中创建好的主题 test

}

}

相关文章:

Java之替换“\n”符号

开发平台:Android 4.1.2 在去除字符串中的换行符(\n)的时候,写成str.replace("\\n", "")才能正确执行。str.replace("\n","") ,str.replaceAll("\\n",""),str.repla…

ThinkPHP项目笔记之登录,注册,安全退出篇

1.先说注册 a.准备好注册页面,register.html,当然一般有,姓名,邮箱,地址等常用的。 b."不要相信用户提交的一切数据",安全,安全是第一位的。所以要做判断,客户端要做基本判断,为了防止…

c语言第八次作业

1.选择法排序。输入一个正整数n(1<n<10)&#xff0c;再输入n个整数&#xff0c;将他们从大到小排序后输出。试写出相应程序。 #include<stdio.h>int main (void){int i,index,k,n,t;int a[10];printf("输入数据的个数n:");scanf("%d",&n);…

java线程池的工作原理_Java 线程池的介绍以及工作原理

在什么情况下使用线程池&#xff1f;1.单个任务处理的时间比较短2.将需处理的任务的数量大使用线程池的好处:1. 降低资源消耗&#xff1a;      通过重复利用已创建的线程降低线程创建和销毁造成的消耗。2. 提高响应速度&#xff1a;      当任务到达时&#xff0c…

.NET 程序设计实验 含记事本通讯录代码

实验一 .NET 程序设计基本流程 【实验内容】 一、控制台、Windows 应用程序、ASP.NET 程序开发流程 1、熟悉开发平台 2、分别开发控制台、Windows 应用程序、ASP.NET 程序下显示“Hello world&#xff01;”的应用程序&#xff0c; 掌握新建、基本输入输出、程序流程、程序调试…

复制构造函数(拷贝构造函数)

也许很多C的初学者都知道什么是构造函数&#xff0c;但是对复制构造函数&#xff08;copy constructor&#xff09;却还很陌生。对于我来说&#xff0c;在写代码的时候能用得上复制构造函数的机会并不多&#xff0c;不过这并不说明复制构造函数没什么用&#xff0c;其实复制构造…

VMware上实现LVS负载均衡(NAT)

本文LVS的实现方式採用NAT模式。关于NAT的拓扑图请參照我的上一篇文章。本文纯粹实验。NAT在生产环境中不推荐使用。原因是Load Balancereasy成为瓶颈&#xff01; 1.VMware9上安装CentOS-6.5-x86_64-minimal版 2.安装完毕后将其hostname设置为LVS-master hostname LVS-master …

java se13安装教程_在Linux发行版中安装Java 13/OpenJDK 13的方法

本文介绍在Linux发行版Ubuntu 18.04/16.04、Debian 10/9、CentOS 7/8、Fedora 31/30/29中安装Java 13/OpenJDK 13、Java SE Development Kit 13的方法。在Ubuntu 18.04/16.04、Debian 10/9、CentOS 7/8、Fedora 31/30/29中安装OpenJDK 13访问JDK 13版本页面以下载最新的版本&am…

Java api 入门教程 之 JAVA的IO处理

IO是输入和输出的简称&#xff0c;在实际的使用时&#xff0c;输入和输出是有方向的。就像现实中两个人之间借钱一样&#xff0c;例如A借钱给B&#xff0c;相对于A来说是借出&#xff0c;而相对于B来说则是借入。所以在程序中提到输入和输出时&#xff0c;也需要区分清楚是相对…

如何编辑PDF文件,PDF编辑器如何使用

如何编辑PDF呢&#xff1f;其实大多数人都不知道该如何下手&#xff0c;部分人会选择将PDF文件转换成Word然后进行编辑&#xff0c;其实这种方法比较麻烦&#xff0c;大大拉低了我们的工作效率。如果想要提高工作效率更加快速的编辑PDF文件&#xff0c;就可以选择迅捷PDF编辑器…

hdu 4366 Card Collector (容斥原理)

http://acm.hdu.edu.cn/showproblem.php?pid4336题意&#xff1a;有 n 张卡片 &#xff0c;每张卡片出现的 概率 是 pi 每包至多有 一张卡片 &#xff0c;也有可能没有 卡片 。求 需要买多少包 才能集齐 n 张卡片 &#xff0c;求包数的 期望 。题解 &#xff1a; 容斥原理…

java语言二维数组转置_java实现二维数组转置的方法示例

本文实例讲述了java实现二维数组转置的方法。分享给大家供大家参考&#xff0c;具体如下&#xff1a;这里在文件中创建Test2、Exchange、Out三个类在Exchange类中编写exchange()方法&#xff0c;在方法中创建两个数组arraryA、arraryB&#xff0c;arraryB[j][i]arraryA[i][j]实…

使用Apache cxf 和Spring在Tomcat下发布Webservice指南

转载 http://blog.csdn.net/zhangzhaokun/article/details/4750021 最近学习了如何使用apache cxf和Spring发布webservice&#xff0c;虽然网上的资料很多&#xff0c;但是没有一个文档可以让读者按照操作步骤来实现完整的发布流程&#xff0c;都需要多篇文件杂合在一起&#x…

srcache_nginx redis 构建缓存系统应用一例

为什么80%的码农都做不了架构师&#xff1f;>>> srcache_nginx模块相关参数介绍&#xff0c;可以参见 《memc_nginxsrcache_nginxmemcached构建透明的动态页面缓存》。 redis是一种高效的key-value存储。 下面举一例应用&#xff0c;看配置&#xff1a; upstream r…

mysql 删除 修改密码_Mysql数据库root密码忘记了,如何在不删除Mysql的情况下修改密码...

1.cmd中使用 net stop mysql 命令停掉正在运行的mysql 数据库。2.在本地中复制Mysql数据库的安装路径一直到bin路径下。3.到cmd执行 "pushd 步骤2复制路径" 的命令&#xff0c;就会到Mysql数据库安装的bin路径下。4.紧接著执行 mysqld --skip-grant-tables 命令…

通用权限管理系统组件 (GPM - General Permissions Manager) 权限管理以前我们都是自己开发,可是到下一个系统又不适用,又改,加上人员流动大,管理很混乱...

为什么80%的码农都做不了架构师&#xff1f;>>> 权限管理以前我们都是自己开发&#xff0c;可是到下一个系统又不适用&#xff0c;又改&#xff0c;加上人员流动大&#xff0c;管理很混乱 Ψ吉日嘎拉 采用通用权限管理系统&#xff0c;这些烦恼就少了很多了&#x…

【小白的CFD之旅】16 流程

那天听了小牛师兄关于CFD应用的四种境界的说法后&#xff0c;小白发现自己连第一种境界都算不上&#xff0c;自己对于CFD还只是停留在做了少数几个案例的基础上&#xff0c;可以说是对其一无所知。不过小白不是那种遇到挫折就退缩的人&#xff0c;他决定沿着黄师姐的方法从软件…

XWiki 4.3 正式版发布

XWiki 4.3 正式版发布了&#xff0c;工作空间、扩展管理器、分发向导和 REST API 做了很多改进&#xff0c;改进了翻译和新的体验的 Solr 搜索。 XWiki是一个由Java编写的基于LGPL协议发布的开源wiki和应用平台。它的开发平台特性允许创建协作式Web应用&#xff0c;同时也提供了…

新建异常并处理java_java – 动态创建异常的工厂模式

我创建了Exception xml并动态创建并抛出异常.com.package.CheckedExceptionChecked Exception Messagecom.package.UnCheckedExceptionUnChecked Exception Message我根据异常键使用反射动态创建异常对象.public static void throwException(final String key) throws CheckedE…

React navtive

http://www.linuxidc.com/Linux/2015-09/123239.htm 转载于:https://www.cnblogs.com/chenzhenfj/p/5203685.html

c# Pdf 转换图片

1&#xff0c;引入 dll itextsharp.dll、 PDFView.dll、 把 gsdll32.dll 拷贝在项目 bin目录下 &#xff0c;注意&#xff1a;它不能 直接引用 直接上代码&#xff1a; 1 /// <summary>2 /// 将PDF 相应的页转换为图片3 /// </summary>4 …

Entity Framework 约定

约定&#xff0c;类似于接口&#xff0c;是一个规范和规则&#xff0c;使用Code First 定义约定来配置模型和规则。在这里约定只是记本规则&#xff0c;我们可以通过Data Annotaion或者Fluent API来进一步配置模型。约定的形式有如下几种&#xff1a; 类型发现约定主键约定关系…

java用构造方法定义book类_JAVA基础学习之路(三)类定义及构造方法

类的定义及使用一&#xff0c;类的定义classBook {//定义一个类intprice;//定义一个属性intnum;public static int getMonney(int price, intnum) {//定义一个方法return price*num;}}public classtest2 {public static voidmain(String args[]) {Book monney newBook();//声明…

Linux下如何查看文档的内容

查看文档内容的命令有&#xff1a;cat tac head nl tail more less odcat命令显示文档的全部内容&#xff0c;当文档较大的时候只显示最后的部分&#xff0c;所以cat命令适合查看内容较少的文档。可加选项-n显示行数(此时空白行也会显示行号)。-b空白行则不显示行号。tac与cat显…

Java中getResourceAsStream的用法

Java中getResourceAsStream的用法 首先&#xff0c;Java中的getResourceAsStream有以下几种&#xff1a;1. Class.getResourceAsStream(String path) &#xff1a; path 不以’/开头时默认是从此类所在的包下取资源&#xff0c;以’/开头则是从 ClassPath根下获取。其只是通过p…

Asp.Net MVC3 简单入门详解过滤器Filter

为什么80%的码农都做不了架构师&#xff1f;>>> 前言 在开发大项目的时候总会有相关的AOP面向切面编程的组件&#xff0c;而MVC&#xff08;特指&#xff1a;Asp.Net MVC&#xff0c;以下皆同&#xff09;项目中不想让MVC开发人员去关心和写类似身份验证&#xff0…

mysql 锁语句_mysql-笔记 事务 锁 语句

Start Transaction 或 begin [work] 开始一个事务&#xff0c;开始一个事务&#xff0c;引起其他未提交的事务提交&#xff0c;引起表锁释放commit 提交事务&#xff0c;永久修改rollback 回滚事务&#xff0c;撤消修改set autocommit 在当前会话状态下 启用或不启用 autocommi…

【收藏】Java多线程/并发编程大合集

&#xff08;一&#xff09;、【Java并发编程】并发编程大合集-兰亭风雨 【Java并发编程】实现多线程的两种方法 【Java并发编程】线程的中断 【Java并发编程】正确挂起、恢复、终止线程 【Java并发编程】守护线程和线程阻塞 【Java并发编程】Volatile关键字&#xff08;上&…

《深入理解Android:Wi-Fi,NFC和GPS》章节连载[节选]--第六章 深入理解wi-Fi Simple Configuration...

为什么80%的码农都做不了架构师&#xff1f;>>> 首先感谢各位兄弟姐妹们的耐心等待。本书预计在4月上市发售。从今天开始&#xff0c;我将在博客中连载此书的一些内容。注意&#xff0c;此处连载的是未经出版社编辑的原始稿件&#xff0c;所以样子会有些非专业。 …

iOS 的本地化使用和创建过程

在使用本地化语言之前&#xff0c;来看看本地化语言文件内容的结构&#xff08;这里我以Chinese为例&#xff09;&#xff1a;"Cancel""取消";"OK""确定";"Tip""信息提示";"Login Faild""登陆失败…