Java Concurrency in Practice 读书笔记 第五章

5.1  同步的Collections

JDK中,同步的Collections包含Vector和Hashtable,以及从1.2之后加入的Collections.synchronizedXXX 工厂构造函数生成的类。

这些类都内置了同步措施,确保任何时间只有一个线程能访问public方法。

然而这有一些例外:下标迭代、iteration迭代、foreach、next()、pre()等都是“快速失效的”,即有可能在你调用的过程中其他线程执行了修改,导致抛出异常。

因此,下面只是理论上的线程安全:

//可能会抛出异常
for (int i = 0; i < vector.size(); i++)
    doSomething(vector.get(i));

这可以通过客户端加锁来修正:

synchronized (vector) {
    for (int i = 0; i < vector.size(); i++)
        doSomething(vector.get(i));
}

但是加锁实际上性能开销很大,而且增大了死锁的概率。因此可以有另一种方法:在迭代前clone出一个副本出来,然后对“静态的”副本进行迭代操作等。

另外需要注意“隐式的”迭代器,例如对一个Set/Map来toSting也会隐式调用迭代器。

5.2  同步容器类

从Java 5开始,提供了一系列的线程安全的容器类。之前的版本都是通过“串行化”来实现线程安全的,并发性能很差。而Java 5之后提供的类具有较好的并发性能、并保证了线程安全

ConruccentMap是线程安全的Map接口。具有ConcurrentHashMap和ConrurrentSkipMap两个子类。前者用于替代HashMap,后者替代SortedMap。

CopyOnWriteArrayList是线程安全的List。当需要经常遍历的时候,使用较合适。

Queue接口,ConcurrentLinkedQueue:FIFO。

5.2.1  ConcurrentHashMap

    ConcurrentHashMap与普通的同步Map不同,使用了不同的同步策略,具有更好的并发性。
    更优秀的一点是,它们提供的Iterator不再是快速失效的了,即不会抛出ConcurrentModificationException。也就不必再遍历的时候加锁控制。

    缺点:
    (1)size()不再是准确的,而只是大概的估算值(因为不能立即反应变化)。
    (2)不能被互斥锁锁定。

    尽管如此,与Hashtable 或者 synchronizedMap相比ConcurrentHashMap还是非常有吸引力的,应该多使用。

    因为不支持互斥锁定,因此就无法在Client的代码中实现组合原子操作,例如put-if-absent,removal-if-equal等。这些都由ConcurrentHashMap提供好了。
    如果还需要自己写类似的组合原子操作,换用ConrurrentMap最好。

5.2.3  CopyOnWriteArrayList

CopyOnWriteArrayList/CopyOnWriteArraySet是用于替代同步的List/Set的。

迭代器同样不会抛出修改异常。当每次迭代前会拷贝List和Set的副本,因此这个副本是“Immute不变”的,就无需同步。

显然,拷贝的代价是巨大的,所以仅当元素不多修改非常少迭代非常多的情况下,才考虑使用它们。

5.3  阻塞队列与生产者-消费者模型

JDK6以后,提供了更为强大的接口:BlockingQueue和BlockingDequeue。

上述接口内置实现了“N对N的生产者-消费者”模型。当Queue为空的时候,take会被阻塞。同理Queue可设置“容量上限”,超过容量上线后,put操作会被阻塞。

