大型数据集数据处理函数
import argparse
#argparse 是 Python 标准库中的一个模块,用于解析命令行参数
def parse_args():
#parse_args 函数使用 argparse 创建了一个命令行参数解析器,定义了多个参数
subj = 'subj01'
parser = argparse.ArgumentParser(description='visual neural decoding')
#通过 ArgumentParser 创建参数解析器
#添加参数:
parser.add_argument('--subj', dest='subj', help='', type=str, default=subj)
parser.add_argument('--nsd_stim_info_merged', dest='nsd_stim_info_merged', help='', type=str,
default=r'.\step01_downloadDataset\nsddata\experiments\nsd\nsd_stim_info_merged.csv')
parser.add_argument('--excel', dest='excel', help='', type=str, default=r'./step00_excel')
parser.add_argument('--beta_path', dest='beta_path', help='', type=str,
default=f'./step01_downloadDataset/nsddata_betas/ppdata/{subj}/func1pt8mm/betas_fithrf_GLMdenoise_RR')
parser.add_argument('--label_path', dest='label_path', help='', type=str,
default=f'./step01_downloadDataset/nsddata/freesurfer/{subj}/label')
parser.add_argument('--roi_path', dest='roi_path', help='', type=str,
default=f'./step01_downloadDataset/nsddata/ppdata/{subj}/func1pt8mm/roi')
parser.add_argument('--roi_label', dest='roi_label', help='', type=str,
default=f'./step01_downloadDataset/nsddata/freesurfer/{subj}/label')
parser.add_argument('--roi_list', dest='roi_list', help='', type=list,
default=['prf-visualrois', 'prf-eccrois',
'floc-faces', 'floc-words', 'floc-places', 'floc-bodies',
'corticalsulc', 'HCP_MMP1', 'Kastner2015', 'MTL', 'nsdgeneral', 'streams', 'thalamus'])
parser.add_argument('--captions_train2017', dest='captions_train2017', help='', type=str,
default='./step00_cocodataset/annotations_trainval2017/annotations/captions_train2017.json')
parser.add_argument('--captions_val2017', dest='captions_val2017', help='', type=str,
default='./step00_cocodataset/annotations_trainval2017/annotations/captions_val2017.json')
parser.add_argument('--instances_train2017', dest='instances_train2017', help='', type=str,
default='./step00_cocodataset/annotations_trainval2017/annotations/instances_train2017.json')
parser.add_argument('--instances_val2017', dest='instances_val2017', help='', type=str,
default='./step00_cocodataset/annotations_trainval2017/annotations/instances_val2017.json')
parser.add_argument('--nsd_stimuli', dest='nsd_stimuli', help='', type=str,
default='./step01_downloadDataset/nsddata_stimuli/stimuli/nsd/nsd_stimuli.hdf5')
parser.add_argument('--clip_model', dest='clip_model', help='', type=str,
default=r"D:\WORKS\Task00_PreModel\openai\clip-vit-base-patch32")
#argparse 会解析这些参数,并将值存储在 args 对象中
args = parser.parse_args()
return args
参数名 | 描述/用途 | 类型 | 默认值/示例路径 | 备注 |
---|---|---|---|---|
--subj | 主题ID或被试ID(subject ID),用于标识特定被试(如 subj01 ) | str | subj01 | 用于区分不同被试的数据,可能是神经解码实验中的个体标识。 |
--nsd_stim_info_merged | NSD(Natural Scenes Dataset)数据集的合并刺激信息文件路径 | str | .\step01_downloadDataset\nsddata\experiments\nsd\nsd_stim_info_merged.csv | 指向一个CSV文件,包含刺激信息,可能用于数据预处理或分析。 |
--excel | Excel文件路径,可能是用于存储或读取实验相关数据的文件夹或文件 | str | ./step00_excel | 可能是保存实验配置或结果的目录。 |
--beta_path | 神经解码实验的beta值(模型拟合结果)文件路径 | str | ./step01_downloadDataset/nsddata_betas/ppdata/{subj}/func1pt8mm/betas_fithrf_GLMdenoise_RR | 路径中包含 {subj} ,表示根据 subj 参数动态生成,指向功能性MRI数据的beta值。 |
--label_path | FreeSurfer生成的标签文件路径,用于脑区分割或标注 | str | ./step01_downloadDataset/nsddata/freesurfer/{subj}/label | 路径中包含 {subj} ,指向被试的脑区标签文件,通常用于神经解码或脑区分析。 |
--roi_path | 感兴趣区域(ROI)文件路径,可能包含功能性MRI数据的ROI信息 | str | ./step01_downloadDataset/nsddata/ppdata/{subj}/func1pt8mm/roi | 路径中包含 {subj} ,用于存储或读取ROI数据。 |
--roi_label | ROI的标签文件路径,与 label_path 类似,用于脑区或ROI的标注 | str | ./step01_downloadDataset/nsddata/freesurfer/{subj}/label | 路径中包含 {subj} ,可能与 label_path 重复或有细微区别。 |
--roi_list | 感兴趣区域的列表,指定要处理的ROI名称 | list | ['prf-visualrois', 'prf-eccrois', 'floc-faces', 'floc-words', 'floc-places', 'floc-bodies', 'corticalsulc', 'HCP_MMP1', 'Kastner2015', 'MTL', 'nsdgeneral', 'streams', 'thalamus'] | 包含多个预定义的ROI名称,用于指定分析的脑区或区域。 |
--captions_train2017 | COCO数据集训练集的caption注释文件路径 | str | ./step00_cocodataset/annotations_trainval2017/annotations/captions_train2017.json | 指向 COCO 数据集的训练集标注文件,可能用于图像描述生成或分析。 |
--captions_val2017 | COCO数据集验证集的caption注释文件路径 | str | ./step00_cocodataset/annotations_trainval2017/annotations/captions_val2017.json | 指向 COCO 数据集的验证集标注文件,可能用于图像描述生成或分析。 |
--instances_train2017 | COCO数据集训练集的实例注释文件路径 | str | ./step00_cocodataset/annotations_trainval2017/annotations/instances_train2017.json | 指向 COCO 数据集的训练集实例标注文件,可能用于目标检测任务。 |
--instances_val2017 | COCO数据集验证集的实例注释文件路径 | str | ./step00_cocodataset/annotations_trainval2017/annotations/instances_val2017.json | 指向 COCO 数据集的验证集实例标注文件,可能用于目标检测任务。 |
--nsd_stimuli | NSD数据集的刺激文件路径,可能包含图像或刺激数据 | str | ./step01_downloadDataset/nsddata_stimuli/stimuli/nsd/nsd_stimuli.hdf5 | 指向一个 HDF5 文件,存储自然场景数据集的刺激数据,可能用于神经解码。 |
--clip_model | CLIP模型的路径,用于加载预训练的视觉-语言模型 | str | r"D:\WORKS\Task00_PreModel\openai\clip-vit-base-patch32" | 指向一个本地路径,包含预训练的 CLIP 模型(Vision Transformer 基模型),用于多模态任务。 |
import os
def seg_path_name(path_name):
name_ = os.path.basename(path_name) # 文件名
path_ = os.path.dirname(path_name) + os.sep # 路径+分隔符
return path_, name_
path_name = r'C:\data\file.txt'
path_, name_ = seg_path_name(path_name)
print(path_) # C:\data\
print(name_) # file.txt
输入:path_name,一个字符串,表示文件路径(例如:/path/to/nsddata/subj01/datafile.txt)。
输出:两个字符串:
path_:路径部分(例如:/path/to/nsddata/subj01/)。
name_:文件名部分(例如:datafile.txt)。
目的:将完整的文件路径分割为路径和文件名两部分,便于后续处理(如单独访问目录或文件名)
Unix用/,Windows用\,源于早期作系统的设计差异。 Python的os.path模块解决了跨平台问题。os.sep 自动适配分隔符
path_ 和 name_ 用下划线结尾,避免与内置函数(如path)冲突,是一种常见习惯。 用途: 分割后,path_
可用于切换工作目录(如os.chdir(path_))。 name_ 可用于提取文件类型(如.mgz)或构造新文件名。
import requests
from tqdm import tqdm
import sys
import os
def download_fun(url_, path_save_):
"""
从网络上下载文件,并保存在指定文件夹,使用requests和tqdm显示进度
"""
# 创建指定文件夹
if not os.path.exists(path_save_):
os.makedirs(path_save_)
# 检查文件是否存在
file_name = url_.split("/")[-1]
save_file = f"{path_save_}/{file_name}"
if os.path.exists(save_file):
return False
# 下载文件
sys.stdout.write(f'\rDownloading file: {save_file}\n')
response = requests.get(url_, stream=True) # 获取文件流
total_size = int(response.headers.get('content-length', 0)) # 文件总大小
block_size = 1024 # 每次下载1KB
# 使用tqdm显示进度条
with open(save_file, 'wb') as f:
for data in tqdm(response.iter_content(block_size), total=total_size//block_size, unit='KB'):
f.write(data)
sys.stdout.flush()
return None
输入:
url_:字符串,表示要下载的文件的网络地址(如http://example.com/file.txt)。
path_save_:字符串,表示保存文件的本地目录(如/path/to/save/)。
输出:
如果文件已存在,返回False。
如果下载成功,返回None。
目的:从指定URL下载文件,保存到本地目录,并显示下载进度。如果目标文件夹不存在,会自动创建。
1.requests 是一个流行的Python第三方库,用于发送HTTP请求(如下载网页或文件)。支持更多功能(如超时、重定向、认证)
!requests.get(url_, stream=True):
发送GET请求到url_,获取文件内容。
stream=True:不一次性下载全部数据,而是按流(stream)逐块获取。
返回response对象,包含响应头和内容。
total_size = int(response.headers.get(‘content-length’, 0)):
import requests
url = 'http://example.com'
response = requests.get(url)
print(response.text) # 输出网页HTML内容
!response.headers:HTTP响应头,字典形式。
‘content-length’:文件字节大小,若服务器未提供,默认0。
int():将字符串转为整数。
!response.iter_content(block_size):
分块迭代,每次返回block_size字节的数据。
示例:block_size=1024表示每次下载1KB
2.tqdm 库
tqdm 是一个Python第三方库,用于显示进度条(全称:“taqaddum”,阿拉伯语“进度”)。
tqdm(iterable, total, unit):
iterable:可迭代对象(如response.iter_content)。
total:总迭代次数(这里是total_size//block_size,即块数)。
unit:单位(如’KB’),美化显示。
用法:
• 包裹response.iter_content,每次迭代时更新进度条
with open(save_file, ‘wb’) as f::
• ‘wb’:以二进制写模式打开文件,适合下载文件(如图片、HDF5)。
• with语句:自动关闭文件,节省资源。
• f.write(data):
• 将每次迭代的data(字节数据)写入文件
使用:
url = 'https://example.com/sample.txt' # 替换为真实URL
save_path = '/tmp/downloads'
download_fun(url, save_path)
添加错误处理
try:
response = requests.get(url_, stream=True)
response.raise_for_status() # 检查状态码
except requests.RequestException as e:
print(f"Error: {e}")
return False
检查状态
if response.status_code != 200:
raise Exception(f"Download failed: {response.status_code}")
for i in tqdm(range(100), desc="Processing", bar_format="{l_bar}{bar} {n_fmt}/{total_fmt}"):
time.sleep(0.1)
def un_gz(file_name):
"""
解压*.gz文件
"""
f_name = file_name.replace(".gz", "")
g_file = gzip.GzipFile(file_name)
open(f_name, "wb+").write(g_file.read())
g_file.close()
• 输入:file_name,一个字符串,表示要解压的.gz文件路径(如/path/to/file.txt.gz)。
• 输出:无显式返回值,但会在同一目录下生成解压后的文件(如/path/to/file.txt)。
• 目的:解压一个gzip压缩文件(.gz格式),将其内容保存为未压缩的文件。
g_file = gzip.GzipFile(file_name)
gzip:Python标准库模块,用于处理gzip压缩文件。
GzipFile(file_name):
打开.gz文件,默认模式为读取(‘rb’,二进制读)。
返回一个文件对象,类似open()的返回值,但能解码gzip压缩数据。
结果:g_file 是压缩文件的可读对象
gzip格式:
基于DEFLATE算法,广泛用于Unix系统(如.tar.gz)。
NSD中常见.nii.gz文件(NIFTI图像),节省存储空间。
二进制模式:
‘wb’ 和 ‘rb’ 用于处理字节数据,避免文本编码问题。
示例:文本模式(‘w’)可能损坏非文本文件(如图像)
def un_gz(file_name, block_size=1024):
f_name = file_name.replace(".gz", "")
with gzip.GzipFile(file_name, 'rb') as g_file, open(f_name, 'wb') as f:
while True:#对于大文件(如NSD的.nii.gz),一次性read()可能耗尽内存。分块读取更高效
block = g_file.read(block_size)
if not block:
break
f.write(block)
#添加错误捕获
import gzip
def un_gz(file_name):
try:
f_name = file_name.replace(".gz", "")
with gzip.GzipFile(file_name, 'rb') as g_file, open(f_name, 'wb') as f:
f.write(g_file.read())
except FileNotFoundError:
print(f"File {file_name} not found")
except gzip.BadGzipFile:
print(f"{file_name} is not a valid gzip file")
结合tqdm:
def un_gz_shutil(file_name):
total_size = os.path.getsize(file_name)
f_name = file_name[:-3]
with gzip.open(file_name, 'rb') as f_in, open(f_name, 'wb') as f_out:
with tqdm(total=total_size, unit='B', desc=f"Decompressing {file_name}") as pbar:
while True:
#使用shutil: shutil.copyfileobj(f_in, f_out, length=1024) # 每次1KB
block = f_in.read(1024) # 每次读1KB
if not block:
break
f_out.write(block)
pbar.update(len(block)) # 更新实际字节数
tqdm参数:tqdm(total=10000, unit=‘B’):进度条总长度为10000字节,每次更新时显示字节进度
from pycocotools.coco import COCO
def read_info():
# 读取 nsd_stim_info_merged
df = pd.read_csv(args.nsd_stim_info_merged)
column_names = df.columns.tolist()
'''
根据NSD文档,这可能是 nsddata/experiments/nsd/nsd_stim_info_merged.csv,
包含73,000张图像的元数据(如 cocoId、nsdId、trialID)。
df.columns:
DataFrame 的属性,返回列名(pandas.Index 对象)。
.tolist():
将列名转为普通Python列表。
'''
dict_info_ = {}
for name in column_names:
dict_info_[name] = df[name].tolist()
'''
df[name]:
访问 DataFrame 的某列,返回一个 pandas.Series(一维序列)。
.tolist():
将 Series 转为Python列表。
结果:
dict_info_ 是字典,键是列名,值是对应列的数据列表。
示例:
dict_info_ = {
'cocoId': [123, 456, ...],
'nsdId': [0, 1, ...],
'subject1': [1, 0, ...]
}
'''
# 读取 coco_info
'''
captions_train2017:
• 这是一个 COCO 对象,由 pycocotools.coco.COCO 类实例化。
• 它加载了COCO数据集的标题标注文件(如 captions_train2017.json),包含图像ID和对应的描述(captions)。
annotation_file 参数,表示COCO标注文件的路径,告诉 COCO 类从哪个JSON文件加载标注数据。例如:
captions_train2017.json:训练集标题。
captions_val2017.json:验证集标题。
instances_train2017.json:训练集对象实例。
annotation_file=args.captions_train2017
那么args.captions_train2017是什么
是一个字符串路径
其中args 是一个对象,通常由 argparse 模块创建,用于管理命令行参数。
import argparse
parser = argparse.ArgumentParser(description="Process NSD data")
parser.add_argument('--captions_train2017', default='/path/to/captions_train2017.json', help='Path to COCO captions train file')
args = parser.parse_args()
参数'--captions_train2017'表示命令行传入的 captions_train2017.json 文件路径。
如果未传入,默认使用定义中的路径(如 /path/to/captions_train2017.json)。
例如:
python script.py --captions_train2017 /custom/path/captions_train2017.json
因此COCO 类用这个路径加载COCO 2017训练集的标题标注文件。
文件内容:
{
"images": [{"id": 123, "file_name": "cat.jpg", ...}],
"annotations": [
{"id": 1001, "image_id": 123, "caption": "A cat sleeping"},
{"id": 1002, "image_id": 123, "caption": "A cat on a bed"},
{"id": 1003, "image_id": 123, "caption": "Cat resting"}
]
}
'''
captions_train2017 = COCO(annotation_file=args.captions_train2017)
captions_val2017 = COCO(annotation_file=args.captions_val2017)
instances_train2017 = COCO(annotation_file=args.instances_train2017)
instances_val2017 = COCO(annotation_file=args.instances_val2017)
return dict_info_, captions_train2017, captions_val2017, instances_train2017, instances_val2017
dict_info, coco_cap_tr, coco_cap_val, coco_ins_tr, coco_ins_val = read_info()
#使用
ann_ids = captions_train2017.getAnnIds(imgIds=123)
captions = captions_train2017.loadAnns(ann_ids)
print(captions[0]['caption']) # 示例输出:'A cat sleeping'
'''
captions_train2017这是一个 COCO 对象,由 pycocotools.coco.COCO 类实例化。
getAnnIds:
COCO 类的方法,用于获取与指定图像ID(imgIds)相关的标注ID(Annotation IDs)
返回一个列表,包含与图像ID 123关联的所有标注ID(一张图像可能有多个标题(captions),每个标题对应一个独立的标注ID。)。
captions = captions_train2017.loadAnns(ann_ids):
loadAnns:
COCO 类的方法,根据标注ID列表加载具体的标注信息。
ann_ids:
输入的是上一步返回的标注ID列表。
返回值:
返回一个字典列表,每个字典包含一个标注的详细信息(如标题、ID等)。
'''
输入:无显式参数,但依赖外部变量 args,包含文件路径(如 args.nsd_stim_info_merged)。
输出:
dict_info_:一个字典,包含 NSD 刺激信息的列名和对应数据列表。
captions_train2017, captions_val2017:COCO 2017训练和验证集的标题(captions)对象。
instances_train2017, instances_val2017:COCO 2017训练和验证集的实例(instances)对象。
目的:读取NSD的刺激元数据和COCO的标注信息,返回结构化数据供后续处理。
根据NSD文档: nsd_stim_info_merged.csv 包含73,000张图像的元数据。 每行对应一张图像,每列是属性。
列名包括: ‘Unnamed: 0’:可能是一个索引列(0-72999)。 ‘cocoId’:COCO图像ID。
‘nsdId’:NSD图像ID(0-72999)。 ‘cocoSplit’:COCO拆分(train2017 或 val2017)。
‘subjectX’(X=1-8):是否显示给被试X(0或1)。
‘subjectX_repN’(X=1-8,N=0-2):试验ID(1-30000)或0(未显示)。 其他列(如
‘cropBox’、‘loss’ 等)。
Unnamed: 0 cocoId nsdId cocoSplit subject1 subject1_rep0 subject1_rep1 ...
0 0 123 0 train2017 1 1 0 ...
1 1 456 1 val2017 0 0 0 ...
2 2 789 2 train2017 1 2 5 ...
转换后的:
dict_info_ = {
'Unnamed: 0': [0, 1, 2, ...], # 73,000个值
'cocoId': [123, 456, 789, ...],
'nsdId': [0, 1, 2, ...],
'cocoSplit': ['train2017', 'val2017', 'train2017', ...],
'subject1': [1, 0, 1, ...],
'subject1_rep0': [1, 0, 2, ...],#图像在第几次实验出现。30000次实验,每个受试者10000张,每张图#像重复3次
'subject1_rep1': [0, 0, 5, ...],
...
}
def id_trail2coco(trail_id):#输入参数0-2999
"""
trailID_to_cocoID
"""
trail_id += 1#转换为1-3000
sub_rep0 = list(dict_info[f'subject{args.subj[-1]}_rep0'])
sub_rep1 = list(dict_info[f'subject{args.subj[-1]}_rep1'])
sub_rep2 = list(dict_info[f'subject{args.subj[-1]}_rep2'])
if trail_id in sub_rep0:
#返回 trail_id 在列表中的位置(0基索引)。也就是第trail_id次实验出现了哪张图像
index = sub_rep0.index(trail_id)
elif trail_id in sub_rep1:
index = sub_rep1.index(trail_id)
else:
index = sub_rep2.index(trail_id)
coco_id = list(dict_info['cocoId'])[index] #找到nsd图像索引对应的coco图像索引
tr_val = list(dict_info['cocoSplit'])[index]#COCO拆分('train2017' 或 'val2017')
Unnamed = list(dict_info['Unnamed: 0'])[index]#nsd图像索引
return coco_id, tr_val, Unnamed
输入:trail_id,一个整数,表示NSD中的试验ID(0基索引)。
输出:
coco_id:对应的COCO图像ID。
tr_val:COCO数据集的拆分信息(train2017 或 val2017)。
Unnamed:Unnamed: 0 列的值(NSD的内部索引)。
目的:根据被试的试验ID,查询NSD刺激信息(dict_info),返回对应的COCO图像信息。
依赖:全局变量 args(含被试ID)和 dict_info(从 read_info 获取)。
def read_template_name():
roi_label = os.listdir(args.roi_label) #返回指定目录(args.roi_label)中的文件和子目录列表
# 结果:roi_label 是文件名列表,如 ['prf-visualrois.mgz.ctab', 'corticalsul.mgz.ctab', ...]。
roi_label = [x for x in roi_label if x.endswith('.mgz.ctab')]
#列表推导式:从 roi_label 中筛选出以 .mgz.ctab 结尾的文件。
roi_dict_ = {}
for roi in roi_label:
name = roi.rstrip('.mgz.ctab')#对每个roi去掉后缀
lines = open(f"{args.roi_label}/{roi}", 'r').read().split('\n')#按换行符分割成行列表
temp_dict = {-1: 'other'}#初始化临时字典,预设 -1 映射到 'other'(可能是非皮层区域)。
for line in lines:
if not line.strip():
continue
key, value = line.strip().split(' ')
temp_dict[int(key)] = value #结果:temp_dict = {-1: 'other', 1: 'V1v', 2: 'V1d', ...}。
roi_dict_[name] = copy.deepcopy(temp_dict) #深拷贝 temp_dict,避免后续修改影响。deepcopy避免引用问题,复制嵌套对象
roi_dict_["corticalsulc"] = roi_dict_.pop('corticalsul') #修正键名
return roi_dict_#返回字典,包含所有ROI的标签映射
作用:args.roi_label目录下每个文件作为键,内容转换为字典作为值(每一行都有个键值对)。
roi_dict_ 的样子
结构:
键:ROI名称(如 ‘prf-visualrois’、‘corticalsulc’)。
值:字典,键是标签值(整数),值是区域名(字符串)。
示例
roi_dict_ = {
‘prf-visualrois’: {-1: ‘other’, 1: ‘V1v’, 2: ‘V1d’, 3: ‘V2v’, …},
‘corticalsulc’: {-1: ‘other’, 1: ‘sulcus1’, 2: ‘sulcus2’, …},
…
}
import nibabel as nib
def get_beta(trail_id):
"""
1.读取beta的函数,入参:subjID, trailID。出参:三维大脑的beta数组
"""
session_id = (trail_id // 750) + 1 #40轮,每轮750次实验
stimuli_id = trail_id % 750 #该轮中第几个实验所对应的刺激
path_beta = f"./step01_downloadDataset/nsddata_betas/ppdata/{args.subj}/" \
f"func1pt8mm/betas_fithrf_GLMdenoise_RR/betas_session{session_id:02d}.nii"
#功能数据分辨率(1.8mm)+ GLM模型版本(b3:HRF拟合+去噪+岭回归)
'''动态路径:
path_beta = os.path.join('step01_downloadDataset', 'nsddata_betas', 'ppdata', args.subj,
'func1pt8mm', 'betas_fithrf_GLMdenoise_RR', f'betas_session{session_id:02d}.nii')
'''
beta_data = nib.load(path_beta)#读取NIFTI文件(.nii),返回 Nifti1Image 对象。神经影像标准数据
beta_data = beta_data.get_fdata() # 从 Nifti1Image 提取数据,返回 浮点 numpy 数组。
beta_data = beta_data[:, :, :, stimuli_id]
return beta_data
输入:trail_id,一个整数,表示NSD中的试验ID。
依赖:全局变量 args.subj,表示被试ID(如 ‘subj01’)。
输出:beta_data,一个三维数组,表示对应试验的大脑beta值(BOLD响应幅度)。
目的:从NSD的beta文件中提取特定试验的fMRI数据,返回三维大脑活动数组。
NSD关联:读取 betas_sessionXX.nii 文件,包含被试的单次试验beta值。
def get_roi(roi_name):
"""
2.读取roi的函数,入参:subjID, roi。出参:三维大脑的roi数组
"""
path_beta = f"{args.roi_path}/{roi_name}.nii"
roi_data = nib.load(path_beta)
roi_data = roi_data.get_fdata() # 将 nii.gz 文件转换为 array
return roi_data
def get_category(trail_id):
"""
4.读取类别的函数,入参:subjID, trailID。出参:类别ID和类别名
"""
img_id, tr_val, _ = id_trail2coco(trail_id)
coco = coco_ins_tr if tr_val.startswith('train') else coco_ins_val
#如果 tr_val 以 "train" 开头,使用 coco_ins_tr(训练集的 COCO 实例)。
ann_ids = coco.getAnnIds(imgIds=img_id)
targets = coco.loadAnns(ann_ids)
category_id = set([x['category_id'] for x in targets])
#使用 set 去除重复类别,同一图像可能包含多只猫和狗
category = [x for x in coco.loadCats(coco.getCatIds()) if x['id'] in category_id]
'''
coco.getCatIds():获取 COCO 数据集中所有类别的ID。
coco.loadCats(coco.getCatIds()):加载所有类别的详细信息,返回一个类别字典列表,每个字典可能包含 id、name(类别名)、
supercategory(超类别)等字段。
进一步通过x['id'] in category_id进行过滤:
[{'id': 1, 'name': 'dog', 'supercategory': 'animal'}, {'id': 2, 'name': 'cat', 'supercategory': 'animal'}]
'''
return category
def get_sentence(trail_id):
"""
5.读取描述的函数,入参:subjID, trailID。出参:五个描述句子
"""
img_id, tr_val, _ = id_trail2coco(trail_id)
coco = coco_cap_tr if tr_val.startswith('train') else coco_cap_val
ann_ids = coco.getAnnIds(imgIds=img_id)
anns = coco.loadAnns(ann_ids)
return anns
def get_img(trail_id):
"""
6.读取图片的函数,入参:subjID, trailID。出参:原始RGB图像
"""
img_id, tr_val, _ = id_trail2coco(trail_id)
coco = coco_ins_tr if tr_val.startswith('train') else coco_ins_val
img = coco.loadImgs(ids=img_id)[0]
img = io.imread(img['coco_url'])
return img
说明:使用 COCO API 的 loadImgs 方法,根据 img_id 加载图像信息。loadImgs 返回一个列表,列表中的每个元素是一个字典,包含图像的元数据(如 id、file_name、coco_url 等)。这里通过 [0] 取第一个(且假设唯一的)图像信息字典。
假设:img_id 对应一个唯一的图像,loadImgs 返回的列表只有一个元素。如果 img_id 无效或对应多个图像,可能会导致索引错误。
说明:使用 io.imread(通常来自 scipy.misc 或 skimage.io 库)读取图像。img[‘coco_url’] 是 COCO 数据集中图像的URL地址,指向图像的在线存储位置。
返回值:io.imread 通常返回一个 NumPy 数组,表示图像的像素值(RGB格式,形状为 (height, width, 3))。
假设:需要确保 io 模块已正确导入(例如 from skimage import io 或 from scipy.misc import imread),并且 coco_url 是有效的URL且图像可以访问。
def get_square_img(trail_id):
"""
7.读取图片(clip)的函数,入参:subjID, trailID。出参:正方形RGB图像
提取刺激图像的剪切版本
"""
_, _, Unnamed = id_trail2coco(trail_id)
with h5py.File(args.nsd_stimuli, 'r') as f:
clip_img = f['imgBrick'][Unnamed]
return clip_img
nsddata/experiments/nsd/nsd_stim_info_merged.csv
这是一个逗号分隔的文本文件,包含与NSD图像选择和准备相关的信息。文件包含一个标题行,之后每行对应一张NSD实验使用的73,000张图像。
第1列:图像编号(0-72999,从0开始)。
第2列(cocoId):COCO数据库中该图像的ID号。
第3列(cocoSplit):值为“train2017”或“val2017”,表示COCO的训练或验证集拆分。NSD实验设计未使用此信息,仅供参考。
第4列(cropBox):裁剪框的四个数字元组(顶部、底部、左侧、右侧),以图像尺寸的分数表示。裁剪总是沿最大维度进行,因此总有两个0。
第5列(loss):裁剪后的对象损失得分,详情见论文及下文“裁剪选择细节”。
第6列(nsdId):图像在73k图像集中的0基索引,与第1列相同。(注:某些情况下73k ID为1基,这里为0基。)
第7列(flagged):若图像内容有争议(如暴力或色情),则为True。
第8列(BOLD5000):若图像包含在BOLD5000数据集中,则为True(见http://bold5000.github.io)。注意NSD图像为正方形裁剪,与BOLD5000略有不同。
第9列(shared1000):若图像是所有8名受试者共见的1000张特殊图像之一,则为True。
第10-17列(subjectX):0或1,表示该图像是否展示给受试者X(X从1到8)。
第18-41列(subjectX_repN):0表示未展示给受试者X;若为正整数T,则表示该图像在第N次重复中展示给受试者X(X从1-8,N从0-2,共3次试验)。T是试验ID(1-30000),按时间顺序排列,涵盖受试者在NSD实验中遇到的所有30,000个刺激试验。
nsddata_stimuli/stimuli/nsd/nsd_stimuli.hdf5
这是一个包含NSD实验所有图像的单一.hdf5文件。为3通道×425像素×425像素×73,000张图像,格式为uint8。这些图像显示在RGB值为(127,127,127)的灰色背景上。
该文件中的图像是73k图像的官方列表,“73k-ID”指该列表的索引(1基)。
有一个特殊的1000张图像子集,所有8名受试者都看到。此外,每位受试者还看到9000张独特图像(部分受试者未完成全部40场扫描)。
示例:在MATLAB中加载第10239张图像:
matlab
崩溃
包装
复制
im = permute(h5read(‘nsd_stimuli.hdf5’,‘/imgBrick’,[1 1 1 10239],[3 425 425 1]),[3 2 1]);
翻译:
这个.hdf5文件存储了全部73,000张实验图像,每张为3通道(RGB)、425×425像素,背景灰色。
它定义了“73k-ID”(从1开始)。其中1000张图像是共享的,每人另有9000张独特图像。
MATLAB代码示例展示了如何读取特定图像。
讲解:
hdf5格式: 高效存储大规模数据,适合快速访问73k张图像。
图像规格: 425×425像素是实验标准尺寸,灰色背景避免干扰视觉处理。
代码示例: 给出了实用工具,permute调整维度以正确显示图像。
代码实战(输入:subj1的trail_id,输出对应的coco_id,并返回coco标注与图像)
import os
import sys
import urllib.request
from pycocotools.coco import COCO
import pandas as pd
def seg_path_name(path_name):
name_ = path_name.split('/')[-1]
path_ = path_name[:(len(path_name) - len(name_))]
return path_, name_
def download_fun(url_, path_save_):
"""
从网络上下载文件,并保存在指定文件夹,所保存的文件文件名称是url的文件名
"""
def report(count, block_size, total_size):
percent = int(count * block_size * 100 / total_size)
sys.stdout.write("\r%d%%" % percent + ' complete')
sys.stdout.flush()
# 创建指定文件夹
if not os.path.exists(path_save_):
os.makedirs(path_save_)
# 检查文件是否存在
file_name = url_.split("/")[-1]
save_file = f"{path_save_}/{file_name}"
if os.path.exists(save_file):
return False
# 下载文件
sys.stdout.write(f'\rDownloading file: {save_file}\n')
urllib.request.urlretrieve(url_, save_file, reporthook=report)
sys.stdout.flush()
return None
url_base = "https://natural-scenes-dataset.s3.amazonaws.com"
path_output = "./step01_downloadDataset"
url = f"{url_base}/nsddata/experiments/nsd/nsd_stim_info_merged.csv"
path_save, filename = seg_path_name(url)#path_save: {url_base}/nsddata/experiments/nsd/ filename:nsd_stim_info_merged.csv
path_save = path_save.replace(url_base, path_output)#path_save: {path_output}/nsddata/experiments/nsd/
if not os.path.exists(path_save + filename):
download_fun(url, path_save)# url的 QQ路径下的文件xx保存到 本地文件夹下的 QQ路径/XX文件
#---
# 基本函数2
def read_info():
# 读取 nsd_stim_info_merged
df = pd.read_csv("E:/桌面/CLIP多模态/data/step01_downloadDataset/nsddata/experiments/nsd/nsd_stim_info_merged.csv")
column_names = df.columns.tolist()
dict_info_ = {}
for name in column_names:
dict_info_[name] = df[name].tolist()
# 读取 coco_info
base_path = "E:/桌面/CLIP多模态/data/annotations/"
captions_train2017 = COCO(annotation_file=base_path + "captions_train2017.json")
captions_val2017 = COCO(annotation_file=base_path + "captions_val2017.json")
instances_train2017 = COCO(annotation_file=base_path + "instances_train2017.json")
instances_val2017 = COCO(annotation_file=base_path + "instances_val2017.json")
return dict_info_, captions_train2017, captions_val2017, instances_train2017, instances_val2017
dict_info, coco_cap_tr, coco_cap_val, coco_ins_tr, coco_ins_val = read_info()
def id_trail2coco(trail_id):
"""
trailID_to_cocoID
"""
trail_id += 1
sub_rep0 = list(dict_info[f'subject1_rep0'])
sub_rep1 = list(dict_info[f'subject1_rep1'])
sub_rep2 = list(dict_info[f'subject1_rep2'])
if trail_id in sub_rep0:
index = sub_rep0.index(trail_id)
elif trail_id in sub_rep1:
index = sub_rep1.index(trail_id)
else:
index = sub_rep2.index(trail_id)
coco_id = list(dict_info['cocoId'])[index]
tr_val = list(dict_info['cocoSplit'])[index]
Unnamed = list(dict_info['Unnamed: 0'])[index]
return coco_id, tr_val, Unnamed
# 获取 trail_id=27001 对应的 coco_id 和数据集划分
trail_id = 27005
coco_id, tr_val, _ = id_trail2coco(trail_id)
# 选择对应的数据集
if tr_val == "train2017":
coco_caps = coco_cap_tr
elif tr_val == "val2017":
coco_caps = coco_cap_val
else:
raise ValueError(f"Unknown dataset split: {tr_val}")
# 获取 coco_id 对应的 captions
ann_ids = coco_caps.getAnnIds(imgIds=coco_id)
annotations = coco_caps.loadAnns(ann_ids)
captions = [ann['caption'] for ann in annotations]
# 打印 captions
print(captions)
#---
import matplotlib.pyplot as plt
import requests
from PIL import Image
# COCO 图片存储 URL(COCO 2017 数据集)
COCO_IMG_URL = "http://images.cocodataset.org/"
def show_coco_image(trail_id):
# 获取 coco_id 和数据集划分
coco_id, tr_val, _ = id_trail2coco(trail_id)
# 选择数据集路径
img_folder = "train2017" if tr_val == "train2017" else "val2017"
img_url = f"{COCO_IMG_URL}/{img_folder}/{str(coco_id).zfill(12)}.jpg"
# 读取并显示图像
response = requests.get(img_url, stream=True)
if response.status_code == 200:
img = Image.open(response.raw)
plt.imshow(img)
plt.axis("off") # 关闭坐标轴
# 获取 captions
coco_caps = coco_cap_tr if tr_val == "train2017" else coco_cap_val
ann_ids = coco_caps.getAnnIds(imgIds=coco_id)
annotations = coco_caps.loadAnns(ann_ids)
captions = [ann["caption"] for ann in annotations]
# 显示标题(五句话描述)
plt.title("\n".join(captions), fontsize=8, wrap=True)
plt.show()
else:
print(f"无法加载图片: {img_url}")
# 显示 trail_id 的 COCO 图像
show_coco_image(trail_id)
输出结果:
加载图像有点问题,可以直接根据编号在官网查找:coco图像查找