[本文是我對(duì)Java Concurrency In Practice C08的歸納和總結(jié). ?轉(zhuǎn)載請(qǐng)注明作者和出處, ?如有謬誤, 歡迎在評(píng)論中指正. ]
task和線程池執(zhí)行機(jī)制之間隱式的耦合
前面曾提到過, 線程池的應(yīng)用解耦了task的提交和執(zhí)行. 事實(shí)上, 這有所夸大, 因?yàn)椴皇撬械膖ask都適用于所有的執(zhí)行機(jī)制, 某些task要求在特定的線程池中執(zhí)行:
1. 非獨(dú)立task, 指的是依賴于其他task的任務(wù).?
2. 要求在單線程中運(yùn)行的task. 某些task不是線程安全的, 無法并發(fā)運(yùn)行. Executors.newSingleThreadExecutor()方法返回的線程池只包含單個(gè)線程, 提交給該線程池的task將緩存在一個(gè)無界隊(duì)列中, 線程池中所包含的單個(gè)線程將依次從隊(duì)列中取出task運(yùn)行.
3. 響應(yīng)時(shí)間敏感的task. 某些task要求必須在極短的時(shí)間內(nèi)開始執(zhí)行, 比如GUI應(yīng)用中處理用戶點(diǎn)擊操作的task. 假如提交給某一線程池的task既包含long-running task, 也包含響應(yīng)時(shí)間敏感的task, 那么響應(yīng)時(shí)間敏感的task可能無法在極短的時(shí)間內(nèi)得到執(zhí)行.?
4. 使用了ThreadLocal類的task. 線程池的標(biāo)準(zhǔn)實(shí)現(xiàn)可能會(huì)在空閑時(shí)銷毀多余的線程, 繁忙時(shí)創(chuàng)建更多的線程, 更有可能重用線程. 所以使用了ThreadLocal的task不應(yīng)該提交給線程池運(yùn)行, 除非ThreadLocal的使用只限定在單個(gè)task內(nèi), 不用于多個(gè)task之間通信.
?
線程饑餓死鎖
如果提交給線程池運(yùn)行的task之間不是相互獨(dú)立的, 就有可能出現(xiàn)線程饑餓死鎖. 比如提交給SingleThreadExecutor執(zhí)行的2個(gè)task, task A在執(zhí)行過程中需要等待task B的執(zhí)行結(jié)果才能繼續(xù), 而此時(shí)沒有多余的線程用于執(zhí)行task B, 如此就發(fā)生了線程饑餓死鎖.
public class StarvationDeadLock {
public static void main(String[] args) {
final ExecutorService executor = Executors.newSingleThreadExecutor();
final Runnable taskB = new Runnable() {
@Override
public void run() {
//...
}
};
Runnable taskA = new Runnable() {
@Override
public void run() {
Future<?> future = executor.submit(taskB);
try {
System.out.println("waiting for taskB complete");
// get方法將阻塞, 直到taskB執(zhí)行完成
// 但是由于線程池中只有一個(gè)線程, 而該線程已經(jīng)被taskA占用, 所以taskB將沒有機(jī)會(huì)執(zhí)行.
// 此時(shí)就發(fā)生了線程饑餓死鎖
future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
e.printStackTrace();
}
//...
}
};
executor.submit(taskA);
}
}
不僅SingleThreadExecutor執(zhí)行相互依賴的task時(shí)會(huì)發(fā)生死鎖, 其他線程池執(zhí)行相互依賴的task時(shí)也可能發(fā)生死鎖:
public class StarvationDeadLock {
public static void main(String[] args) {
final ExecutorService executor = Executors.newFixedThreadPool(3);
// 設(shè)定await在Barrier對(duì)象上的線程數(shù)達(dá)到4個(gè)時(shí), 其await方法才釋放
final CyclicBarrier barrier = new CyclicBarrier(4);
// 重復(fù)提交4個(gè)task, 每個(gè)task都await在barrier對(duì)象上
// barrier的await方法將一直阻塞, 直到4個(gè)線程都到達(dá)await點(diǎn).
// 但是線程池中只有3個(gè)線程, 不可能出現(xiàn)4個(gè)線程都達(dá)到await點(diǎn)的情形, 所以依然會(huì)發(fā)生死鎖
for (int i = 0; i < 4; i++) {
executor.submit(new Runnable() {
@Override
public void run() {
try {
System.out.println("waiting for other tasks arriving at common point");
barrier.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
});
}
}
}?
避免相互依賴的task提交給同一線程池執(zhí)行時(shí)發(fā)生死鎖的唯一方法是: 線程池中的線程足夠多.?
?
確定線程池的size
如果線程池的size過大, 將造成內(nèi)存等資源的浪費(fèi), 甚至使得資源耗盡. 如果線程池的size過小, 將造成CPU的利用率不高. 確定合適的size需要考慮:CPU數(shù), 內(nèi)存, 是計(jì)算密集型task還是I/O密集型task, 是否需要獲取稀缺資源(比如數(shù)據(jù)庫連接)等.
對(duì)于計(jì)算密集型task, 合適的size大約為CPU數(shù)量+1. 對(duì)于I/O占較大比例的task, 合適的size可以通過以下公式確定: size = CPU數(shù)量 * CPU利用率 * (1 + I/O時(shí)間比例). Runtime.getRuntime().availableProcessors()返回CPU的個(gè)數(shù).
當(dāng)然, 實(shí)際開發(fā)中size還受到內(nèi)存, 文件句柄, socket, 數(shù)據(jù)庫連接數(shù)等稀缺資源的約束. 將總的稀缺資源除以每一個(gè)task使用的資源數(shù), 能得到線程數(shù)的上限.?
?
循環(huán)并行化
如果循環(huán)體所進(jìn)行的操作是相互獨(dú)立的, 這樣的循環(huán)可以并發(fā)的運(yùn)行:
// 循環(huán)操作
void processSequentially(List<Element> elements) {
for (Element e : elements)
process(e);
}
// 將相互獨(dú)立的循環(huán)操作轉(zhuǎn)變?yōu)椴l(fā)操作
void processInParallel(Executor exec, List<Element> elements) {
for (final Element e : elements) {
exec.execute(new Runnable() {
public void run() {
process(e);
}
});
}
exec.shutdown();
exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
}?
如果希望同時(shí)提交一系列task, 并且等待它們執(zhí)行完畢, 可以調(diào)用ExecutorService.invokeAll方法.
如果希望task執(zhí)行完畢之后就獲取其執(zhí)行結(jié)果, 可以使用CompletionService.
?