1. 概述

Apache Drill是一个低延迟的分布式查询引擎,用于大规模数据集,包括结构化和半结构化/嵌套数据。受Google的Dremel启发,Drill被设计成可扩展到数千个节点,并以BI/分析环境所需的交互速度查询数PB的数据。Drill对于大型数据集上的简短、交互式即席查询也很有用。Drill能够以JSON和Parquet等格式查询嵌套数据,并执行动态模式发现。

Drill具有高吞吐和低延迟的特性,它不使用像MapReduce、Tez或Spark这样的通用执行引擎。因此,Drill是灵活和高效的。Drill的优化器利用了基于规则和成本的优化技术,以及数据局部性和操作下推,具备将查询片段下推到后端数据源的能力。Drill还提供了一个列式和矢量化的执行引擎,从而提高了内存和CPU的效率。

如果为了简单使用,可以下载并且直接运行在笔记本电脑。当用于分析更大的数据集时,可以在Hadoop集群(最多可达1000个节点)上直接部署Drill。Drill利用集群中的聚合内存,使用乐观的流水线模型执行查询,并在工作集不适合内存时自动溢出到磁盘。

下面,本文将从Drill执行引擎的整体架构以及优化器技术、物理计划生成和执行方面,结合源代码进行分析介绍。

2. 整体架构

Drill包括一个分布式执行环境,专门为大规模数据处理而构建。它的核心是Drillbit服务,负责接收来自客户端的请求、处理查询并将结果返回给客户端(如下图一所示)。可以在Hadoop集群中的所有必须节点上安装并运行Drillbit服务,以形成分布式集群环境。当Drillbit在集群中的每个数据节点上运行时,Drill可以在查询执行过程中最大化数据局部性,而无需再网络上或节点之间移动数据。Drill使用Zookeeper维护集群成员和健康检测信息。当然,Drill不与Hadoop绑定 ,可以在任何分布式集群环境中运行。唯一的前提条件是依赖Zookeeper。

图一 客户端与Drillbit的关系

当Drillbit接收到客户端查询时,就成为了Foreman来驱动整个查询,如下图二所示。首先,Foreman中的解析器负责解析SQL,应用自定义规则将特定的SQL运算符转换为Drill可以理解的特定逻辑预算符,以此来形成一个逻辑计划。逻辑计划描述了生成查询结果所需的工作,并定义了要应用的数据源和操作。

其次,Foreman将逻辑计划发送到基于成本的优化器中,优化器应用各种类型的规则,将运算符和函数重新排列到最优计划中。优化器将逻辑计划转换为描述如何执行查询的物理计划。

图二 SQL到物理计划流程

Foreman中的并行化器将物理计划转换为多个阶段,称为major fragment和minor fragment。这些fragment创建了一个多级执行树,该树重写查询并对配置的数据源并行执行查询,将结果发送回客户端或应用程序。如图三所示。

图三 执行树

3. 查询接入

第一步,Drillbit服务端接收请求的处理器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package org.apache.drill.exec.rpc.user;
//包引入省略

class UserServerRequestHandler implements RequestHandler<BitToUserConnection> {
//处理各种用户的请求
private final UserWorker worker;

@Override
public void handle(BitToUserConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody, ResponseSender responseSender) {
case RpcType.RUN_QUERY_VALUE://执行查询
logger.debug("Received query to run. Returning query handle.");
try {
//解析请求为RunQuery protobuf信息
final RunQuery query = RunQuery.PARSER.parseFrom(new ByteBufInputStream(pBody));
//提交查询,并返回一个查询ID
final QueryId queryId = worker.submitWork(connection, query);
//设置返回的响应信息
responseSender.send(new Response(RpcType.QUERY_HANDLE, queryId));
break;
} catch (InvalidProtocolBufferException e) {
throw new RpcException("Failure while decoding RunQuery body.", e);
}
case RpcType.CANCEL_QUERY_VALUE://取消查询
//省略 worker.cancelQuery(queryId);
case RpcType.RESUME_PAUSED_QUERY_VALUE://恢复暂停的查询
//省略
case RpcType.GET_QUERY_PLAN_FRAGMENTS_VALUE://获取查询计划fragment
//省略 worker.getQueryPlan(connection, req)
case RpcType.GET_CATALOGS_VALUE://获取所有数据库分类,默认为DRILL
//省略
case RpcType.GET_SCHEMAS_VALUE://获取所有数据库
//省略
case RpcType.GET_TABLES_VALUE://获取指定库下的所有表
//省略
case RpcType.GET_COLUMNS_VALUE://获取指定表下的所有列
//省略
case RpcType.CREATE_PREPARED_STATEMENT_VALUE:
//省略
case RpcType.GET_SERVER_META_VALUE://获取服务端相关配置信息
//省略
default:
throw new UnsupportedOperationException(
String.format("UserServerRequestHandler received rpc of unknown type. Type was %d.", rpcType));
}
}

