首页 > 其他分享 >twemproxy ketama一致性hash分析(转)

twemproxy ketama一致性hash分析(转)

时间:2022-12-23 16:15:06浏览次数:63  
标签:index hash continuum twemproxy server uint32 pool ketama

    • 测试Twemproxy集群,双主双活

      向twemproxy集群做写操作时,发现key的分布不太理想。在测试节点故障时,也发现一些和预想不太一样的地方。

      1、Key的一致性Hash

      当尝试以a001,a002这样有规律且的key值写入的时候,在4节点的集群环境中,key主要分布在其中的2台节点,另外两台分配极少。对于一些应用来说,key值可能根据一定规则生成,所以有被定向分配的可能。

      解决办法在key中使用hash_key:{},hask_key使用8位随机数,测试结果分布的比较满意。

      测试4节点中key的分布:

      1: 12917
      2: 10761
      3: 8596
      4: 14382 

      由于ketama的算法仍是使用了md5签名(具体后面说),又特意观察了比如有序数字生成的md5序列,结果并没有出现明显的有序或连序值。所以只能建议不使用连续的数据结尾key做一致性hash key。

      2、ketama算法

      twemproxy源码下载:https://github.com/twitter/twemproxy,命令:git clone https://github.com/twitter/twemproxy

      关于ketama算法的代码在nc_ketama.c文件中,主要是四个方法:

      • ketama_hash 计算某个主机,某个point的hash值
      • ketama_item_cmp 比较两个连续区的值,用于在ketama_update 方法中排序
      • ketama_update 更新server-pool的分配策略
      • ketama_dispatch 找出给定hash值所在的连续区

      2.1 连续区

      说一下连续区(continuum),参考下图。想象所有md5的值构成下面完整的“环”(没有起点),那么所有md5结果值在环上都有一个固定的位置。

      按ketama的算法,在这个环上创建服务器数*160个点,这些点把环分成了同等数量的段。

      那么,被插入数据的md5值也一定会落到环的某个区间,以此来判断数据应被写入哪台服务器。

      参考:理想化的Redis集群

      2.2 如何生成ketama_hash

      再来看服务器+点的hash值是如何生成的:

      alignment的值固定是4,ketama_hash是对由server名+索引组成的md5签名,从第16位开始取值,再重组一个32位值。

      static uint32_t
      ketama_hash(const char *key, size_t key_length, uint32_t alignment)
      {
          unsigned char results[16];
      
          md5_signature((unsigned char*)key, key_length, results);
      
          return ((uint32_t) (results[3 + alignment * 4] & 0xFF) << 24)
              | ((uint32_t) (results[2 + alignment * 4] & 0xFF) << 16)
              | ((uint32_t) (results[1 + alignment * 4] & 0xFF) << 8)
              | (results[0 + alignment * 4] & 0xFF);
      }

      下面是调用ketama_hash的代码:

      for (x = 0; x < pointer_per_hash; x++) {
          value = ketama_hash(host, hostlen, x);
          pool->continuum[continuum_index].index = server_index;
          pool->continuum[continuum_index++].value = value;
      }

      每个服务器被分成160个point点,由服务器名+索引组成host值,x值等于160/索引。

      这样计算出的服务器各点的值并不是有序的,所以进行排序。

      qsort(pool->continuum, pool->ncontinuum, sizeof(*pool->continuum), ketama_item_cmp);

      排序后的点值是连续的,但同一服务器的点并不一定连续。这时,所有的值构成了用于一致性hash的环。

      2.3、分配Key

      由ketama_dispatch实现key值的分配。

      可见方法中使用二分法找到一个值在环中的对应区域。

      uint32_t
      ketama_dispatch(struct continuum *continuum, uint32_t ncontinuum, uint32_t hash)
      {
          struct continuum *begin, *end, *left, *right, *middle;
      
          ASSERT(continuum != NULL);
          ASSERT(ncontinuum != 0);
      
          begin = left = continuum;
          end = right = continuum + ncontinuum;
      
          while (left < right) {
              middle = left + (right - left) / 2;
              if (middle->value < hash) {
                left = middle + 1;
              } else {
                right = middle;
              }
          }
          if (right == end) {
              right = begin;
          }
          return right->index;
      }

      3、服务器的故障处理

      从集群中摘除节点时,ketama的算法不会重新计算"环"。当需要写入故障节点时,会抛出异常。

      仔细想一下是合理的,因为摘除的节点持有一部分数据,一般来说是需要恢复的,这是一个前提。

      我们假设twemproxy可以感知节点故障,并重新计算分配策略。那么,故障后又有新的数据写入。这时,一部分原本要写入故障节点的数据会被分配到其它节点上。

      随后,故障节点恢复,twemproxy又重新调整了分配策略。那么,后写入的那部分数据就不会再被找到(这个有点像内存泄露)。


      nc_ketama.c 完整代码

    • /*
       * twemproxy - A fast and lightweight proxy for memcached protocol.
       * Copyright (C) 2011 Twitter, Inc.
       *
       * Licensed under the Apache License, Version 2.0 (the "License");
       * you may not use this file except in compliance with the License.
       * You may obtain a copy of the License at
       *
       * http://www.apache.org/licenses/LICENSE-2.0
       *
       * Unless required by applicable law or agreed to in writing, software
       * distributed under the License is distributed on an "AS IS" BASIS,
       * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       * See the License for the specific language governing permissions and
       * limitations under the License.
       */
      
      #include <stdio.h>
      #include <stdlib.h>
      #include <math.h>
      
      #include <nc_core.h>
      #include <nc_server.h>
      #include <nc_hashkit.h>
      
      #define KETAMA_CONTINUUM_ADDITION   10  /* # extra slots to build into continuum */
      #define KETAMA_POINTS_PER_SERVER    160 /* 40 points per hash */
      #define KETAMA_MAX_HOSTLEN          86
      
      static uint32_t
      ketama_hash(const char *key, size_t key_length, uint32_t alignment)
      {
          unsigned char results[16];
      
          md5_signature((unsigned char*)key, key_length, results);
      
          return ((uint32_t) (results[3 + alignment * 4] & 0xFF) << 24)
              | ((uint32_t) (results[2 + alignment * 4] & 0xFF) << 16)
              | ((uint32_t) (results[1 + alignment * 4] & 0xFF) << 8)
              | (results[0 + alignment * 4] & 0xFF);
      }
      
      static int
      ketama_item_cmp(const void *t1, const void *t2)
      {
          const struct continuum *ct1 = t1, *ct2 = t2;
      
          if (ct1->value == ct2->value) {
              return 0;
          } else if (ct1->value > ct2->value) {
              return 1;
          } else {
              return -1;
          }
      }
      
      rstatus_t
      ketama_update(struct server_pool *pool)
      {
          uint32_t nserver;             /* # server - live and dead */
          uint32_t nlive_server;        /* # live server */
          uint32_t pointer_per_server;  /* pointers per server proportional to weight */
          uint32_t pointer_per_hash;    /* pointers per hash */
          uint32_t pointer_counter;     /* # pointers on continuum */
          uint32_t pointer_index;       /* pointer index */
          uint32_t points_per_server;   /* points per server */
          uint32_t continuum_index;     /* continuum index */
          uint32_t continuum_addition;  /* extra space in the continuum */
          uint32_t server_index;        /* server index */
          uint32_t value;               /* continuum value */
          uint32_t total_weight;        /* total live server weight */
          int64_t now;                  /* current timestamp in usec */
      
          ASSERT(array_n(&pool->server) > 0);
      
          now = nc_usec_now();
          if (now < 0) {
              return NC_ERROR;
          }
      
          /*
           * Count live servers and total weight, and also update the next time to
           * rebuild the distribution
           */
          nserver = array_n(&pool->server);
          nlive_server = 0;
          total_weight = 0;
          pool->next_rebuild = 0LL;
          for (server_index = 0; server_index < nserver; server_index++) {
              struct server *server = array_get(&pool->server, server_index);
      
              if (pool->auto_eject_hosts) {
                  if (server->next_retry <= now) {
                      server->next_retry = 0LL;
                      nlive_server++;
                  } else if (pool->next_rebuild == 0LL ||
                             server->next_retry < pool->next_rebuild) {
                      pool->next_rebuild = server->next_retry;
                  }
              } else {
                  nlive_server++;
              }
      
              ASSERT(server->weight > 0);
      
              /* count weight only for live servers */
              if (!pool->auto_eject_hosts || server->next_retry <= now) {
                  total_weight += server->weight;
              }
          }
      
          pool->nlive_server = nlive_server;
      
          if (nlive_server == 0) {
              log_debug(LOG_DEBUG, "no live servers for pool %"PRIu32" '%.*s'",
                        pool->idx, pool->name.len, pool->name.data);
      
              return NC_OK;
          }
          log_debug(LOG_DEBUG, "%"PRIu32" of %"PRIu32" servers are live for pool "
                    "%"PRIu32" '%.*s'", nlive_server, nserver, pool->idx,
                    pool->name.len, pool->name.data);
      
          continuum_addition = KETAMA_CONTINUUM_ADDITION;
          points_per_server = KETAMA_POINTS_PER_SERVER;
          /*
           * Allocate the continuum for the pool, the first time, and every time we
           * add a new server to the pool
           */
          if (nlive_server > pool->nserver_continuum) {
              struct continuum *continuum;
              uint32_t nserver_continuum = nlive_server + continuum_addition;
              uint32_t ncontinuum = nserver_continuum * points_per_server;
      
              continuum = nc_realloc(pool->continuum, sizeof(*continuum) * ncontinuum);
              if (continuum == NULL) {
                  return NC_ENOMEM;
              }
      
              pool->continuum = continuum;
              pool->nserver_continuum = nserver_continuum;
              /* pool->ncontinuum is initialized later as it could be <= ncontinuum */
          }
      
          /*
           * Build a continuum with the servers that are live and points from
           * these servers that are proportial to their weight
           */
          continuum_index = 0;
          pointer_counter = 0;
          for (server_index = 0; server_index < nserver; server_index++) {
              struct server *server;
              float pct;
      
              server = array_get(&pool->server, server_index);
      
              if (pool->auto_eject_hosts && server->next_retry > now) {
                  continue;
              }
      
              pct = (float)server->weight / (float)total_weight;
              pointer_per_server = (uint32_t) ((floorf((float) (pct * KETAMA_POINTS_PER_SERVER / 4 * (float)nlive_server + 0.0000000001))) * 4);
              pointer_per_hash = 4;
      
              log_debug(LOG_VERB, "%.*s:%"PRIu16" weight %"PRIu32" of %"PRIu32" "
                        "pct %0.5f points per server %"PRIu32"",
                        server->name.len, server->name.data, server->port,
                        server->weight, total_weight, pct, pointer_per_server);
      
              for (pointer_index = 1;
                   pointer_index <= pointer_per_server / pointer_per_hash;
                   pointer_index++) {
      
                  char host[KETAMA_MAX_HOSTLEN]= "";
                  size_t hostlen;
                  uint32_t x;
      
                  hostlen = snprintf(host, KETAMA_MAX_HOSTLEN, "%.*s-%u",
                                     server->name.len, server->name.data,
                                     pointer_index - 1);
      
                  for (x = 0; x < pointer_per_hash; x++) {
                      value = ketama_hash(host, hostlen, x);
                      pool->continuum[continuum_index].index = server_index;
                      pool->continuum[continuum_index++].value = value;
                  }
              }
              pointer_counter += pointer_per_server;
          }
          
      
          pool->ncontinuum = pointer_counter;
          qsort(pool->continuum, pool->ncontinuum, sizeof(*pool->continuum),
                ketama_item_cmp);
      
          for (pointer_index = 0;
               pointer_index < ((nlive_server * KETAMA_POINTS_PER_SERVER) - 1);
               pointer_index++) {
              if (pointer_index + 1 >= pointer_counter) {
                  break;
              }
              ASSERT(pool->continuum[pointer_index].value <=
                     pool->continuum[pointer_index + 1].value);
          }
      
          log_debug(LOG_VERB, "updated pool %"PRIu32" '%.*s' with %"PRIu32" of "
                    "%"PRIu32" servers live in %"PRIu32" slots and %"PRIu32" "
                    "active points in %"PRIu32" slots", pool->idx,
                    pool->name.len, pool->name.data, nlive_server, nserver,
                    pool->nserver_continuum, pool->ncontinuum,
                    (pool->nserver_continuum + continuum_addition) * points_per_server);
      
          return NC_OK;
      }
      
      uint32_t
      ketama_dispatch(struct continuum *continuum, uint32_t ncontinuum, uint32_t hash)
      {
          struct continuum *begin, *end, *left, *right, *middle;
      
          ASSERT(continuum != NULL);
          ASSERT(ncontinuum != 0);
      
          begin = left = continuum;
          end = right = continuum + ncontinuum;
      
          while (left < right) {
              middle = left + (right - left) / 2;
              if (middle->value < hash) {
                left = middle + 1;
              } else {
                right = middle;
              }
          }
      
          if (right == end) {
              right = begin;
          }
      
          return right->index;
      }
 

