当前位置: 首页 > news >正文

RGW层Op的组织

1 引言

在Radosgw处理来自客户端请求的时候,最终需要将请求发往OSD进行落盘,通常情况下,一个客户端请求所对应的发往OSD的请求远不止1个,比如对象的写入要判断acl、写入元数据等等;如果每个RGW层的Op都对应发往OSD的一次IO,那么集群的网络IO将会非常高。因此在RGW层Op是以一个Op列表的形式下发到OSD,本文就来介绍这一机制。

2 一个例子ObjectWriteOperation

阅读RGW层的代码可以经常发现以ObjectWriteOperation类定义的op,并且会执行这个类的成员函数,以我们以RGWRados::Object::Write::_do_write_meta为例:

int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_size,map<string, bufferlist>& attrs,bool assume_noent, bool modify_tail,void *_index_op)
{RGWRados::Bucket::UpdateIndex *index_op = static_cast<RGWRados::Bucket::UpdateIndex *>(_index_op);RGWRados *store = target->get_store();ObjectWriteOperation op;...if (meta.manifest) {/* remove existing manifest attr */iter = attrs.find(RGW_ATTR_MANIFEST);if (iter != attrs.end())attrs.erase(iter);bufferlist bl;encode(*meta.manifest, bl);op.setxattr(RGW_ATTR_MANIFEST, bl);}...

在op被初始化之后,第一个可能执行的函数是setxattr,也就是设置xattr元数据,来看这个函数的调用栈:
ObjectWriteOperation::setxattr()
|–::ObjectOperation::setxattr()
|\quad|–add_xattr()
来看add_xattr:

void add_xattr(int op, const char *name, const bufferlist& data) {OSDOp& osd_op = add_op(op);osd_op.op.xattr.name_len = (name ? strlen(name) : 0);osd_op.op.xattr.value_len = data.length();if (name)osd_op.indata.append(name, osd_op.op.xattr.name_len);osd_op.indata.append(data);}

可以发现最终的操作很简单,首先执行add_op():

OSDOp& add_op(int op) {int s = ops.size();ops.resize(s+1);ops[s].op.op = op;out_bl.resize(s+1);out_bl[s] = NULL;out_handler.resize(s+1);out_handler[s] = NULL;out_rval.resize(s+1);out_rval[s] = NULL;return ops[s];}

有一个名为ops的vector<OSDOp>,用于存储所有要发往OSD的操作,当有一个新Op到来的时候就resize这个vector并将这个Op的操作数添加到vector末尾的OSDOp中,例如add_xattr要添加的操作数就是CEPH_OSD_OP_SETXATTR。操作数都是enum类型,可以根据宏定义__CEPH_FORALL_OSD_OPS找到对应的操作数。
可以发现,针对OSD的操作都是存储在列表中,等待一次性发送下去。ObjectWriteOperation的很多成员函数都是相似的处理过程,例如write_full、rm_xattr等,只是不同的成员函数处理的OSDOp的成员不同罢了。

3 bufferlist的encode

3.1 encode的函数定义

还有一个值得注意的点是所有ObjectWriteOperation的成员函数处理的都是bufferlist数据结构,在之前处理manifest的代码中,manifest也是转化为bufferlist再交给op进行处理的。

if (meta.manifest) {/* remove existing manifest attr */iter = attrs.find(RGW_ATTR_MANIFEST);if (iter != attrs.end())attrs.erase(iter);bufferlist bl;encode(*meta.manifest, bl);op.setxattr(RGW_ATTR_MANIFEST, bl);}

接下来就来详细看一下manifest是如何转化到bufferlist。
首先来看encode(*meta.manifest, bl);这行是如何实现的:

#define WRITE_CLASS_ENCODER(cl)						\inline void encode(const cl &c, ::ceph::bufferlist &bl, uint64_t features=0) { \ENCODE_DUMP_PRE(); c.encode(bl); ENCODE_DUMP_POST(cl); }		\inline void decode(cl &c, ::ceph::bufferlist::iterator &p) { c.decode(p); }

这个宏定义在编译的时候就会展开为:

inline void encode(const RGWObjManifest &c, ::ceph::bufferlist &bl, uint64_t features=0) { c.encode(bl); 
}

实际上这是为了encode有一个统一入口,实际上调用的还是RGWObjManifest自身的encode函数。因为bufferlist实际上是ceph特有的内存管理方式,不同的类将其转化为bufferlist可识别的形式显然必须要自定义自己的encode函数。掌握了一个类的encode方式其他的类也可以举一反三。
来看RGWObjManifest::encode():

void encode(bufferlist& bl) const {ENCODE_START(7, 6, bl);encode(obj_size, bl);...encode(head_placement_rule, bl);encode(tail_placement.placement_rule, bl);ENCODE_FINISH(bl);}

比较重要的是ENCODE_START和ENCODE_FINISH这两个宏定义:

/*** start encoding block** @param v current (code) version of the encoding* @param compat oldest code version that can decode it* @param bl bufferlist to encode to*/
#define ENCODE_START(v, compat, bl)			     \using ::ceph::encode;					     \__u8 struct_v = v, struct_compat = compat;		     \// 写入当前版本号(1字节)encode(struct_v, (bl));				     \// 写入兼容版本号(1字节)encode(struct_compat, (bl));			     \// 指向刚写入的 compat 字节位置::ceph::buffer::list::iterator struct_compat_it = (bl).end();	\struct_compat_it.advance(-1);				     \ceph_le32 struct_len;				             \struct_len = 0;                                            \encode(struct_len, (bl));				     \::ceph::buffer::list::iterator struct_len_it = (bl).end(); \// 指向刚写入的长度字段位置struct_len_it.advance(-4);				     \do {/*** finish encoding block** @param bl bufferlist we were encoding to* @param new_struct_compat struct-compat value to use*/
#define ENCODE_FINISH_NEW_COMPAT(bl, new_struct_compat)			\} while (false);							\// 当前 bufferlist 总长度,get_off()长度字段的位置,struct_len长度字段本身占用的4字节// 这句代码实际上是计算实际编码的数据长度,不包括长度字段本身struct_len = (bl).length() - struct_len_it.get_off() - sizeof(struct_len); \// 回填真实长度struct_len_it.copy_in(4, (char *)&struct_len);			\if (new_struct_compat) {						\struct_compat = new_struct_compat;					\struct_compat_it.copy_in(1, (char *)&struct_compat);		\}#define ENCODE_FINISH(bl) ENCODE_FINISH_NEW_COMPAT(bl, 0)

很明显中间的do-while是留给其他类型的encode函数的,自定义的encode都会插入到ENCODE_START和ENCODE_FINISH_NEW_COMPAT中间的do-while中。
ENCODE_START的三个参数v、compact和bl,bl自不必说,就是数据结构需要encode的bufferlist。v和compact则都服务于bufferlist的版本。在写代码的过程中免不了向数据结构中添加一些成员,这样就导致新旧版本存储数据不一致,当新添加一个成员的时候,就要给v加1,compact则表示能兼容的最老版本,鉴于此,在我们添加新的成员时最好在encode函数中的末尾添加这个成员的encode函数,这样其之前的encode函数均能正常解析,也不必给compact加1了。

3.2 bufferlist具体encode过程

实际上,ceph复杂数据结构的encode过程就是不断的迭代,直到encode到基本类型,哪怕是class嵌套着class,只要不断的调用成员函数的encode函数,最终就能将整个复杂数据结构编码为bufferlist形式,下面将从一个基本类型的encode例子进行讲解。
我们用encode(obj_size, bl);这条语句来详细讲解encode的过程,obj_size是uint64_t类型,该类型的encode函数为:

#define WRITE_INTTYPE_ENCODER(type, etype)				\inline void encode(type v, ::ceph::bufferlist& bl, uint64_t features=0) { \ceph_##etype e;					                \e = v;                                                              \::ceph::encode_raw(e, bl);						\}									\inline void decode(type &v, ::ceph::bufferlist::iterator& p) {	\ceph_##etype e;							\::ceph::decode_raw(e, p);						\v = e;								\}WRITE_INTTYPE_ENCODER(uint64_t, le64)
WRITE_INTTYPE_ENCODER(int64_t, le64)
WRITE_INTTYPE_ENCODER(uint32_t, le32)
WRITE_INTTYPE_ENCODER(int32_t, le32)
WRITE_INTTYPE_ENCODER(uint16_t, le16)
WRITE_INTTYPE_ENCODER(int16_t, le16)

那么实际处理的函数就是::ceph::encode_raw(),来看encode_raw函数,非常简单:

template<class T>
inline void encode_raw(const T& t, bufferlist& bl)
{bl.append((char*)&t, sizeof(t));
}

无论const T是什么类型,都会传该类型的引用t,最终取地址转化为char*,实际上就是将地址转换为字符串。encode_raw调用了buffer::list::append(const char *data, unsigned len)函数:

void buffer::list::append(const char *data, unsigned len){_len += len;const unsigned free_in_last = get_append_buffer_unused_tail_length();const unsigned first_round = std::min(len, free_in_last);if (first_round) {// _buffers and carriage can desynchronize when 1) a new ptr// we don't own has been added into the _buffers 2) _buffers// has been emptied as as a result of std::move or stolen by// claim_append.if (unlikely(_carriage != &_buffers.back())) {auto bptr = ptr_node::create(*_carriage, _carriage->length(), 0);_carriage = bptr.get();_buffers.push_back(*bptr.release());_num += 1;}_carriage->append(data, first_round);}const unsigned second_round = len - first_round;if (second_round) {auto& new_back = refill_append_space(second_round);new_back.append(data + first_round, second_round);}}

其中_carriage指向bufferlist链表中的最后一个ptr_node,可以理解为指向最后一块内存空间,因为bufferlist实际上是buffer::raw的链表,而buffer::raw是管理内存的数据结构,所以bufferlist的append函数实际上就是将当前加入的数据存储到最后一块内存中并加入到链表末尾。那么_carriage的作用也不难理解了,因为经常需要操作链表末尾的指针,如果从链表头开始找那么时间复杂度就是O(n)O(n)O(n),直接用_carriage可以大大加快append的速度。
append函数中大部分代码都是处理末尾内存空间不足的情况,实际上是buffer::ptr::append(_carriage的父类buffer::ptr)函数执行最终的加入链表逻辑:

_carriage->append(data, first_round);unsigned buffer::ptr::append(const char *p, unsigned l){ceph_assert(_raw);ceph_assert(l <= unused_tail_length());char* c = _raw->get_data() + _off + _len;maybe_inline_memcpy(c, p, l, 32);_len += l;return _len + _off;}

首先获取raw内存的实际地址c,再将p(也就是之前获取到的需要encode的数据的地址)拷贝过去。
针对bufferlist如何管理内存,将在其他篇章中介绍。

http://www.dtcms.com/a/355688.html

相关文章:

  • 并发编程——05 并发锁机制之深入理解synchronized
  • 优雅地实现ChatGPT式的打字机效果:Spring Boot 流式响应
  • Jtekt深沟球轴承外圈防跑圈开发
  • Python Imaging Library (PIL) 全面指南:PIL基础入门-图像颜色模式转换与应用
  • [网鼎杯 2018]Fakebook
  • 基础IO详解
  • 【前端教程】JavaScript 基础总结
  • 教育类《河北教育》杂志简介
  • Day03_苍穹外卖——公共字段自动填充菜品相关功能
  • 河南萌新联赛2025第(七)场:郑州轻工业大学
  • 【数据结构与算法】(LeetCode)141.环形链表 142.环形链表Ⅱ
  • 数据分析学习笔记4:加州房价预测
  • 国产的服务器
  • 如何监控PCIe 5.0 SSD的运行温度?多软件推荐
  • 中国剩余定理(以及扩展..)
  • 用 Docker 玩转 Kafka 4.0镜像选型、快速起步、配置持久化与常见坑
  • 影楼精修-锁骨增强算法
  • 深入理解 PHP 中的 `pcntl_fork()`:父进程与子进程的执行路径
  • SRE网易一面面经
  • Linux笔记12——shell编程基础-6
  • 少样本图异常检测系列【A Survey of Few-Shot Graph Anomaly Detection】
  • Python实战:银行ATM系统开发全解析
  • RuoYi-VuePlus:前端指定接口不显示错误提示
  • 面试tips--JVM(2)--对象创建的过程
  • ERNIE-4.5-VL:技术解密+应用实战,解锁多模态新场景!
  • 8.29 贪心|摩尔投票
  • 【不说废话】pytorch中.to(device)函数详解
  • 基于K8s部署服务:dev、uat、prod环境的核心差异解析
  • 工业级TF卡NAND+北京君正+Rk瑞芯微的应用
  • openEuler Embedded 的 Yocto入门 : 5.基本变量与基本任务详解