java:组合多线程/单线程任务队列
我喜欢ExecutorService系列的类/接口。我不必担心线程;我收集一个ExecutorService
实例并用它来安排任务,如果我想要使用8线程或16线程池,那么非常好,我根本不必担心这个问题,它只是依赖于如何设置ExecutorService。欢呼!java:组合多线程/单线程任务队列
但是,如果我的一些任务需要按连续顺序执行,我该怎么办?理想情况下,我会要求ExecutorService让我在单个线程上安排这些任务,但似乎没有任何方法这样做。
编辑:的任务不会提前知道的,他们是一个无限系列是不稳定的各种事件(认为随机/未知的到来过程中产生的任务:如盖革计数器的点击,或击键事件)。
你可以编写一个Runnable
的实现,它接受一些任务并连续执行它们。
喜欢的东西:
public class SerialRunner implements Runnable {
private List<Runnable> tasks;
public SerialRunner(List<Runnable> tasks) {
this.tasks = tasks;
}
public void run() {
for (Runnable task: tasks) {
task.run();
}
}
}
这不起作用,这些任务是一系列无限制的任务,一次排队。 – 2010-01-28 19:45:44
在这种情况下,如果它们的顺序对于执行的其他任务并不重要,请使用'Executors.newSingleThreadExecutor()' – danben 2010-01-28 19:52:19
+1:这个答案不是我想要的,但它给了我一个想法...... – 2010-01-28 20:01:27
我使用的是Executors.newSingleThreadExecutor()的,我要排队,只运行一次一个任务,创建一个独立的执行者。 另一种方法是刚刚组成几个任务,并提交一个,
executor.submit(new Runnable() {
public void run() {
myTask1.call();
myTask2.call();
myTask3.call();
}});
虽然你可能需要更复杂的,如果仍想myTask2
运行即使myTask1
抛出异常。
使用一个单独的执行程序可以工作,但如果可能的话,这是一个我想避免的限制。 – 2010-01-28 19:45:02
我这样做的方式是通过一些本地代码,根据任务所说的关键是什么(这可以是完全任意的或有意义的值)将流式传输到不同的线程上。除了提供给Queue
并让其他线程关闭它(或者在您的案例中使用ExecutorService
进行工作并使该服务维护一个线程池以启动内部工作队列),您可以提供Pipelineable
(又名一个任务)到PipelineManager
,该任务为该任务的密钥找到正确的队列并将任务粘贴到该队列中。还有其他一些代码管理线程,这些代码用于管理排队队列,以确保始终有1个线程取下队列,以确保为同一个键提供的所有工作将以串行方式执行。
使用这种方法,您可以轻松地为n组串行工作留出某些密钥,同时对剩余的密钥进行循环移动以便按照任何旧顺序进行工作,或者可以通过审慎保留某些管道(线程)关键选择。
这种做法是不是对JDK ExecutorService
实施可行的,因为它们是由一个单一的BlockingQueue
(至少ThreadPoolExecutor
的)支持,因此没有办法说“做这个工作在任何旧秩序,但这项工作必须序列化”。我假设你想要这样做以保持吞吐量,否则按照Danben的评论将所有内容粘贴到singleThreadExecutor
。
(编辑)
你能做什么,而不是,要保持相同的抽象,是创建创建自己的实现的ExecutorService
与会代表的多个实例ThreadPoolExecutor
(或相似)你需要的; 1支持n个线程和1个或多个单线程实例。类似下面(这绝不是在所有的工作代码,但希望你的想法!)
public class PipeliningExecutorService<T extends Pipelineable> implements ExecutorService {
private Map<Key, ExecutorService> executors;
private ExecutorService generalPurposeExecutor;
// ExecutorService methods here, for example
@Override
public <T> Future<T> submit(Callable<T> task) {
Pipelineable pipelineableTask = convertTaskToPipelineable(task);
Key taskKey = pipelineable.getKey();
ExecutorService delegatedService = executors.get(taskKey);
if (delegatedService == null) delegatedService = generalPurposeExecutor;
return delegatedService.submit(task);
}
}
public interface Pipelineable<K,V> {
K getKey();
V getValue();
}
这是非常丑陋的,为了这个目的,该ExecutorService
方法是通用的,而不是服务本身,这意味着你需要一些标准的方式来编组传递给Pipelineable和后备的任何传递,如果你不能(例如把它扔到通用池中)。
嗯,我想到了一些东西,不太确定这是否可行,但也许它会(未经测试的代码)。这略过了微妙之处(异常处理,取消,对底层执行者的其他任务的公平性等),但也许是有用的。
class SequentialExecutorWrapper implements Runnable
{
final private ExecutorService executor;
// queue of tasks to execute in sequence
final private Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();
// semaphore for pop() access to the task list
final private AtomicBoolean taskInProcess = new AtomicBoolean(false);
public void submit(Runnable task)
{
// add task to the queue, try to run it now
taskQueue.offer(task);
if (!tryToRunNow())
{
// this object is running tasks on another thread
// do we need to try again or will the currently-running thread
// handle it? (depends on ordering between taskQueue.offer()
// and the tryToRunNow(), not sure if there is a problem)
}
}
public void run()
{
tryToRunNow();
}
private boolean tryToRunNow()
{
if (taskInProcess.compareAndSet(false, true))
{
// yay! I own the task queue!
try {
Runnable task = taskQueue.poll();
while (task != null)
{
task.run();
task = taskQueue.poll();
}
}
finally
{
taskInProcess.set(false);
}
return true;
}
else
{
return false;
}
}
出于单线程事件的目的,我有一个单独的单线程执行程序。这意味着我有可能再使用一条线索,但我不相信这种影响是如此之高。 – 2010-01-28 21:01:40
可能的重复:http://stackoverflow.com/questions/2153663/controlling-task-execution-order-with-executorservice – finnw 2010-01-28 22:50:00
@finnw,你有一个点。不太确定它是否相同(我试图限制自己使用一个可能是多线程的ExecutorService)。 – 2010-01-28 23:06:15