本篇blog將講述coroutine的一些背景知識,以及在Java中如何使用Coroutine,包括一個簡單的benchmark對比,希望能借助這篇blog讓大家了解到更多在java中使用coroutine的方法,本篇blog的PDF版本可從此下載:http://www.bluedavy.com/open/UseCoroutineInJava.pdf
在講到具體內容之前,不能不先講下Coroutine的一些背景知識,來先具體了解下什么是Coroutine。
1.
背景知識
現在的操作系統都是支持多任務的,多任務可通過多進程或多線程的方式去實現,進程和線程的對比就不在這里說了,在多任務的調度上操作系統采取搶占式和協作式兩種方式,搶占式是指操作系統給每個任務一定的執行時間片,在到達這個時間片后如任務仍然未釋放對CPU的占用,那么操作系統將強制釋放,這是目前多數操作系統采取的方式;協作式是指操作系統按照任務的順序來分配CPU,每個任務執行過程中除非其主動釋放,否則將一直占據CPU,這種方式非常值得注意的是一旦有任務占據CPU不放,會導致其他任務”餓死”的現象,因此操作系統確實不太適合采用這種方式。
說完操作系統多任務的調度方式后,來看看通常程序是如何實現支持高并發的,一種就是典型的基于操作系統提供的多進程或多線程機制,每個任務占據一個進程或一個線程,當任務中有IO等待等動作時,則將進程或線程放入待調度隊列中,這種方式是目前大多數程序采取的方式,這種方式的壞處在于如想支持高的并發量,就不得不創建很多的進程或線程,而進程和線程都是要消耗不少系統資源的,另外一方面,進程或線程創建太多后,操作系統需要花費很多的時間在進程或線程的切換上,切換動作需要做狀態保持和恢復,這也會消耗掉很多的系統資源;另外一種方式則是每個任務不完全占據一個進程或線程,當任務執行過程中需要進行IO等待等動作時,任務則將其所占據的進程或線程釋放,以便其他任務使用這個進程或線程,這種方式的好處在于可以減少所需要的原生的進程或線程數,并且由于操作系統不需要做進程或線程的切換,而是自行來實現任務的切換,其成本會較操作系統切換低,這種方式也就是本文的重點,Coroutine方式,又稱協程方式,這種方式在目前的大多數語言中都有支持。
各種語言在實現Coroutine方式的支持時,多數都采用了Actor
Model來實現,Actor Model簡單來說就是每個任務就是一個Actor,Actor之間通過消息傳遞的方式來進行交互,而不采用共享的方式,Actor可以看做是一個輕量級的進程或線程,通常在一臺4G內存的機器上,創建幾十萬個Actor是毫無問題的,Actor支持Continuations,即對于如下代碼:
Actor
act方法
進行一些處理
創建并執行另外一個Actor
通過消息box阻塞獲取另一個Actor執行的結果
繼續基于這個結果進行一些處理
在支持Continuations的情況下,可以做到消息box阻塞時并不是進程或線程級的阻塞,而只是Actor本身的阻塞,并且在阻塞時可將所占據的進程或線程釋放給其他Actor使用,Actor Model實現最典型的就是erLang了。
對于Java應用而言,傳統方式下為了支持高并發,由于一個線程只能用于處理一個請求,即使是線程中其實有很多IO中斷、鎖等待也同樣如此,因此通常的做法是通過啟動很多的線程來支撐高并發,但當線程過多時,就造成了CPU需要消耗不少的時間在線程的切換上,從而出現瓶頸,按照上面對Coroutine的描述,Coroutine的方式理論上而言能夠大幅度的提升Java應用所能支撐的并發量。
2. 在Java中使用Coroutine
Java尚不能從語言層次上支持Coroutine,也許Java 7能夠支持,目前已經有了一個測試性質的版本,在Sun JDK 7尚未正式發布的情況下如希望在Java中使用Coroutine,Scala或Kilim是可以做的選擇,來分別看下。
Scala是現在很火的語言之一,Twitter消息中間件基于Scala編寫更是讓Scala名聲鵲起,除了在語法方面所做出的改進外,其中一個最突出的特色就是Scala Actor,Scala Actor是Scala用于實現Coroutine的方式,先來具體看看Scala在Coroutine支持實現的關鍵概念。
l
Actor
Scala Actor可以看做是一個輕量級的Java
Thread,其使用方式和Java Thread基本也一致,繼承Actor,實現act方法,啟動時也是調用start方法,但和Java Thread不同的是,Scala Actor可等待外部發送過來的消息,并進行相應的處理。
l
Actor的消息發送機制
發送消息到Actor的方式有異步、Future兩種方式,異步即指發送后立即返回,繼續后續流程,使用異步發送的方法為:actor ! MessageObject,其中消息對象可以為任何類型,并且Scala還支持一種稱為case Object的對象,便于在收到消息時做pattern matching。
Future方式是指阻塞線程等待消息處理的結果,使用Future方式發送的方法為:actor !! MessageObject,在等待結果方面,Scala支持不限時等待,限時等待以及等待多個Future或個別Future完成,使用方法如下:
val ft=actor !!
MessageObject // Future方式發送消息
val result=ft()
// 不限時等待
val
results=awaitAll(500,ft1,ft2,ft3) // 限時等待多個Future返回值
val results=awaitEither(ft1,ft2)
// 等待個別future完成
接收消息方通過reply方法返回Future方式所等待的結果。
l
Actor的消息接收機制
當代碼處于Actor的act方法或Actor環境(例如為Actor的act方法調用過來的代碼)中時,可通過以下兩種方式來接收外部發送給Actor的消息:一為receive方式,二為react方式,代碼例子如下:
receive{
case MessageObject(args) =>
doHandle(args)
}
react{
case MessageObject(args) =>
doHandle(args)
}
receive和react的差別在于receive需要阻塞當前Java線程,react則僅為阻塞當前Actor,但并不會阻塞Java線程,因此react模式更適合于充分發揮coroutine帶來的原生線程數減少的好處,但react模式有個缺點是react不支持返回。
receive和react都有限時接收的方式,方法為:receiveWithin(timeout)、reactWithin(timeout),超時的消息通過case TIMEOUT的方式來接收。
下面來看基于Scala Actor實現并發處理請求的一個簡單例子。
class
Processor extends Actor{
def
act(){
loop{
react{
case
command:String => doHandle(command)
}
}
}
def
doHandle(command:String){
//
業務邏輯處理
}
}
當需要并發執行此Processor時,在處理時需要的僅為調用以下代碼:
val
processor=new Processor()
processor.start
processor ! “Hello”
從以上說明來看,要在舊的應用中使用Scala還是會有一些成本,部署運行則非常簡單,在Scala IDE Plugin編寫了上面的scala代碼后,即生成了java class文件,可直接在jvm中運行。
Kilim是由劍橋的兩位博士開發的一個用于在Java中使用Coroutine的框架,Kilim基于Java語法,先來看看Kilim中的關鍵概念。
l
Task
可以認為Task就是Actor,使用方式和Java
Thread基本相同,只是繼承的為Task,覆蓋的為execute方法,啟動也是調用task的start方法。
l
Task的消息發送機制
Kilim中通過Mailbox對象來發送消息,Mailbox的基本原則為可以有多個消息發送者,但只能有一個消息接收者,發送的方式有同步發送、異步發送和阻塞線程方式的同步發送三種,同步發送是指保證一定能將消息放入發送隊列中,如當前發送隊列已滿,則等待到可用為止,阻塞的為當前Task;異步發送則是嘗試將消息放入發送隊列一次,如失敗,則返回false,成功則返回true,不會阻塞Task;阻塞線程方式的同步發送是指阻塞當前線程,并保證將消息發送給接收者,三種方式的使用方法如下:
mailbox.put(messageObject);
// 同步發送
mailbox.putnb(messageObject);
// 異步發送
mailbox.putb(messageObject);
// 阻塞線程方式發送
l
Task的消息接收機制
Kilim中通過Mailbox來接收消息,接收消息的方式有同步接收、異步接收以及阻塞線程方式的同步接收三種,同步接收是指阻塞當前Task,直到接收到消息才返回;異步接收是指立刻返回Mailbox中的消息,有就返回,沒有則返回null;阻塞線程方式的同步接收是指阻塞當前線程,直到接收到消息才返回,使用方法如下:
mailbox.get();
// 同步接收,傳入long參數表示等待的超時時間,單位為毫秒
mailbox.getnb();
// 異步接收,立刻返回
mailbox.getb();
// 阻塞線程方式接收
下面來看基于Kilim實現并發處理請求的一個簡單例子。
public
class Processor extends Task{
private
String command;
public
Processor(String command){
this.command=command;
}
public void execute() throws Pausable,Exception{
// 業務邏輯處理
}
}
在處理時,僅需調用以下代碼:
Task
processor=new Processor(command);
processor.start();
從以上代碼來看,Kilim對于Java人員而言學習門檻更低,但對于需要采用coroutine方式執行的代碼在編譯完畢后,還需要采用Kilim的kilim.tools.Weaver類來對這些已編譯出來的class文件做織入,運行時需要用織入后生成的class文件才行,織入的方法為:java kilim.tools.Weaver –d [織入后生成的class文件存放的目錄] [需要織入的類文件所在的目錄],目前尚沒有Kilim IDE
Plugin可用,因此weaver這個過程還是比較的麻煩。
上面對Scala和Kilim做了一個簡單的介紹,在實際Java應用中使用Coroutine時,通常會出現以下幾種典型的更復雜的使用場景,由于Actor模式本身就是異步的,因此其天然對異步場景支持的就非常好,更多的問題會出現在以下幾個同步場景上,分別來看看基于Scala、Kilim如何來實現。
l
Actor同步調用
Actor同步調用是經常會出現的使用場景,主要為Actor發送消息給其他的Actor處理,并等待結果才能繼續。
n
Scala
對于這種情況,在Scala 2.7.7中,目前可采取的為以下兩種方法:
一種為通過Future方式發送消息來實現:
class
Processor(command:String) extends Actor{
def act(){
val actor=new
NetSenderActor()
val ft=actor !! command
println(ft())
}
}
class
NetSenderActor extends Actor{
def act(){
case command:String => {
reply(“received
command:”+command)
}
}
}
第二種為通過receive的方式來實現:
class
Processor(command:String) extends Actor{
def act(){
val actor=new
NetSenderActor()
actor ! command
var senderResult=””
receive{
case result:String => {
senderResult=result
}
}
println(senderResult)
}
}
class
NetSenderActor extends Actor{
def act(){
case command:String => {
sender ! (“received
command:”+command)
}
}
}
但這兩種方式其實都不好,因為這兩種方式都會造成當前Actor的線程阻塞,這也是因為目前Scala版本對continuations尚不支持的原因,Scala 2.8版本將提供continuations的支持,希望到時能有不需要阻塞Actor線程實現上述需求的方法。
還有一種常見的場景是Actor調一段普通的Scala類,然后那個類中進行了一些處理,并調用了其他Actor,此時在該類中如需要等待Actor的返回結果,也可使用上面兩種方法。
n
Kilim
在Kilim中要實現Task之間的同步調用非常簡單,代碼如下:
public class
TaskA extends Task{
public void execute() throws
Pausable,Exception{
Mailbox<Object>
result=new Mailbox<Object>();
Task task=new TaskB(result);
task.start();
Object
resultObject=result.get();
System.out.println(resultObject);
}
}
public class
TaskB extends Task{
private Mailbox<Object> result;
public TaskB(Mailbox<Object> result){
this.result=result;
}
public void execute() throws Pausable,Exception{
result.put(“result
from TaskB”);
}
}
Kilim的Mailbox.get并不會阻塞線程,因此這種方式是完全滿足需求的。
l
普通Java代碼同步調用Actor
由于已有的應用是普通的Java代碼,經常會出現這樣的場景,就是希望實現在這些Java代碼中同步的調用Actor,并等待Actor的返回結果,但由于Scala和Kilim都強調首先必須是在Actor或Task的環境下才行,因此此場景更佳的方式應為Scala Actor(Kilim Task) à Java Code à Scala Actor(Kilim Task),這種場景在對已有的應用中會是最常出現的,來看看在Scala和Kilim中如何應對這樣的需求。
n
Scala
目前Scala中如希望在Java
Code中調用Scala Actor,并等待其返回結果,暫時還沒辦法,做法只能改為從Java Code中去調一個Scala的Object,然后在這個Object中調用Actor,并借助上面提到的receive或future的方法來獲取返回值,最后將這個返回值返回Java Code。
n
Kilim
目前Kilim中如希望實現上面的需求,其實非常簡單,只需要在Java Code的方法上加上Throw Pausable,然后通過mailbox.get來等待Kilim Task返回的結果即可,在Kilim中只要調用棧上的每個方法都有Throw Pausable,就可在這些方法上做等待返回這類的同步操作。
從上面這兩個最常見的需求來看,無疑Kilim更符合需求,但要注意的是對于Kilim而言,如果出現Task à nonpausable method à pausable method這樣的狀況時,pausable method中如果想執行阻塞當前Task的操作,是無法做到的,只能改造成Task (在mailbox上做等待,并傳遞mailbox給后續步驟) à nonpausable method (傳遞mailbox) à pausable method (將邏輯轉為放入一個Task中,并將返回值放入傳遞過來的mailbox),這種狀況在面對spring aop、反射調用等現象時就會出現了,目前kilim 0.6的版本尚未提供更透明的使用方法,不過據kilim作者提供的一個試用版本,其中已經有了對于反射調用的透明化的支持,暫時在目前只能采用上述方法,遷移成本相對較大,也許以后的kilim版本會考慮這樣的場景,提供相應的方法來降低遷移的成本。
3. 性能、所能支撐的并發量對比
在對Scala、Kilim有了這些了解后,來具體看看采用Scala、Kilim后與傳統Java方式在性能、所能支撐的并發量上的對比。
l
測試模型
采用一個比較簡單的模型進行測試,具體為有4個線程,這4個線程分別接收到了一定數量的請求,每個請求需要交給另外一個線程去執行,這個線程所做的動作為循環10次獲取另外一個線程的執行結果,此執行線程所做的動作為循環1000次拼接一個字符串,然后返回。
l
實現代碼
由于目前Scala版本對Continuation支持不夠好,但上面的場景中又有此類需求,所以導致Scala版本的代碼寫的比較麻煩一些。
實現代碼以及可運行的環境請從此處下載:
http://www.bluedavy.com/open/benchmark.zip
l
結果對比
測試機器為一臺4核的linux機器。
TPS的對比結果如下:

