当前位置: 首页 > 编程日记 > 正文

kafka生产者、消费者java示例

1. 生产者

import java.util.Properties; import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; public class MyProducer {   public static void main(String[] args) {   Properties props = new Properties();   props.setProperty("metadata.broker.list","localhost:9092");   props.setProperty("serializer.class","kafka.serializer.StringEncoder");   props.put("request.required.acks","1");   ProducerConfig config = new ProducerConfig(props);   //创建生产这对象Producer<String, String> producer = new Producer<String, String>(config);//生成消息KeyedMessage<String, String> data = new KeyedMessage<String, String>("mykafka","test-kafka");try {   int i =1; while(i < 100){    //发送消息
                    producer.send(data);   } } catch (Exception e) {   e.printStackTrace();   }   producer.close();   }   
}
View Code

2. 消费者

import java.util.HashMap; 
import java.util.List;   
import java.util.Map;   
import java.util.Properties;   import kafka.consumer.ConsumerConfig;   
import kafka.consumer.ConsumerIterator;   
import kafka.consumer.KafkaStream;   
import kafka.javaapi.consumer.ConsumerConnector;  public class MyConsumer extends Thread{ //消费者连接private final ConsumerConnector consumer;   //要消费的话题private final String topic;   public MyConsumer(String topic) {   consumer =kafka.consumer.Consumer   .createJavaConsumerConnector(createConsumerConfig());   this.topic =topic;   }   //配置相关信息private static ConsumerConfig createConsumerConfig() {   Properties props = new Properties();   
//        props.put("zookeeper.connect","localhost:2181,10.XX.XX.XX:2181,10.XX.XX.XX:2181");//配置要连接的zookeeper地址与端口//The ‘zookeeper.connect’ string identifies where to find once instance of Zookeeper in your cluster.//Kafka uses ZooKeeper to store offsets of messages consumed for a specific topic and partition by this Consumer Groupprops.put("zookeeper.connect","localhost:2181");//配置zookeeper的组id (The ‘group.id’ string defines the Consumer Group this process is consuming on behalf of.)props.put("group.id", "0");//配置zookeeper连接超时间隔//The ‘zookeeper.session.timeout.ms’ is how many milliseconds Kafka will wait for //ZooKeeper to respond to a request (read or write) before giving up and continuing to consume messages.props.put("zookeeper.session.timeout.ms","10000"); //The ‘zookeeper.sync.time.ms’ is the number of milliseconds a ZooKeeper ‘follower’ can be behind the master before an error occurs.props.put("zookeeper.sync.time.ms", "200");//The ‘auto.commit.interval.ms’ setting is how often updates to the consumed offsets are written to ZooKeeper. //Note that since the commit frequency is time based instead of # of messages consumed, if an error occurs between updates to ZooKeeper on restart you will get replayed messages.props.put("auto.commit.interval.ms", "1000");return new ConsumerConfig(props);   }   public void run(){ Map<String,Integer> topickMap = new HashMap<String, Integer>();   topickMap.put(topic, 1);   Map<String, List<KafkaStream<byte[],byte[]>>>  streamMap =consumer.createMessageStreams(topickMap);   KafkaStream<byte[],byte[]>stream = streamMap.get(topic).get(0);   ConsumerIterator<byte[],byte[]> it =stream.iterator();   System.out.println("*********Results********");   while(true){   if(it.hasNext()){ //打印得到的消息   System.err.println(Thread.currentThread()+" get data:" +new String(it.next().message()));   } try {   Thread.sleep(1000);   } catch (InterruptedException e) {   e.printStackTrace();   }   }   }  public static void main(String[] args) {   MyConsumer consumerThread = new MyConsumer("mykafka");   consumerThread.start();   }   
}
View Code

3. 消费者的线程执行器实现

首先建立一个处理消息的类Consumer

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
public class Consumer implements Runnable {private KafkaStream stream;private int threadNumber;public Consumer(KafkaStream a_stream, int a_threadNumber) {threadNumber = a_threadNumber;stream = a_stream;}public void run() {ConsumerIterator<byte[], byte[]> it = stream.iterator();while (it.hasNext())System.out.println("Thread " + threadNumber + ": " + new String(it.next().message()));System.out.println("Shutting down Thread: " + threadNumber);}
}
View Code

