Java Concurrency in Practice 读书笔记 第六章

6.1  在线程中执行任务

在多线程开发中,要执行多线程任务,第一步是“找出任务边界”。

一个好的并发服务器是:大吞吐量,低响应时间。当系统过载的时候,只是性能降低而不会崩溃。

网络服务器为多线程提供了自然的任务边界:每一个客户端的请求。例如Web服务器、Mail服务器、文件服务器、EJB容器、数据库服务器都接受这种请求。各个客户端之间的请求应该是互补影响的。

单线程服务器

class SingleThreadWebServer {
    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            Socket connection = socket.accept();
            handleRequest(connection);
        }
    }
}

处理一个客户的连接时,其他连接只能等待。在成熟的商业化产品中,显然性能太差。实际上,在处理过程中,经常会发生I/O阻塞(网络read、write等),由于是单线程,只要有阻塞,服务器就没有响应了。

多线程(每次创建新线程)

因此多线程是一个很自然的想法,一种简单的方法是,每一个客户端请求时,新建一个线程进行处理。原线程继续等待。

class ThreadPerTaskWebServer {
    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final  Socket connection = socket.accept();
            Runnable task = new Runnable() {
                    public void run() {
                        handleRequest(connection);
                    }
                };
            new Thread(task).start();
        }
    }
}

需要注意的是:task中的处理代码必须保证线程安全。

当用户请求不密集的时候,这种模型能很好的工作。但当大量用户并发连接(线程增多)的时候,性能会急剧下降甚至导致程序的崩溃:

1、线程的创建、销毁需要开销。特别是线程数量很多的时候,累积起来的开销很大。

2、资源消耗。过多线程会消耗大量内存,增加GC的负担。

3、健壮性。每个操作系统、JVM都有线程创建的极限,当超过这个极限的时候,一般就会OutofMemoryException。

总结:多线程可以通过并发计算的形式提升性能,但毫无限制的创建线程是不行的,最终会导致系统的崩溃。

6.2  Executor框架

单线程模型的并发性太差、响应时间太长。每次请求创建一个新线程的开销太大,很难进行资源管理。

为此JDK提供了Executor框架,以Task(Runnable)为基本单位,将任务的提交与任务的执行解耦。

Executor使用的是“生产者-消费者”模型。Task的提交是生产者,执行时消费者。

这种框架让并发控制与业务逻辑解耦,使用起来非常简单。

一个使用了固定100线程Executor的Web服务器。

class TaskExecutionWebServer {
    private static final int NTHREADS = 100;
    private static final Executor exec
        = Executors.newFixedThreadPool(NTHREADS);

    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final Socket connection = socket.accept();
            Runnable task = new Runnable() {
                public void run() {
                    handleRequest(connection);
                }
            };
            exec.execute(task);
        }
    }
}

6.2.2  Executor的策略

Executor的策略包括:

1、在哪个线程执行

2、什么顺序?(FIFO、LIFO、优先队列)

3、多少个任务可并发执行

4、多少个任务可以被放入队列等待执行

5、如果因为系统过载导致Task被拒绝,应该选择哪个Task,怎么通知用户?

6、执行完一个Task后应该做什么。

围绕这些问题,Executors提供了一系列子类,方便用户选择。

6.2.3  线程池

线程池,顾名思义拥有N个工作线程,以及一个工作队列。每一个线程的工作很简单:从队列中取出一个任务、执行、然后再等待下一个任务。

与每次都新建线程相比,线程池拥有诸多优势,最主要的是:可以避免反复创建、销毁线程的开销。通过合理的调整线程池大小,可以在性能和内存消耗之间达到一个平衡。

Execotors提供了很多线程池:

newFixedThreadPool,初始创建N个线程,如果中途有线程挂掉了,创建新的以维持N个的数量。

newCachedThreadPool,更自由的控制线程数量,当任务多的时候增多线程,少的时候减少线程。没有上下限。

newSingleThreadExecutor,纯单线程,保证任务是顺序执行的。

尽管FixedThreadPool限定了线程上限,但是若Task过多的话,还是有可能OutofMemoryException的

6.2.4  Executor的生命周期

如果Executor没有被正确shutdown的话,可能会致使JVM无法退出。先来看一下Executor的生命周期吧。

为了加强生命周期管理(如程序关闭、异常结束),JDK拓展出了ExecutorService接口

public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    //  ... additional convenience methods for task submission
}

ExecutorService有三种状态:running、shutdown、terminated。初始状态下,是Running的

shutdown() 优雅的关闭,不接受新Task,已经提交的Task将被执行完毕后,再关闭。

shutdownNow() 立即关闭,不再执行任何Task,并将Queue的没有执行的返回。

awaitTermination() 等待Executor到达terminated状态

isTerminated() 检查是否到达terminated状态

一个结合了shutdown的服务器实例如下:

class LifecycleWebServer {
    private final ExecutorService exec = ...;

    public void start() throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (!exec.isShutdown()) {
            try {
                final Socket conn = socket.accept();
                exec.execute(new Runnable() {
                    public void run() { handleRequest(conn); }
                });
            } catch (RejectedExecutionException e) {
                if (!exec.isShutdown())
                    log("task submission rejected", e);
            }
        }
    }

    public void stop() { exec.shutdown(); }

    void handleRequest(Socket connection) {
        Request req = readRequest(connection);
        if (isShutdownRequest(req))
            stop();
        else
            dispatchRequest(req);
    }
}

6.2.5  延时和周期任务

在JDK5之前,通过Timer执行周期任务,或者延时任务。

JDK5之后ScheduledThreadPoolExecutor可担任Timer提供了更灵活的方法。

Timer的问题是:只能开一一个线程,如果频率是10ms一次,但是某Task是40ms(可以选择fixed rate或者fixeddelay,如果是前者的话)就可能导致任务丢失,后者的话就是任务少执行了。

ScheduledThreadPoolExecutor可以创建多个线程来解决这个问题。

另外一个问题是,如果Task抛出异常,Timer的线程就挂掉了,不会重启,相当于Timer被取消了。ScheduledThradPool可以解决这个问题。

因此JDK5后,应该首选ScheduledThreadPoolExecutor。

类似的还有DelayQueue,也可以使用。

6.3 找出可利用的并行

为了很好地利用Executor框架,需要把Task封装为Runnable。

对于C/S程序而言,任务边界很简单,就是一次Request/Response。更多的时候没有明确的定义。

以一个HTML网页渲染为例。

最基本的策略:单线程、先渲染文本,再下载、渲染图片:

public class SingleThreadRenderer {
    void renderPage(CharSequence source) {
        renderText(source);
        List<ImageData> imageData = new ArrayList<ImageData>();
        for (ImageInfo imageInfo : scanForImageInfo(source))
            imageData.add(imageInfo.downloadImage());
        for (ImageData data : imageData)
            renderImage(data);
    }
}

由于图片的下载速度受到网络影响,经常会I/O阻塞。因此CPU计算资源将被大量的闲置。

为了支持异步任务的返回,Executor还支持Future(一种Callable)。可以通过get异步获取结果。除了支持submit一个Future(Runnable之外),默认提交Runnable也会返回一个Callale(get也会阻塞,只不过最终返回null)。

这种submit导致的发布时安全的。

使用Callable对程序进行改进:

public class FutureRenderer {
    private final ExecutorService executor = ...;

    void renderPage(CharSequence source) {
        final List<ImageInfo> imageInfos = scanForImageInfo(source);
        Callable<List<ImageData>> task =
                new Callable<List<ImageData>>() {
                    public List<ImageData> call() {
                        List<ImageData> result
                                = new ArrayList<ImageData>();
                        for (ImageInfo imageInfo : imageInfos)
                            result.add(imageInfo.downloadImage());
                        return result;
                    }
                };

        Future<List<ImageData>> future =  executor.submit(task);
        renderText(source);

        try {
            List<ImageData> imageData =  future.get();
            for (ImageData data : imageData)
                renderImage(data);
        } catch (InterruptedException e) {
            // Re-assert the thread's interrupted status
            Thread.currentThread().interrupt();
            // We don't need the result, so cancel the task too
            future.cancel(true);
        } catch (ExecutionException e) {
            throw launderThrowable(e.getCause());
        }
    }
}

先开启线程提交图片下载,再渲染文字,最后等待异步的图片下载完成后,再渲染图片。在一定程度上减少了程序的响应时间。

BlockingQueue是一个非常有利的工具:

1、当多个Class共享一个线程池的时候,可以在Task执行完毕后将结果写入BlockingQueue。主线程从BQ中get,即使暂时没有也是异步的、线程安全的。

2、可以控制多个扔进Executors的异步任务的执行结果。

另外Future提供了一个带timeout的接口。用于有时间限制的完成指定任务。

V get(long timeout,TimeUnit unit)

Page renderPageWithAd() throws InterruptedException {
    long endNanos = System.nanoTime() + TIME_BUDGET;
    Future<Ad> f = exec.submit(new FetchAdTask());
    // Render the page while waiting for the ad
    Page page = renderPageBody();
    Ad ad;
    try {
        // Only wait for the remaining time budget
        long timeLeft = endNanos - System.nanoTime();
        ad = f.get(timeLeft, NANOSECONDS);
    } catch (ExecutionException e) {
        ad = DEFAULT_AD;
    } catch (TimeoutException e) {
        ad = DEFAULT_AD;
        f.cancel(true);
    }
    page.setAd(ad);
    return page;
}

Executors也提供了带时间版本的invokeAll,用于对一组提交的Future,并在指定时间内返回。

Leave a Reply

Your email address will not be published. Required fields are marked *