Contents

How Query Engines Work

记录学习how-query-engines-work要点,相关代码

Type System

构建查询引擎第一步是,选择一个type system,去代表查询引擎处理的数据类型。

0x01: row-based or columnar

查询引擎 需要考虑处理数据的方式:row-by-row 还是 columnar format。

现代大多数查询引擎都是基于Volcano Query Planner,它的每个step本质是一个iterator去访问rows。 它是可以简单实现的模型,但如果处理billions级别的数据时,它的per-row overheads就会体现出来(大量虚函数调用)。

减轻overhead的方式,通过iterator over batches of data 来批量处理数据。

进一步,如果batch操作的是columnar data而不是rows,就可以利用vectorized processing,即利用SIMD去处理一个column里的多个值,在a single CPU instruction里。

0x02: type system

这里使用Apache Arrow作为基础的类型系统。会使用到的类型有

  • Schema: 提供data source的meta data。包含多个fields
  • Field: 在schema中提供一个field的name和data type
  • FieldArray: 为一个field提供columnar storage的数据
  • ArrowType: 代表一种数据类型

这里不是直接使用FieldArray来描述columnar data,而是提供一个ColumnArray接口抽象,来满足不同的column array实现。

0x03: Arrow Columnar Format

开始前先介绍一下Arrow的memory layout

Terminology:

  • Array or Vector: 具有相同类型的数据序列。不同Arrow实现里使用array或vector,都是代表相同的含义
  • Slot: array中的一个logical value
  • Buffer or Contiguous memory region: 一个连续定长的virtual address
  • Physical Layout: array的底层内存布局
  • Parent and child arrays: 嵌套类型结构中的命名关系。List作为parent array,它包含T-type array作为child
  • Primitive type: 没有child的数据类型
  • Nested type: 包含child的数据类型
  • Logical type: 面向application的语义化类型。比如Decimal类型 底层是16bytes的fixed-size binary layout。

Physical Memory Layout Each logical data type has a well-defined physical layout. Here are the different physical layouts defined by Arrow:

  • Primitive (fixed-size): 具有same byte or bit width的值 组成的序列
  • Variable-size Binary: variable byte length组成的序列. Two variants of this layout are supported using 32-bit and 64-bit length encoding.
  • Fixed-size List: a nested layout where each value has the same number of elements taken from a child data type.
  • Variable-size List: a nested layout where each value is a variable-length sequence of values taken from a child data type. Two variants of this layout are supported using 32-bit and 64-bit length encoding.
  • Struct: a nested layout consisting of a collection of named child fields each having the same length but possibly different types.
  • Sparse and Dense Union: a nested layout representing a sequence of values, each of which can have type chosen from a collection of child array types.
  • Null: a sequence of all null values, having null logical type

0x04: 实现

我们基于arrow的array类型,即用一批columnar的数据,来构造一批rows。减少了per-row overheads(虚函数调用),利用SIMD提高CPU一个cycle的处理数据量。

基于此,我们定义了ColumnArray interface,并包含了ArrowFieldArray的实现,用来代表一个column里的数据。通过NewArrowArrayBuilder方法去构造一个ArrayBuilder。

1
2
3
4
5
6
// ColumnArray Abstraction over different implementations of a column vector.
type ColumnArray interface {
	GetType() arrow.DataType
	GetValue(i int) interface{}
	Size() int
}

基于column的数据构造好后,就需要考虑怎么构造一批rows。因此定义了RecordBatch,来组织一批column data,来表示rows。其中的schema定义了column的dataType。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
type RecordBatch struct {
	Schema Schema
	Fields []ColumnArray
}

type Schema struct {
	Fields []Field
}

type Field struct {
	Name     string
	DataType arrow.DataType
}

Data Sources

实现type system后,我们有了用columnar data构建的行数据类型RecordBatch。下一步,就需要构建内存中的RecordBatch数据。

同时,考虑到iterator模型,我们定义了DataSource抽象,来表示如何批量读取数据源。利用了projection只load特定的column,更高效的利用内存。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
type DataSource interface {
	// Schema Return the schema for the underlying data source
	Schema() datatypes.Schema

	// Scan the data source, selecting the specified columns
	Scan() datatypes.RecordBatch

	// Next prepares the next recordBatch for reading with then Scan method
	Next() bool
}

func NewParquetDataSource(filename string, batchSize int, projection []string) *ParquetDataSource {
	p := &ParquetDataSource{filename: filename, batchSize: batchSize}
	p.inferSchema(projection)
	return p
}

这里提供了基于parquet的读取

Logical Plans & Expressions

Logical Plans

一个逻辑计划(logical plan) 代表 data transformation or action,它返回一些行数据(a set of tuples)。

每个logical plan都可以有 其它的logical plans作为inputs。如下Logical Plans:

  • Scan:读取DataSource。
  • Projection:对输入的input logical plan做projection,并且用输入的logical expressions对input plan进行evaluation,比如Column Reference、Math Expression、Aliased Expression等。
  • Selection:对输入的input logical plan做filter,并且用输入的logical expressions去filter,比如Comparison Expression、Boolean Expression。
  • Aggregate:对输入的input logical plan做聚合操作,用输入的group expression做分组,用输入的aggregate expression做聚合函数。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
type LogicalPlan interface {
	// Schema Returns the schema of the data that will be produced by this logical plan.
	Schema() datatypes.Schema

	// Children Returns the children (inputs) of this logical plan.
	Children() []LogicalPlan

	String() string
}

type Scan struct {
	path       string
	dataSource datasource.DataSource
	projection []string
}

type Projection struct {
	input LogicalPlan
	exprs []LogicalExpr
}

type Selection struct {
	input LogicalPlan
	expr  LogicalExpr
}

type Aggregate struct {
	input     LogicalPlan
	groupExpr []LogicalExpr
	aggExpr   []AggregateExpr
}

重新归纳定义logical plan:通过组装不同功能的logical expressions,来实现不同用处的plan。相当于一系列logical expressions的组合。

Logical Expressions

一个逻辑表达式(logical expression) 代表 将要对input logical plan执行的expression,它最后会返回一个具体的数据类型。它是logical plan的基础构建块,在runtime时被执行。

一些logical expressions例子

Expression例子
Literal Value“hello”, 123
Column Referenceid, name
Math Expressionsalary * tax
Comparison Expressionx >= y
Boolean Expressionbirthday = today() AND age >= 10
Aggregate ExpressionMIN(age), MAX(age), SUM(num), AVG(num), COUNT(*)
Scalar FunctionCONCAT(first_name, “+”, last_name)
Aliased Expressionsalary * 2 AS pay_increase

这些logical expression最终会combined成一个nested expression trees。

logical expression会返回一个数据类型,比如Comparison Expression返回一个Boolean。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
type LogicalExpr interface {
	// ToField Return meta-data about the value that will be produced
	// by this expression when evaluated against a particular input.
	ToField(input LogicalPlan) datatypes.Field

	String() string
}

type Column struct {
	name string
}

func (c Column) ToField(input LogicalPlan) datatypes.Field {
	for _, field := range input.Schema().Fields {
		if field.Name == c.name {
			return field
		}
	}
	panic(fmt.Sprintf("No column named %s", c.name))
}

func (c Column) String() string {
	return fmt.Sprintf("#%s", c.name)
}

重新归纳定义logical expression:它是我们执行引擎的计算逻辑的主要组成部分。

Physical Plans & Expressions

将logical和physical plans明显区分开来的原因是:一个logical plan可以存在 多种physical plan去执行operation。

比如,一个物理计划可以区分CPU和GPU去执行。

比如,一个aggregate或join plan可以通过不同的算法实现,具有不同的trade-offs,如果数据是grouping keys排序了的,则通过sort aggregate来实现。如果数据未排序,大多数采用Hash Aggregate来创建HashMap维护grouping keys。

Physical Expressions

我们上面已经定义了logical plan,因此这章需要实现其对应的physical plan,会在runtime执行evaluate code。

如下定义,一个physical expression会将input的RecordBatch 进行evaluate,并产生一列数据。比如Column Expression会取出recordBatch中的某一列数据。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// PhysicalExpr containing the code to evaluate the expressions at runtime.
type PhysicalExpr interface {
	// Evaluate the expression against an input RecordBatch and
	// produce a column of data as output.
	Evaluate(input datatypes.RecordBatch) datatypes.ColumnArray
}

// ColumnExpr Column Reference column in a batch by index
type ColumnExpr struct {
	index int
}

func (c ColumnExpr) Evaluate(input datatypes.RecordBatch) datatypes.ColumnArray {
	return input.Field(c.index)
}

还会有一些比较复杂的binary expression,比如And / Or / Eq …,它们最终都会产生Boolean类型的columnArray,其中每一个值都会代表 该行是否满足filter条件。

注意其中的LiteralLongExpr理解:字面量的变量,比如输入val=10,则会构造出一个column,其中每个值都是10,columnArray的size为input的RecordBatch。

1
2
3
4
5
6
7
type LiteralLongExpr struct {
	val int64
}

func (l LiteralLongExpr) Evaluate(input datatypes.RecordBatch) datatypes.ColumnArray {
	return datatypes.NewLiteralValueArray(datatypes.Int64Type, l.val, input.RowCount())
}

Physical Plans

physical expression有了后,就可以组装起physical plans。从ScanPlan扫描数据,到SelectionPlan去filter数据,到最后projection数据选取需要的列。

如下plan定义:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
type ScanExec struct {
	ds         datasource.DataSource
	projection []string
}

type SelectionExec struct {
	// children plan: scan
	input physicalplan.PhysicalPlan

	// filter expressions for input. example: Eq, Or and other BooleanBinaryExpr.
	// these exprs always used in SQL where condition.
	expr physicalplan.PhysicalExpr
}

type ProjectionExec struct {
	// already selected RecordBatch plan
	input physicalplan.PhysicalPlan
	// used for the following exprs
	schema datatypes.Schema
	// projections exprs, like Column, ColumnIndex...
	exprs []physicalplan.PhysicalExpr
}

其中比较复杂的是 aggregate plan,它需要对所有的recordBatch进行 分组聚合操作,也就代表这个plan 内部需要维护一个hashmap,记录分组的keys + 聚合函数的结果,并最后输出它们。如下定义:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
type HashAggregateExec struct {
	// input plan to scan datasource and produce recordBatch
	input physicalplan.PhysicalPlan

	// groupExpr to evaluate recordBatch and produce grouping keys. eg: Col.
	groupExpr []physicalplan.PhysicalExpr

	// aggExpr to evaluate recordBatch and produce needed agg exprs. eg: Col.
	// it will also produce needed accumulators for every aggExprs
	aggExpr []exprs.AggregateExpr

	// schema represents groupExprs and aggExprs
	schema datatypes.Schema
}

其中input还是为Scan plan,groupExpr一般为需要选取的列ColumnExpr,aggExpr为需要执行的聚合函数,schema为最终输出的列(包含groupExpr和aggeExpr)。

Query Optimizations

之前的query plans依赖于用于的输入实现是否高效,这章我们会实现简单的query optimizer来优化query plans

Rule-Based Optimizations

Rule based optimizations是简单直接的优化方式,当logical plan转为physical plan时,rule based optimization会被执行。

处理方法:visitor遍历logical plan,并创建each step的copy,包含一些plan修改。

Projection Push-Down

核心目的是,减少scan plan扫描的column个数。通过递归plan,找出所有expression所需要的columns,并修改scan plan的projection。

Cost-Based Optimization

基于成本的优化,基于一些optimization rules(真实数据的统计信息,是否有索引,join大小表等),从而选择成本最低的logical plan。可以参考这篇博客Apache Calcite RBO和CBO模型

Query Execution

有了optimized后的logical plan,我们可以通过physical plan来执行query。因此需要一个planer去进行logical plan到physical plan的转换。

转换完后的physical plan,直接交给execution执行。如下方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
type Ctx struct {
	BatchSize    int
	PhysicalPlan physicalplan.PhysicalPlan
}

func (c *Ctx) Next() bool {
	return c.PhysicalPlan.Next()
}

func (c *Ctx) Execute() datatypes.RecordBatch {
	return c.PhysicalPlan.Execute()
}

SQL Support

到这一章之前,我们已经实现了一个简单的query plan,通过DataFrame API来操作数据。但是,我们还需要一个更加灵活的接口,来支持SQL语句。

这一章我们会实现一个简单的SQL parser,并且支持一些常用的SQL语句。

Tokenizer

第一步,将SQL query string转换为token list,包含keywoards、literals、identifiers、operators等。这一步包含大量体力活。

pratt parser

这里将实现一个基于Top Down Operator Precedence的SQL parser。

本书作者说,尽管有许多方式实现SQL parser,比如Parser Generator and Parser Combinator,但是Pratt parser实现代码会更高效,容易理解和调试。参考tutorial on TDOP parsers

TDOP:Top Down Operator Precedence,即 自上而下的运算符优先级解析。它有以下特点:

  • 通过binding power机制,来决定operator的优先级

    比如有表达式:1 + 2 * 4。我们将运算符*的binding power设置为20,+的binding power设置为10,那么*优先级高于+。因此2就好bind到power更大的*上。

  • 实现不同功能的token,依赖于它们相对于neighbors的位置。可以分为:prefix和infix。

    前缀和中缀运算符,比如+*,前缀运算符只能放在表达式的开头,中缀运算符可以放在表达式的任意位置。

    在pratt中,prefix token被称作nud(for null denotation),infix token被称作led(for left denotation)。lbp(left binding power)和rbp(right binding power)是两个token的binding power。

golang版参考代码,它的核心是一个递归的expression函数。 主要部分是:

  • lbp(left binding power): it tells us how strongly the operator binds to the argument at its left。
  • 一个infix operator都会有left binding power,来表明它左边的argument有多大power作为这个operator的参数。

3 + 1 * 2 * 4 + 5举例来说:+的lbp=10, *的lbp=20

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
<prefix 3, rbp 0, nextOp lbp 10>			# 数字3,它的rbp=0,下一个op的lbp=10,因此数字3属于+这个operator
<infix +, lbp 10>								# 继续递归直到遇到下个op的lbp<=10	
	<prefix 1, rbp 10, nextOp lbp 20>		# 同理,数字1属于*这个operator,
	<infix *, lbp 20>							# 继续递归直到遇到下个op的lbp<=20
		<prefix 2, rbp 20, nextOp lbp 20>	# 数字2,它需要找的rbp=20,下一个op的lbp=20,因此递归结束
	<infix *> = 2							# 返回上一步计算结果: 1*2=2
	<infix *, lbp 20>							# 继续递归直到遇到下个op的lbp<=20
		<prefix 4, rbp 20, nextOp lbp 10>	# 数字4,它需要找的rbp=20,下一个op的lbp=10,因此递归结束
	<infix *> = 8							# 返回上一步计算结果: 2*4=8
<infix +> = 11								# 返回上一步计算结果: 8+3=11
<infix +, lbp 10>								# 继续递归直到遇到下个op的lbp<=10
	<prefix 5, rbp 10, nextOp lbp 0>		# 数字5,它需要找的rbp=10,下一个op的lbp=0,因此递归结束
<infix +> = 16								# 返回上一步计算结果: 11+5=16

Distributed Query Execution

这章目的:介绍distributed query execution如果利用multi-core CPU 和 multi-servers。内容主要都是high level层介绍。

作者在开始前,列出了一些分布式查询引擎 相比单机查询引擎,它复杂的因素有哪些:

  • How do we start and stop executors in a cluster?
  • How do we send a query plan over the network?
  • How do we stream results over the network?
  • How do we avoid undue overhead due to serialization and deserialization costs?
  • How do we support execution of custom code as part of a distributed query?
  • How do we determine when a query should be distributed versus executed on a single node?
  • How do we decide how many executors to use, and how many threads per executor?
  • What happens if an executor fails?

下面内容,会尽可能回答这些问题。

首先,第一步我们需要一个resource scheduler和orchestator存在。现有的比如YARN, Mesos 和Kubernetes。而k8s如今也是容器编排的事实标准。

第二步,需要将Query Plan发送到其它节点,由于我们logical plan和physical plan都是层次化的数据结构,因此这里选用Google的Protocol Buffer。

第三步,executor之间也需要serializing data传输,Apache Arrow提供了IPC(Inter-process communication)的支持。

第四步,至此已经有了serialization formats给query plan和data,因此还需要RPC framework来定义distributed processes如何exchange data。 Arrow Flight则提供这种机制。

Distributed Query Planning:

分布式查询计划相比于单机的查询计划,包含了许多其它overhead cost需要考虑,比如:

  • Memory: We are typically concerned with availability of memory rather than performance. Processing data in memory is orders of magnitude faster than reading and writing to disk.
  • CPU: For workloads that are parallelizable, more CPU cores means better throughput.
  • GPU: Some operations are orders of magnitude faster on GPUs compared to CPUs.
  • Disk: Disks have finite read and write speeds and cloud vendors typically limit the number of I/O operations per second (IOPS). Different types of disk have different performance characteristics (spinning disk vs SSD vs NVMe).
  • Network: Distributed query execution involves streaming data between nodes. There is a throughput limitation imposed by the networking infrastructure.
  • Distributed Storage: It is very common for source data to be stored in a distributed file system (HDFS) or object store (Amazon S3, Azure Blob Storage) and there is a cost in transferring data between distributed storage and local file systems.
  • Data Size: The size of the data matters. When performing a join between two tables and data needs to be transferred over the network, it is better to transfer the smaller of the two tables. If one of the tables can fit in memory than a more efficient join operation can be used.
  • Monetary Cost: If a query can be computed 10% faster at 3x the cost, is it worth it? That is a question best answered by the user of course. Monetary costs are typically controlled by limiting the amount of compute resource that is available.

Query Cost的计算方式有两种:

  • 通过现有的数据统计信息,比如数据量大小,查询用到partition keys等等,来进行预计算。
  • Adaptive Query Execution,每个operator在runtime根据input的数据,来自动调整query plan。

读完总结

可以作为0基础的查询引擎的入门资料,尤其是从RecordBatch定义,到DataSource构建,再到logical plan和physical plan实现,会对查询引擎的基本概念有一定的了解。

但是从SQL Support开始,后面章节的内容过于太generial了,只能做简单了解。

总体来说,还是很适合从0入门的,可以使用自己擅长的语言实现一遍,比如我的golang port

Reference