当前位置: 首页 > news >正文

源码分析unexpected EOF on client connection with an open transaction

最近在学源码,看到什么问题都想从源码角度来看问题。
在高并发测试的时候发生了以下报错场景

错误: 关系 "base/3629476/3679049" 中的块 9114 存在无效的页

最让人头疼的是 这居然是中文报错,中文报错文本也可以找到其对应的翻译前的英文文本。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

#: catalog/storage.c:344 storage/buffer/bufmgr.c:922
#, c-format
msgid "invalid page in block %u of relation %s"
msgstr "关系 \"%2$s\" 中的块 %1$u 存在无效的页"

从上面可以看出来抛错点在storage.c、bufmgr.c中,看这两个存在的目录 大概率是和存储、缓存相关

看看storage.c 中代码RelationCopyStorage函数中

void
RelationCopyStorage(SMgrRelation src, SMgrRelation dst,ForkNumber forkNum, char relpersistence)
{bool		use_wal;bool		copying_initfork;BlockNumber nblocks;BlockNumber blkno;BulkWriteState *bulkstate;/** The init fork for an unlogged relation in many respects has to be* treated the same as normal relation, changes need to be WAL logged and* it needs to be synced to disk.*/copying_initfork = relpersistence == RELPERSISTENCE_UNLOGGED &&forkNum == INIT_FORKNUM;/** We need to log the copied data in WAL iff WAL archiving/streaming is* enabled AND it's a permanent relation.  This gives the same answer as* "RelationNeedsWAL(rel) || copying_initfork", because we know the* current operation created new relation storage.*/use_wal = XLogIsNeeded() &&(relpersistence == RELPERSISTENCE_PERMANENT || copying_initfork);bulkstate = smgr_bulk_start_smgr(dst, forkNum, use_wal);nblocks = smgrnblocks(src, forkNum);for (blkno = 0; blkno < nblocks; blkno++){BulkWriteBuffer buf;/* If we got a cancel signal during the copy of the data, quit */CHECK_FOR_INTERRUPTS();buf = smgr_bulk_get_buf(bulkstate);smgrread(src, forkNum, blkno, (Page) buf);if (!PageIsVerifiedExtended((Page) buf, blkno,PIV_LOG_WARNING | PIV_REPORT_STAT)){/** For paranoia's sake, capture the file path before invoking the* ereport machinery.  This guards against the possibility of a* relcache flush caused by, e.g., an errcontext callback.* (errcontext callbacks shouldn't be risking any such thing, but* people have been known to forget that rule.)*/char	   *relpath = relpathbackend(src->smgr_rlocator.locator,src->smgr_rlocator.backend,forkNum);ereport(ERROR,(errcode(ERRCODE_DATA_CORRUPTED),errmsg("invalid page in block %u of relation %s",blkno, relpath)));}/** Queue the page for WAL-logging and writing out.  Unfortunately we* don't know what kind of a page this is, so we have to log the full* page including any unused space.*/smgr_bulk_write(bulkstate, blkno, buf, false);}smgr_bulk_finish(bulkstate);
}

这段代码逻辑主要是数据在进行缓冲置换的时候 去遍历数据块进行效验,效验成功之后将其写入缓冲中。

//通过一下逻辑进行定义,页效验
if (!PageIsVerifiedExtended((Page) buf, blkno,PIV_LOG_WARNING | PIV_REPORT_STAT))
// 页效验通过之后,执行以下写入动作
smgr_bulk_write(bulkstate, blkno, buf, false);
//再执行以下flush动作smgr_bulk_finish(bulkstate);

当PageIsVerifiedExtended返回为false 的时候 就会抛出错
看看效验是如何定义的
PageIsVerifiedExtended函数定义如下

bool PageIsVerified	( 
PageData * 	page,
BlockNumber 	blkno,
int 	flags,
bool * 	checksum_failure_p 
)		
{const PageHeaderData *p = (const PageHeaderData *) page;size_t     *pagebytes;bool        checksum_failure = false;bool        header_sane = false;uint16      checksum = 0;if (checksum_failure_p)*checksum_failure_p = false;/** Don't verify page data unless the page passes basic non-zero test*/if (!PageIsNew(page)){if (DataChecksumsEnabled()){checksum = pg_checksum_page(page, blkno);if (checksum != p->pd_checksum){checksum_failure = true;if (checksum_failure_p)*checksum_failure_p = true;}}/** The following checks don't prove the header is correct, only that* it looks sane enough to allow into the buffer pool. Later usage of* the block can still reveal problems, which is why we offer the* checksum option.*/if ((p->pd_flags & ~PD_VALID_FLAG_BITS) == 0 &&p->pd_lower <= p->pd_upper &&p->pd_upper <= p->pd_special &&p->pd_special <= BLCKSZ &&p->pd_special == MAXALIGN(p->pd_special))header_sane = true;if (header_sane && !checksum_failure)return true;}/* Check all-zeroes case */pagebytes = (size_t *) page;if (pg_memory_is_all_zeros(pagebytes, BLCKSZ))return true;/** Throw a WARNING/LOG, as instructed by PIV_LOG_*, if the checksum fails,* but only after we've checked for the all-zeroes case.*/if (checksum_failure){if ((flags & (PIV_LOG_WARNING | PIV_LOG_LOG)) != 0)ereport(flags & PIV_LOG_WARNING ? WARNING : LOG,(errcode(ERRCODE_DATA_CORRUPTED),errmsg("page verification failed, calculated checksum %u but expected %u",checksum, p->pd_checksum)));if (header_sane && (flags & PIV_IGNORE_CHECKSUM_FAILURE))return true;}return false;
}

