服务热线:13616026886

技术文档 欢迎使用技术文档,我们为你提供从新手到专业开发者的所有资源,你也可以通过它日益精进

位置:首页 > 技术文档 > JAVA > 高级技术 > 设计模式 > 查看文档

一个简单的 thread 缓冲池的实现


  在应用中,我们常常需要thread缓冲池来做一些事以提高程序的效率和并发性。本文演示了如何利用queue这种数据结构实现一个简单的thread缓冲池。
  
  一个thread缓冲池可以设计成以下这样:缓冲池由几个工作thread和一个queue组成,client负责把任务放到queue里面(put方法),而工作thread就依次取出这些任务并执行它们(get方法)。
  
  queue的一个经典实现是使用一个循环数组(这个实现在很多数据结构的书上都有介绍),如一个大小为size的数组,这个循环数组可以被想象成首尾相连的一个环。oldest指向queue中最老的数据所在的位置,next指向下一个可以放新数据的位置。
  
  放入一个新数据到next的位置后,需要更新next:next = (next + 1) % size;
  
  从oldest位置取出一个数据后,需要更新oldest:oldest = (oldest + 1) % size;
  
  当oldest == next的时候,queue为空,
  
  当(next + 1) % size == oldest的时候,queue为满。
  
  (注意:为了区分queue为空和为满的情况,实际上queue里面最多能放size-1个数据。)
  
  因为这个queue会同时被多个线程访问,需要考虑在这种情况下queue如何工作。首先,queue需要是线程安全的,可以用java里的synchronized关键字来确保同时只有一个thread在访问queue.
  
  我们还可以注意到当queue为空的时候,get操作是无法进行的;当queue为满的时候,put操作又是无法进行的。在多线程访问遇到这种情况时,一般希望执行操作的线程可以等待(block)直到该操作可以进行下去。比如,但一个thread在一个空queue上执行get方法的时候,这个 thread应当等待(block),直到另外的thread执行该queue的put方法后,再继续执行下去。在java里面,object对象的 wait(),notify()方法提供了这样的功能。
  
  把上面的内容结合起来,就是一个syncqueue的类:
  
  public class syncqueue {
  
  public syncqueue(int size) {
  _array = new object[size];
  _size = size;
  _oldest = 0;
  _next = 0;
  }
  
  public synchronized void put(object o) {
  while (full()) {
  try {
  wait();
  } catch (interruptedexception ex) {
  throw new exceptionadapter(ex);
  }
  }
  _array[_next] = o;
  _next = (_next + 1) % _size;
  notify();
  }
  
  public synchronized object get() {
  while (empty()) {
  try {
  wait();
  } catch (interruptedexception ex) {
  throw new exceptionadapter(ex);
  }
  }
  object ret = _array[_oldest];
  _oldest = (_oldest + 1) % _size;
  notify();
  return ret;
  }
  
  protected boolean empty() {
  return _next == _oldest;
  }
  
  protected boolean full() {
  return (_next + 1) % _size == _oldest;
  }
  
  protected object [] _array;
  protected int _next;
  protected int _oldest;
  protected int _size;
  }
  
  可以注意一下get和put方法中while的使用,如果换成if是会有问题的。这是个很容易犯的错误。;-)
  
  在以上代码中使用了exceptionadapter这个类,它的作用是把一个checked exception包装成runtimeexception。详细的说明可以参考我的避免在java中使用checked exception一文。
  
  接下来我们需要一个对象来表现thread缓冲池所要执行的任务。可以发现jdk中的runnable interface非常合适这个角色。
  
  最后,剩下工作线程的实现就很简单了:从syncqueue里取出一个runnable对象并执行它。
  
  public class worker implements runnable {
  
  public worker(syncqueue queue) {
  _queue = queue;
  }
  
  public void run() {
  while (true) {
  runnable task = (runnable) _queue.get();
  task.run();
  }
  }
  
  protected syncqueue _queue = null;
  
  }
  
  下面是一个使用这个thread缓冲池的例子:
  
  //构造thread缓冲池
  
  syncqueue queue = new syncqueue(10);
  for (int i = 0; i < 5; i ++) {
  new thread(new worker(queue)).start();
  }
  
  //使用thread缓冲池
  
  runnable task = new mytask();
  queue.put(task);
  
  为了使本文中的代码尽可能简单,这个thread缓冲池的实现是一个基本的框架。当使用到实际中时,一些其他功能也可以在这一基础上添加,比如异常处理,动态调整缓冲池大小等等。

扫描关注微信公众号