当前位置: 首页 > news >正文

使用 TwelveLabs 的 Marengo 视频嵌入模型与 Amazon Bedrock 和 Elasticsearch

作者:来自 Elastic  Dave Erickson

创建一个小应用来搜索来自 TwelveLabs 的 Marengo 模型的视频嵌入

从向量搜索到强大的 REST API,Elasticsearch 为开发者提供了最全面的搜索工具包。深入 GitHub 上的示例笔记本来尝试新东西。你也可以立即开始免费试用或在本地运行 Elasticsearch。

这篇博客文章探讨了 TwelveLabs 为其视频嵌入模型 Marengo 推出的新的 Bedrock 集成,并演示了如何将生成的视频嵌入与 Elasticsearch 向量数据库一起使用。下面的演练详细说明了如何利用这一组合来搜索近期夏季大片的预告片。

动机

真实数据不仅仅是文本。在当今的 TikTok、工作视频通话和直播会议的世界里,内容越来越多地基于视频。在企业领域也是如此。无论是为了公共安全、合规审计,还是客户满意度,多模态 AI 都有潜力为知识应用解锁音频和视频。

然而,当我在大量内容中搜索时,我经常感到沮丧,因为除非我搜索的词语被捕捉到元数据里或在录制中被说出,否则我找不到视频。在移动应用时代的“它就是能用”的期望,已经在 AI 时代转变为 “它就是理解我的数据”。要实现这一点,AI 需要能够原生访问视频,而不是先将其转换为文本。

空间推理和视频理解这样的术语在视频和机器人领域都有应用。将视频理解加入到我们的工具集中,将是构建能够超越文本的 AI 系统的重要一步。

视频模型的超级能力

在使用专门的视频模型之前,我的常规方法是使用像 Whisper 这样的模型生成音频转录,再结合来自图像模型的密集向量嵌入,用于视频中提取的静态帧。这种方法在一些视频中效果不错,但当主题快速变化,或者关键信息实际上存在于拍摄对象的运动中时,就会失败。

简单来说,仅仅依赖图像模型会遗漏视频内容中大量的信息。

几个月前,我第一次通过 TwelveLabs 的 SaaS 平台接触到他们,它允许你上传视频进行一站式异步处理。他们有两个模型系列:

  • Marengo 是一个多模态嵌入模型,它不仅能从静态图像中捕捉含义,还能从动态视频片段中捕捉含义 —— 类似于文本嵌入模型能够从整段文字中捕捉含义,而不仅仅是单词。
  • Pegasus 是一个视频理解模型,可以用来生成字幕,或在片段上下文中回答类似 RAG 的问题。

虽然我喜欢这个 SaaS 服务的易用性和 API,但上传数据并不总是可行的。我的客户往往有数 TB 的敏感数据,不允许离开他们的控制范围。这就是 AWS Bedrock 发挥作用的地方。

TwelveLabs 已经将他们的核心模型提供在按需的 Bedrock 平台上,使源数据可以保存在我控制的 S3 存储桶中,并且只在安全计算模式下访问,而无需持久化在第三方系统中。这是个好消息,因为企业客户的视频用例通常包含商业机密、带有 PII 的记录,或其他受严格安全和隐私法规约束的信息。

我认为 Bedrock 集成能够解锁许多用例。

让我们搜索一些电影预告片

注意:Python 导入和通过 .env 文件处理环境变量的完整代码在 Python notebook 版本中。

  • 🐍完整代码示例的 Python notebook
  • 📖 Marengo + Bedrock API 文档在这里

依赖项

  • 你需要一个可以由你的 AWS ID 写入的 S3 存储桶

  • 你需要 Elasticsearch 的主机 URL 和 API key,可以是本地部署或 Elastic Cloud

  • 这段代码假设 Elasticsearch 版本为 8.17+ 或 9.0+

  • 一个很好的快速测试数据源是电影预告片。它们剪辑快速、视觉效果惊艳,并且通常包含高动作场景。你可以获取自己的 .mp4 文件,或者使用 https://github.com/yt-dlp/yt-dlp 从 YouTube 获取小规模的文件。

一旦文件在我们的本地文件系统中,我们需要将它们上传到我们的 S3 存储桶:

