Refer to:http://makemyownlife.iteye.com/blog/1439581
java memcached client源碼,總的代碼量還是很少的 主要是如下兩個類: MemcachedClient.java SockIOPool.java好 先看推薦的測試代碼: Java代碼

/**
* Copyright (c) 2008 Greg Whalin
* All rights reserved.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the BSD license
*
* This library is distributed in the hope that it will be
* useful, but WITHOUT ANY WARRANTY; without even the implied
* warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
* PURPOSE.
*
* You should have received a copy of the BSD License along with this
* library.
*
* @author greg whalin <greg@meetup.com>
*/
package com.meetup.memcached.test;
import com.meetup.memcached.*;
import org.apache.log4j.*;
public class TestMemcached {
public static void main(String[] args) {
// memcached should be running on port 11211 but NOT on 11212
BasicConfigurator.configure();
String[] servers = { "localhost:11211"};
SockIOPool pool = SockIOPool.getInstance();
pool.setServers( servers );
pool.setFailover( true );
pool.setInitConn( 10 );
pool.setMinConn( 5 );
pool.setMaxConn( 250 );
pool.setMaintSleep( 30 );
//這是開啟一個nagle 算法。改算法避免網絡中充塞小封包,提高網絡的利用率
pool.setNagle( false );
pool.setSocketTO( 3000 );
pool.setAliveCheck( true );
pool.initialize();
MemcachedClient mcc = new MemcachedClient();
// turn off most memcached client logging:
com.meetup.memcached.Logger.getLogger( MemcachedClient.class.getName() ).setLevel( com.meetup.memcached.Logger.LEVEL_WARN );
for ( int i = 0; i < 10; i++ ) {
boolean success = mcc.set( "" + i, "Hello!" );
String result = (String)mcc.get( "" + i );
System.out.println( String.format( "set( %d ): %s", i, success ) );
System.out.println( String.format( "get( %d ): %s", i, result ) );
}
}
其實 對于我來說 我很想明白的是連接池是如何配置的,關鍵在于 pool.initialize(); 這個方法如何初始化的。 Java代碼

/**
* Initializes the pool.
*/
public void initialize() {
synchronized( this ) {
// check to see if already initialized
if ( initialized
&& ( buckets != null || consistentBuckets != null )
&& ( availPool != null )
&& ( busyPool != null ) ) {
log.error( "++++ trying to initialize an already initialized pool" );
return;
}
// pools
availPool = new HashMap<String,Map<SockIO,Long>>( servers.length * initConn );
busyPool = new HashMap<String,Map<SockIO,Long>>( servers.length * initConn );
deadPool = new IdentityHashMap<SockIO,Integer>();
hostDeadDur = new HashMap<String,Long>();
hostDead = new HashMap<String,Date>();
maxCreate = (poolMultiplier > minConn) ? minConn : minConn / poolMultiplier; // only create up to maxCreate connections at once
if ( log.isDebugEnabled() ) {
log.debug( "++++ initializing pool with following settings:" );
log.debug( "++++ initial size: " + initConn );
log.debug( "++++ min spare : " + minConn );
log.debug( "++++ max spare : " + maxConn );
}
// if servers is not set, or it empty, then
// throw a runtime exception
if ( servers == null || servers.length <= 0 ) {
log.error( "++++ trying to initialize with no servers" );
throw new IllegalStateException( "++++ trying to initialize with no servers" );
}
// initalize our internal hashing structures
if ( this.hashingAlg == CONSISTENT_HASH )
populateConsistentBuckets();
else
populateBuckets();
// mark pool as initialized
this.initialized = true;
// start maint thread
if ( this.maintSleep > 0 )
this.startMaintThread();
}
}
如上代碼流程如下: 1 檢測是否已經被初始化 2 定義可用鏈接 ,繁忙鏈接池 3 判斷是否一致性hash算法 還是普通的算法 4 定義一個后臺線程 ,來維護 好 ,首先來分析下一致性hash算法。 從如下代碼來分析 : Java代碼

if (
this.hashingAlg == CONSISTENT_HASH )
populateConsistentBuckets();
}
Java代碼

