同步、異步、周期以及事務(wù)地執(zhí)行工作 
David Currie 將繼續(xù)他的由三部分組成的系列文章,介紹 Java 2 企業(yè)版(J2EE)連接器架構(gòu)(JCA)最新版本中的增強(qiáng)和變化。在本文中,他將介紹新的 JCA 1.5 工作管理合約,該合約允許資源適配器利用應(yīng)用服務(wù)器的某些功能來(lái)調(diào)度和處理工作。在 JCA 的另一個(gè)增強(qiáng) —— 事務(wù)流入 —— 的支持下,企業(yè)信息系統(tǒng)可以在自己的事務(wù)中執(zhí)行這項(xiàng)工作。

JCA 1.5 是 J2EE 連接器體系結(jié)構(gòu)的最新版本,它包含許多重要的改進(jìn)和一些重要增強(qiáng)。本文是介紹這些變化的由三部分組成的系列文章的第 2 部分,建立在介紹的生命周期管理特性的第 1 部分的基礎(chǔ)上,本文介紹的是 JCA 新的工作管理合約。該合約允許資源適配器為延遲執(zhí)行或定期執(zhí)行的工作創(chuàng)建計(jì)時(shí)器,并允許計(jì)時(shí)器使用應(yīng)用服務(wù)器的線程同步或異步執(zhí)行處理。本文將描述事務(wù)流入支持如何在資源適配器導(dǎo)入到服務(wù)器的事務(wù)中進(jìn)行這種處理,以及資源適配器隨后如何控制事務(wù)的完成。

如果想在現(xiàn)有資源適配器中使用這項(xiàng)功能,或者正在考慮編寫(xiě)新的 JCA 1.5 資源適配器,那么本文是一份必不可少的讀物。如果要編寫(xiě)使用資源適配器的應(yīng)用程序,想了解更多幕后的情況,那么本文也會(huì)吸引您。

讓工作完成
在本系列的第 1 部分中,我們介紹了 ResourceAdapter 接口,它提供了一種機(jī)制,能夠在應(yīng)用服務(wù)器內(nèi)部為資源適配器提供生命周期。您可能還記得 start 方法被用來(lái)傳遞一個(gè)叫做 BootstrapContext 的對(duì)象。上次我們介紹了這個(gè)對(duì)象,但是 BootstrapContext 接口的三個(gè)方法才是工作管理和事務(wù)流入合約的關(guān)鍵,如清單 1 所示:

清單 1. 用來(lái)得到三個(gè)工具對(duì)象的 BootstrapContext 接口
public interface BootstrapContext {

    WorkManager getWorkManager();
    XATerminator getXATerminator();
    Timer createTimer() throws UnavailableException;

}

WorkManager 允許資源適配器對(duì)工作進(jìn)行調(diào)度,在應(yīng)用服務(wù)器線程上同步或異步執(zhí)行調(diào)度。這個(gè)工作可以在資源導(dǎo)入的事務(wù)中執(zhí)行,在這種情況下,XATerminator 有助于完成工作。Timer 負(fù)責(zé)延遲工作或定期工作的執(zhí)行。本文將更深入地研究這三個(gè)類(lèi),并說(shuō)明如何使用它們。

WorkManager 接口提供了三套處理工作的方法(doWork、 startWorkscheduleWork),如清單 2 所示:

清單 2. 用來(lái)提交工作項(xiàng)目的WorkManager 接口
public interface WorkManager {
    
    void doWork(Work work) throws WorkException;
    void doWork(Work work, long startTimeout, ExecutionContext execContext,
            WorkListener workListener) throws WorkException;

    long startWork(Work work) throws WorkException;
    long startWork(Work work, long startTimeout, ExecutionContext execContext,
            WorkListener workListener) throws WorkException;

    void scheduleWork(Work work) throws WorkException;
    void scheduleWork(Work work, long startTimeout,
            ExecutionContext execContext, WorkListener workListener)
            throws WorkException;
    
}

每個(gè)方法接收的第一個(gè)參數(shù),都是實(shí)現(xiàn) Work 接口的對(duì)象的一個(gè)實(shí)例,如清單 3 所示:

清單 3. 資源適配器實(shí)現(xiàn)的 Work 接口
public interface Work extends Runnable {