# Initialize AWS session
session = boto3.session.Session(aws_access_key_id=AWS_ACCESS_KEY_ID,aws_secret_access_key=AWS_SECRET_ACCESS_KEY,region_name=AWS_REGION
)######### 
## Validate S3 Configuration
#########aws_account_id = session.client('sts').get_caller_identity()["Account"]
print(f"AWS Account ID: {aws_account_id}")
s3_client = session.client('s3')# Verify bucket access
try:s3_client.head_bucket(Bucket=S3_BUCKET_NAME)print(f"✅ Successfully connected to S3 bucket: {S3_BUCKET_NAME}")
except Exception as e:print(f"❌ Error accessing S3 bucket: {e}")print("Please ensure the bucket exists and you have proper permissions.")#########
## Upload videos to S3, and make note of where we put them in data object
#########for video_object in video_objects:# Get the video file pathvideo_path = video_object.get_video_path()# Skip if video path is not setif not video_path:print(f"Skipping {video_object.get_video_string()} - No video path set")continue# Define S3 destination key - organize by platform and video ID# put this information in our data object for laters3_key = video_object.get_s3_key()if not s3_key:s3_key = f"{S3_VIDEOS_PATH}/{video_object.get_platform()}/{video_object.get_video_id()}/{os.path.basename(video_path)}"video_object.set_s3_key(s3_key)try:# Check if file already exists in S3try:s3_client.head_object(Bucket=S3_BUCKET_NAME, Key=s3_key)print(f"Video {video_object.get_video_string()} already exists in S3. Skipping upload.")continueexcept botocore.exceptions.ClientError as e:if e.response['Error']['Code'] == '404':# File doesn't exist in S3, proceed with uploadpasselse:# Some other error occurredraise e# Upload the video to S3print(f"Uploading {video_object.get_video_string()} to S3...")s3_client.upload_file(video_path, S3_BUCKET_NAME, s3_key)print(f"Successfully uploaded {video_object.get_video_string()} to S3")except Exception as e:print(f"Error uploading {video_object.get_video_string()} to S3: {str(e)}")

现在我们可以使用异步 Bedrock 调用来创建视频嵌入:

