当前位置: 首页 > 编程日记 > 正文

SpringBoot实战(十四)之整合KafKa

本人今天上午参考了不少博文,发现不少博文不是特别好,不是因为依赖冲突问题就是因为版本问题。

于是我结合相关的博文和案例,自己改写了下并参考了下,于是就有了这篇文章。希望能够给大家帮助,少走一些弯路。

一、KafKa的介绍

1.主要功能

根据官网的介绍,ApacheKafka®是一个分布式流媒体平台,它主要有3种功能:

a.发布和订阅消息流,这个功能类似于消息队列,这也是kafka归类为消息队列框架的原因。

b.以容错的方式记录消息流,kafka以文件的方式来存储消息流。

c.可以再消息发布的时候进行处理。

2.使用场景

a.在系统或应用程序之间构建可靠的用于传输实时数据的管道,消息队列功能。

b.构建实时的流数据处理程序来变换或处理数据流,数据处理功能。

3.详细介绍

Kafka目前主要作为一个分布式的发布订阅式的消息系统使用,下面简单介绍一下kafka的基本机制

消息传输过程:

Producer即生产者,向Kafka集群发送消息,在发送消息之前,会对消息进行分类,即Topic,上图展示了两个producer发送了分类为topic1的消息,另外一个发送了topic2的消息。

Topic即主题,通过对消息指定主题可以将消息分类,消费者可以只关注自己需要的Topic中的消息

Consumer即消费者,消费者通过与kafka集群建立长连接的方式,不断地从集群中拉取消息,然后可以对这些消息进行处理。

二、安装

安装包下载地址:http://kafka.apache.org/downloads

找到0.11.0.1版本,如图:

1.下载

wget https://archive.apache.org/dist/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz

2.解压

tar -xzvf kafka_2.11-0.11.0.1.tgz

配置说明:

consumer.properites 消费者配置,这个配置文件用于配置开启的消费者,此处我们使用默认的即可。

producer.properties 生产者配置,这个配置文件用于配置开启的生产者,此处我们使用默认的即可。

server.properties kafka服务器的配置,此配置文件用来配置kafka服务器,目前仅介绍几个最基础的配置。

a.broker.id 申明当前kafka服务器在集群中的唯一ID,需配置为integer,并且集群中的每一个kafka服务器的id都应是唯一的,我们这里采用默认配置即可。

b.listeners 申明此kafka服务器需要监听的端口号,如果是在本机上跑虚拟机运行可以不用配置本项,默认会使用localhost的地址,如果是在远程服务器上运行则必须配置,

例如:listeners=PLAINTEXT:// 192.168.126.143:9092。并确保服务器的9092端口能够访问。

c.zookeeper.connect 申明kafka所连接的zookeeper的地址 ,需配置为zookeeper的地址,由于本次使用的是kafka高版本中自带zookeeper,

使用默认配置即可,zookeeper.connect=localhost:2181。

3.运行

首先运行zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

运行成功,显示如图:

然后运行kafka

bin/kafka-server-start.sh config/server.properties

运行成功,显示如图:

三、整合KafKa

1.新建Maven项目导入Maven依赖

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.test</groupId><artifactId>kafka_demo</artifactId><version>0.0.1-SNAPSHOT</version><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>1.5.9.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>1.1.1.RELEASE</version></dependency><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.2</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin><!-- 指定编译版本 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins><finalName>${project.artifactId}</finalName></build></project>

2.编写消息实体

package com.springboot.kafka.bean;import java.util.Date;import lombok.Data;@Data
public class Message {private Long id;    //idprivate String msg; //消息private Date sendTime;  //时间戳}

有了lombok,每次编写实体不必要使用快捷键生成seter或geter方法了,代码看起来更加简洁了。

3.编写消息发送者(可以理解为生产者,最好联系详细介绍中的图)

package com.springboot.kafka.producer;import java.util.Date;
import java.util.UUID;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.springboot.kafka.bean.Message;import lombok.extern.slf4j.Slf4j;@Component
@Slf4j
public class KafkaSender {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;private Gson gson = new GsonBuilder().create();//发送消息方法public void send() {Message message = new Message();message.setId(System.currentTimeMillis());message.setMsg(UUID.randomUUID().toString());message.setSendTime(new Date());log.info("+++++++++++++++++++++  message = {}", gson.toJson(message));kafkaTemplate.send("zhisheng", gson.toJson(message));}
}

4.编写消息接收者(可以理解为消费者)

package com.springboot.kafka.producer;import java.util.Date;
import java.util.UUID;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.springboot.kafka.bean.Message;import lombok.extern.slf4j.Slf4j;@Component
@Slf4j
public class KafkaSender {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;private Gson gson = new GsonBuilder().create();//发送消息方法public void send() {Message message = new Message();message.setId(System.currentTimeMillis());message.setMsg(UUID.randomUUID().toString());message.setSendTime(new Date());log.info("+++++++++++++++++++++  message = {}", gson.toJson(message));kafkaTemplate.send("zhisheng", gson.toJson(message));}
}

5.编写启动类

package com.springboot.kafka;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;import com.springboot.kafka.producer.KafkaSender;@SpringBootApplication
public class KafkaApplication {public static void main(String[] args) {ConfigurableApplicationContext context = SpringApplication.run(KafkaApplication.class, args);KafkaSender sender = context.getBean(KafkaSender.class);for (int i = 0; i < 3; i++) {//调用消息发送类中的消息发送方法
            sender.send();try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}}}
}

