当前位置: 首页 > 编程日记 > 正文

Kafka 集群如何实现数据同步?

哈喽大家好,我是咸鱼

最近这段时间比较忙,将近一周没更新文章,再不更新我那为数不多的粉丝量就要库库往下掉了 T﹏T

刚好最近在学 Kafka,于是决定写篇跟 Kafka 相关的文章(文中有不对的地方欢迎大家指出)

考虑到有些小伙伴可能是第一次接触 Kafka ,所以先简单介绍一下什么是 Kafka 吧!

Kafka 介绍

Kafka 是一个高吞吐的分布式消息系统,不但像传统消息队列(RaabitMQ、RocketMQ等)那样能够【异步处理、流量消峰、服务解耦】

还能够把消息持久化到磁盘上,用于批量消费。除此之外由于 Kafka 被设计成分布式系统,吞吐量和可用性大大提高

Kafka 角色

  • kafka 客户端
    • 生产者(producer):也叫发布者,负责创建消息
    • 消费者(consumer):也叫订阅者,负责消费(读取)消息
  • Kafka 服务端(broker)
    • leader:对外提供读写服务
    • follower:不提供服务,负责向 leader 同步数据

Topic(主题)和 partition(分区)

topic 就是消息发布的地方,消费者通过订阅 topic 来消费到对应的消息

为了提高吞吐量,实现 topic 的负载均衡,Kafka 在 topic 下又引用了分区(partition)的概念,每个 topic 可以被划分成多个分区

分区允许消息在 Topic 下水平分割和存储,每个分区都是一个有序且不可变的消息队列,消费者可以以并行的方式消费同一个 topic 中的消息

PS:topic 是逻辑上的概念,消息真正是存储到 partition 中去的

例如某个 topic 下被划分成 n 个分区,那么这个 topic 的并发度就提高 n,同时可以支持 n 个 consumer 并行消费该 topic 中的消息

log(日志)

对于每一个 topic ,Kafka 都会维护一个分区日志

每个分区都是一个有序的、不可变的消息队列,且可以持续地添加消息。消息在分区中分配了唯一的序列号,被称为偏移量(Offset)

offset 用来唯一的标识分区中每一条记录

Kafka 会保留所有分区中的消息,不会自动删除消息。消息的保留策略由 Kafka 配置参数控制,消息可以在一定时间或达到一定大小后过期,过期的消息会被删除

消费者在 Kafka 中只保留自己的 Offset,用于标识它在分区中的位置。通常情况下,当 消费者消费消息时,它的 Offset 会线性增加,表示它已经消费了这些消息

消费者可以选择将 Offset 重置为更旧的值,从而重新开始读取消息

每个消费者实例唯一负责一个分区,Kafka 只保证分区内的记录是有序的,而不保证主题中不同分区的顺序

Kafka 集群

Kafka 是分布式架构,有集群(cluster)的概念

Kafka 中的一个实例被称为 broker,它接收生产者的消息并存入磁盘,消费者连接 broker 消费消息

多个 broker 组成一个 Kafka cluster,集群内某个 broker 会成为集群控制器(cluster controller),负责管理整个 Kafka 集群,包括分配分区给 broker,监控 broker 等

分区被复制成了多个副本(replica)然后均分在不同的 broker 上 ,其中一个副本 Leader,其他的是 Follower

创建副本的单位是 topic 的 分区

正常情况下,每个分区都有一个 leader 和零或多个 followers 。这样即使某个 broker 发生故障,其他 broker上的副本仍然可以继续提供服务

那如何将所有的副本均匀分布在不同 broker 上呢?

分配副本的算法如下:

  • 将所有 broker(假设共 n 个 broker)和待分配的分区排序
  • 将第 i 个分区分配到第(i mod n)个 broker上
  • 将第 i 个分区的第 j 个副本分配到第((i + j) mode n)个 broker 上

如何实现数据同步?

我们先来看下 Kafka 中的 ISR(In-Sync Replicas) 机制

既然每个 leader 下面都有至少一个 follower,于是便有了 ISR,ISR 就是 Kafka 动态维护的一组同步副本集合