#########
## Use Bedrock hosted Twelve Labs models to create video embeddings
########## Helper function to wait for async embedding results
def wait_for_embedding_output(s3_bucket: str, s3_prefix: str, invocation_arn: str, verbose: bool = False) -> list:"""Wait for Bedrock async embedding task to complete and retrieve resultsArgs:s3_bucket (str): The S3 bucket names3_prefix (str): The S3 prefix for the embeddingsinvocation_arn (str): The ARN of the Bedrock async embedding taskReturns:list: A list of embedding dataRaises:Exception: If the embedding task fails or no output.json is found"""# Wait until task completesstatus = Nonewhile status not in ["Completed", "Failed", "Expired"]:response = bedrock_client.get_async_invoke(invocationArn=invocation_arn)status = response['status']if verbose:clear_output(wait=True)tqdm.tqdm.write(f"Embedding task status: {status}")time.sleep(5)if status != "Completed":raise Exception(f"Embedding task failed with status: {status}")# Retrieve the output from S3response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=s3_prefix)for obj in response.get('Contents', []):if obj['Key'].endswith('output.json'):output_key = obj['Key']obj = s3_client.get_object(Bucket=s3_bucket, Key=output_key)content = obj['Body'].read().decode('utf-8')data = json.loads(content).get("data", [])return dataraise Exception("No output.json found in S3 prefix")# Create video embedding
def create_video_embedding(video_s3_uri: str, video_id: str) -> list:"""Create embeddings for video using Marengo on BedrockArgs:video_s3_uri (str): The S3 URI of the video to create an embedding forvideo_id (str): the identifying unique id of the video, to be used as a uuidReturns:list: A list of embedding data"""unique_id = video_ids3_output_prefix = f'{S3_EMBEDDINGS_PATH}/{S3_VIDEOS_PATH}/{unique_id}'response = bedrock_client.start_async_invoke(modelId=MARENGO_MODEL_ID,modelInput={"inputType": "video","mediaSource": {"s3Location": {"uri": video_s3_uri,"bucketOwner": aws_account_id}}},outputDataConfig={"s3OutputDataConfig": {"s3Uri": f's3://{S3_BUCKET_NAME}/{s3_output_prefix}'}})invocation_arn = response["invocationArn"]print(f"Video embedding task started: {invocation_arn}")# Wait for completion and get resultstry:embedding_data = wait_for_embedding_output(S3_BUCKET_NAME, s3_output_prefix, invocation_arn)except Exception as e:print(f"Error waiting for embedding output: {e}")return Nonereturn embedding_datadef check_existing_embedding(video_id: str) -> bool:"""Check S3 folder to see if this video already has an embedding created to avoid re-inference"""s3_output_prefix = f'{S3_EMBEDDINGS_PATH}/{S3_VIDEOS_PATH}/{video_id}'print(s3_output_prefix)try:# Check if any files exist at this prefixresponse = s3_client.list_objects_v2(Bucket=S3_BUCKET_NAME, Prefix=s3_output_prefix)if 'Contents' in response and any(obj['Key'].endswith('output.json') for obj in response.get('Contents', [])):print(f"Embedding {video_object.get_video_string()} already has an embedding. Skipping embedding creation.")# Find the output.json filefor obj in response.get('Contents', []):if obj['Key'].endswith('output.json'):output_key = obj['Key']# Get the object from S3obj = s3_client.get_object(Bucket=S3_BUCKET_NAME, Key=output_key)# Read the content and parse as JSONcontent = obj['Body'].read().decode('utf-8')embedding_data = json.loads(content).get("data", [])return embedding_dataelse:print(f"No existing embedding found for {video_object.get_video_string()}.")return Noneexcept botocore.exceptions.ClientError as e:if e.response['Error']['Code'] == '404':# File doesn't exist in S3, proceed with uploadprint("Did not find embedding in s3")return Noneelse:# Some other error occurredraise edef create_s3_uri(bucket_name: str, key: str)-> str:video_uri = f"s3://{bucket_name}/{key}"return video_uri## Generate the embeddings one at a time, use S3 as cache to prevent double embedding generations
for video_object in tqdm.tqdm(video_objects, desc="Processing videos"):s3_key = video_object.get_s3_key()video_id = video_object.get_video_id()video_uri = create_s3_uri(S3_BUCKET_NAME, s3_key)  retrieved_embeddings = check_existing_embedding(video_id)if retrieved_embeddings:   video_object.set_embeddings_list(retrieved_embeddings)else:video_embedding_data = create_video_embedding(video_uri, video_id)video_object.set_embeddings_list(video_embedding_data)

现在我们已经在本地内存中的视频对象里得到了嵌入,这里做一个快速打印测试,看看返回了什么:

video_embedding_data = video_objects[0].get_embeddings_list()##Preview Print
for i, embedding in enumerate(video_embedding_data[:3]):print(f"{i}")for key in embedding:if "embedding" == key:print(f"\t{key}: len {len(embedding[key])}")else:print(f"\t{key}: {embedding[key]}")

输出如下:

0embedding: len 1024embeddingOption: visual-textstartSec: 0.0endSec: 6.199999809265137
1embedding: len 1024embeddingOption: visual-textstartSec: 6.199999809265137endSec: 10.399999618530273
2embedding: len 1024embeddingOption: visual-textstartSec: 10.399999618530273endSec: 17.299999237060547

插入到 Elasticsearch

我们将把对象上传到 Elasticsearch —— 在我的例子中,大约有 155 个视频片段的元数据和嵌入。在如此小的规模下,使用平铺的 float32 索引进行暴力最近邻搜索是最有效且最经济的方法。不过,下面的示例演示了如何为 Elasticsearch 支持的大规模用例中的每个流行量化级别创建不同的索引。参见 Elastic 关于更好的二进制量化(BBQ)功能的这篇文章。

