Python Minio库连接和操作Minio数据库
目录
- 1. 创建Docker Compose文件
- 2. 创建环境变量文件
- 3. 创建Python连接和操作脚本
- 4. 启动和使用说明
- 启动MinIO服务
- 环境安装
- 5、MinIO基本操作示例
- 前提条件
- 1. 连接MinIO服务器
- 2. 存储桶(Bucket)操作
- 3. 对象(Object)上传操作
- 4. 列出对象
- 5. 下载和读取对象
- 6. 生成预签名URL
- 7. 对象元数据操作
- 8. 删除操作
MinIO 是一个高性能、分布式对象存储系统,兼容 Amazon S3 API,适用于大规模数据存储和管理。它支持多种部署方式,包括 Docker 和 Kubernetes,使其能够轻松集成到现代云原生架构中。通过其简单直观的 API,开发者可以快速实现文件上传、下载、元数据管理、预签名 URL 生成等功能。
下面通过 docker-compose
快速搭建了一个 MinIO 服务,并使用 Python 客户端库实现了完整的对象存储操作。这些功能包括:
- 存储桶管理:创建、列出和删除存储桶。
- 对象操作:上传文本/JSON 数据、下载文件、读取内容、生成预签名 URL。
- 元数据处理:自定义对象元数据并进行检索。
- 批量删除:高效地批量删除多个对象。
- 安全性与可扩展性:结合
.env
环境变量配置敏感信息,确保开发安全;同时利用 MinIO 的分布式能力支持横向扩展。
MinIO 非常适合用于 AI/ML、大数据分析、日志存储等场景,是构建私有云存储平台的理想选择。
1. 创建Docker Compose文件
首先创建 docker-compose.yml
文件:
version: '3.8'services:minio:image: quay.io/minio/minio:RELEASE.2023-12-20T01-00-02Zcontainer_name: ragflow-miniocommand: server --console-address ":9001" /dataports:- ${MINIO_PORT}:9000- ${MINIO_CONSOLE_PORT}:9001env_file: .envenvironment:- MINIO_ROOT_USER=${MINIO_USER}- MINIO_ROOT_PASSWORD=${MINIO_PASSWORD}- TZ=${TIMEZONE}volumes:- minio_data:/datanetworks:- ragflowrestart: on-failurevolumes:minio_data:networks:ragflow:driver: bridge
2. 创建环境变量文件
创建 .env
文件:
# MinIO配置
MINIO_PORT=9000
MINIO_CONSOLE_PORT=9001
MINIO_USER=minioadmin
MINIO_PASSWORD=minioadmin123
TIMEZONE=Asia/Shanghai
3. 创建Python连接和操作脚本
创建 minio_client.py
文件:
from minio import Minio
from minio.error import S3Error
import os
from datetime import timedelta
import ioclass MinIOClient:def __init__(self, endpoint="localhost:9000", access_key="minioadmin", secret_key="minioadmin123", secure=False):"""初始化MinIO客户端Args:endpoint: MinIO服务地址access_key: 访问密钥secret_key: 秘密密钥secure: 是否使用HTTPS"""self.client = Minio(endpoint=endpoint,access_key=access_key,secret_key=secret_key,secure=secure)def create_bucket(self, bucket_name):"""创建存储桶Args:bucket_name: 存储桶名称"""try:if not self.client.bucket_exists(bucket_name):self.client.make_bucket(bucket_name)print(f"存储桶 '{bucket_name}' 创建成功")else:print(f"存储桶 '{bucket_name}' 已存在")except S3Error as e:print(f"创建存储桶失败: {e}")def list_buckets(self):"""列出所有存储桶"""try:buckets = self.client.list_buckets()print("存储桶列表:")for bucket in buckets:print(f" - {bucket.name} (创建时间: {bucket.creation_date})")return bucketsexcept S3Error as e:print(f"列出存储桶失败: {e}")return []def upload_file(self, bucket_name, object_name, file_path):"""上传文件Args:bucket_name: 存储桶名称object_name: 对象名称(在MinIO中的文件名)file_path: 本地文件路径"""try:self.client.fput_object(bucket_name, object_name, file_path)print(f"文件 '{file_path}' 上传成功,对象名: '{object_name}'")except S3Error as e:print(f"上传文件失败: {e}")def upload_data(self, bucket_name, object_name, data):"""上传数据(字符串或字节)Args:bucket_name: 存储桶名称object_name: 对象名称data: 要上传的数据"""try:if isinstance(data, str):data = data.encode('utf-8')data_stream = io.BytesIO(data)self.client.put_object(bucket_name, object_name, data_stream, length=len(data))print(f"数据上传成功,对象名: '{object_name}'")except S3Error as e:print(f"上传数据失败: {e}")def download_file(self, bucket_name, object_name, file_path):"""下载文件Args:bucket_name: 存储桶名称object_name: 对象名称file_path: 本地保存路径"""try:self.client.fget_object(bucket_name, object_name, file_path)print(f"文件下载成功,保存到: '{file_path}'")except S3Error as e:print(f"下载文件失败: {e}")def get_object_data(self, bucket_name, object_name):"""获取对象数据Args:bucket_name: 存储桶名称object_name: 对象名称Returns:对象数据(字节)"""try:response = self.client.get_object(bucket_name, object_name)data = response.read()response.close()response.release_conn()return dataexcept S3Error as e:print(f"获取对象数据失败: {e}")return Nonedef list_objects(self, bucket_name, prefix=None):"""列出存储桶中的对象Args:bucket_name: 存储桶名称prefix: 对象名前缀(可选)"""try:objects = self.client.list_objects(bucket_name, prefix=prefix)print(f"存储桶 '{bucket_name}' 中的对象:")for obj in objects:print(f" - {obj.object_name} (大小: {obj.size} bytes, 修改时间: {obj.last_modified})")except S3Error as e:print(f"列出对象失败: {e}")def delete_object(self, bucket_name, object_name):"""删除对象Args:bucket_name: 存储桶名称object_name: 对象名称"""try:self.client.remove_object(bucket_name, object_name)print(f"对象 '{object_name}' 删除成功")except S3Error as e:print(f"删除对象失败: {e}")def delete_bucket(self, bucket_name):"""删除存储桶(必须为空)Args:bucket_name: 存储桶名称"""try:self.client.remove_bucket(bucket_name)print(f"存储桶 '{bucket_name}' 删除成功")except S3Error as e:print(f"删除存储桶失败: {e}")def generate_presigned_url(self, bucket_name, object_name, expires=timedelta(hours=1)):"""生成预签名URLArgs:bucket_name: 存储桶名称object_name: 对象名称expires: 过期时间Returns:预签名URL"""try:url = self.client.presigned_get_object(bucket_name, object_name, expires=expires)print(f"预签名URL生成成功: {url}")return urlexcept S3Error as e:print(f"生成预签名URL失败: {e}")return None
4. 启动和使用说明
启动MinIO服务
docker-compose up -d
环境安装
uv init
uv venv
.venv/Script/activate
uv pip install minio notebook
5、MinIO基本操作示例
下面演示如何使用Python连接MinIO并进行基本的对象存储操作。
前提条件
- 确保MinIO服务已启动:
docker-compose up -d
- 安装依赖:
uv pip install minio jupyter
# 导入必要的库
from minio import Minio
from minio.error import S3Error
import io
import json
from datetime import timedelta
import os
1. 连接MinIO服务器
# 创建MinIO客户端
client = Minio(endpoint='localhost:9000',access_key='minioadmin',secret_key='minioadmin123',secure=False # HTTP连接,生产环境建议使用HTTPS
)print('MinIO客户端创建成功!')
MinIO客户端创建成功!
2. 存储桶(Bucket)操作
# 列出所有存储桶
try:buckets = client.list_buckets()print('现有存储桶:')for bucket in buckets:print(f' - {bucket.name} (创建时间: {bucket.creation_date})')
except S3Error as e:print(f'列出存储桶失败: {e}')
现有存储桶:
# 创建新的存储桶
bucket_name = 'demo-bucket'try:if not client.bucket_exists(bucket_name):client.make_bucket(bucket_name)print(f'存储桶 "{bucket_name}" 创建成功')else:print(f'存储桶 "{bucket_name}" 已存在')
except S3Error as e:print(f'创建存储桶失败: {e}')
存储桶 "demo-bucket" 创建成功
3. 对象(Object)上传操作
# 上传文本数据
text_data = "这是一个测试文件的内容\n包含中文字符\n用于演示MinIO的基本功能"try:# 将字符串转换为字节流data_bytes = text_data.encode('utf-8')data_stream = io.BytesIO(data_bytes)# 上传数据client.put_object(bucket_name=bucket_name,object_name='test.txt',data=data_stream,length=len(data_bytes),content_type='text/plain')print('文本文件上传成功')
except S3Error as e:print(f'上传失败: {e}')
文本文件上传成功
# 上传JSON数据
json_data = {'name': '张三','age': 30,'city': '北京','hobbies': ['读书', '旅行', '编程']
}try:# 将JSON转换为字节流json_bytes = json.dumps(json_data, ensure_ascii=False, indent=2).encode('utf-8')json_stream = io.BytesIO(json_bytes)# 上传JSON数据client.put_object(bucket_name=bucket_name,object_name='data.json',data=json_stream,length=len(json_bytes),content_type='application/json')print('JSON文件上传成功')
except S3Error as e:print(f'上传失败: {e}')
JSON文件上传成功
# 创建并上传本地文件
local_file = 'sample.txt'# 创建本地文件
with open(local_file, 'w', encoding='utf-8') as f:f.write('这是一个本地文件\n')f.write('用于演示文件上传功能\n')f.write('MinIO对象存储服务')try:# 上传本地文件client.fput_object(bucket_name, 'uploaded_file.txt', local_file)print(f'本地文件 {local_file} 上传成功')# 删除本地文件os.remove(local_file)print(f'本地文件 {local_file} 已删除')
except S3Error as e:print(f'上传失败: {e}')
本地文件 sample.txt 上传成功
本地文件 sample.txt 已删除
4. 列出对象
# 列出存储桶中的所有对象
try:objects = client.list_objects(bucket_name)print(f'存储桶 "{bucket_name}" 中的对象:')for obj in objects:print(f' - {obj.object_name}')print(f' 大小: {obj.size} bytes')print(f' 修改时间: {obj.last_modified}')print(f' ETag: {obj.etag}')print()
except S3Error as e:print(f'列出对象失败: {e}')
存储桶 "demo-bucket" 中的对象:- data.json大小: 116 bytes修改时间: 2025-06-20 06:36:40.507000+00:00ETag: 4dde3c79af4dc9daec0d6732299fd978- test.txt大小: 85 bytes修改时间: 2025-06-20 06:35:24.762000+00:00ETag: 94f45a1da8526a43e60345353fa49828- uploaded_file.txt大小: 81 bytes修改时间: 2025-06-20 06:36:54.572000+00:00ETag: 3e067e47a4a226f953fae4e395f54fba
5. 下载和读取对象
# 读取文本文件内容
try:response = client.get_object(bucket_name, 'test.txt')content = response.read().decode('utf-8')response.close()response.release_conn()print('文本文件内容:')print(content)
except S3Error as e:print(f'读取文件失败: {e}')
文本文件内容:
这是一个测试文件的内容
包含中文字符
用于演示MinIO的基本功能
# 读取JSON文件内容
try:response = client.get_object(bucket_name, 'data.json')json_content = json.loads(response.read().decode('utf-8'))response.close()response.release_conn()print('JSON文件内容:')print(json.dumps(json_content, ensure_ascii=False, indent=2))
except S3Error as e:print(f'读取JSON文件失败: {e}')
JSON文件内容:
{"name": "张三","age": 30,"city": "北京","hobbies": ["读书","旅行","编程"]
}
# 下载文件到本地
download_file = 'downloaded_file.txt'try:client.fget_object(bucket_name, 'uploaded_file.txt', download_file)print(f'文件下载成功: {download_file}')# 读取下载的文件内容with open(download_file, 'r', encoding='utf-8') as f:content = f.read()print('下载文件内容:')print(content)# 清理下载的文件# os.remove(download_file)# print(f'本地文件 {download_file} 已删除')
except S3Error as e:print(f'下载文件失败: {e}')
文件下载成功: downloaded_file.txt
下载文件内容:
这是一个本地文件
用于演示文件上传功能
MinIO对象存储服务
6. 生成预签名URL
# 生成预签名下载URL(1小时有效)
try:url = client.presigned_get_object(bucket_name, 'test.txt', expires=timedelta(hours=1))print('预签名下载URL(1小时有效):')print(url)
except S3Error as e:print(f'生成预签名URL失败: {e}')
预签名下载URL(1小时有效):
http://localhost:9000/demo-bucket/test.txt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=minioadmin%2F20250620%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20250620T064216Z&X-Amz-Expires=3600&X-Amz-SignedHeaders=host&X-Amz-Signature=8f9822f2994820097d6420df1414a24e1cedfa1f042299fbb1022fc5c4d8176e
# 生成预签名上传URL(30分钟有效)
try:upload_url = client.presigned_put_object(bucket_name, 'new_upload.txt', expires=timedelta(minutes=30))print('预签名上传URL(30分钟有效):')print(upload_url)
except S3Error as e:print(f'生成预签名上传URL失败: {e}')
预签名上传URL(30分钟有效):
http://localhost:9000/demo-bucket/new_upload.txt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=minioadmin%2F20250620%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20250620T064649Z&X-Amz-Expires=1800&X-Amz-SignedHeaders=host&X-Amz-Signature=272f7c56aad68ea815a78732c3e4cc3cebd2dd3f397b3729c60787a3585a44f2
7. 对象元数据操作
# 获取对象信息
try:stat = client.stat_object(bucket_name, 'test.txt')print('对象信息:')print(f' 对象名: {stat.object_name}')print(f' 大小: {stat.size} bytes')print(f' ETag: {stat.etag}')print(f' 内容类型: {stat.content_type}')print(f' 最后修改时间: {stat.last_modified}')print(f' 元数据: {stat.metadata}')
except S3Error as e:print(f'获取对象信息失败: {e}')
对象信息:对象名: test.txt大小: 85 bytesETag: 94f45a1da8526a43e60345353fa49828内容类型: text/plain最后修改时间: 2025-06-20 06:35:24+00:00元数据: HTTPHeaderDict({'Accept-Ranges': 'bytes', 'Content-Length': '85', 'Content-Type': 'text/plain', 'ETag': '"94f45a1da8526a43e60345353fa49828"', 'Last-Modified': 'Fri, 20 Jun 2025 06:35:24 GMT', 'Server': 'MinIO', 'Strict-Transport-Security': 'max-age=31536000; includeSubDomains', 'Vary': 'Origin, Accept-Encoding', 'X-Amz-Id-2': 'dd9025bab4ad464b049177c95eb6ebf374d3b3fd1af9251148b658df7ac2e3e8', 'X-Amz-Request-Id': '184AAD60DBDA67DD', 'X-Content-Type-Options': 'nosniff', 'X-Xss-Protection': '1; mode=block', 'Date': 'Fri, 20 Jun 2025 06:47:16 GMT'})
# 上传带有自定义元数据的对象
import base64metadata = {'author': base64.b64encode('张三'.encode('utf-8')).decode('ascii'),'department': base64.b64encode('技术部'.encode('utf-8')).decode('ascii'),'project': base64.b64encode('MinIO演示'.encode('utf-8')).decode('ascii')
}try:data = '这是一个带有自定义元数据的文件'.encode('utf-8')data_stream = io.BytesIO(data)client.put_object(bucket_name=bucket_name,object_name='metadata_file.txt',data=data_stream,length=len(data),metadata=metadata)print('带元数据的文件上传成功')# 获取并显示元数据stat = client.stat_object(bucket_name, 'metadata_file.txt')print('自定义元数据:')for key, value in stat.metadata.items():print(f' {key}: {value}')
except S3Error as e:print(f'操作失败: {e}')
带元数据的文件上传成功
自定义元数据:Accept-Ranges: bytesContent-Length: 45Content-Type: application/octet-streamETag: "7a2d55727bfb2f17005600941e4ba45e"Last-Modified: Fri, 20 Jun 2025 06:49:54 GMTServer: MinIOStrict-Transport-Security: max-age=31536000; includeSubDomainsVary: OriginVary: Accept-EncodingX-Amz-Id-2: dd9025bab4ad464b049177c95eb6ebf374d3b3fd1af9251148b658df7ac2e3e8X-Amz-Request-Id: 184AAD859BF00043X-Content-Type-Options: nosniffX-Xss-Protection: 1; mode=blockx-amz-meta-author: 5byg5LiJx-amz-meta-department: 5oqA5pyv6YOox-amz-meta-project: TWluSU/mvJTnpLo=Date: Fri, 20 Jun 2025 06:49:54 GMT
8. 删除操作
# 删除单个对象
try:client.remove_object(bucket_name, 'metadata_file.txt')print('对象 metadata_file.txt 删除成功')
except S3Error as e:print(f'删除对象失败: {e}')
对象 metadata_file.txt 删除成功
# 批量删除对象
objects_to_delete = ['test.txt', 'data.json', 'uploaded_file.txt']
from minio.deleteobjects import DeleteObjecttry:# 创建删除对象的迭代器delete_object_list = [DeleteObject(obj) for obj in objects_to_delete]# 批量删除errors = client.remove_objects(bucket_name, delete_object_list)# 检查删除结果error_occurred = Falsefor error in errors:print(f'删除失败: {error}')error_occurred = Trueif not error_occurred:print('所有对象删除成功')
except S3Error as e:print(f'批量删除失败: {e}')
所有对象删除成功
# 删除存储桶(必须为空)
try:# 确认存储桶为空objects = list(client.list_objects(bucket_name))if objects:print(f'存储桶 {bucket_name} 不为空,包含 {len(objects)} 个对象')print('请先删除所有对象再删除存储桶')else:client.remove_bucket(bucket_name)print(f'存储桶 {bucket_name} 删除成功')
except S3Error as e:print(f'删除存储桶失败: {e}')
存储桶 demo-bucket 删除成功