1. Overview

Apache Drill is a low-latency distributed query engine for large-scale datasets, including structured and semi-structured/nested data. Inspired by Google's Dremel, Drill is designed to scale to thousands of nodes and to query petabytes of data at the interactive speeds that BI/analytics environments demand. Drill is also useful for short, interactive ad-hoc queries over large datasets. It can query nested data in formats such as JSON and Parquet, and it performs dynamic schema discovery.
Drill offers high throughput and low latency. It does not rely on a general-purpose execution engine such as MapReduce, Tez, or Spark, which keeps it both flexible and efficient. Drill's optimizer applies rule-based and cost-based optimization techniques, exploits data locality, and supports operator pushdown, so query fragments can be pushed down into the backing data sources. Drill also provides a columnar, vectorized execution engine, which improves both memory and CPU efficiency.
For simple use, you can download Drill and run it directly on a laptop. For analyzing larger datasets, Drill can be deployed directly on a Hadoop cluster (up to roughly 1,000 nodes). Drill uses the cluster's aggregate memory, executes queries with an optimistic pipelined model, and automatically spills to disk when the working set does not fit in memory.
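As a quick illustration of the "simple use" case, the sketch below (assuming the drill-jdbc-all driver is on the classpath) queries the employee.json sample that ships with Drill through JDBC; the `zk=local` connection string talks to a local/embedded Drillbit:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class DrillQuickstart {
  public static void main(String[] args) throws Exception {
    // "zk=local" uses a local/embedded Drillbit; point at a ZooKeeper quorum for a cluster
    try (Connection conn = DriverManager.getConnection("jdbc:drill:zk=local");
         Statement stmt = conn.createStatement();
         // cp is the built-in classpath storage plugin; employee.json ships with Drill
         ResultSet rs = stmt.executeQuery(
             "SELECT full_name, salary FROM cp.`employee.json` LIMIT 3")) {
      while (rs.next()) {
        System.out.println(rs.getString("full_name") + " -> " + rs.getDouble("salary"));
      }
    }
  }
}
```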
The rest of this article analyzes Drill's execution engine with reference to the source code, covering the overall architecture, the optimizer, physical plan generation, and execution.
2. Overall Architecture

Drill includes a distributed execution environment purpose-built for large-scale data processing. At its core is the Drillbit service, which accepts requests from clients, processes queries, and returns results (see Figure 1 below). Drillbits can be installed and run on all required nodes of a Hadoop cluster to form a distributed cluster environment. When a Drillbit runs on every data node, Drill can maximize data locality during query execution without moving data over the network between nodes. Drill uses ZooKeeper to maintain cluster membership and health-check information. Note that Drill is not tied to Hadoop and can run in any distributed cluster environment; its only prerequisite is ZooKeeper.
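For illustration, a minimal conf/drill-override.conf pointing a Drillbit at a shared ZooKeeper quorum might look like the sketch below (the cluster id and host names are placeholders); every Drillbit configured with the same values joins the same cluster:

```
drill.exec: {
  cluster-id: "drillbits1",
  zk.connect: "zk-host1:2181,zk-host2:2181,zk-host3:2181"
}
```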
Figure 1: How clients interact with Drillbits

When a Drillbit receives a client query, it becomes the Foreman that drives the entire query, as shown in Figure 2 below. First, the parser inside the Foreman parses the SQL and applies custom rules to convert specific SQL operators into the logical operators Drill understands, producing a logical plan. The logical plan describes the work required to generate the query results and defines the data sources and operations to apply.
Next, the Foreman hands the logical plan to the cost-based optimizer, which applies various kinds of rules to rearrange operators and functions into an optimal plan. The optimizer converts the logical plan into a physical plan that describes how to execute the query.
Figure 2: From SQL to physical plan

The parallelizer inside the Foreman converts the physical plan into multiple phases, called major and minor fragments. These fragments form a multi-level execution tree that rewrites the query and executes it in parallel against the configured data sources, sending the results back to the client or application, as shown in Figure 3.
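This fragment structure is visible from SQL itself: EXPLAIN PLAN FOR prefixes each operator with a major-fragment-id/operator-id pair, and each exchange marks a fragment boundary. The output below is abbreviated and purely illustrative:

```
0: jdbc:drill:zk=local> EXPLAIN PLAN FOR SELECT name FROM dfs.`/tmp/users.json`;
00-00  Screen
00-01    Project(name=[$0])
00-02      Scan(table=[[dfs, /tmp/users.json]])
```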
Figure 3: Execution tree
3. Query Intake

Step 1: the server-side handler through which a Drillbit receives requests.
```java
package org.apache.drill.exec.rpc.user;

public class UserServerRequestHandler implements RequestHandler<BitToUserConnection> {
  private final UserWorker worker;

  @Override
  public void handle(BitToUserConnection connection, int rpcType, ByteBuf pBody,
                     ByteBuf dBody, ResponseSender responseSender) throws RpcException {
    switch (rpcType) {  // dispatch on the RPC message type
    case RpcType.RUN_QUERY_VALUE:
      logger.debug("Received query to run. Returning query handle.");
      try {
        final RunQuery query = RunQuery.PARSER.parseFrom(new ByteBufInputStream(pBody));
        final QueryId queryId = worker.submitWork(connection, query);  // hand off to UserWorker
        responseSender.send(new Response(RpcType.QUERY_HANDLE, queryId));
        break;
      } catch (InvalidProtocolBufferException e) {
        throw new RpcException("Failure while decoding RunQuery body.", e);
      }
    // handlers for the following RPC types are elided in this excerpt
    case RpcType.CANCEL_QUERY_VALUE:
    case RpcType.RESUME_PAUSED_QUERY_VALUE:
    case RpcType.GET_QUERY_PLAN_FRAGMENTS_VALUE:
    case RpcType.GET_CATALOGS_VALUE:
    case RpcType.GET_SCHEMAS_VALUE:
    case RpcType.GET_TABLES_VALUE:
    case RpcType.GET_COLUMNS_VALUE:
    case RpcType.CREATE_PREPARED_STATEMENT_VALUE:
    case RpcType.GET_SERVER_META_VALUE:
    default:
      throw new UnsupportedOperationException(
          String.format("UserServerRequestHandler received rpc of unknown type. Type was %d.", rpcType));
    }
  }
}
```
Step 2: once the request type has been decoded, it is handled accordingly. For a query request, a Foreman is initialized to process it.
```java
package org.apache.drill.exec.work.user;

public class UserWorker {
  private final WorkerBee bee;

  public QueryId submitWork(UserClientConnection connection, RunQuery query) {
    final QueryId id = queryIdGenerator();           // allocate a unique id for this query
    incrementer.increment(connection.getSession());
    Foreman foreman = new Foreman(bee, bee.getContext(), connection, id, query);
    bee.addNewForeman(foreman);                      // enqueue the Foreman for execution
    return id;                                       // this id is the handle returned to the client
  }
}
```
Step 3: the Foreman runs on the driving (root) node and manages all fragments (local and remote) for a single query. The main flow: when the Foreman is initialized, the query is in the preparing state; the Foreman is then submitted as a Runnable into a queue, where it waits to execute the query plan.
```java
package org.apache.drill.exec.work.foreman;

public class Foreman implements Runnable {
  private final QueryManager queryManager;
  private final QueryStateProcessor queryStateProcessor;

  public Foreman(...) {
    this.queryManager = new QueryManager(queryId, queryRequest, drillbitContext.getStoreProvider(),
        drillbitContext.getClusterCoordinator(), this);
    this.queryStateProcessor = new QueryStateProcessor(queryIdString, queryManager,
        drillbitContext, new ForemanResult());
  }

  @Override
  public void run() {
    try {
      if (!drillbitContext.isForemanOnline()) {
        throw new ForemanException("Query submission failed since Foreman is shutting down.");
      }
    } catch (ForemanException e) {
      logger.debug("Failure while submitting query", e);
      queryStateProcessor.addToEventQueue(QueryState.FAILED, e);
      return;  // a failed submission must not proceed to planning
    }

    queryStateProcessor.moveToState(QueryState.PLANNING, null);

    try {
      switch (queryRequest.getType()) {
      case LOGICAL:
        parseAndRunLogicalPlan(queryRequest.getPlan());
        break;
      case PHYSICAL:
        parseAndRunPhysicalPlan(queryRequest.getPlan());
        break;
      case SQL:
        final String sql = queryRequest.getPlan();
        logger.info("Query text for query with id {} issued by {}: {}",
            queryIdString, queryContext.getQueryUserName(), sql);
        runSQL(sql);
        break;
      case EXECUTION:
        runFragment(queryRequest.getFragmentsList());
        break;
      case PREPARED_STATEMENT:
        runPreparedStatement(queryRequest.getPreparedStatementHandle());
        break;
      default:
        throw new IllegalStateException();
      }
    } catch (Exception e) {
      // exception handling elided in this excerpt
    }
  }
}
```
4. Query Execution

Once a query request reaches the Foreman, SQL queries are handled by the Foreman#runSQL(sql) method, shown below:
```java
package org.apache.drill.exec.work.foreman;

public class Foreman implements Runnable {

  private void runSQL(final String sql) throws ExecutionSetupException {
    final Pointer<String> textPlan = new Pointer<>();
    // parse, validate, and optimize the SQL text into a physical plan
    final PhysicalPlan plan = DrillSqlWorker.getPlan(queryContext, sql, textPlan);
    runPhysicalPlan(plan, textPlan);
  }
}
```
4.1 SQL to Physical Plan

Step 1: dispatch on the kind of SQL statement. Plain SQL queries, for example, end up in DefaultSqlHandler#getPlan(sqlNode). The code:
```java
package org.apache.drill.exec.planner.sql;

public class DrillSqlWorker {

  public static PhysicalPlan getPlan(QueryContext context, String sql, Pointer<String> textPlan)
      throws ForemanSetupException {
    // delegates to convertPlan (body elided in this excerpt)
  }

  private static PhysicalPlan convertPlan(QueryContext context, String sql, Pointer<String> textPlan) {
    // wraps getQueryPlan with retry/fallback logic (body elided in this excerpt)
  }

  private static PhysicalPlan getQueryPlan(QueryContext context, String sql, Pointer<String> textPlan)
      throws ForemanSetupException, RelConversionException, IOException, ValidationException {
    final SqlConverter parser = new SqlConverter(context);
    final SqlNode sqlNode = checkAndApplyAutoLimit(parser, context, sql);  // parse SQL into an AST
    final AbstractSqlHandler handler;
    final SqlHandlerConfig config = new SqlHandlerConfig(context, parser);

    switch (sqlNode.getKind()) {
    // each statement kind has its own handler; those branches are elided here
    case EXPLAIN:
    case SET_OPTION:
    case DESCRIBE_TABLE:
    case DESCRIBE_SCHEMA:
    case CREATE_TABLE:
    case DROP_TABLE:
    case CREATE_VIEW:
    case DROP_VIEW:
    case OTHER_DDL:
    case OTHER:
    default:
      // ordinary queries fall through to the default handler
      handler = new DefaultSqlHandler(config, textPlan);
      context.setSQLStatementType(SqlStatementType.OTHER);
    }
    return handler.getPlan(sqlNode);
  }
}
```
Step 2: DefaultSqlHandler#getPlan(sqlNode) implements the conversion from the AST to a physical plan.
```java
package org.apache.drill.exec.planner.sql.handlers;

public class DefaultSqlHandler extends AbstractSqlHandler {

  @Override
  public PhysicalPlan getPlan(SqlNode sqlNode)
      throws ValidationException, RelConversionException, IOException, ForemanSetupException {
    // AST -> validated Calcite relational tree
    final ConvertedRelNode convertedRelNode = validateAndConvert(sqlNode);
    final RelDataType validatedRowType = convertedRelNode.getValidatedRowType();
    final RelNode queryRelNode = convertedRelNode.getConvertedNode();

    final DrillRel drel = convertToDrel(queryRelNode);        // Drill logical plan
    final Prel prel = convertToPrel(drel, validatedRowType);  // Drill physical plan
    logAndSetTextPlan("Drill Physical", prel, logger);
    final PhysicalOperator pop = convertToPop(prel);          // physical operator tree
    final PhysicalPlan plan = convertToPlan(pop);             // serializable physical plan
    log("Drill Plan", plan, logger);
    return plan;
  }

  protected DrillRel convertToRawDrel(final RelNode relNode) throws SqlUnsupportedException {
    if (context.getOptions().getOption(ExecConstants.EARLY_LIMIT0_OPT)
        && context.getPlannerSettings().isTypeInferenceEnabled()
        && FindLimit0Visitor.containsLimit0(relNode)) {
      // LIMIT 0 queries can often be answered from the schema alone
      final DrillRel shorterPlan;
      if ((shorterPlan = FindLimit0Visitor.getDirectScanRelIfFullySchemaed(relNode)) != null) {
        return shorterPlan;
      }
      if (FindHardDistributionScans.canForceSingleMode(relNode)) {
        context.getPlannerSettings().forceSingleMode();
      }
    }

    try {
      // rewrite set operations, then prune directories before logical planning
      final RelNode setOpTransposeNode = transform(PlannerType.HEP, PlannerPhase.PRE_LOGICAL_PLANNING, relNode);
      final RelNode pruned = transform(PlannerType.HEP_BOTTOM_UP, PlannerPhase.DIRECTORY_PRUNING, setOpTransposeNode);
      final RelTraitSet logicalTraits = pruned.getTraitSet().plus(DrillRel.DRILL_LOGICAL);

      final RelNode convertedRelNode;
      if (!context.getPlannerSettings().isHepOptEnabled()) {
        // pure Volcano (cost-based) planning
        convertedRelNode = transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL_PRUNE_AND_JOIN,
            pruned, logicalTraits);
      } else {
        final RelNode intermediateNode2;
        final RelNode intermediateNode3;
        if (context.getPlannerSettings().isHepPartitionPruningEnabled()) {
          // Volcano for the main logical planning, HEP for partition pruning
          final RelNode intermediateNode = transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, pruned, logicalTraits);
          final RelNode transitiveClosureNode = transform(PlannerType.HEP, PlannerPhase.TRANSITIVE_CLOSURE, intermediateNode);
          intermediateNode2 = transform(PlannerType.HEP_BOTTOM_UP, PlannerPhase.PARTITION_PRUNING, transitiveClosureNode);
        } else {
          final RelNode intermediateNode = transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL_PRUNE, pruned, logicalTraits);
          intermediateNode2 = transform(PlannerType.HEP, PlannerPhase.TRANSITIVE_CLOSURE, intermediateNode);
        }
        // join planning always runs as a HEP phase
        intermediateNode3 = transform(PlannerType.HEP_BOTTOM_UP, PlannerPhase.JOIN_PLANNING, intermediateNode2);
        if (context.getPlannerSettings().isRowKeyJoinConversionEnabled()) {
          convertedRelNode = transform(PlannerType.HEP_BOTTOM_UP, PlannerPhase.ROWKEYJOIN_CONVERSION, intermediateNode3);
        } else {
          convertedRelNode = intermediateNode3;
        }
      }
      // further conversion steps are elided; the result is returned as a DrillRel
      return (DrillRel) convertedRelNode;
    } catch (RelOptPlanner.CannotPlanException ex) {
      // cartesian-join detection and user-facing error reporting elided
      throw ex;
    }
  }
}
```
4.2 Physical Plan to Execution Tree

First, the optimized physical plan is split into multiple fragments; then the parallelizer turns those fragments into a fragment tree that can execute across nodes. The main code:
```java
package org.apache.drill.exec.work.foreman;

public class Foreman implements Runnable {
  private final QueryManager queryManager;
  private final QueryResourceManager queryRM;
  private final FragmentsRunner fragmentsRunner;

  public Foreman(...) {
    this.queryRM = drillbitContext.getResourceManager().newQueryRM(this);
  }

  private void runPhysicalPlan(final PhysicalPlan plan, Pointer<String> textPlan)
      throws ExecutionSetupException {
    validatePlan(plan);
    queryRM.visitAbstractPlan(plan);
    // split the plan into fragments and parallelize them across the cluster
    final QueryWorkUnit work = getQueryWorkUnit(plan, queryRM);
    if (enableRuntimeFilter) {
      runtimeFilterRouter = new RuntimeFilterRouter(work, drillbitContext);
      runtimeFilterRouter.collectRuntimeFilterParallelAndControlInfo();
    }
    if (textPlan != null) {
      queryManager.setPlanText(textPlan.value);
    }
    queryRM.visitPhysicalPlan(work);
    queryRM.setCost(plan.totalCost());
    queryManager.setTotalCost(plan.totalCost());
    work.applyPlan(drillbitContext.getPlanReader());
    logWorkUnit(work);

    fragmentsRunner.setFragmentsInfo(work.getFragments(), work.getRootFragment(), work.getRootOperator());
    startQueryProcessing();
  }

  private QueryWorkUnit getQueryWorkUnit(final PhysicalPlan plan, final QueryResourceManager rm)
      throws ExecutionSetupException {
    final PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next();
    // walk the operator tree and split it into a Fragment tree at exchange boundaries
    final Fragment rootFragment = rootOperator.accept(MakeFragmentsVisitor.INSTANCE, null);
    // the parallelizer chooses the degree of parallelism and assigns fragments to endpoints
    return rm.getParallelizer(plan.getProperties().hasResourcePlan)
        .generateWorkUnit(queryContext.getOptions().getOptionList(),
            queryContext.getCurrentEndpoint(), queryId, queryContext.getOnlineEndpoints(),
            rootFragment, initiatingClient.getSession(), queryContext.getQueryContextInfo());
  }

  private void startQueryProcessing() {
    enqueue();       // wait for an admission slot from the resource manager
    runFragments();  // distribute and start all fragments
    queryStateProcessor.moveToState(QueryState.RUNNING, null);
  }
}
```
4.3 Running the Execution Tree

(1) After the physical plan has been converted into an execution tree, three key pieces of information are available: the FragmentRoot, the root PlanFragment, and the List<PlanFragment>. The logic that distributes the fragments lives in FragmentsRunner:
```java
package org.apache.drill.exec.work.foreman;

public class FragmentsRunner {
  private final WorkerBee bee;
  private List<PlanFragment> planFragments;
  private PlanFragment rootPlanFragment;
  private FragmentRoot rootOperator;

  public void submit() throws ExecutionSetupException {
    logger.debug("Submitting fragments to run.");
    setupRootFragment(rootPlanFragment, rootOperator);  // the root fragment runs locally on the Foreman
    setupNonRootFragments(planFragments);               // intermediate and leaf fragments go remote
    logger.debug("Fragments running.");
  }

  private void setupNonRootFragments(final Collection<PlanFragment> fragments)
      throws ExecutionSetupException {
    if (fragments.isEmpty()) {
      return;
    }
    // group fragments by the Drillbit endpoint they were assigned to
    // (population of the two maps is elided in this excerpt)
    final Multimap<DrillbitEndpoint, PlanFragment> leafFragmentMap = ArrayListMultimap.create();
    final Multimap<DrillbitEndpoint, PlanFragment> intFragmentMap = ArrayListMultimap.create();

    // intermediate fragments must be set up before leaf fragments start sending them data
    scheduleIntermediateFragments(intFragmentMap);

    injector.injectChecked(foreman.getQueryContext().getExecutionControls(),
        "send-fragments", ForemanException.class);
    for (final DrillbitEndpoint ep : leafFragmentMap.keySet()) {
      sendRemoteFragments(ep, leafFragmentMap.get(ep), null, null);
    }
  }
}
```
(2) Fragments are received and handled by ControlMessageHandler:
```java
package org.apache.drill.exec.work.batch;

public class ControlMessageHandler implements RequestHandler<ControlConnection> {

  @Override
  public void handle(ControlConnection connection, int rpcType, ByteBuf pBody,
                     ByteBuf dBody, ResponseSender sender) throws RpcException {
    switch (rpcType) {
    case RpcType.REQ_INITIALIZE_FRAGMENTS_VALUE: {
      final InitializeFragments fragments = get(pBody, InitializeFragments.PARSER);
      initializeFragment(fragments);
      sender.send(ControlRpcConfig.OK);
      break;
    }
    default:
      throw new RpcException("Not yet supported.");
    }
  }

  public Ack initializeFragment(InitializeFragments fragments) throws RpcException {
    final DrillbitContext drillbitContext = bee.getContext();
    for (int i = 0; i < fragments.getFragmentCount(); i++) {
      // each received fragment is wrapped in a FragmentExecutor and started
      startNewFragment(fragments.getFragment(i), drillbitContext);
    }
    return Acks.OK;
  }
}
```
(3) Leaf fragments are executed by FragmentExecutor:
```java
package org.apache.drill.exec.work.fragment;

public class FragmentExecutor implements Runnable {
  private volatile RootExec root;

  @Override
  public void run() {
    try {
      final FragmentRoot rootOperator = this.rootOperator != null ? this.rootOperator
          : fragmentContext.getPlanReader().readFragmentRoot(fragment.getFragmentJson());
      // build the executable operator tree (this is where code generation happens)
      root = ImplCreator.getExec(fragmentContext, rootOperator);
      if (root == null) {
        return;
      }
      updateState(FragmentState.RUNNING);

      final UserGroupInformation queryUserUgi = fragmentContext.isImpersonationEnabled()
          ? ImpersonationUtil.createProxyUgi(fragmentContext.getQueryUserName())
          : ImpersonationUtil.getProcessUserUGI();

      queryUserUgi.doAs(new PrivilegedExceptionAction<Void>() {
        @Override
        public Void run() throws Exception {
          injector.injectChecked(fragmentContext.getExecutionControls(),
              "fragment-execution", IOException.class);
          // pump the operator tree: each next() call produces one record batch
          while (shouldContinue()) {
            for (FragmentHandle fragmentHandle; (fragmentHandle = receiverFinishedQueue.poll()) != null;) {
              root.receivingFragmentFinished(fragmentHandle);
            }
            if (!root.next()) {
              break;  // no more batches; the fragment is done
            }
          }
          return null;
        }
      });
    } catch (Exception e) {
      // failure handling and cleanup elided in this excerpt
    }
  }
}
```
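The while loop above is the engine's pull model in miniature: the fragment is driven by repeated calls to root.next(), each of which pulls one record batch through the operator tree. A simplified sketch of the RootExec contract, reconstructed only from the calls visible in this excerpt (the real interface in org.apache.drill.exec.physical.impl has additional lifecycle methods), looks roughly like this:

```java
// Simplified sketch of the RootExec contract as used above; not the full Drill interface.
public interface RootExec extends AutoCloseable {
  // Drive the operator tree one step; returns false when no more batches will be produced.
  boolean next();

  // Notify the root that a downstream receiving fragment no longer needs data.
  void receivingFragmentFinished(FragmentHandle handle);
}
```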
4.4 CodeGen

Code generation exists to improve CPU efficiency. For example, a physical plan contains branching logic such as if or switch statements whose conditions are already known at run time, so the unnecessary branches can be removed outright. Likewise, the exact trip count of a loop may be known at run time, allowing the loop to be unrolled and its branch checks eliminated as well. Optimizations like these remove branches, and with them the cost of branch misprediction, which can improve performance dramatically; see the discussion referenced in the original post for more background.
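To make the intuition concrete, here is a toy sketch (plain Java, not Drill code) contrasting a generic filter, which re-dispatches on the comparison operator for every row, with the kind of operator-specific loop that code generation effectively produces:

```java
// Toy illustration of why runtime specialization helps; this is not Drill code.
public final class FilterSpecialization {
  enum Op { LT, EQ, GT }

  // Generic version: the switch runs once per row, paying dispatch cost on every iteration
  // even though the operator never changes within a batch.
  static int filterGeneric(int[] col, Op op, int threshold, int[] selOut) {
    int n = 0;
    for (int i = 0; i < col.length; i++) {
      boolean keep;
      switch (op) {
        case LT: keep = col[i] < threshold; break;
        case EQ: keep = col[i] == threshold; break;
        default: keep = col[i] > threshold; break;
      }
      if (keep) { selOut[n++] = i; }
    }
    return n;
  }

  // Specialized version: what a generated class for "x > threshold" effectively boils down to.
  static int filterGreaterThan(int[] col, int threshold, int[] selOut) {
    int n = 0;
    for (int i = 0; i < col.length; i++) {
      if (col[i] > threshold) { selOut[n++] = i; }
    }
    return n;
  }
}
```

The second loop is what a generated class amounts to: the dispatch has been resolved at generation time, leaving a tight loop over a column that the JIT can optimize aggressively.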
Drill's operators process data in batches. Using code templates and columnar data, Drill removes dead branches, flattens the code, regenerates code that uses the cache well and is friendly to the CPU, and then compiles it and hands it to the JVM to run.
Take the filter operator in FilterRecordBatch as an example:
```java
package org.apache.drill.exec.physical.impl.filter;

public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter> {
  private SelectionVector2 sv2;
  private Filterer filter;

  public FilterRecordBatch(Filter pop, RecordBatch incoming, FragmentContext context)
      throws OutOfMemoryException {
    super(pop, context, incoming);
  }

  @Override
  protected boolean setupNewSchema() throws SchemaChangeException {
    if (sv2 != null) {
      sv2.clear();
    }
    switch (incoming.getSchema().getSelectionVectorMode()) {
    case NONE:
      if (sv2 == null) {
        sv2 = new SelectionVector2(oContext.getAllocator());
      }
      this.filter = generateSV2Filterer();
      break;
    case TWO_BYTE:
      sv2 = new SelectionVector2(oContext.getAllocator());
      this.filter = generateSV2Filterer();
      break;
    case FOUR_BYTE:
    default:
      throw new UnsupportedOperationException();
    }
    if (container.isSchemaChanged()) {
      container.buildSchema(SelectionVectorMode.TWO_BYTE);
      return true;
    }
    return false;
  }

  @Override
  protected IterOutcome doWork() {
    container.zeroVectors();
    int recordCount = incoming.getRecordCount();
    try {
      // run the generated filter over the whole batch at once
      filter.filterBatch(recordCount);
    } catch (SchemaChangeException e) {
      throw new UnsupportedOperationException(e);
    }
    return getFinalOutcome(false);
  }

  protected Filterer generateSV2Filterer() throws SchemaChangeException {
    final ErrorCollector collector = new ErrorCollectorImpl();
    final List<TransferPair> transfers = Lists.newArrayList();
    // a ClassGenerator rooted at the Filterer template; the generated class fills in doEval()
    final ClassGenerator<Filterer> cg =
        CodeGenerator.getRoot(Filterer.TEMPLATE_DEFINITION2, context.getOptions());

    // materialize the filter expression against the incoming batch's schema
    final LogicalExpression expr = ExpressionTreeMaterializer.materialize(
        popConfig.getExpr(), incoming, collector, context.getFunctionRegistry(),
        false, unionTypeEnabled);
    if (collector.hasErrors()) {
      throw new SchemaChangeException(String.format(
          "Failure while trying to materialize incoming schema. Errors:\n %s.",
          collector.toErrorString()));
    }
    cg.addExpr(new ReturnValueExpression(expr), ClassGenerator.BlkCreateMode.FALSE);

    // qualifying vectors are transferred (not copied) to the outgoing container
    for (final VectorWrapper<?> v : incoming) {
      final TransferPair pair = v.getValueVector().makeTransferPair(container.addOrGet(v.getField(), callBack));
      transfers.add(pair);
    }

    try {
      final TransferPair[] tx = transfers.toArray(new TransferPair[transfers.size()]);
      CodeGenerator<Filterer> codeGen = cg.getCodeGenerator();
      codeGen.plainJavaCapable(true);
      // compile the generated source and instantiate the concrete Filterer
      final Filterer filter = context.getImplementationClass(codeGen);
      filter.setup(context, incoming, this, tx);
      return filter;
    } catch (ClassTransformationException | IOException e) {
      throw new SchemaChangeException("Failure while attempting to load generated class", e);
    }
  }
}
```
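For intuition about what getImplementationClass ultimately produces: the generated class is conceptually a subclass of the Filterer template in which doEval contains the materialized predicate, and the template's filterBatch drives it over every record, recording the indices of matching rows in the outgoing SelectionVector2. A hypothetical generated class for a predicate like a > 10 might look roughly as follows (names and structure are illustrative, not actual Drill output):

```java
// Illustrative sketch only; real generated code is produced from the FilterTemplate2
// template (Filterer.TEMPLATE_DEFINITION2 above) and binds vectors in generated doSetup logic.
public class FiltererGen42 extends FilterTemplate2 {
  private IntVector vvA;  // hypothetical: value vector backing integer column "a"

  @Override
  public void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing)
      throws SchemaChangeException {
    // bind vvA to column "a" of the incoming batch (vector lookup elided)
  }

  @Override
  public boolean doEval(int inIndex, int outIndex) {
    // the materialized expression: keep the row when a > 10
    return vvA.getAccessor().get(inIndex) > 10;
  }
}
```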