| |
前面谈了多线程应用程序能极大地改善用户相应。例如对于一个web应用程序,每当一个用户请求服务器连接时,服务器就可以启动一个新线程为用户服务。 然而,创建和销毁线程本身就有一定的开销,如果频繁创建和销毁线程,cpu和内存开销就不可忽略,垃圾收集器还必须负担更多的工作。因此,线程池就是为了避免频繁创建和销毁线程。 每当服务器接受了一个新的请求后,服务器就从线程池中挑选一个等待的线程并执行请求处理。处理完毕后,线程并不结束,而是转为阻塞状态再次被放入线程池中。这样就避免了频繁创建和销毁线程。 worker pattern实现了类似线程池的功能。首先定义task接口: package com.crackj2ee.thread; public interface task { void execute(); } 线程将负责执行execute()方法。注意到任务是由子类通过实现execute()方法实现的,线程本身并不知道自己执行的任务。它只负责运行一个耗时的execute()方法。 具体任务由子类实现,我们定义了一个calculatetask和一个timertask: // calculatetask.java package com.crackj2ee.thread; public class calculatetask implements task { private static int count = 0; private int num = count; public calculatetask() { count++; } public void execute() { system.out.println("[calculatetask " + num + "] start..."); try { thread.sleep(3000); } catch(interruptedexception ie) {} system.out.println("[calculatetask " + num + "] done."); } } // timertask.java package com.crackj2ee.thread; public class timertask implements task { private static int count = 0; private int num = count; public timertask() { count++; } public void execute() { system.out.println("[timertask " + num + "] start..."); try { thread.sleep(2000); } catch(interruptedexception ie) {} system.out.println("[timertask " + num + "] done."); } } 以上任务均简单的sleep若干秒。 taskqueue实现了一个队列,客户端可以将请求放入队列,服务器线程可以从队列中取出任务: package com.crackj2ee.thread; import java.util.*; public class taskqueue { private list queue = new linkedlist(); public synchronized task gettask() { while(queue.size()==0) { try { this.wait(); } catch(interruptedexception ie) { return null; } } return (task)queue.remove(0); } public synchronized void puttask(task task) { queue.add(task); this.notifyall(); } } 终于到了真正的workerthread,这是真正执行任务的服务器线程: package com.crackj2ee.thread; public class workerthread extends thread { private static int count = 0; private boolean busy = false; private boolean stop = false; private taskqueue queue; public workerthread(threadgroup group, taskqueue queue) { super(group, "worker-" + count); count++; this.queue = queue; } public void shutdown() { stop = true; this.interrupt(); try { this.join(); } catch(interruptedexception ie) {} } public boolean isidle() { return !busy; } public void run() { system.out.println(getname() + " start."); while(!stop) { task task = queue.gettask(); if(task!=null) { busy = true; task.execute(); busy = false; } } system.out.println(getname() + " end."); } } 前面已经讲过,queue.gettask()是一个阻塞方法,服务器线程可能在此wait()一段时间。此外,workerthread还有一个shutdown方法,用于安全结束线程。 最后是threadpool,负责管理所有的服务器线程,还可以动态增加和减少线程数: package com.crackj2ee.thread; import java.util.*; public class threadpool extends threadgroup { private list threads = new linkedlist(); private taskqueue queue; public threadpool(taskqueue queue) { super("thread-pool"); this.queue = queue; } public synchronized void addworkerthread() { thread t = new workerthread(this, queue); threads.add(t); t.start(); } public synchronized void removeworkerthread() { if(threads.size()>0) { workerthread t = (workerthread)threads.remove(0); t.shutdown(); } } public synchronized void currentstatus() { system.out.println("-----------------------------------------------"); system.out.println("thread count = " + threads.size()); iterator it = threads.iterator(); while(it.hasnext()) { workerthread t = (workerthread)it.next(); system.out.println(t.getname() + ": " + (t.isidle() ? "idle" : "busy")); } system.out.println("-----------------------------------------------"); } } currentstatus()方法是为了方便调试,打印出所有线程的当前状态。 最后,main负责完成main()方法: package com.crackj2ee.thread; public class main { public static void main(string[] args) { taskqueue queue = new taskqueue(); threadpool pool = new threadpool(queue); for(int i=0; i<10; i++) { queue.puttask(new calculatetask()); queue.puttask(new timertask()); } pool.addworkerthread(); pool.addworkerthread(); dosleep(8000); pool.currentstatus(); pool.addworkerthread(); pool.addworkerthread(); pool.addworkerthread(); pool.addworkerthread(); pool.addworkerthread(); dosleep(5000); pool.currentstatus(); } private static void dosleep(long ms) { try { thread.sleep(ms); } catch(interruptedexception ie) {} } } main()一开始放入了20个task,然后动态添加了一些服务线程,并定期打印线程状态,运行结果如下: worker-0 start. [calculatetask 0] start... worker-1 start. [timertask 0] start... [timertask 0] done. [calculatetask 1] start... [calculatetask 0] done. [timertask 1] start... [calculatetask 1] done. [calculatetask 2] start... [timertask 1] done. [timertask 2] start... [timertask 2] done. [calculatetask 3] start... ----------------------------------------------- thread count = 2 worker-0: busy worker-1: busy ----------------------------------------------- [calculatetask 2] done. [timertask 3] start... worker-2 start. [calculatetask 4] start... worker-3 start. [timertask 4] start... worker-4 start. [calculatetask 5] start... worker-5 start. [timertask 5] start... worker-6 start. [calculatetask 6] start... [calculatetask 3] done. [timertask 6] start... [timertask 3] done. [calculatetask 7] start... [timertask 4] done. [timertask 7] start... [timertask 5] done. [calculatetask 8] start... [calculatetask 4] done. [timertask 8] start... [calculatetask 5] done. [calculatetask 9] start... [calculatetask 6] done. [timertask 9] start... [timertask 6] done. [timertask 7] done. ----------------------------------------------- thread count = 7 worker-0: idle worker-1: busy worker-2: busy worker-3: idle worker-4: busy worker-5: busy worker-6: busy ----------------------------------------------- [calculatetask 7] done. [calculatetask 8] done. [timertask 8] done. [timertask 9] done. [calculatetask 9] done. 仔细观察:一开始只有两个服务器线程,因此线程状态都是忙,后来线程数增多,
|
|