Building Custom Synchronizers

状态依赖的类

  • 某些操作具有基于状态的前置条件的类
  • 比如:FutureTask、Semaphore、BlockingQueue
  • 比如:你不能从空队列中移除item,不能在任务结束前得到结果

构建状态依赖的类最简单的办法就是利用已有的状态依赖的类。比如使用CountDownLatch,或者自己构建同步器。自己构建同步器可以利用:内置条件队列(intrinsic condition queues)、显式Condition对象、AbstractQueuedSynchronizer框架。

管理状态依赖

使用polling(轮询)和sleeping来出来状态依赖是很痛苦的。所以不推荐。

阻塞的状态依赖动作的结构:

1
2
3
4
5
6
7
8
9
acquire lock on object state
while (precondition does not hold) {
  release lock
  wait until precondition might hold
  optionally fail if interrupted or timeout expires
  reacquire lock
}
perform action
release lock

下面是一个有届的Buffer的基类,用的是循环数组,后面有些例子会用它来说明:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@ThreadSafe
public abstract class BaseBoundedBuffer <V> {
    @GuardedBy("this") private final V[] buf;
    @GuardedBy("this") private int tail;
    @GuardedBy("this") private int head;
    @GuardedBy("this") private int count;

    protected BaseBoundedBuffer(int capacity) {
        this.buf = (V[]) new Object[capacity];
    }

    protected synchronized final void doPut(V v) {
        buf[tail] = v;
        if (++tail == buf.length)
            tail = 0;
        ++count;
    }

    protected synchronized final V doTake() {
        V v = buf[head];
        buf[head] = null;
        if (++head == buf.length)
            head = 0;
        --count;
        return v;
    }

    public synchronized final boolean isFull() {
        return count == buf.length;
    }

    public synchronized final boolean isEmpty() {
        return count == 0;
    }
}

例子:传播前置条件失败给调用方(不要)

下面这个例子不要的地方在于:

  • Full和Empty是两个正常状态,抛异常不好
  • 调用方得捕获异常,然后还要自己重试,重试的方式有两种
    • sleep,但是sleep时长很难掌握,会造成响应度不够
    • 不sleep直接重试,这个就是所谓的busy waiting或者spin waiting,浪费CPU
  • 调用方处理前置条件失败
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@ThreadSafe
public class GrumpyBoundedBuffer <V> extends BaseBoundedBuffer<V> {
    public GrumpyBoundedBuffer(int size) {
        super(size);
    }

    public synchronized void put(V v) throws BufferFullException {
        if (isFull())
            throw new BufferFullException();
        doPut(v);
    }

    public synchronized V take() throws BufferEmptyException {
        if (isEmpty())
            throw new BufferEmptyException();
        return doTake();
    }
}

class ExampleUsage {
    private GrumpyBoundedBuffer<String> buffer;
    int SLEEP_GRANULARITY = 50;

    void useBuffer() throws InterruptedException {
        while (true) {
            try {
                String item = buffer.take();
                // use item
                break;
            } catch (BufferEmptyException e) {
                Thread.sleep(SLEEP_GRANULARITY);
            }
        }
    }
}

class BufferFullException extends RuntimeException {
}

class BufferEmptyException extends RuntimeException {
}

不用异常也可以,比如通过返回一个Error结果,但是问题的本质没有变

例子:用轮询和睡眠来阻塞

这个例子稍微好点,自己处理前置条件失败:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@ThreadSafe
public class SleepyBoundedBuffer <V> extends BaseBoundedBuffer<V> {
    int SLEEP_GRANULARITY = 60;

    public SleepyBoundedBuffer() {
        this(100);
    }

    public SleepyBoundedBuffer(int size) {
        super(size);
    }

    public void put(V v) throws InterruptedException {
        while (true) {
            synchronized (this) {
                if (!isFull()) {
                    doPut(v);
                    return;
                }
            }
            Thread.sleep(SLEEP_GRANULARITY);
        }
    }