其次实现多线程的调用

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;import java.util.concurrent.*; 
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class ConsumerGroup {private final ConsumerConnector consumer;private final String topic;private  ExecutorService executor;public ConsumerGroup(String a_zookeeper, String a_groupId, String a_topic) {consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper, a_groupId));this.topic = a_topic;}public void shutdown() {if (consumer != null) consumer.shutdown();if (executor != null) executor.shutdown();try {if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");}} catch (InterruptedException e) {System.out.println("Interrupted during shutdown, exiting uncleanly");}}public void run(int a_numThreads) {Map<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put(topic, new Integer(a_numThreads));Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);// now launch all the threads//
        executor = Executors.newFixedThreadPool(a_numThreads);// now create an object to consume the messages//
        int threadNumber = 0;for (final KafkaStream stream : streams) {executor.submit(new Consumer(stream, threadNumber));threadNumber++;}}private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {Properties props = new Properties();props.put("zookeeper.connect", a_zookeeper);props.put("group.id", a_groupId);props.put("zookeeper.session.timeout.ms", "400");props.put("zookeeper.sync.time.ms", "200");props.put("auto.commit.interval.ms", "1000");return new ConsumerConfig(props);}public static void main(String[] args) {String zooKeeper = "localhost:2181";String groupId = "0";String topic = "mykafka";int threads = 2;  //启动的线程数
 ConsumerGroup group = new ConsumerGroup(zooKeeper, groupId, topic);group.run(threads);try {Thread.sleep(10000);} catch (InterruptedException ie) {}group.shutdown();}
}
View Code

相关文章:

bug诞生记——临时变量、栈变量导致的双杀

这是《bug诞生记》的第一篇文章。本来想起个文艺点的名字&#xff0c;比如《Satan&#xff08;撒旦&#xff09;来了》&#xff0c;但是最后还是想让这系列的重心放在“bug的产生过程”和“缺失的知识点”上&#xff0c;于是就有了本系列这个稍微中性的名称。&#xff08;转载请…

波士顿动力的机器狗上班了!巡逻、检测不在话下,挪威公司为其编发工号

作者 | 神经小刀来源 | HyperAI超神经&#xff08;ID: HyperAI&#xff09;AI 界的网红&#xff0c;波士顿动力机器狗 Spot &#xff0c;近日获得了新的工作岗位&#xff0c;它将入职挪威的石油天然气开采公司 Aker &#xff0c;负责巡检工作&#xff0c;还破天荒地获得了工牌和…

C语言运算符的优先级

() [] -> . (--->) ! ~ -- - (type类型) * & sizeof (<---) 单目运算符 * / % (--->) - (--->) << >> (--->) 移位运算符 < < > > (--->) 关系运算符 ! (--->) 等于或不等于&…

Java 多线程(六) synchronized关键字详解

多线程的同步机制对资源进行加锁&#xff0c;使得在同一个时间&#xff0c;只有一个线程可以进行操作&#xff0c;同步用以解决多个线程同时访问时可能出现的问题。 同步机制可以使用synchronized关键字实现。 当synchronized关键字修饰一个方法的时候&#xff0c;该方法叫做同…

bug诞生记——const_cast引发只读数据区域写违例

对于C这种强类型的语言&#xff0c;明确的类型既带来了执行的高效&#xff0c;又让错误的发生提前到编译期。所以像const这类体现设计者意图的关键字&#xff0c;可以隐性的透露给我们它描述的对象的使用边界。它是我们的朋友&#xff0c;我们要学会和它相处&#xff0c;而不是…

听说Java被玩儿淘汰了?骨灰级程序员:远比你们想象的强大!

听过Java快被淘汰了&#xff1f;告诉你&#xff1a;没那么容易&#xff01;Java从诞生至今&#xff0c;已经走过了20多年的历史&#xff0c;虽然相比新型的技术语言算是“老古董”&#xff0c;但是它的应用依然最为广泛&#xff0c;并且有着非常成熟的生态。而且我相信&#xf…

RapidFramework使用Oracle的步骤

1.在lib包中加入Oracle驱动2.修改generator.properties,注释掉mysql,打开oracle注释 jdbc.urljdbc:oracle:thin:127.0.0.1:1521:orcljdbc.driveroracle.jdbc.driver.OracleDriver 3.设置jdbc.schemaMEETRICEjdbc.catalog 注意:jdbc.schema的值为大写 4.修改jdbc.properties jdb…

拇指接龙游戏升级记录03(升级MainScene.cpp)

MainScene是拇指接龙游戏的主游戏场景文件&#xff0c;拥有近5000行代码。说实在的&#xff0c;实现自cocos2d-x 2.x版本向当下最新的3.8.1版本的升级过程&#xff0c;其中涉及的技术不是一下能够说明的。有些是形式上的简单修改&#xff0c;更多的则是性能上的提升相应的修改。…

