Contents

Implementing an Arrow-based SQL Query Engine with Rust - Part 2

TL;DR:为了学习查询引擎,最近从0在写 sql-query-engine-rs,它是一个用 Rust 写的 Arrow-based SQL Query Engine。本系列文章会详细介绍它的具体实现,会按照对应的 Roadmap 依次讲解,也可以 checkout 对应 tag 查看代码。Most of ideas inspired by risinglight and datafusion

这篇Part 2会引入更多算子,比如:limit、orderby 和 join。为后续优化器提供更多算子。

Roadmap v0.3

Roadmap v0.3 新增的算子有:

  • limit operator
  • order operator
  • join operator

它的 milestone 是支持这几种 SQL:

  • limit 和 order by:select id from employee order by id desc offset 2 limit 1
  • join:select employee.first_name, department.department_name from employee left join department on employee.department_id = department.id and department.id > 1

limit operator

和之前引入新算子的流程类似,流程会从:构建BoundSelect -> 定义Logical/Physical Limit Node -> planner构造LogicalPlanTree -> input_ref_rewriter重写BoundExpr -> physical_rewriter转换PhysicalPlanTree -> 定义LimitExecutor -> visit physical_limit_node时执行LimitExecutor -> 返回数据。如下图所示:

第一步构造 BoundSelect 的 limit 和 offset,它们都不需要引入新的 BoundExpr,因为它们的值一般情况下都为 ScalarValue,直接 bind 成 Constant 就可以了。

同时,对于一个 select 语句来说,limit 和 offset 都是 optional 的,因此整体的定义如下:

1
2
3
4
5
6
7
8
pub struct BoundSelect {
    pub select_list: Vec<BoundExpr>,
    pub from_table: Option<BoundTableRef>,
    pub where_clause: Option<BoundExpr>,
    pub group_by: Vec<BoundExpr>,
    pub limit: Option<BoundExpr>,
    pub offset: Option<BoundExpr>,
}

Optimizer 部分和之前的 operator 类似,就不在重复了,它的 Executor 逻辑也很直接,根据传入的 limit 和 offset 去计算从当前 batch 数据中,截取的 start 和 end index。

同时,采用test-case来编写参数化测试,来覆盖 limit 的各种 case。

最后,需要注意的地方是,LogicalLimit 这个算子在 LogicalPlanTree 中的位置,我们在不考虑优化器的情况下,从直觉上来看,对数据条数的限制,应该放在 PlanTree 的最顶层,即,它作为最后一个算子,等待所有下层算子计算完后,再进行数据截取

order operator

排序也是很常见的需求,表现为 order by 类型的子句。

