<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    jinfeng_wang

    G-G-S,D-D-U!

    BlogJava 首頁 新隨筆 聯系 聚合 管理
      400 Posts :: 0 Stories :: 296 Comments :: 0 Trackbacks
    http://m635674608.iteye.com/blog/2297558


    Java代碼  收藏代碼
    1. BinaryJedisCluster   
    2. public String set(final byte[] key, final byte[] value) {  
    3.   return new JedisClusterCommand<String>(connectionHandler, maxRedirections) {  
    4.     @Override  
    5.     public String execute(Jedis connection) {  
    6.       return connection.set(key, value);  
    7.     }   
    8.   }.runBinary(key);  
    9. }  

       

    Java代碼  收藏代碼
    1. JedisClusterCommand  
    2.  public T runBinary(byte[] key) {  
    3.     if (key == null) {  
    4.       throw new JedisClusterException("No way to dispatch this command to Redis Cluster.");  
    5.     }  
    6.   
    7.     return runWithRetries(key, this.redirections, falsefalse);  
    8.   }  
    9.   
    10.  private T runWithRetries(byte[] key, int redirections, boolean tryRandomNode, boolean asking) {  
    11.     if (redirections <= 0) {  
    12.       throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?");  
    13.     }  
    14.   
    15.     Jedis connection = null;  
    16.     try {  
    17.   
    18.       if (asking) {  
    19.         // TODO: Pipeline asking with the original command to make it  
    20.         // faster....  
    21.         connection = askConnection.get();  
    22.         connection.asking();  
    23.   
    24.         // if asking success, reset asking flag  
    25.         asking = false;  
    26.       } else {  
    27.         if (tryRandomNode) {  
    28.           connection = connectionHandler.getConnection();  
    29.         } else {  
    30. //獲取 連接  
    31.           connection = connectionHandler.getConnectionFromSlot(  
    32. //獲取槽   
    33. JedisClusterCRC16.getSlot(key));  
    34.         }  
    35.       }  
    36.   
    37.       return execute(connection);  
    38.     } catch (JedisConnectionException jce) {  
    39.       if (tryRandomNode) {  
    40.         // maybe all connection is down  
    41.         throw jce;  
    42.       }  
    43.   
    44.       // release current connection before recursion  
    45.       releaseConnection(connection);  
    46.       connection = null;  
    47.   
    48.       // retry with random connection  
    49.       return runWithRetries(key, redirections - 1true, asking);  
    50.     } catch (JedisRedirectionException jre) {  
    51.       // if MOVED redirection occurred,  
    52.       if (jre instanceof JedisMovedDataException) {  
    53.         // it rebuilds cluster's slot cache  
    54.         // recommended by Redis cluster specification  
    55.         this.connectionHandler.renewSlotCache(connection);  
    56.       }  
    57.   
    58.       // release current connection before recursion or renewing  
    59.       releaseConnection(connection);  
    60.       connection = null;  
    61.   
    62.       if (jre instanceof JedisAskDataException) {  
    63.         asking = true;  
    64.       askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode()));  
    65.       } else if (jre instanceof JedisMovedDataException) {  
    66.       } else {  
    67.         throw new JedisClusterException(jre);  
    68.       }  
    69.   
    70.       return runWithRetries(key, redirections - 1false, asking);  
    71.     } finally {  
    72.       releaseConnection(connection);  
    73.     }  
    74.   }  
    75.   
    76.  @Override  
    77.   public Jedis getConnectionFromSlot(int slot) {  
    78.     JedisPool connectionPool = cache.getSlotPool(slot);  
    79.     if (connectionPool != null) {  
    80.       // It can't guaranteed to get valid connection because of node  
    81.       // assignment  
    82.       return connectionPool.getResource();  
    83.     } else {  
    84.       return getConnection();  
    85.     }  
    86.   }  

      

    Java代碼  收藏代碼
    1. public abstract class JedisClusterConnectionHandler {  
    2.   protected final JedisClusterInfoCache cache;  
    3.   
    4.   public JedisClusterConnectionHandler(Set<HostAndPort> nodes,  
    5.                                        final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout) {  
    6.     this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout);  
    7.     //通過slot 初始化集群信息  
    8.     initializeSlotsCache(nodes, poolConfig);  
    9.   }  
    10.   
    11.   abstract Jedis getConnection();  
    12.   
    13.   abstract Jedis getConnectionFromSlot(int slot);  
    14.   
    15.   public Jedis getConnectionFromNode(HostAndPort node) {  
    16.     cache.setNodeIfNotExist(node);  
    17.     return cache.getNode(JedisClusterInfoCache.getNodeKey(node)).getResource();  
    18.   }  
    19.   
    20. public class JedisClusterInfoCache {  
    21.   private Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();  
    22.   private Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();  
    23.   
    24.   private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();  
    25.   private final Lock r = rwl.readLock();  
    26.   private final Lock w = rwl.writeLock();  
    27.   private final GenericObjectPoolConfig poolConfig;  
    28.   
    29.   private int connectionTimeout;  
    30.   private int soTimeout;  
    31.   
    32.   private static final int MASTER_NODE_INDEX = 2;  
    33.   
    34.   public JedisClusterInfoCache(final GenericObjectPoolConfig poolConfig, int timeout) {  
    35.     this(poolConfig, timeout, timeout);  
    36.   }  
    37.   
    38.   public JedisClusterInfoCache(final GenericObjectPoolConfig poolConfig,  
    39.       final int connectionTimeout, final int soTimeout) {  
    40.     this.poolConfig = poolConfig;  
    41.     this.connectionTimeout = connectionTimeout;  
    42.     this.soTimeout = soTimeout;  
    43.   }  
    44.   
    45.   public void discoverClusterNodesAndSlots(Jedis jedis) {  
    46.     w.lock();  
    47.   
    48.     try {  
    49.       this.nodes.clear();  
    50.       this.slots.clear();  
    51.   
    52.       List<Object> slots = jedis.clusterSlots();  
    53.   
    54.       for (Object slotInfoObj : slots) {  
    55.         List<Object> slotInfo = (List<Object>) slotInfoObj;  
    56.   
    57.         if (slotInfo.size() <= MASTER_NODE_INDEX) {  
    58.           continue;  
    59.         }  
    60.   
    61.         List<Integer> slotNums = getAssignedSlotArray(slotInfo);  
    62.   
    63.         // hostInfos  
    64.         int size = slotInfo.size();  
    65.         for (int i = MASTER_NODE_INDEX; i < size; i++) {  
    66.           List<Object> hostInfos = (List<Object>) slotInfo.get(i);  
    67.           if (hostInfos.size() <= 0) {  
    68.             continue;  
    69.           }  
    70.   
    71.           HostAndPort targetNode = generateHostAndPort(hostInfos);  
    72.           setNodeIfNotExist(targetNode);  
    73.           if (i == MASTER_NODE_INDEX) {  
    74.             assignSlotsToNode(slotNums, targetNode);  
    75.           }  
    76.         }  
    77.       }  
    78.     } finally {  
    79.       w.unlock();  
    80.     }  
    81.   }  
    82.   //初始化集群信息   
    83.   public void discoverClusterSlots(Jedis jedis) {  
    84.     w.lock();  
    85.   
    86.     try {  
    87.       this.slots.clear();  
    88.       //通過 slots 命令獲取集群信息  
    89.       List<Object> slots = jedis.clusterSlots();  
    90.   
    91.       for (Object slotInfoObj : slots) {  
    92.         List<Object> slotInfo = (List<Object>) slotInfoObj;  
    93.   
    94.         if (slotInfo.size() <= 2) {  
    95.           continue;  
    96.         }  
    97.   
    98.         List<Integer> slotNums = getAssignedSlotArray(slotInfo);  
    99.   
    100.         // hostInfos  
    101.         List<Object> hostInfos = (List<Object>) slotInfo.get(2);  
    102.         if (hostInfos.size() <= 0) {  
    103.           continue;  
    104.         }  
    105.   
    106.         // at this time, we just use master, discard slave information  
    107.         HostAndPort targetNode = generateHostAndPort(hostInfos);  
    108.   
    109.         setNodeIfNotExist(targetNode);  
    110.         assignSlotsToNode(slotNums, targetNode);  
    111.       }  
    112.     } finally {  
    113.       w.unlock();  
    114.     }  
    115.   }  
    116.   
    117.   private HostAndPort generateHostAndPort(List<Object> hostInfos) {  
    118.     return new HostAndPort(SafeEncoder.encode((byte[]) hostInfos.get(0)),  
    119.         ((Long) hostInfos.get(1)).intValue());  
    120.   }  
    121.   
    122.   public void setNodeIfNotExist(HostAndPort node) {  
    123.     w.lock();  
    124.     try {  
    125.       String nodeKey = getNodeKey(node);  
    126.       if (nodes.containsKey(nodeKey)) return;  
    127.   
    128.       JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort(),  
    129.               connectionTimeout, soTimeout, null0null);  
    130.       nodes.put(nodeKey, nodePool);  
    131.     } finally {  
    132.       w.unlock();  
    133.     }  
    134.   }  
    135.   
    136.   public void assignSlotToNode(int slot, HostAndPort targetNode) {  
    137.     w.lock();  
    138.     try {  
    139.       JedisPool targetPool = nodes.get(getNodeKey(targetNode));  
    140.   
    141.       if (targetPool == null) {  
    142.         setNodeIfNotExist(targetNode);  
    143.         targetPool = nodes.get(getNodeKey(targetNode));  
    144.       }  
    145.       slots.put(slot, targetPool);  
    146.     } finally {  
    147.       w.unlock();  
    148.     }  
    149.   }  
    150.   
    151.   public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) {  
    152.     w.lock();  
    153.     try {  
    154.       JedisPool targetPool = nodes.get(getNodeKey(targetNode));  
    155.   
    156.       if (targetPool == null) {  
    157.         setNodeIfNotExist(targetNode);  
    158.         targetPool = nodes.get(getNodeKey(targetNode));  
    159.       }  
    160.   
    161.       for (Integer slot : targetSlots) {  
    162.         slots.put(slot, targetPool);  
    163.       }  
    164.     } finally {  
    165.       w.unlock();  
    166.     }  
    167.   }  
    168.   
    169.   public JedisPool getNode(String nodeKey) {  
    170.     r.lock();  
    171.     try {  
    172.       return nodes.get(nodeKey);  
    173.     } finally {  
    174.       r.unlock();  
    175.     }  
    176.   }  
    177.   
    178.   public JedisPool getSlotPool(int slot) {  
    179.     r.lock();  
    180.     try {  
    181.       return slots.get(slot);  
    182.     } finally {  
    183.       r.unlock();  
    184.     }  
    185.   }  
    186.   
    187.   public Map<String, JedisPool> getNodes() {  
    188.     r.lock();  
    189.     try {  
    190.       return new HashMap<String, JedisPool>(nodes);  
    191.     } finally {  
    192.       r.unlock();  
    193.     }  
    194.   }  
    195.   
    196.   public static String getNodeKey(HostAndPort hnp) {  
    197.     return hnp.getHost() + ":" + hnp.getPort();  
    198.   }  
    199.   
    200.   public static String getNodeKey(Client client) {  
    201.     return client.getHost() + ":" + client.getPort();  
    202.   }  
    203.   
    204.   public static String getNodeKey(Jedis jedis) {  
    205.     return getNodeKey(jedis.getClient());  
    206.   }  
    207.   
    208.   private List<Integer> getAssignedSlotArray(List<Object> slotInfo) {  
    209.     List<Integer> slotNums = new ArrayList<Integer>();  
    210.     for (int slot = ((Long) slotInfo.get(0)).intValue(); slot <= ((Long) slotInfo.get(1))  
    211.         .intValue(); slot++) {  
    212.       slotNums.add(slot);  
    213.     }  
    214.     return slotNums;  
    215.   }  
    216.   
    217. }  
    218.   
    219.     
    220.   public Map<String, JedisPool> getNodes() {  
    221.     return cache.getNodes();  
    222.   }  
    223.   
    224.   private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig) {  
    225.     for (HostAndPort hostAndPort : startNodes) {  
    226.       Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort());  
    227.       try {  
    228.         cache.discoverClusterNodesAndSlots(jedis);  
    229.         break;  
    230.       } catch (JedisConnectionException e) {  
    231.         // try next nodes  
    232.       } finally {  
    233.         if (jedis != null) {  
    234.           jedis.close();  
    235.         }  
    236.       }  
    237.     }  
    238.   
    239.     for (HostAndPort node : startNodes) {  
    240.       cache.setNodeIfNotExist(node);  
    241.     }  
    242.   }  
    243.   
    244.   public void renewSlotCache() {  
    245.     for (JedisPool jp : getShuffledNodesPool()) {  
    246.       Jedis jedis = null;  
    247.       try {  
    248.         jedis = jp.getResource();  
    249.         cache.discoverClusterSlots(jedis);  
    250.         break;  
    251.       } catch (JedisConnectionException e) {  
    252.         // try next nodes  
    253.       } finally {  
    254.         if (jedis != null) {  
    255.           jedis.close();  
    256.         }  
    257.       }  
    258.     }  
    259.   }  
    260.   
    261.   public void renewSlotCache(Jedis jedis) {  
    262.     try {  
    263.       cache.discoverClusterSlots(jedis);  
    264.     } catch (JedisConnectionException e) {  
    265.       renewSlotCache();  
    266.     }  
    267.   }  
    268.   
    269.   protected List<JedisPool> getShuffledNodesPool() {  
    270.     List<JedisPool> pools = new ArrayList<JedisPool>();  
    271.     pools.addAll(cache.getNodes().values());  
    272.     Collections.shuffle(pools);  
    273.     return pools;  
    274.   }  
    275. }  

       總結:

      1.JedisCluster 會初始化一個 連接獲取集群信息通過 solts 命令。(JedisClusterInfoCache 構造方法初始化)

      2.get ,set 的時候。會通過key JedisClusterCRC16.getSlot(key) 定位到solt

      3. 然后根據solt獲取 jedis

    public Jedis getConnectionFromSlot(int slot) {
        JedisPool connectionPool = cache.getSlotPool(slot);

      4.執行操作

     

      讀操作:主庫,從庫都會讀

      寫操作:主庫寫

    posted on 2016-12-20 16:29 jinfeng_wang 閱讀(820) 評論(0)  編輯  收藏 所屬分類: 2016-REDIS
    主站蜘蛛池模板: 中文亚洲成a人片在线观看| 美女视频黄.免费网址| 亚洲中文字幕伊人久久无码| 全免费毛片在线播放| 国产一区二区三区免费观看在线| 狠狠入ady亚洲精品| 亚洲国产精品综合久久网各| 亚洲AV永久精品爱情岛论坛| 亚洲精品无码久久久| 宅男666在线永久免费观看| 国产1024精品视频专区免费 | 久久经典免费视频| 暖暖免费在线中文日本| 一个人免费观看视频在线中文| 亚洲国产成人无码AV在线| 亚洲Av高清一区二区三区| 久久丫精品国产亚洲av| 亚洲日韩精品无码专区网址| 亚洲精品国产自在久久| 国产成人涩涩涩视频在线观看免费 | AAA日本高清在线播放免费观看| 九九九精品视频免费| 337P日本欧洲亚洲大胆艺术图| 亚洲一级毛片在线播放| 亚洲国产成人久久99精品| 中文字幕亚洲免费无线观看日本| 亚洲s色大片在线观看| 亚洲av无码专区国产乱码在线观看| 国产av无码专区亚洲av果冻传媒| 亚洲国产成人五月综合网 | 亚洲另类自拍丝袜第1页| 久久亚洲精精品中文字幕| 亚洲最大福利视频网站| 亚洲a在线视频视频| 亚洲av色福利天堂| 亚洲网址在线观看你懂的| 亚洲国产综合精品中文第一区 | jjizz全部免费看片| 国产精品久久久久免费a∨| 无码免费午夜福利片在线| 最近的免费中文字幕视频|