xmemcached發(fā)布
1.0-beta,從0.60直接到1.0-beta,主要改進如下:
1、支持更多協(xié)議,在已有協(xié)議支持的基礎上添加了append、prepend、gets、批量gets、cas協(xié)議的支持,具體請查看XMemcachedClient類的實例方法。重點是cas操作,下文將詳細描述下。
2、memcached分布支持,支持連接多個memcached server,支持簡單的余數(shù)分布和一致性哈希分布。
3、0.60版本以來的bug修復。
memcached 1.2.4之后開始支持cas協(xié)議,該協(xié)議存儲數(shù)據(jù)同時發(fā)送一個版本號,只有當這個版本號與memcached server上該key的最新版本一致時才更新成功,否則返回EXISTS,版本號的獲取需要通過gets協(xié)議獲得,cas全稱就是compare and set,如果對hibernate樂觀鎖和java.util.concurrent.atomic包都比較熟悉的話這個概念應該很了解了。xmemcached 1.0-beta開始支持cas協(xié)議,看例子:
XMemcachedClient client = new XMemcachedClient();
client.addServer("localhost",11211);
client.set("a", 0, 1); //設置a為1
GetsResponse result = client.gets("a");
long cas = result.getCas(); //獲取當前cas
//嘗試更新a成2
if (!client.cas("a", 0, 2, cas))
System.err.println("cas error");
XMemcachedClient.cas(final String key, final int exp, Object value, long cas)將嘗試更新key的值到value,如果失敗就返回false。這樣搞好像很麻煩,需要先gets獲取cas值,然后再調用cas方法更新,因此XMemcached提供了一個包裝類可以幫你搞定這兩步,并且提供重試機制:
/**
* 合并gets和cas,利用CASOperation
*/
client.cas("a", 0, new CASOperation() {
@Override
public int getMaxTries() {
return 10;
}
@Override
public Object getNewValue(long currentCAS, Object currentValue) {
System.out.println("current value " + currentValue);
return 2;
}
});
通過
CASOperation,你只要實現(xiàn)兩個方法即可,getMaxTries返回最大重試次數(shù),超過這個次數(shù)還沒有更新成功就拋出TimeoutException;getNewValue方法返回依據(jù)當前cas和緩存值,你希望設置的更新值。看一個cas更詳細的例子,開100個線程遞增緩沖中的變量a,采用cas才能保證最后a會等于100:
import java.util.concurrent.CountDownLatch;
import net.rubyeye.xmemcached.CASOperation;
import net.rubyeye.xmemcached.XMemcachedClient;
/**
* 測試CAS
* @author dennis
*/
class CASThread extends Thread {
private XMemcachedClient mc;
private CountDownLatch cd;
public CASThread(XMemcachedClient mc, CountDownLatch cdl) {
super();
this.mc = mc;
this.cd = cdl;
}
public void run() {
try {
if (mc.cas("a", 0, new CASOperation() {
@Override
public int getMaxTries() {
return 50;
}
@Override
public Object getNewValue(long currentCAS, Object currentValue) {
System.out.println("currentValue=" + currentValue
+ ",currentCAS=" + currentCAS);
return ((Integer) currentValue).intValue() + 1;
}
}))
this.cd.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class CASTest {
static int NUM = 100;
public static void main(String[] args) throws Exception {
XMemcachedClient mc = new XMemcachedClient();
mc.addServer("192.168.222.100", 11211);
// 設置初始值為0
mc.set("a", 0, 0);
CountDownLatch cdl = new CountDownLatch(NUM);
// 開NUM個線程遞增變量a
for (int i = 0; i < NUM; i++)
new CASThread(mc, cdl).start();
cdl.await();
// 打印結果,最后結果應該為NUM
System.out.println("result=" + mc.get("a"));
mc.shutdown();
}
}
最高重試次數(shù)設置成了50,觀察輸出你就會知道cas沖突在高并發(fā)下非常頻繁,這個操作應當慎用。
說完cas,我們再來看下xmemcached對分布的支持。
1、如何添加多個memcached server?
通過XMemcachdClient.addServer(String ip,int port)方法,
XMemcachedClient mc = new XMemcachedClient();
mc.addServer(ip1, port1);
mc.addServer(ip2, port2);
mc.addServer(ip3, port3);
mc.addServer(ip4, port3);
2、怎么分布?
在添加了>=2個memcached server后,對
XMemcachdClient的存儲、刪除等操作都將默認根據(jù)key的哈希值模連接數(shù)的余數(shù)做分布,這也是spymemcached默認的分布算法。這個算法簡單快速,然而在添加或者移除memcached server后,緩存會大面積失效需要重組,這個代價太高,因此還有所謂Consistent Hashing算法,通過將memcached節(jié)點分布在一個0-2^128-1的環(huán)上,發(fā)送數(shù)據(jù)到某個節(jié)點經(jīng)過的跳躍次數(shù)可以縮減到O(lgn)次,并且在添加或者移除節(jié)點時最大限度的降低影響,這個算法的思想其實來源于p2p網(wǎng)絡的路由算法,不過路由算法比這個復雜多了,畢竟memcached的
分布是在客戶端,因此不需要節(jié)點之間的通訊和路由表的存儲更新等。這個算法在java上的實現(xiàn)可以通過TreeMap紅黑樹,具體可以參考
這里和
這里。
在xmemcached啟動Consistent Hashing如下:
XMemcachedClient client = new XMemcachedClient(new KetamaMemcachedSessionLocator(HashAlgorithm.CRC32_HASH));
client.addServer(ip, 12000);
client.addServer(ip, 12001);
client.addServer(ip, 11211);
client.addServer(ip, 12003);
client.addServer(ip, 12004);
散列函數(shù)采用CRC32,你也可以采用其他散列函數(shù),具體看場景測試而定,散列函數(shù)決定了你的查找節(jié)點效率和緩存重新分布的均衡程度。
在完成
1.0-beta這個里程碑版本后,xmemcached將集中于穩(wěn)定性方面的測試和性能優(yōu)化。歡迎提交測試報告和建議,我的email killme2008@gmail.com