Load的對比結果如下:

從上面的測試結果來看,在這個benchmark的場景中,基于Kilim和Scala實現的Coroutine版本在隨著請求數增長的情況下load的增長幅度都比純粹的Java版本低很多,Kilim版本表現尤其突出,在TPS方面,由于目前Scala版本對Continuation支持的不好,因此在這個測試場景中有點吃虧,表現反而最差,經過上面的測試可以看到,基于Coroutine版本可以以同樣的load或更低的load來支撐更高的TPS。
到此為止,基本上對Java中使用Coroutine的相關知識做了一個介紹,總結而言,采用Coroutine方式可以很好的繞開需要啟動太多線程來支撐高并發出現的瓶頸,提高Java應用所能支撐的并發量,但在開發模式上也會帶來變化,并且需要特別注意不能造成線程被阻塞的現象,從開發易用和透明遷移現有Java應用兩個角度而言目前Coroutine方式還有很多不足,但相信隨著越來越多的人在Java中使用Coroutine,其易用性必然是能夠得到提升的。
4. 參考資料
1.
http://en.wikipedia.org/wiki/Computer_multitasking
2.
http://en.wikipedia.org/wiki/Coroutine
3.
http://en.wikipedia.org/wiki/Actor_model
4.
http://en.wikipedia.org/wiki/Continuation
5.
http://lamp.epfl.ch/~phaller/doc/haller07coord.pdf
6.
http://www.scala-lang.org/sites/default/files/odersky/jmlc06.pdf
7.
http://www.malhar.net/sriram/kilim/kilim_ecoop08.pdf
8.
http://lamp.epfl.ch/~phaller/doc/ScalaActors.pdf