Contents

Implementing an Arrow-based SQL Query Engine with Rust - Part 3

TL;DR:为了学习查询引擎,最近从0在写 sql-query-engine-rs,它是一个用 Rust 写的 Arrow-based SQL Query Engine。本系列文章会详细介绍它的具体实现,会按照对应的 Roadmap 依次讲解,也可以 checkout 对应 tag 查看代码。Most of ideas inspired by risinglight and datafusion

这篇Part 3会引入优化器部分,说的优化器框架,可以先从 15721 的 Optimizer Implementation Slides 或 笔者的博客 了解基础概念。常见的数据库优化方式通过 Heuristic Optimizer + Cost Based Optimizer 结合,也就是我们常说的 RBO 和 CBO。而这篇主要介绍,如何引入启发式优化器框架。

目前有许多开源的 RBO 实现,笔者这里参考了 calcite HepPlanner, spark catalystdatafusion-dolomite 下面会依次介绍笔者参考的思路。

Roadmap v0.4 Heuristic Optimizer

Roadmap v0.4 主要涉及引入 Heuristic Optimizer 以及常用的 rules

Calcite HepPlanner

提到 RBO 必须要看一下 Calcite HepPlanner 的实现,由于引入了许多抽象和Java本身的表现形式,笔者认为它的代码阅读起来比较陡峭,需要debug才能帮助理解,可参考笔者之前debug记录

HepPlanner 的核心逻辑是,遍历一个 PlanTree 转换为的 DAG,对其中的每个节点执行 apply rules。其中会遇到的一些抽象概念:

  • HepInstruction:代表 planner 运行中的一个指令,它可以是设置 MatchOrder(TopDown/…),MatchLimit(applyRule最大次数),配置一个 Rule 或者 一组 Rules。
  • HepProgram:包裹 HepInstruction list,也就是一个指令链,会控制 HepProgram 内部可变的 state (MatchOrder, MatchLimit),达到控制不同的 rules 不同的 state 目的。

整个 HepPlanner 的入口是 findBestExp 函数,它内部会执行 executeProgram 来循环执行 instructions。正如上面提到的,可以执行多种 instruction 类型。而我们需要关注一个 rule 如何执行的过程。如下代码:

1
2
3
4
5
6
void executeRuleInstance(HepInstruction.RuleInstance instruction, HepInstruction.RuleInstance.State state) {
    if (state.programState.skippingGroup()) {
        return;
    }
    applyRules(state.programState, ImmutableList.of(instruction.rule), true);
}

其中核心部分是 HepPlanner#applyRules,它类似是一个多层循环,外层遍历 graph 中的每个 node, 内层对某一个 node apply 所有 rules。如果 rule 生效,会 reset graph iterator (因为 rule 改变的 graph node),重新执行外层循环。如下 applyRules代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// fixed point 一个术语,代表 plan tree not changed after applying all rules. 即达到一个固定点,应该停止循环了。
// 具体到代码来说,有几个触发点,1.达到matchLimit 2.遍历rules不会有新的transform产生,即下面代码中的 continue
boolean fixedPoint;
do {
    Iterator<HepRelVertex> iter = getGraphIterator(programState, requireNonNull(root, "root"));
    fixedPoint = true;
    while (iter.hasNext()) {
        HepRelVertex vertex = iter.next();
        for (RelOptRule rule : rules) {
            HepRelVertex newVertex = applyRule(rule, vertex, forceConversions);
            if (newVertex == null || newVertex == vertex) {
                continue;
            }
            ++nMatches;
            if (nMatches >= programState.matchLimit) {
                return;
            }
            // ARBITRARY 或 DEPTH_FIRST 时,为 false,否则为,BOTTOM_UP 或 TOP_DOWN,代表每次 restart 都从 root 节点开始
            if (fullRestartAfterTransformation) {
                iter = getGraphIterator(programState, requireNonNull(root, "root"));
            } else {
                // To the extent possible, pick up where we left
                // off; have to create a new iterator because old
                // one was invalidated by transformation.
                iter = getGraphIterator(programState, newVertex);
                if (programState.matchOrder == HepMatchOrder.DEPTH_FIRST) {
                    nMatches = depthFirstApply(programState, iter, rules, forceConversions, nMatches);
                    if (nMatches >= programState.matchLimit) {
                        return;
                    }
                }
                // Remember to go around again since we're
                // skipping some stuff.
                fixedPoint = false;
            }
            break;
        }
    }
} while (!fixedPoint);

