当前位置: 首页 > 编程日记 > 正文

Drill storage plugin实现原理分析

Drill Storage Plugin介绍

Drill是一个交互式SQL查询引擎,官方默认支持的数据源有hive、hbase、kafka、kudu、mongo、opentsdb、jdbc等,其中jdbc storage plugin可以覆盖所有支持jdbc协议的数据源,如:mysql、oracle等关系型数据库。所有数据源的接入都是通过drill的storage plugin实现的,理论上Drill通过storage plugin机制可以支持对任何数据源进行异构查询。

Drill作为一个SQL查询引擎,它跟传统数据库有着很多相似之处,主要包括SQL Parser、SQL Validator、Query Optimizer、Data Flow Operators等几部分。如下图所示,SQL Parser阶段会把SQL语句解析为SQL查询语法树,这个阶段Storage Plugin没有介入。 从SQL Valiator阶段Storage Plugin开始介入,在这个阶段会通过Storage Plugin获取Schema信息对SQL进行校验,如判断表、字段是否存在等。Query Optimzer阶段会把SqlNode转换为PhysicalPlan,在这个过程中会通过Storage Plugin获取Planner Rule对SQL进行优化。Data Flow Operators阶段是对目标数据源进行数据读取,这部分操作是通过Storage Plugin的RecordReader实现的。

clipboard.png
图 1

Drill Storage Plugin加载机制

DrillBit为drill的主类,drill启动时会自动加载所有有效的Storage Plugin,加载时序图如图2所示。

clipboard.png
图 2

Plugin的注册主要是在类StoragePluginRegistryImpl中完成,插件注册主要分为以下几步。

第一步是加载classpath下所有drill-module.conf文件,这个文件配置了需要扫描的包路径,在这个包路径下接口StoragePlugin所有实现类都会被加载;第二步是校验,首先校验的是接口StoragePlugin的实现类的构造方法是否符合标准要求,构造方法参数必须为3个,且三个参数的类型必须分别为StoragePluginConfig,DrillbitContext、String。其次是校验plugin的配置是否有效,加载plugin配置,如果是首次启动,会读取classpath下bootstrap-storage-plugins.json文件,每个plugin都会对应一个这样的json文件。这个json文件最终会反序列为StoragePluginConfig实现类对象。非首次启动bootstrap-storage-plugins.json文件不会被加载。drill会以本地/tmp/drill/sys.storage_plugins目录下配置文件为准,集群模式配置信息保存在zookeeper /drill/sys.storage_plugins目录下。第三步是通过发射的方式进行插件实例化并注册。整个Plugin的注册流程如图 3所示

clipboard.png
图 3

Drill查询流程分析

在正式介绍Drill Storage Plugin开发实战之前我们先了解下Drill的查询流程,这样有助于对Storage Plugin进行深入理解,而不是简单的依葫芦画瓢。Drill是分布式的,并且节点之间是对等的,所有drill节点都可以对外提供服务,当节点接收到sql查询请求之后,在UserWorker中会拉起一个Foreman线程来单独处理这个请求,Foreman会完成sql到物理执行计划的转换,并根据物理执行计划切分成可并行执行的Fragment,Foreman根据一定的算法把Fragment分发到本机或者其他drill节点进行执行,执行完之后会在接收初始请求的Drill节点中进行结果合并,然后返回给客户端。如图4 所示。

clipboard.png
图 4

一条SQL到物理执行计划,会经过SqlNode(sql节点解析树)、RelNode(关系表达式)、DrillRel(drill关系表达式)、Prel(物理关系表达式)、PhysicalPlan(物理执行计划)几个步骤的转换。如图 5所示

clipboard.png
图 5

SqlNode、RelNode、DrillRel、Prel都是树形结构,以一条简单查询druid数据源的SQL为例,SQL->SqlNode->RelNode这两个阶段只会存在节点之间的转换,不会考虑目标数据源之间的差异进行SQL优化和改写之类的动作。RelNode->DrilRel节点会进行逻辑执行计划的优化,示例中对filter进行了下推操作。DrillRel->Prel节点会进行物理执行计划的优化。 各阶段文本化之后如下所示。

原始SQL