    public V take() throws InterruptedException {
        while (true) {
            synchronized (this) {
                if (!isEmpty())
                    return doTake();
            }
            Thread.sleep(SLEEP_GRANULARITY);
        }
    }
}

不过问题也差不多:睡眠的粒度很难把握,短了浪费CPU,长了增加响应度,

条件队列(Condition queues)

起名为Condition queue是因为它给一组线程——称为wait set——等待特定的条件变为true。这个队列中的元素是等待条件的线程。

每个对象可以作为condition queue,ObjectwaitnotifynotifyAll方法构成了内置条件队列的API。

对象内置锁和内置条件队列是关联的:

  • 你要调用条件队列方法必须先获得对象锁。
  • 你不能等待条件除非你能检查状态,你不能把其他线程从条件等待中释放除非你能够修改状态。

下面这个例子更简单,也更高效,响应度更高。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@ThreadSafe
public class BoundedBuffer <V> extends BaseBoundedBuffer<V> {
    // CONDITION PREDICATE: not-full (!isFull())
    // CONDITION PREDICATE: not-empty (!isEmpty())
    public BoundedBuffer() {
        this(100);
    }

    public BoundedBuffer(int size) {
        super(size);
    }

    // BLOCKS-UNTIL: not-full
    public synchronized void put(V v) throws InterruptedException {
        while (isFull())
            wait();
        doPut(v);
        notifyAll();
    }

    // BLOCKS-UNTIL: not-empty
    public synchronized V take() throws InterruptedException {
        while (isEmpty())
            wait();
        V v = doTake();
        notifyAll();
        return v;
    }

    // BLOCKS-UNTIL: not-full
    // Alternate form of put() using conditional notification
    // 减少notifyAll的次数
    public synchronized void alternatePut(V v) throws InterruptedException {
        while (isFull())
            wait();
        boolean wasEmpty = isEmpty();
        doPut(v);
        if (wasEmpty)
            notifyAll();
    }
}

使用条件队列

条件判断 The condition predicate

  • 正确使用条件队列的关键是识别对象所等待的condition predicates。
  • contidition predicates是使得操作依赖于状态的前置条件。
  • 比如take的condition predicate是buffer不为空、put的则是buffer没有满

condition wait牵涉三个:加锁、wait方法、被锁保护的状态变量。测试condition predicate之前要得到锁,锁对象和条件队列对象得是同一个。

wait方法:释放锁,阻塞当前线程,等待——直到或超过规定时间、或线程被中断、或线程被通知唤醒。线程被唤醒后和其他线程再次争抢锁。

醒得太快 waking up too soon

wait中醒来不代表condition predicate变成true了,所以醒过来后一定要判断条件。

一个内置条件队列可能被多个condition predicate使用,因此如果有人调用了notifyAll不代表你等待的condition predicate变成true了。而且wait还会虚假的醒来,即并没有人调用notify

下面是经典用法:

1
2
3
4
5
6
7
8
void stateDependentMethod() throws InterruptedException {
  synchronized(lock) {
    while (!conditionPredicate()) {
      lock.wait();
    }
    // object is no in desired state
  }
}

使用Object.waitCondition.await要记牢:

  • 总是得有一个condition predicate
  • wait之前,和从wait唤醒之后要测试condition predicate
  • 总是在循环中调用wait
  • 保证condition predicate所用的状态变量被condition queue的相同锁保护
  • 调用waitnotifynotifyAll之前要持有锁
  • 在检查condition predicate之后但在对其进行操作之前,请勿释放锁

漏掉的信号

漏掉信号发生在:一个线程必须等待一个已经是true的条件,但是在等待之前没能检查condition predicate。

如果A 先notify,而B在后面wait,那么B是不会得到A发出的信号的,所以在wait之前一定要检查condition predicate。用前面的代码结构就能解决这个问题。

