使用Java线程池,如何根据消息特性并行处理一些消息和其他消息?

问题描述:

这是更多的Java并发设计问题。我正在研究需要为许多不同客户端处理许多消息的应用程序。如果两条消息具有不同的客户端名称,则可以并行处理它们。但是,如果它们具有相同的客户端名称,则需要按顺序进行处理。使用Java线程池,如何根据消息特性并行处理一些消息和其他消息?

实现此目的的最佳方式是什么?

我目前的实现非常简单:我写了一个名为OrderedExecutorPool的包装类。它有一个单线程执行程序的列表。在其提交的方法,它下面的找出将任务提交到执行:

int executorNum = Math.abs(clientName.hashCode()) % numExecutors; 
executorList.get(executorNum).submit(task); 

这保证了与同一客户的所有消息去同一个执行者,同时还支持并行方式为不同的客户端处理的消息。

有几个与此设计问题:

1)如果大多数客户的名字有相同的散列码,那么只有少数执行者正在做的工作

2)如果一个客户有很多消息,只有一个执行者可能无法跟上

有没有一个优雅的解决方案可以解决上述缺点的问题?

编辑 clientName只是一个字符串。我只是调用它的String.hashCode()方法。

+0

你是否用合理的方法重载hashCode? 它确保您始终会为唯一字符串返回唯一值吗? 你确定你总是少于2^32个唯一的客户端名称(因此是hashcode)吗? 你确定你现在的hashCode实现是否是无碰撞的? 如果你对任何这些问题回答“否”......你犯了一个错误,你的应用程序会崩溃/在某些时候产生错误的结果。 – specializt 2014-09-12 16:46:25

+0

“如果一个客户端有很多消息,只有一个执行者可能无法跟上”,因为您需要单个客户端进行串行处理,唯一的选择是少做一些工作,或者使其效率更高。 – 2014-09-12 16:48:46

+0

注意:Math.abs(Integer.MIN_VALUE) 2014-09-12 16:51:20

我不知道jdk内建解决方案。我已经使用这个基本逻辑在我目前的工作中实现了一个自定义执行器解决方案。

  • 保持CLIENTNAME的内部地图的工作队列(每个客户都有自己的队列)
  • 工作进来的客户端时,将其添加到他们的队列
    • ,如果这是在第一份工作队列中,创建一个Runnable此客户端名/队列,并将其推入“真正的”执行者(标准的JDK的线程池)
  • Runnable接口IMPL只是消耗的任务从单一的客户端队列中,直到空,然后退出

这个简单的实现是“贪婪”的方法(客户端将继续工作,直到其队列为空)。如果您的客户端数量多于底层线程,则您可能需要更“公平”的方法,即客户端执行一些任务并在底层执行程序中重新排队(从而允许其他客户端完成一些工作)。