Java: get+clear atomic for map


Question:

I would like to implement the following logic:

-the following structure is to be used

//Map<String, CopyOnWriteArrayList> keeping the pending updates 
//grouped by the id of the updated object
final Map<String, List<Update>> updatesPerId = new ConcurrentHashMap<>();

-n producers will add updates to updatesPerId map (for the same id, 2 updates can be added at the same time)

-one TimerThread will run from time to time and has to process the received updates. Something like:

 final Map<String, List<Update>> toBeProcessed = new HashMap<>(updatesPerId);
 updatesPerId.clear();
 // iterate over toBeProcessed and process them

Is there any way to make this logic thread safe without synchronizing the producers' adding logic with the TimerThread's (consumer's) logic? I am thinking of an atomic clear+get, but ConcurrentMap does not seem to provide one. I should also mention that updates must be kept per updated object id, so I cannot replace the map with a queue or something else.

Any ideas? Thanks!


Answer 1:

You can leverage the fact that ConcurrentHashMap.compute executes atomically.

You can put into the updatesPerId like so:

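// id is the id of the updated object; the lambda parameter k must not reuse that variable's name.
updatesPerId.compute(id, (k, list) -> {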
  if (list == null) list = new ArrayList<>();
  // ... add to the list

  // Return a non-null list, so the key/value pair is stored in the map.
  return list;
});

Note that this deliberately avoids calling computeIfAbsent and then adding to the returned list: that add would run outside the atomic operation, so it could race with the consumer.

Then in your thread to remove things:

for (String key : updatesPerId.keySet()) {
  // compute hands the current list to the lambda atomically, so no separate
  // put(key, null) is needed (ConcurrentHashMap rejects null values anyway).
  updatesPerId.compute(key, (k, list) -> {
    // ... Process the contents of the list.

    // Removes the key/value pair from the map.
    return null;
  });
}

So adding an update for a key (or processing all the updates for that key) may block if both happen to work on the same key at once; otherwise, neither blocks the other.
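
To make the two snippets above concrete, here is a minimal runnable sketch of the compute-based approach. The Update class, the add/processAll/runDemo helper names, and the "id-N" keys are assumptions made for illustration; "processing" is reduced to counting the updates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ComputeDrainSketch {
    // Hypothetical stand-in for the question's Update type.
    static final class Update {
        final String id;
        final int value;
        Update(String id, int value) {
            this.id = id;
            this.value = value;
        }
    }

    static final Map<String, List<Update>> updatesPerId = new ConcurrentHashMap<>();

    // Producer side: the append runs inside compute, so it is atomic per key.
    static void add(Update update) {
        updatesPerId.compute(update.id, (k, list) -> {
            if (list == null) list = new ArrayList<>();
            list.add(update);
            return list;
        });
    }

    // Consumer side: handle each key's list inside compute, then drop the entry.
    static int processAll() {
        int[] processed = {0};
        for (String key : updatesPerId.keySet()) {
            updatesPerId.compute(key, (k, list) -> {
                if (list != null) processed[0] += list.size(); // "processing" is just counting here
                return null; // returning null removes the mapping
            });
        }
        return processed[0];
    }

    // Starts the producers, waits for them to finish, then processes everything once.
    static int runDemo(int producers, int updatesPerProducer) {
        List<Thread> threads = new ArrayList<>();
        for (int p = 0; p < producers; p++) {
            Thread t = new Thread(() -> {
                for (int i = 0; i < updatesPerProducer; i++) {
                    add(new Update("id-" + (i % 10), i));
                }
            });
            threads.add(t);
            t.start();
        }
        try {
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processAll();
    }

    public static void main(String[] args) {
        System.out.println("processed: " + runDemo(4, 1000));
        System.out.println("entries left: " + updatesPerId.size());
    }
}
```

Because the processing happens inside compute, a producer adding to the same key waits until the lambda returns, which is the blocking behaviour described above.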


Edit: as pointed out by @StuartMarks, it might be better to simply get all things out of the map first, and then process them later, in order to avoid blocking other threads trying to add:

Map<String, List<Update>> newMap = new HashMap<>();
for (String key : updatesPerId.keySet()) {
  newMap.put(key, updatesPerId.remove(key));
}
// ... Process entries in newMap.
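
As a rough check that the remove-based variant does not lose updates, the sketch below drains the map repeatedly while producers are still adding. It assumes a single consumer thread; updates are plain Strings standing in for the Update type, and the helper names (add, drain, count, runDemo) are made up for this example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class RemoveDrainSketch {
    // Updates are plain Strings here, a stand-in for the question's Update type.
    static final Map<String, List<String>> updatesPerId = new ConcurrentHashMap<>();

    // Producer side: same atomic compute-based add as in the answer.
    static void add(String id, String update) {
        updatesPerId.compute(id, (k, list) -> {
            if (list == null) list = new ArrayList<>();
            list.add(update);
            return list;
        });
    }

    // Moves every entry currently visible into a private map; remove is atomic per key.
    static Map<String, List<String>> drain() {
        Map<String, List<String>> snapshot = new HashMap<>();
        for (String key : updatesPerId.keySet()) {
            List<String> removed = updatesPerId.remove(key);
            if (removed != null) snapshot.put(key, removed);
        }
        return snapshot;
    }

    static int count(Map<String, List<String>> batch) {
        int n = 0;
        for (List<String> list : batch.values()) n += list.size();
        return n;
    }

    static boolean anyAlive(List<Thread> threads) {
        for (Thread t : threads) {
            if (t.isAlive()) return true;
        }
        return false;
    }

    // Producers and the draining consumer run concurrently; returns the number of updates seen.
    static int runDemo(int producers, int updatesPerProducer) {
        List<Thread> threads = new ArrayList<>();
        for (int p = 0; p < producers; p++) {
            Thread t = new Thread(() -> {
                for (int i = 0; i < updatesPerProducer; i++) {
                    add("id-" + (i % 10), "update-" + i);
                }
            });
            threads.add(t);
            t.start();
        }
        int seen = 0;
        while (anyAlive(threads)) {
            seen += count(drain()); // drain while producers are still adding
        }
        try {
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        seen += count(drain()); // final pass picks up anything added after the last drain
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("seen: " + runDemo(4, 1000));
    }
}
```

Once remove returns, the list is no longer reachable through the map, so a later add for the same id starts a fresh list and the drained one can be processed without holding up producers.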

Answer 2:

I'd suggest using LinkedBlockingQueue instead of CopyOnWriteArrayList as the map value. With COWAL, every add copies the whole backing array, so adds get successively more expensive and adding N elements costs O(N^2) overall. LBQ addition is O(1). Also, LBQ has drainTo, which can be used effectively here. You could do this:

// The value type is BlockingQueue because drainTo is declared there, not on Queue.
final Map<String, BlockingQueue<Update>> updatesPerId = new ConcurrentHashMap<>();

Producer:

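// A lambda is needed here: LinkedBlockingQueue::new would look for a constructor taking the String key.
updatesPerId.computeIfAbsent(id, k -> new LinkedBlockingQueue<>()).add(update);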

Consumer:

updatesPerId.forEach((id, queue) -> {
    List<Update> updates = new ArrayList<>();
    queue.drainTo(updates);
    processUpdates(id, updates);
});

This is somewhat different from what you had suggested. This technique processes the updates for each id, but lets producers continue to add updates to the map while this is going on. This leaves map entries and queues in the map for each id. If the ids end up getting reused a lot, the number of map entries will plateau at a high-water mark.
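
The behaviour described in the paragraph above (updates are drained, but the per-id queues stay in the map) can be seen in a small single-threaded sketch. Updates are plain Strings here, and the add and drainAll helper names are assumptions for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

class QueueDrainSketch {
    // Updates are plain Strings here, a stand-in for the question's Update type.
    static final Map<String, BlockingQueue<String>> updatesPerId = new ConcurrentHashMap<>();

    // Producer side: create the queue on first use, then append to it.
    static void add(String id, String update) {
        updatesPerId.computeIfAbsent(id, k -> new LinkedBlockingQueue<>()).add(update);
    }

    // Consumer side: empty every queue into a local list; returns how many updates were drained.
    static int drainAll() {
        int[] drained = {0};
        updatesPerId.forEach((id, queue) -> {
            List<String> batch = new ArrayList<>();
            queue.drainTo(batch); // moves the elements currently in the queue into batch
            drained[0] += batch.size();
        });
        return drained[0];
    }

    public static void main(String[] args) {
        add("a", "a-1");
        add("a", "a-2");
        add("b", "b-1");
        System.out.println("first drain: " + drainAll());
        System.out.println("second drain: " + drainAll());
        System.out.println("entries still in map: " + updatesPerId.size());
    }
}
```

The second drain finds nothing new, yet both ids remain as keys with empty queues.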

If new ids are continually coming in, and old ids becoming disused, the map will grow continually, which probably isn't what you want. If this is the case you could use the technique in Andy Turner's answer.

If the consumer really needs to snapshot and clear the entire map, I think you have to use locking, which you wanted to avoid.


Answer 3:

Is there any way to make this logic thread safe without synchronizing the adding logic from producers and the logic from timerThread(consumer)?

In short, no - depending on what you mean by "synchronizing".

The easiest way is to wrap your Map into a class of your own.

class UpdateManager {
    // private final: all access goes through the synchronized methods below.
    private final Map<String,List<Update>> updates = new HashMap<>();
    public void add(Update update) {
        synchronized (updates) {
            updates.computeIfAbsent(update.getKey(), k -> new ArrayList<>()).add(update);
        }
    }
    public Map<String,List<Update>> getUpdatesAndClear() {
        synchronized (updates) {
            Map<String,List<Update>> copy = new HashMap<>(updates);
            updates.clear();
            return copy;
        }
    }
}
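
A possible way to exercise this class is sketched below: several producers call add while one consumer keeps calling getUpdatesAndClear. The class is repeated so the sketch compiles on its own; the Update type with a getKey() method, the key names, and the runDemo helper are assumptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class UpdateManagerDemo {
    // Hypothetical minimal Update type; only getKey() is needed by the manager.
    static final class Update {
        private final String key;
        Update(String key) { this.key = key; }
        String getKey() { return key; }
    }

    // Same locking scheme as the class above.
    static final class UpdateManager {
        private final Map<String, List<Update>> updates = new HashMap<>();

        void add(Update update) {
            synchronized (updates) {
                updates.computeIfAbsent(update.getKey(), k -> new ArrayList<>()).add(update);
            }
        }

        Map<String, List<Update>> getUpdatesAndClear() {
            synchronized (updates) {
                Map<String, List<Update>> copy = new HashMap<>(updates);
                updates.clear();
                return copy;
            }
        }
    }

    static int count(Map<String, List<Update>> batch) {
        int n = 0;
        for (List<Update> list : batch.values()) n += list.size();
        return n;
    }

    // Producers add concurrently while this thread snapshots and clears; returns updates seen.
    static int runDemo(int producers, int updatesPerProducer) {
        UpdateManager manager = new UpdateManager();
        List<Thread> threads = new ArrayList<>();
        for (int p = 0; p < producers; p++) {
            Thread t = new Thread(() -> {
                for (int i = 0; i < updatesPerProducer; i++) {
                    manager.add(new Update("id-" + (i % 10)));
                }
            });
            threads.add(t);
            t.start();
        }
        int seen = 0;
        boolean running = true;
        while (running) {
            seen += count(manager.getUpdatesAndClear());
            running = false;
            for (Thread t : threads) {
                if (t.isAlive()) running = true;
            }
        }
        try {
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        seen += count(manager.getUpdatesAndClear()); // final snapshot after all producers finish
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("seen: " + runDemo(4, 1000));
    }
}
```

The copy made in getUpdatesAndClear is shallow, but because the shared map is cleared under the same lock, producers start new lists afterwards and never touch the lists handed to the consumer.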
  • Posted on 2019-01-10 01:03