服务热线:13616026886

技术文档 欢迎使用技术文档,我们为你提供从新手到专业开发者的所有资源,你也可以通过它日益精进

位置:首页 > 技术文档 > JAVA > 新手入门 > 基础入门 > 查看文档

一个虽然复杂但可直接套用的线程池实例

继“多线程程序模型研究”文章发布后,最近又继续研究,推出一个比较复杂但功能比较完善,而且可以直接套用的线程池的实例,希望对使用多线程设计的读者有所帮助。

该实例来源于apache项目源代码,源程序有800余行,功能比较全面,而且是非常完善的,并且运行于诸多服务器如tomcat上,就是分析起来有点繁琐。如果开发人员直接把这段程序拿来修改后使用到自己的开发项目中,不失为拿来主义的上策。

本文对该源程序进行了修改和简化,对其中核心部分进行分析,然后创建测试类进行测试。读者学习之后可以直接模仿和套用,而不需要花费大量时间自己亲自再去写线程池程序了。

关键的程序有2个:线程池类(threadpool)和线程池接口(threadpoolrunnable)。

首先看较复杂的线程池类程序,文件名为threadpool.java,有200余行,需要读者有耐心,内容如下:
---------------------------------------------------------------------------------------
import java.util.vector;

public class threadpool {
    public static final int max_threads = 100;
    public static final int max_spare_threads = 50;
    public static final int min_spare_threads = 10;
    public static final int work_wait_timeout = 60 * 1000;
    
    protected vector pool;
    protected monitorrunnable monitor;
    protected int maxthreads;
    protected int minsparethreads;
    protected int maxsparethreads;
    protected int currentthreadcount;
    protected int currentthreadsbusy;
    protected boolean stopthepool;

    public threadpool() {
        maxthreads = max_threads;
        maxsparethreads = max_spare_threads;
        minsparethreads = min_spare_threads;
        currentthreadcount = 0;
        currentthreadsbusy = 0;
        stopthepool = false;
    }

    public synchronized void start() {
        adjustlimits();
        openthreads(minsparethreads);
        monitor = new monitorrunnable(this);
    }

    public void setmaxthreads(int maxthreads) {
        this.maxthreads = maxthreads;
    }

    public int getmaxthreads() {
        return maxthreads;
    }

    public void setminsparethreads(int minsparethreads) {
        this.minsparethreads = minsparethreads;
    }

    public int getminsparethreads() {
        return minsparethreads;
    }

    public void setmaxsparethreads(int maxsparethreads) {
        this.maxsparethreads = maxsparethreads;
    }

    public int getmaxsparethreads() {
        return maxsparethreads;
    }
    
    public void runit(threadpoolrunnable r) {
        if (null == r) {
            throw new nullpointerexception();
        }
        if (0 == currentthreadcount || stopthepool) {
            throw new illegalstateexception();
        }
        controlrunnable c = null;
        synchronized (this) {
            if (currentthreadsbusy == currentthreadcount) {
                if (currentthreadcount < maxthreads) {
                    int toopen = currentthreadcount + minsparethreads;
                    openthreads(toopen);
                } else {
                    while (currentthreadsbusy == currentthreadcount) {
                        try {
                            this.wait();
                        }catch (interruptedexception e) {
                        }
                        if (0 == currentthreadcount || stopthepool) {
                            throw new illegalstateexception();
                        }
                    }
                }
            }
            c = (controlrunnable) pool.lastelement();
            pool.removeelement(c);
            currentthreadsbusy++;
        }
        c.runit(r);
    }
   
    public synchronized void shutdown() {
        if (!stopthepool) {
            stopthepool = true;
            monitor.terminate();
            monitor = null;
            for (int i = 0; i < (currentthreadcount - currentthreadsbusy); i++) {
                try {
                    ((controlrunnable) (pool.elementat(i))).terminate();
                } catch (throwable t) {
                }
            }
            currentthreadsbusy = currentthreadcount = 0;
            pool = null;
            notifyall();
        }
    }
    
    protected synchronized void checksparecontrollers() {
        if (stopthepool) {
            return;
        }
        
        if ((currentthreadcount - currentthreadsbusy) > maxsparethreads) {
            int tofree = currentthreadcount - currentthreadsbusy - maxsparethreads;
            for (int i = 0; i < tofree; i++) {
                controlrunnable c = (controlrunnable) pool.firstelement();
                pool.removeelement(c);
                c.terminate();
                currentthreadcount--;
            }
        }
    }
    
    protected synchronized void returncontroller(controlrunnable c) {
        if (0 == currentthreadcount || stopthepool) {
            c.terminate();
            return;
        }
        currentthreadsbusy--;
        
        pool.addelement(c);
        notify();
    }
    
    protected synchronized void notifythreadend() {
        currentthreadsbusy--;
        currentthreadcount--;
        notify();
        openthreads(minsparethreads);
    }
    
    protected void adjustlimits() {
        if (maxthreads <= 0) {
            maxthreads = max_threads;
        }
        if (maxsparethreads >= maxthreads) {
            maxsparethreads = maxthreads;
        }
        if (maxsparethreads <= 0) {
            if (1 == maxthreads) {
                maxsparethreads = 1;
            } else {
                maxsparethreads = maxthreads / 2;
            }
        }
        if (minsparethreads > maxsparethreads) {
            minsparethreads = maxsparethreads;
        }
        if (minsparethreads <= 0) {
            if (1 == maxsparethreads) {
                minsparethreads = 1;
            } else {
                minsparethreads = maxsparethreads / 2;
            }
        }
    }

