同时阅读一个地图,而单个后台线程定期修改它
我有一个类,我在updateLiveSockets()
方法内每隔30秒从单个后台线程填充地图liveSocketsByDatacenter
,然后我有一个方法getNextSocket()
将被调用多个阅读器线程来获取可用的活动套接字,它使用相同的地图来获取此信息。同时阅读一个地图,而单个后台线程定期修改它
public class SocketManager {
private static final Random random = new Random();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final AtomicReference<Map<Datacenters, List<SocketHolder>>> liveSocketsByDatacenter =
new AtomicReference<>(Collections.unmodifiableMap(new HashMap<>()));
private final ZContext ctx = new ZContext();
// Lazy Loaded Singleton Pattern
private static class Holder {
private static final SocketManager instance = new SocketManager();
}
public static SocketManager getInstance() {
return Holder.instance;
}
private SocketManager() {
connectToZMQSockets();
scheduler.scheduleAtFixedRate(new Runnable() {
public void run() {
updateLiveSockets();
}
}, 30, 30, TimeUnit.SECONDS);
}
// during startup, making a connection and populate once
private void connectToZMQSockets() {
Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
// The map in which I put all the live sockets
Map<Datacenters, List<SocketHolder>> updatedLiveSocketsByDatacenter = new HashMap<>();
for (Map.Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> addedColoSockets = connect(entry.getKey(), entry.getValue(), ZMQ.PUSH);
updatedLiveSocketsByDatacenter.put(entry.getKey(),
Collections.unmodifiableList(addedColoSockets));
}
// Update the map content
this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(updatedLiveSocketsByDatacenter));
}
private List<SocketHolder> connect(Datacenters colo, List<String> addresses, int socketType) {
List<SocketHolder> socketList = new ArrayList<>();
for (String address : addresses) {
try {
Socket client = ctx.createSocket(socketType);
// Set random identity to make tracing easier
String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
client.setIdentity(identity.getBytes(ZMQ.CHARSET));
client.setTCPKeepAlive(1);
client.setSendTimeOut(7);
client.setLinger(0);
client.connect(address);
SocketHolder zmq = new SocketHolder(client, ctx, address, true);
socketList.add(zmq);
} catch (Exception ex) {
// log error
}
}
return socketList;
}
// this method will be called by multiple threads to get the next live socket
// is there any concurrency or thread safety issue or race condition here?
public Optional<SocketHolder> getNextSocket() {
// For the sake of consistency make sure to use the same map instance
// in the whole implementation of my method by getting my entries
// from the local variable instead of the member variable
Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
this.liveSocketsByDatacenter.get();
Optional<SocketHolder> liveSocket = Optional.absent();
List<Datacenters> dcs = Datacenters.getOrderedDatacenters();
for (Datacenters dc : dcs) {
liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
if (liveSocket.isPresent()) {
break;
}
}
return liveSocket;
}
// is there any concurrency or thread safety issue or race condition here?
private Optional<SocketHolder> getLiveSocketX(final List<SocketHolder> endpoints) {
if (!CollectionUtils.isEmpty(endpoints)) {
// The list of live sockets
List<SocketHolder> liveOnly = new ArrayList<>(endpoints.size());
for (SocketHolder obj : endpoints) {
if (obj.isLive()) {
liveOnly.add(obj);
}
}
if (!liveOnly.isEmpty()) {
// The list is not empty so we shuffle it an return the first element
Collections.shuffle(liveOnly);
return Optional.of(liveOnly.get(0));
}
}
return Optional.absent();
}
// Added the modifier synchronized to prevent concurrent modification
// it is needed because to build the new map we first need to get the
// old one so both must be done atomically to prevent concistency issues
private synchronized void updateLiveSockets() {
Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
// Initialize my new map with the current map content
Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
new HashMap<>(this.liveSocketsByDatacenter.get());
for (Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
for (SocketHolder liveSocket : liveSockets) { // LINE A
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
Map<byte[], byte[]> holder = populateMap();
Message message = new Message(holder, Partition.COMMAND);
boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
boolean isLive = (status) ? true : false;
// is there any problem the way I am using `SocketHolder` class?
SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(zmq);
}
liveSocketsByDatacenter.put(entry.getKey(),
Collections.unmodifiableList(liveUpdatedSockets));
}
this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSocketsByDatacenter));
}
}
正如你可以在我的班级看到:
- 从一个运行每30秒一个后台线程,我填充
liveSocketsByDatacenter
地图与updateLiveSockets()
方法所有带电的插座。 - 然后从多个线程,我打电话
getNextSocket()
方法给我一个活插座可用,它使用liveSocketsByDatacenter
地图来获取所需的信息。
我有我的代码工作正常,没有任何问题,并希望看看是否有任何更好或更有效的方式来写这个。我也想得到关于线程安全问题或任何竞争条件的意见(如果有的话),但到目前为止我还没有看到,但我可能是错的。
我大多担心updateLiveSockets()
方法和getLiveSocketX()
方法。我在LINE A迭代liveSockets
这是List
的SocketHolder
,然后制作一个新的SocketHolder
对象并添加到另一个新列表中。这里好吗?
注意:SocketHolder
是一个不可变的类。你可以忽略我拥有的东西ZeroMQ
。
您使用以下同步技术。
- 带有活套接字数据的地图位于原子参考之后,这允许安全地切换地图。
-
updateLiveSockets()
方法是同步的(隐式地在此上),这将阻止同时由两个线程切换地图。 - 如果在
getNextSocket()
方法中发生切换,您在使用它时避免混淆,从而制作本地地图。
它是否像现在一样是线程安全的?
线程安全总是取决于共享可变数据是否有适当的同步。在这种情况下,共享可变数据是数据中心到其SocketHolders列表的映射。
地图位于AtomicReference
以及制作本地副本以供使用的事实在地图上具有足够的同步性。由于AtomicReference
的性质,您的方法采用地图版本并使用该版本,切换版本是线程安全的。这也可以通过仅为地图volatile
创建成员字段来实现,因为您所做的只是更新引用(您不对其执行任何检查 - 然后动作操作)。
由于scheduleAtFixedRate()
保证了通过Runnable
将不能同时与自身运行,在updateLiveSockets()
的是不需要的,但是,它也不会产生什么负面影响。
所以,这个类是线程安全的,因为它是。
但是,并不完全清楚SocketHolder
是否可以被多个线程同时使用。实际上,这个类只是试图通过选择一个随机的活动来尽量减少并发使用SocketHolder
s(不需要随机选择一个随机索引来拖动整个数组)。它没有什么实际阻止并发使用。
它可以提高效率吗?
我相信可以。在查看updateLiveSockets()
方法时,它似乎构建了完全相同的地图,但SocketHolder
可能具有与isLive
标志不同的值。这使我得出结论,我不想切换整个地图,而只想切换地图中的每个列表。并且为了以线程安全的方式更改地图中的条目,我可以使用ConcurrentHashMap
。
如果我使用一个ConcurrentHashMap
,并且不切换地图,而是在地图上的值,我可以摆脱AtomicReference
。
要改变映射,我可以建立新列表并将其直接放入地图。这样更高效,因为我可以更快地发布数据,并且创建更少的对象,而我的同步只是建立在现成的组件上,这有利于可读性。
这里是我的版本(省略了不太相关的一些地方,为了简洁)
public class SocketManager {
private static final Random random = new Random();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<>(); // use ConcurrentHashMap
private final ZContext ctx = new ZContext();
// ...
private SocketManager() {
connectToZMQSockets();
scheduler.scheduleAtFixedRate(this::updateLiveSockets, 30, 30, TimeUnit.SECONDS);
}
// during startup, making a connection and populate once
private void connectToZMQSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets); // we can put it straight into the map
}
}
// ...
// this method will be called by multiple threads to get the next live socket
// is there any concurrency or thread safety issue or race condition here?
public Optional<SocketHolder> getNextSocket() {
for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc)); // no more need for a local copy, ConcurrentHashMap, makes sure I get the latest mapped List<SocketHolder>
if (liveSocket.isPresent()) {
return liveSocket;
}
}
return Optional.absent();
}
// is there any concurrency or thread safety issue or race condition here?
private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
if (!CollectionUtils.isEmpty(listOfEndPoints)) {
// The list of live sockets
List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
for (SocketHolder obj : listOfEndPoints) {
if (obj.isLive()) {
liveOnly.add(obj);
}
}
if (!liveOnly.isEmpty()) {
// The list is not empty so we shuffle it an return the first element
return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
}
}
return Optional.absent();
}
// no need to make this synchronized
private void updateLiveSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
for (SocketHolder liveSocket : liveSockets) { // LINE A
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
Map<byte[], byte[]> holder = populateMap();
Message message = new Message(holder, Partition.COMMAND);
boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
boolean isLive = (status) ? true : false;
SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(zmq);
}
liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets)); // just put it straigth into the map, the mapping will be updated in a thread safe manner.
}
}
}
感谢您的详细解释。我一方面很困惑,一旦状态在'updateLiveSockets()'方法中改变,它是否能够保证所有读者线程都能从'getNextSocket()'方法看到套接字的状态? – john
是的,一旦'updateLiveSockets()'将新列表放入地图中,读者线程中的所有后续'get()'将会看到新列表 – bowmore
好吧,让我试试看看它是如何工作的。我得到了另一个[问题](https://stackoverflow.com/questions/47107331/how-to-make-sure-that-i-am-not-sharing-same-socket-between-two-threads-at-a - 相同)的基础上只有这一个,我试图确保我不会在两个线程之间共享相同的套接字。我已经得到了解决方案,但试图看到在这里工作还是还有更好的方法?也许我们可以使用ThreadLocal?但我还不确定。 – john
如果SocketHolder
和Datacenters,
是不可变的,你的程序看起来很好。不过,这里有一些小反馈。
1的AtomicReference的用法
AtomicReference<Map<Datacenters, List<SocketHolder>>> liveSocketsByDatacenter
此成员变量不需要被包裹在一个的AtomicReference。你没有对它进行任何原子CAS操作。你可以简单地声明一个volative Map<Datacenters, List<SocketHolder>>
,阅读时只需创建一个本地引用。这足以保证对新映射的引用进行原子交换。
2. Synchronized方法
private synchronized void updateLiveSockets()
该方法是从一个单独的线程执行调用,所以没有必要为它进行同步。
3。一些简化
从当前的这个类的用法,好像你可以过滤掉插座其不在
updateLiveSockets
活着,避免了每次来筛选客户端调用getNextSocket
-
您可以更换
Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS
作者:Set<Datacenters> datacenters = Utils.SERVERS.keySet()
并使用钥匙工作。4. Java的8
如果可能的话,切换到Java 8流连同Java8的可选将消除大量的样板代码,使你的代码更易于阅读。
我正在通过你的答案,我想知道我怎么能实现你在'3.1'中提到的。如果我筛选出没有活动的套接字,那么我将如何在我的'updateLiveSockets'方法中重新遍历所有套接字?正如你在'updateLiveSockets'方法中看到的那样,我正在遍历所有的套接字并且对它们进行ping处理,以了解它们是否存在或不存在? – john
它看起来我就像'liveSocketsByDatacenter'是** **不可改变。这是一条使这一切变得更加简单的途径。 –
加上你的逻辑在很多地方都非常清晰。 “可选”的滥用使我的眼睛水。我会完全摆脱它 - 你不会在任何地方正确使用它。提示:调用'Optional.isPresent'总是一个坏主意。 –
我有几种方法可以出错。首先是每隔30秒由后台线程调用的'updateLiveSockets',其次是由多个读取器线程同时调用'getNextSocket'方法,这个方法在内部调用'getLiveSocket'方法,所以这三种方法在线程安全问题上都是正确的。你认为他们都做对了吗?我更害怕'updateLiveSockets'方法。 – john