clipboard.png

RelNode(关系表达式节点树),有3个节点分别为LogicalProject、LogicalFilter、EnumerableTableScan

clipboard.png

DrillRel(Drill关系表达式节点树),转换为drill中关系表达式节点

clipboard.png

Prel(物理关系表达式节点树),这一步应用了物理优化规则,把filter下推到scan里面了

clipboard.png

Drill Storage Plugin开发实战

经过前面的介绍,大家对Drill Storage Plugin作用与原理应该已经有一个比较全面的了解。接下来以hbase为例详细介绍drill storage plugin开发流程。 Hbase是一个分布式列存数据库,默认是不支持SQL查询的。为了实现在Drill中用SQL对Hbase进行异构查询,需要实现一个Hbase的storage plugin。 下面以Hbase storage plugin为例介绍storage plugin的开发流程。

1、在目录contrib新建mvn模块,如: stroage-hbase

2、在新建的模块resource目录新建两个文件drill-module.conf和bootstrap-storage-plugins.json。drill-module.conf定义plugin所在的包路径,在plugin加载的时候会用到。bootstrap-storage-plugins.json文件是一些必要连接Hbase的配置。Drill首次启动时会用这个文件作为Plugin的初始配置。

clipboard.png

3、修改UserBitShared.proto文件,在CoreOpertorType对象里面新增一行HBASE_SUB_SCAN = 33,33这个数字需根据自身实际情况进行递增。修改proto文件之后需要重新编译,具体参考protocol模块下的readme.txt

4、修改distribution模块下的bin.xml, 新增org.apache.drill.contrib:drill-hbase-storage

5、代码实现,部分代码剖析如下

HbaseStoragePlugin: 相当于plugin的总入口,对scheme进行注册,加载插件配置,指定优化规则等

clipboard.png

HbaseStoragePluginConfig: Plugin配置,参数与bootstrap-storage-plugins.json对应

HBaseSchemaFactory: Schema工厂,Schema相当于一个表元数据,包括表名、字段、以及字段类型等信息

public class HBaseSchemaFactory extends AbstractSchemaFactory {//注册schema,schema是有层级,查询时每层之间用.分隔@Overridepublic void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {HBaseSchema schema = new HBaseSchema(getName());SchemaPlus hPlus = parent.add(getName(), schema);schema.setHolder(hPlus);}class HBaseSchema extends AbstractSchema {HBaseSchema(String name) {super(Collections.emptyList(), name);}// hbase schema只有一层@Overridepublic AbstractSchema getSubSchema(String name) {return null;}@Overridepublic Table getTable(String name) {HBaseScanSpec scanSpec = new HBaseScanSpec(name);try {return new DrillHBaseTable(getName(), plugin, scanSpec);} catch (Exception e) {// Calcite firstly looks for a table in the default schema, if the table was not found,// it looks in the root schema.// If the table does not exist, a query will fail at validation stage,// so the error should not be thrown here.logger.warn("Failure while loading table '{}' for database '{}'.", name, getName(), e.getCause());return null;}}//调用hbase提供api,获取表信息@Overridepublic Set<String> getTableNames() {try(Admin admin = plugin.getConnection().getAdmin()) {HTableDescriptor[] tables = admin.listTables();Set<String> tableNames = Sets.newHashSet();for (HTableDescriptor table : tables) {tableNames.add(new String(table.getTableName().getNameAsString()));}return tableNames;} catch (Exception e) {logger.warn("Failure while loading table names for database '{}'.", getName(), e.getCause());return Collections.emptySet();}}@Overridepublic String getTypeName() {return HBaseStoragePluginConfig.NAME;}}
}
public abstract class AbstractHBaseDrillTable extends DrillTable {protected HTableDescriptor tableDesc;public AbstractHBaseDrillTable(String storageEngineName, StoragePlugin plugin, Object selection) {super(storageEngineName, plugin, selection);}//字段类型转换,把hbase中的字段类型映射为SQL类型@Overridepublic RelDataType getRowType(RelDataTypeFactory typeFactory) {ArrayList<RelDataType> typeList = new ArrayList<>();ArrayList<String> fieldNameList = new ArrayList<>();fieldNameList.add(ROW_KEY);typeList.add(typeFactory.createSqlType(SqlTypeName.ANY));Set<byte[]> families = tableDesc.getFamiliesKeys();for (byte[] family : families) {fieldNameList.add(Bytes.toString(family));//family映射为map结构typeList.add(typeFactory.createMapType(typeFactory.createSqlType(SqlTypeName.VARCHAR), typeFactory.createSqlType(SqlTypeName.ANY)));}return typeFactory.createStructType(typeList, fieldNameList);}
}

