首页 > 其他分享 >dpdk mem pool1

dpdk mem pool1

时间:2022-12-10 13:02:54浏览次数:39  
标签:rte mem cache ret mp pool1 dpdk mempool size

以dpvs中的一段代码为例:

/* connection cache on each NUMA socket */
/*
* Create one mempool per index i in [0, get_numa_nodes()), named
* "dp_vs_conn_<i>". Argument mapping (see rte_mempool_create()):
*   n = conn_pool_size, elt_size = sizeof(struct dp_vs_conn),
*   cache_size = conn_pool_cache, private_data_size = 0,
*   no mp_init/obj_init callbacks, socket_id = i, flags = 0.
* With flags == 0 neither SP_PUT nor SC_GET is set, so rte_mempool_create()
* selects the "ring_mp_mc" ops.
*/
for (i = 0; i < get_numa_nodes(); i++) {
snprintf(poolname, sizeof(poolname), "dp_vs_conn_%d", i);
dp_vs_conn_cache[i] = rte_mempool_create(poolname,
conn_pool_size,
sizeof(struct dp_vs_conn),
conn_pool_cache,
0, NULL, NULL, NULL, NULL,
i, 0);
/* rte_mempool_create() returns NULL on failure; report it as out-of-memory. */
if (!dp_vs_conn_cache[i]) {
err = EDPVS_NOMEM;
/* NOTE(review): the cleanup label is outside this excerpt; presumably it frees the pools created so far - confirm. */
goto cleanup;
}
}

上面的代码为每个NUMA socket创建一个mempool;每个mempool内部又会为每个cpu核(lcore)维护一个local cache;

/**
* The RTE mempool structure.
*/
struct rte_mempool {
char name[RTE_MEMPOOL_NAMESIZE]; /**< Name of mempool. */
RTE_STD_C11
union {
void *pool_data; /**< Ring or pool to store objects; this is where the pool's objects are actually kept. */
uint64_t pool_id; /**< External mempool identifier. */
};
void *pool_config; /**< optional args for ops alloc. */
const struct rte_memzone *mz; /**< Memzone where pool is alloc'd (the memzone of the pool). */
unsigned int flags; /**< Flags of the mempool. */
int socket_id; /**< Socket id passed at create. */
uint32_t size; /**< Max size of the mempool. */
uint32_t cache_size;
/**< Size of per-lcore default local cache. */

uint32_t elt_size; /**< Size of an element. */
uint32_t header_size; /**< Size of header (before elt). */
uint32_t trailer_size; /**< Size of trailer (after elt). */

unsigned private_data_size; /**< Size of private data. */
/**
* Index into rte_mempool_ops_table array of mempool ops
* structs, which contain callback function pointers.
* We're using an index here rather than pointers to the callbacks
* to facilitate any secondary processes that may want to use
* this mempool.
*/
int32_t ops_index;// Identifies the ops (e.g. the ring handlers) actually used to operate this pool.

struct rte_mempool_cache *local_cache; /**< Per-lcore local cache, used to speed up get/put. */

uint32_t populated_size; /**< Number of populated objects. */
struct rte_mempool_objhdr_list elt_list; /**< List of objects in pool */
uint32_t nb_mem_chunks; /**< Number of memory chunks */
struct rte_mempool_memhdr_list mem_list; /**< List of memory chunks */

#ifdef RTE_LIBRTE_MEMPOOL_DEBUG
/** Per-lcore statistics. */
struct rte_mempool_debug_stats stats[RTE_MAX_LCORE];
#endif
} __rte_cache_aligned;

 

 

mempool的创建

  • 1.mempool头结构。mempool由名字区分,挂接在 `struct rte_tailq_elem rte_mempool_tailq` 全局队列中,可以根据mempool的名字进行查找,使用 `rte_mempool_lookup()` 接口即可。这只是个mempool的指示结构,mempool分配的内存区并不在这里面,只是通过物理和虚拟地址指向实际的内存地址。
  • 2.mempool的实际空间。这就是通过内存分配出来的地址连续的空间,用来存储mempool的obj对象。
  • 3.ring队列。ring是个环形无锁队列。其作用就是存放mempool中的对象指针,提供了方便存取使用mempool的空间的办法。
