如何在未等待的情况下轮询未来状态?
我正在尝试使用Future.rs来管理一个单独的过程中的一些任务。我看到了如何等待每个创建的未来以及如何一个接一个地处理它们,但我无法在执行过程中调查未来以了解其状态。我总是有错误:如何在未等待的情况下轮询未来状态?
thread 'main' panicked at 'no Task is currently running'
我想在将来的处理过程中做某件事,直到它完成。也许我没有正确地使用它?我设法通过使用频道使其工作,但我认为应该有可能轮询未来,以及何时可以得到结果。 我用它来测试它的代码是:
fn main() {
println!("test future");
let thread_pool = CpuPool::new(4);
let mut future_execution_list = vec![];
let mutex = Arc::new(AtomicUsize::new(0));
//create the future to process
for _ in 0..10 {
let send_mutex = mutex.clone();
let future = thread_pool.spawn_fn(move || {
//Simulate long processing
thread::sleep(time::Duration::from_millis(10));
let num = send_mutex.load(Ordering::Relaxed);
send_mutex.store(num + 1, Ordering::Relaxed);
let res: Result<usize,()> = Ok(num);
res
});
future_execution_list.push(future);
}
// do the job
loop {
for future in &mut future_execution_list {
match future.poll() {
Ok(Async::NotReady) =>(), //do nothing
Ok(Async::Ready(num)) => {
//update task status
println!(" future {:?}", num);
}
Err(_) => {
//log error and set task status to err
()
}
};
}
//do something else
}
}
所以我完成Shepmaster答案后,我的问题。您的评论非常有趣,但我仍无法找到解决我的问题的方案。我会添加一些关于我的问题的信息。我想在自动化计划任务上同时管理多个任务。有一个循环,其中管理事件并计算任务计划。当一个任务被安排时,它就产生了。当任务结束时,新的调度完成。在任务执行期间,管理事件。一个speudo代码可以是:
loop {
event.try_recv() { ...} //manage user command for exemple
if (schedule) {
let tasks_to_spawn = schedule_task();
let futures = tasks_to_spawn.map(|task| {
thread_pool.spawn_fn(....)});
let mut one = future::select_all(futures);
while let Ok((value, _idx, remaining)) = one.wait() {..} //wait here
}
//depend on task end state and event set schedule to true or false.
}
我可以联合调度,并像在未来的任务:
let future = schedule.and_them(|task| execute_task);
但我仍然需要等待第一任务执行结束。 我可以把所有事情放在未来(事件管理,时间表,任务),并等待第一个像你提议的结束。我尝试了,但是我没有看到如何用不同的Item和Error类型来制作未来的vec。有了这个概念,我必须在线程之间管理更多的数据。事件管理和调度不必在不同的线程中执行。
我看到另一个问题,select_all采取vec的所有权。如果在执行另一个任务期间需要新的任务,我该如何改变vec并添加新的未来?
不知道你是否有一个简单的解决方案。我在想,使用isDone()等方法在执行过程中未来的状态很容易,而不用等待。也许是有计划的,我没有看到有关这方面的公关。 如果你有一个简单的解决方案,它会很好,否则我会重新思考我的概念。
要投票Future
您必须有Task
。要获得Task
,您可以轮询Future
传递给futures::executor::spawn()
。如果你重写你的例子的loop
像这样:
futures::executor::spawn(futures::lazy(|| {
// existing loop goes here
})).wait_future();
它运行。
至于为什么 a Future
只能在任务中进行轮询,我相信这样可以让轮询可以拨打Task::Unpark
。
I want to do something during the future processing
据我了解,这就是期货是 - 东西,可以并行发生。如果你想做其他事情,那么再创造一个未来并投入其中!
你基本上已经在做这个 - 你的每个线程都是“做别的事情”。
poll the future and when it's ready get the result
使用future::select_all
,您可以合并多个期货并获得取其完成第一。然后由你决定等待下一个。
一种可能实现:
extern crate rand;
extern crate futures;
extern crate futures_cpupool;
use rand::Rng;
use futures::{future, Future};
use futures_cpupool::CpuPool;
use std::{thread, time};
fn main() {
let thread_pool = CpuPool::new(4);
let futures = (0..10).map(|i| {
thread_pool.spawn_fn(move || -> Result<usize,()> {
let mut rng = rand::thread_rng();
// Simulate long processing
let sleep_time = rng.gen_range(10, 100);
let sleep_time = time::Duration::from_millis(sleep_time);
for _ in 0..10 {
println!("Thread {} sleeping", i);
thread::sleep(sleep_time);
}
Ok(i)
})
});
let mut one = future::select_all(futures);
while let Ok((value, _idx, remaining)) = one.wait() {
println!("Future #{} finished", value);
if remaining.is_empty() {
break;
}
one = future::select_all(remaining);
}
}
在通话过程中,以wait
,多事情发生的同时!这可以通过对交织的输出中可以看出:
Thread 2 sleeping
Thread 0 sleeping
Thread 3 sleeping
Thread 1 sleeping
Thread 3 sleeping
Thread 0 sleeping
Thread 1 sleeping
Thread 2 sleeping
Thread 3 sleeping
可以验证的东西被并行由睡眠时间设定为1秒为每个线程和定时的整体方案发生。由于有10个期货,需要1秒,并行度为4,整个程序需要3秒才能运行。
红利代码审查:
- 不拆装载并设置一个原子变量来实现递增 - 存储的值可能已经由两个电话之间另一个线程改变。使用
fetch_add
。 - 您真的在使用它们之前应该知道what the orderings mean。我不知道他们,所以我总是使用
SeqCst
。 - 由于这个例子并不重要,我完全删除了原子变量。
- 总是喜欢收集到
Vec
而不是在循环内推入。这允许更优化的分配。 - 在这种情况下,根本不需要
Vec
,因为select_all
接受任何可以变成迭代器的东西。
有一个99.9%的机会,你做**不**要以这种方式使用原子变量。相反,你希望'fetch_add'和绝大多数人不需要'Relaxed'排序。 – Shepmaster
您的权利,我只是复制/粘贴一些代码,以表明我想从未来的执行中得到一个结果,这取决于其他期货的执行情况。 –
这是[*非常糟糕的形式*在您收到答案后更改您的问题**,特别是当更改使这些答案无效时](http://meta.stackoverflow.com/q/309237/155423)。从一开始就提出一个包含任何相关细节的好问题是提问者的问题。 – Shepmaster