    void release();
    
}

Work 接口擴(kuò)展了 Runnable 接口,您應(yīng)當(dāng)像直接進(jìn)行 Java 線程編程時(shí)所做的那樣,實(shí)現(xiàn)run 方法中執(zhí)行的工作。您很快就會(huì)看到 release 方法發(fā)揮其作用的地方。

WorkManager 上的 doWork 方法可以讓一些工作同步執(zhí)行、一直受阻塞或者直到某些工作完成才執(zhí)行。這看起來(lái)可能不是特別有用——這不就是直接調(diào)用 run 方法時(shí)發(fā)生的事情嗎?并不完全如此。首先,它讓?xiě)?yīng)用程序服務(wù)器說(shuō)明現(xiàn)在不是做這項(xiàng)工作的恰當(dāng)時(shí)候。例如,如果在 ResourceAdapterstart 方法的范圍內(nèi)調(diào)用 doWork,那么您可能發(fā)現(xiàn)它將拋出 WorkRejectedException 異常。應(yīng)當(dāng)盡快從這個(gè)方法返回,如果可能的話(huà),應(yīng)當(dāng)把工作安排成異步處理。

第二,如果應(yīng)用服務(wù)器特別繁忙,那么它可能會(huì)推遲這項(xiàng)工作的啟動(dòng)??梢杂玫?2 個(gè)startTimeout 參數(shù)指明資源適配器準(zhǔn)備為工作啟動(dòng)等候多長(zhǎng)時(shí)間。如果應(yīng)用服務(wù)器沒(méi)能在這個(gè)時(shí)間內(nèi)啟動(dòng)工作,那么就會(huì)拋出 WorkRejectedException 異常。WorkManager 接口定義了常量 IMMEDIATEINDEFINITE,它們?cè)试S資源適配器指明自己根本不準(zhǔn)備等候或者準(zhǔn)備一直等候下去。

第三,正如下一節(jié)解釋的,有可能讓工作片斷在資源適配器導(dǎo)入的事務(wù)上下文中執(zhí)行,而不是在與當(dāng)前線程關(guān)聯(lián)的上下文中執(zhí)行。這正是第 3 個(gè)參數(shù)的用途,該參數(shù)是一個(gè)可選的 ExecutionContext。

最后,使用 doWork 方法讓?xiě)?yīng)用服務(wù)器對(duì)資源適配器執(zhí)行的工作擁有更多控制。在關(guān)閉應(yīng)用服務(wù)器時(shí),如果資源適配器夾在一個(gè)漫長(zhǎng)的、復(fù)雜的操作中間,那么服務(wù)器不需要等待資源適配器完成,或者被清理干凈。相反,服務(wù)器可以通過(guò)調(diào)用 Work 對(duì)象的 release 方法,給資源適配器發(fā)信號(hào)。然后資源適配器應(yīng)當(dāng)盡快完成處理。清單 4 顯示了 release 方法使用的一個(gè)示例:

清單 4. Work 對(duì)象的示例
public class ExampleWork implements Work {

    private volatile boolean _released;

    void run() {
        for (int i = 0; i < 100; i++) {
            if (_released) break;
            // Do something
        }
    }

    void release() {
        _released = true;
    }
    
}

注意清單 4 中使用的關(guān)鍵字 volatile。在 run 方法正進(jìn)行其處理的同時(shí),會(huì)在一個(gè)獨(dú)立的線程上調(diào)用 release 方法,volatile 修飾符確保 run 方法能夠看到更新的字段。

doWork 方法也可能失敗,拋出一個(gè)名稱(chēng)古怪的 WorkCompletedException 異常。這個(gè)異常是在將 run 方法分配給一個(gè)線程時(shí)拋出的,但這個(gè)異?;蛘呤巧舷挛脑O(shè)置失敗,或者是通過(guò)拋出運(yùn)行時(shí)異常退出方法。doWork 方法會(huì)提供一個(gè)錯(cuò)誤代碼,表示這些故障發(fā)生的路徑,還把問(wèn)題原因作為鏈接的異常提供。

