当前位置: 首页 > news >正文

bws-rs:Rust 编写的 S3 协议网关框架,支持灵活后端接入

bws-rs:Rust 编写的 S3 协议网关框架,支持灵活后端接入

bws-rs介绍

bws-rs 是一个用 Rust 编写的轻量级 S3 协议服务端网关框架,旨在帮助开发者快速构建兼容 AWS S3 协议 的对象存储服务。该框架支持 S3 V4 签名校验,集成 Axum 作为 Web 框架,所有协议校验逻辑通过实现对应的 trait 并注册为 axum::Extension 实现非侵入式扩展,具有良好的可维护性与可插拔性。

bws-rs 可作为前端网关挂载在你已有的文件系统、对象存储系统甚至缓存引擎之前,为其提供标准化的 S3 协议兼容层,支持与 AWS CLI、MinIO Client 等主流 S3 SDK 的交互。

✅ 已支持的功能
📁 S3 协议支持列表

  • PutObject(上传对象)

  • GetObject(获取对象)

  • HeadObject(获取对象元信息)

  • DeleteObject(删除对象)

  • CreateBucket(创建桶)

  • HeadBucket(桶存在性检查)

  • ListBucket(列举所有桶)

  • DeleteBucket(删除桶)

  • GetBucketLocation(获取桶区域)

  • MultipartUpload(分片上传)

  • Range Get(部分下载)

  • Get/Put Object ACL(访问控制列表)

  • Get/Put Object Metadata(对象元数据)

  • Put Object Tagging(对象标签)

✅ MinIO SDK 兼容性验证
使用 MinIO Go SDK 进行功能验证,支持以下操作:

  • MakeBucket

  • DeleteBucket

  • ListBucket

  • ListObject

  • PutObject

  • DeleteObject

  • BucketExists

在项目中使用bws-rs: cargo add bws-rs

实现bws_rs::service::s3下对应的trait以支持对应的s3 功能

  • HeadHandler: 对应 s3 head object ,head bucket
  • GetObjectHandler: 对应s3 GetObject
  • PutObjectHandler: 对应s3 PutObject
  • DeleteObjectHandler: 对应s3 DeleteObject
  • ListObjectHandler: 对应s3 ListObject
  • CreateBucketHandler: 对应的s3 create bucket
  • ListBucketHandler: 对应s3 list bucket
  • DeleteBucketHandler: 对应s3 delete bucket
  • GetBucketLocationHandler: 对应s3 get bucket location
  • MultiUploadObjectHandler: 对应s3 MultiUpload系列操作

aceeskey 仓库需要实现bws_rs::authorization::AccesskeyStore 来提供对应accesskey的secretkey

