当前位置: 首页 > 编程日记 > 正文

日志服务Flink Connector《支持Exactly Once》

摘要:
Flink log connector是阿里云日志服务推出的,用于对接Flink的工具,包含两块,分别是消费者和生产者,消费者用于从日志服务中读数据,支持exactly once语义,生产者用于将数据写到日志服务中,该Connector隐藏了日志服务的一些概念,比如Shard的分裂合并等,用户在使用时只需要专注在自己的业务逻辑即可。

阿里云日志服务是针对实时数据一站式服务,用户只需要将精力集中在分析上,过程中数据采集、对接各种存储计算、数据索引和查询等琐碎工作等都可以交给日志服务完成。

日志服务中最基础的功能是LogHub,支持数据实时采集与消费,实时消费家族除 Spark Streaming、Storm、StreamCompute(Blink外),目前新增Flink啦。

Flink Connector

Flink log connector是阿里云日志服务提供的,用于对接flink的工具,包括两部分,消费者(Consumer)和生产者(Producer)。

消费者用于从日志服务中读取数据,支持exactly once语义,支持shard负载均衡.
生产者用于将数据写入日志服务,使用connector时,需要在项目中添加maven依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>1.3.2</version>
</dependency>
<dependency><groupId>com.aliyun.openservices</groupId><artifactId>flink-log-connector</artifactId><version>0.1.3</version>
</dependency>
<dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId><version>2.5.0</version>
</dependency><dependency><groupId>com.aliyun.openservices</groupId><artifactId>aliyun-log</artifactId><version>0.6.10</version></dependency>
<dependency><groupId>com.aliyun.openservices</groupId><artifactId>log-loghub-producer</artifactId><version>0.1.8</version>
</dependency>复制代码

代码:Github

用法

  1. 请参考日志服务文档,正确创建Logstore。
  2. 如果使用子账号访问,请确认正确设置了LogStore的RAM策略。参考授权RAM子用户访问日志服务资源。

1. Log Consumer

在Connector中, 类FlinkLogConsumer提供了订阅日志服务中某一个LogStore的能力,实现了exactly once语义,在使用时,用户无需关心LogStore中shard数
量的变化,consumer会自动感知。

flink中每一个子任务负责消费LogStore中部分shard,如果LogStore中shard发生split或者merge,子任务消费的shard也会随之改变。

1.1 配置启动参数

Properties configProps = new Properties();
// 设置访问日志服务的域名
configProps.put(ConfigConstants.LOG_ENDPOINT, "cn-hangzhou.log.aliyuncs.com");
// 设置访问ak
configProps.put(ConfigConstants.LOG_ACCESSSKEYID, "");
configProps.put(ConfigConstants.LOG_ACCESSKEY, "");
// 设置日志服务的project
configProps.put(ConfigConstants.LOG_PROJECT, "ali-cn-hangzhou-sls-admin");
// 设置日志服务的LogStore
configProps.put(ConfigConstants.LOG_LOGSTORE, "sls_consumergroup_log");
// 设置消费日志服务起始位置
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
// 设置日志服务的消息反序列化方法
RawLogGroupListDeserializer deserializer = new RawLogGroupListDeserializer();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<RawLogGroupList> logTestStream = env.addSource(new FlinkLogConsumer<RawLogGroupList>(deserializer, configProps));复制代码

上面是一个简单的消费示例,我们使用java.util.Properties作为配置工具,所有Consumer的配置都可以在ConfigConstants中找到。

注意,flink stream的子任务数量和日志服务LogStore中的shard数量是独立的,如果shard数量多于子任务数量,每个子任务不重复的消费多个shard,如果少于,

那么部分子任务就会空闲,等到新的shard产生。

1.2 设置消费起始位置