根据以上定义,需要满足以下几个条件 才会返回false

 PageIsNew(page) 为false   也就是页是一个非新页面,该页有数据存在路径一:页面不是新的(PageIsNew 返回 false),页面头不合理(header_sane 为 false),且页面不全零(pg_memory_is_all_zeros 返回 false)。
header_sane 和checksum_failure 皆是false   
为
路径二:页面不是新的(PageIsNew 返回 false),页面头合理(header_sane 为 true),但校验和失败(checksum_failure 为 true)且忽略标志未设置((flags & PIV_IGNORE_CHECKSUM_FAILURE) 为 false),且页面不全零(pg_memory_is_all_zeros 返回 false)。路径三:页面是新的(PageIsNew 返回 true),但页面不全零(pg_memory_is_all_zeros 返回 false)。上面页检验返回false 之后  ,在storage.c 中代码RelationCopyStorage函数中便会走进报错抛错点。
if (!PageIsVerifiedExtended((Page) buf, blkno,PIV_LOG_WARNING | PIV_REPORT_STAT))

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

bufmgr.c的源码在WaitReadBuffers函数中在缓冲写入的过程中,也会进行页效验

void
WaitReadBuffers(ReadBuffersOperation *operation)
{Buffer	   *buffers;int			nblocks;BlockNumber blocknum;ForkNumber	forknum;IOContext	io_context;IOObject	io_object;char		persistence;/** Currently operations are only allowed to include a read of some range,* with an optional extra buffer that is already pinned at the end.  So* nblocks can be at most one more than io_buffers_len.*/Assert((operation->nblocks == operation->io_buffers_len) ||(operation->nblocks == operation->io_buffers_len + 1));/* Find the range of the physical read we need to perform. */nblocks = operation->io_buffers_len;if (nblocks == 0)return;					/* nothing to do */buffers = &operation->buffers[0];blocknum = operation->blocknum;forknum = operation->forknum;persistence = operation->persistence;if (persistence == RELPERSISTENCE_TEMP){io_context = IOCONTEXT_NORMAL;io_object = IOOBJECT_TEMP_RELATION;}else{io_context = IOContextForStrategy(operation->strategy);io_object = IOOBJECT_RELATION;}/** We count all these blocks as read by this backend.  This is traditional* behavior, but might turn out to be not true if we find that someone* else has beaten us and completed the read of some of these blocks.  In* that case the system globally double-counts, but we traditionally don't* count this as a "hit", and we don't have a separate counter for "miss,* but another backend completed the read".*/if (persistence == RELPERSISTENCE_TEMP)pgBufferUsage.local_blks_read += nblocks;elsepgBufferUsage.shared_blks_read += nblocks;for (int i = 0; i < nblocks; ++i){int			io_buffers_len;Buffer		io_buffers[MAX_IO_COMBINE_LIMIT];void	   *io_pages[MAX_IO_COMBINE_LIMIT];instr_time	io_start;BlockNumber io_first_block;/** Skip this block if someone else has already completed it.  If an* I/O is already in progress in another backend, this will wait for* the outcome: either done, or something went wrong and we will* retry.*/if (!WaitReadBuffersCanStartIO(buffers[i], false)){/** Report this as a 'hit' for this backend, even though it must* have started out as a miss in PinBufferForBlock().*/TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + i,operation->smgr->smgr_rlocator.locator.spcOid,operation->smgr->smgr_rlocator.locator.dbOid,operation->smgr->smgr_rlocator.locator.relNumber,operation->smgr->smgr_rlocator.backend,true);continue;}/* We found a buffer that we need to read in. */io_buffers[0] = buffers[i];io_pages[0] = BufferGetBlock(buffers[i]);io_first_block = blocknum + i;io_buffers_len = 1;/** How many neighboring-on-disk blocks can we can scatter-read into* other buffers at the same time?  In this case we don't wait if we* see an I/O already in progress.  We already hold BM_IO_IN_PROGRESS* for the head block, so we should get on with that I/O as soon as* possible.  We'll come back to this block again, above.*/while ((i + 1) < nblocks &&WaitReadBuffersCanStartIO(buffers[i + 1], true)){/* Must be consecutive block numbers. */Assert(BufferGetBlockNumber(buffers[i + 1]) ==BufferGetBlockNumber(buffers[i]) + 1);io_buffers[io_buffers_len] = buffers[++i];io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]);}io_start = pgstat_prepare_io_time(track_io_timing);smgrreadv(operation->smgr, forknum, io_first_block, io_pages, io_buffers_len);pgstat_count_io_op_time(io_object, io_context, IOOP_READ, io_start,io_buffers_len);/* Verify each block we read, and terminate the I/O. */for (int j = 0; j < io_buffers_len; ++j){BufferDesc *bufHdr;Block		bufBlock;if (persistence == RELPERSISTENCE_TEMP){bufHdr = GetLocalBufferDescriptor(-io_buffers[j] - 1);bufBlock = LocalBufHdrGetBlock(bufHdr);}else{bufHdr = GetBufferDescriptor(io_buffers[j] - 1);bufBlock = BufHdrGetBlock(bufHdr);}/* check for garbage data */if (!PageIsVerifiedExtended((Page) bufBlock, io_first_block + j,PIV_LOG_WARNING | PIV_REPORT_STAT)){if ((operation->flags & READ_BUFFERS_ZERO_ON_ERROR) || zero_damaged_pages){ereport(WARNING,(errcode(ERRCODE_DATA_CORRUPTED),errmsg("invalid page in block %u of relation %s; zeroing out page",io_first_block + j,relpath(operation->smgr->smgr_rlocator, forknum))));memset(bufBlock, 0, BLCKSZ);}elseereport(ERROR,(errcode(ERRCODE_DATA_CORRUPTED),errmsg("invalid page in block %u of relation %s",io_first_block + j,relpath(operation->smgr->smgr_rlocator, forknum))));}/* Terminate I/O and set BM_VALID. */if (persistence == RELPERSISTENCE_TEMP){uint32		buf_state = pg_atomic_read_u32(&bufHdr->state);buf_state |= BM_VALID;pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);}else{/* Set BM_VALID, terminate IO, and wake up any waiters */TerminateBufferIO(bufHdr, false, BM_VALID, true);}/* Report I/Os as completing individually. */TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, io_first_block + j,operation->smgr->smgr_rlocator.locator.spcOid,operation->smgr->smgr_rlocator.locator.dbOid,operation->smgr->smgr_rlocator.locator.relNumber,operation->smgr->smgr_rlocator.backend,false);}VacuumPageMiss += io_buffers_len;if (VacuumCostActive)VacuumCostBalance += VacuumCostPageMiss * io_buffers_len;}
}

