并行处理大文本文件

集算器可以方便地用并行方式处理大文本文件,下面通过一个例子来说明使用方法。

假设有个一千万条销售记录的文本文件sales.txt,其主要字段是SellerID(销售员)、OrderDate(订单日期)、Amount(订单金额),请计算每个销售员在近四年里的大订单总金额。其中,金额在2000以上的属于大订单。

要进行并行处理,首先要能对文件进行分段,集算器提供了游标数据对象cursor及其函数,可以方便地分段读取大文本文件。比如file(“e:/sales.txt”)[email protected](;, 3:24),这表示将文件按字节数大致均分为24段,然后读取其中第3段。简单的按字节拆分文件时会产生半行数据即半条记录的情况,还需要再编程处理才行,而如果按行拆分则需要遍历前面所有的数据行,完全达不到采用分段并行方案期望的高性能。集算器在拆分文件时自动进行了去头补尾的工作,保证数据的正确性。

分段后只要进行简单的并行处理就可以了,代码如下:

 

主程序


并行处理大文本文件
 

         A1:并行任务数设为24,即将文件分为24段。

         A2:进行多线程并行计算,任务是to(A1),其是[1,2,3…24],这表示每个任务分配到的段数。所有任务都结束后,计算结果会统一存储在本单元格。B2-B5是线程内代码。

B2:用游标读取文件,按照主线程传来的参数决定当前任务应该处理文件中的第几段。

B3:选出时间是2011年之后的,订单金额在2000以上的记录。

B4:对本段数据分组汇总。

B5:将本线程的计算结果返回主线程单元格,即A2

 

         A6:对A2中各任务的计算结果归并,部分数据如下:


并行处理大文本文件
 

         A7:对归并结果再进行分组汇总,求得每个销售员的销售额,如下:


并行处理大文本文件
 

代码说明

         对于NCPU,似乎设置为N个任务更加自然,但事实上执行任务时时总会有快有慢(比如过滤出的数据不同),因此常会遇到这种情况:其他核心已经完成了较快的任务,正在空闲等待,而个别核心还在执行较慢的任务。相反,如果每个核心依次执行多个任务,则快慢任务会趋于平均,整体运行会更加稳定。所以上述例子是将任务分为24份,并分给CPU8个核心进行处理(同时最多允许并行多少个线程可以在集算器的环境中配置)。当然,任务分得太多也会带来坏处,首先是整体性能会下降,其次是各任务产生的计算结果加起来会更大,会占用更多的内存。

         fork将复杂的多线程计算封装了起来,程序员可以专心于业务算法,而不必纠结于复杂的信号量控制,开发过程变简单了。

         主程序中A6的计算结果已经按照SellerId自动排序,因此A7的分组汇总不必再排序,groups的函数选项@o可以实现免排序的高效分组汇总。

扩展:

 

         有时候文本文件的数据量会达到几个TB,此时就需要使用基于集群的多节点并行计算。集算器游标及其相关函数支持廉价横向扩展和分布式文件系统,可以轻松实现并行计算。代码形如:=callx("sub.dfx", to(A1), A1; ["192.168.1.200:8281","192.168.1.201:8281",”......”]),具体用法参考相关文档。