使用示范

    use std::sync::Arc;use tokio::io::AsyncReadExt;#[derive(Default)]struct Target {}use crate::service::s3::*;impl CreateBucketHandler for Target {fn handle<'a>(&'a self,_opt: &'a CreateBucketOption,_bucket: &'a str,) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<(), String>>>>{Box::pin(async move {log::info!("create bucket {_bucket}");Ok(())})}}impl ListBucketHandler for Target {fn handle<'a>(&'a self,_opt: &'a ListBucketsOption,) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<Vec<Bucket>, String>>>,> {Box::pin(async move {let datetime = chrono::Utc::now().to_rfc3339();Ok(vec![Bucket {name: "test1".to_string(),creation_date: datetime,bucket_region: "us-east-1".to_string(),}])})}}impl HeadHandler for Target {fn lookup<'a>(&self,_bucket: &str,_object: &str,) -> std::pin::Pin<Box<dyn 'a+ Send+ Sync+ std::future::Future<Output = Result<Option<HeadObjectResult>, Error>>,>,> {Box::pin(async move {let mut ret: HeadObjectResult = Default::default();ret.checksum_sha256 = Some("2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824".to_string(),);ret.content_length = Some(5);ret.etag = Some("5d41402abc4b2a76b9719d911017c592".to_string());ret.last_modified = Some(chrono::Utc::now().format("%a, %d %b %Y %H:%M:%S GMT").to_string(),);Ok(Some(ret))})}}impl PutObjectHandler for Target {fn handle<'a>(&'a self,opt: &PutObjectOption,bucket: &'a str,object: &'a str,body: &'a mut (dyn tokio::io::AsyncRead + Unpin + Send),) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<(), String>>>>{Box::pin(async move {log::info!("put bucket {bucket} object {object}");let mut buff = vec![];match body.read_to_end(&mut buff).await {Ok(size) => {log::info!("get {}", unsafe {std::str::from_utf8_unchecked(&buff[..size])});}Err(err) => {log::error!("read error {err}");}}Ok(())})}}impl DeleteBucketHandler for Target {fn handle<'a>(&'a self,_opt: &'a DeleteBucketOption,_bucket: &'a str,) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<(), String>>>>{Box::pin(async move {log::info!("delete bucket {_bucket}");Ok(())})}}impl DeleteObjectHandler for Target {fn handle<'a>(&'a self,_opt: &'a DeleteObjectOption,_object: &'a str,) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<(), String>>>>{Box::pin(async move {log::info!("delete object {_object}");Ok(())})}}impl crate::authorization::AccesskeyStore for Target {fn get<'a>(&'a self,_accesskey: &'a str,) -> std::pin::Pin<Box<dyn 'a + Send + Sync + std::future::Future<Output = Result<Option<String>, String>>,>,> {Box::pin(async move { Ok(Some(format!("{_accesskey}12345"))) })}}impl crate::service::s3::GetObjectHandler for Target {fn handle<'a>(&'a self,bucket: &str,object: &str,opt: crate::service::s3::GetObjectOption,mut out: tokio::sync::Mutex<std::pin::Pin<std::boxed::Box<(dyn crate::utils::io::PollWrite + Send + Unpin + 'a)>,>,>,) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<(), String>>>>{Box::pin(async move {let mut l = out.lock().await;let _ = l.poll_write(b"hello").await.map_err(|err| {log::error!("write error {err}");});Ok(())})}}impl crate::service::s3::GetBucketLocationHandler for Target {}impl MultiUploadObjectHandler for Target {fn handle_create_session<'a>(&'a self,bucket: &'a str,key: &'a str,) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<String, ()>>>>{Box::pin(async move { Ok("ffffff".to_string()) })}fn handle_upload_part<'a>(&'a self,bucket: &'a str,key: &'a str,upload_id: &'a str,part_number: u32,body: &'a mut (dyn tokio::io::AsyncRead + Unpin + Send),) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<String, ()>>>>{Box::pin(async move {let mut buff = Vec::new();let size = body.read_to_end(&mut buff).await.map_err(|err| log::error!("read body error {err}"))?;println!("upload part upload_id={upload_id} part_number={part_number} bucket={bucket} key={key}\n{}",unsafe { std::str::from_boxed_utf8_unchecked((&buff[..size]).into()) });Ok("5d41402abc4b2a76b9719d911017c592".to_string())})}fn handle_complete<'a>(&'a self,bucket: &'a str,key: &'a str,upload_id: &'a str,//(etag,part number)data: &'a [(&'a str, u32)],opts: MultiUploadObjectCompleteOption,) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<String, ()>>>>{Box::pin(async move { Ok("69a329523ce1ec88bf63061863d9cb14".to_string()) })}fn handle_abort<'a>(&'a self,bucket: &'a str,key: &'a str,upload_id: &'a str,) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<(), ()>>>>{todo!()}}#[tokio::test]async fn test_server() -> Result<(), Box<dyn std::error::Error>> {let _ = tokio::fs::create_dir_all(".sys_bws").await;env_logger::builder().filter_level(log::LevelFilter::Info).init();let target = Arc::new(Target::default());let r = axum::Router::new().layer(axum::middleware::from_fn(super::handle_fn)).layer(axum::middleware::from_fn(super::handle_authorization_middleware,)).layer(axum::Extension(target.clone() as Arc<dyn PutObjectHandler + Send + Sync>)).layer(axum::Extension(target.clone() as Arc<dyn HeadHandler + Send + Sync>)).layer(axum::Extension(target.clone() as Arc<dyn ListBucketHandler + Send + Sync>)).layer(axum::Extension(target.clone() as Arc<dyn CreateBucketHandler + Send + Sync>)).layer(axum::Extension(target.clone() as Arc<dyn DeleteBucketHandler + Send + Sync>)).layer(axum::Extension(target.clone() as Arc<dyn DeleteObjectHandler + Send + Sync>)).layer(axum::Extension(target.clone() as Arc<dyn crate::authorization::AccesskeyStore + Send + Sync>)).layer(axum::Extension(target.clone() as Arc<dyn GetObjectHandler + Send + Sync>)).layer(axum::Extension(target.clone() as Arc<dyn GetBucketLocationHandler + Send + Sync>)).layer(axum::Extension(target.clone() as Arc<dyn MultiUploadObjectHandler + Send + Sync>));let l = tokio::net::TcpListener::bind("0.0.0.0:9900").await?;axum::serve(l, r).await?;Ok(())}

golang 客户端

package testsimport ("context""io""os""testing""github.com/minio/minio-go/v7""github.com/minio/minio-go/v7/pkg/credentials"
)func TestCreateBucket(t *testing.T) {creds, err := minio.New("127.0.0.1:9900", &minio.Options{Secure: false, Creds: credentials.NewStaticV4("root", "root12345", ""),Region: "us-east-1",})if err != nil {t.Fatal(err)}_, err = creds.BucketExists(context.Background(), "test")if err != nil {t.Fatal(err)}err = creds.MakeBucket(context.Background(), "itest", minio.MakeBucketOptions{})if err != nil {t.Fatal(err)}bkts, err := creds.ListBuckets(context.Background())if err != nil {t.Fatal(err)}t.Log(bkts)err = creds.RemoveBucket(context.Background(), "test")if err != nil {t.Fatal(err)}err = creds.RemoveObject(context.Background(), "test", "test", minio.RemoveObjectOptions{})if err != nil {t.Fatal(err)}err = os.WriteFile("test.txt", []byte("hello"), 0o644)if err != nil {t.Fatal(err)}fd, err := os.OpenFile("test.txt", os.O_RDONLY, 0)if err != nil {t.Fatal(err)}defer fd.Close()_, err = creds.PutObject(context.Background(), "test", "hello/world", fd, 5, minio.PutObjectOptions{})if err != nil {t.Fatal(err)}resp, err := creds.GetObject(context.Background(), "test", "test", minio.GetObjectOptions{})if err != nil {t.Fatal(err)}content, err := io.ReadAll(resp)if err != nil {t.Fatal(err)}if string(content) != "hello" {t.Fatal("expect hello got [" + string(content) + "]")}
}

