Perl - Parallel programming - running two external programs

Question:

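I have a Perl script that runs two external programs, one dependent on the other, for a series of datasets. Currently, I process one dataset at a time: I run it through the first program, collect the results using qx, and use those results to run the second program. The results of the second program are appended to an output file, one file per dataset. I've created a simple reproducible example that hopefully captures my current approach: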

#!/usr/bin/perl 
# 
# stackoverflow_q_7-7-2016.pl 

use warnings; 
use strict; 

my @queries_list = (2, 4, 3, 1); 

foreach my $query (@queries_list) { 
    #Command meant to simulate the first, shorter process, and return a list of results for the next process 
    my $cmd_1 = "sleep " . $query . "s; shuf -i 4-8 -n 3"; 
    print "Running program_1 on query $query...\n"; 
    my @results = qx($cmd_1); 

    foreach (@results) {
        chomp $_;
        #Command meant to simulate a longer process whose input depends on program_1; the output I write to a separate file for each query
        my $cmd_2 = "sleep " . $_ . "s; fortune -s | head -c " . $_ * 5 . " >> $query.output";
        print "\tRunning program_2 on query $query with input param $_...\n";
        system($cmd_2);
    }
} 

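Since the first program usually finishes faster than the second, I thought it might be possible to speed this whole thing up by continuously running new queries through program_1 while program_2 is still running on a previous query. Speeding this up would be great, as it currently takes many hours to complete. However, I'm not sure how to go about it. Would something like Parallel::ForkManager be a solution? Or using threads in Perl?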

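Now, in my actual code I do some error handling and set a timeout for program_2. I use fork, exec, and $SIG{ALRM} to do this, but I don't really know what I'm doing with those. It's important that I keep the ability to do this, otherwise program_2 may get stuck or fail without adequately reporting why. I don't think the error handling works as well as it should in the reproducible example, but at least you'll hopefully see what I'm trying to do. Here it is with error handling: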

#!/usr/bin/perl 
# 
# stackoverflow_q_7-7-2016.pl 

use warnings; 
use strict; 

my @queries_list = (2, 4, 3, 1); 

foreach my $query (@queries_list) { 
    #Command meant to simulate the first, shorter process, and return a list of results for the next process 
    my $cmd_1 = "sleep " . $query . "s; shuf -i 4-15 -n 3"; 
    print "Running program_1 on query $query...\n"; 
    my @results = qx($cmd_1); 

    foreach (@results) { 
     chomp $_; 
     #Command meant to simulate a longer process whose input depends on program_1; the output I write to a separate file for each query 
     my $cmd_2 = "sleep " . $_ . "s; fortune -s | head -c " . $_ * 3 . " >> $query.output"; 
     print "\tRunning program_2 on query $query with input param $_...\n"; 

     my $childPid; 
     eval { 
      local $SIG{ALRM} = sub { die "Timed out" }; 
      alarm 10; 
      if ($childPid = fork()) { 
       wait(); 
      } else { 
       exec($cmd_2); 
      } 
      alarm 0; 
     }; 
     if ($? != 0) { 
      my $exitCode = $? >> 8; 
      print "Program_2 exited with error code $exitCode. Retry...\n"; 
     } 
      if ($@ =~ /Timed out/) { 
      print "\tProgram_2 timed out. Skipping...\n"; 
      kill 2, $childPid; 
      wait; 
     }; 
    } 
} 

All help is appreciated.

A solution:

use threads; 

use Thread::Queue; # 3.01+ 

sub job1 { ... } 
sub job2 { ... } 

{ 
    my $job1_request_queue = Thread::Queue->new(); 
    my $job2_request_queue = Thread::Queue->new(); 

    my $job1_thread = async {
        # Test with defined() so that a false job such as 0 doesn't end the loop early.
        while (defined(my $job = $job1_request_queue->dequeue())) {
            my $result = job1($job);
            $job2_request_queue->enqueue($result);
        }

        $job2_request_queue->end();
    };

    my $job2_thread = async {
        while (defined(my $job = $job2_request_queue->dequeue())) {
            job2($job);
        }
    };

    $job1_request_queue->enqueue($_) for ...; 

    $job1_request_queue->end();  
    $_->join() for $job1_thread, $job2_thread; 
} 

You can even have any number of workers of each of the two types.

use threads; 

use Thread::Queue; # 3.01+ 

use constant NUM_JOB1_WORKERS => 1; 
use constant NUM_JOB2_WORKERS => 3; 

sub job1 { ... } 
sub job2 { ... } 

{ 
    my $job1_request_queue = Thread::Queue->new(); 
    my $job2_request_queue = Thread::Queue->new(); 

    my @job1_threads;
    for (1..NUM_JOB1_WORKERS) {
        push @job1_threads, async {
            # Test with defined() so that a false job such as 0 doesn't end the loop early.
            while (defined(my $job = $job1_request_queue->dequeue())) {
                my $result = job1($job);
                $job2_request_queue->enqueue($result);
            }
        };
    }

    my @job2_threads;
    for (1..NUM_JOB2_WORKERS) {
        push @job2_threads, async {
            while (defined(my $job = $job2_request_queue->dequeue())) {
                job2($job);
            }
        };
    }

    $job1_request_queue->enqueue($_) for ...; 

    $job1_request_queue->end();  
    $_->join() for @job1_threads; 
    $job2_request_queue->end(); 
    $_->join() for @job2_threads; 
} 
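
The bodies of job1 and job2 are left elided above. As a rough sketch only, here is one way they might look for the simulated commands in the question. The hash keys `query` and `param` are names I made up for illustration; they let job2 know which output file each parameter belongs to. Like the question's code, this assumes GNU `sleep`, `shuf` and `fortune` are available.

```perl
use strict;
use warnings;

# Hypothetical job1: runs the question's simulated program_1 and returns one
# hash ref per result line, pairing each parameter with its query.
sub job1 {
    my ($query) = @_;
    my @params = qx(sleep ${query}s; shuf -i 4-8 -n 3);
    chomp @params;
    return map { +{ query => $query, param => $_ } } @params;
}

# Hypothetical job2: runs the question's simulated program_2 for one
# (query, param) pair and returns true if the shell command exited with 0.
sub job2 {
    my ($job) = @_;
    my ($query, $param) = @{$job}{qw(query param)};
    my $cmd = "sleep ${param}s; fortune -s | head -c " . $param * 5 . " >> $query.output";
    return system($cmd) == 0;
}
```

Because this job1 returns a list rather than a single result, the job1 worker loop would enqueue each item separately, for example `$job2_request_queue->enqueue($_) for job1($job);`. Thread::Queue makes a shared copy of each hash ref when it is enqueued, so the job2 workers get their own usable copy.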

Use IPC::Run instead of qx to add a timeout. No signals are needed.
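
A minimal sketch of what that might look like, assuming IPC::Run is installed from CPAN. The helper name `run_with_timeout` and its return values are my own invention for illustration, not part of IPC::Run. It relies on `run` throwing an exception whose message mentions "timeout" when the timer expires.

```perl
use strict;
use warnings;
use IPC::Run qw(run timeout);

# Hypothetical helper: runs a command (given as a list, so no shell is
# involved), captures its stdout, and gives up after $secs seconds.
# Returns (status, stdout), where status is 'ok', 'failed' or 'timeout'.
sub run_with_timeout {
    my ($secs, @cmd) = @_;
    my ($in, $out, $err) = ('', '', '');

    my $ok;
    my $completed = eval {
        # run() returns true only if the command exited with status 0.
        $ok = run(\@cmd, \$in, \$out, \$err, timeout($secs));
        1;
    };
    if (!$completed) {
        # An expired timer surfaces as an exception mentioning "timeout".
        return ('timeout', $out) if $@ =~ /timeout/;
        die $@;    # anything else (e.g. command not found) is re-thrown
    }
    return ($ok ? 'ok' : 'failed', $out);
}
```

For example, `my ($status, $out) = run_with_timeout(10, 'program_2', $param);` could replace the fork/exec/alarm block in the question, with `program_2` standing in for the real command. After a failed run, `$?` should still hold the child's exit status for reporting.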


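Hi ikegami, thanks for your help. Could you explain the end and join calls for the threads? When I tried the multi-worker approach, I got the "Perl exited with active threads" error, with most threads running and unjoined and a few finished and unjoined. I can post my latest code below as an answer. – Tsaari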


That tells the workers there is no more work coming, then waits for them to finish. Otherwise, the program would exit prematurely. – ikegami
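
To make the end/join behaviour concrete, here is a small self-contained sketch. It assumes a Perl built with thread support and Thread::Queue 3.01+. The `double_all` function is a made-up stand-in for real work.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;    # 3.01+ for end()

# Hypothetical demo: a single worker thread doubles every number it is given.
sub double_all {
    my @inputs = @_;
    my $in  = Thread::Queue->new();
    my $out = Thread::Queue->new();

    my $worker = async {
        # dequeue() blocks while the queue is empty. Once end() has been
        # called and the queue is drained, it returns undef and the loop exits.
        while (defined(my $n = $in->dequeue())) {
            $out->enqueue($n * 2);
        }
    };

    $in->enqueue($_) for @inputs;
    $in->end();         # no more work is coming: lets the worker's loop finish
    $worker->join();    # wait for the worker to actually finish
    $out->end();        # the worker is done, so nothing more will be added

    my @results;
    while (defined(my $r = $out->dequeue())) {
        push @results, $r;
    }
    return @results;
}

print join(' ', double_all(1, 2, 3)), "\n";
```

Without the `end()` call the worker would block in `dequeue()` forever, and without `join()` the main program could reach its end while the worker is still running, which is when Perl reports threads as running and unjoined.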


Fixed a bug in my code. ('@job1_threads' and '@job2_threads' weren't being populated.) – ikegami