6.编写application.properties配置文件

#============== kafka ===================
# \u6307\u5B9Akafka \u4EE3\u7406\u5730\u5740\uFF0C\u53EF\u4EE5\u591A\u4E2A
spring.kafka.bootstrap-servers=192.168.126.143:9092#=============== provider  =======================spring.kafka.producer.retries=0
# \u6BCF\u6B21\u6279\u91CF\u53D1\u9001\u6D88\u606F\u7684\u6570\u91CF
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432# \u6307\u5B9A\u6D88\u606Fkey\u548C\u6D88\u606F\u4F53\u7684\u7F16\u89E3\u7801\u65B9\u5F0F
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer#=============== consumer  =======================
# \u6307\u5B9A\u9ED8\u8BA4\u6D88\u8D39\u8005group id
spring.kafka.consumer.group-id=test-consumer-groupspring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100# \u6307\u5B9A\u6D88\u606Fkey\u548C\u6D88\u606F\u4F53\u7684\u7F16\u89E3\u7801\u65B9\u5F0F
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

7.运行结果

示例代码地址:https://github.com/youcong1996/study_simple_demo/tree/kafka_demo

如果按照上述流程没有达到预计的效果可以git clone到本地。

转载于:https://www.cnblogs.com/youcong/p/10216573.html

相关文章:

Java 对象的生命周期

Java对象的生命周期 在Java中&#xff0c;对象的生命周期包括以下几个阶段&#xff1a; 创建阶段(Created)应用阶段(In Use)不可见阶段(Invisible)不可达阶段(Unreachable)收集阶段(Collected)终结阶段(Finalized)对象空间重分配阶段(De-allocated) 图1. JavaObject Life Cyc…

ThinkPad -- Intel 无线网卡网络连接方法限制及无法用 Fn + F5 控制的问题

[url]http://www-900.ibm.com/cn/support/tscdoc/html/JLII-6R6E5C.HTML[/url]文章编号:JLII-6R6E5CThinkPad -- Intel 无线网卡网络连接方法限制及无法用 Fn F5 控制的问题适用机型:所有ThinkPad R50; 所有ThinkPad R50e; 所有ThinkPad R50p; 所有ThinkPad R51; 所有ThinkPad…

TreeSet集合(自然排序和比较器排序)

TreeSet集合自然排序和比较器排序 ​ 当指执行插入排序、希尔排序、归并排序等算法时&#xff0c;比较两个对象“大小”的比较操作。我们很容易理解整型的 i>j 这样的比较方式&#xff0c;但当我们对多个对象进行排序时&#xff0c;如何比较两个对象的“大小”呢&#xff1…

java字节流

一 字节流 1.1字节输出流OutputStream OutputStream是一个抽象类&#xff0c;操作的数据都是字节。 输出流中定义都是写write方法&#xff0c;如下图: 1.1.1 FileOutputStream类 OutputStream有很多子类&#xff0c;其中子类FileOutputStream可用来写入数据到文件。FileOutput…

MySQL:讨人喜欢的 MySQL replace into 用法(insert into 的增强版)

讨人喜欢的 MySQL replace into 用法&#xff08;insert into 的增强版&#xff09; 在向表中插入数据的时候&#xff0c;经常遇到这样的情况&#xff1a;1. 首先判断数据是否存在&#xff1b; 2. 如果不存在&#xff0c;则插入&#xff1b;3.如果存在&#xff0c;则更新。 在 …

读农民工兄弟学C#文章后的感觉

呵呵,这些文章让原本枯燥的技术文章变得简单,通俗,易懂此类文章是编程入门新手必读的好文章,当然高手可以看一下这些文章的语言风格,等日后自己出书时,可以参考之!哈哈,哪天我也出一本属于自己的技术书^_^好可惜的是自己离"高手"还有那么一段路要走!当然喽,我不是高手…