    protected void openthreads(int toopen) {
        if (toopen > maxthreads) {
            toopen = maxthreads;
        }
        if (0 == currentthreadcount) {
            pool = new vector(toopen);
        }
        for (int i = currentthreadcount; i < toopen; i++) {
            pool.addelement(new controlrunnable(this));
        }
        currentthreadcount = toopen;
    }
    
    class monitorrunnable implements runnable {
        threadpool p;
        thread t;
        boolean shouldterminate;
        monitorrunnable(threadpool p) {
            shouldterminate = false;
            this.p = p;
            t = new thread(this);
            t.start();
        }
        public void run() {
            while (true) {
                try {
                    synchronized (this) {
                        this.wait(work_wait_timeout);
                    }
                    if (shouldterminate) {
                        break;
                    }
                    p.checksparecontrollers();
                } catch (throwable t) {
                    t.printstacktrace();
                }
            }
        }
        
        public synchronized void terminate() {
            shouldterminate = true;
            this.notify();
        }
    }
    
    class controlrunnable implements runnable {
        threadpool p;
        thread t;
        threadpoolrunnable torun;
        boolean shouldterminate;
        boolean shouldrun;
        boolean nothdata;
        object thdata[] = null;

        controlrunnable(threadpool p) {
            torun = null;
            shouldterminate = false;
            shouldrun = false;
            this.p = p;
            t = new thread(this);
            t.start();
            nothdata = true;
            thdata = null;
        }

        public void run() {
            while (true) {
                try {
                    synchronized (this) {
                        if (!shouldrun && !shouldterminate) {
                            this.wait();
                        }
                    }
                    if (shouldterminate) {
                        break;
                    }
                    try {
                        if (nothdata) {
                            thdata = torun.getinitdata();
                            nothdata = false;
                        }
                        if (shouldrun) {
                            torun.runit(thdata);
                        }
                    } catch (throwable t) {
                        system.err.println("controlrunnable throwable: ");
                        t.printstacktrace();
                        shouldterminate = true;
                        shouldrun = false;
                        p.notifythreadend();
                    } finally {
                        if (shouldrun) { 
                            shouldrun = false;
                            p.returncontroller(this);
                        }
                    }
                    if (shouldterminate) {
                        break;
                    }
                } catch (interruptedexception ie) {
                }
            }
        }

        public synchronized void runit(threadpoolrunnable torun) {
            if (torun == null) {
                throw new nullpointerexception("no runnable");
            }
            this.torun = torun;
            shouldrun = true;
            this.notify();
        }

        public synchronized void terminate() {
            shouldterminate = true;
            this.notify();
        }
    }
}
---------------------------------------------------------------------------------------
以上程序中,关键的是openthreads方法、runit方法以及2个内部类:monitorrunnable和controlrunnable。

刚开始运行的时候,线程池会往vector对象里装入minsparethreads个元素,每个元素都是controlrunnable线程类,controlrunnable类在其构造方法中启动线程。如果shouldrun和shouldterminate都是false的话,线程就等待。如果shouldrun为true,就调用threadpoolrunnable的runit(object[])方法,该接口的方法就是我们需要在自己的任务类中覆盖的方法。

如果minsparethreads个线程都处于busy后,线程池会再创建出minsparethreads个线程。monitorrunnable是用来监视线程池运行情况的,其线程间隔60秒(work_wait_timeout)调用一次线程池类的checksparecontrollers方法,如果发现(currentthreadcount - currentthreadsbusy) > maxsparethreads,就会调用controlrunnable类的terminate方法删除空闲线程,准备删除的线程是否空闲是通过shouldterminate参数来判断的。

线程池接口threadpoolrunnable有2个空方法getinitdata和runit,我们一般自己创建一个任务类实现这个线程池接口就可以了,把具体的任务内容放在任务类的runit方法中。如果不想用getinitdata,就让它返回null值。

线程池接口程序很简单,文件名为threadpoolrunnable.java,就几行,内容如下:
---------------------------------------------------------------------------------------------
public interface threadpoolrunnable {
    public object[] getinitdata();
    
    public void runit(object thdata[]);

}
---------------------------------------------------------------------------------------------

线程池类和线程池接口都已经说完,下面就举个例子说说怎么使用它们了。

我们的任务还是扫描端口(请参考我的“多线程程序模型研究”),文件名为testthreadpool.java,内容如下:
---------------------------------------------------------------------------------------------
import java.net.inetaddress;
import java.net.socket;

public class testthreadpool {

    public static void main(string[] args) {
        string host = null;        //第一个参数,目标主机。
        int beginport = 1;         //第二个参数,开始端口。
        int endport = 65535;       //第三个参数,结束端口。
        try{
            host = args[0];
            beginport = integer.parseint(args[1]);
            endport = integer.parseint(args[2]);
            if(beginport <= 0 || endport >= 65536 || beginport > endport){
                throw new exception("port is illegal");
            }
        }catch(exception e){
            system.out.println("usage: java portscannersinglethread host beginport endport");
            system.exi

扫描关注微信公众号