<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    posts - 495,comments - 227,trackbacks - 0
    http://www.cnblogs.com/phinecos/archive/2012/02/16/2354834.html

    上一篇中介紹了SolrCloud的第一個模塊---構建管理solr集群狀態信息的zookeeper集群。當我們在solr服務器啟動時擁有了這樣一個Zookeeper集群后,顯然我們需要連接到Zookeeper集群的方便手段,在這一篇中我將對Zookeeper客戶端相關的各個封裝類進行分析。

    SolrZkClient類是Solr服務器用來與Zookeeper集群進行通信的接口類,它包含的主要組件有:

      private ConnectionManager connManager;
      private volatile SolrZooKeeper keeper;
      private ZkCmdExecutor zkCmdExecutor = new ZkCmdExecutor();

        其中ConnectionManagerWatcher的實現類,主要負責對客戶端與Zookeeper集群之間連接的狀態變化信息進行響應,關于Watcher的詳細介紹,可以參考http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html#ch_zkWatches

    SolrZooKeeper類是一個包裝類,沒有實際意義,ZkCmdExecutor類是負責在連接失敗的情況下,重試某種操作特定次數,具體的操作是ZkOperation這個抽象類的具體實現子類,其execute方法中包含了具體操作步驟,這些操作包括新建一個Znode節點,讀取Znode節點數據,創建Znode路徑,刪除Znode節點等Zookeeper操作。

    首先來看它的構造函數,先創建ConnectionManager對象來響應兩端之間的狀態變化信息,然后ZkClientConnectionStrategy類是一個連接策略抽象類,它包含連接和重連兩種策略,并且采用模板方法模式,具體的實現是通過靜態累不類ZkUpdate來實現的,DefaultConnectionStrategy是它的一個實現子類,它覆寫了connectreconnect兩個連接策略方法。

    復制代碼
      public SolrZkClient(String zkServerAddress, int zkClientTimeout,
          ZkClientConnectionStrategy strat, final OnReconnect onReconnect, int clientConnectTimeout) throws InterruptedException,
          TimeoutException, IOException {
        connManager = new ConnectionManager("ZooKeeperConnection Watcher:"
            + zkServerAddress, this, zkServerAddress, zkClientTimeout, strat, onReconnect);
        strat.connect(zkServerAddress, zkClientTimeout, connManager,
            new ZkUpdate() {
              @Override
              public void update(SolrZooKeeper zooKeeper) {
                SolrZooKeeper oldKeeper = keeper;
                keeper = zooKeeper;
                if (oldKeeper != null) {
                  try {
                    oldKeeper.close();
                  } catch (InterruptedException e) {
                    // Restore the interrupted status
                    Thread.currentThread().interrupt();
                    log.error("", e);
                    throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
                        "", e);
                  }
                }
              }
            });
        connManager.waitForConnected(clientConnectTimeout);
        numOpens.incrementAndGet();
      }
    復制代碼

    值得注意的是,構造函數中生成的ZkUpdate匿名類對象,它的update方法會被調用,

    在這個方法里,會首先將已有的老的SolrZooKeeperg關閉掉,然后放置上一個新的SolrZooKeeper。做好這些準備工作以后,就會去連接Zookeeper服務器集群,

    connManager.waitForConnected(clientConnectTimeout);//連接zk服務器集群,默認30秒超時時間

    其實具體的連接動作是new SolrZooKeeper(serverAddress, timeout, watcher)引發的,上面那句代碼只是在等待指定時間,看是否已經連接上。

    如果連接Zookeeper服務器集群成功,那么就可以進行Zookeeper的常規操作了:

    1) 是否已經連接

      public boolean isConnected() {
        return keeper != null && keeper.getState() == ZooKeeper.States.CONNECTED;
      }

    2) 是否存在某個路徑的Znode

    復制代碼
      public Stat exists(final String path, final Watcher watcher, boolean retryOnConnLoss) throws KeeperException, InterruptedException {
        if (retryOnConnLoss) {
          return zkCmdExecutor.retryOperation(new ZkOperation() {
            @Override
            public Stat execute() throws KeeperException, InterruptedException {
              return keeper.exists(path, watcher);
            }
          });
        } else {
          return keeper.exists(path, watcher);
        }
      }
    復制代碼

    3) 創建一個Znode節點

    復制代碼
      public String create(final String path, final byte data[], final List<ACL> acl, final CreateMode createMode, boolean retryOnConnLoss) throws KeeperException, InterruptedException {
        if (retryOnConnLoss) {
          return zkCmdExecutor.retryOperation(new ZkOperation() {
            @Override
            public String execute() throws KeeperException, InterruptedException {
              return keeper.create(path, data, acl, createMode);
            }
          });
        } else {
          return keeper.create(path, data, acl, createMode);
        }
      }
    復制代碼

    4) 獲取指定路徑下的孩子Znode節點

    復制代碼
      public List<String> getChildren(final String path, final Watcher watcher, boolean retryOnConnLoss) throws KeeperException, InterruptedException {
        if (retryOnConnLoss) {
          return zkCmdExecutor.retryOperation(new ZkOperation() {
            @Override
            public List<String> execute() throws KeeperException, InterruptedException {
              return keeper.getChildren(path, watcher);
            }
          });
        } else {
          return keeper.getChildren(path, watcher);
        }
      }
    復制代碼

    5) 獲取指定Znode上附加的數據

    復制代碼
      public byte[] getData(final String path, final Watcher watcher, final Stat stat, boolean retryOnConnLoss) throws KeeperException, InterruptedException {
        if (retryOnConnLoss) {
          return zkCmdExecutor.retryOperation(new ZkOperation() {
            @Override
            public byte[] execute() throws KeeperException, InterruptedException {
              return keeper.getData(path, watcher, stat);
            }
          });
        } else {
          return keeper.getData(path, watcher, stat);
        }
      }
    復制代碼

    6) 在指定Znode上設置數據

    復制代碼
      public Stat setData(final String path, final byte data[], final int version, boolean retryOnConnLoss) throws KeeperException, InterruptedException {
        if (retryOnConnLoss) {
          return zkCmdExecutor.retryOperation(new ZkOperation() {
            @Override
            public Stat execute() throws KeeperException, InterruptedException {
              return keeper.setData(path, data, version);
            }
          });
        } else {
          return keeper.setData(path, data, version);
        }
      }
    復制代碼

    7) 創建路徑

    復制代碼
      public void makePath(String path, byte[] data, CreateMode createMode, Watcher watcher, boolean failOnExists, boolean retryOnConnLoss) throws KeeperException, InterruptedException {
        if (log.isInfoEnabled()) {
          log.info("makePath: " + path);
        }
        boolean retry = true;
        
        if (path.startsWith("/")) {
          path = path.substring(1, path.length());
        }
        String[] paths = path.split("/");
        StringBuilder sbPath = new StringBuilder();
        for (int i = 0; i < paths.length; i++) {
          byte[] bytes = null;
          String pathPiece = paths[i];
          sbPath.append("/" + pathPiece);
          final String currentPath = sbPath.toString();
          Object exists = exists(currentPath, watcher, retryOnConnLoss);
          if (exists == null || ((i == paths.length -1) && failOnExists)) {
            CreateMode mode = CreateMode.PERSISTENT;
            if (i == paths.length - 1) {
              mode = createMode;
              bytes = data;
              if (!retryOnConnLoss) retry = false;
            }
            try {
              if (retry) {
                final CreateMode finalMode = mode;
                final byte[] finalBytes = bytes;
                zkCmdExecutor.retryOperation(new ZkOperation() {
                  @Override
                  public Object execute() throws KeeperException, InterruptedException {
                    keeper.create(currentPath, finalBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, finalMode);
                    return null;
                  }
                });
              } else {
                keeper.create(currentPath, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);
              }
            } catch (NodeExistsException e) {
              
              if (!failOnExists) {
                // TODO: version ? for now, don't worry about race
                setData(currentPath, data, -1, retryOnConnLoss);
                // set new watch
                exists(currentPath, watcher, retryOnConnLoss);
                return;
              }
              
              // ignore unless it's the last node in the path
              if (i == paths.length - 1) {
                throw e;
              }
            }
            if(i == paths.length -1) {
              // set new watch
              exists(currentPath, watcher, retryOnConnLoss);
            }
          } else if (i == paths.length - 1) {
            // TODO: version ? for now, don't worry about race
            setData(currentPath, data, -1, retryOnConnLoss);
            // set new watch
            exists(currentPath, watcher, retryOnConnLoss);
          }
        }
      }
    復制代碼

    8) 刪除指定Znode

    復制代碼
      public void delete(final String path, final int version, boolean retryOnConnLoss) throws InterruptedException, KeeperException {
        if (retryOnConnLoss) {
          zkCmdExecutor.retryOperation(new ZkOperation() {
            @Override
            public Stat execute() throws KeeperException, InterruptedException {
              keeper.delete(path, version);
              return null;
            }
          });
        } else {
          keeper.delete(path, version);
        }
      }
    復制代碼

             我們再回過頭來看看ConnectionManager類是如何響應兩端的連接狀態信息的變化的,它最重要的方法是process方法,當它被觸發回調時,會從WatchedEvent參數中得到事件的各種狀態信息,比如連接成功,會話過期(此時需要進行重連),連接斷開等。

    復制代碼
      public synchronized void process(WatchedEvent event) {
        if (log.isInfoEnabled()) {
          log.info("Watcher " + this + " name:" + name + " got event " + event + " path:" + event.getPath() + " type:" + event.getType());
        }

        state = event.getState();
        if (state == KeeperState.SyncConnected) {
          connected = true;
          clientConnected.countDown();
        } else if (state == KeeperState.Expired) {
          connected = false;
          log.info("Attempting to reconnect to recover relationship with ZooKeeper...");
          //嘗試重新連接zk服務器
          try {
            connectionStrategy.reconnect(zkServerAddress, zkClientTimeout, this,
                new ZkClientConnectionStrategy.ZkUpdate() {
                  @Override
                  public void update(SolrZooKeeper keeper) throws InterruptedException, TimeoutException, IOException {
                    synchronized (connectionStrategy) {
                      waitForConnected(SolrZkClient.DEFAULT_CLIENT_CONNECT_TIMEOUT);
                      client.updateKeeper(keeper);
                      if (onReconnect != null) {
                        onReconnect.command();
                      }
                      synchronized (ConnectionManager.this) {
                        ConnectionManager.this.connected = true;
                      }
                    }
                    
                  }
                });
          } catch (Exception e) {
            SolrException.log(log, "", e);
          }
          log.info("Connected:" + connected);
        } else if (state == KeeperState.Disconnected) {
          connected = false;
        } else {
          connected = false;
        }
        notifyAll();
      }
    復制代碼

     

     

    posted on 2012-07-04 18:41 SIMONE 閱讀(620) 評論(0)  編輯  收藏 所屬分類: solr
    主站蜘蛛池模板: 又硬又粗又长又爽免费看 | 亚洲中文无码亚洲人成影院| 在线免费观看一级片| 有码人妻在线免费看片| 666精品国产精品亚洲| 黑人粗长大战亚洲女2021国产精品成人免费视频| 国产大陆亚洲精品国产| 亚洲韩国—中文字幕| 免费看美女被靠到爽的视频| 久久一区二区免费播放| 亚洲 欧洲 视频 伦小说| 国内精品久久久久久久亚洲| 免费精品国偷自产在线在线| 国产成人精品免费大全| 亚洲av无码片在线观看| 亚洲色WWW成人永久网址| 最近的免费中文字幕视频| 三级黄色免费观看| 亚洲欧美熟妇综合久久久久| 亚洲国产精品一区二区久久hs| 国外成人免费高清激情视频| 久久不见久久见免费视频7| 美女视频黄频a免费大全视频| 亚洲大香伊人蕉在人依线| 亚洲最大AV网站在线观看| 精品国产免费一区二区| 69av免费观看| a级毛片免费在线观看| 亚洲国产午夜精品理论片在线播放 | 国产亚洲av片在线观看16女人| 最近2019中文字幕免费看最新| 人妻丰满熟妇无码区免费| 成人在线免费视频| 亚洲国产精品无码中文lv| 亚洲欧洲日产国码在线观看| 国产av天堂亚洲国产av天堂| 亚洲精品456播放| 日本一道本高清免费| 大地资源在线观看免费高清| 99精品视频在线观看免费专区| 国产啪精品视频网站免费尤物 |