/*
 * Create a mempool and populate it with objects.
 *
 * Steps, in order:
 *   1. build an empty pool with rte_mempool_create_empty();
 *   2. pick the ring ops matching the producer/consumer flags;
 *   3. run mp_init(mp, mp_init_arg), if given, to initialize the pool's
 *      private area (e.g. a user-defined private structure);
 *   4. allocate and populate the pool memory (and its ring) through
 *      rte_mempool_populate_default();
 *   5. run obj_init on every object, if given, via rte_mempool_obj_iter().
 *
 * Returns the new pool, or NULL on failure. Once the empty pool exists, any
 * later failure releases it with rte_mempool_free().
 */
struct rte_mempool *
rte_mempool_create(const char *name, unsigned n, unsigned elt_size,
	unsigned cache_size, unsigned private_data_size,
	rte_mempool_ctor_t *mp_init, void *mp_init_arg,
	rte_mempool_obj_cb_t *obj_init, void *obj_init_arg,
	int socket_id, unsigned flags)
{
	struct rte_mempool *mp;
	const char *ops_name;
	int single_put;
	int single_get;

	mp = rte_mempool_create_empty(name, n, elt_size, cache_size,
		private_data_size, socket_id, flags);
	if (mp == NULL)
		return NULL;

	/*
	 * SP/MP put combined with SC/MC get gives four possible ring ops;
	 * choose the name first, then register it once.
	 */
	single_put = (flags & RTE_MEMPOOL_F_SP_PUT) != 0;
	single_get = (flags & RTE_MEMPOOL_F_SC_GET) != 0;

	if (single_put)
		ops_name = single_get ? "ring_sp_sc" : "ring_sp_mc";
	else
		ops_name = single_get ? "ring_mp_sc" : "ring_mp_mc";

	if (rte_mempool_set_ops_byname(mp, ops_name, NULL) != 0)
		goto fail;

	/* pool-private initializer */
	if (mp_init != NULL)
		mp_init(mp, mp_init_arg);

	/* create the pool's backing memory and its ring */
	if (rte_mempool_populate_default(mp) < 0)
		goto fail;

	/* per-object initializer */
	if (obj_init != NULL)
		rte_mempool_obj_iter(mp, obj_init, obj_init_arg);

	rte_mempool_trace_create(name, n, elt_size, cache_size,
		private_data_size, mp_init, mp_init_arg, obj_init,
		obj_init_arg, flags, mp);
	return mp;

fail:
	rte_mempool_free(mp);
	return NULL;
}

 

