Hive Driver源码执行流程分析

Hive Driver源码执行流程分析

引言

接着上一篇来说[执行入口的分析][1],CliDriver最终将用户指令command提交给了Driverrun方法(针对常用查询语句而言),在这里用户的command将会被编译,优化并生成MapReduce任务进行执行。所以Driver也是Hive的核心,他扮演了一个将用户查询和MapReduce Task转换并执行的角色,下面我们就看看Hive是如何一步一步操作的。

源码分析

在说run方法之前,由于CliDriver需要得到一个Driver类的实例,所以首先看一下Driver的构造方法。Driver有三个构造函数,主要功能也就是设置类的实例变量HiveConfSessionState前文已经有介绍,SessionState返回了当前会话的一些信息,提取配置文件,初始化Driver实例。

1
2
3
4
5
public Driver() {
if (SessionState.get() != null) {
conf = SessionState.get().getConf();
}
}

run

下面就开始解析Driver内部对用户命令command的处理流程,首先是入口函数run. run函数通过调用runInternal方法处理用户指令,在处理完成runInternal之后,如果执行过程中出现出错,还附加了对错误码和错误信息的处理,此处省略。

1
2
3
4
5
6
7
8
9
10
public CommandProcessorResponse run(String command)
throws CommandNeedRetryException {
return run(command, false);
}

public CommandProcessorResponse run(String command, boolean alreadyCompiled)
throws CommandNeedRetryException {
CommandProcessorResponse cpr = runInternal(command, alreadyCompiled);
...
}

runInternal

runInternal方法包含的主要操作有,处理preRunHook(具体功能可以顾名思义哦),compileexecute, 处理postRunHook以及构造CommandProcessorResponse并返回。下面依次从代码的角度分析这几步的具体操作:

PreRunHook

