Cancellation and Shutdown

任务取消

一种做法是设置cancel flag,但是存在永远无法响应的风险:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public Task {
  private volatile boolean cancelled = false;
  public void cancel() {
    this.cancelled = true;
  }
  public void run() {
    if (!cancelled) {
      // 如果这个方法阻塞了,那么Task就永远不会cancel
      someBlockingMethod();      
    }
  }
}

线程中断

1
2
3
4
5
public class Thread {
  public void interrupt(); // 通知target thread中断
  public boolean isInterrupted(); // 查询target thread的中断状态
  public static boolean interrupted(); // 清空当前thread的中断状态,返回前一个中断状态
}

利用线程中断来取消任务的例子

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
public Task implements Runnable {
  public void run() {
    try {
      // 检测中断状态
      while (!Thread.currentThread().isInterrupted()) {
        someBlockingMethod();
      }
    } catch (InterruptedException e) {
      // 在阻塞方法时线程被中断了
      // 除非你知道自己在做什么,否则你应该rethrow 或者恢复线程的中断状态
      Thread.currentThread().interrupt();
    }
  }
}

就算某个任务不可中断也要在它结束后恢复中断状态

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
public Task {
  public void run() {
    try {
      while (true) {
        try {
          someBlockingMethod()
        } catch (InterruptedException e) {
          interrupted = true;
        }
      }      
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }
}

阻塞库的方法一旦探知线程中断状态,就会抛出异常,下面代码会造成死循环,他会不停抛出InterruptedException:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
public Task {
  public void run() {
    while (!blockingQueue.isEmpty()) {
      try {
        blockingQueue.take();
      } catch (InterrupttedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }
}

反面例子,擅自中断

  1. 不知道线程拥有者的中断策略就擅自中断
  2. r.run()里面如果抛出了RuntimeException,返回到了caller那里,然后时间到了触发了caller线程的interruption
  3. r.run()先结束了,然后和第2点说的一样
  4. 如果r.run()不响应中断,那么timedRun方法就不会返回直到r.run()结束
1
2
3
4
5
6
// 本意是控制timeRun方法的执行时长
public void timedRun(Runnable r) {
  final Thread taskThread = Thread.currentThread();
  scheduledExecutor.schedule(() -> taskThread.interrupt(), timeout, unit);
  r.run();
}

用Future中断

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
public void timedRun(Runnable r) {
  Future f = executorService.submit(r);
  try {
    f.get(timeout, unit);
  } catch (TimeoutException e) {
    // 下面会取消这个task的
  } catch (ExecutionException e) {
    // 在task里出现异常,rethrow
    throw e.getCause();
  } finally {
    // 对一个正常结束的future执行cancel是没有伤害的
    f.cancel(true); // true代表interrupt
  }
}

对付不响应中断

有些方法不响应中断,但是我们要中断咋整?下面给了一个思路:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
public class ReaderThread extends Thread {
  private final Socket socket;
  private final InputStream in;
  @Override
  public void interrupt() {
    try {
      socket.close();
    } catch (IOException ignored) {}
    finally {
      super.interrupt(); // 注意还是得让上级中断的
    }
  }
  @Override
  public void run() {
    in.read(buf); // 这个方法阻塞但是不响应中断
  }
}

对付不响应中断2

如果你是用Executor执行,则可以这样做提供自己的ThreadPoolExecutor.newTaskFor实现,并提供自己的Future.cancel(boolean)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public interface CancellableTask<T> extends Callable<T> {
  void cancel();
  RunnableFuture<T> newTask();
}
public class CancellingExecutor extends ThreadPoolExecutor {
  // 看这个方法
  @Override
  protected<T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    if (callable instanceof CancellableTask) {
      return ((CancellableTask<T>) callable).newTask();
    }
    return super.newTaskFor(callable);
  }
}
public abstract class SocketUsingTask<T> implements CancellableTask<T> {
  private Socket socket;
  public synchronized void cancel() {
    try {
      if (socket != null) {
        socket.close();
      }
    } catch (IOException ignored) {}
  }
  public RunnableFuture<T> newTask() {
    return new FutureTask<T>(this) {
      // 看这个方法
      @Override
      public boolean cancel(boolean mayInterruptIfRunning) {
        try {
          try {
            SocketUsingTask.this.cancel()            
          } finally {
            return super.cancel(mayInterruptIfRunning);
          }
        }
      }
    }
  }
}

停止基于线程的Service

利用ExecutorService

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
public class LogService {
  private final ExecutorService exec = ...;
  public void stop() throws InterruptionException {
    try {
      exec.shutdown();
      exec.awaitTermination(timeout, unit);
    } finally {
      writer.close();
    }
  }
  public void log(String msg) {
    try {
      exec.execute(new WriteTask(msg));
    } catch (RejectedExecutionException ignored) {}
  }
}

利用Poison Pills

1
2
3
4
5
producer.(POISON_PILL);

if (POISON_PILL == consumer.take()) {
  doShutdownWork();
}

跟踪shutdownNow时开始但未结束的任务

ExecutorService.shutdownNow会返回还未开始的任务,但是不会返回开始了但是没有结束的任务,用类似下面的代码可以得到这些任务:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class TrackingExecutor extends AbstractExecutorService {
  private final ExecutorService exec;
  private final Set<Runnable> tasksCancelledAtShutdown = 
    Collections.synchronizedSet(new HashSet<Runnable>());
  public List<Runnable> getCancelledTasks() {
    if (!exec.isTerminated()) {
      throw new IllegalStateException(...);
    }
    return new ArrayList<>(tasksCancelledAtShutdown);
  }
  public void execute(final Runnable runnable) {
    exec.execute(() -> {
      try {
        runnable.run();
      } finally {
        if (isShutdown() && Thread.currentThread.isInterrupted()) {
          tasksCancelledAtShutdown.add(runnable);
        }
      }
    })
  }
}

线程异常退出

下面的代码在线程异常退出前告知框架它死了:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public void run() {
  Throwable thrown = null;
  try {
    while (!isInterrupted()) {
      runTask(getTaskFromWorkQueue());
    }
  } catch (Throwable e) {
    thrown = e;
  } finally {
    threadExited(this, thrown);
  }
}

JVM shutdown

版权

评论