当前位置: 首页 > news >正文

RGWRados::Object::Write::_do_write_meta()

1 函数详解

_do_write_meta是写入元数据的函数,非常重要。

int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_size,map<string, bufferlist>& attrs,bool assume_noent, bool modify_tail,void *_index_op)
{RGWRados::Bucket::UpdateIndex *index_op = static_cast<RGWRados::Bucket::UpdateIndex *>(_index_op);RGWRados *store = target->get_store();

这里的target是在RGWInitMultipart::execute()中初始化的数据结构,是创建的临时对象。

  ObjectWriteOperation op;
#ifdef WITH_LTTNGconst struct req_state* s =  get_req_state();string req_id;if (!s) {// fake req_idreq_id = store->unique_id(store->get_new_req_id());} else {req_id = s->req_id;}
#endifRGWObjState *state;int r = target->get_state(&state, false, assume_noent);if (r < 0)return r;

get_state获取临时meta对象的状态,有着如下的调用栈:
RGWRados::Object::get_state()
|–RGWRados::get_obj_state()
|\quad|–get_obj_state_impl()
其功能是获取rgw对象的rados对象,大小以及其他属性等。

  rgw_obj& obj = target->get_obj();if (obj.get_oid().empty()) {ldout(store->ctx(), 0) << "ERROR: " << __func__ << "(): cannot write object with empty name" << dendl;return -EIO;}rgw_rados_ref ref;r = store->get_obj_head_ref(target->get_bucket_info(), obj, &ref);if (r < 0)return r;

这里target->get_obj()实际上就是获取要写入的对象(不同请求下代表的obj也不同,例如在RGWInitMultipart::execute()最终调用的_do_write_meta中这里获取的rgw_rados_ref是指生成的临时meta对象,其对象名称可以参考RGWInitMultipart::execute()。

bool is_olh = state->is_olh;bool reset_obj = (meta.flags & PUT_OBJ_CREATE) != 0;const string *ptag = meta.ptag;if (!ptag && !index_op->get_optag()->empty()) {ptag = index_op->get_optag();}r = target->prepare_atomic_modification(op, reset_obj, ptag, meta.if_match, meta.if_nomatch, false, modify_tail);if (r < 0)return r;

prepare_atomic_modification主要是查看obj是否需要原子操作,针对InitMultipart接口来说并未设置,即使使用相同的key去init multipart,也会返回不同的etag,不属于同一个上传了,所以不必设置原子操作。

if (real_clock::is_zero(meta.set_mtime)) {meta.set_mtime = real_clock::now();}if (state->is_olh) {op.setxattr(RGW_ATTR_OLH_ID_TAG, state->olh_tag);}

olh tag是与多版本相关的标志位,暂时不必关注。

struct timespec mtime_ts = real_clock::to_timespec(meta.set_mtime);op.mtime2(&mtime_ts);if (meta.data) {/* if we want to overwrite the data, we also want to overwrite thexattrs, so just remove the object */op.write_full(*meta.data);}

meta.data在InitMultipart中未设置,在RGWPutObjProcessor_Atomic::do_complete()有中设置。

string etag;string content_type;bufferlist acl_bl;map<string, bufferlist>::iterator iter;if (meta.rmattrs) {for (iter = meta.rmattrs->begin(); iter != meta.rmattrs->end(); ++iter) {const string& name = iter->first;op.rmxattr(name.c_str());}}if (meta.manifest) {/* remove existing manifest attr */iter = attrs.find(RGW_ATTR_MANIFEST);if (iter != attrs.end())attrs.erase(iter);bufferlist bl;encode(*meta.manifest, bl);op.setxattr(RGW_ATTR_MANIFEST, bl);}for (iter = attrs.begin(); iter != attrs.end(); ++iter) {const string& name = iter->first;bufferlist& bl = iter->second;if (!bl.length())continue;op.setxattr(name.c_str(), bl);if (name.compare(RGW_ATTR_ETAG) == 0) {etag = bl.to_str();} else if (name.compare(RGW_ATTR_CONTENT_TYPE) == 0) {content_type = bl.c_str();} else if (name.compare(RGW_ATTR_ACL) == 0) {acl_bl = bl;}}if (attrs.find(RGW_ATTR_PG_VER) == attrs.end()) {cls_rgw_obj_store_pg_ver(op, RGW_ATTR_PG_VER);}if (attrs.find(RGW_ATTR_SOURCE_ZONE) == attrs.end()) {bufferlist bl;encode(store->get_zone_short_id(), bl);op.setxattr(RGW_ATTR_SOURCE_ZONE, bl);}

以上均为设置或者删除一些属性,具体的功能读者可以自行探究,不过有点值得注意,那就是针对op的操作。可以发现op(也就是ObjectWriteOperation)有很多不同功能的功能函数,write_full、setxattr、mtime2等等,这些都是要最终落盘的信息,也就是要发送给OSD执行的操作。
稍微观察这些函数的调用栈就可以发现,并非调用一次函数就向OSD发送一次请求,这样集群整体的网络IO将相当之高;实际上则是通过bufferlist记录需要操作的数据以及针对数据需要执行的操作。关于RGW层Op的组织形式,可以参考RGW层Op的组织,里面介绍了数据结构如何encode到bufferlist以及RGW层的Op是如何在内存中存储。

if (!op.size())return 0;uint64_t epoch;int64_t poolid;bool orig_exists;uint64_t orig_size;if (!reset_obj) {    //Multipart upload, it has immutable head. orig_exists = false;orig_size = 0;} else {orig_exists = state->exists;orig_size = state->accounted_size;}bool versioned_target = (meta.olh_epoch && *meta.olh_epoch > 0) ||!obj.key.instance.empty();bool versioned_op = (target->versioning_enabled() || is_olh || versioned_target);if (versioned_op) {index_op->set_bilog_flags(RGW_BILOG_FLAG_VERSIONED_OP);}if (!index_op->is_prepared()) {tracepoint(rgw_rados, prepare_enter, req_id.c_str());r = index_op->prepare(CLS_RGW_OP_ADD, &state->write_tag);tracepoint(rgw_rados, prepare_exit, req_id.c_str());if (r < 0)return r;}

这里index_op->prepare()是针对索引对象的准备工作,借着这个函数我们可以了解rgw发往osd的Op是如何发送,以及如何在osd层执行。因为所有发往osd的请求都采用同一套机制,读者可以在这里阅读文档RGW层Op的下发执行,详细看一下op的发送,执行过程,也可在这里跳过,在讲解分段上传的文档中查看。
而在这里prepare有两个任务,第一检测是否在进行reshard(cls_rgw_guard_bucket_resharding函数),第二个则与调用write_meta的函数有关,如果是在InitMultipart,即初始化分段上传的时候(RGWInitMultipart::execute())。在此时调用write_meta则会将pending_map写入名称为"_multipart_<object-key>.upload-id.meta"的对象(这个对象则是写入到某个bucket index上,在index pool中对bucket shard执行listomapkey可以找到)的omap中,并且以一个随机数作为key(request id或者随机生成的32位的字符串),rgw_bucket_pending_info作为value,以防止重传以及客户端重试导致的重复操作,用于rgw上传的并发控制,以后会在rgw的一致性中进行介绍。

tracepoint(rgw_rados, operate_enter, req_id.c_str());r = ref.ioctx.operate(ref.oid, &op);tracepoint(rgw_rados, operate_exit, req_id.c_str());if (r < 0) { /* we can expect to get -ECANCELED if object was replaced under,or -ENOENT if was removed, or -EEXIST if it did not existbefore and now it does */if (r == -EEXIST && assume_noent) {target->invalidate_state();return r;}goto done_cancel;}

这里则是将对象写入到对应的pool,对于RGWInitMultipart来说,是将创建的临时对象写入到non-ec pool中,如果没有这个pool则会写入data pool中,最终的对象名称是<bucket-shard-id>._multipart<key-name>.<upload-id>.meta。

epoch = ref.ioctx.get_last_version();poolid = ref.ioctx.get_id();r = target->complete_atomic_modification();if (r < 0) {ldout(store->ctx(), 0) << "ERROR: complete_atomic_modification returned r=" << r << dendl;}tracepoint(rgw_rados, complete_enter, req_id.c_str());r = index_op->complete(poolid, epoch, size, accounted_size,meta.set_mtime, etag, content_type, &acl_bl,meta.category, meta.remove_objs, meta.user_data);

这里则是将一些处理完成后的元数据更新到索引之中,也就是刚才写入到bucket shard中的meta对象。再向下找调用栈,可以发现调用了RGWRados::cls_obj_complete_op(),它使用了aio_operate(),也就是complete操作是异步下发的。可以参考文档【RGW】Ceph中的异步回调机制——AioCompletionImpl。

  tracepoint(rgw_rados, complete_exit, req_id.c_str());if (r < 0)goto done_cancel;if (meta.mtime) {*meta.mtime = meta.set_mtime;}/* note that index_op was using state so we couldn't invalidate it earlier */target->invalidate_state();state = NULL;if (versioned_op && meta.olh_epoch) {r = store->set_olh(target->get_ctx(), target->get_bucket_info(), obj, false, NULL, *meta.olh_epoch, real_time(), false, meta.zones_trace);if (r < 0) {return r;}}if (!real_clock::is_zero(meta.delete_at)) {rgw_obj_index_key obj_key;obj.key.get_index_key(&obj_key);r = store->objexp_hint_add(meta.delete_at,obj.bucket.tenant, obj.bucket.name, obj.bucket.bucket_id, obj_key);if (r < 0) {ldout(store->ctx(), 0) << "ERROR: objexp_hint_add() returned r=" << r << ", object will not get removed" << dendl;/* ignoring error, nothing we can do at this point */}}meta.canceled = false;/* update quota cache */if (meta.completeMultipart){store->quota_handler->update_stats(meta.owner, obj.bucket, (orig_exists ? 0 : 1),0, orig_size);}else {store->quota_handler->update_stats(meta.owner, obj.bucket, (orig_exists ? 0 : 1),accounted_size, orig_size);  }return 0;done_cancel:int ret = index_op->cancel();if (ret < 0) {ldout(store->ctx(), 0) << "ERROR: index_op.cancel()() returned ret=" << ret << dendl;}meta.canceled = true;

done_cancel则是处理失败的情况。

/* we lost in a race. There are a few options:* - existing object was rewritten (ECANCELED)* - non existing object was created (EEXIST)* - object was removed (ENOENT)* should treat it as a success*/if (meta.if_match == NULL && meta.if_nomatch == NULL) {if (r == -ECANCELED || r == -ENOENT || r == -EEXIST) {r = 0;}} else {if (meta.if_match != NULL) {// only overwrite existing objectif (strcmp(meta.if_match, "*") == 0) {if (r == -ENOENT) {r = -ERR_PRECONDITION_FAILED;} else if (r == -ECANCELED) {r = 0;}}}if (meta.if_nomatch != NULL) {// only create a new objectif (strcmp(meta.if_nomatch, "*") == 0) {if (r == -EEXIST) {r = -ERR_PRECONDITION_FAILED;} else if (r == -ENOENT) {r = 0;}}}}return r;
}
http://www.dtcms.com/a/390025.html

相关文章:

  • Shopify 集合页改造:增加 Banner 图片 + 点击加载更多功能
  • 泛函 Φ(u) = ∫[(u″)² + u² + 2f(x)u]dx − (u′(0))² 在 u(0)=u(1) 下的驻点方程与边界条件
  • JAVA高频面试题汇总:Java+ 并发 +Spring+MySQL+ 分布式 +Redis+ 算法 +JVM 等
  • 构建与运营“爬虫 IP 代理池”的方法论
  • 【文献笔记】Point Transformer
  • Linux | i.MX6ULL Modbus 移植和使用(第二十一章)
  • 几种微前端框架的沙箱策略
  • 黑盒测试:测试用例设计之边界值设计方法(边界值分析法)(上点、离点、内点)健壮性测试、单缺陷假设理论
  • 【题解】P1548 [NOIP 1997 普及组] 棋盘问题
  • scala中for推导式详细讲解
  • React学习 ---- 基础知识学习
  • C语言实现MATLAB中的Fir1带通滤波器
  • 微信小程序开发教程(十七)
  • 9月18日星期四今日早报简报微语报早读
  • SqlSugar 问题记录
  • 记一次宝塔+nginx+php8+thinkphp8多应用下某个应用报错404的问题 - nginx、php日志全无 - 无法追踪
  • Windows Server远程桌面(RDP)安全优化
  • 工具链过于分散会导致哪些问题
  • 【RAG】Youtu-GraphRAG
  • 惠普LaserJet Pro M203dn黑白激光打印机双面卡纸维修一例
  • 专题二 二叉树中的深度优先搜索
  • Git 多人协作(1)
  • 设计模式第三章(迭代器模式)
  • 网络原理(4):HTTP协议 -- HTTP请求 -- 首行(请求方法)
  • 密钥下发服务中心:双重验证 + 实时监控的轻量级密钥管理解决方案
  • 硬件 - RK3588部分(4) - 原理图 - RK806
  • Sass开发【三】
  • 百度之星2025(第二场)
  • Ovis-U1:阿里巴巴推出的统一的多模态理解与生成模型
  • 深入剖析C++智能指针:unique_ptr与shared_ptr的资源管理哲学