以上代码看着比较长,实际上也是对PageIsVerifiedExtended函数的调用,进行也检查的情况。

http://www.dtcms.com/a/354209.html

相关文章:

  • 分治法——二分答案
  • 深入探索Vue:前端开发的强大框架
  • Android10 音频系统之AudioPlaybackConfiguration
  • JVM之CMS、G1|ZGC详解以及选型对比
  • SynClub-百度在海外推出的AI社交产品
  • A-Level物理课程全解析:知识点、学习计划与培训机构推荐
  • 网络编程-连接、发送、接收数据学习
  • React Hooks 完全指南:从基础到高级的实战技巧
  • C++ 由 std::thread 初始化想到的
  • TencentOS Server 4.4 下创建mysql容器无法正常运行的问题
  • wireshark解析FLV插件分享
  • 嵌入式Linux(Exynos 4412)笔记
  • 3459. 最长 V 形对角线段的长度
  • 设计模式理解
  • Nishang PowerShell工具:原理详解+使用方法+渗透实战
  • Go+Gdal 完成高性能GIS数据空间分析
  • 深度学习:常用的损失函数的使用
  • “java简单吗?”Java的“简单”与PHP的挑战:编程语言哲学-优雅草卓伊凡
  • 白话FNN、RNN、Attention和self-attention等
  • 《从有限元到深度学习:我的金属疲劳研究进阶之路》
  • 反内卷加速全产业链价值重塑 通威股份等行业龙头或率先受益
  • 基于 C# OpenCVSharp 的模板匹配检测技术方案
  • 计算机日常答疑,一起寻找问题的最优解
  • select
  • SM4加密算法
  • Karatsuba
  • 前端工程化与AI融合:构建智能化开发体系
  • 4-4.Python 数据容器 - 字典 dict(字典 dict 概述、字典的定义与调用、字典的遍历、字典的常用方法)
  • CPU 虚拟化之Cpu Models
  • 代码随想录刷题Day43