ISR 中所有的 follower 都与 leader 保持同步状态,而且 leader 也在 ISR 列表中,只有在自己 ISR 列表中的副本才能参与 leader 竞选

当生产者写入数据时,leader 更新数据,follower 是怎么知道 leader 更新然后去同步数据的呢?

follower 会通过定期向 leader 发送 fetch 请求来实现数据同步,这个是由 fetcher 线程来负责的

当一个副本被选举成为 follower 后,会启动副本的 fetcher 线程,随后 Follower 会定期向 Leader 发送心跳请求,以保持连接,并发送 fetch 请求来获取最新的数据

如果 follower 发现自己的 LEO(Log End Offset,日志结束偏移量)与 Leader 的 LEO 有差距时,会触发同步数据请求,以便将自身日志同步至 Leader 的对应位置,确保与 Leader 的数据保持一致

如果一个 follower 在指定时间内(配置字段为 replica.lag.time.max.ms)没有发送 fecth 请求或者没有追上 leader 的 LEO,就会从 ISR 中移除

最后总结一下:

  • Kafka 中的 topic 是逻辑概念,每个 topic 可以被划分为多个分区,而分区才是存储消息的实体
  • 每一个分区会被复制成多个副本,然后选取其中一个副本当作 leader,剩下的则是 follower
  • follower 会定期去向 leader 发送 fetch 请求来保证数据的同步
  • leader 不会关心 follower 的数据是不是同步好了的,只要你在指定时间内没有找我来 fetch ,我就把你从 ISR 中剔除出来

相关文章:

使用 Hexo 搭建个人博客并部署到云服务器

目录1 整体流程2. 本地环境准备2.1 安装 Node.js 和 Git2.2 安装 Hexo3. 服务端环境准备3.1 Nginx 环境配置3.1.1 安装 Nginx3.1.2 更改 Nginx 配置文件3.2 Node.js 环境配置3.3 Git 环境配置3.3.1 安装 Git3.3.2

过滤器模式 rust和java的实现

文章浏览阅读301次。我们将创建一个 Person 对象、Criteria 接口和实现了该接口的实体类,来过滤 Person 对象的列表。我们制作一个Person实体类,Criteria为标准条件,CriteriaMale等为实现的具体判断器,是需要为person类使用meetCriteria方法便可以进行不同条件的判断。我们制作一个Person实体类,Criteria为标准条件,CriteriaMale等为实现的具体判断器,是需要为person类使用meetCriteria方法便可以进行不同条件的判断。

python最流行的适合计算积分和微分方程的库

SciPy有一个子模块scipy.integrate,包含多种数值积分方法,如牛顿哥特法(quad)、梯形法(trapz)、辛普森法(simps)等

c语言如何生成随机数以及设置随机数的范围

这篇文章介绍c语言如何生成随机数以及设置随机数的范围。本文主要介绍了rand函数、srand函数、以及time函数和时间戳的概念和如何控制随机数的范围。下一篇文章将介绍利用随机数和循环来写一个猜数字游戏。

Java虚拟机的垃圾回收机制

在Java语言中不再被任何引用所指向的对象被称为垃圾,这也是非常容易理解的,因为我们在对对象进行创建时,就需要给出一个对象引用变量用于指向我们在堆内存中创建的对象,所以如果一个对象没有被任何引用变量所指向的话,那么我们也就获取不到对象,也就没有办法对对象进行操作,这个对象自然就成了垃圾。引用计数算法判断对象是否为垃圾时比较方便快捷只是根据计数器中的值进行判断,但是会在每个对象的对象头中添加属性占用额外的空间,并且引用变量每一次指向对象都需要对对象的属性进行更新操作,占用大量的时间。为什么需要回收垃圾?

require()、import、import()有哪些区别?

require()、import、import()是我们常用的引入模块的三种方式,代码中几乎处处用到。如果对它们存在模糊,就会在工作过程中不断产生困惑,更无法做到对它们的使用挥洒自如。今天我们来一起捋一下,它们之间有哪些区别? 一、前世今生 学一个东西,先弄清楚它为什么会出现、它的发展历史、它是做什

