[本文是我對Java Concurrency In Practice 7.2的歸納和總結. ?轉載請注明作者和出處, ?如有謬誤, 歡迎在評論中指正. ]
以ExecutorService為例, 該類內部封裝有多個線程, 類外部無法直接停止這些線程. 相反, 外部調用Service的shutDown和shutDownNow方法關閉Service, 而Service負責停止其擁有的線程.
大多數server應用會使用到log, 下例中的LogWriter是一個使用生產者消費者模式構建的log service, 需要打印log的線程將待打印的內容加入到阻塞隊列中, 而logger線程則不斷的從阻塞隊列中取出數據輸出:
public class LogWriter {
private final BlockingQueue<String> queue;
private final LoggerThread logger;
public LogWriter(Writer writer) {
this.queue = new LinkedBlockingQueue<String>(CAPACITY);
this.logger = new LoggerThread(writer);
}
public void start() {
logger.start();
}
/**
* 需要打印數據的線程調用該方法, 將待打印數據加入阻塞隊列
*/
public void log(String msg) throws InterruptedException {
queue.put(msg);
}
/**
* 負責從阻塞隊列中取出數據輸出的線程
*/
private class LoggerThread extends Thread {
private final PrintWriter writer;
// ...
public void run() {
try {
while (true)
writer.println(queue.take());
} catch (InterruptedException ignored) {
} finally {
writer.close();
}
}
}
}
LogWriter內部封裝有LoggerThread線程, 所以LogWriter是一個基于線程構建的Service. 根據ExecutorService的經驗, 我們需要在LogWriter中提供停止LoggerThread線程的方法. 看起來這并不是很難, 我們只需在LogWriter中添加shutDown方法:
/**
* 該方法用于停止LoggerThread線程
*/
public void shutDown() {
logger.interrupt();
}
當LogWriter.shutDown方法被調用時, LoggerThread線程的中斷標記被設置為true, 之后LoggerThread線程執行queue.take()方法時會拋出InterruptedException異常, 從而使得LoggerThread線程結束.
然而這樣的shutDown方法并不是很恰當:?
1. 丟棄了隊列中尚未來得及輸出的數據.
2. 更嚴重的是, 假如線程A對LogWriter.log方法的調用因為隊列已滿而阻塞, 此時停止LoggerThread線程將導致線程A永遠阻塞在queue.put方法上.
對上例的改進:
public class LogWriter {
private final BlockingQueue<String> queue;
private final LoggerThread loggerThread;
private final PrintWriter writer;
/**
* 表示是否關閉Service
*/
private boolean isShutdown;
/**
* 隊列中待處理數據的數量
*/
private int reservations;
public void start() {
loggerThread.start();
}
public void shutDown() {
synchronized (this) {
isShutdown = true;
}
loggerThread.interrupt();
}
public void log(String msg) throws InterruptedException {
synchronized (this) {
// service已關閉后調用log方法直接拋出異常
if (isShutdown)
throw new IllegalStateException("Service has been shut down");
++reservations;
}
// BlockingQueue本身就是線程安全的, put方法的調用不在同步代碼塊中
// 我們只需要保證isShutdown和reservations是線程安全的即可
queue.put(msg);
}
private class LoggerThread extends Thread {
public void run() {
try {
while (true) {
try {
synchronized (this) {
// 當service已關閉且處理完隊列中的所有數據時才跳出while循環
if (isShutdown && reservations == 0)
break;
}
String msg = queue.take();
synchronized (this) {
--reservations;
}
writer.println(msg);
} catch (InterruptedException e) {
// 發生InterruptedException異常時不應該立刻跳出while循環
// 而應該繼續輸出log, 直到處理完隊列中的所有數據
}
}
} finally {
writer.close();
}
}
}
}
上面的處理顯得過于復雜, 利用ExecutorService可以編寫出相對更簡潔的程序:
public class LogService {
/**
* 創建只包含單個線程的線程池, 提交給該線程池的任務將以串行的方式逐個運行
* 本例中, 此線程用于執行打印log的任務
*/
private final ExecutorService exec = Executors.newSingleThreadExecutor();
private final PrintWriter writer;
public void start() {
}
public void shutdown() throws InterruptedException {
try {
// 關閉ExecutorService后再調用其awaitTermination將導致當前線程阻塞, 直到所有已提交的任務執行完畢, 或者發生超時
exec.shutdown();
exec.awaitTermination(TIMEOUT, UNIT);
} finally {
writer.close();
}
}
public void log(String msg) {
try {
// 線程池關閉后再調用其execute方法將拋出RejectedExecutionException異常
exec.execute(new WriteTask(msg));
} catch (RejectedExecutionException ignored) {
}
}
private final class WriteTask implements Runnable {
private String msg;
public WriteTask(String msg) {
this.msg = msg;
}
@Override
public void run() {
writer.println(msg);
}
}
}