第二步,解析出请求类型后,进行相应处理。比如是查询请求,则初始化Foreman开始处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package org.apache.drill.exec.work.user;
//包引入省略

public class UserWorker{

//管理foreman的执行
private final WorkerBee bee;

//提交查询
public QueryId submitWork(UserClientConnection connection, RunQuery query) {
//生成查询ID
final QueryId id = queryIdGenerator();
//本次会话查询数量加1
incrementer.increment(connection.getSession());
//初始化包工头,是一个线程
Foreman foreman = new Foreman(bee, bee.getContext(), connection, id, query);
//加入执行队列
bee.addNewForeman(foreman);
return id;
}
}

第三步,Foreman在驱动节点或根节点上,为单个查询管理所有的fragments(本地或远程)。主要流程为:当Foreman初始化时,查询处于准备状态。作为Runnable被提交进入队列排队,用于执行查询计划。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package org.apache.drill.exec.work.foreman;
//包引入省略

public class Foreman implements Runnable {

//处理查询执行的一些细节,每个Foreman都持有一个
private final QueryManager queryManager;
private final QueryStateProcessor queryStateProcessor;

public Foreman(...) {

this.queryManager = new QueryManager(queryId, queryRequest, drillbitContext.getStoreProvider(), drillbitContext.getClusterCoordinator(), this);
//初始状态为:QueryState.PREPARING;
this.queryStateProcessor = new QueryStateProcessor(queryIdString, queryManager, drillbitContext, new ForemanResult());

}

@Override
public void run() {
//部分省略...
try {
//检测foreman是否在线,否则不能接收新的查询
if (!drillbitContext.isForemanOnline()) {
throw new ForemanException("Query submission failed since Foreman is shutting down.");
}
} catch (ForemanException e) {
logger.debug("Failure while submitting query", e);
//新增QueryState.FAILED的事件
queryStateProcessor.addToEventQueue(QueryState.FAILED, e);
}

//从QueryState.PREPARING状态切换到QueryState.PLANNING状态
queryStateProcessor.moveToState(QueryState.PLANNING, null);

try {
// 将查询请求转为具体的行为
switch (queryRequest.getType()) {
case LOGICAL://请求是逻辑计划
parseAndRunLogicalPlan(queryRequest.getPlan());
break;
case PHYSICAL://请求是物理计划
parseAndRunPhysicalPlan(queryRequest.getPlan());
break;
case SQL://请求是SQL
final String sql = queryRequest.getPlan();
logger.info("Query text for query with id {} issued by {}: {}", queryIdString,queryContext.getQueryUserName(), sql);
runSQL(sql);
break;
case EXECUTION://请求是fragments
runFragment(queryRequest.getFragmentsList());
break;
case PREPARED_STATEMENT://请求是预编译
runPreparedStatement(queryRequest.getPreparedStatementHandle());
break;
default:
throw new IllegalStateException();
}

} catch (...) {
//...
}

//当run方法执行完成后,foreman实例还一直存在,并且通过QueryManager的stateListener来间接地接收关于fragment完成的事件,直到一切都完成、失败或者被取消。
}
}

4. 查询执行

查询请求进入Foreman后,针对SQL类查询,进入foreman#runSQL(sql)方法处理。如下代码所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package org.apache.drill.exec.work.foreman;
//包引入省略