為什么等待?
您已經(jīng)看到 doWork 方法允許您在調(diào)用線程阻塞的同時(shí)執(zhí)行工作。但是如果您不想等待 —— 也就是說(shuō),如果您不想同步執(zhí)行工作,該怎么辦呢?在 Java 2 平臺(tái)標(biāo)準(zhǔn)版本(J2SE)環(huán)境中,可以使用多線程實(shí)現(xiàn)這一目標(biāo)。但是,J2EE 規(guī)范讓?xiě)?yīng)用程序服務(wù)器使用 Java 2 安全管理器防止應(yīng)用程序離開(kāi)自己的線程。如果應(yīng)用服務(wù)器的這部分功能是通過(guò) EJB 池或 servlet 池來(lái)提供并發(fā)性,那么這樣做是合理的。服務(wù)器也可能想把各種形式的上下文都關(guān)聯(lián)到一個(gè)線程上,因?yàn)楫a(chǎn)生新線程時(shí),上下文可能會(huì)丟失。最后,正如我以前討論過(guò)的,不受服務(wù)器控制的線程會(huì)造成有序關(guān)機(jī)很難實(shí)現(xiàn)。

雖然可以通過(guò)使用安全策略,逐個(gè)案例地克服這個(gè)限制,但是 WorkManager 上的 startWorkscheduleWork 方法可以讓資源適配器異步地處理工作,同時(shí)確保應(yīng)用程序服務(wù)器仍然在控制之下。startWork 方法會(huì)一直等候,直到工作片斷開(kāi)始執(zhí)行,但不用等到工作結(jié)束。所以,如果調(diào)用器需要知道工作是否已經(jīng)得以執(zhí)行,但是不需要等到工作完成,那么可以使用這種方法。相比之下,只要該工作調(diào)度被接受,scheduleWork 方法就立即返回。在這種情況下,并不能確保工作真的被執(zhí)行。

清單 2 中的 WorkManager 方法的第 4 個(gè)參數(shù)(WorkListener)在用于這些異步方法時(shí)最有用。清單 5 顯示了這個(gè)接口:

清單 5. 接收事件通知的 WorkListener 接口
public interface WorkListener {

    void workAccepted(WorkEvent e);
    void workRejected(WorkEvent e);
    void workStarted(WorkEvent e);
    void workCompleted(WorkEvent e);
    
}

資源適配器能夠有選擇地傳遞一個(gè)監(jiān)聽(tīng)器,當(dāng)工作的項(xiàng)目通過(guò)接受、啟動(dòng)或完成這幾個(gè)狀態(tài)(失敗的情況下則是拒絕)傳遞過(guò)來(lái)時(shí),監(jiān)聽(tīng)器會(huì)得到通知。在這個(gè)監(jiān)聽(tīng)器已經(jīng)完成其工作項(xiàng)目,或者出現(xiàn)故障要重新安排工作項(xiàng)目時(shí),可以用它將通知發(fā)送給工作的發(fā)起者。WorkAdapter 類(lèi)也包含在內(nèi),它提供了所有方法的默認(rèn)實(shí)現(xiàn),因此,子類(lèi)只需覆蓋自己感興趣的方法即可。

WorkListener 的每個(gè)方法都采用 WorkEvent 對(duì)象作為參數(shù),如清單 6 所示:

清單 6. WorkEvent 類(lèi)上的附加方法
public class WorkEvent extends EventObject {

    ...

    public int getType() { ... }
    public Work getWork() { ... }
    public long getStartDuration() { ... }
    public WorkException getException() { ... }
 
}

除了通常的事件方法之外,WorkEvent 類(lèi)還為這些類(lèi)型的事件 (接受、拒絕、啟動(dòng)或完成)以及有問(wèn)題的工作項(xiàng)目提供了存取器。這使您能夠用一個(gè)(多線程的)監(jiān)聽(tīng)器負(fù)責(zé)多個(gè)工作提交。還有一些方法可以返回啟動(dòng)工作所花費(fèi)的時(shí)間,而且,在出現(xiàn) workRejectedworkCompleted 時(shí),返回可能發(fā)生的對(duì)應(yīng)的 WorkRejectedExceptionWorkCompletedException 異常。

圖 1 顯示了通過(guò) Work 對(duì)象傳遞的狀態(tài)。

圖 1. Work 對(duì)象的狀態(tài)
Work 對(duì)象的狀態(tài)