处理preRunHook,首先根据配置文件和指令,构造用户Hook执行的上下文hookContext,然后读取用户PreRunHook配置指定的类(字符串), 此配置项对应于Hive配置文件当中的“hive.exec.driver.run.hooks”一项,利用反射机制Class.forName实例化PreRunHook类实例(getHook函数完成),依次执行各钩子的功能(preDriverRun函数完成)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
HiveDriverRunHookContext hookContext
= new HiveDriverRunHookContextImpl(conf, command);
// Get all the driver run hooks and pre-execute them.
List<HiveDriverRunHook> driverRunHooks;
try{
driverRunHooks = getHooks(HiveConf.ConfVars.HIVE_DRIVER_RUN_HOOKS,
HiveDriverRunHook.class);
for (HiveDriverRunHook driverRunHook : driverRunHooks) {
driverRunHook.preDriverRun(hookContext);
}
}catch (Exception e) {
errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
SQLState = ErrorMsg.findSQLState(e.getMessage());
downstreamError = e;
console.printError(errorMessage + "\n"
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
return createProcessorResponse(12);
}

compile

编译,直接调用complieInternal函数编译用户指令,将指令翻译成MapReduce任务。这一个过程涉及的内容比较多,也很重要,后面将单独用一篇文章说明编译优化的过程。这里借用网上的一幅图,帮助对compile的功能有个整体的理解,参考文献: Hive实现原理.pdf。
编译流程

execute

在运行之前还有获取锁的操作,由于新版本添加了ACID事务的支持,还设置了事务管理器等,目前还没详细的弄懂这块的处理逻辑和功能,先放一下,主要看下execute函数执行了什么操作,也就是如何根据编译结果执行任务的。

首先是从编译得到的查询计划QueryPlan里获取基本的查询ID,查询字串等信息,并在回话状态中把当前查询字串和查询计划插入到历史记录中。

1
2
3
4
5
6
7
8
String queryId = plan.getQueryId();
String queryStr = plan.getQueryStr();

if (SessionState.get() != null) {
SessionState.get().getHiveHistory().startQuery(queryStr,
conf.getVar(HiveConf.ConfVars.HIVEQUERYID));
SessionState.get().getHiveHistory().logPlanProgress(plan);
}

PreRunHook类似,在执行任务之前,检查并执行用户设定的"hive.pre.exec.hooks",此处不再详述。完成这部操作之后,向控制台简单的打印一些信息之后,就开始正式执行任务了。

  • DriverContext

创建执行上下文DriverContext,它记录的信息主要包括可执行的任务队列(Queue runnable), 正在运行的任务队列(Queue running), 当前启动的任务数curJobNo, statsTasks(Map, what used for?)以及语义分析Semantic Analyzers依赖的Context对象等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
DriverContext driverCxt = new DriverContext(ctx); driverCxt.prepare(plan);

public DriverContext(Context ctx) {
this.runnable = new ConcurrentLinkedQueue<Task<? extends Serializable>>();
this.running = new LinkedBlockingQueue<TaskRunner>();
this.ctx = ctx;
}

public void prepare(QueryPlan plan) {
// extract stats keys from StatsTask
List<Task<?>> rootTasks = plan.getRootTasks();
NodeUtils.iterateTask(rootTasks, StatsTask.class, new Function<StatsTask>()
{
public void apply(StatsTask statsTask) {
statsTasks.put(statsTask.getWork().getAggKey(), statsTask);
}
});
}

顺便提一下Context对象,在Context的源码注释当中提到, 每一个查询都要对应一个Context对象,不同查询之间Context对象是不可重用的, 执行完一个查询之后需要clear对应的Context对象(主要是语法分析用到的temp文件目录),在Hive的实现中也是这么做的。回顾上一篇文章,从CliDriver循环的读取用户指令,每读取到一条指令都要进行processLineprocessCmdprocessLocalCmd的处理,然后提交给Driver编译解析。Context对象是在compile函数中实例化的,也就说每一条查询都会创建一个Context对象,当执行完一条查询从Driver返回到processLocalCmd中时,都会调用Driver对象的close函数对Context进行清理(ctx.clear),这样就保证了一条查询对应一个Context对象。对于DriverContext对象也是类似,在execute函数中实例化,Driverclose函数中关闭(driverCtx.shutdown),和Context相比一个用来辅助语义分析,一个用来辅助任务执行。还有,我们发现在processCmd函数中通过CommandProcessorFactory设置了Driver类的实例对象,也就是每一条查询都需要一个Driver对象进行处理,那这些Driver对象之间是否可以共享呢?答案是肯定的,在CommandProcessorFactory中维持了一个HiveConfDriver的Map,每次获取Driver对象时都是根据conf对象来查找到的,如果不存在才重新创建一个Driver对象,而HiveConf对象又是在CliDriverrun方法中实例化的,与一个CliSessionState对应,所以Driver实例应该是与一个Cli的会话对应,同一个会话内部的查询共享一个Driver实例。

  • Manage and run all tasks

扯得有点远,继续看Driver对查询任务的执行,在实例化DriverContext对象之后,就将查询计划plan中的任务放入到DriverContextrunnable队列中。

1
2
3
4
for (Task<? extends Serializable> tsk : plan.getRootTasks()) {
assert tsk.getParentTasks() == null || tsk.getParentTasks().isEmpty();
driverCxt.addToRunnable(tsk);
}

下面就开始运行任务Task,整个任务的运行由一个循环控制,只要DriverContext没有被关闭,并且runnablerunning队列中还有任务就一直循环。为了方便描述,下文将一次对任务循环过程的每一步进行说明,这里只给出循环判断条件。

1
2
3
4
5
while (!destroyed && driverCxt.isRunning()) {}

public synchronized boolean isRunning() {
return !shutdown && (!running.isEmpty() || !runnable.isEmpty());
}

1. Put all the tasks into runnable queue

在循环内部,首先不停的从runnable队列中抽取队首的任务,然后launch该任务。

1
2
3
4
5
6
7
8
9
10
while (!destroyed && driverCxt.isRunning()) {
// Launch upto maxthreads tasks
Task<? extends Serializable> task;
while ((task = driverCxt.getRunnable(maxthreads)) != null) {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TASK + task.getName() + "." + task.getId());
TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt);
if (!runner.isRunning()) {
break;
}
}