Flink log consumer支持设置shard的消费起始位置,通过设置属性ConfigConstants.LOG_CONSUMER_BEGIN_POSITION,就可以定制消费从shard的头尾或者某个特定时间开始消费,具体取值如下:

  • Consts.LOG_BEGIN_CURSOR: 表示从shard的头开始消费,也就是从shard中最旧的数据开始消费。
  • Consts.LOG_END_CURSOR: 表示从shard的尾开始,也就是从shard中最新的数据开始消费。
  • UnixTimestamp: 一个整型数值的字符串,用1970-01-01到现在的秒数表示, 含义是消费shard中这个时间点之后的数据。

三种取值举例如下:

configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_BEGIN_CURSOR);
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, "1512439000");复制代码

1.3 监控:消费进度(可选)

Flink log consumer支持设置消费进度监控,所谓消费进度就是获取每一个shard实时的消费位置,这个位置使用时间戳表示,详细概念可以参考
文档消费组-查看状态,[消费组-监控报警
](help.aliyun.com/document_de…。

configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "your consumer group name”);复制代码

注意上面代码是可选的,如果设置了,consumer会首先创建consumerGroup,如果已经存在,则什么都不做,consumer中的snapshot会自动同步到日志服务的consumerGroup中,用户可以在日志服务的控制台查看consumer的消费进度。

1.4 容灾和exactly once语义支持

当打开Flink的checkpointing功能时,Flink log consumer会周期性的将每个shard的消费进度保存起来,当作业失败时,flink会恢复log consumer,并
从保存的最新的checkpoint开始消费。

写checkpoint的周期定义了当发生失败时,最多多少的数据会被回溯,也就是重新消费,使用代码如下:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 开启flink exactly once语义
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 每5s保存一次checkpoint
env.enableCheckpointing(5000);复制代码

更多Flink checkpoint的细节请参考Flink官方文档Checkpoints。

1.5 补充材料:关联 API与权限设置

Flink log consumer 会用到的阿里云日志服务接口如下:

  • GetCursorOrData

    用于从shard中拉数据, 注意频繁的调用该接口可能会导致数据超过日志服务的shard quota, 可以通过ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS和ConfigConstants.LOG_MAX_NUMBER_PER_FETCH
    控制接口调用的时间间隔和每次调用拉取的日志数量,shard的quota参考文章[shard简介](https://help.aliyun.com/document_detail/28976.html).复制代码
    configProps.put(ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS, "100");
    configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "100");复制代码
  • ListShards

     用于获取logStore中所有的shard列表,获取shard状态等.如果您的shard经常发生分裂合并,可以通过调整接口的调用周期来及时发现shard的变化。复制代码
    // 设置每30s调用一次ListShards
    configProps.put(ConfigConstants.LOG_SHARDS_DISCOVERY_INTERVAL_MILLIS, "30000");复制代码
  • CreateConsumerGroup

    该接口调用只有当设置消费进度监控时才会发生,功能是创建consumerGroup,用于同步checkpoint。复制代码
  • ConsumerGroupUpdateCheckPoint

    该接口用户将flink的snapshot同步到日志服务的consumerGroup中。
    复制代码

子用户使用Flink log consumer需要授权如下几个RAM Policy:


2. Log Producer

注意producer只支持Flink at-least-once语义,这就意味着在发生作业失败的情况下,写入日志服务中的数据有可能会重复,但是绝对不会丢失。

用法示例如下,我们将模拟产生的字符串写入日志服务:

// 将数据序列化成日志服务的数据格式
class SimpleLogSerializer implements LogSerializationSchema<String> {public RawLogGroup serialize(String element) {RawLogGroup rlg = new RawLogGroup();RawLog rl = new RawLog();rl.setTime((int)(System.currentTimeMillis() / 1000));rl.addContent("message", element);rlg.addLog(rl);return rlg;}
}
public class ProducerSample {public static String sEndpoint = "cn-hangzhou.log.aliyuncs.com";public static String sAccessKeyId = "";public static String sAccessKey = "";public static String sProject = "ali-cn-hangzhou-sls-admin";public static String sLogstore = "test-flink-producer";private static final Logger LOG = LoggerFactory.getLogger(ConsumerSample.class);public static void main(String[] args) throws Exception {final ParameterTool params = ParameterTool.fromArgs(args);final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.getConfig().setGlobalJobParameters(params);env.setParallelism(3);DataStream<String> simpleStringStream = env.addSource(new EventsGenerator());Properties configProps = new Properties();// 设置访问日志服务的域名configProps.put(ConfigConstants.LOG_ENDPOINT, sEndpoint);// 设置访问日志服务的akconfigProps.put(ConfigConstants.LOG_ACCESSSKEYID, sAccessKeyId);configProps.put(ConfigConstants.LOG_ACCESSKEY, sAccessKey);// 设置日志写入的日志服务projectconfigProps.put(ConfigConstants.LOG_PROJECT, sProject);// 设置日志写入的日志服务logStoreconfigProps.put(ConfigConstants.LOG_LOGSTORE, sLogstore);FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps);simpleStringStream.addSink(logProducer);env.execute("flink log producer");}// 模拟产生日志public static class EventsGenerator implements SourceFunction<String> {private boolean running = true;@Overridepublic void run(SourceContext<String> ctx) throws Exception {long seq = 0;while (running) {Thread.sleep(10);ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12));}}@Overridepublic void cancel() {running = false;}}
}复制代码

2.1 初始化

Producer初始化主要需要做两件事情:

  • 初始化配置参数Properties, 这一步和Consumer类似, Producer有一些定制的参数,一般情况下使用默认值即可,特殊场景可以考虑定制:

    // 用于发送数据的io线程的数量,默认是8
    ConfigConstants.LOG_SENDER_IO_THREAD_COUNT
    // 该值定义日志数据被缓存发送的时间,默认是3000
    ConfigConstants.LOG_PACKAGE_TIMEOUT_MILLIS
    // 缓存发送的包中日志的数量,默认是4096
    ConfigConstants.LOG_LOGS_COUNT_PER_PACKAGE
    // 缓存发送的包的大小,默认是3Mb
    ConfigConstants.LOG_LOGS_BYTES_PER_PACKAGE
    // 作业可以使用的内存总的大小,默认是100Mb
    ConfigConstants.LOG_MEM_POOL_BYTES复制代码
    上述参数不是必选参数,用户可以不设置,直接使用默认值。复制代码
  • 重载LogSerializationSchema,定义将数据序列化成RawLogGroup的方法。

    RawLogGroup是log的集合,每个字段的含义可以参考文档[日志数据模型](https://help.aliyun.com/document_detail/29054.html)。
    复制代码

如果用户需要使用日志服务的shardHashKey功能,指定数据写到某一个shard中,可以使用LogPartitioner产生数据的hashKey,用法例子如下:

FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps);
logProducer.setCustomPartitioner(new LogPartitioner<String>() {// 生成32位hash值public String getHashKey(String element) {try {MessageDigest md = MessageDigest.getInstance("MD5");md.update(element.getBytes());String hash = new BigInteger(1, md.digest()).toString(16);while(hash.length() < 32) hash = "0" + hash;return hash;} catch (NoSuchAlgorithmException e) {}return  "0000000000000000000000000000000000000000000000000000000000000000";}});复制代码

注意LogPartitioner是可选的,不设置情况下, 数据会随机写入某一个shard。

2.2 权限设置:RAM Policy

Producer依赖日志服务的API写数据,如下:

  • log:PostLogStoreLogs
  • log:ListShards

当RAM子用户使用Producer时,需要对上述两个API进行授权:



相关文章:

【组队学习】【27期】Java编程语言

Java编程语言 论坛版块&#xff1a; http://datawhale.club/c/team-learning/33-category/33 开源内容&#xff1a; https://github.com/datawhalechina/team-learning-program/tree/master/Java 学习目标 Java独特的面向对象的抽象类编程特点&#xff0c;广泛应用于应用…

UI培训分享:如何提升自己的UI设计能力

相信很多UI设计师在工作中经常会遇到瓶颈&#xff0c;那么如何提升自己的UI设计能力?是我们要思考的一个问题&#xff0c;下面小编就为大家分享—些建议。 UI培训分享&#xff1a;如何提升自己的UI设计能力 1、多看 国内知名的设计网站&#xff0c;比如站酷网、花瓣网、多看优…

微信小程序使用阿里巴巴iconfont字体图标

打开阿里巴巴iconfont官网(http://www.iconfont.cn/);把用到的字体图标加到项目里面; 进入到项目里面&#xff0c;选择font class方式来使用&#xff0c;如果没有生成过代码的同学点生成&#xff0c;已经有代码的直接复制代码;iconfont.pngiconfont.png4.浏览器新建页面&…

IIS6 MVC3 配置

用mvc3做了一个网站&#xff0c;重写了下URL&#xff0c;http://www.xxxx.com/news/details/54.html. 结果在iis上预览找不到页面&#xff0c;但是在vs下就没问题直接运行就没问题。 具体的原因应该是找不到映射。 所以需要在iis上添加映射。 添加MVC的解析&#xff1a; 右击II…

【组队学习】【27期】动手学数据分析

动手学数据分析 论坛版块&#xff1a; http://datawhale.club/c/team-learning/25-category/25 开源内容&#xff1a; https://github.com/datawhalechina/hands-on-data-analysis 学习目标 以项目为主线&#xff0c;通过边学&#xff0c;边做以及边被引导的方式&#xf…

参加UI培训后可以找什么工作

UI设计在近几年备受大家的关注&#xff0c;很多企业对UI设计这个岗位也显得尤为重要&#xff0c;很多人都想转型学习UI设计技术&#xff0c;大多数人选择参加UI培训机构进行系统学习&#xff0c;那么通过系统培训的同学参加UI培训后可以找什么工作呢?来看看下面的详细介绍。 参…

Datawhale组队学习周报(第021周)

本文总结了本周&#xff08;07月05日~07月11日&#xff09;Datawhale组队学习的运行情况&#xff0c;我们一直秉承“与学习者一起成长的理念”&#xff0c;希望这个活动能够让更多的学习者受益。 第 25 期组队学习一共有 3 门开源课程&#xff0c;共组建了 3 个学习群&#xf…

alias重启终端失效的问题

如果使用命令 alias xxxxxx 那么登出以后&#xff0c;别名就会失效。下次登入的时候就不能用了。 为了保持别名可以把它写入.bashrc 在.bashrc的最后写入想要的别名&#xff0c;比如 alias zpll -al 然后退出&#xff0c;重新登入的时候就可以用了。 或者直接运行命令 source ~…

使用文本用户界面(NMTUI)进行网络配置

NetworkManager 文本用户界面&#xff08;TUI&#xff09;工具 nmtui 可提供一个文本界面配置由 NetworkManager 控制的网络。该工具包含在 NetworkManager-tui 子软件包中。写入时&#xff0c;不会默认随 NetworkManager 安装该子软件包。要安装 NetworkManager-tui&#xff0…

UI培训教程分享:常用的商业插画风格有哪些?

在UI设计岗位中插画设计是作为一名合格的UI设计师都会接触到的工作内容&#xff0c;运用插画设计&#xff0c;做到图文结合&#xff0c;令用户的印象也更加深刻&#xff0c;今天小编为大家分享的UI培训教程就是关于常用的商业插画风格有哪些?希望能够给大家带来帮助。 UI培训教…

快速构建Windows 8风格应用32-构建辅助磁贴

引言 Windows Phone中&#xff0c;我们开发者可能会开发的一个功能点是将数据列表中某一项“Pin To Start(固定到开始屏幕)”&#xff0c;大家都知道这种固定到开始屏幕的磁贴叫做辅助磁贴&#xff08;也叫二级磁贴&#xff09;&#xff0c;用户可以通过该辅助磁贴启动应用程序…

【第22周复盘】可以查成绩了!

「青少年编程竞赛交流群」已成立&#xff08;适合6至18周岁的青少年&#xff09;&#xff0c;公众号后台回复【Scratch】或【Python】&#xff0c;即可进入。如果加入了之前的社群不需要重复加入。 微信后台回复“资料下载”可获取以往学习的材料&#xff08;视频、代码、文档&…

UI培训教程分享:APP启动页UI界面设计

本期为大家分享的ui培训教程是关于APP启动页面的UI设计方面&#xff0c;作为一名合格的UI设计师&#xff0c;APP产品的启动页是需要会的&#xff0c;下面就来看看详细的教程吧。 UI培训教程分享&#xff1a;APP启动页UI界面设计 启动页面在APP中还是很有必要的&#xff0c;对于…

Nginx 配置

content_by_lua ‘’&#xff1b; set_by_lua $c "return ngx.var.a ngx.var.b"; rewrite_by_lua "ngx.var.a ngx.var.a 1"; access_by_lua if ngx.var.remote_addr "127.0.0.1" thenreturnendngx.exit(403);虚拟主机 server_name .a.org; …

cd: mkdir: rmdir: rm

1、1 绝对路径和相对路径绝对路径&#xff1a;是指在目录下的绝对位置&#xff0c;直接到的目标位置例如&#xff1a;只要看到这个路径&#xff1a;D&#xff1a;\图片\徐慧茹.jpg"文件是在D盘的“图片”目录中。类似这样完整的描述文件的路径就是绝对路径。相对路径&…

【Whalepaper】CV论文研读 - Involution内卷:超越Convolution的新算子

Whalepaper是由周郴莲负责的一个每周分享论文的活动&#xff0c;带你研读AI领域的论文&#xff0c;快来一起开源学术科研吧&#xff01; NLP 论文分享&#xff1a;每周日 晚上 九点CV 论文分享&#xff1a; 每周日 晚上 九点Res 论文分享&#xff1a;每周六 晚上 九点半 欢迎…

UI培训分享:UI设计师要掌握哪些知识点

UI设计师除了要学会自己所在行业的技术之外&#xff0c;还要了解一些其他的知识点&#xff0c;这些知识点都会在今后的工作中带来帮助&#xff0c;甚至是加分项&#xff0c;那么下面来看看小编为大家介绍的UI培训分享&#xff1a;UI设计师要掌握哪些知识点? UI培训分享&#x…

JavaScript 关闭窗口事件

方式一&#xff1a;(适用与IE浏览器&#xff0c;而且刷新不提示&#xff0c;只在点击浏览器关闭按钮的时候提示) <script type"text/javascript"> window.οnbefοreunlοadonclose; function onclose() { if(event.clientX>document.body.clientWidth&…

【直播】闫强:文本分类上分利器 -- Bert微调技巧大全

文本分类上分利器 – Bert微调技巧大全 直播信息 主讲人&#xff1a;ChallengeHub成员&#xff0c;中国人民大学硕士。 直播时间&#xff1a;2021年07月25日 15:00~16:00 直播内容&#xff1a; 经典论文介绍与解读模型继续预训练实现交流&答疑 直播网址&#xff1a; …

计算机基础第二课时

文件后缀名 1.也称文件扩展名(filename extension) 2.是操作系统用来标志文件类型的一种机制 3.通常来说&#xff0c;一个扩展名是跟在主文件名后面的&#xff0c;由一个分隔符分隔。例如&#xff1a;“前端开发知识要点.txt”的文件名中&#xff0c;前端开发知识要点是主文件名…

ui培训教程分享:平面设计怎样视觉空间感?

本期UI培训教程为大家分享的是关于平面设计怎样视觉空间感?在UI设计岗位中&#xff0c;视觉空间感对于设计师来说是非常重要的&#xff0c;所谓空间&#xff0c;是指立体形态周围的空虚部分&#xff0c;空间是无限的。任何空间形态的建立都必须借助立体形态来表达&#xff0c;…

Android组件框架:Android组件管理者ActivityManager

关于作者 郭孝星&#xff0c;程序员&#xff0c;吉他手&#xff0c;主要从事Android平台基础架构方面的工作&#xff0c;欢迎交流技术方面的问题&#xff0c;可以去我的Github提issue或者发邮件至guoxiaoxingse163.com与我交流。 第一次阅览本系列文章&#xff0c;请参见导读&a…

[转]HTTP协议详解

当今web程序的开发技术真是百家争鸣&#xff0c;ASP.NET, PHP, JSP&#xff0c;Perl, AJAX 等等。 无论Web技术在未来如何发展&#xff0c;理解Web程序之间通信的基本协议相当重要, 因为它让我们理解了Web应用程序的内部工作. 本文将对HTTP协议进行详细的实例讲解&#xff0c;内…

【Whalepaper】NLP论文研读 - Keyword-Attentive Deep Semantic Matching

Whalepaper是由周郴莲负责的一个每周分享论文的活动&#xff0c;带你研读AI领域的论文&#xff0c;快来一起开源学术科研吧&#xff01; NLP 论文分享&#xff1a;每周日 晚上 九点CV 论文分享&#xff1a; 每周日 晚上 九点Res 论文分享&#xff1a;每周六 晚上 九点半 欢迎…

web前端的就业前景好不好

web前端的就业前景好不好?一直有人都想知道这个答案&#xff0c;其实放眼互联网未来&#xff0c;web前端的发展前景都是非常好的&#xff0c;那么它的就业前景自热也是不错&#xff0c;具体来看看下面的详细介绍就知道了。 web前端的就业前景好不好?近几年的热门行业里&#…

android:HTTP通信 .

HTTP&#xff1a; 超文本传送协议&#xff08;hypertext transport protocol&#xff09;&#xff0c;用于传送WWW方式的数据。属于应用层的面向对象的协议。HTTP采用了请求/响应模型。客户端向服务器发送的请求包含了&#xff1a;请求的方法、URL、协议版本、请求修饰符、客户…

【青少年编程】【三级】打气球游戏

「青少年编程竞赛交流群」已成立&#xff08;适合6至18周岁的青少年&#xff09;&#xff0c;公众号后台回复【Scratch】或【Python】&#xff0c;即可进入。如果加入了之前的社群不需要重复加入。 微信后台回复“资料下载”可获取以往学习的材料&#xff08;视频、代码、文档&…

(转)如何修改maven的默认jdk版本

背景&#xff1a;在maven的配置文件中配置编译的jdk插件&#xff0c;就不需要在eclipse中进行重新的指定了。 问题 1、创建maven项目的时候&#xff0c;jdk版本是1.5版本&#xff0c;而自己安装的是1.7或者1.8版本。 2、每次右键项目名-maven->update project 时候&#xff…

Python适合初学者或者0基础学习吗?

Python适合初学者或者0基础小白学习吗?很多人都比较关注这个问题&#xff0c;因为近几年Python在互联网行业的发展显而易见&#xff0c;它的就业几率也非常高&#xff0c;具体来看看下面的详细介绍吧。 Python适合初学者或者0基础小白学习吗?与Java、C不同的是&#xff0c;Py…

CS5中动作和批处理

动作类似office里的宏。 窗口---动作。排列过多的图片可以窗口---排列。 先新组&#xff0c;然后新动作&#xff0c;完成后停止录制&#xff1b;点击新图片使其成为当前图片&#xff0c;再点击播放动作。 一个新组下可以有很多动作。 动作定义为快捷键&#xff0c;使用时可以双…