@import url(http://m.tkk7.com/CuteSoft_Client/CuteEditor/Load.ashx?type=style&file=SyntaxHighlighter.css);@import url(/css/cuteeditor.css);
在多線程大師Doug Lea的貢獻(xiàn)下,在JDK1.5中加入了許多對(duì)并發(fā)特性的支持,例如:線程池。
一、簡(jiǎn)介
線程池類為 java.util.concurrent.ThreadPoolExecutor,常用構(gòu)造方法為:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue workQueue,
RejectedExecutionHandler handler)
corePoolSize: 線程池維護(hù)線程的最少數(shù)量,
maximumPoolSize:線程池維護(hù)線程的最大數(shù)量
keepAliveTime: 線程池維護(hù)線程所允許的空閑時(shí)間
unit: 線程池維護(hù)線程所允許的空閑時(shí)間的單位
workQueue: 線程池所使用的緩沖隊(duì)列
handler: 線程池對(duì)拒絕任務(wù)的處理策略
一個(gè)任務(wù)通過 execute(Runnable)方法被添加到線程池,任務(wù)就是一個(gè) Runnable類型的對(duì)象,任務(wù)的執(zhí)行方法就是 Runnable類型對(duì)象的run()方法。
當(dāng)一個(gè)任務(wù)通過execute(Runnable)方法欲添加到線程池時(shí):
如果此時(shí)線程池中的數(shù)量小于corePoolSize,即使線程池中的線程都處于空閑狀態(tài),也要?jiǎng)?chuàng)建新的線程來(lái)處理被添加的任務(wù)。
如果此時(shí)線程池中的數(shù)量等于 corePoolSize,但是緩沖隊(duì)列 workQueue未滿,那么任務(wù)被放入緩沖隊(duì)列。
如果此時(shí)線程池中的數(shù)量大于corePoolSize,緩沖隊(duì)列workQueue滿,并且線程池中的數(shù)量小于maximumPoolSize,建新的線程來(lái)處理被添加的任務(wù)。
如果此時(shí)線程池中的數(shù)量大于corePoolSize,緩沖隊(duì)列workQueue滿,并且線程池中的數(shù)量等于maximumPoolSize,那么通過 handler所指定的策略來(lái)處理此任務(wù)。
也就是:處理任務(wù)的優(yōu)先級(jí)為:
核心線程corePoolSize、任務(wù)隊(duì)列workQueue、最大線程maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務(wù)。
當(dāng)線程池中的線程數(shù)量大于 corePoolSize時(shí),如果某線程空閑時(shí)間超過keepAliveTime,線程將被終止。這樣,線程池可以動(dòng)態(tài)的調(diào)整池中的線程數(shù)。
unit可選的參數(shù)為java.util.concurrent.TimeUnit中的幾個(gè)靜態(tài)屬性:
NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。
workQueue我常用的是:java.util.concurrent.ArrayBlockingQueue
handler有四個(gè)選擇:
ThreadPoolExecutor.AbortPolicy()
拋出java.util.concurrent.RejectedExecutionException異常
ThreadPoolExecutor.CallerRunsPolicy()
重試添加當(dāng)前的任務(wù),他會(huì)自動(dòng)重復(fù)調(diào)用execute()方法
ThreadPoolExecutor.DiscardOldestPolicy()
拋棄舊的任務(wù)
ThreadPoolExecutor.DiscardPolicy()
拋棄當(dāng)前的任務(wù)
import java.io.Serializable;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestThreadPool {
private static int produceTaskSleepTime = 2;
private static int consumeTaskSleepTime = 2000;
private static int produceTaskMaxNumber = 10;
public static void main(String[] args) {
//構(gòu)造一個(gè)線程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 3,
TimeUnit.SECONDS, new ArrayBlockingQueue(3),
new ThreadPoolExecutor.DiscardOldestPolicy());
for(int i=1;i<=produceTaskMaxNumber;i++){
try {
//產(chǎn)生一個(gè)任務(wù),并將其加入到線程池
String task = "task@ " + i;
System.out.println("put " + task);
threadPool.execute(new ThreadPoolTask(task));
//便于觀察,等待一段時(shí)間
Thread.sleep(produceTaskSleepTime);
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 線程池執(zhí)行的任務(wù)
* @author hdpan
*/
public static class ThreadPoolTask implements Runnable,Serializable{
private static final long serialVersionUID = 0;
//保存任務(wù)所需要的數(shù)據(jù)
private Object threadPoolTaskData;
ThreadPoolTask(Object tasks){
this.threadPoolTaskData = tasks;
}
public void run(){
//處理一個(gè)任務(wù),這里的處理方式太簡(jiǎn)單了,僅僅是一個(gè)打印語(yǔ)句
System.out.println("start .."+threadPoolTaskData);
try {
////便于觀察,等待一段時(shí)間
Thread.sleep(consumeTaskSleepTime);
} catch (Exception e) {
e.printStackTrace();
}
threadPoolTaskData = null;
}
public Object getTask(){
return this.threadPoolTaskData;
}
}
}
說明:
1、在這段程序中,一個(gè)任務(wù)就是一個(gè)Runnable類型的對(duì)象,也就是一個(gè)ThreadPoolTask類型的對(duì)象。
2、一般來(lái)說任務(wù)除了處理方式外,還需要處理的數(shù)據(jù),處理的數(shù)據(jù)通過構(gòu)造方法傳給任務(wù)。
3、在這段程序中,main()方法相當(dāng)于一個(gè)殘忍的領(lǐng)導(dǎo),他派發(fā)出許多任務(wù),丟給一個(gè)叫 threadPool的任勞任怨的小組來(lái)做。
這個(gè)小組里面隊(duì)員至少有兩個(gè),如果他們兩個(gè)忙不過來(lái), 任務(wù)就被放到任務(wù)列表里面。
如果積壓的任務(wù)過多,多到任務(wù)列表都裝不下(超過3個(gè))的時(shí)候,就雇傭新的隊(duì)員來(lái)幫忙。但是基于成本的考慮,不能雇傭太多的隊(duì)員, 至多只能雇傭 4個(gè)。
如果四個(gè)隊(duì)員都在忙時(shí),再有新的任務(wù), 這個(gè)小組就處理不了了,任務(wù)就會(huì)被通過一種策略來(lái)處理,我們的處理方式是不停的派發(fā), 直到接受這個(gè)任務(wù)為止(更殘忍!呵呵)。
因?yàn)殛?duì)員工作是需要成本的,如果工作很閑,閑到 3SECONDS都沒有新的任務(wù)了,那么有的隊(duì)員就會(huì)被解雇了,但是,為了小組的正常運(yùn)轉(zhuǎn),即使工作再閑,小組的隊(duì)員也不能少于兩個(gè)。
4、通過調(diào)整 produceTaskSleepTime和 consumeTaskSleepTime的大小來(lái)實(shí)現(xiàn)對(duì)派發(fā)任務(wù)和處理任務(wù)的速度的控制, 改變這兩個(gè)值就可以觀察不同速率下程序的工作情況。
5、通過調(diào)整4中所指的數(shù)據(jù),再加上調(diào)整任務(wù)丟棄策略, 換上其他三種策略,就可以看出不同策略下的不同處理方式。
6、對(duì)于其他的使用方法,參看jdk的幫助,很容易理解和使用。
發(fā)郵件示例:
在普通的web應(yīng)用中,發(fā)送郵件應(yīng)該只能算小任務(wù),而使用jms來(lái)發(fā)送郵件有點(diǎn)殺雞用牛刀的味道,那么如果能建立一個(gè)線程池來(lái)管理這些小線程并重復(fù)使用他們,應(yīng)該來(lái)說是一個(gè)簡(jiǎn)單有效的方案,我們可以使用concurrent包中的Executors來(lái)建立線程池,Executors是一個(gè)工廠,也是一個(gè)工具類,我把它的api的介紹簡(jiǎn)單的翻譯了一下(如果翻譯有誤請(qǐng)大家不要吝嗇手中的磚頭)
/**
* 由spring管理的線程池類,返回的ExecutorService就是給我們來(lái)執(zhí)行線程的
*如果不交給spring管理也是可以的,可以使用單例模式來(lái)實(shí)現(xiàn)同樣功能,但是poolSize *要hardcode了
* @author 張榮華(ahuaxuan)
* @version $Id$
*/
public class EasyMailExecutorPool implements InitializingBean {
//線程池大小,spring配置文件中配置
private int poolSize;
private ExecutorService service;
public ExecutorService getService() {
return service;
}
public int getPoolSize() {
return poolSize;
}
public void setPoolSize(int poolSize) {
this.poolSize = poolSize;
}
/**
* 在 bean 被初始化成功之后初始化線程池大小
*/
public void afterPropertiesSet() throws Exception {
service = Executors.newFixedThreadPool(poolSize);
}
}
這樣我們就初始化了線程池的大小,接下來(lái)就是如何使用這個(gè)線程池中的線程了,我們看看MailService是如何來(lái)使用線程池中的線程的,這個(gè)類中的代碼我已經(jīng)作了詳細(xì)的解釋
代碼
/**
* 用來(lái)發(fā)送 mail 的 service, 其中有一個(gè)內(nèi)部類專門用來(lái)供線程使用
* @author 張榮華(ahuaxuan)
* @since 2007-7-11
* @version $Id$
*/
public class EasyMailServieImpl implements EasyMailService{
private static transient Log logger = LogFactory.getLog(EasyMailServieImpl.class);
//注入MailSender
private JavaMailSender javaMailSender;
//注入線程池
private EasyMailExecutorPool easyMailExecutorPool;
//設(shè)置發(fā)件人
private String from;
public void setEasyMailExecutorPool(EasyMailExecutorPool easyMailExecutorPool) {
this.easyMailExecutorPool = easyMailExecutorPool;
}
public void setJavaMailSender(JavaMailSender javaMailSender) {
this.javaMailSender = javaMailSender;
}
public void setFrom(String from) {
this.from = from;
}
/**
* 簡(jiǎn)單的郵件發(fā)送接口,感興趣的同學(xué)可以在這個(gè)基礎(chǔ)上繼續(xù)添加
* @param to
* @param subject
* @param text
*/
public void sendMessage(EmailEntity email){
if (null == email) {
if (logger.isDebugEnabled()) {
logger.debug("something you need to tell here");
}
return;
}
SimpleMailMessage simpleMailMessage = new SimpleMailMessage();
simpleMailMessage.setTo(email.getTo());
simpleMailMessage.setSubject(email.getSubject());
simpleMailMessage.setText(email.getText());
simpleMailMessage.setFrom(from);
easyMailExecutorPool.getService().execute(new MailRunner(simpleMailMessage));
}
/**
* 發(fā)送復(fù)雜格式郵件的接口,可以添加附件,圖片,等等,但是需要修改這個(gè)方法,
* 如何做到添加附件和圖片論壇上有例子了,需要的同學(xué)搜一下,
* 事實(shí)上這里的text參數(shù)最好是來(lái)自于模板,用模板生成html頁(yè)面,然后交給javamail去發(fā)送,
* 如何使用模板來(lái)生成html見 {@link http://www.iteye.com/topic/71430 }
*
* @param to
* @param subject
* @param text
* @throws MessagingException
*/
public void sendMimeMessage(EmailEntity email) throws MessagingException {
if (null == email) {
if (logger.isDebugEnabled()) {
logger.debug("something you need to tell here");
}
return;
}
MimeMessage message = javaMailSender.createMimeMessage();
MimeMessageHelper helper = new MimeMessageHelper(message);
helper.setTo(email.getTo());
helper.setFrom(from);
helper.setSubject(email.getSubject());
this.addAttachmentOrImg(helper, email.getAttachment(), true);
this.addAttachmentOrImg(helper, email.getImg(), false);
//這里的text是html格式的, 可以使用模板引擎來(lái)生成html模板, velocity或者freemarker都可以做到
helper.setText(email.getText(),true);
easyMailExecutorPool.getService().execute(new MailRunner(message));
}
/**
* 添加附件或者是圖片
* @param helper
* @param map
* @param isAttachment
* @throws MessagingException
*/
private void addAttachmentOrImg(MimeMessageHelper helper, Map map, boolean isAttachment) throws MessagingException {
for (Iterator it = map.entrySet().iterator(); it.hasNext();) {
Map.Entry entry = (Map.Entry) it.next();
String key = (String) entry.getKey();
String value = (String) entry.getValue();
if (StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value)) {
FileSystemResource file = new FileSystemResource(new File(value));
if (!file.exists()) continue;
if (isAttachment) {
helper.addAttachment(key, file);
} else {
helper.addInline(key, file);
}
}
}
}
/**
* 用來(lái)發(fā)送郵件的 Runnable, 該類是一個(gè)內(nèi)部類,之所以使用內(nèi)部類,而沒有使用嵌套類(靜態(tài)內(nèi)部類),
* 是因?yàn)閮?nèi)部類可以之間得到 service 的 javaMailSender
* 每次發(fā)送郵件都會(huì)從線程池中取一個(gè)線程, 然后進(jìn)行發(fā)郵件操作
* @author ahuaxuan
*/
private class MailRunner implements Runnable {
SimpleMailMessage simpleMailMessage;
MimeMessage mimeMessage;
/**
* 構(gòu)造簡(jiǎn)單文本郵件
* @param simpleMailMessage
*/
public MailRunner(SimpleMailMessage simpleMailMessage) {
if (mimeMessage == null) {
this.simpleMailMessage = simpleMailMessage;
}
}
/**
* 構(gòu)造復(fù)雜郵件,可以添加附近,圖片,等等
* @param mimeMessage
*/
public MailRunner(MimeMessage mimeMessage) {
if (simpleMailMessage == null) {
this.mimeMessage = mimeMessage;
}
}
/**
* 該方法將在線程池中的線程中執(zhí)行
*/
public void run() {
try {
if (simpleMailMessage != null) {
javaMailSender.send(this.simpleMailMessage);
} else if (mimeMessage != null) {
javaMailSender.send(this.mimeMessage);
}
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug("logger something here", e);
}
}
}
}
}
MailService中的EmailEntity是對(duì)郵件的抽象(我只使用了失血模型,事實(shí)上我們也可以讓這個(gè)EmailEntity來(lái)實(shí)現(xiàn)Runnable接口,這樣Service中的內(nèi)部類就可以去掉了,同時(shí)service中的大部分代碼就要搬到EmailEntity及其父類里了,大家更傾向于怎么做呢?),代碼如下:
代碼
/**
* 該類是對(duì)郵件的抽象,郵件有哪些屬性,這個(gè)類就有哪些屬性 顯然這個(gè)只是一個(gè)例子,
* 這個(gè)例子中附帶mimemessage發(fā)送所需的附件或者圖片(如果有的話)
* 需要使用的同學(xué)自己擴(kuò)展
*
* @author 張榮華(ahuaxuan)
* @version $Id$
*/
public class EmailEntity {
String to;
String subject;
String text;
//郵件附件
Map