36晋级12第五场:冯志刚决胜入围(节目视频)

视频地址&#xff1a;[url]http://win.cn.yahoo.com/070613/16/mn53.html[/url]比赛时间&#xff1a;2007年4月17日 选手&#xff1a;王阳、冯志刚、刘恩霞、王嵩 评委&#xff1a; 熊晓鸽&#xff1a;IDG创业投资基金 创始合伙人 马云&#xff1a;阿里巴巴集团董事局主席及雅虎…

顯示密碼安全性強度

<div class"from-nav"><div class"Mtitle"><span>密码:</span></div><div class"Mright"><input name"password"type"password"id"password"size"20"class&quo…

HashMap集合遍历的五种方式

文章目录创建HashMap集合&#xff0c;添加数据一、第一种foreach遍历二、第二种foreach遍历三、第一种迭代器遍历 效率高&#xff08;建议使用&#xff09;四、第二种迭代器遍历 效率低&#xff08;不建议使用&#xff09;五、JDK8新特性&#xff0c;通过lambda遍历Map创建Hash…

JAVA核心技术I---JAVA基础知识(查漏补缺private,static)

一&#xff1a;private对于类和对象&#xff08;同C&#xff09; private是只有这个类内部可以访问&#xff08;类的成员函数和定义初始化&#xff09; private是类之间的限制&#xff0c;而不是对对象的限制《重点》 同类对象是可以直接访问对象的私有成员 class priTest{priv…

『TensorFlow』命令行参数解析

argparse很强大&#xff0c;但是我们未必需要使用这么繁杂的东西&#xff0c;TensorFlow自己封装了一个简化版本的解析方式&#xff0c;实际上是对argparse的封装 脚本化调用tensorflow的标准范式&#xff1a; import pprint import tensorflow as tfflags tf.app.flags# 脚本…

IIS重起批处理

将以下代码保存到一个BAT文件中,执行后便可释放所有被锁定的组件。执行顺序是&#xff1a;停止&#xff37;&#xff37;&#xff37;服务&#xff0d;〉停止组件保护&#xff0d;〉开启组件保护&#xff0d;〉开启&#xff37;&#xff37;&#xff37;服务。net stop w3svc /…

RenderTree渲染树

RenderTree渲染树对类中的静态成员有很重要的关系&#xff0c;这个和多态是有很重要的关系&#xff0c;举个简单的例子&#xff0c;在游戏中&#xff0c;马里奥需要渲染&#xff0c;蘑菇也需要渲染&#xff0c;怪兽也需要渲染&#xff0c;其是串在一个树上的&#xff0c;但是不…

JAVA IO流复制文件夹及里面的所有文件