沿著圖 1 的底部,有三種提交方法,從上到下的點(diǎn)線表示具體的方法返回的生命周期中的時(shí)間點(diǎn)。

可以推遲到明天的事為什么要現(xiàn)在做?
doWork、startWorkscheduleWork 方法可以都立即向 WorkManager 提交任務(wù)。在 WorkManager 執(zhí)行提交之前,可能會(huì)發(fā)生延遲,但是調(diào)用者只能控制最大延遲(用啟動(dòng)超時(shí))。那么如果想讓任務(wù)晚些而不是立即處理,該怎么辦呢?scheduleWork 方法只允許將工作安排到另一個(gè)線程,而不是安排到時(shí)間軸上的以后某個(gè)時(shí)間點(diǎn)上。

這正是 BootstrapContext 接口的第 3 個(gè)方法發(fā)揮其作用的地方。createTimer 方法使資源適配器能夠得到 java.util.Timer 類(lèi)的實(shí)例。這個(gè)類(lèi)從 1.3 版開(kāi)始就已經(jīng)成為標(biāo)準(zhǔn) Java 庫(kù)的一部分,如清單 7 所示:

清單 7. Timer 類(lèi)的方法
public class Timer  {
 
    public void schedule(TimerTask task, long delay) { ... }
    public void schedule(TimerTask task, Date time) { ... }

    public void schedule(TimerTask task, Date firstTime, long period) { ... }
    public void schedule(TimerTask task, long delay, long period) { ... }
    public void scheduleAtFixedRate(TimerTask task, Date firstTime, long period) { ... }
    public void scheduleAtFixedRate(TimerTask task, long delay, long period) { ... }

    public void cancel() { ... }
 
}

可以用 java.util.Timer 類(lèi)的前兩個(gè)方法把任務(wù)安排在指定延遲之后或指定日期和時(shí)間發(fā)生。余下的 4 個(gè)調(diào)度方法還有額外的 period 參數(shù)??梢杂盟鼈儗?duì)那些初次運(yùn)行之后需要按照常規(guī)間隔發(fā)生的事件進(jìn)行調(diào)度,使用周期參數(shù)指定時(shí)間間隔。schedulescheduleAtFixedRate 方法是不同的方法,因?yàn)檫@些操作的計(jì)時(shí)無(wú)法保證;像垃圾搜集這樣的操作可能會(huì)造成任務(wù)推遲執(zhí)行。如果任務(wù)被延遲,那么 schedule 方法在運(yùn)行下一個(gè)任務(wù)之前仍然會(huì)等候一個(gè)完整的周期。但是,scheduleAtFixedRate 方法是在前一個(gè)任務(wù) 應(yīng)當(dāng) 運(yùn)行的固定周期之后運(yùn)行下一個(gè)任務(wù)。所以,如果任務(wù)之間的時(shí)間對(duì)您非常很重要,那么請(qǐng)使用 schedule。如果絕對(duì)時(shí)間或累積時(shí)間很重要,那么請(qǐng)使用 scheduleAtFixedRate

每個(gè) schedule 方法都采用一個(gè)擴(kuò)展了 TimerTask 類(lèi)的對(duì)象作為自己的第一個(gè)參數(shù)。同使用 Work 接口時(shí)一樣,這個(gè)類(lèi)擴(kuò)展了 Runnable ,而且 Timer 會(huì)在適當(dāng)?shù)臅r(shí)間調(diào)用 run 方法。TimerTask 類(lèi)有一個(gè) cancel 方法,可以用它取消對(duì)任務(wù)的后續(xù)調(diào)用。或者,也可以調(diào)用 Timer 上的 cancel 方法來(lái)取消目前安排的所有任務(wù)。scheduledExecutionTime 方法允許 run 方法將當(dāng)前實(shí)際調(diào)用時(shí)間和它應(yīng)當(dāng)被調(diào)用的時(shí)間進(jìn)行比較。

雖然 TimerTaskrun 方法是在新線程中調(diào)用 ,但是這個(gè)線程是 JVM 已經(jīng)分配的線程,處于應(yīng)用服務(wù)器的控制之外。如果您想讓資源適配器進(jìn)行嚴(yán)肅的處理,那么在這個(gè)時(shí)候,應(yīng)當(dāng)用 WorkManager 切換到應(yīng)用服務(wù)器的線程。清單 8 顯示的示例采用了這一良好實(shí)踐來(lái)調(diào)度工作,從當(dāng)前時(shí)間開(kāi)始每分鐘執(zhí)行一次:

清單 8. 組合使用 TimerWorkManager
final WorkManager workManager = context.getWorkManager();
final Timer timer = context.createTimer();

timer.scheduleAtFixedRate(new TimerTask() {
    public void run() {
        workManager.scheduleWork(new ExampleWork());
    }
}, 0, 60 * 1000);

JCA WorkManager 和 Timer 的替代方案
正如前一小節(jié)提到過(guò)的,在 JCA 中,使用 Timer 有一個(gè)問(wèn)題:執(zhí)行 TimerTask 的線程不在應(yīng)用服務(wù)器的控制之下。因?yàn)?Timer 是類(lèi)而不是接口,而且線程實(shí)際是在類(lèi)的構(gòu)造函數(shù)中產(chǎn)生的, 所以應(yīng)用服務(wù)器不能修改這種行為。允許應(yīng)用服務(wù)器實(shí)現(xiàn)這一目的一個(gè)替代方案就是使用由 IBM 和 BEA 聯(lián)合開(kāi)發(fā)的 Timer and Work Manager for Application Servers 規(guī)范的接口(請(qǐng)參閱 參考資料) 。

在使用這個(gè)規(guī)范的接口時(shí),會(huì)從 Java 名稱(chēng)與目錄服務(wù)接口(JNDI)得到一個(gè) TimerManager 實(shí)現(xiàn)??梢酝ㄟ^(guò)給管理器安排了一個(gè) TimerListener 實(shí)現(xiàn)來(lái)返回一個(gè) Timer (是一個(gè)commonj.timers.Timer 而不是 java.util.Timer)。在安排好的時(shí)間上會(huì)調(diào)用 TimerListener,可以用 Timer 執(zhí)行諸如取消預(yù)定操作之類(lèi)的操作。

顧名思義,這個(gè)規(guī)范還提供了 JCA 工作管理的一個(gè)替代。這里的接口與 JCA 的那些接口很相似,除了初始的 commmonj.work.WorkManager 是從 JNDI 得到的。這個(gè)合約提供的附加行為還包括:阻塞到一個(gè)或全部預(yù)定工作完成的能力、在遠(yuǎn)程 JVM 上執(zhí)行可以序列化的工作的可能性。與 commonj Timer 一樣,commonj WorkManager 的應(yīng)用不限于資源適配器。任何服務(wù)器端 J2EE 組件都可以使用這項(xiàng)功能。

清單 9 顯示了為了使用 commonj 的類(lèi)而被重寫(xiě)的 清單 8 的示例:

清單 9. 使用 commonj 接口
final InitialContext context = new InitialContext();
final WorkManager workManager = (WorkManager) 
        context.lookup("java:comp/env/wm/MyWorkManager");
final TimerManager timerManager = (TimerManager)
	context.lookup("java:comp/env/timer/MyTimer");

timerManager.scheduleAtFixedRate(new TimerListener() {
    public void timerExpired(final Timer timer) {
        try {
            workManager.schedule(new ExampleCommonjWork());
        } catch (final WorkException exception) {
        }
    }
}, 0, 60 * 1000);

ExampleCommonjWork 類(lèi)與 清單 4ExampleWork 相同,此外,它還實(shí)現(xiàn)了 commonj Work 接口要求的額外的 isDaemon 方法。如果工作長(zhǎng)期存在,那么該方法應(yīng)當(dāng)返回 true

導(dǎo)入事務(wù)并完成事務(wù)
在 JCA 1.5 之前,應(yīng)用服務(wù)器總是充當(dāng)事務(wù)的協(xié)調(diào)者。應(yīng)用程序負(fù)責(zé)器負(fù)責(zé)啟動(dòng)事務(wù),而且在每個(gè)資源管理器通過(guò) JCA 連接管理器登記了自己的 XAResource 之后,還通過(guò)一起提交或回滾所有資源來(lái)協(xié)調(diào)事務(wù)的完成。JCA 1.5 的事務(wù)流入合約允許企業(yè)信息系統(tǒng)(EIS)啟動(dòng)和完成事務(wù),充當(dāng)事務(wù)的協(xié)調(diào)者。這樣 EIS 就能通過(guò)資源適配器把事務(wù)導(dǎo)入應(yīng)用服務(wù)器,并在事務(wù)范圍內(nèi)在服務(wù)器上執(zhí)行工作。例如,它可能使用 Supports 的容器管理器事務(wù)屬性來(lái)調(diào)用消息驅(qū)動(dòng) bean(MDB)。這樣,MDB 方法執(zhí)行的工作,包括對(duì)其它 EJB 的調(diào)用,就會(huì)成為事務(wù)的一部分。

