[本文是我對Java Concurrency In Practice C08的歸納和總結. ?轉載請注明作者和出處, ?如有謬誤, 歡迎在評論中指正. ]
task和線程池執行機制之間隱式的耦合
前面曾提到過, 線程池的應用解耦了task的提交和執行. 事實上, 這有所夸大, 因為不是所有的task都適用于所有的執行機制, 某些task要求在特定的線程池中執行:
1. 非獨立task, 指的是依賴于其他task的任務.?
2. 要求在單線程中運行的task. 某些task不是線程安全的, 無法并發運行. Executors.newSingleThreadExecutor()方法返回的線程池只包含單個線程, 提交給該線程池的task將緩存在一個無界隊列中, 線程池中所包含的單個線程將依次從隊列中取出task運行.
3. 響應時間敏感的task. 某些task要求必須在極短的時間內開始執行, 比如GUI應用中處理用戶點擊操作的task. 假如提交給某一線程池的task既包含long-running task, 也包含響應時間敏感的task, 那么響應時間敏感的task可能無法在極短的時間內得到執行.?
4. 使用了ThreadLocal類的task. 線程池的標準實現可能會在空閑時銷毀多余的線程, 繁忙時創建更多的線程, 更有可能重用線程. 所以使用了ThreadLocal的task不應該提交給線程池運行, 除非ThreadLocal的使用只限定在單個task內, 不用于多個task之間通信.
?
線程饑餓死鎖
如果提交給線程池運行的task之間不是相互獨立的, 就有可能出現線程饑餓死鎖. 比如提交給SingleThreadExecutor執行的2個task, task A在執行過程中需要等待task B的執行結果才能繼續, 而此時沒有多余的線程用于執行task B, 如此就發生了線程饑餓死鎖.
public class StarvationDeadLock {
public static void main(String[] args) {
final ExecutorService executor = Executors.newSingleThreadExecutor();
final Runnable taskB = new Runnable() {
@Override
public void run() {
//...
}
};
Runnable taskA = new Runnable() {
@Override
public void run() {
Future<?> future = executor.submit(taskB);
try {
System.out.println("waiting for taskB complete");
// get方法將阻塞, 直到taskB執行完成
// 但是由于線程池中只有一個線程, 而該線程已經被taskA占用, 所以taskB將沒有機會執行.
// 此時就發生了線程饑餓死鎖
future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
e.printStackTrace();
}
//...
}
};
executor.submit(taskA);
}
}
不僅SingleThreadExecutor執行相互依賴的task時會發生死鎖, 其他線程池執行相互依賴的task時也可能發生死鎖:
public class StarvationDeadLock {
public static void main(String[] args) {
final ExecutorService executor = Executors.newFixedThreadPool(3);
// 設定await在Barrier對象上的線程數達到4個時, 其await方法才釋放
final CyclicBarrier barrier = new CyclicBarrier(4);
// 重復提交4個task, 每個task都await在barrier對象上
// barrier的await方法將一直阻塞, 直到4個線程都到達await點.
// 但是線程池中只有3個線程, 不可能出現4個線程都達到await點的情形, 所以依然會發生死鎖
for (int i = 0; i < 4; i++) {
executor.submit(new Runnable() {
@Override
public void run() {
try {
System.out.println("waiting for other tasks arriving at common point");
barrier.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
});
}
}
}?
避免相互依賴的task提交給同一線程池執行時發生死鎖的唯一方法是: 線程池中的線程足夠多.?
?
確定線程池的size
如果線程池的size過大, 將造成內存等資源的浪費, 甚至使得資源耗盡. 如果線程池的size過小, 將造成CPU的利用率不高. 確定合適的size需要考慮:CPU數, 內存, 是計算密集型task還是I/O密集型task, 是否需要獲取稀缺資源(比如數據庫連接)等.
對于計算密集型task, 合適的size大約為CPU數量+1. 對于I/O占較大比例的task, 合適的size可以通過以下公式確定: size = CPU數量 * CPU利用率 * (1 + I/O時間比例). Runtime.getRuntime().availableProcessors()返回CPU的個數.
當然, 實際開發中size還受到內存, 文件句柄, socket, 數據庫連接數等稀缺資源的約束. 將總的稀缺資源除以每一個task使用的資源數, 能得到線程數的上限.?
?
循環并行化
如果循環體所進行的操作是相互獨立的, 這樣的循環可以并發的運行:
// 循環操作
void processSequentially(List<Element> elements) {
for (Element e : elements)
process(e);
}
// 將相互獨立的循環操作轉變為并發操作
void processInParallel(Executor exec, List<Element> elements) {
for (final Element e : elements) {
exec.execute(new Runnable() {
public void run() {
process(e);
}
});
}
exec.shutdown();
exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
}?
如果希望同時提交一系列task, 并且等待它們執行完畢, 可以調用ExecutorService.invokeAll方法.
如果希望task執行完畢之后就獲取其執行結果, 可以使用CompletionService.
?