Owen大规模文本嵌入生成
每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗?订阅我们的简报,深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同,从行业内部的深度分析和实用指南中受益。不要错过这个机会,成为AI领域的领跑者。点击订阅,与未来同行! 订阅:https://rengongzhineng.io/
本文所展示的是一个先进的文本嵌入生成处理流程,适用于大规模文本数据处理任务。在示例中,使用了多种当今最先进的嵌入模型:
- all-MiniLM-L6-v2:Sentence Transformers 官方文档中默认使用的模型,常见于教学与入门教程中;
- gemini-embedding-001:当前在多语言 MTEB 基准测试中表现最优的模型,需通过 Gemini API 获取使用权限;
- Seed1.6-Embedding:当前在中文 MTEB 排行榜中得分最高的模型,需通过火山引擎 Volcengine API 获取访问权限。
对于开源模型,可以直接通过修改代码中的 EMBEDDING_MODEL_NAME
常量,轻松切换所用模型。
接下来是创建一个用户自定义函数(UDF)来从切分后的文本中生成嵌入向量的实现方式:
# 定义嵌入向量的返回类型
embedding_type = daft.DataType.embedding(daft.DataType.float32(), ENCODING_DIM)@daft.udf(return_dtype=embedding_type,concurrency=NUM_GPU_NODES,num_gpus=1,batch_size=BATCH_SIZE
)
class EncodingUDF:def __init__(self):from sentence_transformers import SentenceTransformerdevice = 'cuda' if torch.cuda.is_available() else 'cpu'self.model = SentenceTransformer(EMBEDDING_MODEL_NAME, device=device)self.model.compile()def __call__(self, text_col):embeddings = self.model.encode(text_col.to_pylist(),batch_size=SENTENCE_TRANSFORMER_BATCH_SIZE,convert_to_tensor=True,torch_dtype=torch.bfloat16,)return embeddings.cpu().numpy()
该 UDF 的功能包括:
- 自动检测并在可用时将模型加载至 GPU;
- 使用
bfloat16
精度以减少显存占用; - 以批量处理方式对文本进行嵌入生成(每批大小为 128),以实现最佳 GPU 利用率;
- 返回的结果为 NumPy 数组,兼容 Daft 数据框架。
第三步:配置分布式处理环境
尽管脚本可在本地运行,但若需在集群上执行完整流程,可以参考官方关于集群扩展的指南。在示例中,整个流程被部署至一个包含 8 个 g5.2xlarge 节点的 Ray 集群,每个节点配备一个 A10G GPU。为使 Daft 使用 Ray 集群进行调度,需要进行如下配置:
# 配置 Daft 使用 Ray 进行任务调度
daft.context.set_runner_ray()# 配置 S3 读取权限
daft.set_planning_config(default_io_config=daft.io.IOConfig(s3=daft.io.S3Config.from_env())
)
第四步:执行完整数据处理流程
以下为完整的数据管道执行代码:
(daft.read_parquet("s3://desmond-demo/text-embedding-dataset.parquet").with_column("sentences", ChunkingUDF(col("text"))).explode("sentences").with_column("text", col("sentences")["text"]).with_column("chunk_id", col("sentences")["chunk_id"]).exclude("sentences").with_column("embedding", EncodingUDF(col("text"))).with_column("id",col("url").str.right(50) + "-" + col("chunk_id").cast(daft.DataType.string())).select("id", "url", "language", "source", "text", "embedding").write_turbopuffer(namespace="desmond-scale-experiment6",region="aws-us-west-2",id_column="id",vector_column="embedding",distance_metric="cosine_distance")
)
数据管道的各个步骤如下:
- 读取数据:从 S3 加载 Parquet 文件,使用较大的 chunk size 提高读取效率;
- 文本切分:使用句子切分 UDF 将长文本划分为较小片段;
- 展开句子:将句子列表展开为独立行,便于后续处理;
- 提取字段:从句子结构中提取文本内容及对应的 chunk_id;
- 生成嵌入:将文本输入嵌入 UDF,生成高维向量;
- 创建唯一 ID:根据 URL 后缀和 chunk_id 拼接生成唯一标识;
- 选择输出字段:仅保留所需字段,便于后续处理;
- 写入向量数据库:将处理后的数据及嵌入写入 Turbopuffer 向量数据库。
若一切配置正确,脚本执行时将实现网络 I/O、CPU 和 GPU 工作的并行流水线化,从而获得较高的 GPU 使用率。
自定义与优化建议:
- 调整批量大小:增加
SENTENCE_TRANSFORMER_BATCH_SIZE
可提升吞吐能力,减少则可降低显存占用; - 扩展工作节点:可依据集群规模和每节点可用 CPU 核心数,修改
NUM_GPU_NODES
与CHUNKING_PARALLELISM
; - 更换模型:替换
EMBEDDING_MODEL_NAME
可快速切换其他 SentenceTransformer 模型; - 更改文本切分逻辑:可根据任务需求,自定义
ChunkingUDF
中的切分策略; - 替换向量数据库:可根据实际情况更换为如 Lance、Pinecone、Chroma 等向量数据库解决方案。
性能注意事项:
- GPU 显存:应监测显存使用状况,必要时调整批量大小。若 GPU 无法分配足够显存,或超过模型最大输入序列长度,则应适当减少
SENTENCE_TRANSFORMER_BATCH_SIZE
; - 模型加载策略:UDF 在每个工作节点上仅初始化一次模型,因此模型加载时间会被均摊;
- 量化策略:推荐使用
bfloat16
或float16
精度模型,可进一步降低显存使用并提升处理效率。
此流程可高效处理百万级文本数据,并可自动扩展至可用的计算资源,适用于生产级别的文本向量化应用场景。