limit and order by: select id from employee order by id desc offset 2 limit 1
join: select employee.first_name, department.department_name from employee left join department on employee.department_id = department.id and department.id > 1
let sort_columns = self
    .order_by
    .iter()
    .map(|expr| -> Result<SortColumn, ExecutorError> {
        let sort_array = expr.expr.eval_column(&batch)?;
        Ok(SortColumn {
            values: sort_array,
            options: Some(SortOptions {
                descending: !expr.asc,
                ..Default::default()
            }),
        })
    })
    .try_collect::<Vec<_>>()?;

let indices = lexsort_to_indices(&sort_columns, None)?;

let sorted_batch = RecordBatch::try_new(
    schema,
    batch
        .columns()
        .iter()
        .map(|column| {
            take(
                column.as_ref(),
                &indices,
                // disable bound check overhead since indices are already generated from
                // the same record batch
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
        })
        .try_collect::<Vec<_>>()?,
)?;
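Once the rows are sorted we only have row indices; to fetch the sorted data by those indices we need compute::take. It picks elements in the order given by the row indices it receives, which is effectively a reorder. As the doc comment in its source puts it: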
/// Take elements by index from [Array], creating a new [Array] from those indexes.
///
/// ```text
/// ┌─────────────────┐      ┌─────────┐                              ┌─────────────────┐
/// │        A        │      │    0    │                              │        A        │
/// ├─────────────────┤      ├─────────┤                              ├─────────────────┤
/// │        D        │      │    2    │                              │        B        │
/// ├─────────────────┤      ├─────────┤   take(values, indices)      ├─────────────────┤
/// │        B        │      │    3    │ ─────────────────────────▶   │        C        │
/// ├─────────────────┤      ├─────────┤                              ├─────────────────┤
/// │        C        │      │    1    │                              │        D        │
/// ├─────────────────┤      └─────────┘                              └─────────────────┘
/// │        E        │
/// └─────────────────┘
///    values array         indices array                                   result
/// ```
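At a high level, take iterates over the indices and, for each one, reads the element at that position, i.e. array[index]. The only part that needs special treatment is the handling of null values; see the implementation of take for reference.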
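A minimal sketch of that gather loop, assuming arrays are modeled as slices of Option<T> (None standing for null) instead of Arrow buffers with validity bitmaps; take here is a hypothetical stand-in, not the arrow function. A null index produces a null slot, and a null value is carried through unchanged.

```rust
/// Build a new array from values[i] for every i in indices.
/// Arrays are modeled as slices of Option<T>, where None means null (an assumption;
/// Arrow stores a values buffer plus a validity bitmap).
fn take<T: Clone>(values: &[Option<T>], indices: &[Option<usize>]) -> Vec<Option<T>> {
    indices
        .iter()
        .map(|&idx| match idx {
            // valid index: copy the slot as-is, so a null value stays null
            // (indexing panics if i is out of bounds)
            Some(i) => values[i].clone(),
            // null index: the output slot is null
            None => None,
        })
        .collect()
}

fn main() {
    // the values/indices from the diagram above
    let values = vec![Some("A"), Some("D"), Some("B"), Some("C"), Some("E")];
    let indices: [Option<usize>; 4] = [Some(0), Some(2), Some(3), Some(1)];
    // prints [Some("A"), Some("B"), Some("C"), Some("D")]
    println!("{:?}", take(&values, &indices));
}
```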
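Looking at a join statement such as select * from a left join b on a.id = b.id from a high level, a join expression is just another way of describing a table. It happens to describe two tables, but the operators above it still treat it as a single table. For example, a left join b on a.id = b.id means table a left-joined with table b, which produces one joined table that is handed to the upper operators.
A join, however, has two children, and the on keys in its join condition apply to different children. Take select * from a left join b on a.id = b.id: the join on keys are [(a.id, b.id)] and the join's left child is TableScan(a), so the InputRef index of a.id should be resolved against the bindings of TableScan(a). This also matches how the join executor processes the on keys, as described later.
The filter in the join condition, on the other hand, is applied to the merged output of the join, so its InputRef index is resolved against left bindings + right bindings.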
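To make the two binding scopes concrete, here is a sketch that models bindings as plain lists of qualified column names, using the employee/department join query listed earlier. It assumes the planner keeps employee.department_id = department.id as the on key and department.id > 1 as the filter; the column lists and the resolve helper are hypothetical, and the real InputRefRewriter rewrites expression trees. Note that department.id resolves to index 0 as an on key but to index 3 inside the filter.

```rust
/// Position of a qualified column name within a list of bindings.
/// Hypothetical helper: bindings are modeled as plain column names.
fn resolve(bindings: &[&str], column: &str) -> Option<usize> {
    bindings.iter().position(|b| *b == column)
}

fn main() {
    // ... on employee.department_id = department.id and department.id > 1
    let left = ["employee.id", "employee.first_name", "employee.department_id"];
    let right = ["department.id", "department.department_name"];

    // on keys: each side is resolved against its own child's bindings
    let on_left = resolve(&left, "employee.department_id"); // Some(2)
    let on_right = resolve(&right, "department.id"); // Some(0)

    // filter: resolved against left bindings + right bindings (the merged batch)
    let merged: Vec<&str> = left.iter().chain(right.iter()).copied().collect();
    let filter_col = resolve(&merged, "department.id"); // Some(3)

    println!("{:?} {:?} {:?}", on_left, on_right, filter_col);
}
```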
/// In the join executor's internal logic, the join condition `on(expr1, expr2)` is
/// separated into left and right keys, which are evaluated against different RecordBatches,
/// so the left and right InputRef indices must be resolved using different bindings.
/// The join condition's `filter` expr is evaluated against the final RecordBatch, which is
/// merged from the left and right RecordBatches, so its InputRef index is resolved using the
/// merged bindings.
fn rewrite_logical_join(&mut self, plan: &LogicalJoin) -> PlanRef {
    let new_left = self.rewrite(plan.left());
    let mut right_input_ref_rewriter = InputRefRewriter::default();
    let new_right = right_input_ref_rewriter.rewrite(plan.right());

    // rewrite the condition expr. Left and right indices are calculated from different
    // bindings.
    let new_on = if let JoinCondition::On { on, filter: _ } = plan.join_condition() {
        let mut on_left_keys = on.iter().map(|o| o.0.clone()).collect::<Vec<_>>();
        let mut on_right_keys = on.iter().map(|o| o.1.clone()).collect::<Vec<_>>();
        // 1. use left bindings to rewrite left keys.
        for expr in &mut on_left_keys {
            self.rewrite_expr(expr);
        }
        // 2. use right bindings to rewrite right keys.
        for expr in &mut on_right_keys {
            right_input_ref_rewriter.rewrite_expr(expr);
        }
        // 3. combine left and right keys into new tuples.
        let new_on = on_left_keys
            .into_iter()
            .zip(on_right_keys.into_iter())
            .collect::<Vec<_>>();
        Some(new_on)
    } else {
        None
    };

    // 4. combine the left and right bindings so they can be consumed by the upper logical
    // plan, such as LogicalProject.
    self.bindings.append(&mut right_input_ref_rewriter.bindings);

    // 5. use the merged bindings (left + right) to rewrite the filter, since the filter is
    // evaluated against the result batch in the join executor.
    let new_join_condition = if let JoinCondition::On { on: _, filter } = plan.join_condition() {
        let new_filter = match filter {
            Some(mut expr) => {
                self.rewrite_expr(&mut expr);
                Some(expr)
            }
            None => None,
        };
        JoinCondition::On {
            on: new_on.unwrap(),
            filter: new_filter,
        }
    } else {
        plan.join_condition()
    };

    Arc::new(LogicalJoin::new(
        new_left,
        new_right,
        plan.join_type(),
        new_join_condition,
    ))
}
pub struct HashJoinExecutor {
    pub left_child: BoxedExecutor,
    pub right_child: BoxedExecutor,
    pub join_type: JoinType,
    pub join_condition: JoinCondition,
    /// The schema once the join is applied
    pub join_output_schema: Vec<Vec<ColumnCatalog>>,
}
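The core of execute is split into a build phase and a probe phase:
build phase:
build a hashtable over the left child, in which one hash key maps to multiple row indices
merge all left batches into a single batch, so that the probe phase can fetch the corresponding rows from it
probe phase:
build a visited_left_side bool array that records which left rows have not been matched yet; at the end, the join_type decides whether these unmatched left rows are added to the output
iterate over the right child batches, probe the left hashtable, and record the matched indices in left_indices and right_indices respectively
if a right row matches the left hashtable, record both the left_index and the right_index, representing a matched left/right pair.
if a right row has no match in the left hashtable, record the pair null <-> right_index: the right row has no match, but it must still be kept to satisfy right/full join semantics, so its left side is null.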
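The build and probe phases above can be sketched with a std HashMap. Assumptions: single i64 join keys, and the table is keyed by the key value itself rather than by a precomputed row hash, so there are no hash collisions to re-check. build, probe and this JoinType enum are illustrative, not the executor's real types; this sketch also marks visited_left_side during probing. None in left_indices plays the role of the null appended for an unmatched right row.

```rust
use std::collections::HashMap;

/// Illustrative join types (Cross is left out: it never reaches a hash join).
#[derive(Clone, Copy, Debug, PartialEq)]
enum JoinType {
    Inner,
    Left,
    Right,
    Full,
}

/// Build phase: map every left-side join key to all left row indices holding it.
fn build(left_keys: &[i64]) -> HashMap<i64, Vec<usize>> {
    let mut table: HashMap<i64, Vec<usize>> = HashMap::new();
    for (row, key) in left_keys.iter().enumerate() {
        table.entry(*key).or_default().push(row);
    }
    table
}

/// Probe phase: returns (left_indices, right_indices, visited_left_side).
/// A None left index stands for the null paired with an unmatched right row.
fn probe(
    table: &HashMap<i64, Vec<usize>>,
    num_left_rows: usize,
    right_keys: &[i64],
    join_type: JoinType,
) -> (Vec<Option<usize>>, Vec<usize>, Vec<bool>) {
    let mut left_indices = Vec::new();
    let mut right_indices = Vec::new();
    let mut visited_left_side = vec![false; num_left_rows];
    for (row, key) in right_keys.iter().enumerate() {
        match table.get(key) {
            // matched: emit one (left, right) pair per matching left row
            Some(left_rows) => {
                for &l in left_rows {
                    left_indices.push(Some(l));
                    right_indices.push(row);
                    visited_left_side[l] = true;
                }
            }
            // unmatched right row: kept (with a null left side) only for right/full joins
            None if matches!(join_type, JoinType::Right | JoinType::Full) => {
                left_indices.push(None);
                right_indices.push(row);
            }
            None => {}
        }
    }
    (left_indices, right_indices, visited_left_side)
}

fn main() {
    let table = build(&[1, 2, 2, 3]);
    let (left_indices, right_indices, visited) = probe(&table, 4, &[2, 4, 1], JoinType::Full);
    println!("{:?} {:?} {:?}", left_indices, right_indices, visited);
}
```

Inner and Left produce identical index pairs in this phase; Left only differs later, when unvisited left rows are appended.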
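Looking again at the index pairs built during probing, you will notice that they are constructed from the perspective of the right batch: the join_type decides whether the unmatched right rows are kept. For inner/left joins they are dropped; for right/full joins they must be kept.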
// 1. build left and right indices
let mut left_indices = UInt64Builder::new(0);
let mut right_indices = UInt32Builder::new(0);
match self.join_type {
    JoinType::Inner | JoinType::Left => {
        for (row, hash) in right_rows_hashes.iter().enumerate() {
            if let Some(indices) = left_hashmap.get(hash) {
                for &i in indices {
                    left_indices.append_value(i as u64)?;
                    right_indices.append_value(row as u32)?;
                }
            }
        }
    }
    JoinType::Right | JoinType::Full => {
        for (row, hash) in right_rows_hashes.iter().enumerate() {
            if let Some(indices) = left_hashmap.get(hash) {
                for &i in indices {
                    left_indices.append_value(i as u64)?;
                    right_indices.append_value(row as u32)?;
                }
            } else {
                // when no match, add the row with None for the left side
                left_indices.append_null()?;
                right_indices.append_value(row as u32)?;
            }
        }
    }
    JoinType::Cross => unreachable!("Cross join should not be in HashJoinExecutor"),
}
// handle unvisited left-side rows: generate the last result_batch, which consists of the
// unvisited left rows and null rows on the right side.
match self.join_type {
    JoinType::Left | JoinType::Full => {
        let indices = UInt64Array::from_iter_values(
            (0..visited_left_side.len())
                .filter_map(|v| (!visited_left_side.get_bit(v)).then(|| v as u64)),
        );
        let left_array: Vec<_> = left_single_batch
            .columns()
            .iter()
            .map(|col| compute::take(col, &indices, None))
            .try_collect()?;
        let offset = left_array.len();
        let right_array = join_output_schema
            .fields()
            .iter()
            .enumerate()
            .filter(|(idx, _)| *idx >= offset)
            .map(|(_, field)| arrow::array::new_null_array(field.data_type(), indices.len()))
            .collect::<Vec<_>>();
        let data = vec![left_array, right_array].concat();
        yield RecordBatch::try_new(join_output_schema.clone(), data)?;
    }
    JoinType::Right | JoinType::Inner => {}
    JoinType::Cross => unreachable!(""),
}
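A std-only sketch of this final step, assuming rows are modeled as Vec<Option<T>> with None meaning null; the executor itself does the same thing column-wise with compute::take and new_null_array. unmatched_left_rows is a hypothetical name. Each left row whose visited flag is still false is emitted once, padded with one null per right-side column.

```rust
/// For left/full joins: emit every left row that was never matched while probing,
/// padded with one null per right-side column.
/// Rows are modeled as Vec<Option<T>> (None = null); this is an assumption, the
/// executor builds Arrow columns with compute::take and new_null_array instead.
fn unmatched_left_rows<T: Clone>(
    left_rows: &[Vec<Option<T>>],
    visited_left_side: &[bool],
    num_right_columns: usize,
) -> Vec<Vec<Option<T>>> {
    visited_left_side
        .iter()
        .enumerate()
        .filter(|(_, visited)| !**visited)
        .map(|(i, _)| {
            let mut row = left_rows[i].clone();
            row.extend(std::iter::repeat(None).take(num_right_columns));
            row
        })
        .collect()
}

fn main() {
    let left = vec![vec![Some("alice"), Some("1")], vec![Some("bob"), Some("9")]];
    // "bob" (row 1) was never matched during probing
    let extra = unmatched_left_rows(&left, &[true, false], 2);
    // prints [[Some("bob"), Some("9"), None, None]]
    println!("{:?}", extra);
}
```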