C++ - 如何分块文件进行同步/异步处理?
如何通过行数读取和分割/分块文件?C++ - 如何分块文件进行同步/异步处理?
我想分割一个文件到不同的缓冲区,同时确保一条线不会在两个或多个缓冲区之间分割。我打算将这些缓冲区传递到它们自己的pthread中,以便它们可以执行某种类型的同步/异步处理。
我已阅读reading and writing in chunks on linux using c下面的答案,但我不认为它完全回答了确保一条线不会分成两个或多个缓冲区的问题。
我会选择以字节为单位的块大小。然后,我会在文件中寻找适当的位置,并一次读取少量的字节,直到获得换行符。
第一个块的最后一个字符是换行符。第二个块的第一个字符是换行符之后的字符。
总是寻求一个pagesize()边界并一次读入pagesize()字节来搜索你的换行符。这样可以确保您只需从磁盘中获取所需的最低限额即可找到您的界限。您可以尝试一次读取128字节或其他内容。但是你可能会冒更多的系统调用的风险。
我写了一个示例程序,用于计算字母频率。当然,这在很大程度上毫无意义地分解为线程,因为它几乎可以肯定IO限制。换行符也不重要,因为它不是面向行的。但是,这只是一个例子。另外,它非常依赖于你有一个相当完整的C++ 11实现。
他们主要功能是这样的:
// Find the offset of the next newline given a particular desired offset.
off_t next_linestart(int fd, off_t start)
{
using ::std::size_t;
using ::ssize_t;
using ::pread;
const size_t bufsize = 4096;
char buf[bufsize];
for (bool found = false; !found;) {
const ssize_t result = pread(fd, buf, bufsize, start);
if (result < 0) {
throw ::std::system_error(errno, ::std::system_category(),
"Read failure trying to find newline.");
} else if (result == 0) {
// End of file
found = true;
} else {
const char * const nl_loc = ::std::find(buf, buf + result, '\n');
if (nl_loc != (buf + result)) {
start += ((nl_loc - buf) + 1);
found = true;
} else {
start += result;
}
}
}
return start;
}
还要注意,我用pread
。当你有多个线程从文件的不同部分读取时,这是绝对必要的。
文件描述符是您的线程之间的共享资源。当一个线程使用普通函数从文件读取时,它会改变关于这个共享资源(文件指针)的细节。文件指针是文件中下一次读取的位置。
在您每次阅读之前,只需使用lseek
就不会有帮助,因为它会在lseek
和read
之间引入争用条件。
pread
函数允许您从文件中的特定位置读取一堆字节。它也不会改变文件指针。除了不改变文件指针的事实外,它就像在同一个调用中合并lseek
和read
一样。
你怎么知道你什么时候换了换行符? – Ken
@Ken:扫描您为新行字符读取的缓冲区。记下缓冲区起始处的文件偏移量(你在哪里搜索到)和新行缓冲区内的偏移量。添加这两个,你得到一个接近你的块大小的换行符的文件偏移量。 – Omnifarious
@Ken - 我主要是给你写了一个使用我的技术的示例程序。我不会去努力确保文件访问是页面对齐的。如果你真的想尽可能快地做出事情,你应该确保所有的文件访问都是页面对齐的,如果可能的话,那些缓冲区也是页面对齐的。这允许OS通过简单地将缓冲区映射到进程的地址空间而不是复制来优化读取。 – Omnifarious
文件是如何编码的?如果每个字节表示一个字符,我会做到以下几点:
- 存储器映射使用
mmap()
文件。 - 通过根据适当的块大小对其进行计算来告诉他们大致的开始和结束。
- 通过查找下一个
'\n'
找到每个作业的实际开始和结束。 - 同时处理相应的块。
- 请注意,第一块需要特殊处理,因为它的开始不是近似的,而是确切的。
定义缓冲区的类。为每个页面分配一个大小为页面大小倍数的大型缓冲区空间和一个开始/结束索引,一种从传入流中读取缓冲区的方法以及另一个* buffer实例作为参数的'lineParse'方法。
制作一些*缓冲区并将它们存储在生产者 - 消费者池队列中。打开文件,从池中获取一个缓冲区,并从头到尾读入缓冲区空间(返回错误/ EOF的布尔值)。从池中获取另一个*缓冲区并将其传递到前一个的lineparse()。在那里,从数据的末尾向后搜索,寻找newLine。找到后,重新加载最后一行的末尾索引和memcpy片段(如果有的话 - 你可能偶尔会很幸运:),进入新的,传递的*缓冲区并设置其开始索引。第一个缓冲区现在有整行,可以排队等待处理这些行的线程。第二个缓冲区具有从第一个缓冲区复制而来的行的片段,并且更多数据可以从磁盘读取到其开始索引处的缓冲区空间中。
线处理线程可以将'used'*缓冲区回收到池中。
继续下去,直到EOF(或错误:)。
如果可以,请将方法添加到执行缓冲区处理的缓冲区类中。
使用大缓冲区类并从结尾解析将会比持续读取小比特,从一开始就寻找换行符更加高效。线程间通信速度慢,可以通过的缓冲区越大越好。
使用缓冲池消除了连续的新/删除并提供了流控制 - 如果磁盘读取线程比处理更快,则池将清空,并且磁盘读取线程将阻塞,直到某些使用的缓冲区被回收。这可以防止内存失控。
请注意,如果您使用多个处理线程,则缓冲区可能会“无序”处理 - 这可能或可能不重要。
通过确保与磁盘读取延迟并行处理的行的优点大于线程间通信的开销,您只能在这种情况下获得收益 - 在线程之间传递小缓冲区很可能是反向的,生产力。
整体速度较快,但延迟较大的网络磁盘可能会遇到最大的加速。
这些类型的方案从未按照预期工作。你有一个拥有多核的处理器,这是让多个线程高效工作的好方法。但是,你仍然只有一个磁盘,线程只是等待轮到它读取。 –
@HansPassant:如果OP知道任务是CPU绑定的,那么可能并非如此。但是,是的,你很可能是正确的。虽然'pbzip'和'xz'都在块级使用这种技术,效果很好。 – Omnifarious
我其实并没有想到这一点。自那时起,我就想到了解决我的宏观问题的更简洁的方法,而不涉及使用我提出的问题。尽管如此,我还是好奇的! – Ken