如何保护电动汽车充电站免受网络攻击

文章浏览阅读228次。从电动汽车、传感器、充电站和支持基础设施的设计阶段开始就强调安全性作为首要任务。

Vite4+Typescript+Vue3+Pinia 从零搭建(3) - vite配置

项目代码同步至码云 weiz-vue3-template 关于vite的详细配置可查看 vite官方文档,本文简单介绍vite的常用配置。 初始内容 项目初建后,vite.config.ts 的默认内容如下: import { defineConfig } from 'vite' i

原型模式 rust和java的实现

文章浏览阅读218次。意图:用原型实例指定创建对象的种类,并且通过拷贝这些原型创建新的对象。主要解决:在运行期建立和删除原型。何时使用: 1、当一个系统应该独立于它的产品创建,构成和表示时。2、当要实例化的类是在运行时刻指定时,例如,通过动态装载。3、为了避免创建一个与产品类层次平行的工厂类层次时。4、当一个类的实例只能有几个不同状态组合中的一种时。建立相应数目的原型并克隆它们可能比每次用合适的状态手工实例化该类更方便一些。如何解决:利用已有的一个原型对象,快速地生成和原型对象一样的实例。关键代码。

c语言const修饰变量与assert断言详解

const修饰变量与assert断言详解,const修饰变量。作用:const用于修饰变量使其不能再被修改。

Linux如何修改主机名(hostname)(亲测可用)

文章浏览阅读1k次。要想在虚拟机的 Linux 系统内部改变主机名(hostname),需要通过系统的配置来修改。文件,将其中引用旧主机名的条目更新为新主机名。文件,并将里面的内容替换为新主机名。但是大多数情况可能无需更改,除非在。文件里做了什么硬编码骚操作🤣。替换为想要设置的新主机名。或者使用文本编辑器手动编辑。需要重新设置主机名。

Linux下内网穿透实现云原生观测分析工具的远程访问

夜莺监控是一款开源云原生观测分析工具,采用 All-in-One 的设计理念,集数据采集、可视化、监控告警、数据分析于一体,与云原生生态紧密集成,提供开箱即用的企业级监控分析和告警能力。夜莺于 2020 年 3 月 20 日,在 github 上发布 v1 版本,已累计迭代 100 多个版本。本地部署后,为解决无法远程访问的难题,今天我们介绍如何实现让本地nightingale 结合cpolar 内网穿透工具实现 远程也可以访问,提高运维效率.

Java Lambda 表达式笔记

文章浏览阅读71次。Lambda 表达式,也可称为闭包.Lambda 允许把函数作为一个方法的参数(函数作为参数传递进方法中)。

JavaScript中如何终止forEach循环?

在JavaScript中,forEach方法是用于遍历数组的,通常没有直接终止循环的机制。然而,我们可以使用一些技巧来模拟终止forEach循环。以下是几种常见的方法

Docker本地部署Drupal并实现公网访问

文章浏览阅读498次,点赞35次,收藏33次。Dupal是一个强大的CMS,适用于各种不同的网站项目,从小型个人博客到大型企业级门户网站。它的学习曲线可能相对较陡,但一旦熟悉了它的工作方式,用户就能够充分利用其功能和灵活性。在本文中,我们将介绍如何使用Docker快速部署Drupal,并且结合cpolar内网穿透工具实现公网远程访问首先,您需要在您的机器上安装Docker,并且启动,可以按照Docker官方文档中的说明进行安装。

如何使用 RestTemplate 进行 Spring Boot 微服务通信示例

概述 下面我们将学习如何创建多个 Spring boot 微服务以及如何使用 RestTemplate 类在多个微服务之间进行同步通信。 微服务通信有两种风格: 同步通讯 异步通信 同步通讯 在同步通信的情况下,客户端发送请求并等待服务的响应。这里重要的一点是协议(HTTP/HTTPS)是同步的,客

