mutex是互斥体,广泛地应用在多线程编程中。本文以广为流程的doug lea的concurrent工具包的mutex实现为例,进行一点探讨。在doug lea的concurrent工具包中,mutex实现了sync接口,该接口是concurrent工具包中所有锁(lock)、门(gate)和条件变量(condition)的公共接口,sync的实现类主要有:mutex、semaphore及其子类、latch、countdown、reentrantlock等。这也体现了面向抽象编程的思想,使我们可以在不改变代码或者改变少量代码的情况下,选择使用sync的不同实现。下面是sync接口的定义:
| public interface sync { public void acquire() throws interruptedexception; //获取许可 public boolean attempt(long msecs) throws interruptedexception; //尝试获取许可 public void release(); //释放许可 } |
通过使用sync可以替代java synchronized关键字,并提供更加灵活的同步控制。当然,并不是说 concurrent工具包是和java synchronized独立的技术,其实concurrent工具包也是在synchronized的基础上搭建的,从下面对mutex源码的解析即可以看到这一点。synchronized关键字仅在方法内或者代码块内有效,而使用sync却可以跨越方法甚至通过在对象之间传递,跨越对象进行同步。这是sync及concurrent工具包比直接使用synchronized更加强大的地方。
注意sync中的acquire()和attempt()都会抛出interruptedexception,所以使用sync及其子类时,调用这些方法一定要捕获interruptedexception。而release()方法并不会抛出interruptedexception,这是因为在acquire()和attempt()方法中可能会调用wait()等待其它线程释放锁。而release()在实现上进行了简化,直接释放锁,不管是否真的持有。所以,你可以对一个并没有acquire()的线程调用release()这也不会有什么问题。而由于release()不会抛出interruptedexception,所以我们可以在catch或finally子句中调用release()以保证获得的锁能够被正确释放。比如:
| class x { sync gate; // ... public void m() { try { gate.acquire(); // block until condition holds try { // ... method body } finally { gate.release(); } } catch (interruptedexception ex) { // ... evasive action } } } |
mutex是一个非重入的互斥锁。mutex广泛地用在需要跨越方法的before/after类型的同步环境中。下面是doug lea的concurrent工具包中的mutex的实现。
| public class mutex implements sync { /** the lock status **/ protected boolean inuse_ = false; public void acquire() throws interruptedexception { if (thread.interrupted()) throw new interruptedexception();//(1) synchronized(this) { try { while (inuse_) wait(); inuse_ = true; } catch (interruptedexception ex) { //(2) notify(); throw ex; } } } public synchronized void release() { inuse_ = false; notify(); } public boolean attempt(long msecs) throws interruptedexception { if (thread.interrupted()) throw new interruptedexception(); synchronized(this) { if (!inuse_) { inuse_ = true; return true; } else if (msecs <= 0) return false; else { long waittime = msecs; long start = system.currenttimemillis(); try { for (;;) { wait(waittime); if (!inuse_) { inuse_ = true; return true; } else { waittime = msecs - (system.currenttimemillis() - start); if (waittime <= 0) // (3) return false; } } } catch (interruptedexception ex) { notify(); throw ex; } } } } } |
为什么要在acquire()和attempt(0方法的开始都要检查当前线程的中断标志呢?这是为了在当前线程已经被打断时,可以立即返回,而不会仍然在锁标志上等待。调用一个线程的interrupt()方法根据当前线程所处的状态,可能产生两种不同的结果:当线程在运行过程中被打断,则设置当前线程的中断标志为true;如果当前线程阻塞于wait()、sleep()、join(),则当前线程的中断标志被清空,同时抛出interruptedexception。所以在上面代码的位置(2)也捕获了interruptedexception,然后再次抛出interruptedexception。
release()方法简单地重置inuse_标志,并通知其它线程。
attempt()方法是利用java的object.wait(long)进行计时的,由于object.wait(long)不是一个精确的时钟,所以attempt(long)方法也是一个粗略的计时。注意代码中位置(3),在超时时返回。
mutex是sync的一个基本实现,除了实现了sync接口中的方法外,并没有添加新的方法。所以,mutex的使用和sync的完全一样。在concurrent包的api中doug给出了一个精细锁定的list的实现示例,我们这儿也给出,作为对mutex和sync使用的一个例子:
| class node { object item; node next; mutex lock = new mutex(); // 每一个节点都持有一个锁 node(object x, node n) { item = x; next = n; } } class list { protected node head; // 指向列表的头 // 使用java的synchronized保护head域 // (我们当然可以使用mutex,但是这儿似乎没有这样做的必要 protected synchronized node gethead() { return head; } boolean search(object x) throws interruptedexception { node p = gethead(); if (p == null) return false; // (这儿可以更加紧凑,但是为了演示的清楚,各种情况都分别进行处理) p.lock.acquire(); // prime loop by acquiring first lock. // (if the acquire fails due to // interrupt, the method will throw // interruptedexception now, // so there is no need for any // further cleanup.) for (;;) { if (x.equals(p.item)) { p.lock.release(); // 释放当前节点的锁 return true; } else { node nextp = p.next; if (nextp == null) { p.lock.release(); // 释放最后持有的锁 return false; } else { try { nextp.lock.acquire(); // 在释放当前锁之前获取下一个节点的锁 } catch (interruptedexception ex) { p.lock.release(); // 如果获取失败,也释放当前的锁 throw ex; } p.lock.release(); // 释放上个节点的锁,现在已经持有新的锁了 p = nextp; } } } } synchronized void add(object x) { // 使用synchronized保护head域 head = new node(x, head); } // ... other similar traversal and update methods ... } |
闽公网安备 35060202000074号