WholeStageCodegen: Internal Working
In our previous post we discussed whole-stage code generation (WSCG) and the problems it solves in the Volcano iterator model. In this post we will look at the internal implementation of WSCG.
Since WSCG is fairly complex, I will take a simple query plan and use it to demonstrate how WSCG works.
Let’s take a query plan with just a scan, a project and a filter, along with its WSCG counterpart.
WholeStageCodegenExec is the physical operator for WSCG, and here is its doExecute method:
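override def doExecute(): RDD[InternalRow] = {
  val (ctx, cleanedSource) = doCodeGen()
  // try to compile and fallback if it failed
  val (_, compiledCodeStats) = try {
    CodeGenerator.compile(cleanedSource)
  } catch {
    // NonFatal is scala.util.control.NonFatal
    // simplified: Spark also checks that codegen fallback is enabled before falling back
    case NonFatal(_) =>
      return child.execute()
  }
  // ... remainder of the method elided ...
}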
When this operator is executed, it calls doCodeGen(), which performs the actual code generation. It then tries to compile the generated code; if compilation fails, it falls back to executing the child plan without whole-stage codegen. Now let’s dive into the doCodeGen() process.
Every operator that supports codegen implements the CodegenSupport trait, which exposes two key methods for operators to implement:
def doProduce(ctx: CodegenContext): String
def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String
doProduce generates the code that produces the source rows. Leaf nodes (e.g. a scan) implement it to emit the loop that reads the data; other operators simply delegate to the produce call of their child.
doConsume is where the processing logic lives. The producer pushes each row it produces to the doConsume method of its parent, and every intermediate node (filter, project, etc.) adds its own processing logic there.
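To make the produce/consume handshake concrete, here is a minimal toy sketch in Java. It is not Spark's code: the classes ToyOp, ToyScan, ToyFilter and ToySink are hypothetical and only mimic the shape of CodegenSupport. Each operator returns a fragment of source text, and the fragments nest into one loop.

```java
// Toy model of the produce/consume protocol. All names here are hypothetical;
// they only mimic the shape of Spark's CodegenSupport, not its real API.
abstract class ToyOp {
    ToyOp parent; // remembered when the parent asks us to produce

    // Ask this operator to emit the code that produces rows.
    final String produce(ToyOp parent) {
        this.parent = parent;
        return doProduce();
    }

    // Hand a row variable to the parent so it can emit its processing code.
    final String consume(String rowVar) {
        return parent.doConsume(rowVar);
    }

    abstract String doProduce();
    abstract String doConsume(String rowVar);
}

// Leaf node: emits the driving loop and pushes each row to its parent.
class ToyScan extends ToyOp {
    String doProduce() {
        return "while (input.hasNext()) {\n"
             + "  Object[] row = input.next();\n"
             + consume("row")
             + "}\n";
    }
    String doConsume(String rowVar) {
        throw new UnsupportedOperationException("a leaf node has no child to consume from");
    }
}

// Intermediate node: delegates produce to its child, adds its check in consume.
class ToyFilter extends ToyOp {
    final ToyOp child;
    final String condition; // format string; %s is replaced by the row variable
    ToyFilter(ToyOp child, String condition) {
        this.child = child;
        this.condition = condition;
    }
    String doProduce() { return child.produce(this); }
    String doConsume(String rowVar) {
        return "  if (!(" + String.format(condition, rowVar) + ")) continue;\n"
             + consume(rowVar);
    }
}

// Root of the toy pipeline: plays the role of the WSCG node and appends the row.
class ToySink extends ToyOp {
    final ToyOp child;
    ToySink(ToyOp child) { this.child = child; }
    String doProduce() { return child.produce(this); }
    String doConsume(String rowVar) { return "  append(" + rowVar + ");\n"; }
    String generate() { return produce(null); }

    public static void main(String[] args) {
        ToyOp filter = new ToyFilter(new ToyScan(), "((Integer) %s[1]) < 36");
        System.out.print(new ToySink(filter).generate());
    }
}
```

Note how the calls travel down the tree through produce and then back up through consume, so the scan's loop ends up wrapping the filter's check and the sink's append.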
Here is a sample doProduce implementation from InputRDDCodegen:
override def doProduce(ctx: CodegenContext): String = {
  // Inline mutable state since an InputRDDCodegen is used once in a task for WholeStageCodegen
  val input = ctx.addMutableState("scala.collection.Iterator", "input", v => s"$v = inputs[0];",
    forceInline = true)
  val row = ctx.freshName("row")
  // (construction of outputVars omitted for brevity)
  s"""
     | while ($limitNotReachedCond $input.hasNext()) {
     |   InternalRow $row = (InternalRow) $input.next(); // produce data
     |   ${consume(ctx, outputVars).trim} // call to consume method
     |   ${shouldStopCheckCode}
     | }
   """.stripMargin
}
This generates code that loops over the input rows and passes each row to the consume method of the parent operator.
Here is a sample doConsume method from the FilterExec operator:
override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
  // generate condition for filter
  val predicateCode = generatePredicateCode(
    ctx, child.output, input, output, notNullPreds, otherPreds, notNullAttributes)
  // (construction of resultVars omitted for brevity)
  // codegen output; the do { } while(false) wrapper lets a failed check skip the row with "continue;"
  s"""
     |do {
     |  $predicateCode // adds own filtering logic
     |  ${consume(ctx, resultVars)} // call to next consume method
     |} while(false);
   """.stripMargin
}
The filter operator receives each input row from its child scan node, applies its filtering logic, and passes the surviving rows to the consume function of its parent.
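The do { } while(false) wrapper in the template can look odd at first. Here is a small hand-written Java sketch (not real generated code) of what it buys: a failed predicate can leave the block with continue, yet any code emitted after the block still runs for every input row. The class name, the rowsSeen counter and the int[] input are my own assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hand-written sketch, not real generated code. It shows that "continue"
// inside do { } while (false) only leaves the filter block, not the outer loop body.
class FilterSkipDemo {
    int rowsSeen = 0;                               // stands in for per-row code emitted after the block
    final List<Integer> kept = new ArrayList<>();   // rows that passed the filter

    void run(int[] ages) {
        for (int i = 0; i < ages.length; i++) {     // loop emitted by the producer
            int age = ages[i];
            do {
                if (!(age < 36)) continue;          // predicate failed: jump to while(false) and leave the block
                kept.add(age);                      // the parent's consume code would go here
            } while (false);
            rowsSeen++;                             // still executed for filtered-out rows
        }
    }

    public static void main(String[] args) {
        FilterSkipDemo demo = new FilterSkipDemo();
        demo.run(new int[]{25, 40, 35, 36});
        System.out.println(demo.kept + " kept out of " + demo.rowsSeen);
    }
}
```

With a bare continue and no wrapper, the rest of the outer loop body would be skipped as well, which is why the filter's fragment is wrapped this way.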
Now let’s combine everything and see what the final generated code looks like.
This is the code generated by the scan node for the sample query
(SELECT sid FROM emps WHERE age < 36):
Next, let’s look at the code generated by the filter node:
Next, the consume functions of the project and WSCG nodes get called:
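Putting the three steps together, the fused code for the sample query can be approximated by the hand-written Java sketch below. It is not the code Spark actually emits: I am assuming each emps row is an int[] of {sid, age} (the real code works on InternalRow), and the class and method names are made up for illustration. The comments mark which operator contributed each part.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hand-written approximation of the single fused loop for
//   SELECT sid FROM emps WHERE age < 36
// Assumption: each emps row is an int[] {sid, age}.
class FusedScanFilterProject {
    static List<Integer> processNext(Iterator<int[]> input) {
        List<Integer> output = new ArrayList<>();
        while (input.hasNext()) {              // Scan: doProduce drives the loop
            int[] row = input.next();
            do {
                int age = row[1];              // Filter: doConsume checks the predicate
                if (!(age < 36)) continue;
                int sid = row[0];              // Project: doConsume keeps only sid
                output.add(sid);               // WSCG node: doConsume appends the result row
            } while (false);
        }
        return output;
    }

    public static void main(String[] args) {
        List<int[]> emps = Arrays.asList(
            new int[]{1, 30}, new int[]{2, 41}, new int[]{3, 35});
        System.out.println(processNext(emps.iterator()));
    }
}
```

For the three sample rows in main this prints [1, 3]. The point is that scan, filter and project no longer exist as separate iterators; they are just consecutive statements inside one loop.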
Blocking operators in WSCG:
A blocking operator (e.g. HashAggregate or Sort) breaks the pipeline, so a single WSCG node can contain multiple pipelines. These operators need to see all rows from their child before they can produce any output, which is why they are called blocking operators. Their two methods divide the work as follows:
doConsume(): implements the callback that builds the intermediate result
doProduce():
- consumes the entire output from upstream to finish building the intermediate result
- starts a new loop and produces output for downstream based on the intermediate result
Let’s look at the WSCG code for HashAggregate with a sample query:
SELECT age, count(*) FROM emps GROUP BY age
First, doProduce() of HashAggregate consumes all rows from its child, as shown in the diagram below:
After the intermediate result (here a hash map) has been built, it starts a new loop that iterates over the intermediate result and calls the consume method of its parent.
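The two phases can be approximated by the hand-written Java sketch below. Again, this is not Spark's generated code: I am assuming rows are int[] of {sid, age}, a LinkedHashMap stands in for Spark's hash map, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hand-written approximation of the two pipelines for
//   SELECT age, count(*) FROM emps GROUP BY age
// Assumptions: rows are int[] {sid, age}; a LinkedHashMap stands in for Spark's hash map.
class TwoPhaseAggregate {
    static List<long[]> processNext(Iterator<int[]> input) {
        // Pipeline 1: the child's loop pushes every row into the aggregate's
        // doConsume, which only updates the intermediate result.
        Map<Integer, Long> counts = new LinkedHashMap<>();
        while (input.hasNext()) {
            int[] row = input.next();
            counts.merge(row[1], 1L, Long::sum);
        }

        // Pipeline 2: a new loop over the intermediate result feeds the
        // consume code of the parent (here: append to the output).
        List<long[]> output = new ArrayList<>();
        for (Map.Entry<Integer, Long> e : counts.entrySet()) {
            output.add(new long[]{e.getKey(), e.getValue()});
        }
        return output;
    }

    public static void main(String[] args) {
        List<int[]> emps = Arrays.asList(
            new int[]{1, 30}, new int[]{2, 41}, new int[]{3, 30});
        for (long[] r : processNext(emps.iterator())) {
            System.out.println("age=" + r[0] + " count=" + r[1]);
        }
    }
}
```

No output row can be emitted until the first loop has finished, which is exactly what makes the operator blocking and splits the stage into two pipelines.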
Reference: