Java并发编程-Executor框架几个常见的实现类介绍

不鸣则已,一鸣惊人。——《史记·滑稽列传》

上一篇文章已经讲了 Executor 的整体框架,其中有提到它的执行示意流程图。这篇文章就介绍几个常见的实现类的原理和用法,最后通过示例代码,打印 log 来明确它们的执行流程。

常见实现类介绍

newFixedThreadPool 详解

它会创建一个有固定数目线程的线程池,并绑定一个无界的阻塞队列。任何时候,最多只有 nThread 个线程处于活动状态。当提交一个任务给队列的时候,如果此时没有空闲线程,这个任务将在队列等待,直到有空闲线程时才会去取出任务并执行。任何一个线程,在关闭之前,假如由于执行失败终止了,则会创建一个新的线程取代他。所有线程在线程池中会一直存在直到调用 shutdown 关闭他们。

1
2
3
4
5
6
//参数表示要创建的线程数,由用户定义
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

它的执行示意图如下:

newFixedThreadPool

newSingleThreadExecutor

创建只有一个线程的 Executor 框架,任何时候只有一个线程,它绑定一个无界的阻塞队列。如果执行过程中线程终止了,它会创建一个新的线程取代它。它保证绝对是串行执行任务的。它有两个构造版本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//版本一:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
//版本二:
//我们可以自己定义线程的实现类,然后在穿进去。方便我们私人订制。
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
//接口,定义自己的线程。
public interface ThreadFactory {
Thread newThread(Runnable r);
}

下面是它的运行示意图:

newSingleThreadExecutor

newCachedThreadPool

这个线程池是按需创建线程的,它可以重用创建了的线程。当执行的都是耗时很短的异步任务时,这个线程池能够显著的提升程序性能。因为它能够重用创建过的并且空闲的线程。如果此时提交一个任务但是没有可用线程时,它能够创建新的线程来执行。但是假如线程存在 60s 并且都处于空闲状态时,它将会被回收。所以从这个角度说,这个线程池在空闲的时候不会消耗资源。但是极端情况下,会不断的创建线程直至资源用尽。
它使用的队列是 SynchronousQueue,提交一个任务给队列后,必须有空闲线程在等待,才能匹配成功。否则提交任务将会失败。

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

下面是它的运行图:

这里写图片描述

代码示例

下面通过实例代码来看看Executor是如何运行的。模拟一个下载图片的耗时任务,通过把任务提交给 Executor 现成框架去执行,打印输出结果。这里举例说明下面三个实现类,其他的大同小异。

  1. newFixedThreadPool
  2. ExecutorCompletionService
  3. ScheduledThreadPoolExecutor

newFixedThreadPool 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;a
import java.util.concurrent.TimeUnit;

