当前位置: 首页 > news >正文

suricata之PoolThread

一、结构体

struct PoolThreadElement_ {SCMutex lock;                   /**< lock, should have low contention */Pool *pool;                     /**< actual pool */
};typedef struct PoolThreadElement_ PoolThreadElement;typedef struct PoolThread_ {size_t size;                    /**< size of the array */PoolThreadElement *array;       /**< array of elements */
} PoolThread;

二、API

PoolThread *PoolThreadInit(int threads, uint32_t size, uint32_t prealloc_size, uint32_t elt_size,  void *(*Alloc)(void), int (*Init)(void *, void *), void *InitData,  void (*Cleanup)(void *), void (*Free)(void *));int PoolThreadExpand(PoolThread *pt);void PoolThreadFree(PoolThread *pt);void *PoolThreadGetById(PoolThread *pt, uint16_t id);void PoolThreadReturn(PoolThread *pt, void *data);void PoolThreadLock(PoolThread *pt, PoolThreadId id);
void PoolThreadReturnRaw(PoolThread *pt, PoolThreadId id, void *data);
void PoolThreadUnlock(PoolThread *pt, PoolThreadId id);int PoolThreadSize(PoolThread *pt);

三、实现

3.1 PoolThreadInit

创建指定个数的PoolThreadElement,并为每个PoolThreadElement创建单独的Pool。每个PoolThreadElement有自己单独的lock,这样减少了多线程之间的锁竞争。
在这里插入图片描述

PoolThread *PoolThreadInit(int threads, uint32_t size, uint32_t prealloc_size,uint32_t elt_size,  void *(*Alloc)(void), int (*Init)(void *, void *),void *InitData,  void (*Cleanup)(void *), void (*Free)(void *))
{sc_errno = SC_OK;if (threads <= 0) {SCLogDebug("error");sc_errno = SC_EINVAL;return NULL;}PoolThread *pt = SCCalloc(1, sizeof(*pt));if (unlikely(pt == NULL)) {SCLogDebug("memory alloc error");sc_errno = SC_ENOMEM;goto error;}SCLogDebug("size %d", threads);pt->array = SCMalloc(threads * sizeof(PoolThreadElement));if (pt->array == NULL) {SCLogDebug("memory alloc error");sc_errno = SC_ENOMEM;goto error;}pt->size = threads;for (int i = 0; i < threads; i++) {PoolThreadElement *e = &pt->array[i];SCMutexInit(&e->lock, NULL);SCMutexLock(&e->lock);
//        SCLogDebug("size %u prealloc_size %u elt_size %u Alloc %p Init %p InitData %p Cleanup %p Free %p",
//                size, prealloc_size, elt_size,
//                Alloc, Init, InitData, Cleanup, Free);e->pool = PoolInit(size, prealloc_size, elt_size, Alloc, Init, InitData, Cleanup, Free);SCMutexUnlock(&e->lock);if (e->pool == NULL) {SCLogDebug("error");goto error;}}return pt;
error:if (pt != NULL)PoolThreadFree(pt);return NULL;
}

3.2 PoolThreadExpand

当新创建一个线程时,如果此时线程数已经超过PoolThreadInit初始化传递的threads时,调用PoolThreadExpand增加一个PoolThreadElement, 并使用第一个线程的PoolThreadElement配置进行初始化新创建的节点,并创建Pool,成功则返回已有线程数,否则返回-1。
在这里插入图片描述

int PoolThreadExpand(PoolThread *pt)
{if (pt == NULL || pt->array == NULL || pt->size == 0) {SCLogError("pool grow failed");return -1;}size_t newsize = pt->size + 1;SCLogDebug("newsize %"PRIuMAX, (uintmax_t)newsize);void *ptmp = SCRealloc(pt->array, (newsize * sizeof(PoolThreadElement)));if (ptmp == NULL) {SCFree(pt->array);pt->array = NULL;SCLogError("pool grow failed");return -1;}pt->array = ptmp;pt->size = newsize;/* copy settings from first thread that registered the pool */Pool settings;memset(&settings, 0x0, sizeof(settings));PoolThreadElement *e = &pt->array[0];SCMutexLock(&e->lock);settings.max_buckets = e->pool->max_buckets;settings.preallocated = e->pool->preallocated;settings.elt_size = e->pool->elt_size;settings.Alloc = e->pool->Alloc;settings.Init = e->pool->Init;settings.InitData = e->pool->InitData;settings.Cleanup = e->pool->Cleanup;settings.Free = e->pool->Free;SCMutexUnlock(&e->lock);e = &pt->array[newsize - 1];memset(e, 0x00, sizeof(*e));SCMutexInit(&e->lock, NULL);SCMutexLock(&e->lock);e->pool = PoolInit(settings.max_buckets, settings.preallocated,settings.elt_size, settings.Alloc, settings.Init, settings.InitData,settings.Cleanup, settings.Free);SCMutexUnlock(&e->lock);if (e->pool == NULL) {SCLogError("pool grow failed");return -1;}return (int)(newsize - 1);
}

3.3 PoolThreadSize

返回当前PoolThread中有多少线程数。

int PoolThreadSize(PoolThread *pt)
{if (pt == NULL)return -1;return (int)pt->size;
}

3.4 PoolThreadFree

将PoolThread销毁,从实现看,函数是不可重入的,因此只能有一个线程能调用此函数。

void PoolThreadFree(PoolThread *pt)
{if (pt == NULL)return;if (pt->array != NULL) {for (int i = 0; i < (int)pt->size; i++) {PoolThreadElement *e = &pt->array[i];SCMutexLock(&e->lock);PoolFree(e->pool);SCMutexUnlock(&e->lock);SCMutexDestroy(&e->lock);}SCFree(pt->array);}SCFree(pt);
}

3.5 PoolThreadGetById

每个线程有一个唯一id,通过此id从各自的Pool中获取预分配元素。
id为PoolThreadElement数组下标, 并且将id写入元素的开头,用于归还给原来的Pool中。

void *PoolThreadGetById(PoolThread *pt, uint16_t id)
{void *data = NULL;if (pt == NULL || id >= pt->size)return NULL;PoolThreadElement *e = &pt->array[id];SCMutexLock(&e->lock);data = PoolGet(e->pool);SCMutexUnlock(&e->lock);if (data) {PoolThreadId *did = data;*did = id;}return data;
}

3.6 PoolThreadReturn

将元素归还给Pool。

void PoolThreadReturn(PoolThread *pt, void *data)
{PoolThreadId *id = data;if (pt == NULL || *id >= pt->size)return;SCLogDebug("returning to id %u", *id);PoolThreadElement *e = &pt->array[*id];SCMutexLock(&e->lock);PoolReturn(e->pool, data);SCMutexUnlock(&e->lock);
}

3.7 PoolThreadLock,PoolThreadReturnRaw,PoolThreadUnlock

这三个函数实现了PoolThreadReturn功能,只是将锁的作用域扩大,这样能在数据受保护的情况下作更多的事情。
PoolThreadLock/PoolThreadUnlock必须配对使用。

void PoolThreadLock(PoolThread *pt, PoolThreadId id)
{DEBUG_VALIDATE_BUG_ON(pt == NULL || id >= pt->size);PoolThreadElement *e = &pt->array[id];SCMutexLock(&e->lock);
}
void PoolThreadReturnRaw(PoolThread *pt, PoolThreadId id, void *data)
{DEBUG_VALIDATE_BUG_ON(pt == NULL || id >= pt->size);PoolThreadElement *e = &pt->array[id];PoolReturn(e->pool, data);
}
void PoolThreadUnlock(PoolThread *pt, PoolThreadId id)
{DEBUG_VALIDATE_BUG_ON(pt == NULL || id >= pt->size);PoolThreadElement *e = &pt->array[id];SCMutexUnlock(&e->lock);
}

四、应用

从实现看,PoolThreadInit以及PoolThreadExpand是线程不安全的,需要加锁。
从PoolThread的使用也能看到,使用了一个全局的ssn_pool_mutex锁进行控制。

TmEcode StreamTcpThreadInit(ThreadVars *tv, void *initdata, void **data)
{
...
SCMutexLock(&ssn_pool_mutex);if (ssn_pool == NULL) {ssn_pool = PoolThreadInit(1, /* thread */0, /* unlimited */stream_config.prealloc_sessions,sizeof(TcpSession),StreamTcpSessionPoolAlloc,StreamTcpSessionPoolInit, NULL,StreamTcpSessionPoolCleanup, NULL);stt->ssn_pool_id = 0;SCLogDebug("pool size %d, thread ssn_pool_id %d", PoolThreadSize(ssn_pool), stt->ssn_pool_id);} else {/* grow ssn_pool until we have a element for our thread id */stt->ssn_pool_id = PoolThreadExpand(ssn_pool);SCLogDebug("pool size %d, thread ssn_pool_id %d", PoolThreadSize(ssn_pool), stt->ssn_pool_id);}
SCMutexUnlock(&ssn_pool_mutex);if (stt->ssn_pool_id < 0 || ssn_pool == NULL) {SCLogError("failed to setup/expand stream session pool. Expand stream.memcap?");SCReturnInt(TM_ECODE_FAILED);}...

但是Pool内部的SCMutex和全局的ssn_pool_mutex无关,PoolThread的其他API也不是线程安全的,如果某些线程初始化慢些,一些线程已经开始工作,就可能出现问题。

五、总结

  • PoolThread基于Pool实现线程级别的缓存池, Pool的实现原理看suricata之Pool
  • 每个线程使用独立的Pool。
  • 从Pool分配的元素会填写PoolThreadId用于归还原来的Pool。
  • PoolThread初始化以及扩容都是线程不安全的,需要加锁。
  • 每个线程的配置都相同,都是从第一个线程配置复制的。
  • PoolThreadReturn和PoolThreadLock,PoolThreadReturnRaw,PoolThreadUnlock这三个函数实现了相同的逻辑。
http://www.dtcms.com/a/508881.html

相关文章:

  • 教育课程网站建设Iis wordpress无法发表文章
  • 怎样制作免费的网站邯郸手机网站建设费用
  • 自己做的腾讯充值网站科技小发明小制作
  • Android studio 修改包名
  • legacyForge插件(2)
  • 百度做个网站多少钱餐饮vi设计一套多少钱
  • 新网站百度收录要几天注册公司地址可以用家庭地址
  • YOLOv4 深度解析:单 GPU 可训的目标检测性能王者
  • 建设网站征集图片的通知织梦做网站也是模板吗
  • 免费做调查问卷的网站个人外贸网站制作
  • Kafka面试精讲 Day 25:Kafka与大数据生态集成
  • 中小学网站建设规范培训网站官网
  • 开鲁网站seo转接如何建设一个收费的影视图文网站
  • 自己做的网站打不开了h5网站架设
  • 天翼云OS2.0.1快速查看CPU架构
  • 5.类和对象(下)
  • 树莓派控制板载LED闪烁
  • 上海专业网站建设平台深圳罗湖高端网站建设
  • 怎么让程序更高效地连起来?
  • 网站 商城 app 建设银川网站建设nx110
  • 泉州建设网站公司网站建设新闻 常识
  • 从零搭建本地化 RAG 聊天助手:从环境配置到核心逻辑全解析
  • 福建建设局网站旅游网站建设代码
  • 云南城市建设职业学院成绩查询网站将网站做成logo怎么做
  • svn使用和idea集成
  • 汕头房产网站建设苏州怎么做网站
  • 基于springboot的知识管理系统开发与设计
  • ORM(Tortoise-ORM)操作
  • 深圳外包网站网站域名切换
  • 空间代码网站百度认证是什么