win10 通过wmic命令行设置系统环境变量

而通过编程修改系统环境变量,需要调用注册表API或调用wmi API接口,都有些过于麻烦。此时,如果通过system函数,直接调用批处理文件,则只需要一行代码。批处理中,分别给出了创建环境变量,修改环境变量,删除环境变量的demo。可以根据需要调整批处理文件。在系统维护或编写程序过程中,经常需要对系统环境变量进行设置、修改、删除炒作。注:修改系统环境变量,需要有管理员权限。

c#操作mongodb数据库工具类

新建c#项目,在nuget中引入MongoDB.Driver驱动,然后新建一个MongoDBToolHelper类,将下面代码复制进去 using MongoDB.Bson; using MongoDB.Bson.Serialization; using MongoDB.Driver; using

Redis查看集群状态有节点显示fail,连接失败

现有6台redis,为集群,3主3从,由于两台服务器故障重装系统之后进行重新安装2台redis,安装完成之后。查询当前redis状态,发现有存留的节点,但连接为fai。将新建的两台redis启动,保证配置文件一致。查询fail节点的node_id。将新建的两台redis设为从节点。登录新建的两台redis服务器。先通过ID删掉无用的节点。

SpringBoot整合Kafka (二)

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景,比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。

SpringBoot整合Kafka (一)

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景,比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。

深入详解高性能消息队列中间件 RabbitMQ

在进行系统设计的时候,各个模块、服务器之间为了实现数据的交互,通常是建立连接通过发送消息来进行。如果将他们一一建立连接,就会出现链路太多,每一条链路都必须感知对端等问题。此场景下消息将非常混乱,后期维护也将非常痛苦。

ModuleNotFoundError: No module named ‘config‘

当你在Python代码中看到类似"ModuleNotFoundError: No module named 'config'"的错误消息时,意味着你尝试导入一个名为'config'的模块,但Python无法找到该模块

【云原生基础】了解云原生,什么是云原生?

云原生(Cloud-Native)是一种软件开发和部署方法论,旨在利用云计算、容器化、微服务架构和持续交付等现代技术和最佳实践,以构建、部署和运行可伸缩、高可用、弹性和易于管理的应用程序。云原生应用程序旨在充分发挥云计算的潜力,以更好地满足快速发展的数字业务需求。容器化:云原生应用程序通常使用容器技术(如Docker)进行封装,使应用程序及其依赖项可以在不同环境中一致运行。容器化提供了隔离、可重复部署和快速部署的好处。

【微服务】mysql + elasticsearch数据双写设计与实现

在很多电商网站中,对商品的搜索要求很高,主要体现在页面快速响应搜索结果。这就对服务端接口响应速度提出了很高的要求。

Python异常处理:try、except、else 和 finally 的使用指南

Python异常处理:try、except、else 和 finally 的使用指南

HarmonyOS 数据管理与应用数据持久化(二)

文章浏览阅读31次。关系型数据库基于 SQLite 组件,适用于存储包含复杂关系数据的场景,比如一个班级的学生信息,需要包括姓名、学号、各科成绩等,又或者公司的雇员信息,需要包括姓名、工号、职位等,由于数据之间有较强的对应关系,复杂程度比键值型数据更高,此时需要使用关系型数据库来持久化保存数据。

HarmonyOS数据管理与应用数据持久化(一)

文章浏览阅读599次。一.

Ubuntu系统下怎么安装Docker(linux安装docker教程)

更新系统软件包 在安装 Docker 前,首先需要更新系统软件包,确保系统上的软件都是最新的版本

python实战讲解之使用Python批量发送个性化邮件

通过上述Python脚本,我们可以批量发送个性化的邮件。我们首先设置发件人邮箱和密码,然后指定SMTP服务器和端口号。接下来,我们读取包含员工信息的Excel文件,并获取唯一的员工姓名列表和对应的邮箱地址。然后,我们遍历员工数据,并为每个员工创建邮件,附带相应的附件。最后,我们通过SMTP服务器发送邮件,并在发送完成后删除生成的员工数据文件。