public class Foreman implements Runnable {
//省略部分

private void runSQL(final String sql) throws ExecutionSetupException {
final Pointer<String> textPlan = new Pointer<>();
//将sql转为物理计划
final PhysicalPlan plan = DrillSqlWorker.getPlan(queryContext, sql, textPlan);
//执行物理计划
runPhysicalPlan(plan, textPlan);
}
}
4.1 SQL转物理计划

第一步,根据SQL类型进行相应的处理。比如,SQL查询进入DefaultSqlHandler#.getPlan(sqlNode)。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package org.apache.drill.exec.planner.sql;

public class DrillSqlWorker {

/**
* 将查询sql语句转为查询物理计划
* 捕获各种异常并尽可能转为用户异常
*/
public static PhysicalPlan getPlan(QueryContext context, String sql, Pointer<String> textPlan) throws ForemanSetupException {
//省略 convertPlan(context, sql, textPlan);
}

/**
* sql中使用的函数,本地函数库和远程函数库同步问题
*/
private static PhysicalPlan convertPlan(QueryContext context, String sql, Pointer<String> textPlan) {
// 省略 getQueryPlan(context, sql, textPlan);
}

private static PhysicalPlan getQueryPlan(QueryContext context, String sql, Pointer<String> textPlan) throws ForemanSetupException, RelConversionException, IOException, ValidationException {

//sql解析器,基于calcite parser.jj和自定义扩展parser.jj形成drill的sql解析
final SqlConverter parser = new SqlConverter(context);
//生成calcite表示的抽象语法树(AST)
final SqlNode sqlNode = checkAndApplyAutoLimit(parser, context, sql);
//sql操作处理
final AbstractSqlHandler handler;
final SqlHandlerConfig config = new SqlHandlerConfig(context, parser);

switch(sqlNode.getKind()) {
case EXPLAIN: //查看sql执行计划
//省略
case SET_OPTION://设置配置项
//省略
case DESCRIBE_TABLE://查看表元数据
//省略
case DESCRIBE_SCHEMA://查看数据库元数据
//省略
case CREATE_TABLE://建表
//省略
case DROP_TABLE:
case CREATE_VIEW:
case DROP_VIEW:
case OTHER_DDL:
case OTHER:
//省略
// fallthrough
default:
//sql处理,主要用于AST向执行计划的转换(含基于规则和成本的优化)
handler = new DefaultSqlHandler(config, textPlan);
context.setSQLStatementType(SqlStatementType.OTHER);
}

//省略...
return handler.getPlan(sqlNode);
}
}

第二步,DefaultSqlHandler#getPlan(sqlNode)实现了AST转物理计划的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package org.apache.drill.exec.planner.sql.handlers;