public static void main(String[] args) throws Exception {//复制到哪个路径&#xff08;path&#xff09;中String path "E:\\main";File file new File("F:\\main");copyAll(file, path);}public static void copy(File file, String path) throws E…

python RSA 加密与签名

PyCrypto装起来就简单多了&#xff0c;我是直接 sudo easy_install pycrypto 直接搞定的先生成rsa的公私钥&#xff1a;打开控制台&#xff0c;输入 openssl 再输入 genrsa -out private.pem 1024 来生成私钥接着输入 rsa -in private.pem -pubout -out public.pem 来生成公钥$…

深度解析VC中的消息(上)

消息是指什么&#xff1f;消息系统对于一个win32程序来说十分重要&#xff0c;它是一个程序运行的动力源泉。一个消息&#xff0c;是系统定义的一个32位的值&#xff0c;他唯一的定义了一个事件&#xff0c;向Windows发出一个通知&#xff0c;告诉应用程序某个事情发生了。例如…

漫谈C#之关键字(1)

每一种语言都有非常多的关键字&#xff0c;而且这些关键字也都大同小异&#xff0c;不过毕竟还是有些许的不一样。有些关键字大家碰到的多了&#xff0c;自然就熟悉了&#xff0c;但是有些关键字用得不大多&#xff0c;或者是新引入的&#xff0c;所以就不大熟悉了。我平常在用…

Python-CSS整理

CSS层叠样式表 1.格式 h1 {name:abc&#xff0c;color:red}&#xff1b; 选择器-属性-值 2.ID和类选择 #ID .类 3.CSS引用方式 link文件-外部样式表 script style-内部样式表 标签 style-内联样式 4.background背景 ~background ~-color 背景颜色 ~-repeat 是否重复 ~-pos…

利用 socket 获取 tcp 包并解析的问题。

服务器端代码如下&#xff1a;&#xff08;Java Servlet 实现&#xff09; protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { OutputStream out response.getOutputStream(); try { …

Mysql排序后显示排序序号

网上找的一个比较齐全的解释案例&#xff0c;拿来记录一下 ① : 与 的区别 : 赋值的意思。在set update select 中表示赋值的意思&#xff0c;用的比较少一般都用&#xff0c;但是在用变量实现行号时&#xff08;比如本文标题获取排列序号&#xff09;&#xff0c;一定要用:…

解决Visual Studio For Mac Restore失败的问题

之前就了解到微软出了mac版的VS&#xff0c;没太多的关注&#xff0c;自己也就是使用 DotNet Core SDK VS Code 做一些小demo。 前两天发布了DotNet Core 2.0 &#xff0c;Visual Studio For Mac 7.1 之后&#xff0c;感觉可以装起来用用&#xff0c;把win下面的项目转到Core上…

应用Etherchannel扩展企业服务的高可用性

什么是Etherchannel&#xff1f;Etherchannel可以说是Cisco特有的技术&#xff0c;也就是我们在交换机以及路由器上所要配置Etherchannel就不得不使用CISCO设备。在实际工程中一般用于出口与上层设备的连接。实际上通过Etherchannel的中文名-端口绑定就很容易理解&#xff0c;它…

Leetcode 391.完美矩形

完美矩形 我们有 N 个与坐标轴对齐的矩形, 其中 N > 0, 判断它们是否能精确地覆盖一个矩形区域。 每个矩形用左下角的点和右上角的点的坐标来表示。例如&#xff0c; 一个单位正方形可以表示为 [1,1,2,2]。 ( 左下角的点的坐标为 (1, 1) 以及右上角的点的坐标为 (2, 2) )。…

Meteor计时器

Meteor有提供它自己的setTimeout和setInterval方法。这些方法被用于确保所有全局变量都具有正确的值。它们就像普通 JavaScript 中的setTimeout 和 setInterval 一样工作。Timeout - 超时 Meteor.setTimeout 的例子。Meteor.setTimeout(function(){console.log("Timeout c…

WEB程序代码优化入手的几方面

这里对web程序方面的优化作一个总结.1.编码规范化可读性优化编码规范我想一般程序员不会不了解&#xff0c;如果你这方面是空白你应该好好补补基础了&#xff0c;做到编码规范是一个好的程序员的最基础要求&#xff0c;一个团队也应该有自己的编码规范。所以程序的优化也应该包…

【Elastic Stack(一)】Elastic Stack简介

如果你没有听说过Elastic Stack&#xff0c;那你一定听说过ELK。实际上ELK是三款软件的简称&#xff0c;分别是Elasticsearch、Logstash、Kibana组成&#xff0c;在发展的过程中&#xff0c;又有新成员Beats的加入&#xff0c;所以就形成了Elastic Stack。 所以说&#xff0c;…

c#创建、保存excel正常执行要点补疑

网上搜索C#实现excel操作的示例太多了&#xff0c;但不知道有多少是经过验证确实可行才发布出来的&#xff0c;也是因为开发需要&#xff0c;我找了一些代码却发现大多都不能正确执行完毕&#xff0c;于是决定补充自己在实践中遇到的要点以供参考。如下示例&#xff1a;usingMi…

动态更新 AGS Cache

作者&#xff1a;Flyingis 提升ArcGIS Server访问速度最佳的方式是Cache&#xff0c;将所有图层切片保存在服务器&#xff0c;客户端请求时直接访问cache好的图片&#xff0c;这里分为两种情况&#xff0c;一是所有图层都做cache&#xff0c;二是部分图层做cache&#xff0…

SVN状态图标不显示的两种解决办法

文章目录第一种方法第二种方法首先情况如下&#xff1a;这样看不到状态是不是就很难受 本博主最近也是第一次使用SVN做版本控制 然后就出现了这样的情况后来经过查询才知道SVN刚下载安装后 设置什么的都是默认的 需要手动设置一下就OK啦 第一种方法 我们先在桌面或者资源管理…

SPOJ ATOMS - Atoms in the Lab

题目链接&#xff1a;http://www.spoj.com/problems/ATOMS/ 题目大意&#xff1a;有N个原子&#xff0c;他们每秒分裂成K个新原子&#xff0c;新原子也能继续分裂。问如果要控制他的数量为M以内&#xff0c;应在什么时候使其停止分裂。其实时间为0. 解题思路&#xff1a;可以发…