`

原码剖析之ThreadPoolExecutor入门

阅读更多
jdk 1.5 开始提供支持线程池的功能。

线程池使用的场景:创建线程的时间和资源耗费较高,线程执行时间较短。
优点:
1. 这样使用线程池可以避免多次创建耗费巨大的线程,去完成一个较小的任务
2. 复用线程,减低系统的资源浪费。
3. 另外就是线程已经创建好等待任务的执行,那么相应性也会大大提高。
4. 通过适当地调整线程池中的线程数目,也就是当请求的数目超过某个阈值时,就强制其它任何新到的请求一直等待,直到获得一个线程来处理为止,从而可以防止资源不足。保障系统的稳定运行。


注意点:
1、线程池的大小对于使用者非常关键,比如对于多并发请求较为频繁的场景,有可能因为线程池的大小导致请求的阻塞。
2、线程池可以让任务的提交和执行隔离开来,达到异步的效果。(核心:FutureTask)
3、线程池维护者任务执行的相关信息,如果执行已经执行完成的数量。
4、以及异常和任务队列满时的丢弃策略。
5、不要对那些同步等待其它任务结果的任务排队,这可能造成死锁。

java线程池类库中关键类的继承关系类图:




核心接口和类的介绍:

1、顶层接口 Executor。

执行已提交的 Runnable 任务的对象。此接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。通常使用 Executor 而不是显式地创建线程。

Executor 接口并没有严格地要求执行是异步的。在最简单的情况下,执行程序可以在调用者的线程中立即运行已提交的任务
class DirectExecutor implements Executor {
     public void execute(Runnable r) {
         r.run();
     }
}

更常见的是,任务是在某个不是调用者线程的线程中执行的。以下执行程序将为每个任务生成一个新线程
class ThreadPerTaskExecutor implements Executor {
     public void execute(Runnable r) {
         new Thread(r).start();
     }
}

此包中提供的 Executor 实现实现了 ExecutorService,这是一个使用更广泛的接口。ThreadPoolExecutor 类提供一个可扩展的线程池实现。Executors 类为这些 Executor 提供了便捷的工厂方法。

内存一致性效果:线程中将 Runnable 对象提交到 Executor 之前的操作 happen-before 其执行开始(可能在另一个线程中)。


public interface Executor {
    /**
     *
     * 执行提交的command命令,command 可能在一个新线程中执行,也可以在池中执行,或者在当前调用的线程,详细参考Executor的实现
     * @param command the runnable task
     * @throws RejectedExecutionException 如任务不能被接受,会抛出异常
     */
    void execute(Runnable command);
}



2、执行器通用服务接口:ExecutorService。

1、是一个实现了可以管理线程池功能的Executor 。
2、提供了可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。
3、可以关闭 ExecutorService,这将导致其拒绝新任务shutdown()
4、通过创建并返回一个可用于取消执行和/或等待完成的 Future,方法 submit 扩展了基本方法 Executor.execute(java.lang.Runnable)。方法 invokeAny 和 invokeAll 是批量执行的最常用形式,它们执行任务 collection,然后等待至少一个,或全部任务完成(可使用 ExecutorCompletionService 类来编写这些方法的自定义变体)。

另外:Executors 类提供了用于此包中所提供的执行程序服务的工厂方法。

内存一致性效果:线程中向 ExecutorService 提交 Runnable 或 Callable 任务之前的操作 happen-before 由该任务所提取的所有操作,后者依次 happen-before 通过 Future.get() 获取的结果。

public interface ExecutorService extends Executor {
    /**
     * 启动一次顺序关闭,执行以前提交的任务,但不接受新任务。如果已经关闭,则调用没有其他作用。 
     */
    void shutdown();

    /**
      试图关闭所有活动的正在执行的任务,停止等待认为的处理,并返回等待执行的任务列表。 
     *无法保证能够停止正在处理的活动执行任务,但是会尽力尝试。
      例如,通过 Thread.interrupt() 来取消典型的实现,所以任何任务无法响应中断都可能永远无法终止。
     */
    List<Runnable> shutdownNow();

    /**
     * 返回executor 是否已经shutdown
     */
    boolean isShutdown();

    /**
      如果关闭后所有任务都已完成,则返回 true。注意,除非首先调用 shutdown 或 shutdownNow,否则 isTerminated 永不为 true。 
     */
    boolean isTerminated();

    
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;


    /**
     *提交一个返回值的任务用于执行,返回一个表示任务执行未决结果的 Future。该 Future 的 get 方法在成功完成时将会返回该任务的结果。 
      如果想立即阻塞任务的等待,则可以使用 result = exec.submit(aCallable).get(); 形式的构造。 
     */
    <T> Future<T> submit(Callable<T> task);

    /**
     *提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功完成时将会返回给定的结果。 
     */
    <T> Future<T> submit(Runnable task, T result);

    /**
     * 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功 完成时将会返回 null。 
     */
    Future<?> submit(Runnable task);

    /**
     * 批量执行任务。返回列表的所有元素的 Future.isDone() 为 true
     */

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    /**
     *在特定的时间内执行tasks任务列表,当中断 过期等,即取消尚未完成的任务
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果。一旦正常或异常返回后,则取消尚未完成的任务。
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    /**
     同上
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}


3、执行器服务的默认实现:AbstractExecutorService。
此类提供 ExecutorService 执行方法的默认实现。此类使用 newTaskFor 返回的 RunnableFuture 实现 submit、invokeAny 和 invokeAll 方法,默认情况下,RunnableFuture 是此包中提供的 FutureTask 类。例如,submit(Runnable) 的实现创建了一个关联 RunnableFuture 类,该类将被执行并返回。子类可以重写 newTaskFor 方法,以返回 FutureTask 之外的 RunnableFuture 实现。

AbstractExecutorService 对callable 和 runnable 通过适配器做统一管理。
相关类图如下:



原码分析如下:
public abstract class AbstractExecutorService implements ExecutorService {

    /**
     * 为给定可运行任务和默认值返回一个 RunnableFuture。 
     *
     * @param runnable 将被包装的可运行任务
     * @param 用于所返回的将来任务的默认值
     * @return 在运行的时候,它将运行底层可运行任务,作为 Future 任务,它将生成给定值作为其结果,并为底层任务提供取消操作。
     */
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    /**
     * 同上
     */
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Object> ftask = newTaskFor(task, null); //注意:如果提交的是runnable 的任务,默认值返回的是null
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result); //可以提供返回的默认值result
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask); //执行任务
        return ftask;
    }

		/** 调用所有任务
		执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。返回列表的所有元素的 Future.isDone() 为 true。
		注意,可以正常地或通过抛出异常来终止已完成 任务。如果正在进行此操作时修改了给定的 collection,则此方法的结果是不确定的。 
		*/
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); //定义返回的Futrue list
        boolean done = false; //标志执行是否结束
        try {
            for (Callable<T> t : tasks) {
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                execute(f); //执行任务!!
            }
            for (Future<T> f : futures) {
                if (!f.isDone()) { //如果任务没有执行完,那么f.get()将强制进行执行完成。
                    try {
                        f.get();
                    } catch (CancellationException ignore) { //注意:如果此次抛出异常,那么返回的list列表里面将不能保证所有的任务都是完成的!!!
                    } catch (ExecutionException ignore) {
                    }
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done) //如果未执行完发生意外,那么所有的任务进行取消!
                for (Future<T> f : futures)
                    f.cancel(true);
        }
    }

		/** 同上,有过期限制*/
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null || unit == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));

            long lastTime = System.nanoTime();

            // Interleave time checks and calls to execute in case
            // executor doesn't have any/much parallelism.
            Iterator<Future<T>> it = futures.iterator();
            while (it.hasNext()) {
                execute((Runnable)(it.next()));
                long now = System.nanoTime();
                nanos -= now - lastTime; //nanos = nanos - 两次执行之间的时间间隔
                lastTime = now; //更改上一次执行的时间
                if (nanos <= 0) // nanos <=0 代表 已经超时
                    return futures;
            }

            for (Future<T> f : futures) {
                if (!f.isDone()) {
                    if (nanos <= 0)
                        return futures;
                    try {
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    } catch (TimeoutException toe) {
                        return futures;
                    }
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (Future<T> f : futures)
                    f.cancel(true);
        }
    }

}


  • 大小: 30.1 KB
  • 大小: 45.2 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics