继“多线程程序模型研究”文章发布后,最近又继续研究,推出一个比较复杂但功能比较完善,而且可以直接套用的线程池的实例,希望对使用多线程设计的读者有所帮助。
该实例来源于apache项目源代码,源程序有800余行,功能比较全面,而且是非常完善的,并且运行于诸多服务器如tomcat上,就是分析起来有点繁琐。如果开发人员直接把这段程序拿来修改后使用到自己的开发项目中,不失为拿来主义的上策。
本文对该源程序进行了修改和简化,对其中核心部分进行分析,然后创建测试类进行测试。读者学习之后可以直接模仿和套用,而不需要花费大量时间自己亲自再去写线程池程序了。
关键的程序有2个:线程池类(threadpool)和线程池接口(threadpoolrunnable)。
首先看较复杂的线程池类程序,文件名为threadpool.java,有200余行,需要读者有耐心,内容如下:
---------------------------------------------------------------------------------------
import java.util.vector;
public class threadpool {
public static final int max_threads = 100;
public static final int max_spare_threads = 50;
public static final int min_spare_threads = 10;
public static final int work_wait_timeout = 60 * 1000;
protected vector pool;
protected monitorrunnable monitor;
protected int maxthreads;
protected int minsparethreads;
protected int maxsparethreads;
protected int currentthreadcount;
protected int currentthreadsbusy;
protected boolean stopthepool;
public threadpool() {
maxthreads = max_threads;
maxsparethreads = max_spare_threads;
minsparethreads = min_spare_threads;
currentthreadcount = 0;
currentthreadsbusy = 0;
stopthepool = false;
}
public synchronized void start() {
adjustlimits();
openthreads(minsparethreads);
monitor = new monitorrunnable(this);
}
public void setmaxthreads(int maxthreads) {
this.maxthreads = maxthreads;
}
public int getmaxthreads() {
return maxthreads;
}
public void setminsparethreads(int minsparethreads) {
this.minsparethreads = minsparethreads;
}
public int getminsparethreads() {
return minsparethreads;
}
public void setmaxsparethreads(int maxsparethreads) {
this.maxsparethreads = maxsparethreads;
}
public int getmaxsparethreads() {
return maxsparethreads;
}
public void runit(threadpoolrunnable r) {
if (null == r) {
throw new nullpointerexception();
}
if (0 == currentthreadcount || stopthepool) {
throw new illegalstateexception();
}
controlrunnable c = null;
synchronized (this) {
if (currentthreadsbusy == currentthreadcount) {
if (currentthreadcount < maxthreads) {
int toopen = currentthreadcount + minsparethreads;
openthreads(toopen);
} else {
while (currentthreadsbusy == currentthreadcount) {
try {
this.wait();
}catch (interruptedexception e) {
}
if (0 == currentthreadcount || stopthepool) {
throw new illegalstateexception();
}
}
}
}
c = (controlrunnable) pool.lastelement();
pool.removeelement(c);
currentthreadsbusy++;
}
c.runit(r);
}
public synchronized void shutdown() {
if (!stopthepool) {
stopthepool = true;
monitor.terminate();
monitor = null;
for (int i = 0; i < (currentthreadcount - currentthreadsbusy); i++) {
try {
((controlrunnable) (pool.elementat(i))).terminate();
} catch (throwable t) {
}
}
currentthreadsbusy = currentthreadcount = 0;
pool = null;
notifyall();
}
}
protected synchronized void checksparecontrollers() {
if (stopthepool) {
return;
}
if ((currentthreadcount - currentthreadsbusy) > maxsparethreads) {
int tofree = currentthreadcount - currentthreadsbusy - maxsparethreads;
for (int i = 0; i < tofree; i++) {
controlrunnable c = (controlrunnable) pool.firstelement();
pool.removeelement(c);
c.terminate();
currentthreadcount--;
}
}
}
protected synchronized void returncontroller(controlrunnable c) {
if (0 == currentthreadcount || stopthepool) {
c.terminate();
return;
}
currentthreadsbusy--;
pool.addelement(c);
notify();
}
protected synchronized void notifythreadend() {
currentthreadsbusy--;
currentthreadcount--;
notify();
openthreads(minsparethreads);
}
protected void adjustlimits() {
if (maxthreads <= 0) {
maxthreads = max_threads;
}
if (maxsparethreads >= maxthreads) {
maxsparethreads = maxthreads;
}
if (maxsparethreads <= 0) {
if (1 == maxthreads) {
maxsparethreads = 1;
} else {
maxsparethreads = maxthreads / 2;
}
}
if (minsparethreads > maxsparethreads) {
minsparethreads = maxsparethreads;
}
if (minsparethreads <= 0) {
if (1 == maxsparethreads) {
minsparethreads = 1;
} else {
minsparethreads = maxsparethreads / 2;
}
}
}
protected void openthreads(int toopen) {
if (toopen > maxthreads) {
toopen = maxthreads;
}
if (0 == currentthreadcount) {
pool = new vector(toopen);
}
for (int i = currentthreadcount; i < toopen; i++) {
pool.addelement(new controlrunnable(this));
}
currentthreadcount = toopen;
}
class monitorrunnable implements runnable {
threadpool p;
thread t;
boolean shouldterminate;
monitorrunnable(threadpool p) {
shouldterminate = false;
this.p = p;
t = new thread(this);
t.start();
}
public void run() {
while (true) {
try {
synchronized (this) {
this.wait(work_wait_timeout);
}
if (shouldterminate) {
break;
}
p.checksparecontrollers();
} catch (throwable t) {
t.printstacktrace();
}
}
}
public synchronized void terminate() {
shouldterminate = true;
this.notify();
}
}
class controlrunnable implements runnable {
threadpool p;
thread t;
threadpoolrunnable torun;
boolean shouldterminate;
boolean shouldrun;
boolean nothdata;
object thdata[] = null;
controlrunnable(threadpool p) {
torun = null;
shouldterminate = false;
shouldrun = false;
this.p = p;
t = new thread(this);
t.start();
nothdata = true;
thdata = null;
}
public void run() {
while (true) {
try {
synchronized (this) {
if (!shouldrun && !shouldterminate) {
this.wait();
}
}
if (shouldterminate) {
break;
}
try {
if (nothdata) {
thdata = torun.getinitdata();
nothdata = false;
}
if (shouldrun) {
torun.runit(thdata);
}
} catch (throwable t) {
system.err.println("controlrunnable throwable: ");
t.printstacktrace();
shouldterminate = true;
shouldrun = false;
p.notifythreadend();
} finally {
if (shouldrun) {
shouldrun = false;
p.returncontroller(this);
}
}
if (shouldterminate) {
break;
}
} catch (interruptedexception ie) {
}
}
}
public synchronized void runit(threadpoolrunnable torun) {
if (torun == null) {
throw new nullpointerexception("no runnable");
}
this.torun = torun;
shouldrun = true;
this.notify();
}
public synchronized void terminate() {
shouldterminate = true;
this.notify();
}
}
}
---------------------------------------------------------------------------------------
以上程序中,关键的是openthreads方法、runit方法以及2个内部类:monitorrunnable和controlrunnable。
刚开始运行的时候,线程池会往vector对象里装入minsparethreads个元素,每个元素都是controlrunnable线程类,controlrunnable类在其构造方法中启动线程。如果shouldrun和shouldterminate都是false的话,线程就等待。如果shouldrun为true,就调用threadpoolrunnable的runit(object[])方法,该接口的方法就是我们需要在自己的任务类中覆盖的方法。
如果minsparethreads个线程都处于busy后,线程池会再创建出minsparethreads个线程。monitorrunnable是用来监视线程池运行情况的,其线程间隔60秒(work_wait_timeout)调用一次线程池类的checksparecontrollers方法,如果发现(currentthreadcount - currentthreadsbusy) > maxsparethreads,就会调用controlrunnable类的terminate方法删除空闲线程,准备删除的线程是否空闲是通过shouldterminate参数来判断的。
线程池接口threadpoolrunnable有2个空方法getinitdata和runit,我们一般自己创建一个任务类实现这个线程池接口就可以了,把具体的任务内容放在任务类的runit方法中。如果不想用getinitdata,就让它返回null值。
线程池接口程序很简单,文件名为threadpoolrunnable.java,就几行,内容如下:
---------------------------------------------------------------------------------------------
public interface threadpoolrunnable {
public object[] getinitdata();
public void runit(object thdata[]);
}
---------------------------------------------------------------------------------------------
线程池类和线程池接口都已经说完,下面就举个例子说说怎么使用它们了。
我们的任务还是扫描端口(请参考我的“多线程程序模型研究”),文件名为testthreadpool.java,内容如下:
---------------------------------------------------------------------------------------------
import java.net.inetaddress;
import java.net.socket;
public class testthreadpool {
public static void main(string[] args) {
string host = null; //第一个参数,目标主机。
int beginport = 1; //第二个参数,开始端口。
int endport = 65535; //第三个参数,结束端口。
try{
host = args[0];
beginport = integer.parseint(args[1]);
endport = integer.parseint(args[2]);
if(beginport <= 0 || endport >= 65536 || beginport > endport){
throw new exception("port is illegal");
}
}catch(exception e){
system.out.println("usage: java portscannersinglethread host beginport endport");
system.exi
闽公网安备 35060202000074号