日志文件和mysql同步到kafka_logstash_output_kafka:Mysql同步Kafka深入详解
0、题记
实际业务场景中,会遇到基础数据存在Mysql中,实时写入数据量比较大的情景。迁移至kafka是一种比较好的业务选型方案。
而mysql写入kafka的选型方案有:
方案一:logstash_output_kafka 插件。
方案二:kafka_connector。
方案三:debezium 插件。
方案四:flume。
方案五:其他类似方案。
其中:debezium和flume是基于mysql binlog实现的。
如果需要同步历史全量数据+实时更新数据,建议使用logstash。
1、logstash同步原理
常用的logstash的插件是:logstash_input_jdbc实现关系型数据库到Elasticsearch等的同步。
实际上,核心logstash的同步原理的掌握,有助于大家理解类似的各种库之间的同步。
logstash核心原理:输入生成事件,过滤器修改它们,输出将它们发送到其他地方。
logstash核心三部分组成:input、filter、output。
input { }
filter { }
output { }
1.1 input输入
包含但远不限于:
jdbc:关系型数据库:mysql、oracle等。
file:从文件系统上的文件读取。
syslog:在已知端口514上侦听syslog消息。
redis:redis消息。beats:处理 Beats发送的事件。
kafka:kafka实时数据流。
1.2 filter过滤器
过滤器是Logstash管道中的中间处理设备。您可以将过滤器与条件组合,以便在事件满足特定条件时对其执行操作。
可以把它比作数据处理的ETL环节。
一些有用的过滤包括:
grok:解析并构造任意文本。Grok是目前Logstash中将非结构化日志数据解析为结构化和可查询内容的最佳方式。有了内置于Logstash的120种模式,您很可能会找到满足您需求的模式!
mutate:对事件字段执行常规转换。您可以重命名,删除,替换和修改事件中的字段。
drop:完全删除事件,例如调试事件。
clone:制作事件的副本,可能添加或删除字段。
geoip:添加有关IP地址的地理位置的信息。
1.3 output输出
输出是Logstash管道的最后阶段。一些常用的输出包括:
elasticsearch:将事件数据发送到Elasticsearch。
file:将事件数据写入磁盘上的文件。
kafka:将事件写入Kafka。
详细的filter demo参考:http://t.cn/EaAt4zP
2、同步Mysql到kafka配置参考
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://192.168.1.12:3306/news_base"
jdbc_user => "root"
jdbc_password => "xxxxxxx"
jdbc_driver_library => "/home/logstash-6.4.0/lib/mysql-connector-java-5.1.47.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
#schedule => "* * * * *"
statement => "SELECT * from news_info WHERE id > :sql_last_value order by id"
use_column_value => true
tracking_column => "id"
tracking_column_type => "numeric"
record_last_run => true
last_run_metadata_path => "/home/logstash-6.4.0/sync_data/news_last_run"
}
}
filter {
ruby{
code => "event.set('gather_time_unix',event.get('gather_time').to_i*1000)"
}
ruby{
code => "event.set('publish_time_unix',event.get('publish_time').to_i*1000)"
}
mutate {
remove_field => [ "@version" ]
remove_field => [ "@timestamp" ]
remove_field => [ "gather_time" ]
remove_field => [ "publish_time" ]
}
}
output {
kafka {
bootstrap_servers => "192.168.1.13:9092"
codec => json_lines
topic_id => "mytopic"
}
file {
codec => json_lines
path => "/tmp/output_a.log"
}
}
以上内容不复杂,不做细讲。
注意:
Mysql借助logstash同步后,日期类型格式:“2019-04-20 13:55:53”已经被识别为日期格式。
code =>
"event.set('gather_time_unix',event.get('gather_time').to_i*1000)",
是将Mysql中的时间格式转化为时间戳格式。
3、坑总结
3.1 坑1字段大小写问题
from星友:使用logstash同步mysql数据的,因为在jdbc.conf里面没有添加 lowercase_column_names
=> "false" 这个属性,所以logstash默认把查询结果的列明改为了小写,同步进了es,所以就导致es里面看到的字段名称全是小写。
最后总结:es是支持大写字段名称的,问题出在logstash没用好,需要在同步配置中加上 lowercase_column_names => "false" 。记录下来希望可以帮到更多人。
3.2 同步到ES中的数据会不会重复?
想将关系数据库的数据同步至ES中,如果在集群的多台服务器上同时启动logstash。
解读:实际项目中就是没用随机id 使用指定id作为es的_id ,指定id可以是url的md5.这样相同数据就会走更新覆盖以前数据
3.3 相同配置logstash,升级6.3之后不能同步数据。
解读:高版本基于时间增量有优化。
tracking_column_type => "timestamp"应该是需要指定标识为时间类型,默认为数字类型numeric
3.4 ETL字段统一在哪处理?
解读:可以logstash同步mysql的时候sql查询阶段处理,如:select a_value as avalue***。
或者filter阶段处理,mutate rename处理。
mutate {
rename => ["shortHostname", "hostname" ]
}
或者kafka阶段借助kafka stream处理。
4、小结
相关配置和同步都不复杂,复杂点往往在于filter阶段的解析还有logstash性能问题。
需要结合实际业务场景做深入的研究和性能分析。
有问题,欢迎留言讨论。
推荐阅读:
1、实战 | canal 实现Mysql到Elasticsearch实时增量同步
2、干货 | Debezium实现Mysql到Elasticsearch高效实时同步
3、一张图理清楚关系型数据库与Elasticsearch同步 http://t.cn/EaAceD3
4、新的实现:http://t.cn/EaAt60O
5、mysql2mysql: http://t.cn/EaAtK7r
6、推荐开源实现:http://t.cn/EaAtjqN
加入星球,更短时间更快习得更多干货!
相关文章:

TPYBoard自制微信远程智能温湿度计
2019独角兽企业重金招聘Python工程师标准>>> 智能时代一夜间什么都能远程了。创业者想着如何做智能产品,如何做远程控制。DIY爱好者也想着如何自制各种奇妙的工具。这里和大家一起学习制作一款廉价的智能温湿度计。说它廉价是因为共计花费不过40元&#…
Matlab数据的可视化 -- 简易线性函数图
本篇微信图文主要介绍Matlab数据可视化方面的内容。

浏览器tab关闭事件_Python--使用Pyqt5实现简易浏览器(最新版本测试过)
Python--使用Pyqt5实现简易浏览器(最新版本测试过)博客说明文章所涉及的资料来自互联网整理和个人总结,意在于个人学习和经验汇总,如有什么地方侵权,请联系本人删除,谢谢!准备环境首先我们需要的是我们的开发环境&…

JS 中 URL 编码的问题.
URL 编码 为什么要对 URL 编码 1. 避免解析错误 我们的 queryString 的形式是使用 ?开始, keyvalue 传递参数, key-value pairs 之间使用 & 连接.比如: ?postid5038412&t1450591802326服务器会 根据 & 解析 key-value pairs 根据 解析 key,value 那么如果 key或…

通用权限管理模块系列——需求分析——列举需求
以RBAC为理论基础 以低耦合为设计原则 我们计划设计一个独立的权限模块 碰到一些难题,后面另外开文章来讨论 本文主要是列举目前收集到的权限方面的需求信息,如果各位有能提供的用例,请不吝赐教。 各种需求都是从网络或零碎项目里提取的&…
Matlab数据的可视化 -- 散点图
本篇微信图文主要介绍Matlab数据可视化方面的内容。scatter

koa连接mysql_CentOS 环境 Node + Koa2 连接 MySQL (ECS系列三)
Koa 搭建服务新建一个文件夹 koa_server,进入后cnpm i -S koa会生成 node_modules 依赖包,以及 package.json创建文件 server.js,并写入const Koa require(koa);const app new Koa();app.use( async ( ctx ) > {ctx.body hello koa2})a…

Python CRC32 文件校验
binascii.crc32(s [,crc])返回CRC32校验。参数crc指定初始值用于循环。例如:Code>>> import binascii>>> crc binascii.crc32(spam)>>> binascii.crc32( and eggs, crc)739139840>>> binascii.crc32(spam and eggs)739139840C…

国家智慧城市战略实施 保温材料等建材万亿市场待挖掘
近年来,国家多部委联合发布《国家新型城镇化规划(2014—2020)》《关于促进智慧城市健康发展的指导意见》,提出到2020年建成一批特色鲜明的智慧城市。国家智慧城市建设与发展上升为国家战略。2014年中国智慧城市的市场规模达到800多亿元,其中不…

Matlab数据的可视化 -- 极坐标图及其与直角坐标图的转换
本篇微信图文主要介绍Matlab数据可视化方面的内容。polar、pol2cart、cart2pol

mysql 主从复制 性能_zabbix监控mysql各项性能,主从复制
######################################################监控mysql(默认监控模板不能用,再agentd.conf开启自定义key,自己编写脚本)在zabbix_agentd.conf尾部添加(注意相关文件路径,以及脚本权限等问题)vim zabbix_agentd.confUserParameter…

SQL Server基础知识之:设计和实现视图
设计和实现视图可谓是数据库物理设计中的一个非常重要的步骤。从一般意义上说,设计和实现视图应该遵循下面的一些建议和原则。 以下内容摘在文档,我对某些重点进行了补充说明(红色部分) 只能在当前数据库中创建视图。 但是&#x…
Matlab数据的可视化 -- 条形图
本篇微信图文主要介绍Matlab数据可视化方面的内容。bar、barh

全球SDN测试认证中心发布OpenDaylight测试报告
随着软件定义网络(Software Defined Network, SDN)商业部署速度地加快,关乎整个SDN 网络性能表现的控制平面核心组件——SDN 控制器也越来越成为网络用户关心的焦点。日前,天地互连-全球SDN测试认证中心(SDNCTC,www.sdnctc.com)正…