如下代码是具体 applyRule 的逻辑,其中:

  • matchOperands 用于 rule 匹配的 check,同时会记录 bindings(满足rule条件的节点),以便后续 fireRule 时使用
  • fireRule 具体执行 rule#onMatch 方法,会将 rule 转换的结果写入 HepRuleCall 的 result 中
  • applyTransformationResults,将 HepRuleCall 的 result 结果得出 best,如果 result 中包含多个结果,会比较 cost 选择最优的结果
    • 有了 bestRel 后,方法内会重新 reconstruction 整个 graph,包括维护 parents 和 children 节点

整体 applyRule 代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private HepRelVertex applyRule(RelOptRule rule, HepRelVertex vertex, boolean forceConversions) {
    if (!graph.vertexSet().contains(vertex)) {
      return null;
    }
    RelTrait parentTrait = null;
    List<RelNode> parents = null;
    // ... 省略 ConverterRule, CommonRelSubExprRule 处理逻辑
    
    final List<RelNode> bindings = new ArrayList<>();
    final Map<RelNode, List<RelNode>> nodeChildren = new HashMap<>();
    boolean match = matchOperands(rule.getOperand(), vertex.getCurrentRel(), bindings, nodeChildren);
    if (!match) {
      return null;
    }

    HepRuleCall call = new HepRuleCall(this, rule.getOperand(), bindings.toArray(new RelNode[0]), nodeChildren, parents);

    // Allow the rule to apply its own side-conditions.
    if (!rule.matches(call)) {
      return null;
    }

    fireRule(call);

    if (!call.getResults().isEmpty()) {
      return applyTransformationResults(vertex, call, parentTrait);
    }

    return null;
}

举一个 applyRule 的示例帮助理解,如下 SQL 与 FILTER_INTO_JOIN rule:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
String sql = """
    select name, role_name from t_users a left join t_roles b on a.role_id = b.id
    where a.id = 1
    """;
printOptimizeBeforeAfter(sql, CoreRules.FILTER_INTO_JOIN);

/*
Before:
LogicalProject(NAME=[$1], ROLE_NAME=[$4])
  LogicalFilter(condition=[=($0, 1)])
    LogicalJoin(condition=[=($2, $3)], joinType=[left])
      LogicalTableScan(table=[[t_users]])
      LogicalTableScan(table=[[t_roles]])

After:
LogicalProject(NAME=[$1], ROLE_NAME=[$4])
  LogicalJoin(condition=[=($2, $3)], joinType=[left])
    LogicalFilter(condition=[=($0, 1)])
      LogicalTableScan(table=[[t_users]])
    LogicalTableScan(table=[[t_roles]])
 */

printOptimizeBeforeAfter 是构造 HepPlanner 的 util 方法,并将 rule 传入到 optimizer中。对应上面的步骤:

  • matchOperands:正如上面的 plan tree,rule.Operand 去 top-down 匹配每一个 node 时,发现 LogicalFilter 满足 rule 条件,从而将匹配到的 nodes 放入 bindings中,这里是 [LogicalFilter, LogicalJoin]
  • fireRule:构造 HepRuleCall,并执行 rule onMatch(call) 方法,进行节点的转换,并将结果保存在 call results 中,这里是将 Filter{Join{Scan, Scan}} 转换为 Join{Filter{Scan}, Scan}
  • applyTransformationResults:将 result apply 到 graph 中,并维护 parents 和 children 节点

最后一步是 buildFinalPlan 方法,遍历 graph 重建 plan tree,至此 HepPlanner 优化阶段完毕。

Spark Catalyst

spark catalyst 也包含了 RBO 优化,相比 calcite 的 HepPlanner,Catalyst 阅读起来比较直观,得益于 Scala 的 partial functionpattern matching,使得 rule 实现更加简单。

开始前可以先从这个 Spark SQL’s Catalyst optimizer demo 了解 Catalyst 大量利用到的 Scala FP 特性。如下代码展示了 rule 的定义:

其中 tree.transformUp 包裹的部分就是一个 partial function,用来匹配 Add 节点。同时,Add(n, Add(m, next)) 用到了 pattern matching,匹配(n + (m + next))这种形式的 Add plan node。

同时,注意 TreeNode.transformUp 方法,它是一个递归函数,从最底层从 child node 依次往上 apply rule,直到 root node。对比,calcite 的 遍历 graph node 执行 rule matching 过程,本质来说是一致的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
object MergeAdds extends Rule[Instruction] {
  override def apply(tree: Instruction) =
    tree.transformUp {
      case Add(n, Add(m, next)) => if (n + m == 0) next else Add(n + m, next)
    }
}