public class DefaultSqlHandler extends AbstractSqlHandler {

@Override
public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConversionException, IOException, ForemanSetupException {
/**
* 1. 将AST转为关系表达式
* 其中采用HepPlanner将subquery和window function阶段设置的静态规则进行SQL改写
*/
final ConvertedRelNode convertedRelNode = validateAndConvert(sqlNode);
final RelDataType validatedRowType = convertedRelNode.getValidatedRowType();
final RelNode queryRelNode = convertedRelNode.getConvertedNode();
/**
* 2. 将原始关系表达式RelNode转为Drill逻辑关系表达式DrillRelNode
* 2.1. 调用convertToRawDrel
* 2.2. 在最上层添加一个Screen算子,用于显示查询结果
*/
final DrillRel drel = convertToDrel(queryRelNode);
/**
* 3. 将DrillRelNode转为Drill物理表达式PhysicalRelNode
* 3.1 通过VolcanoPlanner通过内置的以及存储自己设置的物理规则进行优化调整 (考虑成本)
* 3.2 针对join sql中多表字段冲突重命名
* 3.3 inner join时左右表交换,大表在左,小表在右(构建hash表)
* 3.4 将所有复杂输出拆解为对应的EXP$名称
*/
final Prel prel = convertToPrel(drel, validatedRowType);
logAndSetTextPlan("Drill Physical", prel, logger);
// 4. 将Drill物理关系表达式转为Drill物理算子
final PhysicalOperator pop = convertToPop(prel);
// 5. 提取算子,构建root、leaf的graph
final PhysicalPlan plan = convertToPlan(pop);
log("Drill Plan", plan, logger);
return plan;
}

protected DrillRel convertToRawDrel(final RelNode relNode) throws SqlUnsupportedException {
//开启针对含有limit 0的查询进行优化
if (context.getOptions().getOption(ExecConstants.EARLY_LIMIT0_OPT) &&
context.getPlannerSettings().isTypeInferenceEnabled() &&
FindLimit0Visitor.containsLimit0(relNode)) {
final DrillRel shorterPlan;
//返回列类型都是可识别的,则直接生成特定执行计划
if ((shorterPlan = FindLimit0Visitor.getDirectScanRelIfFullySchemaed(relNode)) != null) {
return shorterPlan;
}

//针对limit 0的查询,关闭分布式模式
if (FindHardDistributionScans.canForceSingleMode(relNode)) {
context.getPlannerSettings().forceSingleMode();
}
}

try {
// 适用于HepPlanner优化器的规则,在VolcanoPlanner优化器中会失败的
// 将集合操作算子(union)和其他算子进行转换
final RelNode setOpTransposeNode = transform(PlannerType.HEP, PlannerPhase.PRE_LOGICAL_PLANNING, relNode);

// 通过HepPlanner优化器从下往上的匹配顺序,进行目录剪枝优化。主要规则有:
// 1. *转为字段引用,便于分区剪枝和下推规则来检测可以修改或下推的字段
// 2. 没有group by或distinct的聚合,将scan优化为可以直接从统计信息获取
final RelNode pruned = transform(PlannerType.HEP_BOTTOM_UP, PlannerPhase.DIRECTORY_PRUNING, setOpTransposeNode);
//为剪枝后的关系表达式加上Drill逻辑特性,关联Drill对关系表达式的相关实现
final RelTraitSet logicalTraits = pruned.getTraitSet().plus(DrillRel.DRILL_LOGICAL);

final RelNode convertedRelNode;
if (!context.getPlannerSettings().isHepOptEnabled()) {
// HepPlanner关闭(默认开启),采用VolcanoPlanner
// LOGICAL_PRUNE_AND_JOIN 限定的规则集合,如分区剪枝和Join置换
convertedRelNode = transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL_PRUNE_AND_JOIN, pruned, logicalTraits);
} else {
final RelNode intermediateNode2;
final RelNode intermediateNode3;
//默认开启通过HepPlanner进行分区剪枝
if (context.getPlannerSettings().isHepPartitionPruningEnabled()) {
//采用基于成本代价的VolcanoPlanner进行逻辑计划优化
// 1. PlannerPhase.LOGICAL 限定的规则集合
final RelNode intermediateNode = transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, pruned, logicalTraits);

// Hep对Join相关的优化,比如将join中的谓词传递到具体表中
// 2. PlannerPhase.TRANSITIVE_CLOSURE 限定的规则集合
final RelNode transitiveClosureNode =
transform(PlannerType.HEP, PlannerPhase.TRANSITIVE_CLOSURE, intermediateNode);

// Hep 分区剪枝,包含自定义的逻辑优化规则、limit推到scan等
// 3. PARTITION_PRUNING 限定的规则集合,主要侧重文件系统上的目录文件存储
intermediateNode2 = transform(PlannerType.HEP_BOTTOM_UP, PlannerPhase.PARTITION_PRUNING, transitiveClosureNode);

} else {
// 1. VolcanoPlanner 执行 PlannerPhase.LOGICAL_PRUNE 限定的规则
final RelNode intermediateNode =
transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL_PRUNE, pruned, logicalTraits);
// 2. HepPlanner 执行 PlannerPhase.TRANSITIVE_CLOSURE 限定的规则
intermediateNode2 = transform(PlannerType.HEP, PlannerPhase.TRANSITIVE_CLOSURE, intermediateNode);
}

// HepPlanner 执行 PlannerPhase.JOIN_PLANNING 限定的规则,主要是转为DrillJoinRel
intermediateNode3 = transform(PlannerType.HEP_BOTTOM_UP, PlannerPhase.JOIN_PLANNING, intermediateNode2);

//默认开启
if (context.getPlannerSettings().isRowKeyJoinConversionEnabled()) {
// 将Join转为主键join,方便join条件下推
convertedRelNode = transform(PlannerType.HEP_BOTTOM_UP, PlannerPhase.ROWKEYJOIN_CONVERSION, intermediateNode3);
} else {
convertedRelNode = intermediateNode3;
}
}