HbaseSubScan: 关系表达式的叶子节点,目标数据源能够识别的查询语言会在这里面定义

HbaseGroupScan: SubScan的一个超集

HbaseScanBatchCreator:根据节点泛型HbaseSubScan反射获取,获取HbaseSubScan参数并创建HbaseRecordReader对象

HbaseRecordReader:实现对目标数据源的进行记录读取,setup方法是在读取记录之前进行一些初始化工作, next方法中会调用hbase的api获取数据并放入OutputMutator对象中。

Rule: drill的优化规则,可用在逻辑计划、物理计划等优化阶段

实现一个Storage Plugin主要难点是在如何实现优化规则,where条件、聚合函数、分组、排序等是否可以下推都是由优化规则决定。下面以一个where条件下推为例介绍如何实现一个Rule。如图7所示,Filter经过下推转换为一颗 等价的查询树

clipboard.png
图 7

Drill中优化规则很多,所有规则都是StoragePluginOptimizerRule类的子类,在进行逻辑计划和物理计划优化时并不是所有规则都会应用,只有匹配上的规则才会应用。匹配策略分两级,一级匹配比较粗略,只要查询节点树最小子树与规则类的构造放中操作类型class匹配就算匹配。如图7左边圈中部分和图8圈中部分所示。二级匹配是在matches方法,返回true才会执行onMatch方法进行关系表达式等价转换,这个方法默认是返回true,需要根据实际情况决定是否重写。在这个列子中我们进一步判断GroupScan是否是HbaseGroupScan实例,也就是说只有查询Hbase数据源的查询才会匹配这个规则。这里要说明一点的是,你在其中一个Storage Plugin中写的规则,对其他Storage Plugin来说都是可以使用的。