mysql可以做日期处理吗_mysql可以做日期处理吗
最近一个项目,需要写很多的sql语句。涉及到很多与时间相关的处理,现在做一下简单的总结。1.时间格式:在这个项目中,或许是由不同的人建立的数据库表结构吧,对时间的格式步统一,有的表中存储的long类型的时间…

大型网站采用什么系统架构保证性能稳定性
from http://www.bobd.cn/design/web/Theory/200904/31145.html 千万级的注册用户,千万级的帖子,nTB级的附件,还有巨大的日访问量,大型网站采用什么系统架构保证性能和稳定性? 首先讨论一下大型网站需要注意和考虑的问…
Matlab数据的可视化 -- 饼图
本篇微信图文主要介绍Matlab数据可视化方面的内容。pie

Rushcrm:企业部署CRM系统做什么
现在很流行的一个词就是大数据,很多企业开始重视数据的开发利用。CRM在这样的环境背景下诞生了,但这并不意味着所有的企业都能很好的利用好CRM系统(客户关系管理系统)。CRM带来的效益是显而易见的,但是购置了CRM系统仅仅是第一步,…

slot没有毁灭的问题_解析flink之perjob模式下yn参数不生效问题
概要:0. 问题背景1. Stream Job的切分2. 计算资源的调度 & 任务的执行3. 最后的总结0. 问题背景:开始用flink处理流式作业的时候,用yarn-cluster模式提交作业的时候,脚本如下:$FLINK_BIN run -m yarn-cluster -yqu…

获取access中表的相关信息
就用到两条.net自带的获取数据库信息的语句 OleDbConnection con new OleDbConnection(connection); // OleDB数据库连接实例 // 获取数据库中表的相关信息DataTable dt con.GetOleDbSchemaTable(OleDbSchemaGuid.Tables, new Object[] { null, null, null, "Table"…
Matlab数据的可视化 -- 茎干图
本篇微信图文主要介绍Matlab数据可视化方面的内容。stem

Wink发布Wink Hub2家庭物联网控制中心
智能家居主要厂商之一Wink宣布推出Wink Hub2家庭物联网控制中心,支持更多的家庭物联网产品,更广泛的互联互通,以及提供更好用的控制程序。Wink Hub2主要作用是连接第三方家庭物联网产品,并从一个集中的应用程序进行控制。 Wink Hu…

mysql repalication_MySQL主从复制的原理及配置方法(比较详细)
一、复制的原理MySQL 复制基于主服务器在二进制日志中跟踪所有对数据库的更改(更新、删除等等)。每个从服务器从主服务器接收主服务器已经记录到其二进制日志的保存的更新,以便从服务器可以对其数据拷贝执行相同的更新。将主服务器的数据拷贝到从服务器的一个途径是…

贸易保护主义不能解决德国光伏企业的问题
针对德国媒体近期将该国光伏巨头SolarWorld宣布破产一事与中国相关联,甚至指责“中国盗走德国工作”的论调,中国驻德国大使馆经济商务参赞处公使衔参赞王卫东18日向记者表示,SolarWorld作为当初推动欧盟发起对华光伏产品“双反”措施的领头公…
Matlab数据的可视化 -- 平面多边形的着色
本篇微信图文主要介绍Matlab数据可视化方面的内容。fill

ASP.net session 使用总结(2)
Session又称为会话状态,是Web系统中最常用的状态,用于维护和当前浏览器实例相关的一些信息。举个例子来说,我们可以把已登录用户的用户名放在Session中,这样就能通过判断Session中的某个Key来判断用户是否登录,如果登录…

python re模块_Python re模块
正则表达式元字符说明. 匹配除换行符以外的任意字符^ 匹配字符串的开始$ 匹配字符串的结束[] 用来匹配一个指定的字符类别? 对于前一个字符字符重复0次到1次* 对于前一个字符重复0次到无穷次{} 对于前一个字符重复m次{m,n} 对前一个字符重复为m到n次\d 匹配数字,相…

国外优秀开源PHP建站程序一览
大量的PHP开源(开放源代码/Open Source)应用改变了这个世界,改变了互联网,以下我们总结从数据库到购物、博客等众多类型的开源PHP软件,供网站开发者们参考。 博客:WordPress WordPress是使用PHP开发的著名博…

《数据科学R语言实践:面向计算推理与问题求解的案例研究法》一一2.1 引言...
本节书摘来自华章计算机《数据科学R语言实践:面向计算推理与问题求解的案例研究法》一书中的第2章,第2.1节,作者:[美] 德博拉诺兰(Deborah Nolan) 邓肯坦普朗(Duncan Temple Lang) 更多章…
Matlab数据的可视化 -- 三维特殊图形
本篇微信图文主要介绍Matlab数据可视化方面的内容。plot3,bar3,barh3,scatter3