//省略...

return drillRel;
}
} catch (RelOptPlanner.CannotPlanException ex) {
//省略...
}
}

}
4.2 物理计划转执行树

首先,将优化后的物理计划转为多个Fragment,其次,将多个Fragment通过并行器parallelizer形成一个可跨节点执行的fragment树。主要代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package org.apache.drill.exec.work.foreman;
//包引入省略

public class Foreman implements Runnable {

private final QueryManager queryManager;

// 用于控制算子中buffer的大小
private final QueryResourceManager queryRM;

// 负责执行local或remote的fragment
private final FragmentsRunner fragmentsRunner;

public Foreman(...) {
// 集群模式下是DynamicResourceManager,
// 通过配置exec.queue.enable 可以任意开启或关闭ThrottledResourceManager
this.queryRM = drillbitContext.getResourceManager().newQueryRM(this);
}

private void runPhysicalPlan(final PhysicalPlan plan, Pointer<String> textPlan) throws ExecutionSetupException {
// 验证该计划是可执行的
validatePlan(plan);
// 为sort、join等算子分配内存
queryRM.visitAbstractPlan(plan);
// 封装了fragment的root、child的fragment信息
final QueryWorkUnit work = getQueryWorkUnit(plan, queryRM);
if (enableRuntimeFilter) {
runtimeFilterRouter = new RuntimeFilterRouter(work, drillbitContext);
runtimeFilterRouter.collectRuntimeFilterParallelAndControlInfo();
}
if (textPlan != null) {
queryManager.setPlanText(textPlan.value);
}
// 计算算子需要的内存是否可以获取
queryRM.visitPhysicalPlan(work);
// 根据cost判断是large query还是small query
queryRM.setCost(plan.totalCost());
queryManager.setTotalCost(plan.totalCost());
// work将fragment转为json信息
work.applyPlan(drillbitContext.getPlanReader());
logWorkUnit(work);

// 将work中的fragment相关信息传给fragmentrunner
fragmentsRunner.setFragmentsInfo(work.getFragments(), work.getRootFragment(), work.getRootOperator());

// 开始查询
startQueryProcessing();
}

private QueryWorkUnit getQueryWorkUnit(final PhysicalPlan plan, final QueryResourceManager rm) throws ExecutionSetupException {

final PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next();
// 通过MakeFragmentsVisitor将物理算子分解为构成fragment的各个组成部分
final Fragment rootFragment = rootOperator.accept(MakeFragmentsVisitor.INSTANCE, null);
// 并行器创建工作单元
// 主要工作是初始化fragment树,基于每个fragment代价进行并行化;
// 计算每个minor fragment的算子需要的memory
return rm.getParallelizer(plan.getProperties().hasResourcePlan).generateWorkUnit(queryContext.getOptions().getOptionList(), queryContext.getCurrentEndpoint(), queryId, queryContext.getOnlineEndpoints(), rootFragment, initiatingClient.getSession(), queryContext.getQueryContextInfo());
}

private void startQueryProcessing() {
// 压入队列,进入QueryState.ENQUEUED状态
// 如果开启查询队列,先阻塞直到可以查询
enqueue();
// fragmentsRunner.submit();
// 提交root and non-root fragments执行
runFragments();
queryStateProcessor.moveToState(QueryState.RUNNING, null);
}
}
4.3 执行树运行

(1) 物理执行计划转为执行树后,主要获得三个主要信息FragmentRootPlanFragment以及List<PlanFragment>。fragments分发的执行逻辑在FragmentRunner中,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package org.apache.drill.exec.work.foreman;

public class FragmentsRunner {

private final WorkerBee bee;

private List<PlanFragment> planFragments;
private PlanFragment rootPlanFragment;
private FragmentRoot rootOperator;

public void submit() throws ExecutionSetupException {
// 省略...
logger.debug("Submitting fragments to run.");
// 首先启动root fragment,获取incoming buffers
setupRootFragment(rootPlanFragment, rootOperator);
// 启动non root fragment(local or remote),没有完成前可能会返回数据
setupNonRootFragments(planFragments);
logger.debug("Fragments running.");
}

// 启动非根fragment
private void setupNonRootFragments(final Collection<PlanFragment> fragments) throws ExecutionSetupException {
if (fragments.isEmpty()) {
return;
}

/*
* 向每个节点发送一条消息,不管该节点运行多少fragment
* 我们需要先运行intermediate fragments处于准备状态,因为leaf fragments一开始运行就会产生数据
* 下面将intermediate和leaf fragments分出来
*/
final Multimap<DrillbitEndpoint, PlanFragment> leafFragmentMap = ArrayListMultimap.create();
final Multimap<DrillbitEndpoint, PlanFragment> intFragmentMap = ArrayListMultimap.create();

// 这里省略分离的逻辑

// 将intermediate fragments调度到相应的节点准备就绪
scheduleIntermediateFragments(intFragmentMap);

injector.injectChecked(foreman.getQueryContext().getExecutionControls(), "send-fragments", ForemanException.class);

// 将属于一个节点的所有leaf fragments一个请求发过去
for (final DrillbitEndpoint ep : leafFragmentMap.keySet()) {
sendRemoteFragments(ep, leafFragmentMap.get(ep), null, null);
}
}

}

(2) fragment的接收通过ControlMessageHandler来处理,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package org.apache.drill.exec.work.batch;

public class ControlMessageHandler implements RequestHandler<ControlConnection> {

@Override
public void handle(ControlConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody,ResponseSender sender) throws RpcException {
switch (rpcType) {
// 部分省略

// 接受fragment并初始化
case RpcType.REQ_INITIALIZE_FRAGMENTS_VALUE: {
final InitializeFragments fragments = get(pBody, InitializeFragments.PARSER);
initializeFragment(fragments);
sender.send(ControlRpcConfig.OK);
break;
}
default:
throw new RpcException("Not yet supported.");
}
}

public Ack initializeFragment(InitializeFragments fragments) throws RpcException {
final DrillbitContext drillbitContext = bee.getContext();
for (int i = 0; i < fragments.getFragmentCount(); i++) {
// 在当前节点启动一个fragment(leaf or intermediate fragment, 本地或远程节点分发过来的)
// leaf fragment通过FragmentExecutor执行
// intermediate fragment通过NonRootFragmentManager执行
startNewFragment(fragments.getFragment(i), drillbitContext);
}
return Acks.OK;
}

}

(3) leaf fragment通过FragmentExecutor执行,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package org.apache.drill.exec.work.fragment;

public class FragmentExecutor implements Runnable {

private volatile RootExec root;

@Override
public void run() {
//省略...

// 查询计划中最后一个处理的节点,其他节点包括exchange和storage节点,驱动整个查询
final FragmentRoot rootOperator = this.rootOperator != null ? this.rootOperator :
fragmentContext.getPlanReader().readFragmentRoot(fragment.getFragmentJson());
// 根据物理算子树创建RecordBatch树(物理算子实现树)
root = ImplCreator.getExec(fragmentContext, rootOperator);
if (root == null) {
return;
}
// fragment切换到运行状态
updateState(FragmentState.RUNNING);
// 获取用户信息
final UserGroupInformation queryUserUgi = fragmentContext.isImpersonationEnabled() ? ImpersonationUtil.createProxyUgi(fragmentContext.getQueryUserName()) :ImpersonationUtil.getProcessUserUGI();

queryUserUgi.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
injector.injectChecked(fragmentContext.getExecutionControls(), "fragment-execution", IOException.class);

while (shouldContinue()) {

for (FragmentHandle fragmentHandle; (fragmentHandle = receiverFinishedQueue.poll()) != null;) {
// 针对完成的请求进行处理
root.receivingFragmentFinished(fragmentHandle);
}

// 算子的next,驱动下游算子
// 比如join、sort、fliter等算子会通过codegen技术提升cpu计算性能
if (!root.next()) {
// Fragment已经处理完所有数据
break;
}
}

return null;
}
});
}
}
4.4 CodeGen

之所以需要代码生成,是为了提升CPU的执行效率。比如,我们生成的物理执行计划中会存在一些类似if或switch这样的判断逻辑,而在运行时条件的内容已经可知,我们可以直接去掉不必要的分支。另外,运行时可以了解到循环代码的具体循环次数,从而可以将循环展开,同样去除了分支判断逻辑。通过类似这样的优化可以消除分支预测,从而极大的提升性能。可以参考这个帖子

DRILL的算子支持批量的处理,通过代码模板和列式数据,去除无效的分支处理,平铺代码,重新生成充分利用cache和利于CPU计算的代码,并编译交给JVM运行。

比如,拿FilterRecordBatch中的filter算子举例,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package org.apache.drill.exec.physical.impl.filter;

public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter> {
//一个选择向量,最多可前置64K个值。用于两种情况:
// 1. 创建由筛选器保留的值列表
// 2. 为已排序的批次提供重定向级别
private SelectionVector2 sv2;
private Filterer filter;

public FilterRecordBatch(Filter pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
super(pop, context, incoming);
}

@Override
protected boolean setupNewSchema() throws SchemaChangeException {
//清空选择器向量
if (sv2 != null) {
sv2.clear();
}
switch (incoming.getSchema().getSelectionVectorMode()) {
case NONE:
if (sv2 == null) {
sv2 = new SelectionVector2(oContext.getAllocator());
}
this.filter = generateSV2Filterer();
break;
case TWO_BYTE:
sv2 = new SelectionVector2(oContext.getAllocator());
this.filter = generateSV2Filterer();
break;
case FOUR_BYTE:
default:
throw new UnsupportedOperationException();
}

if (container.isSchemaChanged()) {
container.buildSchema(SelectionVectorMode.TWO_BYTE);
return true;
}
return false;
}

@Override
protected IterOutcome doWork() {
container.zeroVectors();
int recordCount = incoming.getRecordCount();
try {
//执行过滤操作
filter.filterBatch(recordCount);
} catch (SchemaChangeException e) {
throw new UnsupportedOperationException(e);
}

return getFinalOutcome(false);
}

// 创建selection vector v2 filterer
protected Filterer generateSV2Filterer() throws SchemaChangeException {
final ErrorCollector collector = new ErrorCollectorImpl();
final List<TransferPair> transfers = Lists.newArrayList();

// CodeGenerator用于创建源代码来实现abstract template
// 它包含多个ClassGenerator,用于实现外部类和内部类,与运行时生成的实例相关联
// DRILL支持两种方式生成和编译代码。1. 控制字节码 2. java源文件
final ClassGenerator<Filterer> cg = CodeGenerator.getRoot(Filterer.TEMPLATE_DEFINITION2, context.getOptions());

final LogicalExpression expr = ExpressionTreeMaterializer.materialize(popConfig.getExpr(), incoming, collector,
context.getFunctionRegistry(), false, unionTypeEnabled);
if (collector.hasErrors()) {
throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
}

cg.addExpr(new ReturnValueExpression(expr), ClassGenerator.BlkCreateMode.FALSE);

for (final VectorWrapper<?> v : incoming) {
final TransferPair pair = v.getValueVector().makeTransferPair(container.addOrGet(v.getField(), callBack));
transfers.add(pair);
}

try {
final TransferPair[] tx = transfers.toArray(new TransferPair[transfers.size()]);
CodeGenerator<Filterer> codeGen = cg.getCodeGenerator();
codeGen.plainJavaCapable(true);
//代码模板和incoming数据相集合,生成高效执行的filter实例
final Filterer filter = context.getImplementationClass(codeGen);
filter.setup(context, incoming, this, tx);
return filter;
} catch (ClassTransformationException | IOException e) {
throw new SchemaChangeException("Failure while attempting to load generated class", e);
}
}
}

5. 参考资料