前面谈了多线程应用程序能极大地改善用户相应。例如对于一个web应用程序,每当一个用户请求服务器连接时,服务器就可以启动一个新线程为用户服务。
然而,创建和销毁线程本身就有一定的开销,如果频繁创建和销毁线程,cpu和内存开销就不可忽略,垃圾收集器还必须负担更多的工作。因此,线程池就是为了避免频繁创建和销毁线程。
每当服务器接受了一个新的请求后,服务器就从线程池中挑选一个等待的线程并执行请求处理。处理完毕后,线程并不结束,而是转为阻塞状态再次被放入线程池中。这样就避免了频繁创建和销毁线程。
worker pattern实现了类似线程池的功能。首先定义task接口:
package com.crackj2ee.thread;
public interface task {
void execute();
}
线程将负责执行execute()方法。注意到任务是由子类通过实现execute()方法实现的,线程本身并不知道自己执行的任务。它只负责运行一个耗时的execute()方法。
具体任务由子类实现,我们定义了一个calculatetask和一个timertask:
// calculatetask.java
package com.crackj2ee.thread;
public class calculatetask implements task {
private static int count = 0;
private int num = count;
public calculatetask() {
count++;
}
public void execute() {
system.out.println("[calculatetask " + num + "] start...");
try {
thread.sleep(3000);
}
catch(interruptedexception ie) {}
system.out.println("[calculatetask " + num + "] done.");
}
}
// timertask.java
package com.crackj2ee.thread;
public class timertask implements task {
private static int count = 0;
private int num = count;
public timertask() {
count++;
}
public void execute() {
system.out.println("[timertask " + num + "] start...");
try {
thread.sleep(2000);
}
catch(interruptedexception ie) {}
system.out.println("[timertask " + num + "] done.");
}
}
以上任务均简单的sleep若干秒。
taskqueue实现了一个队列,客户端可以将请求放入队列,服务器线程可以从队列中取出任务:
package com.crackj2ee.thread;
import java.util.*;
public class taskqueue {
private list queue = new linkedlist();
public synchronized task gettask() {
while(queue.size()==0) {
try {
this.wait();
}
catch(interruptedexception ie) {
return null;
}
}
return (task)queue.remove(0);
}
public synchronized void puttask(task task) {
queue.add(task);
this.notifyall();
}
}
终于到了真正的workerthread,这是真正执行任务的服务器线程:
package com.crackj2ee.thread;
public class workerthread extends thread {
private static int count = 0;
private boolean busy = false;
private boolean stop = false;
private taskqueue queue;
public workerthread(threadgroup group, taskqueue queue) {
super(group, "worker-" + count);
count++;
this.queue = queue;
}
public void shutdown() {
stop = true;
this.interrupt();
try {
this.join();
}
catch(interruptedexception ie) {}
}
public boolean isidle() {
return !busy;
}
public void run() {
system.out.println(getname() + " start.");
while(!stop) {
task task = queue.gettask();
if(task!=null) {
busy = true;
task.execute();
busy = false;
}
}
system.out.println(getname() + " end.");
}
}
前面已经讲过,queue.gettask()是一个阻塞方法,服务器线程可能在此wait()一段时间。此外,workerthread还有一个shutdown方法,用于安全结束线程。
最后是threadpool,负责管理所有的服务器线程,还可以动态增加和减少线程数:
package com.crackj2ee.thread;
import java.util.*;
public class threadpool extends threadgroup {
private list threads = new linkedlist();
private taskqueue queue;
public threadpool(taskqueue queue) {
super("thread-pool");
this.queue = queue;
}
public synchronized void addworkerthread() {
thread t = new workerthread(this, queue);
threads.add(t);
t.start();
}
public synchronized void removeworkerthread() {
if(threads.size()>0) {
workerthread t = (workerthread)threads.remove(0);
t.shutdown();
}
}
public synchronized void currentstatus() {
system.out.println("-----------------------------------------------");
system.out.println("thread count = " + threads.size());
iterator it = threads.iterator();
while(it.hasnext()) {
workerthread t = (workerthread)it.next();
system.out.println(t.getname() + ": " + (t.isidle() ? "idle" : "busy"));
}
system.out.println("-----------------------------------------------");
}
}
currentstatus()方法是为了方便调试,打印出所有线程的当前状态。
最后,main负责完成main()方法:
package com.crackj2ee.thread;
public class main {
public static void main(string[] args) {
taskqueue queue = new taskqueue();
threadpool pool = new threadpool(queue);
for(int i=0; i<10; i++) {
queue.puttask(new calculatetask());
queue.puttask(new timertask());
}
pool.addworkerthread();
pool.addworkerthread();
dosleep(8000);
pool.currentstatus();
pool.addworkerthread();
pool.addworkerthread();
pool.addworkerthread();
pool.addworkerthread();
pool.addworkerthread();
dosleep(5000);
pool.currentstatus();
}
private static void dosleep(long ms) {
try {
thread.sleep(ms);
}
catch(interruptedexception ie) {}
}
}
main()一开始放入了20个task,然后动态添加了一些服务线程,并定期打印线程状态,运行结果如下:
worker-0 start.
[calculatetask 0] start...
worker-1 start.
[timertask 0] start...
[timertask 0] done.
[calculatetask 1] start...
[calculatetask 0] done.
[timertask 1] start...
[calculatetask 1] done.
[calculatetask 2] start...
[timertask 1] done.
[timertask 2] start...
[timertask 2] done.
[calculatetask 3] start...
-----------------------------------------------
thread count = 2
worker-0: busy
worker-1: busy
-----------------------------------------------
[calculatetask 2] done.
[timertask 3] start...
worker-2 start.
[calculatetask 4] start...
worker-3 start.
[timertask 4] start...
worker-4 start.
[calculatetask 5] start...
worker-5 start.
[timertask 5] start...
worker-6 start.
[calculatetask 6] start...
[calculatetask 3] done.
[timertask 6] start...
[timertask 3] done.
[calculatetask 7] start...
[timertask 4] done.
[timertask 7] start...
[timertask 5] done.
[calculatetask 8] start...
[calculatetask 4] done.
[timertask 8] start...
[calculatetask 5] done.
[calculatetask 9] start...
[calculatetask 6] done.
[timertask 9] start...
[timertask 6] done.
[timertask 7] done.
-----------------------------------------------
thread count = 7
worker-0: idle
worker-1: busy
worker-2: busy
worker-3: idle
worker-4: busy
worker-5: busy
worker-6: busy
-----------------------------------------------
[calculatetask 7] done.
[calculatetask 8] done.
[timertask 8] done.
[timertask 9] done.
[calculatetask 9] done.
仔细观察:一开始只有两个服务器线程,因此线程状态都是忙,后来线程数增多,
闽公网安备 35060202000074号