jdk线程池的分为两类:ExecutorService和ScheduledExecutorService

jdk对应的实现类为ThreadPoolExecutor和ScheduledThreadPoolExecutor

ThreadPoolExecutor组成

1
2
3
4
5
6
7
8
new ThreadPoolExecutor(int corePoolSize, int maxinumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);
//corePoolSize --核心线程数,线程池中创建的线程数小于核心线程数时,线程空闲也不会被回收;大于核心线程数时,线程空闲时间超过keepAliveTime个unit单位,线程就会被回收
//maxinumPoolSize --最大线程数,线程池中所能创建的最大的线程个数,超过将执行handler里面的拒绝策略.
//keepAliveTime --线程最大空闲时间,作用在上面已说明
//unit --时间单位,作用在上面已说明
//workQueue --工作队列,超过核心线程数时,任务会放在工作队列中.工作队列满了,才会再创建最大线程内的线程.
//threadFactory --线程工厂,实现创建线程的接口(有默认实现:Executors.defaultThreadFactory())
//handler --拒绝策略,实现拒绝任务的接口(有默认实现: defaultHandler)

三种工厂方法

1
2
3
4
5
6
//适用场景:任务数总量固定,每个任务执行周期较长
ExecutorService fixedPool = Executors.newFixedThreadPool(8);
//适用场景:任务数总量不固定,每个任务执行周期较短
ExecutorService cachedPool = Executors.newCachedThreadPool();
//适用场景:需要依次执行任务
ExecutorService singlePool = Executors.newSingleThreadExecutor();