public class ThreadExecutor{
//线程数目
private static final int THREAD_SIZE = 2;
//任务的个数
private static final int TASK_SIZE = 5;
//Executor
private static ExecutorService executor = null;
//任务列表
private static List<Future<String>> list = null;
public static void main(String[] args) {
//创建固定长度的线程池
executor = Executors.newFixedThreadPool(THREAD_SIZE);
list = new ArrayList<>();
for(int i = 0; i < TASK_SIZE; i++) {
//提交任务到线程池的队列去执行
Future<String> f = executor.submit(new ImageTask(i+1));
//把返回的Future添加到数组
list.add(f);
}
long start = System.nanoTime();
for(int i = 0; i < list.size();i++) {
try {
//获取执行结果
String imageId = list.get(i).get();
long period = System.nanoTime() - start;
System.out.println(imageId + "download time : " + formatTime(period));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

//时间转化,将长整形的时间间隔转化为秒
static String formatTime(long period) {
return String.format(
"period(nanoseconds): %d, period(seconds): %d", new Object[] {
period, TimeUnit.NANOSECONDS.toSeconds(period)});
}

//创建模拟下载图片的任务
static class ImageTask implements Callable<String>{
//图片 ID
private int imageId;
public ImageTask(int val) {
this.imageId = val;
}

@Override
public String call() throws Exception {
return downlaodImage();
}
//下载图片并返回
private String downlaodImage() {
try {
System.out.println(Thread.currentThread().getName() + " is downloading image");
Thread.sleep(2000);//假装是耗时操作
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return (Thread.currentThread().getName() + ": imageId - " + this.imageId);
}
}
}

执行结果:

可以看出,线程池只有两个线程,每次只能执行两个任务,执行完成则进行下一组。可以看出log,每两秒一组。通过数组遍历的方式按顺序打印结果。

然而,假设每个任务的耗时不同,那么每次打印的时候,即使后面的任务已经执行完毕,也必须等待前面的下载完才能打印出来。

那么,是否能够做到下载完图片就立刻打印出来呢?答案是肯定的。来看下面这个框架示例,它很好的满足了这个需求。

ExecutorCompletionService 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadExecutor{
//线程数目
private static final int THREAD_SIZE = 2;
//任务的个数
private static final int TASK_SIZE = 5;
//Executor
private static ExecutorCompletionService executor;
//标志位,每个任务执行时间不同
private static boolean flag = true;

public static void main(String[] args) {
//创建固定长度的线程池
executor = new ExecutorCompletionService(Executors.newFixedThreadPool(THREAD_SIZE));

for(int i = 0; i < TASK_SIZE; i++) {
//提交任务到线程池的队列去执行
executor.submit(new ImageTask(i+1));
}

long start = System.nanoTime();
int count = 0;
while(count < TASK_SIZE) {
//获取执行结果
count++;
long period = System.nanoTime() - start;
try {
//获取输出结果
System.out.println(executor.take().get() + " download use time : " + formatTime(period) + "s ");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

//时间转化,将长整形的时间间隔转化为秒
static String formatTime(long period) {
return String.format(
"%d", new Object[] {
TimeUnit.NANOSECONDS.toSeconds(period)});
}

//创建模拟下载图片的任务
static class ImageTask implements Callable<String>{
//图片 ID
private int imageId;
public ImageTask(int val) {
this.imageId = val;
}

@Override
public String call() throws Exception {
return downlaodImage();
}
//下载图片并返回
private String downlaodImage() {
try {
System.out.println(Thread.currentThread().getName() + ": imageId-" + imageId + " is downloading");
if(flag) {
flag = false;
Thread.sleep(2000);//假装是耗时操作
} else {
flag = true;
Thread.sleep(50);//假装是耗时操作
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return (Thread.currentThread().getName() + ": imageId-" + this.imageId);
}
}
}

执行结果如下:

可以看出,哪个任务先下载完图片,哪个就先打印出来,而不是按顺序打印。这种方式在安卓 ListView 中,加载图片时很合适,谁先下载完立刻就显示出来,能使用户获得一个更加动态和更高响应性的用户界面体验。

同时,多个 ExecutorCompletionService 可以共享一个 Executor,各自执行任务互不影响。这个框架其实封装了一个 BlockingQueue, 只要执行完毕就把结果往队列放,Executor@take() 实际上就是向这队列拿结果,所以先执行出结果就先打印出来,没有结果就一直阻塞等待。

ScheduledThreadPoolExecutor 示例

这个实现类用于延迟任务和周期任务的执行。通常情况下,已经有 Timer 定时器可以负责这类任务。但是 Timer 存在一些缺陷,所以很多时候应该考虑用这个实现类来执行。
Timer 的缺陷有:

1.Timer执行所有定时任务时,只创建一个线程。某个任务执行时间过长,会影响 TimerTask 的精确性。

2.Timer 线程并不捕获异常,当抛出未检查的异常时,将终止定时线程。 新的 TimerTask 也将不再执行。

SchuduledThreadPoolExecutor 对这个问题进行了妥善处理,不会影响其他任务的执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadExecutor{
//线程数目
private static final int THREAD_SIZE = 3;
//记录任务提交的时间
private static long start = 0;
public static void main(String[] args) {
//创建固定长度的线程池
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(THREAD_SIZE);
//记录开始时间
start = System.nanoTime();
//延迟启动两个任务
executor.schedule(new ImageTask(1), 2000, TimeUnit.MILLISECONDS);
executor.schedule(new ImageTask(2), 1000, TimeUnit.MILLISECONDS);
//延迟并周期执行一个任务
executor.scheduleAtFixedRate(new ImageTask(3), 1000, 5000, TimeUnit.MILLISECONDS);
}
//时间转化,将长整形的时间间隔转化为秒
static String formatTime(long period) {
return String.format(
"%d", new Object[] {
TimeUnit.NANOSECONDS.toSeconds(period)});
}

//创建模拟下载图片的任务
static class ImageTask implements Runnable{
//图片 ID
private int imageId;
public ImageTask(int val) {
this.imageId = val;
}
//下载图片并返回
private String downlaodImage() {
long period = System.nanoTime() - start;
System.out.println(Thread.currentThread().getName() + ": imageId-" + imageId + " is downloading at " + formatTime(period) + "s");
return (Thread.currentThread().getName() + ": imageId-" + this.imageId);
}
@Override
public void run() {
// TODO Auto-generated method stub
downlaodImage();
}
}
}

执行结果:

从执行结果看,image3 和 image2 的下载任务都在 1s 后才执行。image1 在延时 2s 后执行。其中,image3 每隔 5s 执行一次。这就是SchuduledThreadPoolExecutor的延迟执行和周期执行用法。

本文完结。

坚持原创技术分享,您的支持将鼓励我继续创作!