suricata之PoolThread
一、结构体
/** One slot of a PoolThread: a Pool together with the mutex guarding it. */
typedef struct PoolThreadElement_ {
    SCMutex lock; /**< lock, should have low contention */
    Pool *pool;   /**< actual pool */
} PoolThreadElement;

/** Array of per-thread pool slots, indexed by thread id. */
typedef struct PoolThread_ {
    size_t size;              /**< size of the array */
    PoolThreadElement *array; /**< array of elements */
} PoolThread;
二、API
/** Create a PoolThread holding `threads` slots; the remaining arguments are
 *  forwarded unchanged to PoolInit() for every slot. Returns NULL on error. */
PoolThread *PoolThreadInit(int threads, uint32_t size, uint32_t prealloc_size,
        uint32_t elt_size, void *(*Alloc)(void), int (*Init)(void *, void *),
        void *InitData, void (*Cleanup)(void *), void (*Free)(void *));

/** Append one slot, configured like slot 0. Returns the new slot's index, or -1. */
int PoolThreadExpand(PoolThread *pt);

/** Free every slot's pool and mutex, then the PoolThread itself. */
void PoolThreadFree(PoolThread *pt);

/** Get an element from slot `id`; the id is written at the start of the element. */
void *PoolThreadGetById(PoolThread *pt, uint16_t id);

/** Return an element to the slot whose id is stored at the start of `data`. */
void PoolThreadReturn(PoolThread *pt, void *data);

/** Lock slot `id`. */
void PoolThreadLock(PoolThread *pt, PoolThreadId id);

/** Return `data` to slot `id` without taking the slot lock. */
void PoolThreadReturnRaw(PoolThread *pt, PoolThreadId id, void *data);

/** Unlock slot `id`. */
void PoolThreadUnlock(PoolThread *pt, PoolThreadId id);

/** Number of slots in `pt`, or -1 when `pt` is NULL. */
int PoolThreadSize(PoolThread *pt);
三、实现
3.1 PoolThreadInit
创建指定个数的PoolThreadElement,并为每个PoolThreadElement创建单独的Pool。每个PoolThreadElement有自己单独的lock,这样减少了多线程之间的锁竞争。
PoolThread *PoolThreadInit(int threads, uint32_t size, uint32_t prealloc_size,uint32_t elt_size, void *(*Alloc)(void), int (*Init)(void *, void *),void *InitData, void (*Cleanup)(void *), void (*Free)(void *))
{sc_errno = SC_OK;if (threads <= 0) {SCLogDebug("error");sc_errno = SC_EINVAL;return NULL;}PoolThread *pt = SCCalloc(1, sizeof(*pt));if (unlikely(pt == NULL)) {SCLogDebug("memory alloc error");sc_errno = SC_ENOMEM;goto error;}SCLogDebug("size %d", threads);pt->array = SCMalloc(threads * sizeof(PoolThreadElement));if (pt->array == NULL) {SCLogDebug("memory alloc error");sc_errno = SC_ENOMEM;goto error;}pt->size = threads;for (int i = 0; i < threads; i++) {PoolThreadElement *e = &pt->array[i];SCMutexInit(&e->lock, NULL);SCMutexLock(&e->lock);
// SCLogDebug("size %u prealloc_size %u elt_size %u Alloc %p Init %p InitData %p Cleanup %p Free %p",
// size, prealloc_size, elt_size,
// Alloc, Init, InitData, Cleanup, Free);e->pool = PoolInit(size, prealloc_size, elt_size, Alloc, Init, InitData, Cleanup, Free);SCMutexUnlock(&e->lock);if (e->pool == NULL) {SCLogDebug("error");goto error;}}return pt;
error:if (pt != NULL)PoolThreadFree(pt);return NULL;
}
3.2 PoolThreadExpand
当新创建一个线程、且线程数已经超过PoolThreadInit初始化时传递的threads时,调用PoolThreadExpand增加一个PoolThreadElement,使用第一个线程的PoolThreadElement中的Pool配置初始化新创建的节点,并为其创建Pool。成功则返回新节点的数组下标(即新线程的id),否则返回-1。
/**
 * \brief Grow the PoolThread by one slot, configured like slot 0.
 *
 * The new slot's Pool is created with the settings (max_buckets,
 * preallocated, elt_size and the callbacks) read from the first slot's Pool.
 *
 * On failure the existing slots are left untouched and pt->size is not
 * changed, so slots that are already in use remain valid.
 *
 * \param pt PoolThread to grow; must already contain at least one slot
 *
 * \retval index of the new slot on success
 * \retval -1 on error
 */
int PoolThreadExpand(PoolThread *pt)
{
    if (pt == NULL || pt->array == NULL || pt->size == 0) {
        SCLogError("pool grow failed");
        return -1;
    }

    size_t newsize = pt->size + 1;
    SCLogDebug("newsize %"PRIuMAX, (uintmax_t)newsize);

    void *ptmp = SCRealloc(pt->array, (newsize * sizeof(PoolThreadElement)));
    if (ptmp == NULL) {
        /* the old array is still valid: keep it instead of discarding
         * every existing slot together with its pool */
        SCLogError("pool grow failed");
        return -1;
    }
    pt->array = ptmp;

    /* copy settings from first thread that registered the pool */
    Pool settings;
    memset(&settings, 0x0, sizeof(settings));
    PoolThreadElement *e = &pt->array[0];
    SCMutexLock(&e->lock);
    settings.max_buckets = e->pool->max_buckets;
    settings.preallocated = e->pool->preallocated;
    settings.elt_size = e->pool->elt_size;
    settings.Alloc = e->pool->Alloc;
    settings.Init = e->pool->Init;
    settings.InitData = e->pool->InitData;
    settings.Cleanup = e->pool->Cleanup;
    settings.Free = e->pool->Free;
    SCMutexUnlock(&e->lock);

    e = &pt->array[newsize - 1];
    memset(e, 0x00, sizeof(*e));
    SCMutexInit(&e->lock, NULL);
    SCMutexLock(&e->lock);
    e->pool = PoolInit(settings.max_buckets, settings.preallocated,
            settings.elt_size, settings.Alloc, settings.Init, settings.InitData,
            settings.Cleanup, settings.Free);
    SCMutexUnlock(&e->lock);
    if (e->pool == NULL) {
        /* the slot was never published through pt->size: undo its mutex */
        SCMutexDestroy(&e->lock);
        SCLogError("pool grow failed");
        return -1;
    }

    /* publish the new slot only once it is fully usable */
    pt->size = newsize;
    return (int)(newsize - 1);
}
3.3 PoolThreadSize
返回当前PoolThread中PoolThreadElement的个数(即线程数);pt为NULL时返回-1。
/**
 * \brief Report how many slots the PoolThread currently holds.
 *
 * \retval number of slots (pt->size cast to int)
 * \retval -1 when pt is NULL
 */
int PoolThreadSize(PoolThread *pt)
{
    return (pt != NULL) ? (int)pt->size : -1;
}
3.4 PoolThreadFree
将PoolThread销毁,从实现看,函数是不可重入的,因此只能有一个线程能调用此函数。
/**
 * \brief Destroy a PoolThread: free each slot's pool, destroy each slot's
 *        mutex, then release the slot array and the PoolThread itself.
 *
 * \param pt PoolThread to destroy; NULL is accepted and ignored
 */
void PoolThreadFree(PoolThread *pt)
{
    if (pt == NULL)
        return;

    if (pt->array != NULL) {
        PoolThreadElement *slot = pt->array;

        for (int idx = 0; idx < (int)pt->size; idx++, slot++) {
            /* the pool is released while holding the slot's own lock */
            SCMutexLock(&slot->lock);
            PoolFree(slot->pool);
            SCMutexUnlock(&slot->lock);
            SCMutexDestroy(&slot->lock);
        }
        SCFree(pt->array);
    }

    SCFree(pt);
}
3.5 PoolThreadGetById
每个线程有一个唯一id,通过此id从各自的Pool中获取预分配元素。id为PoolThreadElement数组的下标,获取成功后会将id写入元素的开头,用于之后归还到原来的Pool中。
/**
 * \brief Fetch an element from the pool belonging to slot `id`.
 *
 * On success the id is written at the very start of the element, which is
 * what PoolThreadReturn() later reads to find the owning slot.
 *
 * \param pt PoolThread to take the element from
 * \param id index of the slot to use
 *
 * \retval element on success
 * \retval NULL when pt is NULL, id is out of range, or PoolGet() returned NULL
 */
void *PoolThreadGetById(PoolThread *pt, uint16_t id)
{
    if (pt == NULL || id >= pt->size)
        return NULL;

    PoolThreadElement *slot = &pt->array[id];

    SCMutexLock(&slot->lock);
    void *item = PoolGet(slot->pool);
    SCMutexUnlock(&slot->lock);

    if (item != NULL) {
        /* stamp the owning slot id into the head of the element */
        *(PoolThreadId *)item = id;
    }
    return item;
}
3.6 PoolThreadReturn
将元素归还给Pool。
/**
 * \brief Hand an element back to the pool it was taken from.
 *
 * The owning slot is identified by the PoolThreadId stored at the start of
 * the element (written there by PoolThreadGetById()).
 *
 * \param pt   PoolThread the element belongs to
 * \param data element to return
 *
 * The call is a no-op when pt or data is NULL, or when the stored id is not
 * a valid slot index.
 */
void PoolThreadReturn(PoolThread *pt, void *data)
{
    /* data is dereferenced below to read the id, so it must be checked too */
    if (pt == NULL || data == NULL)
        return;

    PoolThreadId *id = data;
    if (*id >= pt->size)
        return;

    SCLogDebug("returning to id %u", *id);

    PoolThreadElement *e = &pt->array[*id];
    SCMutexLock(&e->lock);
    PoolReturn(e->pool, data);
    SCMutexUnlock(&e->lock);
}
3.7 PoolThreadLock,PoolThreadReturnRaw,PoolThreadUnlock
这三个函数组合起来实现了PoolThreadReturn的功能,只是将锁的作用域扩大,这样能在数据受保护的情况下做更多的事情。
PoolThreadLock/PoolThreadUnlock必须配对使用。
/**
 * \brief Take the lock of slot `id`.
 *
 * pt and id are only checked through DEBUG_VALIDATE_BUG_ON.
 */
void PoolThreadLock(PoolThread *pt, PoolThreadId id)
{
    DEBUG_VALIDATE_BUG_ON(pt == NULL || id >= pt->size);
    SCMutexLock(&pt->array[id].lock);
}
/**
 * \brief Return `data` to the pool of slot `id` without taking the slot lock.
 *
 * pt and id are only checked through DEBUG_VALIDATE_BUG_ON.
 */
void PoolThreadReturnRaw(PoolThread *pt, PoolThreadId id, void *data)
{
    DEBUG_VALIDATE_BUG_ON(pt == NULL || id >= pt->size);
    PoolReturn(pt->array[id].pool, data);
}
/**
 * \brief Release the lock of slot `id`.
 *
 * pt and id are only checked through DEBUG_VALIDATE_BUG_ON.
 */
void PoolThreadUnlock(PoolThread *pt, PoolThreadId id)
{
    DEBUG_VALIDATE_BUG_ON(pt == NULL || id >= pt->size);
    SCMutexUnlock(&pt->array[id].lock);
}
四、应用
从实现看,PoolThreadInit以及PoolThreadExpand是线程不安全的,需要加锁。
从PoolThread的使用也能看到,使用了一个全局的ssn_pool_mutex锁进行控制。
TmEcode StreamTcpThreadInit(ThreadVars *tv, void *initdata, void **data)
{
...
SCMutexLock(&ssn_pool_mutex);if (ssn_pool == NULL) {ssn_pool = PoolThreadInit(1, /* thread */0, /* unlimited */stream_config.prealloc_sessions,sizeof(TcpSession),StreamTcpSessionPoolAlloc,StreamTcpSessionPoolInit, NULL,StreamTcpSessionPoolCleanup, NULL);stt->ssn_pool_id = 0;SCLogDebug("pool size %d, thread ssn_pool_id %d", PoolThreadSize(ssn_pool), stt->ssn_pool_id);} else {/* grow ssn_pool until we have a element for our thread id */stt->ssn_pool_id = PoolThreadExpand(ssn_pool);SCLogDebug("pool size %d, thread ssn_pool_id %d", PoolThreadSize(ssn_pool), stt->ssn_pool_id);}
SCMutexUnlock(&ssn_pool_mutex);if (stt->ssn_pool_id < 0 || ssn_pool == NULL) {SCLogError("failed to setup/expand stream session pool. Expand stream.memcap?");SCReturnInt(TM_ECODE_FAILED);}...
但是PoolThreadElement内部的SCMutex和全局的ssn_pool_mutex无关,PoolThread的其他API也不受ssn_pool_mutex保护,因此并不是线程安全的:PoolThreadExpand会对array做realloc,数组的地址可能发生变化。如果某些线程初始化慢些,而另一些线程已经开始工作(正在访问array中的元素),就可能出现问题。
五、总结
- PoolThread基于Pool实现线程级别的缓存池, Pool的实现原理看suricata之Pool
- 每个线程使用独立的Pool。
- 从Pool分配的元素会填写PoolThreadId用于归还原来的Pool。
- PoolThread初始化以及扩容都是线程不安全的,需要加锁。
- 每个线程的配置都相同,都是从第一个线程配置复制的。
- PoolThreadReturn与PoolThreadLock、PoolThreadReturnRaw、PoolThreadUnlock三个函数的组合实现了相同的逻辑。