AF3 create_alignment_db_sharded脚本create_shard函数解读
AlphaFold3 create_alignment_db_sharded 脚本在源代码的scripts/alignment_db_scripts文件夹下。 该脚本中的 create_shard 函数的功能是将一部分链(shard_files
)中的所有对齐文件写入一个 .db
文件,并返回这些链的索引信息(字节偏移+长度+文件名)供上层构建 super index。
源代码:
def create_shard(shard_files: list[Path], output_dir: Path, output_name: str, shard_num: int
) -> dict:"""Creates a single shard of the alignment database, and returns thecorresponding indices for the super index."""CHUNK_SIZE = 200shard_index = defaultdict(create_index_default_dict) # e.g. {chain_name: {db: str, files: [(file_name, db_offset, file_length)]}, ...}chunk_iter = chunked_iterator(shard_files, CHUNK_SIZE)pbar_desc = f"Shard {shard_num}"output_path = output_dir / f"{output_name}_{shard_num}.db"db_offset = 0db_file = open(output_path, "wb")for files_chunk in tqdm(chunk_iter,total=ceil(len(shard_files) / CHUNK_SIZE),desc=pbar_desc,position=shard_num,leave=False,):# get processed files for one chunkchunk_data = process_chunk(files_chunk)# write to db and store info in indexfor chain_name, file_data in chunk_data.items():shard_index[chain_name]["db"] = output_path.namefor file_name, file_bytes in file_data:file_length = len(file_bytes)shard_index[chain_name]["files"].append((file_name, db_offset, file_length))db_file.write(file_bytes)db_offset += file_lengthdb_file.close()return shard_index
代码解读:
函数签名
def create_shard(shard_files: list[Path], # 当前 shard 负责处理的链目录列表output_dir: Path, # 输出 .db 文件的目录output_name: str, # .db 文件名前缀(如 "alignment")shard_num: int # 当前 shard 的编号(用于命名)
) -> dict: # 返回:当前 shard 的 index 字典
初始化
CHUNK_SIZE = 200
shard_index = defaultdict(create_index_default_dict)
chunk_iter = chunked_iterator(shard_files, CHUNK_SIZE)
output_path = output_dir / f"{output_name}_{shard_num}.db"
db_offset = 0
db_file = open(output_path, "wb")
-
CHUNK_SIZE = 200
:每次并发处理 200 个链目录,避免线程开销过大 -
shard_index
:保存当前 shard 中的所有链名对应的索引信息 -
output_path
:构造.db
文件的路径,如alignment_0.db
-
db_offset
:记录当前.db
文件的写入偏移位置(以字节为单位)
shard_index = defaultdict(create_index_default_dict)
这里你传入的是一个函数名 create_index_default_dict
,而不是函数调用结果(也就是不加 ()
)!
意图:
让 defaultdict
在访问一个不存在的 key 时,调用该函数来生成默认值。
项 | 说明 |
---|---|
defaultdict(<function>) | 不是调用函数,而是传入一个函数对象 |
每次访问不存在的 key | 会自动执行 function() ,作为该 key 的默认值 |
适合嵌套结构 | 如 dict[str → dict[str, list]] |
主循环:分批读取 + 写入
for files_chunk in tqdm(chunk_iter, ...):chunk_data = process_chunk(files_chunk)
调用 process_chunk()
:多线程读取这 200 个链目录下的所有文件,得到结构:
chunk_data = {"1abc_A": [("file1.a3m", b"..."), ("file2.sto", b"...")],"2xyz_B": [("file1.a3m", b"...")]
}
写入 .db
文件 + 更新索引
for chain_name, file_data in chunk_data.items():shard_index[chain_name]["db"] = output_path.namefor file_name, file_bytes in file_data:file_length = len(file_bytes)shard_index[chain_name]["files"].append((file_name, db_offset, file_length))db_file.write(file_bytes)db_offset += file_length
对于每个链:
-
shard_index[chain_name]["db"]
记录它在哪个.db
文件中 -
每个对齐文件都写入
.db
文件,顺序写入 -
记录每个文件的
(file_name, 起始字节位置, 文件长度)
-
更新
db_offset
以便下一个文件写入时知道正确的起点
完成处理后关闭文件并返回索引
db_file.close()
return shard_index
这个 shard_index
是供上层 super_index.update(shard_index)
使用的,格式类似:
{"1abc_A": {"db": "alignment_0.db","files": [["file1.a3m", 0, 1024],["file2.sto", 1024, 512]]}
}
总结:函数作用图解
[shard_files: list of chain dirs]↓
[chunked (200 chains at a time)]↓
[每个 chunk -> 并发读取 (ThreadPoolExecutor)]↓
[每个文件的字节数据 -> 顺序写入到 .db 文件]↓
[记录 offset 和长度 → shard_index 字典]↓
[返回 shard_index 字典]
函数特点与优势
特点 | 优势 |
---|---|
Chunk 处理 | 降低内存和线程并发压力 |
多线程读取 | 加快文件加载速度 |
顺序写入 | .db 文件结构简单,适合大规模读取 |
索引记录精确 | 每个链的每个文件都有 offset,方便快速查找 |
与 ProcessPoolExecutor 配合使用 | 多个 shard 并行构建,CPU 利用率高 |