基于生产-消费者模式的任务异步线程池设计与实现
有些童鞋可能会说,,干嘛还自己实现啊?这里,我就先简单说一下背景情况和设计的思路先。
1. JDK的ExecutorService中的线程池只是提供了一些基础的实现,进入线程池的任务一般有两种行为:阻塞或者激活新的线程,前者是对于fixedThreadPool而言,而后者是对于cachedTreadPool。而项目需要的是一个具有伸缩性的。这里包括两个方面,一个是伸缩性的工作线程池(可以根据情况对线程池进行自我调节),二是伸缩性的任务队列(具有固定的大小,多余的任务会被暂时转储,而队列空闲至一个阈值时,从恢复转储的任务)。Commons-pool其实实现了第一个需求,但是它在设计上是存在一些问题,并不太适合用于线程池的管理(改篇可以再行讨论)。
2. 对于任务队列好,对于工作线程好,一个具有良好设计组件的前提是还有这么几个需求的:它自身是可以被Audit,也就是它的性能,实际工作质量是可以被检查和评估的;它的行为是可以被扩展的;它必须是健壮的并且可控的。所以,除了生产-消费者的线程池外,还必须有一些管理线程,它们必须能够良好地反馈和控制线程池;其次对于整个组件的活动必须定义一套事件以及事件监听机制,其目标一是能做到组件状态的自省,二是能提供客户端的扩展(比如实现动态任务链等)。
好了,谈了背景,先简单减少一下组件中几个主要角色的构成吧(因为整个组件约有60个类,所以不可能全部说明和贴出来):
1. WorkEngine: 除了继承Switchable这一一个控制开关外,它包括三个主要组成部分和三个扩展部分。
三个组成部分也就是组件的核心:任务队列、工作线程代理、任务结果处理队列。
三个扩展部分分别是:配置、持久化接口以及控制钩子接口。
Java代码
public interface Switchable {
void cancelWork() ;
String getId() ;
boolean isStartForWork() ;
void startWork() ;
void stopWork() ;
}
public interface WorkEngine extends Switchable {
void addControlHook(Switchable hook) ;
WorkConfiguration getConfiguration() ;
Persistence getPersistence() ;
TaskReportQueue getReportQueue() ;
TaskQueue getTaskQueue() ;
WorkerBroker getWorkerBroker() ;
}
2. 下面对三个主要的部件接口简单说明下:
首先是任务队列TaskQueue,相对于传统的队列,增加了事件监听器和任务优先级重处理。
Java代码
public interface TaskQueue extends Iterable<Task> {
void addEventListener(TaskEventListener listener) ;
/**
* add a new task to tail of queue
* ***@param task
*/
void addTask(Task task) ;
/**
* check whether existing task in queue.
* ***@return
*/
boolean existTask() ;
/**
* sort tasks in queue according to priority
*/
void sequence() ;
/**
* remove the task at the head of queue
*/
Task removeTask() ;
java线程池 来自淘豆网m.daumloan.com转载请标明出处.