RGWRados::Object::Write::_do_write_meta()
1 函数详解
_do_write_meta是写入元数据的函数,非常重要。
// Write the head/meta object of an RGW object: stages xattrs, manifest and
// (optionally) data into one ObjectWriteOperation, bracketed by the two-phase
// bucket-index update (prepare/complete).
// size / accounted_size: raw vs. user-visible byte counts for the index entry.
// attrs: xattrs to set on the head object.
// assume_noent: caller expects the object not to exist yet (-EEXIST handled below).
// modify_tail: whether tail objects may be modified by this write.
// _index_op: caller-owned bucket-index update helper, passed as void*.
int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_size,
                                            map<string, bufferlist>& attrs,
                                            bool assume_noent, bool modify_tail,
                                            void *_index_op)
{
  // Recover the concrete type of the opaque index-op handle.
  RGWRados::Bucket::UpdateIndex *index_op = static_cast<RGWRados::Bucket::UpdateIndex *>(_index_op);
  RGWRados *store = target->get_store();
这里的target是在RGWInitMultipart::execute()中初始化的数据结构,是创建的临时对象。
  // All mutations below are batched into this op; nothing reaches the OSD
  // until ref.ioctx.operate() is called further down.
  ObjectWriteOperation op;
#ifdef WITH_LTTNGconst struct req_state* s = get_req_state();string req_id;if (!s) {// fake req_idreq_id = store->unique_id(store->get_new_req_id());} else {req_id = s->req_id;}
#endifRGWObjState *state;int r = target->get_state(&state, false, assume_noent);if (r < 0)return r;
get_state获取临时meta对象的状态,有着如下的调用栈:
RGWRados::Object::get_state()
|–RGWRados::get_obj_state()
|    |–get_obj_state_impl()
其功能是获取rgw对象的rados对象,大小以及其他属性等。
  // Refuse to write an object with an empty name, then resolve the rados
  // ioctx/oid pair (ref) for the object's head.
  rgw_obj& obj = target->get_obj();
  if (obj.get_oid().empty()) {
    ldout(store->ctx(), 0) << "ERROR: " << __func__ << "(): cannot write object with empty name" << dendl;
    return -EIO;
  }
  rgw_rados_ref ref;
  r = store->get_obj_head_ref(target->get_bucket_info(), obj, &ref);
  if (r < 0)
    return r;
这里target->get_obj()实际上就是获取要写入的对象(不同请求下代表的obj也不同,例如在RGWInitMultipart::execute()最终调用的_do_write_meta中这里获取的rgw_rados_ref是指生成的临时meta对象,其对象名称可以参考RGWInitMultipart::execute())。
  bool is_olh = state->is_olh;

  // PUT_OBJ_CREATE means the head object is being (re)created from scratch.
  bool reset_obj = (meta.flags & PUT_OBJ_CREATE) != 0;

  // Prefer the caller-supplied operation tag; fall back to the index op's tag.
  const string *ptag = meta.ptag;
  if (!ptag && !index_op->get_optag()->empty()) {
    ptag = index_op->get_optag();
  }
  // Stage atomic-write guards (tag checks, conditional asserts) into op.
  r = target->prepare_atomic_modification(op, reset_obj, ptag, meta.if_match, meta.if_nomatch, false, modify_tail);
  if (r < 0)
    return r;
prepare_atomic_modification主要是查看obj是否需要原子操作,针对InitMultipart接口来说并未设置,即使使用相同的key去init multipart,也会返回不同的etag,不属于同一个上传了,所以不必设置原子操作。
  // Default mtime to "now"; for OLH (versioned) objects also persist the OLH tag.
  if (real_clock::is_zero(meta.set_mtime)) {
    meta.set_mtime = real_clock::now();
  }
  if (state->is_olh) {
    op.setxattr(RGW_ATTR_OLH_ID_TAG, state->olh_tag);
  }
olh tag是与多版本相关的标志位,暂时不必关注。
  struct timespec mtime_ts = real_clock::to_timespec(meta.set_mtime);
  op.mtime2(&mtime_ts);
  if (meta.data) {
    /* if we want to overwrite the data, we also want to overwrite the
     * xattrs, so just remove the object */
    op.write_full(*meta.data);
  }
meta.data在InitMultipart中未设置,在RGWPutObjProcessor_Atomic::do_complete()中设置。
  // etag/content-type/acl are mirrored into locals so they can be recorded in
  // the bucket-index entry once the head write succeeds.
  string etag;
  string content_type;
  bufferlist acl_bl;
  map<string, bufferlist>::iterator iter;

  // Attributes the caller explicitly wants removed from the head object.
  if (meta.rmattrs) {
    for (iter = meta.rmattrs->begin(); iter != meta.rmattrs->end(); ++iter) {
      const string& name = iter->first;
      op.rmxattr(name.c_str());
    }
  }
  if (meta.manifest) {
    /* remove existing manifest attr */
    iter = attrs.find(RGW_ATTR_MANIFEST);
    if (iter != attrs.end())
      attrs.erase(iter);

    bufferlist bl;
    encode(*meta.manifest, bl);
    op.setxattr(RGW_ATTR_MANIFEST, bl);
  }
  for (iter = attrs.begin(); iter != attrs.end(); ++iter) {
    const string& name = iter->first;
    bufferlist& bl = iter->second;

    // Skip empty values rather than writing empty xattrs.
    if (!bl.length())
      continue;

    op.setxattr(name.c_str(), bl);
    if (name.compare(RGW_ATTR_ETAG) == 0) {
      etag = bl.to_str();
    } else if (name.compare(RGW_ATTR_CONTENT_TYPE) == 0) {
      content_type = bl.c_str();
    } else if (name.compare(RGW_ATTR_ACL) == 0) {
      acl_bl = bl;
    }
  }
  // Record the pg version at write time unless the caller already supplied one.
  if (attrs.find(RGW_ATTR_PG_VER) == attrs.end()) {
    cls_rgw_obj_store_pg_ver(op, RGW_ATTR_PG_VER);
  }
  // Tag the object with the originating zone (multisite), if not already set.
  if (attrs.find(RGW_ATTR_SOURCE_ZONE) == attrs.end()) {
    bufferlist bl;
    encode(store->get_zone_short_id(), bl);
    op.setxattr(RGW_ATTR_SOURCE_ZONE, bl);
  }
以上均为设置或者删除一些属性,具体的功能读者可以自行探究,不过有点值得注意,那就是针对op的操作。可以发现op(也就是ObjectWriteOperation)有很多不同功能的功能函数,write_full、setxattr、mtime2等等,这些都是要最终落盘的信息,也就是要发送给OSD执行的操作。
稍微观察这些函数的调用栈就可以发现,并非调用一次函数就向OSD发送一次请求,这样集群整体的网络IO将相当之高;实际上则是通过bufferlist记录需要操作的数据以及针对数据需要执行的操作。关于RGW层Op的组织形式,可以参考RGW层Op的组织,里面介绍了数据结构如何encode到bufferlist以及RGW层的Op是如何在内存中存储。
if (!op.size())return 0;uint64_t epoch;int64_t poolid;bool orig_exists;uint64_t orig_size;if (!reset_obj) { //Multipart upload, it has immutable head. orig_exists = false;orig_size = 0;} else {orig_exists = state->exists;orig_size = state->accounted_size;}bool versioned_target = (meta.olh_epoch && *meta.olh_epoch > 0) ||!obj.key.instance.empty();bool versioned_op = (target->versioning_enabled() || is_olh || versioned_target);if (versioned_op) {index_op->set_bilog_flags(RGW_BILOG_FLAG_VERSIONED_OP);}if (!index_op->is_prepared()) {tracepoint(rgw_rados, prepare_enter, req_id.c_str());r = index_op->prepare(CLS_RGW_OP_ADD, &state->write_tag);tracepoint(rgw_rados, prepare_exit, req_id.c_str());if (r < 0)return r;}
这里index_op->prepare()是针对索引对象的准备工作,借着这个函数我们可以了解rgw发往osd的Op是如何发送,以及如何在osd层执行。因为所有发往osd的请求都采用同一套机制,读者可以在这里阅读文档RGW层Op的下发执行,详细看一下op的发送,执行过程,也可在这里跳过,在讲解分段上传的文档中查看。
而在这里prepare有两个任务,第一检测是否在进行reshard(cls_rgw_guard_bucket_resharding函数);第二个则与调用write_meta的函数有关:如果是在InitMultipart,即初始化分段上传的时候(RGWInitMultipart::execute())调用write_meta,则会将pending_map写入名称为"_multipart_<object-key>.upload-id.meta"的对象(这个对象则是写入到某个bucket index上,在index pool中对bucket shard执行listomapkey可以找到)的omap中,并且以一个随机数作为key(request id或者随机生成的32位的字符串),rgw_bucket_pending_info作为value,以防止重传以及客户端重试导致的重复操作,用于rgw上传的并发控制,以后会在rgw的一致性中进行介绍。
  // Send the accumulated op to the OSD synchronously; this is the actual
  // write of the head object.
  tracepoint(rgw_rados, operate_enter, req_id.c_str());
  r = ref.ioctx.operate(ref.oid, &op);
  tracepoint(rgw_rados, operate_exit, req_id.c_str());
  if (r < 0) { /* we can expect to get -ECANCELED if object was replaced under,
                  or -ENOENT if was removed, or -EEXIST if it did not exist
                  before and now it does */
    if (r == -EEXIST && assume_noent) {
      // Caller assumed a fresh object; surface the race directly and drop the
      // cached state so a retry re-reads it.
      target->invalidate_state();
      return r;
    }
    goto done_cancel;
  }
这里则是将对象写入到对应的pool,对于RGWInitMultipart来说,是将创建的临时对象写入到non-ec pool中,如果没有这个pool则会写入data pool中,最终的对象名称是<bucket-shard-id>._multipart_<key-name>.<upload-id>.meta。
  // Capture the version epoch and pool id of the completed write; the bucket
  // index entry records them.
  epoch = ref.ioctx.get_last_version();
  poolid = ref.ioctx.get_id();

  r = target->complete_atomic_modification();
  if (r < 0) {
    ldout(store->ctx(), 0) << "ERROR: complete_atomic_modification returned r=" << r << dendl;
  }

  // Phase 2 of the index update: record the final entry (size, etag, acl, ...).
  tracepoint(rgw_rados, complete_enter, req_id.c_str());
  r = index_op->complete(poolid, epoch, size, accounted_size,
                         meta.set_mtime, etag, content_type, &acl_bl,
                         meta.category, meta.remove_objs, meta.user_data);
这里则是将一些处理完成后的元数据更新到索引之中,也就是刚才写入到bucket shard中的meta对象。再向下找调用栈,可以发现调用了RGWRados::cls_obj_complete_op(),它使用了aio_operate(),也就是complete操作是异步下发的。可以参考文档【RGW】Ceph中的异步回调机制——AioCompletionImpl。
  tracepoint(rgw_rados, complete_exit, req_id.c_str());
  if (r < 0)
    goto done_cancel;

  if (meta.mtime) {
    *meta.mtime = meta.set_mtime;
  }

  /* note that index_op was using state so we couldn't invalidate it earlier */
  target->invalidate_state();
  state = NULL;

  // Versioned bucket: link this instance into the object's logical head (OLH).
  if (versioned_op && meta.olh_epoch) {
    r = store->set_olh(target->get_ctx(), target->get_bucket_info(), obj, false, NULL, *meta.olh_epoch, real_time(), false, meta.zones_trace);
    if (r < 0) {
      return r;
    }
  }

  // Object expiration: register a hint so the expirer can remove it later.
  if (!real_clock::is_zero(meta.delete_at)) {
    rgw_obj_index_key obj_key;
    obj.key.get_index_key(&obj_key);

    r = store->objexp_hint_add(meta.delete_at,
                               obj.bucket.tenant, obj.bucket.name, obj.bucket.bucket_id, obj_key);
    if (r < 0) {
      ldout(store->ctx(), 0) << "ERROR: objexp_hint_add() returned r=" << r << ", object will not get removed" << dendl;
      /* ignoring error, nothing we can do at this point */
    }
  }
  meta.canceled = false;

  /* update quota cache */
  // NOTE(review): for completeMultipart, 0 added bytes are charged here —
  // presumably the bytes were already accounted when the parts were written;
  // confirm against the part-upload path.
  if (meta.completeMultipart){
    store->quota_handler->update_stats(meta.owner, obj.bucket, (orig_exists ? 0 : 1),
                                       0, orig_size);
  }
  else {
    store->quota_handler->update_stats(meta.owner, obj.bucket, (orig_exists ? 0 : 1),
                                       accounted_size, orig_size);
  }
  return 0;

done_cancel:
  // Head write failed: roll back the pending bucket-index entry.
  int ret = index_op->cancel();
  if (ret < 0) {
    ldout(store->ctx(), 0) << "ERROR: index_op.cancel()() returned ret=" << ret << dendl;
  }
  meta.canceled = true;
done_cancel则是处理失败的情况。
  /* we lost in a race. There are a few options:
   * - existing object was rewritten (ECANCELED)
   * - non existing object was created (EEXIST)
   * - object was removed (ENOENT)
   * should treat it as a success
   */
  // Without preconditions, losing the race is indistinguishable from success
  // for the client, so those errors are masked to 0.
  if (meta.if_match == NULL && meta.if_nomatch == NULL) {
    if (r == -ECANCELED || r == -ENOENT || r == -EEXIST) {
      r = 0;
    }
  } else {
    if (meta.if_match != NULL) {
      // only overwrite existing object
      if (strcmp(meta.if_match, "*") == 0) {
        if (r == -ENOENT) {
          r = -ERR_PRECONDITION_FAILED;
        } else if (r == -ECANCELED) {
          r = 0;
        }
      }
    }

    if (meta.if_nomatch != NULL) {
      // only create a new object
      if (strcmp(meta.if_nomatch, "*") == 0) {
        if (r == -EEXIST) {
          r = -ERR_PRECONDITION_FAILED;
        } else if (r == -ENOENT) {
          r = 0;
        }
      }
    }
  }

  return r;
}