public abstract class HBasePushFilterIntoScan extends StoragePluginOptimizerRule {private HBasePushFilterIntoScan(RelOptRuleOperand operand, String description) {super(operand, description);}//FilterPrel.class、ScanPrel.class与图6圈中的部分匹配public static final StoragePluginOptimizerRule FILTER_ON_SCAN = new HBasePushFilterIntoScan(RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)), "HBasePushFilterIntoScan:Filter_On_Scan") {@Overridepublic void onMatch(RelOptRuleCall call) {final ScanPrel scan = (ScanPrel) call.rel(1);final FilterPrel filter = (FilterPrel) call.rel(0);final RexNode condition = filter.getCondition();HBaseGroupScan groupScan = (HBaseGroupScan)scan.getGroupScan();if (groupScan.isFilterPushedDown()) {/** The rule can get triggered again due to the transformed "scan => filter" sequence* created by the earlier execution of this rule when we could not do a complete* conversion of Optiq Filter's condition to HBase Filter. In such cases, we rely upon* this flag to not do a re-processing of the rule on the already transformed call.*/return;}doPushFilterToScan(call, filter, null, scan, groupScan, condition);}//二级匹配@Overridepublic boolean matches(RelOptRuleCall call) {final ScanPrel scan = (ScanPrel) call.rel(1);//hbase数据源才会匹配if (scan.getGroupScan() instanceof HBaseGroupScan) {return super.matches(call);}return false;}};protected void doPushFilterToScan(final RelOptRuleCall call, final FilterPrel filter, final ProjectPrel project, final ScanPrel scan, final HBaseGroupScan groupScan, final RexNode condition) {final LogicalExpression conditionExp = DrillOptiq.toDrill(new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition);final HBaseFilterBuilder hbaseFilterBuilder = new HBaseFilterBuilder(groupScan, conditionExp);final HBaseScanSpec newScanSpec = hbaseFilterBuilder.parseTree();if (newScanSpec == null) {return; //no filter pushdown ==> No transformation.}final HBaseGroupScan newGroupsScan = new HBaseGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(),newScanSpec, groupScan.getColumns());newGroupsScan.setFilterPushedDown(true);//filter下推至scan中final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType());// Depending on whether is a project in the middle, assign either scan or copy of project to childRel.final RelNode childRel = project == null ? newScanPrel : project.copy(project.getTraitSet(), ImmutableList.of(newScanPrel));if (hbaseFilterBuilder.isAllExpressionsConverted()) {/** Since we could convert the entire filter condition expression into an HBase filter,* we can eliminate the filter operator altogether.*/call.transformTo(childRel);} else {call.transformTo(filter.copy(filter.getTraitSet(), ImmutableList.of(childRel)));}}}

效果演示

演示数据准备,表staff包含一个列簇f1, 数据详细信息如下

clipboard.png

演示SQL 1

clipboard.png

结果1

clipboard.png

从上图结果可看出,同一个列簇f1是在同一个字段显示的,并且是一个json格式,列值都是经过编码的,这些结果的输出形式都是在HbaseRecordReader类中指定的,在类HbaseRecordReader中指定了row_key的输出类型为VarBinary, 列簇的输出类型为Map,Map中value为VarBinary类型。如果想要个列单独显示,SQL可以按以下方式书写。

演示SQL 2

clipboard.png

结果2

clipboard.png

演示SQL 3

按列条件查询

clipboard.png

结果3

clipboard.png

小结

本文对Drill SQL查询流程、Storage Plugin加载机制、以及Storage Plugin实现原理进行了分析。 希望对读者自己实现一个Storage Plugin有所帮助

本文作者:择云

阅读原文

本文为云栖社区原创内容,未经允许不得转载。

相关文章:

C# 创建控制台应用程序

C#的语法总结将全部通过控制台应用程序总结。 一、文件-》新建项目 二、 三、创建后的界面。 转载于:https://www.cnblogs.com/nevernet/archive/2008/12/08/1350024.html

(C++)1009 Product of Polynomials

笔记&#xff1a;1.如果开三个数组&#xff0c;乘积数组的大小是2*10^3&#xff0c;而不是10^6&#xff08;汗颜&#xff09;&#xff0c;如果开后者那么大的数组&#xff0c;需要定义在main函数外面了(实现一) 2.注意到double数组可能是非常稀疏的&#xff0c;为了减少运算量…

lispbox 安装运行.sh的时候出现 lispbox.sh: 2: lispbox.sh: Bad substitution

安装lispbox时使用tar命令将压缩文件解压之后cd进入之后在运行.sh文件时出现了如下情况。 $ sh lispbox.sh lispbox.sh: 2: lispbox.sh: Bad substitution 提示 bad substitution 这是因为linux有不同的shell 使用sh运行错误应该使用bash运行即可。 改成bash -x lispbox.sh即可…

Python-字符串操作方法 [转]

Python-String-Function 字符串中字符大小写的变换&#xff1a; * S.lower() #小写 * S.upper() #大写 * S.swapcase() #大小写互换 * S.capitalize() #首字母大写 * String.capwords(S) #这是模块中的方法。它把S用split()函数分开&#xff0…

旅行家的预算[贪心]

题目 Problem description 一个旅行家想驾驶汽车以最少的费用从一个城市到另一个城市&#xff08;假设出发时油箱是空的&#xff09;。给定两个城市之间的距离D1、汽车油箱的容量C&#xff08;以升为单位&#xff09;&#xff0e;每升汽油能行驶的距离D2、出发点每升汽油价格P和…

(C++)1028 人口普查

笔记&#xff1a;把年龄转化成一个七位的整数是创举&#xff0c;但是要想清楚&#xff0c;年龄越大&#xff0c;这个数字越小orz #include<cstdio> #include<cmath> #include<cstring> #include<algorithm> using namespace std;struct person{char n…

说说.net事件和委托。

一说到.net的事件&#xff0c;也许你会想都说教程满天飞&#xff0c;一个被说烂了的东西还有什么可以说的啊&#xff1f;是啊&#xff0c;的确有很多好文章剖析事件&#xff0c;比如张子阳先生的C# 中的委托和事件重温Observer模式--热水器改 这两篇文章让我弄懂了委托、事件和…

【cs229-Lecture2】Linear Regression with One Variable (Week 1)(含测试数据和源码)

从Ⅱ到Ⅳ都在讲的是线性回归&#xff0c;其中第Ⅱ章讲得是简单线性回归&#xff08;simple linear regression, SLR&#xff09;&#xff08;单变量&#xff09;&#xff0c;第Ⅲ章讲的是线代基础&#xff0c;第Ⅳ章讲的是多元回归&#xff08;大于一个自变量&#xff09;。 本…

101种设计模式

https://sourcemaking.com/design-patterns-and-tips

(C++)1032 挖掘机技术哪家强

笔记&#xff1a;考虑到输入只有一所学校&#xff0c;且得分还为0的特殊情况&#xff0c;应该把high初始化为1 #include<cstdio> #include<cmath> #include<cstring> #include<algorithm> using namespace std;int grds[100010] {0};int main(){int …

数据库打开报错: 值不能为空

报错信息如下&#xff1a; 数据库客户端打不开 解决方案&#xff1a; 找到下面的目录C:\Users\<username>\AppData\Local\Temp 创建一个空文件夹 名称是&#xff1a; 2 重新打开数据库转载于:https://www.cnblogs.com/Mander/p/3921251.html

学习 JavaScript (四)核心概念:操作符

JavaScript 的核心概念主要由语法、变量、数据类型、操作符、语句、函数组成&#xff0c;前面三个上一篇文章已经讲解完了。后面三个内容超级多&#xff0c;这篇文章主要讲解的是操作符。 操作符 什么叫做操作符&#xff1f; 这是一种工具&#xff0c;帮助我们操作字符串、数字…

(C++)1011 World Cup Betting

笔记&#xff1a;我觉得这一次的代码很优雅 #include<cstdio> #include<cmath> #include<cstring> #include<algorithm> using namespace std;int maxPro(double a[3]){//返回值最大的下标 int idx0,max_pro0;for(int i0;i<3;i){if(a[i]>max_pr…

Ext学习-前后交互模式介绍

在前后台交互模式的介绍中&#xff0c;实际上就是Store中Proxy相关的内容&#xff0c;比如Ajax提交。 所以详细的文档请参考&#xff1a; Ext学习-基础概念&#xff0c;核心思想介绍 中关于数据模型和MVC结构部分。 作者&#xff1a;sdjnzqr 出处&#xff1a;http://www.cnblog…

让你彻底明白什么叫游戏引擎(1)

在阅读各种游戏介绍的时候我们常常会碰见“引擎”&#xff08;Engine&#xff09;这个单词&#xff0c;引擎在游戏中究竟起着什么样的作用&#xff1f;它的进化对于游戏的发展产生了哪些影响&#xff1f;希望下面这篇文章能为大家释疑。以希望能够帮助一些刚进入游戏行业的小菜…

185.dubbo 后台管理系统

2019独角兽企业重金招聘Python工程师标准>>> 1. 效果及目的 效果&#xff1a; 目的&#xff1a;查看 管理服务 2. 启动要求 &#xff08;1&#xff09;项目是dubbo &#xff08;2&#xff09;jdk 1.7 (3) dubbo的war要与zookeeper在同一台服务上 3. 安装zookeeper 要…

(C++)1027 打印沙漏

笔记&#xff1a;星号右边的空格不用打印 #include<cstdio> #include<cmath> #include<cstring> #include<algorithm> using namespace std;int main(){int n;char c;scanf("%d %c",&n,&c);int clock[23];int col;for(int i1;i<…

黑帽大会2014:10个酷炫的黑客工具

http://www.csdn.net/article/2014-08-21/2821304 用于恶意软件分析的Maltrieve 安全研究人员使用Maltrieve工具收集服务器上的恶意软件。通过这个开源工具&#xff0c;恶意软件分析人员可以通过分析URL链表和已知的托管地址获得最新鲜的样本。 Kyle Maxwell是VeriSign的一名威…

C#无符号右移

/// <summary>/// 无符号右移&#xff0c;与JS中的>>>等价/// </summary>/// <param name"x">要移位的数</param>/// <param name"y">移位数</param>/// <returns></returns>public static int …

1027 Colors in Mars

笔记&#xff1a;本题属于进制转换&#xff0c;但是考察的重点不在除基取余上&#xff0c;因为转化得到的数只有两位&#xff0c;很容易得到每位上面应该是什么&#xff0c;但是和其他题不同的地方在于&#xff0c;每位可填的不见得是0~9&#xff0c;还包括ABC&#xff0c;这就…

json对象和json字符串转换方法

在WEB数据传输过程中&#xff0c;json是以文本&#xff0c;即字符串的轻量级形式传递的&#xff0c;而客户端一般用JS操作的是接收到的JSON对象&#xff0c;所以&#xff0c;JSON对象和JSON字符串之间的相互转换、JSON数据的解析是关键。 先明确2个概念例如&#xff1a; JSON字…

python-docx操作

import docx# 读取docx文档内容def readWord():doc docx.Document(demo.docx)fullText []for para in doc.paragraphs:fullText.append( para.text)print(\n . join(fullText))readWord()官方API&#xff1a;https://python-docx.readthedocs.io/en/latest/index.html ;转载…

javascript中FORM表单的submit()方法经验教训.

author songfeng 因为JS内对象的方法实际上是存储语句的一个类似于指针的东西. 其指向了内存的一个位置, 也就是其函数的位置,当然也可以让其指向一个变量值. var foo new Object();foo.bar function() {} //现在foo.bar就是指向了这个函数的内存位置.foo.bar &q…

1058 A+B in Hogwarts

笔记&#xff1a;和乙级的在霍格沃兹找零钱不同&#xff0c;这里不需要判断给出的两个数的大小&#xff0c;也没必要先都换算成最小的单位&#xff0c;可以直接从最低位开始加&#xff0c;如果超过该位的范围&#xff0c;则向上一位进一即可。 #include<cstdio> #includ…

DDD领域驱动设计之聚合、实体、值对象

关于具体需求&#xff0c;请看前面的博文&#xff1a;DDD领域驱动设计实践篇之如何提取模型&#xff0c;下面是具体的实体、聚合、值对象的代码&#xff0c;不想多说什么是实体、聚合等概念&#xff0c;相信理论的东西大家已经知晓了。本人对DDD表示好奇&#xff0c;没有在真正…

C#用 SendKyes 结合 Process 或 API FindWindow、SendMessage(PostMessage) 等控制外部程序

Win32 平台是 消息驱动模式.Net 框架是 事件驱动模式标题所指的 “控制外部程序”&#xff0c;外部程序是指与本程序无内在相关性的另外一个程序 基于上面提到的&#xff0c;对于.NET的winform程序&#xff0c;在默认情况下&#xff08;即未对接收消息的事件做自定义处理&#…

springMVC swagger2

参考地址&#xff1a;https://www.cnblogs.com/exmyth/p/7183753.html https://blog.csdn.net/programmer_sean/article/details/72236948 1. maven 依赖 <dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId&…

1061 Dating

笔记&#xff1a; 第一个输出根据的是大写字母 第二个输出根据的是0-9andA-N 第三个输出根据的是大写字母和小写字母 知道范围便方便确定边界 两两比对时&#xff0c;先遍历一个字符串&#xff0c;遇到在范围内的字符&#xff0c;看其和第二个字符串同位置的字符是否相等 …

PA 项目创建任务

---- 创建任务 DECLAREp_project_id NUMBER : 155233;p_task_number VARCHAR2(240) : CXYTEST0001;p_task_name VARCHAR2(240) : 接口测试CXYTEST0001;p_task_description VARCHAR2(240) : TASKCXYTEST0001;p_scheduled_start_date DAT…

SSM登陆拦截器实现

首先在springmvc中配置拦截器 <!-- 配置拦截器 --><mvc:interceptors><mvc:interceptor><!-- 拦截所有mvc控制器 --><mvc:mapping path"/**"/><!-- mvc:exclude-mapping是另外一种拦截&#xff0c;它可以在你后来的测试中对某个页面…