本文共 8310 字,大约阅读时间需要 27 分钟。
同步容器类包括:
* Vector和Hashtable * Collections.synchronizedXxx等工厂方法创建的。特点是对每一个共有方法都进行同步。
一般情况下是线程安全的,但是在某些情况下需要客户端加锁才能保证线程安全,比如“若没有就添加”。
同步容器使用自身的锁保护它的每一个方法,因此在扩展的时候,为了保证原子性,只需要对同步容器加锁即可。
public static Object getLast(Vector list) { synchronized (list) { int lastIndex = list.size() - 1; return list.get(lastIndex); }}public static void deleteLast(Vector list) { synchronized (list) { int lastIndex = list.size() - 1; list.remove(lastIndex); }}
无论直接迭代还是直接for-each语法,jvm都会按照迭代器处理。如果迭代器在迭代期间出现并发修改,则会抛出ConcurrentModificationException异常。
解决的方法:
public class HiddenIterator { @GuardedBy("this") private final Setset = new HashSet (); public synchronized void add(Integer i) { set.add(i); } public synchronized void remove(Integer i) { set.remove(i); } public void addTenThings() { Random r = new Random(); for (int i = 0; i < 10; i++) add(r.nextInt()); System.out.println("DEBUG: added ten elements to " + set);//这里可能会抛出ConcurrentModificationException异常 }}
容器的操作:
都会引起隐藏的Iterator操作,在并发环境中使用的时候一定要特别小心。
与HashMap一样,两者都是AbstractMap的直接子类,都是基于Hash的Map。
不同于HashMap的是,ConcurrentHashMap使用一种粒度更细的加锁机制,这种机制叫做分段锁,任意数量的读线程可以并发读,一定数量的写线程可以并发写,提供的迭代器不会抛出ConcurrentModificationException。
值得注意的是,为了支持更高的并发性,ConcurrentHashMap的size和isEmpty返回的结果只是一个估计值,是不精确的。
虽然ConcurrentHashMap不能通过客户端加锁创建新的原子操作。但是提供了一些常见的复合操作,例如“若没有则添加”,“若想等则移除”,“若想等则替换”等。
用于替代List,某些时候表现出更好的并发性能。只要正确发布一个事实不可变的对象,在访问该对象的时候就不需要进一步的同步。
当修改时会创建并重新创建一个容器的副本,而写入时复制容器的迭代器保留一个指向原来数组的引用,这样就可以避免修改对迭代的影响。迭代器返回的元素与迭代器创建的时候的元素完全一致。
由于每次修改都要复制数组,因此这部分的性能消耗需要权衡。
阻塞队列常常用于生产者消费者模式,提供了两类修改方法,阻塞方法和非阻塞方法。
阻塞方法包括put和take。put方法会阻塞到有空间可用;take会阻塞到有元素可用。
非阻塞方法包括offer和poll,offer插入成功返回true,poll获取成功返回true。二者都有一个重写方法,加入超时机制,限时阻塞。
阻塞队列的方法中提供了足够的内部同步机制,因此在使用的时候不需要额外的同步。
LinkedBlockingQueue和ArrayBlockingQueue,二者分别于LinkedList和ArrayList类似,但是有更好的并发性能。
PriorityBlockingQueue是一个优先级队列。
SynchronousQueue
Deque和BlockingDeque,可以在首尾两端进行插入删除操作。
工作密取 生产者消费者中所有消费者公用一个队列,而工作密取中每一个消费者都有一个队列,一个消费者每次从自己队列的头部获取工作,当完成自己队列中的所有工作之后,从其他消费者队列的尾部秘密获取工作。
一般来说,中断就是取消某个操作。库函数中需要正确的响应InterruptedException异常中断,对于InterruptedException不能捕获了但不作出任何响应。响应中断有两种选择:
以上可以应付大多数情况。
除了阻塞队列以外,常用的同步工具类有:
Latch包括一个计数器,该计数器初始化为一个整数,countDown方法递减计数器,await方法等待计数器为0。
public class TestHarness { public long timeTasks(int nThreads, final Runnable task) throws InterruptedException { final CountDownLatch startGate = new CountDownLatch(1); final CountDownLatch endGate = new CountDownLatch(nThreads); for (int i = 0; i < nThreads; i++) { Thread t = new Thread() { public void run() { try { startGate.await(); try { task.run(); } finally { endGate.countDown(); } } catch (InterruptedException ignored) { } } }; t.start(); } long start = System.nanoTime(); startGate.countDown(); endGate.await(); long end = System.nanoTime(); return end - start; }}
通过startGate保证同时启动多个线程,endGate时主线程等所有线程执行完。
FutureTask也可以用作闭锁。
如果任务已经完成,那么get会立即返回,反之就会将阻塞知道任务进入完成状态,然后返回结果或者抛出异常。
public class Preloader { ProductInfo loadProductInfo() throws DataLoadException { return null; } private final FutureTaskfuture = new FutureTask (new Callable () { public ProductInfo call() throws DataLoadException { return loadProductInfo(); } }); private final Thread thread = new Thread(future); public void start() { thread.start(); } public ProductInfo get() throws DataLoadException, InterruptedException { try { return future.get(); } catch (ExecutionException e) { Throwable cause = e.getCause(); if (cause instanceof DataLoadException) throw (DataLoadException) cause; else throw LaunderThrowable.launderThrowable(cause); } }}
信号量用于控制同时访问某个特定资源的操作数量或者同时执行某个指定操作的数量。Semaphore中管理着若干虚拟的许可,acquire阻塞直到获得许可,release方法归还许可给Semaphore。使用中可以用来构造有界缓冲池(一种更好的方法是BlockingQueue)。
public class BoundedHashSet{ private final Set set; private final Semaphore sem; public BoundedHashSet(int bound) { this.set = Collections.synchronizedSet(new HashSet ()); sem = new Semaphore(bound); } public boolean add(T o) throws InterruptedException { sem.acquire(); boolean wasAdded = false; try { wasAdded = set.add(o); return wasAdded; } finally { if (!wasAdded) sem.release(); } } public boolean remove(Object o) { boolean wasRemoved = set.remove(o); if (wasRemoved) sem.release(); return wasRemoved; }}
与闭锁类似,但闭锁是一次性的。栅栏与闭锁的关键区别在于:闭锁的所有线程必须同时到达栅栏位置才能继续执行。
public class CellularAutomata { private final Board mainBoard; private final CyclicBarrier barrier; private final Worker[] workers; public CellularAutomata(Board board) { this.mainBoard = board; int count = Runtime.getRuntime().availableProcessors(); this.barrier = new CyclicBarrier(count, new Runnable() { public void run() { mainBoard.commitNewValues();//会在最后一个到达的线程中执行 }}); this.workers = new Worker[count]; for (int i = 0; i < count; i++) workers[i] = new Worker(mainBoard.getSubBoard(count, i)); } private class Worker implements Runnable { private final Board board; public Worker(Board board) { this.board = board; } public void run() { while (!board.hasConverged()) { for (int x = 0; x < board.getMaxX(); x++) for (int y = 0; y < board.getMaxY(); y++) board.setNewValue(x, y, computeValue(x, y)); try { barrier.await(); } catch (InterruptedException ex) { return; } catch (BrokenBarrierException ex) { return; } } } private int computeValue(int x, int y) { // Compute the new value that goes in (x,y) return 0; } } public void start() { for (int i = 0; i < workers.length; i++) new Thread(workers[i]).start(); mainBoard.waitForConvergence(); } ...}
public class Memoizer implements Computable { private final ConcurrentMap > cache = new ConcurrentHashMap >(); private final Computable c; public Memoizer(Computable c) { this.c = c; } public V compute(final A arg) throws InterruptedException { while (true) { Futuref = cache.get(arg); if (f == null) { Callable eval = new Callable () { public V call() throws InterruptedException { return c.compute(arg); } }; FutureTask ft = new FutureTask (eval); f = cache.putIfAbsent(arg, ft); if (f == null) { f = ft; ft.run(); } } try { return f.get(); } catch (CancellationException e) { cache.remove(arg, f); } catch (ExecutionException e) { throw LaunderThrowable.launderThrowable(e.getCause()); } } }}
转载地址:http://rywci.baihongyu.com/