Contents

Paper Reading: Morsel-Driven Parallelism

这篇paper是 Morsel-Driven Parallelism: A NUMA-Aware Query Evaluation Framework for the Many-Core Age, in SIGMOD, 2014。

对应15-721的Scheduling部分,主要涉及数据库的并行查询执行框架,它已经在HyPer中使用,用来解决many-cores和NUMA情况下,查询的scalability问题。

Introduction

随着现代计算机架构发展,cpu cores不断增多,为了解决单点memory controller的性能问题,many-cores的架构基于了decentralized memory controller,即 Non-Uniform Memory Access (NUMA)。

如下图,两种基于Intel CPU硬件架构,它们都是32 cores,支持4路socket。但是 Nehalem EX 的每两个socket之间可以直接通信,而 Sandy Bridge EP 上间隔的socket只能通过中间socket通信。

同时,也观察到,访问其它socket的内存带宽,只有访问本socket的带宽一半。因此,在many-cores情况下,实现NUMA-local可以有效提高内存访问带宽。

/images/morsel-driven-parallelism/figure10.png
2种硬件平台,它们的网络拓扑 和 理论带宽

本文的思路:

以往的volcano模型,各个算子之间感知不到并行执行,只有exchange operator负责并行数据的交互,并行的thread运行着相同的pipeline。这种模式,称作 plan-driven,optimizer在查询编译阶段已经决定好,运行的线程数。

而HyPer采用了,一种自适应的数据驱动的scheduling方案,将数据分为细粒度单元(morsel),然后将其与对应的operator pipeline(一系列的operators组成的执行序列),封装为task,交给worker pool去执行,从而实现了运行时动态调度。同时它还结合NUMA-local,实现内存最大化访问。

Volcano-based parallel framework的问题,参考下图中的exchange operator位置:

/images/morsel-driven-parallelism/volcano-figure5.png
exchange operator在volcano parallel framework中

operator感知不到并行,同时 shared state 是被避免的,这就要求了,在exchange operator中做即时(on-the-fly)的数据分区后,在assign给每个thread的pipeline。这个过程是静态的分配,它带来的问题是,提前的分区并不总是有效果的,相比来说,NUMA-local aware的dispatcher可以带来更多的优势。后面会提到。

由上面几点引出,HyPer查询引擎的主要特点:

  • Morsel-driven query execution:不同于传统volcano模型,它是 动态分发任务的 并行查询引擎。优势:充分利用的CPU资源,带来的elasticity。比如:同一时刻,CPU之间可以被assign不同的查询task
  • 实现了一系列的parallel algorithms
  • 将NUMA-awareness带入到database领域

Pipeline parallelization

如下图,是一个三张表join的例子,左边为关系代数表达式,右边为并行化执行的过程。

/images/morsel-driven-parallelism/figure2.png
pipeline并行化示例

QEPobject 将可执行的pipeline交给dispatcher,同时它会分配一个临时区域(NUMA-local)给执行线程,在pipeline执行完后,将执行结果分割成细粒度的morsel后,写入其中。这样,后续的pipeline基于分割好的morsel继续执行。

上图中的每一种颜色,代表CPU拓扑图中的每一个socket内的pipeline线程,每个pipeline线程一次处理一个morsel,所以可以动态调整线程数(基于硬件CPU的线程数)。是不是有点像单机版的map-reduce计算模型,thread类比计算资源,NUMA-local类比本地数据。

/images/morsel-driven-parallelism/figure3.png
并行执行结果合并

如上图,是在join阶段需要构造的hashtable。上一阶段已经将scan和filter的结果写入NUMA-local的区域,这一阶段会有worker线程扫描这些区域,并写入全局hashtable中。整个过程由dispatcher来控制并发插入。

总结:与volcano不同的是,morsel-driven并行执行的pipeline,它们之间并不是独立的,需要共享数据结构并同步(build hashtable阶段)。同时,它主要特点是,并行度是弹性的。

Dispatcher

/images/morsel-driven-parallelism/figure5.jpg
dispatcher的工作流

dispatcher负责给并行的pipeline分配计算资源,它内部维护着一个pipeline jobs的list(全局的无锁队列),当一个thread请求一个task时,会优先分配thread当前socket中的morsel,否则会steal一个其它socket中的morsel,保证负载均衡(每个CPU都有活干)。

dispatcher的本质是一个逻辑上的概念,为了防止它成为并发竞争点,它的逻辑存在每个thread请求task的逻辑中。