首先,从 binder 层看,orderby 语句后面跟的 expr 不会引入新的 BoundExpr 类型。因此,我们将它看做一个 SubStatement 定义在 BoundSelect 中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
#[derive(Debug)]
pub struct BoundSelect {
    pub select_list: Vec<BoundExpr>,
    pub from_table: Option<BoundTableRef>,
    pub where_clause: Option<BoundExpr>,
    pub group_by: Vec<BoundExpr>,
    pub limit: Option<BoundExpr>,
    pub offset: Option<BoundExpr>,
    pub order_by: Vec<BoundOrderBy>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct BoundOrderBy {
    pub expr: BoundExpr,
    pub asc: bool,
}

Optimizer 部分和 limit 类似,直接进入 OrderExecutor 部分。

如果对数据进行排序,就需要先获取全量的数据,看起来和 aggregation 的算子有些类似,我们这里也不考虑 external ordering,数据全放在内存中进行。

由于底层数据格式是 arrow 的 RecordBatch,因此我们大量使用了 arrow::compute 提供的排序方法。首先,构造 SortColumn,再传给 lexsort_to_indices 进行字典序排序,获取排序后的 row indices。

接着,利用 compute::take 方法,根据传入的 row indices 进行数据的 reorder。如下代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
let sort_columns = self
    .order_by
    .iter()
    .map(|expr| -> Result<SortColumn, ExecutorError> {
        let sort_array = expr.expr.eval_column(&batch)?;
        Ok(SortColumn {
            values: sort_array,
            options: Some(SortOptions {
                descending: !expr.asc,
                ..Default::default()
            }),
        })
    })
    .try_collect::<Vec<_>>()?;

let indices = lexsort_to_indices(&sort_columns, None)?;

let sorted_batch = RecordBatch::try_new(
    schema,
    batch
        .columns()
        .iter()
        .map(|column| {
            take(
                column.as_ref(),
                &indices,
                // disable bound check overhead since indices are already generated from
                // the same record batch
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
        })
        .try_collect::<Vec<_>>()?,
)?;

对 arrow compute 提供的排序方法,讨论几个有意思的问题:

lexsort_to_indices

lexsort_to_indices 底层是如何实现的?

首先看它的核心实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
pub fn lexsort_to_indices(
    columns: &[SortColumn],
    limit: Option<usize>,
) -> Result<UInt32Array> {
    //...

    let lexicographical_comparator = LexicographicalComparator::try_new(columns)?;
    // uint32 can be sorted unstably
    sort_unstable_by(&mut value_indices, len, |a, b| {
        lexicographical_comparator.compare(a, b)
    });

    //...
}

#[inline]
fn sort_unstable_by<T, F>(array: &mut [T], limit: usize, cmp: F)
where
    F: FnMut(&T, &T) -> Ordering,
{
    if array.len() == limit {
        array.sort_unstable_by(cmp);
    } else {
        partial_sort(array, limit, cmp);
    }
}

pub fn partial_sort<T, F>(v: &mut [T], limit: usize, mut is_less: F)
where
    F: FnMut(&T, &T) -> Ordering,
{
    let (before, _mid, _after) = v.select_nth_unstable_by(limit, &mut is_less);
    before.sort_unstable_by(is_less);
}

其中,LexicographicalComparator 作为字典序的比较器,提供给排序算法。

接着 sort_unstable_by 会根据 limit 做排序优化(这也是后续优化器可优化的点)。如果数据行数和 limit 相同,也就是全量排序,这里就直接使用了 rust 标准库中 slice 的 sort_unstable_by 方法,它的内部实现是 pattern-defeating quicksort

回到上面的 limit 优化,如果 sort_unstable_by 时候存在 limit ,排序就会分为两个阶段:第一阶段是 partial_sort,第二阶段是 slice 的 sort_unstable_by。

其中的 partial_sort 内部使用的是 sort::partition_at_index,即 slice 的粗粒度排序,简单的按照取的 index,将数据分为两部分,我们使用前半部分进行排序即可。

compute::take

排序完后,根据 row indices 获取排序后的数据,就需要用到 compute::take 方法了。它会按照传入的 row indices 里的值,按顺序取出数据,就相当于 reorder 了。正如代码中的注释:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
/// Take elements by index from [Array], creating a new [Array] from those indexes.
///
/// ```text
/// ┌─────────────────┐      ┌─────────┐                              ┌─────────────────┐
/// │        A        │      │    0    │                              │        A        │
/// ├─────────────────┤      ├─────────┤                              ├─────────────────┤
/// │        D        │      │    2    │                              │        B        │
/// ├─────────────────┤      ├─────────┤   take(values, indicies)     ├─────────────────┤
/// │        B        │      │    3    │ ─────────────────────────▶   │        C        │
/// ├─────────────────┤      ├─────────┤                              ├─────────────────┤
/// │        C        │      │    1    │                              │        D        │
/// ├─────────────────┤      └─────────┘                              └─────────────────┘
/// │        E        │
/// └─────────────────┘
///    values array            indicies array                              result
/// ```

take 的实现逻辑,highlevel 上来说,就是遍历 indices 来依次从 arrays 里取出对应下标的数据,即 array[index],需要特殊处理就是 null 值的特殊处理,可参考

hash join

join 算子相比之前的来说,算是比较复杂的一个算子了。它需要从 binder 一直改造到 executor。下面会依次介绍每个步骤的考虑。

BoundTableRef

从 high level 角度去看 join 语句,select * from a left join b on a.id = b.id,join 表达式也是一种描述表的形式,不过它描述了2个表,但对上层其它算子来说,它还是当成一个表来处理。比如,a left join b on a.id = b.id,它代表了 a 表 left join b 表,然后生成一个join后的表,提供给上层算子。

同时,join 语句还包含了 on 子句,它可以是 equi 或 non-equi。比如:on a.id = b.idon a.id > 1。因此,在定义 join condition 的时候,需要将这两种情况分开考虑。对于 equi 的表达式,在 join 阶段进行两边的 hash match,而对于 non-equi 的表达式,在 join 后的结果上进行 filter(这里先不考虑下推)。

基于这个思路,我们定义了 Join 的结构:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
pub enum BoundTableRef {
    Table(TableCatalog),
    Join(Join),
}

pub struct Join {
    pub left: Box<BoundTableRef>,
    pub right: Box<BoundTableRef>,
    pub join_type: JoinType,
    pub join_condition: JoinCondition,
}

pub enum JoinType {
    Inner,
    Left,
    Right,
    Full,
    Cross,
}

pub enum JoinCondition {
    On {
        /// Equijoin clause expressed as pairs of (left, right) join columns
        on: Vec<(BoundExpr, BoundExpr)>,
        /// Filters applied during join (non-equi conditions)
        filter: Option<BoundExpr>,
    },
    None,
}

有了 Join 的定义,接着需要考虑如果组装起多个 join 表达式。对于2个表的 join 语句,一个 BoundTableRef::Join 就可以代表。但对于3个表的 join 语句,就需要考虑如果构造 join tree 了。比如,采用 left-deep plan 还是 bushy plan。可参考cockroachlabs join-ordering 中的解释。

我们为了简单考虑,在 binder join ordering 时,采用了 left-deep 的方式,依次将 join 作为 left-child 拼接下去,如下图所示:

/images/sql-query-engine-rs/join-left-deep-plan.png
join-left-deep-plan

举例 left-deep 来说,对于 SQL: select * from a left join b on a.id = b.id inner join c on b.id = c.id,构造的 bound tree 如下:

1
2
3
4
5
6
7
8
9
Join {
    left: Join {
        left: a
        right: b
        join_type: Left
    }
    right: c
    join_type: Inner
}

InputRefRewriter

有了 BoundTree 后,我们定义了 LogicalJoin 和 PhysicalHashJoin 两个 PlanNode,构造方式和之前类似,具体可查看代码。从 BoundTree 到 LogicalPlanTree 的方式,也是 left-deep 去构造。

这里介绍 join 在 optimizer 中特殊处理的部分,即 InputRef 的解析。

回忆一下,之前的 LogicalPlanNode 的解析流程:获取 child 的 bindings 数组,遍历自己的 exprs 从中找到相同 expr 的 index 即可。

而对于 join 来说,它包含2个 child,而 join condition 中的 on keys,是分别作用到不同的 child 上。举例来说:select * from a left join b on a.id = b.id,join on keys = [(a.id, b.id)],join left child = TableScan(a),因此,a.id 的 InputRef index 应该从 TableScan(a) 的 bindings 中匹配。这也符合,join executor 在处理 on keys 的逻辑,后面会提到。

而对于 join condition 的 filter 来说,它是作用于 join 合并后的结果上,因此,它的 InputRef index 是从 left bindings + right bindings 中匹配。

代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
/// In join executor internal logic, the join condition `on(expr1, expr2)` is
/// spereated into left and right keys, and them are against on different RecordBatch,
/// so the left and right InputRef index should be resolved using different bindings.
/// For join condition `filter` expr, it will against on the last RecordBatch which is merged
/// from left and right RecordBatch, so its InputRef index be resolved using merged bindings.
fn rewrite_logical_join(&mut self, plan: &LogicalJoin) -> PlanRef {
    let new_left = self.rewrite(plan.left());
    let mut right_input_ref_rewriter = InputRefRewriter::default();
    let new_right = right_input_ref_rewriter.rewrite(plan.right());

    // rewrite the condition expr. left index and right index are calculated from different
    // binding.
    let new_on = if let JoinCondition::On { on, filter: _ } = plan.join_condition() {
        let mut on_left_keys = on.iter().map(|o| o.0.clone()).collect::<Vec<_>>();
        let mut on_right_keys = on.iter().map(|o| o.1.clone()).collect::<Vec<_>>();

        // 1.use left bindings to rewrite left keys.
        for expr in &mut on_left_keys {
            self.rewrite_expr(expr);
        }

        // 2.use right bindings to rewrite right keys.
        for expr in &mut on_right_keys {
            right_input_ref_rewriter.rewrite_expr(expr);
        }

        // 3.combine left and right keys into new tuples.
        let new_on = on_left_keys
            .into_iter()
            .zip(on_right_keys.into_iter())
            .map(|(l, r)| (l, r))
            .collect::<Vec<_>>();
        Some(new_on)
    } else {
        None
    };

    // 4.combine the bindings of left and right, and consumed by upper logical plan, such as
    // LogicalProject.
    self.bindings.append(&mut right_input_ref_rewriter.bindings);

    // 5.use merged bindings(left + right) to rewrite condition and the filter will against on
    // result batch in join executor.
    let new_join_condition = if let JoinCondition::On { on: _, filter } = plan.join_condition()
    {
        let new_filter = match filter {
            Some(mut expr) => {
                self.rewrite_expr(&mut expr);
                Some(expr)
            }
            None => None,
        };
        JoinCondition::On {
            on: new_on.unwrap(),
            filter: new_filter,
        }
    } else {
        plan.join_condition()
    };

    Arc::new(LogicalJoin::new(
        new_left,
        new_right,
        plan.join_type(),
        new_join_condition,
    ))
}

JoinOutputSchema

我们知道,实现 join_type 的语义,在最后的 output schema 中,有些列只可能是没有匹配到,也就是说是 nullable 的。

我们支持4中 join 语义,它们从 force nullable 的语义来说,如下:

1
2
3
4
5
6
7
let (left_join_keys_force_nullable, right_join_keys_force_nullable) = match self.join_type {
    JoinType::Inner => (false, false),
    JoinType::Left => (false, true),
    JoinType::Right => (true, false),
    JoinType::Full => (true, true),
    JoinType::Cross => unreachable!(""),
};

举例来说,如果是 a left join b,那么 a 的所有 columns 都不需要 force nullable,也就是说保持原有列的 nullable,对 b 的所有列,force nullable = true,即它的列值可能为 null。

因此,column 的 nullable = force_nullable || c.original_nullable。

最后一点需要注意的是,output schema 也需要考虑到多表 join 的情况,即 ouput schema 也需要递归计算出来。

HashJoinExecutor

到这一步,终于把准备工作完成了,可以开始实现 join 的逻辑了,核心逻辑参考于 datafusion 的 hash_join.rs。

先看下 HashJoinExecutor 的定义,其中的 join_output_schema 就是上一步计算的 schema,用于构建最终的 RecordBatch

1
2
3
4
5
6
7
8
pub struct HashJoinExecutor {
    pub left_child: BoxedExecutor,
    pub right_child: BoxedExecutor,
    pub join_type: JoinType,
    pub join_condition: JoinCondition,
    /// The schema once the join is applied
    pub join_output_schema: Vec<Vec<ColumnCatalog>>,
}

execute 的核心分为 build 和 probe 阶段:

  • build phase:
    • 构造 left child hashtable,其中 one hash key -> multiple rows indices
    • merge all left batches into a single batch,提供后续 probe 取出对应行数据
  • probe phase:
    • 构造 visited_left_side bool_array 来记录 left 边哪些行还未匹配到,在最后阶段更加 join_type 来决定是否加入这些 未匹配到的 left rows
    • 遍历 right child batches,与 left hashtable 匹配,将匹配到的 index 分别记录在 left_indices 和 right_indices 中
    • 取出 left 和 right 对应的行,构造当前迭代的 RecordBatch 返回出去
build phase

build 阶段用到的 create_hashes 函数和之前 HashAgg 一样,输出的是每行对应的 hash 值,但这里需要记录相同 hash 对应的所有 row indices。代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#[for_await]
for batch in self.left_child {
    let batch = batch?;

    let left_keys: Vec<_> = on_left_keys
        .iter()
        .map(|key| key.eval_column(&batch))
        .try_collect()?;
    let mut every_rows_hashes = vec![0; batch.num_rows()];
    create_hashes(&left_keys, &hash_random_state, &mut every_rows_hashes)?;

    for (row, hash) in every_rows_hashes.iter().enumerate() {
        left_hashmap
            .entry(*hash)
            .or_insert_with(Vec::new)
            .push(row + left_row_offset);
    }

    left_row_offset += batch.num_rows();
    left_batches.push(batch);
}

if left_batches.is_empty() {
    return Ok(());
}

let left_single_batch = RecordBatch::concat(&left_batches[0].schema(), &left_batches)?;
probe phase

build 阶段把 left child 数据全放在内存中后,probe 就开始依次遍历 right child 的 batch 数据,将满足条件数据返回到上层算子。

第一步,也是通过 create_hashes 构造当前 right batch 每行的 hash 值,right_rows_hashes。

第二步,遍历 right_rows_hashes 与 left hashtable 匹配,将匹配到的 left index 和 right index 分别记录下来。

这里需要注意,根据 join_type 的语义不同,记录 index 的方式也不同。

由于,我们是遍历 right_rows_hashes 去 probe left_hashtable,当 join type 为:

  • Inner / Left 时:
    • right_row 匹配到 left_hashtable,则同时记录 left_index 和 right_index,代表left 和 right 匹配的 index
    • right_row 未匹配到 left_hashtable,则不做记录。代表,这个 right_row 直接过滤了,因为未匹配到 left。从而,来满足 inner / left join 的语义。
  • Right / Full 时:
    • right_row 匹配到 left_hashtable,则同时记录 left_index 和 right_index,代表left 和 right 匹配的 index
    • right_row 未匹配到 left_hashtable,则记录 left index 与 right index 的组合为:null <-> right_index,代表右侧虽然未匹配到数据,但为了满足 right / full join 的语义,必须记录这个 right row,但是它对应的 left row 就为 null。

重新思考 probe 构造的 indices 组合,你会发现,它的构造视角是从 right batch 开始,根据 join_type 来决定是否要保留,右侧未匹配到的 rows,如果 inner/left join 则不保留,如果 right/full join 则需要保留。

所以,可以把这个过程生成的 left/right indices,看成是:返回数据中 右侧必须保留的数据。比如,inner/left join 时,同时 右侧未匹配到数据,而 join 语义要求返回数据以 两侧同时匹配/左侧数据为准,则返回数据中,不需要包含右侧这一行数据,也就可以不记录这个 index。同理,推敲 right/full join 的情况。

probe 阶段最后是,通过上面的 left/right indices 从 left/right batch 中取出对应数据,然后返回给上层算子。

整个过程,总结来说,从右侧取出的这批 batch 数据,在满足 join_type 语义下,返回了 右侧必须保留的数据 + 左侧匹配到的数据。核心代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 1. build left and right indices
let mut left_indices = UInt64Builder::new(0);
let mut right_indices = UInt32Builder::new(0);
match self.join_type {
    JoinType::Inner | JoinType::Left => {
        for (row, hash) in right_rows_hashes.iter().enumerate() {
            if let Some(indices) = left_hashmap.get(hash) {
                for &i in indices {
                    left_indices.append_value(i as u64)?;
                    right_indices.append_value(row as u32)?;
                }
            }
        }
    }
    JoinType::Right | JoinType::Full => {
        for (row, hash) in right_rows_hashes.iter().enumerate() {
            if let Some(indices) = left_hashmap.get(hash) {
                for &i in indices {
                    left_indices.append_value(i as u64)?;
                    right_indices.append_value(row as u32)?;
                }
            } else {
                // when no match, add the row with None for the left side
                left_indices.append_null()?;
                right_indices.append_value(row as u32)?;
            }
        }
    }
    JoinType::Cross => unreachable!("Cross join should not be in HashJoinExecutor"),
}

对这段代码的举例:

left.a 为 [10, 30, 20, 10],right.a 为 [10, 20, 30, 10],它们两做 inner join 完后,得到的相互匹配的两个 indices 为:

  • right: [0, 0, 1, 2, 3, 3] 对应的 on keys 为 [10,10,20,30,10,10]
  • left: [0, 3, 2, 1, 0, 3] 对应的 on keys 为 [10,10,20,30,10,10]

最后,细心的你会发现,整个 join 完的数据中,还漏了一个,左侧必须保留的数据

left-unvisited-rows

左侧必须保留的数据,这个要求自然想到了 Left / Full join 语义。

在 probe 阶段我们维护了 left_unvisited_rows,这这里就发挥了作用。因为,probe 中返回的数据,已经包含了所有满足条件的右侧数据了,所以,这一阶段,只会构造 左侧必须保留的数据 + 右侧相同行数的NULL。

核心代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// handle left side unvisited rows: to generate last result_batch which is consist of left
// unvisited rows and null rows on right side.
match self.join_type {
    JoinType::Left | JoinType::Full => {
        let indices = UInt64Array::from_iter_values(
            (0..visited_left_side.len())
                .filter_map(|v| (!visited_left_side.get_bit(v)).then(|| v as u64)),
        );
        let left_array: Vec<_> = left_single_batch
            .columns()
            .iter()
            .map(|col| compute::take(col, &indices, None))
            .try_collect()?;
        let offset = left_array.len();
        let right_array = join_output_schema
            .fields()
            .iter()
            .enumerate()
            .filter(|(idx, _)| *idx >= offset)
            .map(|(_, field)| {
                arrow::array::new_null_array(field.data_type(), indices.len())
            })
            .collect::<Vec<_>>();
        let data = vec![left_array, right_array].concat();
        yield RecordBatch::try_new(join_output_schema.clone(), data)?;
    }
    JoinType::Right | JoinType::Inner => {}
    JoinType::Cross => unreachable!(""),
}

non-equi filter

注意,在 join 最终阶段做 filter 时,也需要考虑到 join_type 语义的语义。

举例来说:有 t1 t2 两个表

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
create table t1(a int, b int, c int);
create table t2(a int, b int, c int);

insert into t1 values (0,4,7), (1,5,8), (2,7,9), (2,8,1);
insert into t2 values (10,2,7), (20,2,5), (30,3,6), (40,4,6);

select * from t1;
+---+---+---+
| a | b | c |
|---+---+---|
| 0 | 4 | 7 |
| 1 | 5 | 8 |
| 2 | 7 | 9 |
| 2 | 8 | 1 |
+---+---+---+

select * from t2;
+----+---+---+
| a  | b | c |
|----+---+---|
| 10 | 2 | 7 |
| 20 | 2 | 5 |
| 30 | 3 | 6 |
| 40 | 4 | 6 |
+----+---+---+

对了它们做 left join filter 结果如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
select t1.*, t2.* from t1 left join t2 on t1.a=t2.b;
+---+---+---+--------+--------+--------+
| a | b | c | a      | b      | c      |
|---+---+---+--------+--------+--------|
| 0 | 4 | 7 | <null> | <null> | <null> |
| 1 | 5 | 8 | <null> | <null> | <null> |
| 2 | 7 | 9 | 10     | 2      | 7      |
| 2 | 7 | 9 | 20     | 2      | 5      |
| 2 | 8 | 1 | 10     | 2      | 7      |
| 2 | 8 | 1 | 20     | 2      | 5      |
+---+---+---+--------+--------+--------+

select t1.*, t2.* from t1 left join t2 on t1.a=t2.b and t1.c > t2.c;
+---+---+---+--------+--------+--------+
| a | b | c | a      | b      | c      |
|---+---+---+--------+--------+--------|
| 0 | 4 | 7 | <null> | <null> | <null> |
| 1 | 5 | 8 | <null> | <null> | <null> |
| 2 | 7 | 9 | 10     | 2      | 7      |
| 2 | 7 | 9 | 20     | 2      | 5      |
| 2 | 8 | 1 | <null> | <null> | <null> |
+---+---+---+--------+--------+--------+

具体到我们实现的逻辑是,对最终的 RecordBatch 执行 eval filter expr,获取满足条件的 row indices,然后每个 column 去 filter 这些 rows,也就是上面例子中的 t1(2,7,9) 对应的 rows。

同时我们是 left join 语义,在最后一个 batch 中,会 append unvisit left rows,也就是会补全左侧其它行。因此,left join filter 的改造比较容易。

我们再看下 right join filter:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
select t1.*, t2.* from t1 right join t2 on t1.a=t2.b;
+--------+--------+--------+----+---+---+
| a      | b      | c      | a  | b | c |
|--------+--------+--------+----+---+---|
| 2      | 7      | 9      | 10 | 2 | 7 |
| 2      | 8      | 1      | 10 | 2 | 7 |
| 2      | 7      | 9      | 20 | 2 | 5 |
| 2      | 8      | 1      | 20 | 2 | 5 |
| <null> | <null> | <null> | 30 | 3 | 6 |
| <null> | <null> | <null> | 40 | 4 | 6 |
+--------+--------+--------+----+---+---+

select t1.*, t2.* from t1 right join t2 on t1.a=t2.b and t1.c > t2.c;
+--------+--------+--------+----+---+---+
| a      | b      | c      | a  | b | c |
|--------+--------+--------+----+---+---|
| 2      | 7      | 9      | 10 | 2 | 7 |
| 2      | 7      | 9      | 20 | 2 | 5 |
| <null> | <null> | <null> | 30 | 3 | 6 |
| <null> | <null> | <null> | 40 | 4 | 6 |
+--------+--------+--------+----+---+---+

实现逻辑也是先 eval filter expr,获取 t1(2,7,9) 对应的 rows,但是,我们为了满足 right join 的语义,就必须要输出右侧所有行。

这里有个逻辑:因为最终结果里,right join 边,如果被 filter 掉的数据(比如 t2(30,3,6)),也需要展示出来。因此,我还需要记录 右侧未被 filter 的行(相同 row hash key 算一个)

arrow-datafusion 计算右侧这段逻辑,利用了 right indices 数组中连续的 index 特点。比如,[(0,1), (2,1)] 代表右侧 index=1 对应了 左侧 index=0 和 index=2 的行。但是,逻辑理解起来有点绕。

因此,我们这里采用了计算 visited_left_side 相同的方式来计算 visited_right_side。简单说来,分为几个步骤:

  • 先标记右侧 batch 所有行 visited=false
  • 遍历 right indices,标记 filter 出的 index visited=true
  • 取出 visited_right_side 中 visited=false 的 row index

至此,我们已经可以计算出,右侧必须保留的 row indices = filtered row indices + unvisited row indices

而左侧数据,插入相同行数的 null value 即可满足 join 的语义。

至此,一个单机版的 hash join filter 已经实现完成了。

Summary

至此,我们的 sql-query-engine 已经有支持了一些基础的 SQL,下一个 Roadmap 开始规划优化器的实现。