7.1 任务的取消
任务应该是可取消的:在run的外界可以让其状态从运行变为终止。
需要取消任务的场景:
1、用户取消任务,如点击了“取消”
2、时间timeout限制的任务
3、程序外部事件需要处理。
4、出错
5、关闭、退出、清理
Java中没有原生提供“停止线程”的方法,但可以使用“bool标志位+volatile”的方式来实现,注意list还是需要被锁保护的。
@ThreadSafe
public class PrimeGenerator implements Runnable {
@GuardedBy("this")
private final List<BigInteger> primes
= new ArrayList<BigInteger>();
private volatile boolean cancelled;
public void run() {
BigInteger p = BigInteger.ONE;
while (!cancelled ) {
p = p.nextProbablePrime();
synchronized (this) {
primes.add(p);
}
}
}
public void cancel() { cancelled = true; }
public synchronized List<BigInteger> get() {
return new ArrayList<BigInteger>(primes);
}
}
List<BigInteger> aSecondOfPrimes() throws InterruptedException {
PrimeGenerator generator = new PrimeGenerator();
new Thread(generator).start();
try {
SECONDS.sleep(1);
} finally {
generator.cancel();
}
return generator.get();
}
7.1.1 Interruption
有的时候这种简单机制足矣,但是如果使用了BlockingQueue呢?get()将被阻塞,根本没机会去check这个bool的状态。这就是Interruption机制的用武之地了。
Thread内置bool变量:
1、调用线程对象的interruption(),改变bool状态,线程变为被中断
2、Thread.sleep、Object.wait以及可能抛出InterruptionException的方法会检测到,并尽快抛出异常、返回。同时清空bool状态。
3、static的interrupted将清理当前线程的bool的值并恢复之前的状态
interruption(和中断一样),并发马上终止,而是在线程的“下一个最快可能时间”结束并转入其他流程。
interrupt有两个捕获点:
1、正在Sleep、wait时抛出的InterruptionException异常
2、正好处于while循环check时,检查isInterruption()
class PrimeProducer extends Thread {
private final BlockingQueue<BigInteger> queue;
PrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}
public void run() {
try {
BigInteger p = BigInteger.ONE;
while (!Thread.currentThread().isInterrupted())
queue.put(p = p.nextProbablePrime());
} catch (InterruptedException consumed) {
/* Allow thread to exit */
}
}
public void cancel() { interrupt(); }
}
7.1.2 Interruption策略
由于线程没有直接的cancel机制,所以一种比较“官方”的方法是通过“interrupt”。
线程的调用者可以对线程进行interrupt(),线程可以定期对isInterrupt()进行判断,并进行相应的处理。
线程自己也可以Thread.currentThread().interrupt();
7.1.3 响应Interruptiion
在抛出InterruptedException异常时,有两种策略:
1、传递异常(throws直接)
2、重设interrupt状态(因为你不想抛出异常导致程序退出)
策略1,直接加上throws:
public Task getNextTask() throws InterruptedException {
return queue.take();
}
策略2,捕获异常后,再次Thread.currentThread().interrupt():
public Task getNextTask(BlockingQueue<Taskgt; queue) {
boolean interrupted = false;
try {
while (true) {
try {
return queue.take();
} catch (InterruptedException e) {
interrupted = true;
// fall through and retry
}
}
} finally {
if (interrupted)
Thread.currentThread().interrupt();
}
}
7.1.4 例子
“ 在指定时间内运行某线程,超时则结束掉”是一个常见的业务需求。
除了上述各种蹩脚的办法外,还可以用Future解决这个问题。
public static void timedRun(Runnable r,
long timeout, TimeUnit unit)
throws InterruptedException {
Future<?> task = taskExec.submit(r);
try {
task.get(timeout, unit);
} catch (TimeoutException e) {
// task will be cancelled below
} catch (ExecutionException e) {
// exception thrown in task; rethrow
throw launderThrowable(e.getCause());
} finally {
// Harmless if task already completed
task.cancel(true); // interrupt if running
}
}
尽管get返回的是null,但是这种方法可以清晰的了解到到底是超时,还是运行时错误导致的异常。
7.1.6 处理无法被interrupt的代码
不是所有的方法都能“响应”interrupt,特别是I/O阻塞这些。有一些特殊的技巧来取消被阻塞的任务:
java.io的同步阻塞:直接关闭socket、文件。(可能会抛出SocketException)
java.nio的同步阻塞:打断InterruptibleChannel上等待的线程,会抛出异常CloseByInterruptException。关闭Channel会抛出AsynchronousCloseException。多数Channel都实现了InterruptibleChannel。
java.nio的异步非阻塞I/O:如果被阻塞在Selector.select上,可以调用wakeup让它直接返回。
public class ReaderThread extends Thread {
private final Socket socket;
private final InputStream in;
public ReaderThread(Socket socket) throws IOException {
this.socket = socket;
this.in = socket.getInputStream();
}
public void interrupt() {
try {
socket.close();
}
catch (IOException ignored) { }
finally {
super.interrupt();
}
}
public void run() {
try {
byte[] buf = new byte[BUFSZ];
while (true) {
int count = in.read(buf);
if (count < 0)
break;
else if (count > 0)
processBuffer(buf, count);
}
} catch (IOException e) { /* Allow thread to exit */ }
}
}
尽管Runnable不支持cancel,但是将它转化成Future以支持cancel。
可以利用ThreadPoolExecutor的newTaskfor将Callable包装成RunnableFuture,以支持cancel。newTaskFor负责将向pool中提交的Task转化成Future(RunnableFuture)。
而定义cancel,可以在定义Task的时候用匿名函数搞定。
public interface CancellableTask<T> extends Callable<T> {
void cancel();
RunnableFuture<T> newTask();
}
@ThreadSafe
public class CancellingExecutor extends ThreadPoolExecutor {
...
protected<T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
if (callable instanceof CancellableTask)
return ((CancellableTask<T>) callable).newTask();
else
return super.newTaskFor(callable);
}
}
public abstract class SocketUsingTask<T>
implements CancellableTask<T> {
@GuardedBy("this") private Socket socket;
protected synchronized void setSocket(Socket s) { socket = s; }
public synchronized void cancel() {
try {
if (socket != null)
socket.close();
} catch (IOException ignored) { }
}
public RunnableFuture<T> newTask() {
return new FutureTask<T>(this) {
public boolean cancel(boolean mayInterruptIfRunning) {
try {
SocketUsingTask.this.cancel();
} finally {
return super.cancel(mayInterruptIfRunning);
}
}
};
}
}