从更上层来看,dispatcher负责所有的query,因此我们可以基于不同的调度策略,来提高整个系统的吞吐量。比如,一个优先级较高的TP查询,会被优先插入到pipeline job list中;系统query过多时,动态调整保证 所有query都能交错执行,提高可用性。

Parallel Operators

HyPer针对每个算子,实现了并行化的优化。

Hash Join

Hash Join分为2个阶段:

  1. 每个thread读取NUMA-local的数据进行filter,并将结果写入到thread-local临时存储
  2. 采用lock-free hashtable结构,每个thread并发将每个记录的pointer插入到全局hashtable中,通过CAS指令

lock-free hashtable结构如下图,它是在每个bucket的header中,加入了一个tag,通过这个tag就能知道,是否命中了当前bucket,有点类似Bloom Filter。 文中解释了为什么没用Bloom Filter:对大表来说,布隆过滤器只适合相对较慢的缓存,因为布隆过滤器的大小 必须与 哈希表的大小 成正比,才能有效。因此,对大表来说,开销会很大。

读者对这一点的猜测:如果构造的hashtable很大,则代表布隆过滤器的bit array会很大,同时为了避免hash冲突,hash函数也需要增多,这些都会带来额外的overhead。而文中的tag只有16位,具体细节没有透露,但作为基于内存的查询缓存,文中方式可能会更加适合。

/images/morsel-driven-parallelism/figure7.jpg
lock free tagged hash table

Grouping/Aggregation

聚合算子,本质就是一个HashAggregation,它内部需要构造一个hashtable,用来分组计算。

文章主要涉及,两阶段聚合,通过并行提高性能:

  1. local pre-aggregation阶段:每个thread并行地将 NUMA-local 的数据,聚合到size-fixed的thread-local hashtable中,直到这个hashtable写满(size固定的),也就是下图中的小ht;写满后,会将结果分别写到对应的 thread-local partitions中,这一步相当于轻量级地分区数据收集;当所有input的morsel-data都聚合到了thread-local partitions后,第一阶段完成,构成了一个逻辑上的全局partition。
  2. partition-wise aggregation:各个thread去访问,全局partition中属于自己的partition,然后聚合到自己的thread-local的hashtable中,提供给后续算子使用,第二阶段完成,构成了一个分区后的聚合结果。

整个过程,很类似于分治算法,对最开始NUMA-local的数据,在读最快的地方,执行了聚合函数。接着进行分区,将相同partition的数据放在一个thread的NUMA-local里,方便后续 相同partition的数据 在一个socket内,从而达到NUMA-local的计算效果。

/images/morsel-driven-parallelism/figure8.jpg
分治聚合

Sorting

并行排序算子,策略和上面类似:local order -> 并行merge sort

  1. 每个thread会将NUMA-local的数据先排序,然后根据local等距的separator key,计算出global的separator key,将数据分割为对应global的小块
  2. 每个thread在自己负责的范围内,执行merge sort。
/images/morsel-driven-parallelism/figure9.jpg
Merge Sort

总结

文章最后的evaluation贴图,这里就不重复展示了,但会列出每种测试的核心点:

  • figure11:表明了NUMA aware会带来的很大的性能提升
  • table1:rd和wr代表内存读写速度,remote表示访问其它socket数据比例。直观看出,HyPer的内存读取带宽利用率更高
  • figure12:随着并行查询增大,系统整体的吞吐和延迟基本保持平衡。体现HyPer的动态调度带来吞吐的提升
  • figure13:优先级更高的q14会被优先调度完成。体现了动态调度的优势

最后再重复一下,上面提到的HyPer核心特点:

  • 多核时代下的,morsel driver的动态调度引擎
  • NUMA-aware,更高效内存访问
  • parallel operators,支持并行化的算子,尤其是table scan/aggregation/join

Reference

  • Volcano - An Extensible and Parallel Query Evaluation System

    • 解耦 + 抽象 + 标准接口,从而将各个operator独立起来考虑,提供data stream的抽象,各个operator可以灵活组合和协作,增加新算子/算法,对原有完全不需要修改,很容易扩展(似乎灵活性+扩展性是整个volcano project的核心目标)
    • 整个框架实现了operator的组合和数据的整体处理流程,每个operator实现的就是数据的处理流程,但这些都是机制(mechanism),与具体的执行策略(policy)无关,两者是隔离的,因此具有很强的灵活性
    • exchange operator:可以帮助实现parallel query,将数据处理与并行化进行了隔离。
  • 十年后数据库还是不敢拥抱NUMA?