Flex与.NET互操作(十二):FluorineFx.Net的及时通信应用(Remote Shared Objects)(三)

远程共享对象(Remote Shared Objects) 可以用来跟踪、存储、共享以及做多客户端的数据同步操作。只要共享对象上的数据发生了改变&#xff0c;将会把最新数据同步到所有连接到该共享对象的应用程序客户端。FluorineFx所提供的远程共享对象(Remote Shared Objects)和FMS的共享对…

ffmpeg api的应用——提取视频图片

这些年来&#xff0c;“短视频”吸引了无数网民的注意。相对于丰富有趣的内容&#xff0c;我们码农可能更关心其底层技术实现。本系列文章将结合ffmpeg&#xff0c;讲解几则视频处理案例。&#xff08;转载请指明出于breaksoftware的csdn博客&#xff09; “短视频”都是以“文…

蚂蚁金服AAAI论文:基于长短期老师的样本蒸馏方法和自动车险定损系统的最新突破...

来源 | 蚂蚁金服出品 | AI科技大本营&#xff08;ID:rgznai100&#xff09;一年一度在人工智能方向的顶级会议之一AAAI 2020于2月7日至12日在美国纽约举行&#xff0c;旨在汇集世界各地的人工智能理论和领域应用的最新成果。以下是蚂蚁金服的技术专家对入选论文《基于长短期老师…

C# 实现HTML转换成图片的方法

/// <summary> /// 通过WebBrowser控件来实现从HTML到Bmp图片的生成。 /// </summary> /// <param name"htmPath">HTML路径</param> /// <returns>Bmp图片路径</returns> private static st…

一套使用注入和Hook技术托管入口函数的方案

工作中&#xff0c;我们可能会经常使用开源项目解决一些领域中的问题。这种“拿来主义”是一种“专业人干专业事”的思想&#xff0c;非常实用。&#xff08;转载请指明出于breaksoftware的csdn博客&#xff09; 一般场景下&#xff0c;我们都是把开源项目代码编译到我们自己的…

微软发布虚机管理SCVMM 2008 R2 RC版

来源&#xff1a;IT168服务器频道近日&#xff0c;微软发布了SCVMM&#xff08;系统中心虚拟机管理器&#xff0c;System Center Virtual Machine Manager&#xff09;2008 R2的RC版本。据了解&#xff0c;新的SCVMM相比去年的版本增加了六大新功能&#xff0c;目前用户可以从W…

AI芯片行业发展的来龙去脉

作者 | 清华大学微电子学研究所尹首一来源 | 《微纳电子与智能制造》期刊引言人 工 智 能( aritificial intelligence &#xff0c;AI )是 一 门融合了数学 、计算机科学 、统计学 、脑神经学和社会科学 的前沿综合性技术。它的目标是希望计算机可以像 人一样思考 &#xff0c;…

8)排序②排序算法之选择排序[1]直接选择排序

1 #include<iostream>2 using namespace std;3 4 //*******直接选择排序*********5 int select_sort(int n,int array[100]){6 int i,j;7 for(i0;i<n;i){8 for(ji;j<n;j){9 if(array[i]>array[j]){ 10 int temparr…

一份招聘需求的分析

今早&#xff0c;长期合作伙伴又给我们一份招聘需求&#xff0c;以下是招聘条件&#xff1a; 工作性质&#xff1a;全职 工作地点&#xff1a;南京 发布日期&#xff1a;2009/6/15 截止日期&#xff1a;2009/6/30 招聘人数&#xff1a;6 工作经验&#xff1a;不限 学  历&…

bug诞生记——隐蔽的指针偏移计算导致的数据错乱

C语言为了兼容C语言&#xff0c;做了很多设计方面的考量。但是有些兼容设计产生了不清晰的认识。本文就将讨论一个因为认知不清晰而导致的bug。&#xff08;转载请指明出于breaksoftware的csdn博客&#xff09; class Base { public:Base() default;void set_v_b(int v_b) {_…

福利直投!这个活动承包你2020全年技术干货

CSDN技术公开课有奖评选开始啦~~听过课的小伙伴们&#xff0c;哪位讲师的分享让你获益匪浅&#xff1f;记得给TA投票哦&#xff01;投票后获取入群方式&#xff0c;参与抽奖&#xff0c;奖品很丰厚哦~~进入付费时代&#xff0c;如今我们看似只要招招手&#xff0c;一切知识随手…

第四章 Controller接口控制器详解(5)——跟着开涛学SpringMVC

