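The previous article introduced the first module of SolrCloud: building the ZooKeeper ensemble that manages the state of the Solr cluster. Once such an ensemble is available when the Solr servers start, we need a convenient way to connect to it. In this article I analyze the wrapper classes related to the ZooKeeper client.
SolrZkClient is the interface class that a Solr server uses to communicate with the ZooKeeper ensemble. Its main components are: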
private ConnectionManager connManager;
private volatile SolrZooKeeper keeper;
private ZkCmdExecutor zkCmdExecutor = new ZkCmdExecutor();
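ConnectionManager implements Watcher and is mainly responsible for reacting to changes in the state of the connection between the client and the ZooKeeper ensemble. For a detailed introduction to watchers, see http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html#ch_zkWatches.
SolrZooKeeper is a thin wrapper class with no real significance of its own. ZkCmdExecutor retries an operation a fixed number of times when the connection fails. The operation itself is a concrete subclass of the abstract class ZkOperation, whose execute method contains the actual steps. These operations include creating a znode, reading a znode's data, creating a znode path, and deleting a znode.
Start with the SolrZkClient constructor. It first creates a ConnectionManager to respond to state changes between the two ends. ZkClientConnectionStrategy is an abstract connection-strategy class that covers two strategies, connect and reconnect. It uses the template method pattern, with the concrete step supplied through the static nested class ZkUpdate. DefaultConnectionStrategy is one of its subclasses and overrides the two strategy methods, connect and reconnect.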
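The retry behaviour described for ZkCmdExecutor can be illustrated with a small standalone sketch. It is not Solr's implementation: the names RetrySketch and ConnectionLostException, the attempt count, and the linear back-off delay are all assumptions made for illustration, and no ZooKeeper classes are involved.

```java
import java.util.concurrent.Callable;

/** Hypothetical stand-in for a connection-loss error; not a ZooKeeper class. */
class ConnectionLostException extends Exception {
    ConnectionLostException(String message) { super(message); }
}

/** Hypothetical retry helper, loosely modelled on the role described for ZkCmdExecutor. */
final class RetrySketch {
    private final int maxAttempts;
    private final long baseDelayMs;

    RetrySketch(int maxAttempts, long baseDelayMs) {
        if (maxAttempts < 1) throw new IllegalArgumentException("maxAttempts must be >= 1");
        this.maxAttempts = maxAttempts;
        this.baseDelayMs = baseDelayMs;
    }

    /** Runs the operation, retrying only when it reports a lost connection. */
    <T> T retryOperation(Callable<T> operation) throws Exception {
        ConnectionLostException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.call();
            } catch (ConnectionLostException e) {
                last = e;
                // Back off a little longer after each failed attempt (assumed policy).
                if (attempt < maxAttempts) Thread.sleep(baseDelayMs * attempt);
            }
        }
        throw last; // every attempt failed with a connection loss
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        RetrySketch executor = new RetrySketch(3, 10);
        // The simulated operation fails twice, then succeeds on the third call.
        String result = executor.retryOperation(() -> {
            if (++calls[0] < 3) throw new ConnectionLostException("simulated loss");
            return "created";
        });
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Any other exception propagates immediately, which matches the idea that only a lost connection is worth retrying.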
public SolrZkClient(String zkServerAddress, int zkClientTimeout,
ZkClientConnectionStrategy strat, final OnReconnect onReconnect, int clientConnectTimeout) throws InterruptedException,
TimeoutException, IOException {
connManager = new ConnectionManager("ZooKeeperConnection Watcher:"
+ zkServerAddress, this, zkServerAddress, zkClientTimeout, strat, onReconnect);
strat.connect(zkServerAddress, zkClientTimeout, connManager,
new ZkUpdate() {
@Override
public void update(SolrZooKeeper zooKeeper) {
SolrZooKeeper oldKeeper = keeper;
keeper = zooKeeper;
if (oldKeeper != null) {
try {
oldKeeper.close();
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"", e);
}
}
}
});
connManager.waitForConnected(clientConnectTimeout);
numOpens.incrementAndGet();
}
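Note the anonymous ZkUpdate object created in the constructor: its update method is invoked as a callback.
That method swaps in the new SolrZooKeeper and then closes the old one, if there is one. Once this preparation is done, the client waits until it is connected to the ZooKeeper ensemble:
connManager.waitForConnected(clientConnectTimeout); // wait for the connection to the ZooKeeper ensemble; the default timeout is 30 seconds
The connection itself is actually initiated by new SolrZooKeeper(serverAddress, timeout, watcher). The line above only waits for the given time to see whether the connection has been established.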
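The "start connecting, then wait with a timeout" idea can be sketched with a CountDownLatch. This is a simplified illustration rather than the actual ConnectionManager code: the class ConnectionLatchSketch and its method onConnected are hypothetical, and the connection event is simulated by a plain thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch: a watcher-like object that lets callers wait for a connection event. */
final class ConnectionLatchSketch {
    private final CountDownLatch connectedSignal = new CountDownLatch(1);

    /** Called from the event thread once the session reports that it is connected. */
    void onConnected() {
        connectedSignal.countDown();
    }

    /** Blocks until onConnected() has run, or fails once the timeout elapses. */
    void waitForConnected(long timeoutMs) throws InterruptedException, TimeoutException {
        if (!connectedSignal.await(timeoutMs, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("not connected within " + timeoutMs + " ms");
        }
    }

    public static void main(String[] args) throws Exception {
        ConnectionLatchSketch watcher = new ConnectionLatchSketch();
        // Simulate the asynchronous connection event arriving from another thread.
        Thread eventThread = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            watcher.onConnected();
        });
        eventThread.start();
        watcher.waitForConnected(1000);
        System.out.println("connected");
    }
}
```

The waiting thread does no connecting itself; it only observes whether the event arrived in time, which is the distinction the text draws.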
Once the connection to the ZooKeeper ensemble succeeds, the usual ZooKeeper operations are available:
1) Check whether the client is connected
public boolean isConnected() {
return keeper != null && keeper.getState() == ZooKeeper.States.CONNECTED;
}
2) Check whether a znode exists at a given path
public Stat exists(final String path, final Watcher watcher, boolean retryOnConnLoss) throws KeeperException, InterruptedException {
if (retryOnConnLoss) {
return zkCmdExecutor.retryOperation(new ZkOperation() {
@Override
public Stat execute() throws KeeperException, InterruptedException {
return keeper.exists(path, watcher);
}
});
} else {
return keeper.exists(path, watcher);
}
}
3) Create a znode
public String create(final String path, final byte data[], final List<ACL> acl, final CreateMode createMode, boolean retryOnConnLoss) throws KeeperException, InterruptedException {
if (retryOnConnLoss) {
return zkCmdExecutor.retryOperation(new ZkOperation() {
@Override
public String execute() throws KeeperException, InterruptedException {
return keeper.create(path, data, acl, createMode);
}
});
} else {
return keeper.create(path, data, acl, createMode);
}
}
4) Get the child znodes under a given path
public List<String> getChildren(final String path, final Watcher watcher, boolean retryOnConnLoss) throws KeeperException, InterruptedException {
if (retryOnConnLoss) {
return zkCmdExecutor.retryOperation(new ZkOperation() {
@Override
public List<String> execute() throws KeeperException, InterruptedException {
return keeper.getChildren(path, watcher);
}
});
} else {
return keeper.getChildren(path, watcher);
}
}
5) Get the data attached to a given znode
public byte[] getData(final String path, final Watcher watcher, final Stat stat, boolean retryOnConnLoss) throws KeeperException, InterruptedException {
if (retryOnConnLoss) {
return zkCmdExecutor.retryOperation(new ZkOperation() {
@Override
public byte[] execute() throws KeeperException, InterruptedException {
return keeper.getData(path, watcher, stat);
}
});
} else {
return keeper.getData(path, watcher, stat);
}
}
6) Set the data on a given znode
public Stat setData(final String path, final byte data[], final int version, boolean retryOnConnLoss) throws KeeperException, InterruptedException {
if (retryOnConnLoss) {
return zkCmdExecutor.retryOperation(new ZkOperation() {
@Override
public Stat execute() throws KeeperException, InterruptedException {
return keeper.setData(path, data, version);
}
});
} else {
return keeper.setData(path, data, version);
}
}
7) Create a path

public void makePath(String path, byte[] data, CreateMode createMode, Watcher watcher, boolean failOnExists, boolean retryOnConnLoss) throws KeeperException, InterruptedException {
if (log.isInfoEnabled()) {
log.info("makePath: " + path);
}
boolean retry = true;
if (path.startsWith("/")) {
path = path.substring(1, path.length());
}
String[] paths = path.split("/");
StringBuilder sbPath = new StringBuilder();
for (int i = 0; i < paths.length; i++) {
byte[] bytes = null;
String pathPiece = paths[i];
sbPath.append("/" + pathPiece);
final String currentPath = sbPath.toString();
Object exists = exists(currentPath, watcher, retryOnConnLoss);
if (exists == null || ((i == paths.length -1) && failOnExists)) {
CreateMode mode = CreateMode.PERSISTENT;
if (i == paths.length - 1) {
mode = createMode;
bytes = data;
if (!retryOnConnLoss) retry = false;
}
try {
if (retry) {
final CreateMode finalMode = mode;
final byte[] finalBytes = bytes;
zkCmdExecutor.retryOperation(new ZkOperation() {
@Override
public Object execute() throws KeeperException, InterruptedException {
keeper.create(currentPath, finalBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, finalMode);
return null;
}
});
} else {
keeper.create(currentPath, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);
}
} catch (NodeExistsException e) {
if (!failOnExists) {
// TODO: version ? for now, don't worry about race
setData(currentPath, data, -1, retryOnConnLoss);
// set new watch
exists(currentPath, watcher, retryOnConnLoss);
return;
}
// ignore unless it's the last node in the path
if (i == paths.length - 1) {
throw e;
}
}
if(i == paths.length -1) {
// set new watch
exists(currentPath, watcher, retryOnConnLoss);
}
} else if (i == paths.length - 1) {
// TODO: version ? for now, don't worry about race
setData(currentPath, data, -1, retryOnConnLoss);
// set new watch
exists(currentPath, watcher, retryOnConnLoss);
}
}
}
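The core of makePath is that it walks the path one segment at a time, so that every parent exists before the final znode is created. The following sketch isolates just that traversal. PathPrefixSketch and prefixes are hypothetical names, and the ZooKeeper calls are left out on purpose.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that isolates the path traversal used by makePath. */
final class PathPrefixSketch {
    /** Returns every cumulative prefix of a slash-separated path, parents first. */
    static List<String> prefixes(String path) {
        if (path.startsWith("/")) {
            path = path.substring(1); // drop the leading slash, as makePath does
        }
        List<String> result = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        for (String piece : path.split("/")) {
            current.append('/').append(piece);
            result.add(current.toString());
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [/collections, /collections/collection1, /collections/collection1/shards]
        System.out.println(prefixes("/collections/collection1/shards"));
    }
}
```

In makePath, each prefix except the last is created as a PERSISTENT znode with no data, while the last one receives the caller's createMode and data.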

8) Delete a given znode
public void delete(final String path, final int version, boolean retryOnConnLoss) throws InterruptedException, KeeperException {
if (retryOnConnLoss) {
zkCmdExecutor.retryOperation(new ZkOperation() {
@Override
public Stat execute() throws KeeperException, InterruptedException {
keeper.delete(path, version);
return null;
}
});
} else {
keeper.delete(path, version);
}
}
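Now let us go back and see how ConnectionManager responds to changes in the connection state between the two ends. Its most important method is process. When it is called back, it reads the state from the WatchedEvent argument, for example connection established, session expired (which requires a reconnect), or connection lost.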

public synchronized void process(WatchedEvent event) {
if (log.isInfoEnabled()) {
log.info("Watcher " + this + " name:" + name + " got event " + event + " path:" + event.getPath() + " type:" + event.getType());
}
state = event.getState();
if (state == KeeperState.SyncConnected) {
connected = true;
clientConnected.countDown();
} else if (state == KeeperState.Expired) {
connected = false;
log.info("Attempting to reconnect to recover relationship with ZooKeeper...");
// try to reconnect to the ZooKeeper server
try {
connectionStrategy.reconnect(zkServerAddress, zkClientTimeout, this,
new ZkClientConnectionStrategy.ZkUpdate() {
@Override
public void update(SolrZooKeeper keeper) throws InterruptedException, TimeoutException, IOException {
synchronized (connectionStrategy) {
waitForConnected(SolrZkClient.DEFAULT_CLIENT_CONNECT_TIMEOUT);
client.updateKeeper(keeper);
if (onReconnect != null) {
onReconnect.command();
}
synchronized (ConnectionManager.this) {
ConnectionManager.this.connected = true;
}
}
}
});
} catch (Exception e) {
SolrException.log(log, "", e);
}
log.info("Connected:" + connected);
} else if (state == KeeperState.Disconnected) {
connected = false;
} else {
connected = false;
}
notifyAll();
}
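Stripped of logging and the reconnect details, process is a small state machine over the session state. The sketch below shows only those transitions. SessionState and ConnectionStateSketch are hypothetical stand-ins (not ZooKeeper's KeeperState or Solr's ConnectionManager), and the reconnect is reduced to a counter.

```java
/** Hypothetical mirror of the session states discussed above; not ZooKeeper's KeeperState. */
enum SessionState { SYNC_CONNECTED, DISCONNECTED, EXPIRED }

/** Hypothetical sketch of the state handling performed in process(). */
final class ConnectionStateSketch {
    private boolean connected;
    private int reconnectAttempts;

    synchronized void process(SessionState state) {
        switch (state) {
            case SYNC_CONNECTED:
                connected = true;
                break;
            case EXPIRED:
                connected = false;
                reconnectAttempts++; // the real code would open a new session here
                break;
            default:
                connected = false; // disconnected or any other state
        }
        notifyAll(); // wake any thread blocked waiting for a state change
    }

    synchronized boolean isConnected() { return connected; }

    synchronized int reconnectAttempts() { return reconnectAttempts; }

    public static void main(String[] args) {
        ConnectionStateSketch manager = new ConnectionStateSketch();
        manager.process(SessionState.SYNC_CONNECTED);
        System.out.println("after connect: " + manager.isConnected());
        manager.process(SessionState.EXPIRED);
        System.out.println("after expiry: " + manager.isConnected()
                + ", reconnect attempts: " + manager.reconnectAttempts());
    }
}
```

Only an expired session triggers a reconnect. A plain disconnect just clears the flag, since the ZooKeeper client library can re-establish the same session on its own.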