s3 multipart 验证

package testsimport ("context""crypto/tls""fmt""log""net/http""os""testing""github.com/aws/aws-sdk-go-v2/aws""github.com/aws/aws-sdk-go-v2/config""github.com/aws/aws-sdk-go-v2/credentials""github.com/aws/aws-sdk-go-v2/service/s3""github.com/aws/aws-sdk-go-v2/service/s3/types"
)func TestS3Sdk(t *testing.T) {var (host      = "127.0.0.1"port      = 9900accesskey = "root"secretkey = "root12345"region    = "us-east-1")customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {if service == s3.ServiceID {return aws.Endpoint{URL:           fmt.Sprintf("http://%s:%d", host, port),SigningRegion: "us-east-1",}, nil}return aws.Endpoint{}, &aws.EndpointNotFoundError{}})// 加载 AWS 配置,指定自定义端点解析器cfg, err := config.LoadDefaultConfig(context.TODO(),config.WithEndpointResolverWithOptions(customResolver),config.WithHTTPClient(&http.Client{Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true},},}),config.WithRegion(region),config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(accesskey, secretkey, "")),)if err != nil {log.Fatalf("无法加载 AWS 配置: %v", err)}// 创建 S3 客户端cli := s3.NewFromConfig(cfg, func(o *s3.Options) {o.UsePathStyle = true})var (bucket = "itest"key    = "test.txt")fd, err := os.OpenFile("./test.txt", os.O_RDONLY, 0)if err != nil {t.Fatal(err)}defer fd.Close()out, err := cli.CreateMultipartUpload(context.Background(), &s3.CreateMultipartUploadInput{Bucket: &bucket,Key:    &key,})if err != nil {t.Fatal(err)}var upNo int32 = 1resp, err := cli.UploadPart(context.Background(), &s3.UploadPartInput{Bucket: &bucket, Key: &key, PartNumber: &upNo, UploadId: out.UploadId, Body: fd,})if err != nil {t.Fatal(err)}_, err = cli.CompleteMultipartUpload(context.Background(), &s3.CompleteMultipartUploadInput{Bucket: &bucket, Key: &key, UploadId: out.UploadId, MultipartUpload: &types.CompletedMultipartUpload{Parts: []types.CompletedPart{{ETag: resp.ETag, PartNumber: &upNo,},},},})if err != nil {t.Fatal(err)}
}
http://www.dtcms.com/a/287190.html

相关文章:

  • 【Linux】AKHQ实现kafka可视化
  • 电力载波通信技术全景解析:从历史演进到智能电网创新应用
  • 【Linux服务器】-MySQL数据库参数调优
  • 打造高效订单处理!ZKmall开源商城的统一履约中心架构解析
  • 本地部署开源的 AI 驱动的搜索引擎 Perplexica 并实现外部访问
  • 【黑马SpringCloud微服务开发与实战】(三)微服务01
  • 快速上手AI整合包!GPT-SoVITS-v2打包教程,解锁AIStarter应用市场潜力
  • freertos任务调度关键函数理解
  • 笔试强训——第一周
  • 标准文件和系统文件I/O
  • 鸿蒙与web混合开发双向通信
  • 云服务器磁盘IO性能优化的测试与配置方法
  • docker|Linux|以centos基础镜像为基础制作nmap专用镜像(镜像瘦身计划)
  • SQL基础操作指南:约束、表设计与复杂查询
  • 【RK3576】【Android14】USB开发调试
  • install_arm_docker.sh
  • 【Qt开发】Qt的背景介绍(三)-> 认识Qt Creator
  • python网络爬虫之selenium库(二)
  • Android回调机制入门
  • 工程图矢量化 笔记 | potrace ezdxf svgpathtools | png转svg保存dxf用matplotlib画出来
  • 基于springboot的考研互助小程序
  • Redis的持久化-RDB
  • 【橘子分布式】gRPC(编程篇-中)
  • 基于开放API接口采集的定制开发开源AI智能名片S2B2C商城小程序数据整合与增长策略研究
  • 通过phpStudy,用nginx部署vue项目,支持部署多套vue项目(详细教程)
  • LLM 的Top-P参数 是在LLM中的每一层发挥作用,还是最后一层?
  • CSS-in-JSVue的解决方案
  • 将HTML+JS+CSS数独游戏包装为安卓App
  • 前端-CSS-day6
  • 从零开始学Tailwind CSS : 颜色配置原理与实践