如何手写一个线程池?

手写一个异步工具类

于是我就到处看项目的源码,看看有没有什么能改进的?果然让我发现了。项目中到处充斥着 new Thread 类来异步执行代码的逻辑。

new Thread(r).start();

我们可以封装一个异步工具类啊!

第一版

说干就干,把上面的代码简单封装一下,一个简单的异步工具类就封装好了。

public interface Executor {

void execute(Runnable r);
}
public class AsyncExecutorV1 implements Executor {

@Override
public void execute(Runnable r) {
new Thread(r).start();
}
}

于是开开心心的提交了 merge request。

第二版

正当我满怀期待工具类代码能被合并的时候,没想代码被组长杰哥打回来了。

「杰哥」:有心封装工具类值得鼓励,不过还可以改进一下。

「小识」:还能再改进?没感觉我这个工具类还有改进的余地啊!

「杰哥」:假如说有10000个异步任务,你这创建10000个线程,资源耗费太严重了!

「小识」:这样啊,那我加个队列,任务都放到队列中,用一个线程从队列中取任务执行。

public class AsyncExecutorV2 implements Executor {

private BlockingQueue workQueue;

public AsyncExecutorV2(BlockingQueue workQueue) {
this.workQueue = workQueue;
WorkThread workThread = new WorkThread();
workThread.start();
}

@SneakyThrows
@Override
public void execute(Runnable r) {
workQueue.add(r);
}

class WorkThread extends Thread {

@Override
public void run() {
while (true) {
Runnable task = null;
try {
task = workQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
task.run();
}
}
}
}

第三版

「小识」:杰哥,快帮我看看,还有啥改进的没?

「杰哥」:小伙子不错啊,居然能想到用队列来缓冲任务,不愧是我招进来的人!但是用一个异步线程执行任务,你确定这个工具类比同步执行的效率快?

「小识」:哈哈,又一个工具类翻车的案例,应该多开几个异步线程来执行任务,但是应该开多少呢?

「杰哥」:谁最清楚异步工具类应该用多少个线程来执行呢?

「小识」:使用工具类的人。

「杰哥」:这不对了,你可以定义一个线程数量参数,让用户来决定开多少线程。「另外你这个工具类还个问题,队列满了会直接抛出异常!」

「小识」:那我增加一个拒绝策略类(RejectedExecutionHandler),当线程池满了让用户决定执行策略,比如直接抛异常,用当前线程同步执行任务。

public class AsyncExecutorV3 implements Executor {

private BlockingQueue workQueue;

private List workThreadList = new ArrayList<>();

private RejectedExecutionHandler handler;

public AsyncExecutorV3(int corePoolSize,
BlockingQueue workQueue,
RejectedExecutionHandler handler) {
this.workQueue = workQueue;
this.handler = handler;
for (int i = 0; i < corePoolSize; i++) {
WorkThread workThread = new WorkThread();
workThread.start();
workThreadList.add(workThread);
}
}

@SneakyThrows
@Override
public void execute(Runnable r) {
if (!workQueue.offer(r)) {
// 队列满了,执行拒绝策略
handler.rejectedExecution(r);
}
}

class WorkThread extends Thread {

@Override
public void run() {
while (true) {
Runnable task = null;
try {
task = workQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
task.run();
}
}
}
}
// 拒绝策略类
public interface RejectedExecutionHandler {

void rejectedExecution(Runnable r);
}
// 当线程池满了之后直接抛出异常
public class AbortPolicy implements RejectedExecutionHandler {

@Override
public void rejectedExecution(Runnable r) {
throw new RuntimeException("queue is full");
}
}
// 当线程池满了之后直接抛出异常
public class AbortPolicy implements RejectedExecutionHandler {

@Override
public void rejectedExecution(Runnable r) {
throw new RuntimeException("queue is full");
}
}
// 当线程池满了之后,用提交任务的线程同步执行任务
public class CallerRunsPolicy implements RejectedExecutionHandler {

@Override
public void rejectedExecution(Runnable r) {
r.run();
}
}

再次提交 merge request,终于被合并了,别的团队都开始使用我的工具类了!

过了几天小亮急匆匆找到我。

「小亮」:小识,你的工具类挺好用的。但是我最近遇到了一个问题,我用了CountDownLatch批量执行任务,但是我这个任务好像卡住了,我用jstack想看看线程的执行情况,快告诉我你异步线程的名字设置的是啥?

「小识」:哎呀,我们没设置线程的名字,应该用的是默认的线程名字 Thread-n。

「小亮」:你可得给工具类加个线程名字的参数啊,不然一个一个看线程的状态太累了,而且效率也不高。

「小识」:我这就加。

第四版

赶紧加了一个线程名字的参数,然后再次提交代码。

「杰哥」:哎呀,没想到我也疏忽了,没发现这个问题,确实应该加个线程名字的参数,代码的可扩展性太重要了,改来改去可不行。

「小识」:是啊!

「杰哥」:你觉得你只加一个线程名字参数,可扩展性高吗?如果有的团队想修改异步线程的优先级,你再加个优先级参数?

「小识」:感觉不太行,那让用户把线程传给我吧!

「杰哥」:哈哈,可以,你还可以用工厂模式优化一下,用户传入线程工厂类,工具类用工厂类创建线程。

「小识」:不愧是杰哥,这样一来代码更清爽了!

public class AsyncExecutorV4 implements Executor {

private BlockingQueue workQueue;

private List workThreadList = new ArrayList<>();

private RejectedExecutionHandler handler;

public AsyncExecutorV4(int corePoolSize,
BlockingQueue workQueue,
RejectedExecutionHandler handler,
ThreadFactory threadFactory) {
this.workQueue = workQueue;
this.handler = handler;
for (int i = 0; i < corePoolSize; i++) {
// 用工厂类创建线程
WorkThread workThread = threadFactory.newThread();
workThread.start();
workThreadList.add(workThread);
}
}

@SneakyThrows
@Override
public void execute(Runnable r) {
if (!workQueue.offer(r)) {
handler.rejectedExecution(r);
}
}

// 异步线程
public class WorkThread extends Thread {

@Override
public void run() {
while (true) {
Runnable task = null;
try {
task = workQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
task.run();
}
}
}

// 异步线程工厂类
public interface ThreadFactory {
WorkThread newThread();
}
}

代码提交之后,小亮给线程起了一个名字,async-thread,现在他通过名字很快就能知道线程池中的线程在干嘛了!

大家不断的进行改进

随着这个异步工具类在公司内部使用的越来越多,大家也提交了很多改进的代码。

