Java Concurrency in Practice 读书笔记 第七章

7.1  任务的取消

任务应该是可取消的:在run的外界可以让其状态从运行变为终止。

需要取消任务的场景:

1、用户取消任务,如点击了“取消”

2、时间timeout限制的任务

3、程序外部事件需要处理。

4、出错

5、关闭、退出、清理

Java中没有原生提供“停止线程”的方法,但可以使用“bool标志位+volatile”的方式来实现,注意list还是需要被锁保护的。

@ThreadSafe
public class PrimeGenerator implements Runnable {
     @GuardedBy("this")
     private final List<BigInteger> primes
             = new ArrayList<BigInteger>();
     private  volatile boolean cancelled;

     public void run() {
         BigInteger p = BigInteger.ONE;
         while (!cancelled ) {
             p = p.nextProbablePrime();
             synchronized (this) {
                 primes.add(p);
             }
         }
     }

     public void cancel() { cancelled = true;  }

     public synchronized List<BigInteger> get() {
         return new ArrayList<BigInteger>(primes);
     }
}

List<BigInteger> aSecondOfPrimes() throws InterruptedException {
    PrimeGenerator generator = new PrimeGenerator();
    new Thread(generator).start();
    try {
        SECONDS.sleep(1);
    } finally {
        generator.cancel();
    }
    return generator.get();
}

7.1.1  Interruption

有的时候这种简单机制足矣,但是如果使用了BlockingQueue呢?get()将被阻塞,根本没机会去check这个bool的状态。这就是Interruption机制的用武之地了。

Thread内置bool变量:
1、调用线程对象的interruption(),改变bool状态,线程变为被中断
2、Thread.sleep、Object.wait以及可能抛出InterruptionException的方法会检测到,并尽快抛出异常、返回。同时清空bool状态
3、static的interrupted将清理当前线程的bool的值并恢复之前的状态

interruption(和中断一样),并发马上终止,而是在线程的“下一个最快可能时间”结束并转入其他流程。

interrupt有两个捕获点:

1、正在Sleep、wait时抛出的InterruptionException异常
2、正好处于while循环check时,检查isInterruption()

class PrimeProducer extends Thread {
    private final BlockingQueue<BigInteger> queue;

    PrimeProducer(BlockingQueue<BigInteger> queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            BigInteger p = BigInteger.ONE;
            while (!Thread.currentThread().isInterrupted())
                queue.put(p = p.nextProbablePrime());
        } catch (InterruptedException consumed) {
            /*  Allow thread to exit  */
        }
    }
    public void cancel() { interrupt(); }
}

7.1.2  Interruption策略

由于线程没有直接的cancel机制,所以一种比较“官方”的方法是通过“interrupt”。
线程的调用者可以对线程进行interrupt(),线程可以定期对isInterrupt()进行判断,并进行相应的处理。
线程自己也可以Thread.currentThread().interrupt();

7.1.3   响应Interruptiion

在抛出InterruptedException异常时,有两种策略:

1、传递异常(throws直接)

2、重设interrupt状态(因为你不想抛出异常导致程序退出)

策略1,直接加上throws:

public Task getNextTask() throws InterruptedException {
    return queue.take();
}

策略2,捕获异常后,再次Thread.currentThread().interrupt():

public Task getNextTask(BlockingQueue<Taskgt; queue) {
    boolean interrupted = false;
    try {
        while (true) {
            try {
                return queue.take();
            } catch (InterruptedException e) {
                interrupted = true;
                // fall through and retry
            }
        }
    } finally {
        if (interrupted)
            Thread.currentThread().interrupt();
    }
}

7.1.4  例子

“ 在指定时间内运行某线程,超时则结束掉”是一个常见的业务需求。

除了上述各种蹩脚的办法外,还可以用Future解决这个问题。

public static void timedRun(Runnable r,
                            long timeout, TimeUnit unit)
                            throws InterruptedException {
    Future<?> task = taskExec.submit(r);
    try {
        task.get(timeout, unit);
    } catch (TimeoutException e) {
        // task will be cancelled below
    } catch (ExecutionException e) {
        // exception thrown in task; rethrow
        throw launderThrowable(e.getCause());
    } finally {
        // Harmless if task already completed
        task.cancel(true);  // interrupt if running
    }
}

尽管get返回的是null,但是这种方法可以清晰的了解到到底是超时,还是运行时错误导致的异常。

7.1.6  处理无法被interrupt的代码

不是所有的方法都能“响应”interrupt,特别是I/O阻塞这些。有一些特殊的技巧来取消被阻塞的任务:

java.io的同步阻塞:直接关闭socket、文件。(可能会抛出SocketException)

java.nio的同步阻塞:打断InterruptibleChannel上等待的线程,会抛出异常CloseByInterruptException。关闭Channel会抛出AsynchronousCloseException。多数Channel都实现了InterruptibleChannel。

java.nio的异步非阻塞I/O:如果被阻塞在Selector.select上,可以调用wakeup让它直接返回。

public class ReaderThread extends Thread {
    private final Socket socket;
    private final InputStream in;

    public ReaderThread(Socket socket) throws IOException {
        this.socket = socket;
        this.in = socket.getInputStream();
    }

    public void  interrupt()  {
        try {
            socket.close();
        }
        catch (IOException ignored) { }
        finally {
            super.interrupt();
        }
    }

    public void run() {
        try {
            byte[] buf = new byte[BUFSZ];
            while (true) {
                int count = in.read(buf);
                if (count < 0)
                    break;
                else if (count > 0)
                    processBuffer(buf, count);
            }
        } catch (IOException e) { /*  Allow thread to exit  */  }
    }
}

尽管Runnable不支持cancel,但是将它转化成Future以支持cancel。
可以利用ThreadPoolExecutor的newTaskfor将Callable包装成RunnableFuture,以支持cancel。newTaskFor负责将向pool中提交的Task转化成Future(RunnableFuture)。

而定义cancel,可以在定义Task的时候用匿名函数搞定。

public interface CancellableTask<T> extends Callable<T> {
    void cancel();
    RunnableFuture<T> newTask();
}

@ThreadSafe
public class CancellingExecutor extends ThreadPoolExecutor {
    ...
    protected<T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        if (callable instanceof CancellableTask)
            return ((CancellableTask<T>) callable).newTask();
        else
            return super.newTaskFor(callable);
    }
}

public abstract class SocketUsingTask<T>
        implements CancellableTask<T> {
    @GuardedBy("this") private Socket socket;

    protected synchronized void setSocket(Socket s) { socket = s; }

    public synchronized void cancel() {
        try {
            if (socket != null)
                socket.close();
        } catch (IOException ignored) { }
    }

    public RunnableFuture<T> newTask() {
        return new FutureTask<T>(this) {
            public boolean cancel(boolean mayInterruptIfRunning) {
                try {
                    SocketUsingTask.this.cancel();
                } finally {
                    return super.cancel(mayInterruptIfRunning);
                }
            }
        };
    }
}

Leave a Reply

Your email address will not be published. Required fields are marked *