2019独角兽企业重金招聘Python工程师标准>>> 原创内容&#xff0c;转载请注明iteye http://jinnianshilongnian.iteye.com/ 4.15、MultiActionController 之前学过的控制器如AbstractCommandController、SimpleFormController等一般对应一个功能处理方法&#xff…

自动机器学习:团队如何在自动学习项目中一起工作?(附链接)

来源 | 数据派THU作者 | Francesca Lazzeri翻译 | 王琦责编 | Carol出品 | AI科技大本营&#xff08;ID:rgznai100&#xff09;去年11月&#xff0c;我写了一篇关于使用自动机器学习来进行AI民主化&#xff08;democratization&#xff09;的文章&#xff08;见下面链接&#x…

C++拾趣——STL容器的插入、删除、遍历和查找操作性能对比(ubuntu g++)——插入

操作系统是ubuntu 18.04.1 server amd64&#xff0c;gcc是 7.3.0。编译产出是64位测试程序。&#xff08;转载请指明出于breaksoftware的csdn博客&#xff09; 因为加入测量&#xff0c;就会导致误差。我已经尽量将环境影响降低&#xff0c;但是还是难免有误差。大家可以通过文…

SSIS中的记录集目标

这一篇&#xff0c;我们来看看另外一个特殊的目标组件&#xff1a;记录集目标。它与DataReader目标有些类似&#xff0c;也是在内存中的。但与DataReader目标不同的是&#xff0c;它可以被下游任务使用。 它的使用也比较简单&#xff0c;我们一般指定一个变量来接收它的结果&am…

Leetcode: Maximum Depth of Binary Tree

题目&#xff1a;算出二叉树的最大深度 解决方案&#xff1a;&#xff08;1&#xff09;BFS &#xff08;2&#xff09;DFS (1)BFS 一层一层往下搜索&#xff0c;一直找到最深的点&#xff0c;这里由于节点的val是没有用的&#xff0c;所以可以用来存储当前节点的深度&#xff…

C++拾趣——STL容器的插入、删除、遍历和查找操作性能对比(ubuntu g++)——删除

相关环境和说明在《C拾趣——STL容器的插入、删除、遍历和查找操作性能对比&#xff08;ubuntu g&#xff09;——插入》已给出。本文将分析从头部、中间和尾部对各个容器进行删除的性能。&#xff08;转载请指明出于breaksoftware的csdn博客&#xff09; 删除 头部删除 元素…

一文告诉你,如何使用Python构建一个“谷歌搜索”系统 | 内附代码

来源 | hackernoon编译 | 武明利责编 | Carol出品 | AI科技大本营&#xff08;ID:rgznai100&#xff09;在这篇文章中&#xff0c;我将向您展示如何使用Python构建自己的答案查找系统。基本上&#xff0c;这种自动化可以从图片中找到多项选择题的答案。有一件事我们要清楚&…

WatchStor观察:思科携EMC等合作伙伴 圈地数据中心市场

早在今年3月&#xff0c;思科在加利福尼亚州圣何塞市展会中展示了“统一计算系统”(Unified Computing System)之后&#xff0c;我们就明白&#xff0c;数据中心市场将会发生巨大改变&#xff0c;传统的以IBM、惠普、戴尔和Sun为主导的服务器电脑市场&#xff0c;将受到以思科为…

使用BabeLua3.x在cocos2d-x中编辑和调试Lua

BabeLua是一款基于VS2012/2013的Lua集成开发环境&#xff0c;具有Lua语法高亮&#xff0c;语法检查&#xff0c;自动补全&#xff0c;快速搜索&#xff0c;注入宿主程序内对Lua脚本进行调试&#xff0c;设置断点观察变量值&#xff0c;查看堆栈信息等功能。 如何安装 请参考《系…

ASA与PIX的区别

很多年来&#xff0c;Cisco PIX一直都是Cisco确定的防火墙。但是在2005年5月&#xff0c;Cisco推出了一个新的产品——适应性安全产品&#xff08;ASA&#xff0c;Adaptive Security Appliance&#xff09;。不过&#xff0c;PIX还依旧可用。我已听到很多人在多次询问这两个产品…

C++拾趣——STL容器的插入、删除、遍历和查找操作性能对比(ubuntu g++)——遍历和查找

相关环境和说明在《C拾趣——STL容器的插入、删除、遍历和查找操作性能对比&#xff08;ubuntu g&#xff09;——插入》已给出。本文将分析各个容器中遍历和查找的性能。&#xff08;转载请指明出于breaksoftware的csdn博客&#xff09; 遍历 从前往后 元素个数>15000 t…