- 最近在跑, 3D Diffusion Policy, DexGraspVLA, Improved Diffusion Policy的时候发现大家的数据集都是以zarr的格式存储的,遂记录一下zarr作为dataset类的基本信息,方便日后查询;
- 这个视频介绍了zarr是怎么高效存储的:https://www.youtube.com/watch?v=KiiKvXzhyMs,大约节省15%~50%的空间;
- 涉及到下面三个的代码我上传到了百度网盘:
- 链接: https://pan.baidu.com/s/1IQT4bAdP-LuOwSlfmE_49A 提取码: 2025
- 我的zarr版本是2.12.0;
1. 记录数据
- Demo (DP3, https://github.com/YanjieZe/3D-Diffusion-Policy)
- 按照现在的Replay Buffer,
episode_ends
是必须要有的,而且要存给meta;
import zarr
import numpy as np zarr_root = zarr.group('/mnt/data/workspace/Improved-3D-Diffusion-Policy/notebooks/test_data')
zarr_data = zarr_root.create_group('data')
zarr_meta = zarr_root.create_group('meta')B = 30
img_arrays = np.random.rand(B, 480, 640, 3)
pointcloud_arrays = np.random.rand(B, 512, 3)
depth_arrays = np.random.rand(B, 480, 640)
action_arrays = np.random.rand(B, 7)
episodeends_arrays = np.array([B//3, B//3*2, B], dtype=np.int64) compressor = zarr.Blosc(cname='zstd', clevel=3, shuffle=1)
img_chunk_size = (B, img_arrays.shape[1], img_arrays.shape[2], img_arrays.shape[3])
pointcloud_chunk_size = (B, pointcloud_arrays.shape[1], pointcloud_arrays.shape[2])
depth_chunk_size = (B, depth_arrays.shape[1], depth_arrays.shape[2])
action_chunk_size = (B, action_arrays.shape[1])
episodeends_chunk_size = (B,)zarr_data.create_dataset('img', data=img_arrays, chunks=img_chunk_size, dtype=np.float32, overwrite=True, compressor=compressor)
zarr_data.create_dataset('pointcloud', data=pointcloud_arrays, chunks=pointcloud_chunk_size, dtype=np.float32, overwrite=True, compressor=compressor)
zarr_data.create_dataset('depth', data=depth_arrays, chunks=depth_chunk_size, dtype=np.float32, overwrite=True, compressor=compressor)
zarr_data.create_dataset('action', data=action_arrays, chunks=action_chunk_size, dtype=np.float32, compressor=compressor)
zarr_meta.create_dataset('episode_ends', data=episodeends_arrays, chunks=episodeends_chunk_size, dtype=np.int64, compressor=compressor)
├── .zgroup
├── data
│ ├── .zgroup
│ ├── action
│ │ ├── .zarray
│ │ └── 0.0
│ ├── depth
│ │ ├── .zarray
│ │ └── 0.0.0
│ ├── img
│ │ ├── .zarray
│ │ └── 0.0.0.0
│ └── pointcloud
│ ├── .zarray
│ └── 0.0.0
└── meta├── .zgroup└── episode_ends├── .zarray└── 0
- DexGraspVLA:
- https://github.com/Psi-Robot/DexGraspVLA/blob/main/controller/common/replay_buffer.py
- https://github.com/Psi-Robot/DexGraspVLA/blob/main/controller/common/sampler.py
- Improved Diffusion Policy:
- https://github.com/YanjieZe/3D-Diffusion-Policy/blob/5207c41ad917684c6f2b9d2900c0d7a593df94ab/3D-Diffusion-Policy/diffusion_policy_3d/common/replay_buffer.py
- https://github.com/YanjieZe/3D-Diffusion-Policy/blob/5207c41a/3D-Diffusion-Policy/diffusion_policy_3d/common/sampler.py
- 加载数据
- Demo
- 使用 SequenceSampler 从 ReplayBuffer 中采样序列数据;
- pad_before和pad_after是针对每个episode而言的,我这里三个episode,每个episode的长度为10, pad一共是前1后7共计18,采样长度是16即每个episode能采样(18+1-16=3)个sample_sequence,最后三个episode就是一共9个sample_sequence;
from replay_buffer import ReplayBuffer
from sampler import SequenceSampler
replay_buffer = ReplayBuffer.copy_from_path( '/mnt/data/workspace/Improved-3D-Diffusion-Policy/notebooks/test_data', keys=['img', 'pointcloud', 'depth', 'action']
)
episode_0 = replay_buffer.get_episode(0)
sampler = SequenceSampler( replay_buffer=replay_buffer, sequence_length=16, pad_before=1, pad_after=7, episode_mask=None
)
print(len(sampler))
sample = sampler.sample_sequence(0)
print(f"Sampled action shape: {sample['action'].shape}")
print(f"Sampled state shape: {sample['img'].shape}") '''
Output:
[32mReplay Buffer: img, shape (30, 480, 640, 3), dtype float32, range 0.00~1.00[0m
[32mReplay Buffer: pointcloud, shape (30, 512, 3), dtype float32, range 0.00~1.00[0m
[32mReplay Buffer: depth, shape (30, 480, 640), dtype float32, range 0.00~1.00[0m
[32mReplay Buffer: action, shape (30, 7), dtype float32, range 0.00~1.00[0m
[32m--------------------------[0m
9
Sampled action shape: (16, 7)
Sampled state shape: (16, 480, 640, 3)
'''
- DexGraspVLA, Improved Diffusion Policy的replay_buffer和DP3的毫无区别,就只是多了两个try,
sampler
完全一样的;