Perl - 并行编程 - 运行两个外部程序
问题描述:
我有一个perl脚本,它运行两个外部程序,一个依赖于另一个,用于一系列数据集。目前,我只为每个数据集执行一次,通过第一个程序运行它,使用qx收集结果,并使用这些结果运行第二个程序。使用第二个程序的结果将数据添加到输出文件,每个数据集一个文件。我创建了希望抓住我目前的做法简单重复的例子:Perl - 并行编程 - 运行两个外部程序
#!/usr/bin/perl
#
# stackoverflow_q_7-7-2016.pl
use warnings;
use strict;
my @queries_list = (2, 4, 3, 1);
foreach my $query (@queries_list) {
#Command meant to simulate the first, shorter process, and return a list of results for the next process
my $cmd_1 = "sleep " . $query . "s; shuf -i 4-8 -n 3";
print "Running program_1 on query $query...\n";
my @results = qx($cmd_1);
foreach (@results) {
chomp $_;
#Command meant to simulate a longer process whose input depends on program_1; the output I write to a separate file for each query
my $cmd_2 = "sleep " . $_ . "s; fortune -s | head -c " . $_ * 5 . " >> $query.output";
print "\tRunning program_2 on query $query with input param $_...\n";
system($cmd_2); }
}
由于第一个程序通常完成比第二个快,我认为这可能是通过持续运行新来加快这一整个交易通过program_1同时查询program_2也在以前的查询上运行。加速完成会很棒,因为目前需要花费很多小时的时间来完成。但是,我不知道如何去做这件事。像Parallel :: ForkManager会有解决方案吗?或在Perl中使用线程?
现在在我的实际代码中,我做了一些错误处理,并为program_2设置了一个超时时间 - 我使用fork,exec和$ SIG {ALRM}来做到这一点,但我并不真正知道我在做什么那些。重要的是我仍然有能力做到这一点,否则program_2可能会卡住或没有充分报告为什么失败。以下是错误处理代码的样子。我不认为它在可重复的例子中应该如此,但至少你会希望看到我想要做的事情。这里有错误处理:
#!/usr/bin/perl
#
# stackoverflow_q_7-7-2016.pl
use warnings;
use strict;
my @queries_list = (2, 4, 3, 1);
foreach my $query (@queries_list) {
#Command meant to simulate the first, shorter process, and return a list of results for the next process
my $cmd_1 = "sleep " . $query . "s; shuf -i 4-15 -n 3";
print "Running program_1 on query $query...\n";
my @results = qx($cmd_1);
foreach (@results) {
chomp $_;
#Command meant to simulate a longer process whose input depends on program_1; the output I write to a separate file for each query
my $cmd_2 = "sleep " . $_ . "s; fortune -s | head -c " . $_ * 3 . " >> $query.output";
print "\tRunning program_2 on query $query with input param $_...\n";
my $childPid;
eval {
local $SIG{ALRM} = sub { die "Timed out" };
alarm 10;
if ($childPid = fork()) {
wait();
} else {
exec($cmd_2);
}
alarm 0;
};
if ($? != 0) {
my $exitCode = $? >> 8;
print "Program_2 exited with error code $exitCode. Retry...\n";
}
if ([email protected] =~ /Timed out/) {
print "\tProgram_2 timed out. Skipping...\n";
kill 2, $childPid;
wait;
};
}
}
所有帮助表示赞赏。
答
一个解决方案:
use threads;
use Thread::Queue; # 3.01+
sub job1 { ... }
sub job2 { ... }
{
my $job1_request_queue = Thread::Queue->new();
my $job2_request_queue = Thread::Queue->new();
my $job1_thread = async {
while (my $job = $job1_request_queue->dequeue()) {
my $result = job1($job);
$job2_request_queue->enqueue($result);
}
$job2_request_queue->end();
};
my $job2_thread = async {
while (my $job = $job2_request_queue->dequeue()) {
job2($job);
}
};
$job1_request_queue->enqueue($_) for ...;
$job1_request_queue->end();
$_->join() for $job1_thread, $job2_thread;
}
你甚至可以有任意多个工作/这两种类型。
use threads;
use Thread::Queue; # 3.01+
use constant NUM_JOB1_WORKERS => 1;
use constant NUM_JOB2_WORKERS => 3;
sub job1 { ... }
sub job2 { ... }
{
my $job1_request_queue = Thread::Queue->new();
my $job2_request_queue = Thread::Queue->new();
my @job1_threads;
for (1..NUM_JOB1_WORKERS) {
push @job1_threads, async {
while (my $job = $job1_request_queue->dequeue()) {
my $result = job1($job);
$job2_request_queue->enqueue($result);
}
};
}
my @job2_threads;
for (1..NUM_JOB2_WORKERS) {
push @job2_threads, async {
while (my $job = $job2_request_queue->dequeue()) {
job2($job);
}
};
}
$job1_request_queue->enqueue($_) for ...;
$job1_request_queue->end();
$_->join() for @job1_threads;
$job2_request_queue->end();
$_->join() for @job2_threads;
}
使用IPC::Run,而不是qx
添加超时。不需要信号。
嗨池上,谢谢你的帮助。你能解释一下线程的结束和连接吗?当我尝试使用多工人方法时,我得到了“Perl退出时出现活动线程”的错误,大部分运行和未加入,并且有一些已完成并未加入。我可以在下面发布我的最新代码作为答案。 – Tsaari
这告诉工人以前没有更多的工作,然后等待他们完成。否则,程序会过早结束。 – ikegami
修复了我的代码中的一个错误。 ('@ job1_threads'和'@ job2_threads'没有填充) – ikegami