通知

如果你在等待一个条件,那么确保肯定有其他人会在条件变成true的时候发出通知。

通知也需要得到锁,所以而且通知线程释放锁越快越好。大部分情况下要用notifyAll而不是notify,因为notify只通知一个线程,如果这个线程等待的condition没有变成true,那么这次通知就浪费了,那么其他线程就没有机会了,也就是这个信号被劫持了。

notify可以作为性能优化的手段,但还是那句老话,先做对再做好。

例子:gate class

下面是一个gate例子,gate关闭的时候线程等待,gate打开的时候线程通过:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@ThreadSafe
public class ThreadGate {
    // CONDITION-PREDICATE: opened-since(n) (isOpen || generation>n)
    @GuardedBy("this") private boolean isOpen;
    @GuardedBy("this") private int generation;

    public synchronized void close() {
        isOpen = false;
    }

    public synchronized void open() {
        ++generation;
        isOpen = true;
        notifyAll();
    }

    // BLOCKS-UNTIL: opened-since(generation on entry)
    public synchronized void await() throws InterruptedException {
        int arrivalGeneration = generation;
        while (!isOpen && arrivalGeneration == generation)
            wait();
    }
}

解释一下这段:

1
2
while (!isOpen && arrivalGeneration == generation)
  wait();

如果大门关闭 且 线程进入时的代数和门的代数一样,就要等待。反过来的意思是,如果大门开放,或者线程进入时的代数比门的代数更老,则通过。为什么?

因为在从wait唤醒到再次进入while之间门可能会被关闭,如果只看open状态,那么有一部分线程就会通不过,这个是有问题的,因为Gate设计的本来意是如果大门打开,那么就同时释放。

子类安全性问题

一个依赖状态的类应该要么完全暴露(并文档)它的等待和通知协议给子类,要么压根防止子类参与进来。

封装Condition queues

最好封装条件队列,这样在类层级外部就不会访问到它。

进入和退出协议

略。

显式条件对象

Condition对象关联一个Lock对象,一个Lock对象可以有很多Condition对象。Condition对象继承了Lock对象的公平性。

下面是Condition接口:

1
2
3
4
5
6
7
void await()
boolean await(long time, TimeUnit unit)
long awaitNanos(long nanosTimeout)
void awaitUninterruptibly()
boolean awaitUntil(Date deadline)
void signal()
void signalAll()

下面是一个例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@ThreadSafe
public class ConditionBoundedBuffer <T> {
    protected final Lock lock = new ReentrantLock();
    // CONDITION PREDICATE: notFull (count < items.length)
    private final Condition notFull = lock.newCondition();
    // CONDITION PREDICATE: notEmpty (count > 0)
    private final Condition notEmpty = lock.newCondition();
    private static final int BUFFER_SIZE = 100;
    @GuardedBy("lock") private final T[] items = (T[]) new Object[BUFFER_SIZE];
    @GuardedBy("lock") private int tail, head, count;

    // BLOCKS-UNTIL: notFull
    public void put(T x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();
            items[tail] = x;
            if (++tail == items.length)
                tail = 0;
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    // BLOCKS-UNTIL: notEmpty
    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await();
            T x = items[head];
            items[head] = null;
            if (++head == items.length)
                head = 0;
            --count;
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}

解剖同步器

ReentrantLockSemaphoreCountDownLatchFutureTask,都是利用了AbstractQueuedSynchronizer (AQS)。

AbstractQueuedSynchronizer

基于AQS的同步器执行的操作是acquirerelease的变种:

  • 获取(acquire)是状态依赖的操作,而且总是会阻塞
  • 释放(release)不是一个阻塞操作,一次释放可能会允许阻塞在acquire的线程通过。

AQS管理同步器的状态,通过getStatesetStatecompareAndSetState方法。比如:

  • ReentrantLock使用它来记录拥有锁的线程获得锁的次数(可重入的关系)
  • Semaphore使用它来代表剩余的permits
  • FutureTask用它来表示任务的状态:没开始、运行中、完成、取消。

同步器也可以保存其他状态,比入ReentrantLock保存了lock owner线程,用来确保只有owner可以释放锁。

acquire可能是独占的(exclusive),比如ReentrantLock。也可以是非独占的(non-exclusive),比如SempaphoreCountDownLatch

一次acquire的包含两个部分:

  • 第一部分

    1. 判断当前状态是否允许获取
    2. 如果允许,线程通过。如果不允许,线程阻塞或者失败。
  • 第二部分:更新同步器状态。

acquirerelease的经典形式:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
boolean acquire() throws InterruptedException {
  while (state does not permit acquire) {
    if (blocking acquisition requested) {
      enqueue current thread if not already queued
      block current thread
    }
    else
      return failure
  }
  possibly update synchronization state
  dequeue thread if it was queued
  return success
}
void release() {
  update synchronization state
  if (new state may permit a blocked thread to acquire) 
    unblock one or more queued threads
}

实现AQS:

  • 支持独占式获取的同步器要实现:tryAcquiretryReleaseisheldExclusively方法。
  • 支持非独占获取的同步器要实现:tryAcquireSharedtryReleaseShared
  • AQS的acquireacquireSharedreleasereleaseShared会调用上面的try*方法。

tryAcquireShared方法返回值说明:

  • < 0,获取失败
  • = 0,独占式获取成功
  • > 0,非独占式获取成功

tryReleasetryReleaseShared返回值说明:

  • true,可以释放阻塞在获取操作的线程
  • false,不释放线程

简单的Latch

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@ThreadSafe
public class OneShotLatch {
    private final Sync sync = new Sync();

    public void signal() {
        sync.releaseShared(0);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(0);
    }

    private class Sync extends AbstractQueuedSynchronizer {
        protected int tryAcquireShared(int ignored) {
            // Succeed if latch is open (state == 1), else fail
            return (getState() == 1) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int ignored) {
            setState(1); // Latch is now open
            return true; // Other threads may now be able to acquire

        }
    }
}

一般来说不会直接继承AQS,而是弄一个私有内部类来继承,这样可以保证封装。

AQS在JUC同步器中的运用

ReentrantLock

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
protected boolean tryAcquire(int ignored) {
  find Thread current = Thread.currentThread();
  int c = getState();
  if (c == 0) {
    if (compareAndSetState(0, 1)) {   // 这里的整段代码不是同步的
      owner = current;                // 没成功说明被别的线程抢了,会跑到最后的return false
      return true;
    }
  } else if (current == owner) {
    setState(c + 1);
    return true
  }
  return false;
}

Semaphore和CountDownLatch

在Semaphore中的运用:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
protected int tryAcquireShared(int acquires) {
  while (true) {
    int available = getState();
    int remaining = available - acquires;
    if (remaining < 0 || compareAndSetState(available, remaining))
      /* > 0: 我拿别人也能拿,非独占
         = 0: 我拿别人拿不了,独占
         < 0: 谁都拿不了
       */
      return remaining;
  }
}
protected boolean tryReleaseShared(int releases) {
  while (true) {
    int p = getState();
    if (compareAndSetState(p, p + releases))
      return true;
  }
}

在CountDownLatch中的运用:

FutureTask

state保存任务状态:running、completed、cancelled,同时保存计算结果或抛出的异常,还维护了一个运行这个任务的线程(为了能够cancel)

ReentrantReadWriteLock

同时使用了shared和非shared两种方法。

AQS内部维护了一个等待线程的队列,跟踪每个线程是请求独占还是非独占。在ReentrantReadWriteLock里,当锁可用式,如果队列头的线程请求写锁,它会得到它。如果队列头线程请求读锁,则会释放它和后面的线程,直到碰到一个请求写锁的线程。

版权

评论