es = Elasticsearch(hosts=[ELASTICSEARCH_ENDPOINT],api_key=ELASTICSEARCH_API_KEY
)es_detail = es.info().body
if "version" in es_detail:identifier = es_detail['version']['build_flavor'] if 'build_flavor' in es_detail['version'] else es_detail['version']['number']print(f"✅ Successfully connected to Elasticsearch: {es_detail['version']['build_flavor']}")docs = []for video_object in video_objects:persist_object = video_object.get_video_object()embeddings = video_object.get_embeddings_list()for embedding in embeddings:if embedding["embeddingOption"] == "visual-image":# Create a copy of the persist object and add embedding detailsdoc = copy.deepcopy(persist_object)doc["embedding"] =  embedding["embedding"]doc["start_sec"] =  embedding["startSec"]doc["end_sec"] =    embedding["endSec"]docs.append(doc)index_varieties = ["flat", ## brute force float32"hnsw", ## float32 hnsw graph data structure"int8_hnsw", ## int8 hnsw graph data structure, default for lower dimension models"bbq_hnsw", ## Better Binary Qunatization HNSW, default for higher dimension models"bbq_flat" ## brute force + Better Binary Quantization 
]for index_variety in index_varieties:# Create an index for the movie trailer embeddings# Define mapping with proper settings for dense vector searchindex_name = f"twelvelabs-movie-trailer-{index_variety}"mappings = {"properties": {"url": {"type": "keyword"},"platform": {"type": "keyword"},"video_id": {"type": "keyword"},"title": {"type": "text", "analyzer": "standard"},"embedding": {"type": "dense_vector", "dims": 1024,"similarity": "cosine","index_options": {"type": index_variety}},"start_sec": {"type": "float"},"end_sec": {"type": "float"}}}# Check if index already existsif es.indices.exists(index=index_name):print(f"Deleting Index '{index_name}' and then sleeping for 2 seconds")es.indices.delete(index=index_name)sleep(2)# Create the indexes.indices.create(index=index_name, mappings=mappings)print(f"Index '{index_name}' created successfully")for index_variety in index_varieties:# Create an index for the movie trailer embeddings# Define mapping with proper settings for dense vector searchindex_name = f"twelvelabs-movie-trailer-{index_variety}"# Bulk insert docs into Elasticsearch indexprint(f"Indexing {len(docs)} documents into {index_name}...")# Create actions for bulk APIactions = []for doc in docs:actions.append({"_index": index_name,"_source": doc})# Perform bulk indexing with error handlingtry:success, failed = bulk(es, actions, chunk_size=100, max_retries=3, initial_backoff=2, max_backoff=60)print(f"\tSuccessfully indexed {success} documents into {index_name}")if failed:print(f"\tFailed to index {len(failed)} documents")except Exception as e:print(f"Error during bulk indexing: {e}")print(f"Completed indexing documents into {index_name}")

运行搜索

TwelveLabs 的 Bedrock 实现允许异步调用将文本生成向量嵌入到 S3。然而,下面我们将使用延迟更低的同步 invoke_model,直接为我们的搜索查询获取文本嵌入。(文本 Marengo 文档示例在这里。)

# Create text embedding
def create_text_embedding(text_query: str) -> list:text_model_id = TEXT_EMBEDDING_MODEL_ID text_model_input = {"inputType": "text","inputText": text_query}response = bedrock_client.invoke_model(modelId=text_model_id,body=json.dumps(text_model_input))response_body = json.loads(response['body'].read().decode('utf-8'))embedding_data = response_body.get("data", [])if embedding_data:return embedding_data[0]["embedding"]else:return Nonedef vector_query(index_name: str, text_query: str) -> dict:query_embedding = create_text_embedding(text_query)query = {"retriever": {"knn": {"field": "embedding","query_vector": query_embedding,"k": 10,"num_candidates": "25"}},"size": 10,"_source": False,"fields": ["title", "video_id", "start_sec"]}return es.search(index=index_name, body=query).bodytext_query = "Show me scenes with dinosaurs"
print (vector_query("twelvelabs-movie-trailer-flat", text_query))

返回的 JSON 就是我们的搜索结果!但为了创建一个更易用的测试界面,我们可以使用一些快速的 iPython 小部件:

from ipywidgets import widgets, HTML as WHTML, HBox, Layout
from IPython.display import displaydef display_search_results_html(query):results = vector_query("twelvelabs-movie-trailer-flat", query)hits = results.get('hits', {}).get('hits', [])if not hits:return "<p>No results found</p>"items = []for hit in hits:fields = hit.get('fields', {})title = fields.get('title', ['No Title'])[0]score = hit.get('_score', 0)video_id = fields.get('video_id', [''])[0]start_sec = fields.get('start_sec', [0])[0]url = 
f"https://www.youtube.com/watch?v={video_id}&t={int(start_sec)}s"items.append(f'<li><a href="{url}" target="_blank">{title} (Start: {float(start_sec):.1f}s)</a> <span>Score: {score}</span></li>')return "<h3>Search Results:</h3><ul>" + "\n".join(items) + "</ul>"def search_videos():search_input = widgets.Text(value='',placeholder='Enter your search query…',description='Search:',layout=Layout(width='70%'))search_button = widgets.Button(description='Search Videos',button_style='primary',layout=Layout(width='20%'))# Use a single HTML widget for output; update its .value to avoid double-renderingresults_box = WHTML(value="")def on_button_click(_):q = search_input.value.strip()if not q:results_box.value = "<p>Please enter a search query</p>"returnresults_box.value = "<p>Searching…</p>"results_box.value = display_search_results_html(q)# Avoid multiple handler attachments if the cell is re-runtry:search_button._click_handlers.callbacks.clear()except Exception:passsearch_button.on_click(on_button_click)display(HBox([search_input, search_button]))display(results_box)# Call this to create the UI
search_videos()

让我们在预告片中搜索一些视觉内容。

比较量化方法

较新的 Elasticsearch 版本默认对 1024 维密集向量使用 bbq_hnsw,它在通过对原始 float32 在过采样候选窗口中重评分来保持准确性的同时,提供最佳的速度和可扩展性。

为了通过简单的 UI 比较量化对搜索结果的影响,可以查看一个名为 Relevance Studio 的新项目。

如果我们在 Kibana 中检查索引管理,或使用 curl 执行 GET /_cat/indices ,我们会看到每个选项的存储大小大致相同。乍一看,这可能会让人困惑,但请记住存储大小大致相等,因为索引包含用于重评分的向量 float32 表示。在 bbq_hnsw 中,图中只使用量化的二进制向量表示,从而在索引和搜索时节省成本并提高性能。

最后的想法

对于单个 1024 维密集向量来说,这些结果令人印象深刻。我很期待尝试将 Marengo 模型的强大功能与混合搜索方法结合,包括音频转录,以及 Elasticsearch 的地理空间过滤和 RBAC/ABAC 访问控制。你希望 AI 对哪些视频了解一切?

原文:https://www.elastic.co/search-labs/blog/twelvelabs-marengo-video-embedding-amazon-bedrock

http://www.dtcms.com/a/392357.html

相关文章:

  • Windows 11 下 Notepad++ 等应用无法启动问题排查修复
  • 面向口齿不清者的语音识别新突破:用大模型拯救“听不懂”的声音
  • 服装企业优化信息化管理系统的最佳软件选择
  • 多阶段构建镜像
  • 推荐一个开源服务器一键自动重装系统脚本:reinstall
  • 【C++进阶】C++11 的新特性 | lambda | 包装器
  • 2.【QT 5.12.12 安装 Windows 版本】
  • Rust_2025:阶段1:day6.3 macro
  • 【Qt开发】输入类控件(一)-> QLineEdit
  • python10——组合数据类型(集合)
  • 分布式专题——14 RabbitMQ之集群实战
  • WEEX唯客的多维度安全守护
  • 深度学习环境配置
  • 生鲜速递:HTTP 的缓存控制
  • ​​Snipaste 2.10.1.dmg截图安装教程|Mac电脑拖拽安装详细步骤​
  • 10.1.1 使用python完成第一个遗传算法
  • C语言内存精讲系列(二十九):C 语言堆区内存进阶与动态内存实战
  • 6G量子通信融合:破解未来网络的安全与效能密码
  • C#练习题——泛型实现单例模式和增删改查
  • 网关登录校验
  • Kubernetes Fluent Bit Pod Pending 问题解决方案
  • 我爱学算法之—— 位运算(中)
  • 什么是差分信号
  • 相机标定(Camera Calibration)原理及步骤:从 “像素模糊” 到 “毫米精准” 的关键一步
  • 用 【C# + WinUI3 + 图像动画】 来理解:高数 - 函数 - 初等函数
  • ​​[硬件电路-296]:单刀双掷(SPDT)模拟开关
  • 【MAVLink】MAVSDK编程入门、基本概念
  • MAC-基于反射的枚举工具类优化
  • 防御性编程:编程界的‘安全驾驶‘指南
  • Qt绘图方式有哪些