/** 將server添加到一致性hash的2的32次 圓環 **/
private void populateConsistentBuckets() {
if ( log.isDebugEnabled() )
log.debug( "++++ initializing internal hashing structure for consistent hashing" );
// store buckets in tree map
this.consistentBuckets = new TreeMap<Long,String>();
MessageDigest md5 = MD5.get();
//得到總的權重
if ( this.totalWeight <= 0 && this.weights != null ) {
for ( int i = 0; i < this.weights.length; i++ )
this.totalWeight += ( this.weights[i] == null ) ? 1 : this.weights[i];
}
else if ( this.weights == null ) {
this.totalWeight = this.servers.length;
}
for ( int i = 0; i < servers.length; i++ ) {
//每臺服務器的權重
int thisWeight = 1;
if ( this.weights != null && this.weights[i] != null ) {
thisWeight = this.weights[i];
}
//有興趣的朋友可以參考平衡Hash 算法的另一個指標是平衡性 (Balance) ,定義如下: 平衡性 平衡性是指哈希的結果能夠盡可能分布到所有的緩沖中去,這樣可以使得所有的緩沖空間都得到利用
//了解決這種情況, consistent hashing 引入了“虛擬節點”的概念,它可以如下定義: “虛擬節點”( virtual node )是實際節點在 hash 空間的復制品( replica ),一實際個節點對應了若干個“虛擬節點”,這個對應個數也成為“復制個數”,“虛擬節點”在 hash 空間中以 hash 值排列。
double factor = Math.floor(((double)(40 * this.servers.length * thisWeight)) / (double)this.totalWeight);
for ( long j = 0; j < factor; j++ ) { //加密規則類似 127.0.0.1_1
byte[] d = md5.digest( ( servers[i] + "-" + j ).getBytes() ); //轉化成16位的字節數組
//16位二進制數組每4位為一組,每組第4個值左移24位,第三個值左移16位,第二個值左移8位,第一個值不移位。進行或運算,得到一個小于2的32 次方的long值
for ( int h = 0 ; h < 4; h++ ) { //因為是16位
Long k = //實際上每個字節進行了運算
((long)(d[3+h*4]&0xFF) << 24)
| ((long)(d[2+h*4]&0xFF) << 16)
| ((long)(d[1+h*4]&0xFF) << 8)
| ((long)(d[0+h*4]&0xFF));
consistentBuckets.put( k, servers[i] );
if ( log.isDebugEnabled() )
log.debug( "++++ added " + servers[i] + " to server bucket" );
}
}
// create initial connections
if ( log.isDebugEnabled() )
log.debug( "+++ creating initial connections (" + initConn + ") for host: " + servers[i] );
//創建鏈接
for ( int j = 0; j < initConn; j++ ) {
SockIO socket = createSocket( servers[i] );
if ( socket == null ) {
log.error( "++++ failed to create connection to: " + servers[i] + " -- only " + j + " created." );
break;
}
//加入socket到連接池 這里慢慢談
addSocketToPool( availPool, servers[i], socket );
if ( log.isDebugEnabled() )
log.debug( "++++ created and added socket: " + socket.toString() + " for host " + servers[i] );
}
}
}
好 比如說 我們調用了 如下代碼: Java代碼

MemcachedClient mcc = new MemcachedClient();
mcc.set("6", 1);
這里key 如何定位到一臺server呢?我先把一致性hash算法的定位方法說下。 Java代碼

//得到定位server的Socket封裝對象
SockIOPool.SockIO sock = pool.getSock( key, hashCode );
Java代碼

//計算出key對應的hash值(md5) ,然后
long bucket = getBucket( key, hashCode );
Java代碼
//得到大于hash的map,因為treemap已經排好序了。調用tailMap可以得到大于等于這個hash的對象 ,然后調用firstKey得到圓環上的hash值
SortedMap<Long,String> tmap =
this.consistentBuckets.tailMap( hv );
return ( tmap.isEmpty() ) ? this.consistentBuckets.firstKey() : tmap.firstKey();
-----------------------------------------------------
Silence, the way to avoid many problems;
Smile, the way to solve many problems;