資源適配器通過(guò)第 3 個(gè) ExecutionContext 參數(shù)導(dǎo)入事務(wù),在 清單 2 中可以看到它被傳遞給 WorkManager。正如在清單 10 中可以看到的,這個(gè)類(lèi)擁有設(shè)置事務(wù) ID (XID) 和事務(wù)超時(shí)的方法:

清單 10. 傳遞給 WorkManagerExecutionContext 類(lèi)
public class ExecutionContext {

    public ExecutionContext() { ... }
    public void setXid(Xid xid) { ... }
    public Xid getXid() { ... }
    public void setTransactionTimeout(long timeout)
        throws NotSupportedException { ... }
    public long getTransactionTimeout() { ... }    
    
}

XID 惟一地標(biāo)識(shí)事務(wù),它由三部分構(gòu)成:格式標(biāo)識(shí)符、事務(wù)界定符和分支界定符。如果 EIS 還沒(méi)有事務(wù)的 XID,那么就必須根據(jù) XA 規(guī)范(請(qǐng)參閱 參考資料)構(gòu)建一個(gè) XID。然后應(yīng)用服務(wù)器會(huì)在調(diào)用工作對(duì)象的 run 方法之前把這個(gè)事務(wù)與執(zhí)行線程關(guān)聯(lián)起來(lái)。

把事務(wù)導(dǎo)入應(yīng)用服務(wù)器之后,接下來(lái)就由資源適配器負(fù)責(zé)把屬于這個(gè)事務(wù)的事件通知給服務(wù)器。具體地說(shuō),它必須把事務(wù)完成通知給應(yīng)用服務(wù)器。它是通過(guò)清單 11 中的 XATerminator 接口做到這一點(diǎn)的,可以從 BootstrapContext 中得到它的實(shí)現(xiàn):

清單 11. 用于事務(wù)完成的 XATerminator 類(lèi)
public interface XATerminator {

    void commit(Xid xid, boolean onePhase) throws XAException;
    void forget(Xid xid) throws XAException;
    int prepare(Xid xid) throws XAException;
    Xid[] recover(int flag) throws XAException;
    void rollback(Xid xid) throws XAException;
    
}

XATerminator 接口的方法與 XAResource 上的方法對(duì)應(yīng),只是在這個(gè)例子中,資源適配器需要調(diào)用應(yīng)用程序服務(wù)器。典型情況下,如果事務(wù)中包含不止一個(gè)資源,那么資源適配器會(huì)調(diào)用 XATerminatorprepare 方法,傳遞與 ExecutionContext 中傳遞的 Xid 相同的 Xid。如果所有的資源都返回 XA_OK (或者 XA_RDONLY),那么資源適配器會(huì)接著調(diào)用 commit;否則就會(huì)調(diào)用 rollback。

結(jié)束語(yǔ)
本文介紹了如何用 WorkManager 接口對(duì)工作進(jìn)行調(diào)度,以便在應(yīng)用程序的控制下處理這些工作。您已經(jīng)看到如何用 Timer 的實(shí)例在以后某個(gè)時(shí)間執(zhí)行工作或定期執(zhí)行工作,還了解了使用 JCA 提供的對(duì)象的 commonj 替代方案。您已經(jīng)看到資源適配器如何在自己導(dǎo)入到應(yīng)用服務(wù)器的事務(wù)中執(zhí)行工作,并用 XATerminator 接口控制事務(wù)的完成。在本系列的第 3 篇也是最后一篇文章中,我將介紹 JCA 1.5 消息流入合約,它比較出名的地方是對(duì)消息驅(qū)動(dòng) bean 的支持。