/* Default function to populate the mempool: allocate memory in memzones,
* and populate them. Return the number of objects added, or a negative
* value on error.
*
* On success the value returned is mp->size. On failure inside the loop the
* chunks added so far are released with rte_mempool_free_memchunks() and the
* negative error code is returned.
*/
int
rte_mempool_populate_default(struct rte_mempool *mp)
{
unsigned int mz_flags = RTE_MEMZONE_1GB|RTE_MEMZONE_SIZE_HINT_ONLY;
char mz_name[RTE_MEMZONE_NAMESIZE];
const struct rte_memzone *mz;
ssize_t mem_size;
size_t align, pg_sz, pg_shift = 0;
rte_iova_t iova;
unsigned mz_id, n;
int ret;
bool need_iova_contig_obj;
size_t max_alloc_size = SIZE_MAX;

ret = mempool_ops_alloc_once(mp);// per the original note, for ring ops this ends up in common_ring_alloc -> ring_alloc (callees not visible here - confirm)
if (ret != 0)
return ret;

/* mempool must not be populated */
if (mp->nb_mem_chunks != 0)
return -EEXIST;

/*
* the following section calculates page shift and page size values.
*
* these values impact the result of calc_mem_size operation, which
* returns the amount of memory that should be allocated to store the
* desired number of objects. when not zero, it allocates more memory
* for the padding between objects, to ensure that an object does not
* cross a page boundary. in other words, page size/shift are to be set
* to zero if mempool elements won't care about page boundaries.
* there are several considerations for page size and page shift here.
*
* if we don't need our mempools to have physically contiguous objects,
* then just set page shift and page size to 0, because the user has
* indicated that there's no need to care about anything.
*
* if we do need contiguous objects (if a mempool driver has its
* own calc_size() method returning min_chunk_size = mem_size),
* there is also an option to reserve the entire mempool memory
* as one contiguous block of memory.
*
* if we require contiguous objects, but not necessarily the entire
* mempool reserved space to be contiguous, pg_sz will be != 0,
* and the default ops->populate() will take care of not placing
* objects across pages.
*
* if our IO addresses are physical, we may get memory from bigger
* pages, or we might get memory from smaller pages, and how much of it
* we require depends on whether we want bigger or smaller pages.
* However, requesting each and every memory size is too much work, so
* what we'll do instead is walk through the page sizes available, pick
* the smallest one and set up page shift to match that one. We will be
* wasting some space this way, but it's much nicer than looping around
* trying to reserve each and every page size.
*
* If we fail to get enough contiguous memory, then we'll go and
* reserve space in smaller chunks.
*/

need_iova_contig_obj = !(mp->flags & RTE_MEMPOOL_F_NO_IOVA_CONTIG);
ret = rte_mempool_get_page_size(mp, &pg_sz);
if (ret < 0)
return ret;

/*
* NOTE(review): pg_sz is a size_t while the helper name suggests a 32-bit
* argument; presumably it yields the bit index of a power-of-two page
* size - confirm behavior for very large page sizes.
*/
if (pg_sz != 0)
pg_shift = rte_bsf32(pg_sz);

/*
* Each iteration reserves one memzone and populates the pool from it.
* At the end of an iteration ret holds the positive count returned by the
* populate call, which is subtracted from the n objects still missing.
*/
for (mz_id = 0, n = mp->size; n > 0; mz_id++, n -= ret) {
size_t min_chunk_size;

mem_size = rte_mempool_ops_calc_mem_size(
mp, n, pg_shift, &min_chunk_size, &align);

if (mem_size < 0) {
ret = mem_size;
goto fail;
}

/* memzone name is derived from the pool name plus the chunk index */
ret = snprintf(mz_name, sizeof(mz_name),
RTE_MEMPOOL_MZ_FORMAT "_%d", mp->name, mz_id);
if (ret < 0 || ret >= (int)sizeof(mz_name)) {
ret = -ENAMETOOLONG;
goto fail;
}

/* if we're trying to reserve contiguous memory, add appropriate
* memzone flag.
*/
if (min_chunk_size == (size_t)mem_size)
mz_flags |= RTE_MEMZONE_IOVA_CONTIG;

/* Allocate a memzone, retrying with a smaller area on ENOMEM */
do {// reserve space: take a memory area directly from the memzone allocator to hold (part of) the pool
mz = rte_memzone_reserve_aligned(mz_name,
RTE_MIN((size_t)mem_size, max_alloc_size),
mp->socket_id, mz_flags, align);

/* stop on success, or on any error other than ENOMEM */
if (mz != NULL || rte_errno != ENOMEM)
break;

/* halve the request; give up once it drops below min_chunk_size */
max_alloc_size = RTE_MIN(max_alloc_size,
(size_t)mem_size) / 2;
} while (mz == NULL && max_alloc_size >= min_chunk_size);

if (mz == NULL) {
ret = -rte_errno;
goto fail;
}

if (need_iova_contig_obj)
iova = mz->iova;
else
iova = RTE_BAD_IOVA;
/*
* Original author's note (describes the ring-based ops; the callees are
* not visible in this excerpt): once an object has been initialized it is
* put into the ring, so after all objects are initialized the ring holds
* every object of the pool. The ring stores pointers to the objects, not
* copies of them. When the application allocates with rte_mempool_get the
* object ultimately comes from this ring, and rte_mempool_put returns the
* object to the same ring. This is how the mempool and its ring are tied
* together.
*/
if (pg_sz == 0 || (mz_flags & RTE_MEMZONE_IOVA_CONTIG))
ret = rte_mempool_populate_iova(mp, mz->addr,
iova, mz->len,
rte_mempool_memchunk_mz_free,
(void *)(uintptr_t)mz);// per the original note: -> mempool_add_elem, which enqueues each initialized object into the ring (not visible here)
else
ret = rte_mempool_populate_virt(mp, mz->addr,
mz->len, pg_sz,
rte_mempool_memchunk_mz_free,
(void *)(uintptr_t)mz);
if (ret == 0) /* should not happen */
ret = -ENOBUFS;
if (ret < 0) {
/* the memzone was not attached to the pool, so free it here */
rte_memzone_free(mz);
goto fail;
}
}

rte_mempool_trace_populate_default(mp);
return mp->size;

fail:
rte_mempool_free_memchunks(mp);
return ret;
}

 

dpdk mem pool1_本地缓存

 

内存的申请

在创建好内存池后,当应用程序需要从内存池中获取一个对象元素的空间时,可以调用rte_mempool_get从内存池中获取一个元素空间。优先从每个cpu本身的缓存中查找是否有足够的空闲对象元素,如果有就从cpu本地缓存中获取;如果cpu本地缓存中的空闲对象元素不够,则先从ring队列中批量取出对象元素回填本地缓存,再从本地缓存中取;如果回填失败、没有本地缓存,或者请求的数量不小于缓存大小,则直接从ring队列中取出对象元素。这里所说的cpu本地缓存并不是cpu硬件上的cache,而是应用层为每个cpu准备的缓存。之所以要维护一个cpu本地缓存,是为了尽量减少多个cpu同时访问内存池上的元素,减少竞争的发生。

/**
 * @internal Fetch several objects from the mempool; used internally.
 *
 * The per-lcore cache is treated as a stack: objects are handed out from its
 * top. When the cache holds fewer than @p n objects it is first refilled from
 * the backend; if that refill fails, or no usable cache exists, the request is
 * served straight from the backend.
 *
 * @param mp
 *   A pointer to the mempool structure.
 * @param obj_table
 *   A pointer to a table of void * pointers (objects) to be filled.
 * @param n
 *   The number of objects to get, must be strictly positive.
 * @param cache
 *   A pointer to a mempool cache structure. May be NULL if not needed.
 * @return
 *   - 0 when the request was served from the cache.
 *   - Otherwise the return value of rte_mempool_ops_dequeue_bulk()
 *     (negative on failure).
 */
static __rte_always_inline int
rte_mempool_do_generic_get(struct rte_mempool *mp, void **obj_table,
	unsigned int n, struct rte_mempool_cache *cache)
{
	int rc;
	uint32_t i, top;
	void **stack;

	/* Without a cache, or with a request at least as big as it, bypass it. */
	if (unlikely(cache == NULL || n >= cache->size))
		goto ring_dequeue;

	stack = cache->objs;

	if (cache->len < n) {
		/* Not enough cached: ask for the request plus a full refill. */
		uint32_t refill = n + (cache->size - cache->len);

		rc = rte_mempool_ops_dequeue_bulk(mp,
			&cache->objs[cache->len], refill);
		if (unlikely(rc < 0)) {
			/*
			 * The backend cannot supply cache + n objects. Retry
			 * with just n directly; if that fails too, the pool
			 * is really exhausted.
			 */
			goto ring_dequeue;
		}

		cache->len += refill;
	}

	/* Pop n entries off the top of the cache into the caller's table. */
	top = cache->len;
	for (i = 0; i < n; i++)
		obj_table[i] = stack[--top];
	cache->len = top;

	RTE_MEMPOOL_STAT_ADD(mp, get_success_bulk, 1);
	RTE_MEMPOOL_STAT_ADD(mp, get_success_objs, n);

	return 0;

ring_dequeue:

	/* Serve the whole request from the backend. */
	rc = rte_mempool_ops_dequeue_bulk(mp, obj_table, n);

	if (rc < 0) {
		RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1);
		RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n);
	} else {
		RTE_MEMPOOL_STAT_ADD(mp, get_success_bulk, 1);
		RTE_MEMPOOL_STAT_ADD(mp, get_success_objs, n);
	}

	return rc;
}

 

内存的释放

当应用层已经不再需要使用某个内存时,需要将它进行回收,以免造成内存泄漏,进而导致内存池没有空间,其他应用程序无法再获取内存空间。可以调用rte_mempool_put将不再使用的内存放回到内存池中。如果存在cpu本地缓存(且一次放回的数量不超过RTE_MEMPOOL_CACHE_MAX_SIZE),则优先把元素放到cpu本地缓存;当本地缓存中的元素数量达到flushthresh时,再把超出cache->size的部分刷回ring队列。如果没有本地缓存,则直接将要释放的对象元素放回到ring队列中。

/**
 * @internal Return several objects to the mempool; used internally.
 *
 * Objects are appended to the per-lcore cache when one is usable. Once the
 * cache length reaches its flush threshold, everything above cache->size is
 * pushed back to the backend. Without a usable cache the objects go to the
 * backend directly.
 *
 * @param mp
 *   A pointer to the mempool structure.
 * @param obj_table
 *   A pointer to a table of void * pointers (objects).
 * @param n
 *   The number of objects to store back in the mempool, must be strictly
 *   positive.
 * @param cache
 *   A pointer to a mempool cache structure. May be NULL if not needed.
 */
static __rte_always_inline void
rte_mempool_do_generic_put(struct rte_mempool *mp, void * const *obj_table,
	unsigned int n, struct rte_mempool_cache *cache)
{
	void **slot;

	/* Count the put up front; it is treated as always succeeding. */
	RTE_MEMPOOL_STAT_ADD(mp, put_bulk, 1);
	RTE_MEMPOOL_STAT_ADD(mp, put_objs, n);

	/* No cache, or the burst would overflow the cache's storage. */
	if (unlikely(cache == NULL || n > RTE_MEMPOOL_CACHE_MAX_SIZE)) {
		/* Hand the objects straight to the backend. */
#ifdef RTE_LIBRTE_MEMPOOL_DEBUG
		if (rte_mempool_ops_enqueue_bulk(mp, obj_table, n) < 0)
			rte_panic("cannot put objects in mempool\n");
#else
		rte_mempool_ops_enqueue_bulk(mp, obj_table, n);
#endif
		return;
	}

	/* Step 1: append the objects after the current top of the cache. */
	slot = &cache->objs[cache->len];
	rte_memcpy(&slot[0], obj_table, sizeof(void *) * n);
	cache->len += n;

	/*
	 * Step 2: once the flush threshold is reached, give everything above
	 * the nominal cache size back to the backend.
	 */
	if (cache->len >= cache->flushthresh) {
		rte_mempool_ops_enqueue_bulk(mp, &cache->objs[cache->size],
			cache->len - cache->size);
		cache->len = cache->size;
	}
}

 

http代理服务器(3-4-7层代理)-网络事件库公共组件、内核kernel驱动 摄像头驱动 tcpip网络协议栈、netfilter、bridge 好像看过!!!! 但行好事 莫问前程 --身高体重180的胖子



标签:rte,mem,cache,ret,mp,pool1,dpdk,mempool,size
From: https://blog.51cto.com/u_15404950/5927489

相关文章