  • 按需创建线程,不要一开始就创建「corePoolSize」个线程,而是在调用者提交任务的过程中逐渐创建出来,最后创建了「corePoolSize」个就不再创建了。
  • 提高工具的弹性,当任务突增时,队列会被放满,然后多余的任务有可能会被直接扔掉。当然我们可以把「corePoolSize」设的很大,但是这样并不优雅,因为大部分情况下是用不到这么多线程的。当任务突增时,我们可以适当增加线程,提高执行速度,当然创建的总线程数还是要限制一下的,我们把能创建的总数定为「maximumPoolSize」。
  • 及时关闭不需要的线程,当任务突增时,线程数可能增加「maximumPoolSize」,但是大多数时间「corePoolSize」个线程就足够用了,因此可以定义一个超时时间,当一个线程在「keepAliveTime」时间内没有执行任务,就把它给关掉。

异步工具类执行流程图

经过大家的不断改进之后,构造函数中的参数也越来越多了,杰哥让我写个文档吧,把这个异步工具类的构造函数和执行流程总结一下,不然新来的小伙伴看到这个工具类一脸懵可不行!

这个工具类的构造函数目前有如下7个参数

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

参数

含义

corePoolSize

核心线程数

maximumPoolSize

最大线程数

keepAliveTime

非核心线程的空闲时间

TimeUnit

空闲时间的单位

BlockingQueue

任务队列

ThreadFactory

线程工厂

RejectedExecutionHandler

拒绝策略

「执行流程图如下」:

对了,最后大家给这个异步工具类起了一个牛的名字,「线程池」。

分享文章:如何手写一个线程池?
当前路径:http://www.mswzjz.cn/qtweb/news13/464063.html

攀枝花网站建设、攀枝花网站运维推广公司-贝锐智能,是专注品牌与效果的网络营销公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 贝锐智能