// TreeNode 的方法
def transformUp(rule: PartialFunction[BaseType, BaseType]): BaseType = {
  val childrenTransformed = transformChildrenUp(rule)
  if (this same childrenTransformed) rule.applyOrElse(this, identity[BaseType])
  else rule.applyOrElse(childrenTransformed, identity[BaseType])
}

private def transformChildrenUp(rule: PartialFunction[BaseType, BaseType]): BaseType =
  this.withChildren(children.map(_ transformUp rule))

具体举例观察 rule 匹配的过程,类似上面提到的 FILTER_INTO_JOIN rule,对应到 spark 中的 PushPredicateThroughJoin rule,如下测试所在 spark 文件,这里给 originalQuery 多加了 select node 来观察递归匹配:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// 只配置 PushPredicateThroughJoin rule
test("joins: push to either side") {
  val x = testRelation.subquery("x")
  val y = testRelation.subquery("y")

  val originalQuery = {
    x.join(y)
      .where("x.b".attr === 1)
      .where("y.b".attr === 2)
  }.select("x.b")

  val optimized = Optimize.execute(originalQuery.analyze)
  val left = testRelation.where($"b" === 1)
  val right = testRelation.where($"b" === 2)
  val correctAnswer =
    left.join(right).analyze

  comparePlans(optimized, correctAnswer)
}

object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform applyLocally
  
  val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {
    // push the where condition down into join filter
    case f @ Filter(filterCondition, Join(left, right, joinType, joinCondition, hint)) 
      if canPushThrough(joinType) => // ...
    // push down the join filter into sub query scanning if applicable
    case j @ Join(left, right, joinType, joinCondition, hint) 
      if canPushThrough(joinType) => // ...
}

