一、总体介绍
1.1 背景介绍
近年来,随着存储硬件的革新与网络技术的突飞猛进,如NVMe SSD和超高速网络接口的普及应用,I/O性能瓶颈已得到显著改善。然而,在2020年及以后的技术背景下,尽管SSD速度通过NVMe接口得到了大幅提升,并且网络传输速率也进入了新的高度,但CPU主频发展并未保持同等步调,3GHz左右的核心频率已成为常态。
在当前背景下Apache Spark等大数据处理工具中,尽管存储和网络性能的提升极大地减少了数据读取和传输的时间消耗,但Apache Spark框架基于类火山模型的行式处理,在执行复杂查询、迭代计算时对现代CPU并行计算特性和向量化计算优势的利用率仍然有待提高。同时,传统TCP/IP网络通信模式下,CPU承担了大量的协议解析、包构建和错误处理任务,进一步降低了整体数据处理效率,这导致Apache Spark 在实际运行中并没有达到网络、磁盘、CPU的IO瓶颈。
1.2 挑战和困难
在Apache Spark的数据处理流程中,如上图所示,整个过程从数据源开始,首先经历数据加载阶段。Spark作业启动时,任务被分配到各个运算节点(executor),它们从诸如HDFS、S3(Amazon Simple Storage Service)或其他支持的存储系统中高效地获取数据。
一旦数据成功加载至内存或磁盘上,运算节点首先开始数据的解压缩工作,然后执行相应的计算操作,例如map、filter、reduceByKey等转换操作。
接下来,在执行涉及不同分区间数据交换的操作(如join、groupByKey)时,Spark会触发Shuffle阶段。在这个阶段,各个executor将计算后的中间结果按照特定键值进行排序,并通过网络传输至其他executor,以便进行进一步的合并和聚集操作。
完成Shuffle后,数据再次经过一轮或多轮计算处理,以产出最终结果。当所有任务完成后,结果数据会被压缩后写回目标存储系统,如HDFS、数据库或其他外部服务,从而完成整个数据处理生命周期。
综上所述,在Apache Spark的数据处理过程中,尽管其设计高度优化,但仍面临多个技术挑战:
首先,传统硬件中,数据传输需要CPU解码并通过PCIe总线多次搬运至内存和硬件,使用反弹缓冲区,虽然是临时存储但也增加复杂度与延迟;列式存储(Parquet、ORC)高效压缩同类数据减少I/O,但读取时的解压和编解码操作加重了CPU负担,尤其在仅需处理部分列时效率下降;Apache Spark采用行式处理,如DataSourceScanExec按行扫描可能导致冗余加载,并因频繁调用“next”及虚函数引发CPU中断,全行扫描对部分列查询性能损耗显著。
其次,随着数据量不断激增以及IO技术的提升,基于CPU的优化带来的收益越来越不明显,传统的CPU算力逐渐成为计算瓶颈。在涉及特定算子操作时,性能问题尤为突出。例如,高散列度数据的 join,高散列度数据的 aggregate。在执行join操作时,尤其当采用基于哈希的join策略时,因为数据散列程度越高,哈希计算的负载就越大,对CPU和内存的计算能力要求也就越高。哈希计算是一个计算密集型任务,需要CPU执行大量的计算操作,并且可能涉及到内存的读取和写入。当数据散列程度较高时,哈希计算的复杂度增加,可能导致CPU和内存的使用率增加,从而影响系统的整体性能。其次,数据散列程度较高可能会增加哈希冲突的概率,进而影响内存的使用效率。当哈希冲突较多时,可能需要额外的操作来解决冲突,例如使用链表或者开放地址法来处理冲突。这些额外的操作可能会占用额外的内存空间,并且增加内存的读写次数,从而降低内存的使用效率。
此外,Shuffle作为Spark计算框架中决定整体性能的关键环节之一,其内部包含了多次序列化与反序列化过程、跨节点网络传输以及磁盘IO操作等复杂行为。这些操作不仅增加了系统开销,而且可能导致数据局部性的丧失,进一步拖慢整个任务的执行速度。因此,如何优化Shuffle过程,减小其对系统资源的影响,是Spark性能调优的重要方向。
因此,面对硬件条件的新格局,开发者不仅需要深入研究如何优化Apache Spark内部机制以适应大规模并行计算需求,还应探索将特定类型的数据运算任务转移到诸如GPU、FPGA或其他专用加速器等更高效能的硬件上,从而在CPU资源有限的情况下,实现更高层次的大数据处理性能提升。
二、整体方案
我们采用软硬件结合方式,无侵入式的集成Apache Spark,并在Spark数据计算全链路的3大方面都进行了全面提升和加速。
在「数据加载」方面,自研的DPU数据加载引擎直读硬件存储的数据到计算卡中,优化掉大量的内存与加速卡的PCIe数据传输性能损耗,同时在计算卡中进行数据的解压缩计算。同理在最后的结果输出阶段,也在计算卡中进行数据压缩。
在「离线计算」方面,自研的DPU计算引擎拥有强大的计算能力、容错能力、并满足 Spark 引擎日益复杂的离线处理场景和机器学习场景。
在「数据传输」方面,采用基于远程直接内存访问(RDMA)技术,以提高集群间的数据传输效率。
三、核心加速阶段
加速阶段如下图所示,核心数据加速分为四个部分,分别为 1.数据读取阶段;2.任务处理阶段;3.Shuffle数据传输阶段;4.数据输出阶段。
3.1数据加载阶段
3.1.1 面临挑战
在Apache Spark的数据处理流程中,数据加载阶段是整个ELT(Extract, Load, Transform)作业的关键起始步骤。首先,当从传统硬件架构的数据源加载本地数据时,面临如下挑战:
数据传输瓶颈:在传统的硬件体系结构中,数据加载过程涉及到多次通过CPU和PCIe总线的搬运操作。具体而言,每一份数据需要先从数据源经过CPU解码并通过PCIe通道传输至系统内存,然后再次经由CPU控制并经过PCIe接口发送到特定硬件如GPU进行进一步处理。在此过程中,“反弹缓冲区”作为临时存储区域,在系统内存中起到了桥接不同设备间数据传输的作用,但这也会增加数据搬移的复杂性和潜在延迟。
列式存储的优势与挑战:在大数据环境中,列式存储格式如Parquet和ORC等被广泛采用,因其高效性而备受青睐。由于同一列中的数据类型相同,可以实现高效的压缩率,并且内存访问模式相对线性,从而减少I/O开销。然而,这也带来了问题,即在读取数据前必须进行解压缩处理,这无疑增加了CPU额外的计算负担,尤其是在需要对部分列进行运算的情况下。
行式处理效率考量:在早期版本或某些特定场景下,Apache Spark在执行物理计划时倾向于行式处理数据。例如DataSourceScanExec算子负责底层数据源扫描,其默认按行读取数据,这种策略可能导致不必要的列数据冗余加载及频繁调用“next”方法获取下一行记录。这一过程中,大量的虚函数调用可能引发CPU中断,降低了处理效率,特别是在仅关注部分列时,会因为全行扫描而引入额外的性能损失。
在数据压缩解压缩过程中,压缩解压缩策略选择阶段是整个过程的开始。传统的硬件体系结构中,数据的压缩和解压缩过程通常只能依赖CPU完成,没有其他策略可以选择,从而无法利用GPU、DPU等其他处理器资源。这种局限性导致数据压缩解压缩过程会大量占用CPU资源,同时,与DPU相比,CPU的并行处理能力相对较弱,无法充分发挥硬件资源的潜力。在大规模数据处理的场景下,数据压缩解压缩过程可能成为CPU的瓶颈,导致系统性能下降。此外,由于数据压缩解压缩是一个计算密集型任务,当系统中同时存在其他需要CPU资源的任务时,压缩解压缩过程可能会与其他任务产生竞争,进一步加剧了CPU资源的紧张程度,导致系统整体的响应速度变慢。
3.1.2 解决方案与原理
在DPU(Data Processing Unit)架构设计中,采用了一种直接内存访问(DMA)技术,该技术构建了从DPU内存到存储设备之间直接的数据传输路径。相较于传统的数据读取方式,DMA机制有效地消除了CPU及其回弹缓冲区作为中间环节的必要性,从而显著提升了系统的数据传输带宽,并减少了由数据中转造成的CPU延迟以及利用率压力。
具体而言,在DPU系统内部或与之紧密集成的存储设备(例如NVMe SSD)上,内置了支持DMA功能的引擎,允许数据块以高效、直接的方式在存储介质与DPU内存之间进行双向传递。为了确保这一过程的精确执行,系统精心设计了针对DMA操作的专用机制和缓存管理策略,其中包含了由存储驱动程序发起的DMA回调函数,用于验证并转换DPU内存中的虚拟地址至物理地址,进而保证数据能够准确无误地从NVMe设备复制到DPU指定的内存区域。
当应用程序通过虚拟文件系统(VFS)向底层硬件提交DPU缓冲区地址作为DMA目标位置时,用户空间库将捕获这些特定于DPU的缓冲区地址,并将其替换为原本提交给VFS的代理CPU缓冲区地址。在实际执行DMA操作之前,运行于DPU环境的软件会在适当的时机调用相关接口,识别出原始的CPU缓冲区地址,并重新提供有效的DPU缓冲区地址,以便DMA操作能正确且高效地进行。由此,在不增加CPU处理负担的前提下,实现了NVMe设备与DPU内存间数据块迁移的高度优化流程。
3.1.3 优势与效果
吞吐提升:CPU的PCIe吞吐可能低于DPU的吞吐能力。这种差异是由于基于服务器的PCIe拓扑到CPU的PCIe路径较少。如图所示DPU支持直接数据路径(绿色),而非通过CPU中的反弹缓冲区间接读取的路径(红色)。这可以提高带宽、降低延迟并减少CPU和DPU吞吐量负载。
降低延迟:DPU直读数据路径只有一个副本,直接从源到目标。如果CPU执行数据移动,则延迟可能会受到CPU可用性冲突的影响,这可能会导致抖动。DPU缓解了这些延迟问题。
提升CPU利用率:如果使用CPU移动数据,则投入到数据搬运的CPU利用率会增加,并干扰CPU上的其余工作。使用DPU可减少CPU在数据搬用的工作负载,使应用程序代码能够在更短的时间内运行。
解压缩:DPU读取parquet文件的同时会将文件解压,不用通过CPU进行编译码与解压计算,直接进行谓词下推减少读取数据量从而提升数据读取效率。
更适合列式处理数据结构:列式数据结构在运算中有更好的性能,有利于Spark的Catalyst优化器做出更加智能的决策,如过滤条件下推、列剪枝等,减少了不必要的计算和数据移动。在执行阶段会对列式数据进行向量化操作,将多条记录打包成一个批次进行处理,提升运算效率。
3.2离线计算阶段
3.2.1 面临挑战
2015年基于Spark Summit调研显示,2010年硬件的基本情况是存50+MB/s(HDD),网络是1Gpbs,CPU是~3GHz;五年后,存储和网络都有了10倍以上的提升,但是CPU却并没有什么变化。
2020年,硬件的变化让io性能有了进一步提升。SSD有了NVMe接口,同时有了超高速网络,但CPU仍然是3赫兹。那么当下我们的挑战是在这样的硬件条件下,如何最大化CPU性能,如何使用更高效的硬件替代CPU进行专业数据运算。
3.2.2 解决方案与原理
我们考虑如何将具有高性能计算能力的DPU用到 Spark 里来,从而提升 Spark 的计算性能,突破 CPU 瓶颈。接下来将介绍DPU计算引擎:
上图是DPU的整体设计。目前支持的算子覆盖Spark生产环境常用算子,包括Scan、Filter、Project、Union、Hash Aggregation、Sort、Join、Exchange等。表达式方面,我们开发了目前生产环境常用的布尔函数、Sum/Count/AVG/Max/Min等聚合函数。
在整个流转过程中,RACE Plugin层起到承上启下的关系:
1. 最核心的是Plan Conversion组件,在Spark优化 Physical Plan时,会应用一批规则,Race通过插入的自定义规则可以拦截到优化后的Physical Plan,如果发现当前算子上的所有表达式可以下推给DPU,那么替换Spark原生算子为相应的可以在DPU上执行的自定义算子,由HADOS将其下推给DPU 来执行并返回结果。
2. Fallback组件,Spark支持的Operator和Expression非常多,在Race研发初期,无法 100% 覆盖 Spark 查询执行计划中的算子和表达式,因此 Race必须有先前兼容回退执行的能力。
3. Strategy组件, 因为fallback 这个 operator 前后插入行转列、列转行的算子。因为这两次转换对整个执行的过程的性能损耗是很大的。针对这种情况,最稳妥的方式就是整个子树Query全部回退到CPU,而选择哪些情况下执行这个操作至关重要。
4. Metric组件,Race会收集DPU执行过程中的指标统计,然后上报给Spark的Metrics System做展示、Debug、API调用。
3.2.3 优势与效果
通过Spark Plugin机制成功地将Spark计算任务卸载至DPU上执行,充分利用了DPU强大的计算处理能力,有效解决了CPU在复杂数据处理中的性能瓶颈问题。经过一系列详尽的测试与验证,在TPC-DS基准测试中99条SQL语句的执行表现显著提升:
显著提高查询性能:本方案在相同硬件条件下,使得单个查询的执行时间最多可缩短到原来的四分之一左右,即最高性能提升达到4.48倍;而在表达式操作层面,性能优化效果更为突出,某些情况下甚至能提升至原始速度的8.47倍。
算子级加速明显:在对关键算子如Filter和哈希聚合操作进行评估时,相较于原生Spark解决方案,Filter算子的执行效率提升了高达43倍,而哈希聚合算子性能也提升了13倍。这得益于我们减少了列式数据转换为行式数据的额外开销,以及DPU硬件层面对运算密集型任务的强大加速作用。
大幅度降低CPU资源占用:通过DPU加速卸载后,系统资源利用率得到显著改善。在TPC-DS测试场景下,CPU平均使用率从60%大幅下降至5%,释放出更多CPU资源用于其他业务逻辑处理,增强了系统的整体并发能力和响应速度。
3.3数据传输阶段
3.3.1 面临挑战
在Apache Spark的数据处理框架中,Shuffle阶段扮演着至关重要的角色,然而,该过程因其涉及大规模数据在网络中的传输而显著增加了执行时间,容易成为制约Spark作业性能的关键瓶颈环节。传统的网络通信机制在 Shuffle 过程中的表现不尽如人意。具体表现为:
· 数据的发送和接收过程中,操作系统内核参与了必要的管理与调度工作,这一介入导致了额外的延迟开销。每个数据单元在经过操作系统的网络协议栈进行传递时,需历经多次上下文切换以及数据复制操作,这些都无形中加重了系统负担。
· 另一方面,在基于传统TCP/IP等网络协议的通信模式下,CPU需要承担大量的协议解析、包构建及错误处理等任务,这不仅大量消耗了宝贵的计算资源,而且对通信延迟产生了不利影响,进一步降低了整体的数据处理效率。
3.3.2 解决方案与原理
将RDMA技术应用于Apache Spark,尤其是在Shuffle过程中,可以大幅度减轻网络瓶颈带来的影响。通过利用RDMA的高带宽和低延迟特性,Spark的数据处理性能有望得到显著的提升。
RDMA技术允许网络设备直接访问应用程序内存空间,实现了内核旁路(kernel bypass)。这意味着数据可以直接从发送方的内存传输到接收方的内存,无需CPU介入,减少了传输过程中的延迟。如下图所示。
3.2.3 优势与效果
在该方案中,Netty客户端被RDMA客户端所取代,并充分利用了RDMA单边操作的特性。具体实现时,磁盘数据通过内存映射(MMAP)技术加载至用户空间内存,此后,客户端利用RDMA能力在网络层面执行直接内存访问操作。这一改进避免了数据在操作系统内核内存和网络接口间多次复制,从而提高了数据传输速度、降低了延迟并减轻了CPU负载。
数据传输效率提升:得益于RDMA的低延迟与高带宽优势,Spark中的数据处理速率显著提高。这是因为RDMA能够实现在网络设备与应用内存之间的直接数据传输,减少了对CPU的依赖,进而降低了数据传输过程中的延时。
CPU占用率降低:RDMA的Kernel Bypass特性使得数据可以直接从内存绕过内核进行传输,这不仅大大减少了CPU在数据传输阶段的工作负担,而且提升了CPU资源的有效利用率,释放出更多计算资源用于Spark的核心计算任务。
端到端处理时间缩短:对比传统TCP传输方式,在多项性能测试中,采用RDMA的方案明显缩短了端到端的数据处理时间,这意味着整体数据处理流程更加高效,能够在更短的时间内完成相同规模的计算任务。
Shuffle阶段性能优化:在Apache Spark框架中,Shuffle阶段是一个关键且对性能影响较大的环节。借助RDMA减少数据传输和处理所需时间的优势,有效地优化了Shuffle阶段的性能表现,从而全面提升整个数据处理流程的效率。
大规模数据处理能力增强:对于处理大规模数据集的场景,RDMA所提供的高效数据传输及低延迟特性尤为重要。它确保Spark能在更高效地处理大量数据的同时,提高了大规模数据处理任务的可扩展性和处理效率。
值得注意的是,在多种实际应用场景下,使用RDMA通常能够带来大约10%左右的性能提升效果。然而,具体的加速效果会受到业务逻辑复杂性、数据处理工作负载特性的综合影响,因此可能有所波动。
四、加速效果
4.1端到端整体加速效果
在严格的单机单线程本地(local)模式测试环境下,未使用RDMA技术,针对1TB规模的数据集,通过对比分析TPC-DS基准测试SQL语句执行时间,其中有5条语句的E2E(由driver端提交任务到driver接收输出结果的时间)执行时间得到了显著提升,提升比例均超过2倍,最高可达到4.56倍提升。
4.2运算符加速效果
进一步聚焦于运算符层面的性能改进,在对DPU加速方案与Spark原生运算符进行比较时,观察到运算符执行效率的最大提升比率达到9.97倍。这一显著的加速效果主要源于DPU硬件层面的优化设计和高效运算能力。
4.3算子加速效果
在遵循TPC-DS基准测试标准的前提下,相较于未经优化的原生Spark解决方案,本方案在关键算子性能方面实现了显著提升。根据测试数据表明,Filter算子的执行效率提升了43倍,而哈希聚合算子的处理速率也提高了13倍之多。这一显著性能飞跃的取得,主要归因于方案深入挖掘并有效利用了DPU所具备的强大计算能力和并行处理特性,从而大幅缩短了相关算子的执行时间,并提升了整个系统的运算效能。通过这种优化措施,不仅确保了在复杂查询和大数据处理任务中更高的响应速度,同时也验证了结合现代硬件技术对Spark性能进行深度优化的有效性和可行性。
4.4 RDMA加速效果
在实际应用环境中,RDMA(Remote Direct Memory Access)技术展现出显著的性能提升效果。通过直接访问远程内存而无需CPU过多介入,RDMA能够极大地减少数据传输过程中的延迟和CPU占用率,在多个不同场景中实现至少10%以上的性能增长。这一优势体现在网络密集型操作中尤为明显,如大规模分布式系统间的通信与数据交换。
4.5 压缩解压缩加速效果
基于目前HADOS-RACE已经实现的Snappy压缩解压缩方案,制定了对应的性能测试计划。首先生成snappy测试数据,使用基于CPU和DPU的Spark分别对数据进行处理,记录各自的Snappy压缩解压缩阶段和Spark整体端到端的耗时和吞吐。执行的测试语句为:select * from table where a1 is not null and a2 is not null(尽量减少中间的计算过程,突出Snappy压缩解压缩的过程)。
单独分析Snappy压缩解压缩阶段,基于CPU的Snappy解压缩,吞吐量为300MB/s。而将解压缩任务卸载到DPU后,DPU核内计算的吞吐量可达到1585MB/s。可以看到,基于DPU进行Snappy解压缩,相比基于CPU进行Snappy解压缩,性能可提升约5倍。
基于CPU的Spark计算过程总体比基于DPU的Spark计算过程耗时减少了约50%。相当于基于DPU的端到端执行性能是基于CPU端到端性能的两倍。详细测试结果如下所示:
五、未来规划
5.1 现有优势
性能方面,得益于DPU做算力卸载的高效性,相对于社区版本Spark具备较为明显的优势,尤其是单机场景下,该场景下由于更偏重于纯算力,优势更加明显。
资源方面,得益于更优秀的数据结构设计,在内存、IO和网络资源使用上,都具备不同程度的优势,特别是内存资源上,较社区版本Spark优势明显。
5.2 现有不足
· 集群场景下性能提升较单机场景减弱,网络传输的性能损耗削弱了整体性能提升能力。
· 功能覆盖上,目前主要围绕TPC-DS场景以及一些客户提出的业务场景,未来还需要覆盖更多的业务场景。
5.3 未来规划
· 优化和完善现有架构,继续完善基础功能覆盖。
· 未来计划在加速纯计算场景的同时,也同步引入更多维度的加速方案(如存储加速、网络加速),提升集群模式下的加速性能。
· 除了加速Spark,也同步探索更多的加速场景,如实时大数据、AI等算法场景加速。