在前一節實現異步調用的基礎上 , 現在我們來看一下一個完善的 Java 異步消息處理機制 .
[ 寫在本節之前 ]
在所有這些地方 , 我始終沒有提到設計模式這個詞 , 而事實上 , 多線程編程幾乎每一步都在應該設計模式 . 你只要能恰如其份地應用它 , 為什么要在意你用了某某名稱的模式呢 ?
一個說書人它可以把武功招數說得天花亂墜 , 引得一班聽書客掌聲如雷 , 但他只是說書的 . 真正的武林高手也許并不知道自己的招式在說書人口中叫什么 , 這不重要 , 重要的是他能在最恰當的時機把他不知名的招式發揮到極致 !
你了解再多的設計模式 , 或你寫出了此類的著作 , 并不重要 , 重要的是你能應用它設計出性能卓越的系統 .
本節的這個例子 , 如果你真正的理解了 , 不要懷疑自己 , 你已經是 Java 高手的行列了 . 如果你拋開本節的內容 , 五天后能自己獨立地把它再實現一次 , 那你完全可以不用再看我寫的文章系列了 , 至少是目前 , 我再也沒有更高級的內容要介紹了 .
上節的 Java 異步調用為了簡化調用關系 , 很多角色被合并到一個類中實現 , 為了幫助大家改快地抓住核心的流程 . 那么一個真正的異步消息處理器 , 當然不是這樣的簡單 .
一. 它要能適應不同類型的請求 :
本節用 makeString 來說明要求有返回值的請求 . 用 displayString 來說明不需要返回值的請求 .
二. 要能同時并發處理多個請求 , 并能按一定機制調度 :
本節將用一個隊列來存放請求 , 所以只能按 FIFO 機制調度 , 你可以改用 LinkedList, 就可以簡單實現一個優先級 ( 優先級高的 addFirst, 低的 addLast).
三. 有能力將調用的邊界從線程擴展到機器間 (RMI)
四. 分離過度耦合 , 如分離調用句柄 ( 取貨憑證 ) 和真實數據的實現 . 分離調用和執行的過程 , 可以盡快地將調返回 .
現在看具體的實現 :
public interface Axman {
Result resultTest(int count,char c);
void noResultTest(String str);
}
這個接口有兩個方法要實現 , 就是有返回值的調用 resultTest 和不需要返回值的調用
noResultTest, 我們把這個接口用一個代理類來實現 , 目的是將方法調用轉化為對象 , 這樣就可以將多個請求 ( 多個方法調 ) 放到一個容器中緩存起來 , 然后統一處理 , 因為 Java 不支持方法指針 , 所以把方法調用轉換為對象 , 然后在這個對象上統一執行它們的方法 , 不僅可以做到異步處理 , 而且可以將代表方法調用的請求對象序列化后通過網絡傳遞到另一個機器上執行 (RMI). 這也是 Java 回調機制最有力的實現 .
一個簡單的例子 .
如果 1: 做 A
如果 2: 做 B
如果 3: 做 C
如果有 1000 個情況 , 你不至于用 1000 個 case 吧 ? 以后再增加呢 ?
所以如果 C/C++ 程序員 , 會這樣實現 : (c 和 c++ 定義結構不同 )
type define struct MyStruct{
int mark;
(*fn) ();
} MyList;
然后你可以聲明這個結構數據 :
{1,A,
2,B
3,C
}
做一個循環 :
for(i=0;i<length;i++) {
if( 數據組 [i].mark == 傳入的值 ) ( 數據組 [i].*fn)();
}
簡單說 c/c++ 中將要被調用的涵數可以被保存起來 , 然后去訪問 , 調用 , 而 Java 中 , 我們無法將一個方法保存 , 除了直接調用 , 所以將要調用的方法用子類來實現 , 然后把這些子類實例保存起來 , 然后在這些子類的實現上調用方法 :
interface My{
void test();
}
class A implements My{
public void test(){
System.out.println(“A”):
}
}
class B implements My{
public void test(){
System.out.println(“B”):
}
}
class C implements My{
public void test(){
System.out.println(“C”):
}
}
class MyStruct {
int mark;
My m;
public MyStruct(int mark,My m){this.mark = amrk;this.m = m}
}
數組 :
{ new MyStruct(1,new A()),new MyStruct(2,new B()),new MyStruct(3,new C())}
for(xxxxxxxxx) if( 參數 == 數組 [i].mark) 數組 [i].m.test();
這樣把要調用的方法轉換為對象的保程不僅僅是可以對要調用的方法進行調度 , 而且可以把對象序列化后在另一臺機器上執行 , 這樣就把調用邊界從線程擴展到了機器 .
回到我們的例子 :
class Proxy implements Axman{
private final Scheduler scheduler;
private final Servant servant;
public Proxy(Scheduler scheduler,Servant servant){
this.scheduler = scheduler;
this.servant = servant;
}
public Result resultTest(int count,char c){
FutureResult futrue = new FutureResult();
this.scheduler.invoke(new ResultRequest(servant,futrue,count,c));
return futrue;
}
public void noResultTest(String str){
this.scheduler.invoke(new NoResultRequest(this.servant,str));
}
}
其中 scheduler 是管理對調用的調度 , servant 是真正的對方法的執行 :
Servant 就是去真實地實現方法 :
class Servant implements Axman{
public Result resultTest(int count,char c){
char[] buf = new char[count];
for(int i = 0;i < count;i++){
buf[i] = c;
try{
Thread.sleep(100);
}catch(Throwable t){}
}
return new RealResult(new String(buf));
}
public void noResultTest(String str){
try{
System.out.println("displayString :" + str);
Thread.sleep(10);
}catch(Throwable t){}
}
}
在 scheduler 將方法的調用 (invkoe) 和執行 (execute) 進行了分離 , 調用就是開始 ” 注冊 ” 方法到要執行的容器中 , 這樣就可以立即返回出來 . 真正執行多久就是 execute 的事了 , 就象一個人點燃爆竹的引信就跑了 , 至于那個爆竹什么時候爆炸就不是他能控制的了 .
public class Scheduler extends Thread {
private final ActivationQueue queue;
public Scheduler(ActivationQueue queue){
this.queue = queue;
}
public void invoke(MethodRequest request){
this.queue.putRequest(request);
}
public void run(){
while(true){
// 如果隊列中有請求線程 , 測開始執行請求
MethodRequest request = this.queue.takeRequest();
request.execute();
}
}
}
在 schetbduler 中只用一個隊列來保存代表方法和請求對象 , 實行簡單的 FIFO 調用 , 你要實更復雜的調度就要在這里重新實現 :
class ActivationQueue{
private static final int MAX_METHOD_REQUEST = 100;
private final MethodRequest[] requestQueue;
private int tail;
private int head;
private int count;
public ActivationQueue(){
this.requestQueue = new MethodRequest[MAX_METHOD_REQUEST];
this.head = this.count = this.tail = 0;
}
public synchronized void putRequest(MethodRequest request){
while(this.count >= this.requestQueue.length){
try {
this.wait();
}
catch (Throwable t) {}
}
this.requestQueue[this.tail] = request;
tail = (tail + 1)%this.requestQueue.length;
count ++ ;
this.notifyAll();
}
public synchronized MethodRequest takeRequest(){
while(this.count <= 0){
try {
this.wait();
}
catch (Throwable t) {}
}
MethodRequest request = this.requestQueue[this.head];
this.head = (this.head + 1) % this.requestQueue.length;
count --;
this.notifyAll();
return request;
}
}
為了將方法調用轉化為對象 , 我們通過實現 MethodRequestb 對象的 execute 方法來方法具體方法轉換成具體對象 :
abstract class MethodRequest{
protected final Servant servant;
}