它们的内部具有高并发的同步机制,因此使用下面的代码完全是线程安全的。

 class Producer implements Runnable {
   private final BlockingQueue queue;
   Producer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while(true) { queue.put(produce()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   Object produce() { ... }
}

class Consumer implements Runnable {
   private final BlockingQueue queue;
   Consumer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while(true) { consume(queue.take()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   void consume(Object x) { ... }
}

class Setup {
   void main() {
     BlockingQueue q = new SomeQueueImplementation();
     Producer p = new Producer(q);
     Consumer c1 = new Consumer(q);
     Consumer c2 = new Consumer(q);
     new Thread(p).start();
     new Thread(c1).start();
     new Thread(c2).start();
   }
}
无需额外的同步控制了,而且线程可以N个。

实现了这些接口的有ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue。分别对应于之前几乎所有的容器类。

BlockingDeQueue使用了更先进的“Working-Stell”模型,当一个线程没有工作可以做的时候,会从其他线程的工作队列“偷取”任务,以保证每个线程都是忙碌的。

这在Web-Crawler中经常被用到。

5.4  阻塞和可中断(Interruptible)的方法

当线程在活动之前或活动期间处于正在wait()等待、sleep()休眠或占用状态且该线程被中断时,抛出该异常。有时候,一种方法可能希望测试当前线程是否已被中断,如果已被中断,则立即抛出InterruptedException异常。

被中断 != 线程终止!只代表线程状态的改变。

可中断的方法(Interruptible)是指方法将线程被中断的时候抛出InterruptedException异常。例如BlockingQueue提供的take和put都是这种可中断的方法。

当你的程序调用这种“可中断”方法的时候,需要捕获这种异常时,有两种处理方法:

1、继续传播。2、自我捕获。

下面是一个例子:

public class TaskRunnable implements Runnable {
    BlockingQueue<Task> queue;
    ...
    public void run() {
        try {
            processTask(queue.take());
        } catch (InterruptedException e) {
             // restore interrupted status
             Thread.currentThread().interrupt();
        }
    }
}

5.5 同步装置

同步装置:协调控制并发流程的各种机制。例如前面的BlockingQueue、信号量、栅栏、闸门等。

5.5.1  门阀

延迟线程直到到达某个中介状态。闸门打开后状态将维持不变。

用途:

1、保证资源初始化前不会被使用。

2、确保服务的依赖顺序,当之前服务启动之前,不会启动后面的服务。

3、等待多方因素都参与到活动中。例如多人参加的在线游戏。

CountDownLatch(JDK5后提供)是一种经典的门阀。它允许多个线程等待一系列的事件发生。门阀状态由正整数表示,即等待的事件个数。countDown()方法 对计数器减一,表示某个事件发生了。调用await()方法将阻塞,直到countDown减到0所有的事件都发生)。

用这个方法可以精确计算并发程序的效率。即:

1、先创建n个线程,然后让他们等待第一个阀门。
2、创建完毕后,开启第一个阀门。第二个阀门设置为N(线程个数)
3、每个线程执行完毕后阀门二countDown()。

最后主线程就得到了比较精确的时间了。

JDK里面的例子比较直观,书上给的有些小别扭。

public class TestHarness {
    public long timeTasks(int nThreads, final Runnable task)
            throws InterruptedException {
        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(nThreads);

        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread() {
                public void run() {
                    try {
                        startGate.await();
                        try {
                            task.run();
                        } finally {
                            endGate.countDown();
                        }
                    } catch (InterruptedException ignored) { }
                }
            };
            t.start();
        }

        long start = System.nanoTime();
        startGate.countDown();
        endGate.await();
        long end = System.nanoTime();
        return end-start;
    }
}

5.5.2  FutureTask

Future同时实现了Callable接口和Runnable,

Runnable使其能像个线程一样工作,儿Callable支持返回结果。

因此经常用在Executor中,异步的执行任务并返回结果或者预计算(计算耗时但无需同步地等待结果)。

它提供了get()用于获取执行返回值。如果线程没有执行完毕,get将被阻塞直到执行完毕返回。如果执行完毕了,马上就返回。

public class Preloader {
    private final FutureTask<ProductInfo> future =
        new FutureTask<ProductInfo>(new Callable<ProductInfo>() {
            public ProductInfo call() throws DataLoadException {
                return loadProductInfo();
            }
        });
    private final Thread thread = new Thread(future);

    public void start() { thread.start(); }

    public ProductInfo get()
            throws DataLoadException, InterruptedException {
        try {
            return future.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof DataLoadException)
                throw (DataLoadException) cause;
            else
                throw launderThrowable(cause);
        }
    }

5.5.3  信号量

Semaphore类在JDK5后提供。

acquire()获得一个信号量(没有则阻塞),release()释放一个信号量。

public class BoundedHashSet<T> {
    private final Set<T> set;
    private final Semaphore sem;

    public BoundedHashSet(int bound) {
        this.set = Collections.synchronizedSet(new HashSet<T>());
        sem = new Semaphore(bound);
    }

    public boolean add(T o) throws InterruptedException {
        sem.acquire();
        boolean wasAdded = false;
        try {
            wasAdded = set.add(o);
            return wasAdded;
        }
        finally {
            if (!wasAdded)
                sem.release();
        }
    }

    public boolean remove(Object o) {
        boolean wasRemoved = set.remove(o);
        if (wasRemoved)
            sem.release();
        return wasRemoved;
    }
}

用处很多,就不说了。

5.5.4  屏障

屏障与闸门类似,闸门时一次性的,而栅栏可以被reset重新使用。

CyclicBarrier是一个同步辅助类,允许一组线程相互等待,直到到达某个公公屏障点,它再释放后可以重新reset重用。

class Solver {
   final int N;
   final float[][] data;
   final CyclicBarrier barrier;

   class Worker implements Runnable {
     int myRow;
     Worker(int row) { myRow = row; }
     public void run() {
       while (!done()) {
         processRow(myRow);

         try {
           barrier.await();
         } catch (InterruptedException ex) {
return;
         } catch (BrokenBarrierException ex) {
return;
         }
       }
     }
   }

   public Solver(float[][] matrix) {
     data = matrix;
     N = matrix.length;
     barrier = new CyclicBarrier(N,
                                 new Runnable() {
                                   public void run() {
                                     mergeRows(...);
                                   }
                                 });
     for (int i = 0; i < N; ++i)
       new Thread(new Worker(i)).start();

     waitUntilDone();
   }
}

屏障经常用在仿真中,某个步骤可以被并行分解为多个步骤,但是必须都完成了才能走下一步的时候。

另外一种形式是Exchanger,可以拿来直接实现双缓冲、生产者消费者之类的。

Leave a Reply

Your email address will not be published. Required fields are marked *