标签:index,hash,continuum,twemproxy,server,uint32,pool,ketama
From: https://www.cnblogs.com/wangbin/p/17000868.html

相关文章

  • Set接口-HashSet和LinkedHashSet
    1.Set接口1.1基本介绍1)无序(添加和取出的顺序不一致),没有索引;2)不允许重复元素,所以最多包含一个null;3)JDKAPI中Set接口的实现类有:1.2Set接口的常用方法和L......
  • ConcurrentHashMap源码解析(JDK8)
    并发编程实践中,ConcurrentHashMap是一个经常被使用的数据结构,相比于Hashtable以及Collections.synchronizedMap(),ConcurrentHashMap在线程安全的基础上提供了更好的写并发能......
  • ConcurrentHashMap
    ​JDK5中添加了新的concurrent包,相对同步容器而言,并发容器通过一些机制改进了并发性能。因为同步容器将所有对容器状态的访问都串行化了,这样保证了线程的安全性,所以这种......
  • 分布式 | dble 沿用 jumpstringhash,移除 Mycat 一致性 hash 原因解析
    作者:爱可生开源社区背景MyCat对于字符串类型为分片字段的数据,有三种分片模式,分别是:模值hash(求模法),jumpstringhash(跳跃法),一致性hash(环割法)dble对于hash算法选取方面,除......
  • 你能谈谈HashMap怎样解决hash冲突吗
    在Java编程语言中,最基本的结构就是两种,一种是数组,一种是模拟指针(引用),所有的数据结构都可以用这两个基本结构构造,HashMap也一样。当程序试图将多个key-value放入HashM......
  • 一致性hash算法 - consistent hashing
    consistenthashing ​​算法​​早在 1997 年就在论文 ​​Consistenthashingandrandomtrees​​ 中被提出,目前在cache 系统中应用越来越广泛;1比如你有 N 个 ......
  • ConcurrentHashMap完全解析(jdk6/7,8)
    并发编程实践中,ConcurrentHashMap是一个经常被使用的数据结构,相比于Hashtable以及Collections.synchronizedMap(),ConcurrentHashMap在线程安全的基础上提供了更好的写并发能......
  • HashSet源码解读
    HashSet源码解读publicclassDemo5{finalVputVal(inthash,Kkey,Vvalue,booleanonlyIfAbsent,booleanevict){Node......
  • Lua 5.3 hashint函数缺陷导致遍历table性能非常差
    最近发现线上有个服务器某些逻辑耗时比较久,问了下同事,他告诉我是因为lua的pairs函数很慢导致的。“啊!不至于吧,这数据量才多少”我一脸诧异,记忆中Lua不至于慢到这种程度,遍......
  • Hash 算法详细介绍与实现 (二)
    前言书接上回,昨天写了第一部分,《​​Hash算法详细介绍与实现(一)​​》详细介绍了Hash表和Hash算法的相关概念以及算法的基本原理。同时简单介绍几种hash算法的实现:直接......