ExecutorService的主要方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//执行Runnable类型任务
execute(Runnable task);
//执行Runnable类型任务,并返回future,可通过future.get(),阻塞获取执行任务结果
Future<T> submit(Runnable task);
//执行Callable类型任务,并返回future,可通过future.get(),阻塞获取执行任务结果
Future<T> submit(Callable<T> task);
//执行批量Callable类型的任务,并返回批量执行后的结果.
List<Future<T>> invokeAll(List<Callable<T>> taskList);
//执行批量Callable类型的任务,并返回批量执行后的结果.如果执行了timeout个unit单位的还没有结束,线程池也可以直接返回future
List<Future<T>> invokeAll(List<Callable<T>> taskList, long timeout, TimeUnit unit);
//执行批量Callable类型的任务,并返回其中一个执行最快任务的结果,返回后其他任务将不会在执行
T invokeAny(ListCallable<T> taskList);
//执行批量Callable类型的任务,并返回其中一个执行最快任务的结果,如果在timeout个unit单位还没有结束,线程池将抛出timeoutException异常
T invokeAny(ListCallable<T> taskList, long timeout, TimeUnit unit);
//将线程池关闭.如果线程池中还有任务在执行,线程不会马上关闭,会等待所有任务执行完成再关闭.同时该期间内,线程池不会在接受新任务.
shutdown();

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package per.guc.gucproject.test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class ThreadPool {

public static void main(String[] args) {
//适用场景:任务数总量固定,每个任务执行周期较长
ExecutorService fixedPool = Executors.newFixedThreadPool(8);
//适用场景:任务数总量不固定,每个任务执行周期较短
ExecutorService cachedPool = Executors.newCachedThreadPool();
//适用场景:需要依次执行任务
ExecutorService singlePool = Executors.newSingleThreadExecutor();

Runnable runTask = new Runnable() {
@Override
public void run() {
System.out.println("run...");
}
};

//void runnable
fixedPool.execute(runTask);
//result runnable
Future<?> submit = fixedPool.submit(runTask);

Callable callTask = new Callable<Object>() {
@Override
public Object call() throws Exception {
System.out.println("call...");
return "1";
}
};
//result callable
Future submit1 = fixedPool.submit(callTask);


List<Callable<Object>> callableList = new ArrayList<>();
//result-List callable-List
try {
List<Future<Object>> futures = fixedPool.invokeAll(callableList);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
//result-List callable-List timeout-value timeout-unit
try {
List<Future<Object>> futures = fixedPool.invokeAll(callableList, 1000, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

//result-T callable-anyone complete return
try {
Object o = fixedPool.invokeAny(callableList);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}

//result-T callable-anyone complete or timeoutException return
try {
Object o1 = fixedPool.invokeAny(callableList, 1000, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}

fixedPool.shutdown();


try {
Object result = submit.get();
System.out.println(result);

Object result1 = submit1.get();
System.out.println(result1);

} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
}

ScheduledThreadPoolExecutor组成

1
2
3
4
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler);
//corePoolSize 核心线程数,同上面的ThreadPoolExecutor的corePoolSize的注释
//threadFactory, 同上
//handler, 同上

四种工厂方法

1
2
3
4
5
6
7
##重要 ScheduledExecutorService和ExecutorService主要区别在于阻塞队列不同,ScheduledExecutorService内部是用DelayedWorkQueue的实现队列
//定义核心线程数
ScheduledExecutorService ses1 = Executors.newScheduledThreadPool(8);
ScheduledExecutorService ses2 = Executors.newScheduledThreadPool(8, Executors.defaultThreadFactory());
//需要按顺序执行的定时任务线程池
ScheduledExecutorService ses3 = Executors.newSingleThreadScheduledExecutor();
ScheduledExecutorService ses4 = Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory());

ScheduledExecutorService 主要方法

1
2
3
4
5
6
7
8
//延迟delay个unit单位时间,去执行runnable类型的任务,返回ScheduledFuture
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
//延迟delay个unit单位时间,去执行callable类型的任务,返回ScheduledFuture并带结果值
ScheduledFuture<T> schedule(Callable command, long delay, TimeUnit unit);
//按period个unit单位定时去执行runnable类型的任务,period时间周期从开始执行commond的时间计算,如果任务执行时间超过period且线程池中也没有空闲的线程,则会延迟任务时间后去执行runnable类型的任务.
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
//按period个unit单位定时去执行runnable类型的任务,period时间周期从结束执行commond的时间计算.
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long period, TimeUnit);

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package per.guc.gucproject.test;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

@Slf4j
public class ScheduledExecutorTest {


public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

scheduledExecutorService.schedule(()->{
log.debug("rate task...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, 1000, TimeUnit.SECONDS);
scheduledExecutorService.schedule(()->{
log.debug("rate task...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, 1000, TimeUnit.SECONDS);

scheduledExecutorService.scheduleAtFixedRate(()->{
log.debug("rate task...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, 1000, 1000, TimeUnit.MILLISECONDS);


scheduledExecutorService.scheduleWithFixedDelay(()->{
log.debug("delay task...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, 1000, 1000, TimeUnit.MILLISECONDS);

}
}

手写线程池代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package per.guc.gucproject.test;


import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j
public class MyThreadPoolTest {
public static void main(String[] args) {
MyThreadPool myThreadPool = new MyThreadPool(2, 2);
myThreadPool.execute(()->{
log.debug("task1 run...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.debug("task1 finished...");
});
myThreadPool.execute(()->{
log.debug("task2 run...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.debug("task2 finished...");
});
myThreadPool.execute(()->{
log.debug("task3 run...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.debug("task3 finished...");
});
myThreadPool.execute(()->{
log.debug("task4 run...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.debug("task4 finished...");
});
myThreadPool.execute(()->{
log.debug("task5 run...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.debug("task5 finished...");
});
}
}

class MyThreadPool{

//coreThreadSize
private int coreSize;

//blockQueue;
private MyBlockQueue myBlockQueue;

//计数器
private AtomicInteger atomicInteger;

public MyThreadPool(int coreSize, int blockQueueSize) {
this.coreSize = coreSize;
this.myBlockQueue = new MyBlockQueue(blockQueueSize);
this.atomicInteger = new AtomicInteger(1);
}

public void execute(Runnable runnable){
int threadSize = atomicInteger.get();
if(threadSize > coreSize){
myBlockQueue.put(runnable);
return;
}
if(atomicInteger.compareAndSet(threadSize, threadSize+1)) {

int threadName = threadSize;
Worker worker = new Worker(runnable);
Thread thread = new Thread(worker, "t"+ threadName);
thread.start();

}
}

class Worker implements Runnable{
private Runnable runnable;

public Worker(Runnable runnable) {
this.runnable = runnable;
}

@Override
public void run() {
while(runnable != null || (runnable = myBlockQueue.poll()) != null){
runnable.run();
runnable = null;
}
}
}

}

@Slf4j
class MyBlockQueue{
//队列容量
int capacity;

BlockingDeque<Runnable> linkedQueue;

public MyBlockQueue(int capacity) {
this.capacity = capacity;
this.linkedQueue = new LinkedBlockingDeque<>();
this.lock = new ReentrantLock();
this.emptyWait = lock.newCondition();
this.fullWait = lock.newCondition();
}

Lock lock;

Condition emptyWait;

Condition fullWait;

public Runnable poll(){
lock.lock();
try{
if(linkedQueue.size() == 0){
try {
log.debug("任务获取进入阻塞等待");
emptyWait.await();
log.debug("任务获取被唤醒");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
log.debug("获取到队列头任务");
Runnable task = linkedQueue.removeFirst();
fullWait.signalAll();
return task;
}finally {
lock.unlock();
}
}

public void put(Runnable task){
lock.lock();
try{
if(linkedQueue.size() == capacity){
try {
log.debug("任务存放进入阻塞等待");
fullWait.await();
log.debug("任务存放被唤醒");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
log.debug("存放到队列尾任务");
linkedQueue.addLast(task);
emptyWait.signalAll();
}finally {
lock.unlock();
}
}


}

代码分析

核心代码分析,新建worker任务类,重写run方法,第一个任务是对象构建的时候传入的任务,后面的任务是从阻塞队列取出的.如果阻塞队列没有任务了,线程会阻塞在阻塞队列获取任务的await方法上.

延迟线程池使用的队列是DelayedWorkQueue,内部属性queue采用的是RunnableScheduledFuture[]数组,该接口继承的是Delayed接口.

可以采用DelayQueue来实现任务的延迟执行,内部实现了PriorityQueue,一个优先级队列.放入的任务需要实现Delay接口的,内部会根据任务的优先级进行排序.

参考博客:

https://biteeniu.github.io/java/java-delay-queue/