// 最初输入的 plan tree
'Project [unresolvedalias(x.b, None)]
+- 'Filter ('y.b = 2)
   +- 'Filter ('x.b = 1)
      +- 'Join Inner
         :- SubqueryAlias x
         :  +- LocalRelation <empty>, [a#0, b#1, c#2]
         +- SubqueryAlias y
            +- LocalRelation <empty>, [a#0, b#1, c#2]

// 第一次匹配到的 plan tree
Filter (b#1 = 1)
+- Join Inner
   :- LocalRelation <empty>, [a#0, b#1, c#2]
   +- LocalRelation <empty>, [a#10, b#11, c#12]
// 第二次匹配到的 plan tree
Filter (b#11 = 2)
+- Join Inner
   :- Filter (b#1 = 1)
   :  +- LocalRelation <empty>, [a#0, b#1, c#2]
   +- LocalRelation <empty>, [a#10, b#11, c#12]
// 第三次次匹配到的 plan tree
Join Inner
:- Filter (b#1 = 1)
:  +- LocalRelation <empty>, [a#0, b#1, c#2]
+- Filter (b#11 = 2)
   +- LocalRelation <empty>, [a#10, b#11, c#12]

// apply 完毕的 plan tree
Project [x.b AS x.b#13]
+- Join Inner
   :- Filter (b#1 = 1)
   :  +- LocalRelation <empty>, [a#0, b#1, c#2]
   +- Filter (b#11 = 2)
      +- LocalRelation <empty>, [a#10, b#11, c#12]

总结一下上面的流程,catalyst 匹配 rule 的思路和 calcite 类似,遍历 plan tree 的每个 node,依次尝试匹配。但是 catalyst 是直接在原 LogicalPlan 上直接更新,而 calcite 通过将 apply 结果写入到 result 中,再进行选择。

笔者这里粗浅认为,catalyst 在 one-to-one 的 transformation rules 中 和 calcite 类似,但涉及到 exploration rules 时候,通过 graph 的组织方式,对多个组合进行筛选,看起来思路更加清晰,符合直觉。

除了 Rule 的定义与匹配,下一步就需要如何组织起它们,因此 Catalyst 中引入了一些概念:

  • Tree:由 nodes 组成的数据结构,node 定义为 Scala subclass。比如,x+1 表达式代表的 Tree 为:Add(Attribute(x), Literal(1)),其中每个 node 都是一个 subclass
  • Rules:functions that transform trees。由于每个 node 都是 subclass,利用 pattern matching 来匹配 rule 也很方便。同时,支持 TreeNode.transformDown 和 TreeNode.transformUp,来选择匹配的方向
  • Batches:一组 rules,包含相同的 strategy(max_iterations),有 Once 和 FixedPoint(Apply repeatedly until the tree dosen't change) 两种类型
  • RuleExecutor:批量执行一组 batches

如下是 RuleExecutor 的核心实现与注释:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
 * Executes the batches of rules defined by the subclass. The batches are executed serially
 * using the defined execution strategy. Within each batch, rules are also executed serially.
 */
def execute(plan: TreeType): TreeType = {
  var curPlan = plan
  batches.foreach { batch =>
    var iteration = 1
    var lastPlan = curPlan
    var continue = true

    // Run until fix point (or the max number of iterations as specified in the strategy.
    while (continue) {
      // 遍历 rules 依次 apply, result 作为下次 input
      curPlan = batch.rules.foldLeft(curPlan) {
        case (plan, rule) =>
          // apply rule 返回结果
          val result = rule(plan)
          result
      }

      // 达到当前 batch maxIterations,break 执行下个 batch
      iteration += 1
      if (iteration > batch.strategy.maxIterations) {
        continue = false
      }

      // 当前 batch rules 执行完后,plan tree not changed,代表达到 fixed point,当前 batch 无需循环执行了
      if (curPlan.fastEquals(lastPlan)) {
        logTrace(
          s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")
        continue = false
      }

      // track 当前 batch 的优化结果,用于上面的 fixed point 判断
      lastPlan = curPlan
    }
  }
  curPlan
}

这里的 batch 概念和 executor 核心逻辑我们在 rust 实现中可做了借鉴。

Rust 实现

有了上面的两种实现,需要构思如果在 sql-query-engine-rs 上实现一个 HepOptimizer 了。

rules 匹配

首先是 rule 的匹配,立马想到了 rust 的 pattern matching,但是 sql-query-engine-rs 中的 plan tree node 都是 Arc 包裹的 trait object。如果对 nested node 做匹配需要 deref 会很麻烦如下代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
enum Direction {
    Left,
    Right,
    Nested(Arc<Direction>),
}

fn move_direction(direction: Direction) {
    match direction {
        Direction::Left => println!("Move Left"),
        Direction::Right => println!("Move Right"),
        Direction::Nested(d) => match *d {
            Direction::Left => todo!(),
            Direction::Right => todo!(),
            Direction::Nested(_) => todo!(),
        },
    }
}

因此,这里参考了 calcite 和 datafusion-dolomite 中类似 pattern 的结构,去手动构造一个 match tree,其中的 predicate 是校验 node 的函数,如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
/// The pattern tree to match a plan tree. It defined in `Rule` and used in `PatternMatcher`.
pub struct Pattern {
    /// The root node predicate, not contains the children.
    pub predicate: fn(&PlanRef) -> bool,
    /// The children's predicate of current node.
    pub children: PatternChildrenPredicate,
}

pub enum PatternChildrenPredicate {
    /// All children and their children are matched and will be collected as
    /// `OptExprNode::PlanRef`. Currently used in one-time-applied rule.
    MatchedRecursive,
    /// All children will be evaluated in `PatternMatcher`, if pattern match, node will collected
    /// as `OptExprNode::PlanRef`. if vec is empty, it means no children are matched and
    /// collected.
    Predicate(Vec<Pattern>),
    /// We don't care the children, and them will be collected as existing nodes
    /// `OptExprNode::OptExpr` in OptExpr tree.
    None,
}

有了 rule pattern 后,就需要考虑如何利用它去匹配 plan node。首先,我们采用了类似 Calcite 的实现,将 PlanTree 转换为 graph 后遍历 nodes。

当 rule pattern 匹配成功时,返回匹配到的 sub-plan tree。注意,这里的 sub-plan tree 是新的结构 OptExpr,因为它的 node 可能是 rule pattern 需要的 PlanNode(包含了graph中的新children) 或者 rule pattern 不需要的 PlanNode (只保存 graph node id 用于重建 graph)。

因此,对于具体一个 rule 的实现逻辑,它只需要关注 pattern 中定义的 nodes 如何进行 transform 即可,剩余的 nodes 作为 children 直接构造在新的 sub-plan tree 中即可。具体可以参考 optimizer/core 的定义代码。

rules 执行

笔者参考的是 Catalyst 的 batch 组织方式与执行 rules 的方式,因为它使用起来比较直观,而不是 Calcite 的 HepProgram 的方式。Batch 的定义包含了一组 Rules,其中的 Strategy 定义 max_iteration 和 match_order。

编排执行 rules 的时候需要注意 fixed_point 和 max_iterations 的判断,如下代码,整理思路,与 catalyst 类似:

  • 最外层遍历 batches 循环,控制着这个 batch 的 fixed_point 和 max_iteration 的判断,决定何时跳出这个 batch
  • 下一层是遍历 batch.rules 循环,对 graph 中的每个 node 都会尝试 apply_rule(如果有一个 rule apply 成功,直接退出当前 graph 遍历,继续遍历后续的 rules。保证了一个 Rule 在一次 iteration 中只会 apply 一次)
  • 最内层是 apply_rule 逻辑,通过 HepMatcher 匹配并返回匹配结果,即使匹配成功,substitute 为空也代表 rule 没有成功 apply
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
pub fn find_best(&mut self) -> PlanRef {
    let batches = self.batches.clone().into_iter();
    for batch in batches {
        let mut iteration = 1_usize;
        // fixed_point means plan tree not changed after applying all rules.
        let mut fixed_point = false;
        // run until fix point or reach the max number of iterations as specified in the strategy.
        while !fixed_point {
            println!("-----------------------------------------------------");
            println!("Start Batch: {}, iteration: {}", batch.name, iteration);

            fixed_point = self.apply_batch(&batch);

            // max_iteration check priority is higher than fixed_point.
            iteration += 1;
            if iteration > batch.strategy.max_iteration {
                println!("Max iteration {} reached for batch: {}", iteration - 1, batch.name);
                break;
            }

            // if the plan tree not changed after applying all rules,
            // it reaches fix point, should stop.
            if fixed_point {
                println!("Fixed point reached for batch: {}, after {} iterations", batch.name, iteration - 1);
                break;
            }
        }
    }
    self.graph.to_plan()
}

pub fn apply_batch(&mut self, batch: &HepBatch) -> bool {
    let original_plan = self.graph.to_plan();

    // for each rule will apply each node in graph.
    for rule in batch.rules.iter() {
        for node_id in self.graph.nodes_iter(batch.strategy.match_order) {
            if !self.apply_rule(rule.clone(), node_id) {
                // not matched, will try next rule
                continue;
            }

            println!("After apply plan tree:\n{}", pretty_plan_tree_string(&*self.graph.to_plan()));

            // if the rule is applied, continue to try next rule in batch,
            // max_iteration only controls the iteration num of a batch.
            println!("Try next rule in batch ...");
            break;
        }
    }

    // Compare the two plan trees, if they are the same, it means the plan tree not changed
    let new_plan = self.graph.to_plan();
    let reach_fixed_point = original_plan == new_plan;
    println!("Batch: {} finished, reach_fixed_point: {}", batch.name, reach_fixed_point);
    reach_fixed_point
}

/// return true if the rule is applied which means the rule matched and the plan tree changed.
fn apply_rule(&mut self, rule: RuleImpl, node_id: HepNodeId) -> bool {
    let matcher = HepMatcher::new(rule.pattern(), node_id, &self.graph);

    if let Some(opt_expr) = matcher.match_opt_expr() {
        let mut substitute = Substitute::default();
        let opt_expr_root = opt_expr.root.clone();
        rule.apply(opt_expr, &mut substitute);

        if !substitute.opt_exprs.is_empty() {
            assert!(substitute.opt_exprs.len() == 1);
            self.graph.replace_node(node_id, substitute.opt_exprs[0].clone());
            println!("Apply {:?} at node {:?}: {:?}", rule, node_id, opt_expr_root);
            return true;
        }
        println!("Skip {:?} at node {:?}", rule, node_id);
        false
    } else {
        println!("Skip {:?} at node {:?}", rule, node_id);
        false
    }
}

支持 sql-query-engine-rs 的 heuristic optimizer 基本框架完成,剩下的核心就是添加各种 optimization rules 了。

Rules 实现

优化规则,之前听过各种术语:ColumnPruning, PushDownPredicates, LimitPushDown, Constant folding 等。但缺乏一个整体的归纳认知,笔者也是主要参考 Spark Catalyst Rules 的实现。

下面会依次介绍 sql-query-engine-rs 目前已支持的语法,所可以应用的 rules。还有些常见的优化,目前还未实现对应算子,后续会慢慢加入,比如 distinct 和 subquery 等。

Predicates PushDown

谓词下推是一个常见的 rewrite rule,将 filter 尽可能下推的最下层节点,来提前减少返回的数据量。比如,where/having 条件下推到单表中,subquery情况下的外层 query block 的 predicate 下推到内存的 query block 中。

PushPredicateThroughJoin

首先参考 catalyst PushPredicateThroughJoin,它是将 predicate 尽可能下推到 join 算子内部。写代码之前,先明确如下几种下推情况。

根据 join type 的类型不同,可能有如下几种情况:

  • filter 下推到 left child node 上层
  • filter 下推到 right child node 上层
  • filter 下推到 join condition 中
  • filter 不能下推

其次在一个 join 语义下,将 filter 拆分为 leftFilter,rightFilter,commonFilter 它们的下推情况也不同:

  • left join:语义表明 join 结果数据中 left 边是全量数据,所以 leftFilter 下推到 left 边后,与下推前对结果影响相同,都相当于在全量数据上做 filter。但是,对于 right 边,如果下推了 filter,则本来是对 join 结果数据做 filter,变成了只对 right 边先做 filter,这必然会导致了数据量变大,因为未匹配到的 left 数据仍会保留下来。如下图:
/images/sql-query-engine-rs/push-predicate-through-join-left-join-filter.png
push-predicate-through-join-left-join-filter
  • right join:与 left join 下推逻辑类似,只能下推 rightFilter 到 right 边,leftFilter + commonFilter 不能下推。
  • inner join:它比较特殊,leftFilter 和 rightFilter 都可以下推,commonFilter (t1.a>t2.a) 不能下推。
  • full join:语义要求,保持两边数据全量,因此不能下推任何 filter。

至此,基本逻辑弄清晰了,代码就好实现了。首先是,定义该 rule 的 pattern,即 filter + join 的组合:

1
2
3
4
5
6
7
Pattern {
  predicate: |p| p.node_type() == PlanNodeType::LogicalFilter,
  children: PatternChildrenPredicate::Predicate(vec![Pattern {
      predicate: |p| p.node_type() == PlanNodeType::LogicalJoin,
      children: PatternChildrenPredicate::None,
  }]),
}

其次,我们上面提到的 leftFilter,rightFilter,commonFilter 是根据 join left/right output_columns 将 filter 拆分而来的,如果 filter 中的 columns 是 left schema 的子集,则是 leftFilter,同理,推断出 rightFilter,最后留下的是 commonFilter(同时包含了left和right的column:t1.a>t2.a)。

具体 apply 逻辑和测试用例,可以查看 rules/pushdown_predicates.rs 文件。最终达到的效果如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
select t1.* from t1 inner join t2 on t1.a=t2.b where t2.a>2 and t1.a>1;

-- 优化前
LogicalProject: exprs [t1.a:Int64, t1.b:Int64, t1.c:Int64]
  LogicalFilter: expr t2.a:Int64 > Cast(2 as Int64) AND t1.a:Int64 > Cast(1 as Int64)
    LogicalJoin: type Inner, cond On { on: [(t1.a:Int64, t2.b:Int64)], filter: None }
      LogicalTableScan: table: #t1, columns: [a, b, c]
      LogicalTableScan: table: #t2, columns: [a, b, c]

-- 优化后 (only apply PushPredicateThroughJoin rule)
LogicalProject: exprs [t1.a:Int64, t1.b:Int64, t1.c:Int64]
  LogicalJoin: type Inner, cond On { on: [(t1.a:Int64, t2.b:Int64)], filter: None }
    LogicalFilter: expr t1.a:Int64 > Cast(1 as Int64)
      LogicalTableScan: table: #t1, columns: [a, b, c]
    LogicalFilter: expr t2.a:Int64 > Cast(2 as Int64)
      LogicalTableScan: table: #t2, columns: [a, b, c]

Limit PushDown

limit pushdown 也是常见的优化,尽可能减少每个算子处理的数据量,但是它的遇到包含 order column 的 plan 时,从语义上来说,是不能下推的。如果满足条件的 plan,下推 limit 尽可能到最下层的算子,比如是否可以到 TableScan 层。

基于这样的思路,参考了 Catalyst LimitPushDown 和 Calcite SORT 相关 rules,实现了下面几种 rules。

LimitProjectTranspose

这个 rule 比较直接,将 limit 下推到 project 下层。如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
select a from t1 offset 2 limit 1

-- 优化前
LogicalLimit: limit Some(1), offset Some(2)
  LogicalProject: exprs [t1.a:Nullable(Int32)]
    LogicalTableScan: table: #t1, columns: [a, b, c]

-- 优化后 (only apply LimitProjectTranspose rule)
LogicalProject: exprs [t1.a:Nullable(Int32)]
  LogicalLimit: limit Some(1), offset Some(2)
    LogicalTableScan: table: #t1, columns: [a, b, c]
PushLimitIntoTableScan

这个 rule 是将 limit 下推到存储层,直接减少扫描的数据,需要相应的 storage 支持。如下:

1
2
3
4
5
6
7
8
9
select a from t1 offset 2 limit 1

-- 优化前
LogicalLimit: limit Some(1), offset Some(2)
  LogicalProject: exprs [t1.a:Nullable(Int32)]
    LogicalTableScan: table: #t1, columns: [a, b, c]

-- 优化后 (apply LimitProjectTranspose, PushLimitIntoTableScan rules)
LogicalTableScan: table: #t1, columns: [a, b, c], bounds: (offset:2,limit:1)
PushLimitThroughJoin

参考 catalyst LimitPushDown,将 limit 下推到 join 下层。

首先关注的是存在 offset 的情况。比如,对于 offset 1 limit 1 来说,下层需要扫描的行数是2,才能满足 offset 1 后再 limit 1 的情况。

其次,对于不同的 join type,limit 下推的边也不同。left join 时,limit 下推到 left 边。同理 right join。

但是对于 inner join 来说,如果 join condition 不为空,从语义上来说 limit 也无法下推。最终效果如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
select t1.a from t1 left join t2 on t1.a=t2.b offset 1 limit 1

-- 优化前
LogicalLimit: limit Some(1), offset Some(1)
  LogicalProject: exprs [t1.a:Nullable(Int32)]
    LogicalJoin: type Left, cond On { on: [(t1.a:Nullable(Int32), t2.b:Nullable(Int32))], filter: None }
      LogicalTableScan: table: #t1, columns: [a, b, c]
      LogicalTableScan: table: #t2, columns: [a, b, c]

-- 优化后 (apply LimitProjectTranspose, PushLimitThroughJoin, EliminateLimits rules)
LogicalProject: exprs [t1.a:Nullable(Int32)]
  LogicalLimit: limit Some(1), offset Some(1)
    LogicalJoin: type Left, cond On { on: [(t1.a:Nullable(Int32), t2.b:Nullable(Int32))], filter: None }
      LogicalLimit: limit Some(2), offset None
        LogicalTableScan: table: #t1, columns: [a, b, c]
      LogicalTableScan: table: #t2, columns: [a, b, c]
EliminateLimits

这个 rule 的存在目的,是为了消除一些 rule 带来的 extra limit 的副作用,比如上面的 PushLimitThroughJoin,每次遇到 limit + join 的情况,都会尝试 push extra limit,产生了重复的算子。因此,通过 apply 这个 role 消除了重复的2个 limit。

Column Pruning

列裁剪也是一个重要的优化环节,减少扫描的数据量和算子处理的数据量,尽可能只保留所有算子会用到的列。

PushProjectThroughChild

首先,第一步就是将 Project 尽可能下推。前提是:project 下层算子的 child 的 output_columns 不是 required cols 的子集,则代表当前有可以裁剪的列。

接着,对 project 的 child 的 children 再依次判断 output_columns 是否 required cols 的子集,如果不是,则在这个 child 之上 新增一个 Project 算子。具体的优化效果如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
select t1.a, t2.b from t1 left join t2 on t1.a = t2.a where t2.b > 1

-- 优化前
LogicalProject: exprs [t1.a:Nullable(Int32), t2.b:Nullable(Int32)]
  LogicalFilter: expr t2.b:Nullable(Int32) > 1
    LogicalJoin: type Left, cond On { on: [(t1.a:Nullable(Int32), t2.a:Nullable(Int32))], filter: None }
      LogicalTableScan: table: #t1, columns: [a, b, c]
      LogicalTableScan: table: #t2, columns: [a, b, c]

-- 优化后 (apply PushProjectThroughChild rule)
LogicalProject: exprs [t1.a:Nullable(Int32), t2.b:Nullable(Int32)]
  LogicalFilter: expr t2.b:Nullable(Int32) > 1
    LogicalProject: exprs [t1.a:Nullable(Int32), t2.b:Nullable(Int32)]
      LogicalJoin: type Left, cond On { on: [(t1.a:Nullable(Int32), t2.a:Nullable(Int32))], filter: None }
        LogicalProject: exprs [t1.a:Nullable(Int32)]
          LogicalTableScan: table: #t1, columns: [a, b, c]
        LogicalProject: exprs [t2.a:Nullable(Int32), t2.b:Nullable(Int32)]
          LogicalTableScan: table: #t2, columns: [a, b, c]
PushProjectIntoTableScan

project 下推的最终点是 table scan,来达到扫描更少的列的需求。因此,这个 rule 就很直接,将 project 中的 columns 传递给 table scan。还是上面的例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
select t1.a, t2.b from t1 left join t2 on t1.a = t2.a where t2.b > 1

-- 优化前
LogicalProject: exprs [t1.a:Nullable(Int32), t2.b:Nullable(Int32)]
  LogicalFilter: expr t2.b:Nullable(Int32) > 1
    LogicalJoin: type Left, cond On { on: [(t1.a:Nullable(Int32), t2.a:Nullable(Int32))], filter: None }
      LogicalTableScan: table: #t1, columns: [a, b, c]
      LogicalTableScan: table: #t2, columns: [a, b, c]

-- 优化后 (apply PushProjectThroughChild, PushProjectIntoTableScan rules)
LogicalProject: exprs [t1.a:Nullable(Int32), t2.b:Nullable(Int32)]
  LogicalFilter: expr t2.b:Nullable(Int32) > 1
    LogicalProject: exprs [t1.a:Nullable(Int32), t2.b:Nullable(Int32)]
      LogicalJoin: type Left, cond On { on: [(t1.a:Nullable(Int32), t2.a:Nullable(Int32))], filter: None }
        LogicalTableScan: table: #t1, columns: [a]
        LogicalTableScan: table: #t2, columns: [a, b]
RemoveNoopOperators

plan tree 中存在相邻 project ,project 相邻 aggregate 的情况,同时相邻的这两个算子的最终的输出表达式是完全一致的。这时,就可以省略一个多余的 project。如下示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
select sum(b) from t1 where a > 1

-- 优化前
LogicalProject: exprs [Sum(t1.b:Nullable(Int32)):Int32]
  LogicalAgg: agg_funcs [Sum(t1.b:Nullable(Int32)):Int32] group_by []
    LogicalFilter: expr t1.a:Nullable(Int32) > 1
      LogicalTableScan: table: #t1, columns: [a, b, c]

-- 优化后 (apply PushProjectThroughChild, PushProjectIntoTableScan, RemoveNoopOperators rules)
LogicalAgg: agg_funcs [Sum(t1.b:Nullable(Int32)):Int32] group_by []
  LogicalProject: exprs [t1.b:Nullable(Int32)]
    LogicalFilter: expr t1.a:Nullable(Int32) > 1
      LogicalTableScan: table: #t1, columns: [a, b]

Rules 组合

至此,我们有了最基础的常见规则,下一步需要组合起它们应用到优化器中。这里,为了逻辑清晰,笔者将一个 domain 下的 rules 都放在一个 batch 中去循环执行,直到 fixed_point 或 max_iteration。

如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
let batches = vec![
  HepBatch::new(
      "Predicate pushdown".to_string(),
      HepBatchStrategy::fix_point_topdown(10),
      vec![PushPredicateThroughJoin::create()],
  ),
  HepBatch::new(
      "Limit pushdown".to_string(),
      HepBatchStrategy::fix_point_topdown(10),
      vec![
          LimitProjectTranspose::create(),
          PushLimitThroughJoin::create(),
          PushLimitIntoTableScan::create(),
          EliminateLimits::create(),
      ],
  ),
  HepBatch::new(
      "Column pruning".to_string(),
      HepBatchStrategy::fix_point_topdown(10),
      vec![
          PushProjectThroughChild::create(),
          PushProjectIntoTableScan::create(),
          RemoveNoopOperators::create(),
      ],
  ),
  HepBatch::new(
      "Rewrite physical plan".to_string(),
      HepBatchStrategy::once_topdown(),
      vec![PhysicalRewriteRule::create()],
  ),
];

同时,不同的 batch 它们的执行顺序,也会影响着 rule 的 matching 规则。比如,先执行 column pruning 会将 project 下推到下层算子,后执行 predicate pushdown 也会将 filter 下推到下层算子。但是 filter 会在 project 上层。这时 PushPredicateThroughJoin 就无法 match 上了。

因此,需要一个额外的 PushPredicateThroughNonJoin 规则来 transpose project 与 filter。

类似的情况,随着引入的算子增多,情况也会更多。这里,也可以参考 Catalyst 中 PushDownPredicates 和 ColumnPruning 的匹配情况。

Summary

至此,针对 sql-query-engine-rs 中已支持的 SQL 语法,有了基础版本的优化规则。下个 roadmap 考虑加入 subquery 的支持。

References