2. Launch a task

在launch一个任务的过程中,根据任务类型(是不是MapReduceTask或者ConditialTask),做一些操作(don’t know what used for),将DriverContext当前已启动任务数curJobNo加1,然后根据配置文件conf,查询计划plan,执行上下文cxt(DriverContext),初始化一个任务,接着创建任务结果TaskResult对象和任务执行对象TaskRunner,将TaskRunner放入DriverContextrunning队列中,表示该任务正在运行。最后,根据配置文件指定的任务运行模式,即是否支持并行运行,启动任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
private TaskRunner launchTask(Task<? extends Serializable> tsk,
String queryId, boolean noName,
String jobname, int jobs, DriverContext cxt) throws HiveException {

if (SessionState.get() != null) {
SessionState.get().getHiveHistory().startTask(queryId, tsk,
tsk.getClass().getName());
}

if (tsk.isMapRedTask() && !(tsk instanceof ConditionalTask)) {
if (noName) {
conf.setVar(HiveConf.ConfVars.HADOOPJOBNAME, jobname + "(" +
tsk.getId() + ")");
}
conf.set("mapreduce.workflow.node.name", tsk.getId());
Utilities.setWorkflowAdjacencies(conf, plan);
cxt.incCurJobNo(1);
console.printInfo("Launching Job " + cxt.getCurJobNo() + " out of " + jobs);
}

tsk.initialize(conf, plan, cxt);
TaskResult tskRes = new TaskResult();
TaskRunner tskRun = new TaskRunner(tsk, tskRes);

cxt.launching(tskRun);
// Launch Task
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL)
&& (tsk.isMapRedTask() || (tsk instanceof MoveTask))) {
// Launch it in the parallel mode, as a separate thread only for MR tasks
//并发执行
if (LOG.isInfoEnabled()){
LOG.info("Starting task [" + tsk + "] in parallel");
}
tskRun.setOperationLog(OperationLog.getCurrentOperationLog());
tskRun.start();
} else {
if (LOG.isInfoEnabled()){
LOG.info("Starting task [" + tsk + "] in serial mode");
}
//顺序执行
tskRun.runSequential();
}
return tskRun;
}

3. Poll a finished task

完成任务的启动之后,将调用DriverContextpollFinished函数,查看任务是否执行完毕,如果有任务完成,则将该任务出队,并将已完成的任务添加到钩子上下文HookContext中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
TaskRunner tskRun = driverCxt.pollFinished();
if (tskRun == null) {
continue;
}

hookContext.addCompleteTask(tskRun);

public synchronized TaskRunner pollFinished() throws InterruptedException {
while (!shutdown) {
Iterator<TaskRunner> it = running.iterator();
while (it.hasNext()) {
TaskRunner runner = it.next();
if (runner != null && !runner.isRunning()) {
it.remove();
return runner;
}
}
wait(SLEEP_TIME);
}
return null;
}

4. Handle the finished task

针对一个已完成的任务,首先获取任务的结果对象TaskResult和退出状态, 如果任务非正常退出,则第一步先判断任务是否支持Retry,如果支持,关闭当前DriverContext,设置jobTracker为初始状态,抛出CommandNeedRetry异常,这个异常会在CliDriverprocessLocalCmd中捕获,然后尝试重新处理该命令,参见上一篇文章的说明。如果任务不支持Retry,则启动备份任务backupTask(类似于回滚?),并添加到runnable队列,在下次循环过程中执行。如果没有backupTask,则查找用户配置“hive.exec.failure.hooks”,根据用户配置相应出错处理,并关闭DriverContext, 返回退出码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
Task<? extends Serializable> tsk = tskRun.getTask();
TaskResult result = tskRun.getTaskResult();

int exitVal = result.getExitVal();
if (exitVal != 0) {
if (tsk.ifRetryCmdWhenFail()) {
driverCxt.shutdown();
// in case we decided to run everything in local mode, restore the
// the jobtracker setting to its initial value
ctx.restoreOriginalTracker();
throw new CommandNeedRetryException();
}
Task<? extends Serializable> backupTask = tsk.getAndInitBackupTask();
if (backupTask != null) {
setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk);
console.printError(errorMessage);
errorMessage = "ATTEMPT: Execute BackupTask: " + backupTask.getClass().getName();
console.printError(errorMessage);

// add backup task to runnable
if (DriverContext.isLaunchable(backupTask)) {
driverCxt.addToRunnable(backupTask);
}

continue;

} else {
hookContext.setHookType(HookContext.HookType.ON_FAILURE_HOOK);
// Get all the failure execution hooks and execute them.
for (Hook ofh : getHooks(HiveConf.ConfVars.ONFAILUREHOOKS)) {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.FAILURE_HOOK + ofh.getClass().getName());

((ExecuteWithHookContext) ofh).run(hookContext);

perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.FAILURE_HOOK + ofh.getClass().getName());
}
setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk);
SQLState = "08S01";
console.printError(errorMessage);
driverCxt.shutdown();
// in case we decided to run everything in local mode, restore the
// the jobtracker setting to its initial value
ctx.restoreOriginalTracker();
return exitVal;
}
}

5. Find children tasks

最后调用DriverContextfinished函数,对完成的任务进行处理(处理逻辑没看懂), 然后判断当前任务是否包含子任务,如果包含则依次将子任务添加到runnable队列,下次循环中被启动执行。

1
2
3
4
5
6
7
8
9
driverCxt.finished(tskRun);

if (tsk.getChildTasks() !=
for (Task<? extends Serializable> child : tsk.getChildTasks()) {
if (DriverContext.isLaunchable(child)) {
driverCxt.addToRunnable(child);
}
}
}

6. Do something before return

当所有的任务都完成之后,如果发现DriverContext已经被关闭,表明任务取消,打印信息并返回对应的状态码。最后清楚任务执行中不完整的输出,并加载执行用户指定的"hive.exec.post.hooks",完成对应的钩子功能。对于执行过程中出现的异常,CommandNeedRetryException将会直接向上抛出,其他Exception,直接打印出错信息。无论是否发生异常,只要能够获取到任务执行过程中的MapReduce状态信息,都将在finally语句块中打印。(限于篇幅,此处只给出部分代码,钩子的处理方式前文已经给出不再详述,异常处理的部分,有兴趣的执行查看)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//判断DriverContext是否被关闭
if (driverCxt.isShutdown()) {
SQLState = "HY008";
errorMessage = "FAILED: Operation cancelled";
console.printError(errorMessage);
return 1000;
}

//删除不完整的输出
HashSet<WriteEntity> remOutputs = new HashSet<WriteEntity>();
for (WriteEntity output : plan.getOutputs()) {
if (!output.isComplete()) {
remOutputs.add(output);
}
}

for (WriteEntity output : remOutputs) {
plan.getOutputs().remove(output);
}

最后的最后,如果所有的任务都正常执行完毕,此次查询完成,plan.setDone(),打印OK~

PostRunHook and return

还没完~当execute函数执行完成后,返回到runInternal函数中,接着释放锁,与之前的PreRunHook相对应,还需要加载相应用户自定义的PostRunHook(代码不再重复),最后才调用creatProcessorResponse,创建响应对象CommandProcessorResponse并返回。

1
2
3
private CommandProcessorResponse createProcessorResponse(int ret) {
return new CommandProcessorResponse(ret, errorMessage, SQLState, downstreamError);
}


  Hive

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×