instructor库实现batch 请求
目录
- 代码
- 代码解释
- 示例
- 类似例子
代码
import json
import instructor
import asyncio
from openai import AsyncOpenAI
from pydantic import BaseModel, Field, field_validator
from enum import Enum
# Build the async OpenAI-compatible client and wrap it with instructor in one
# expression; the wrapped client is what `classify` calls with `response_model`.
client = instructor.from_openai(
    AsyncOpenAI(
        api_key="your api key",
        base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
    ),
    mode=instructor.Mode.TOOLS,
)
# At most five coroutines may hold this semaphore at the same time.
sem = asyncio.Semaphore(5)
class QuestionType(Enum):
    # Closed set of labels a question can be classified as. Every member's
    # value equals its name, so `.value` yields the plain label string.
    CONTACT = "CONTACT"
    TIMELINE_QUERY = "TIMELINE_QUERY"
    DOCUMENT_SEARCH = "DOCUMENT_SEARCH"
    COMPARE_CONTRAST = "COMPARE_CONTRAST"
    EMAIL = "EMAIL"
    PHOTOS = "PHOTOS"
    SUMMARY = "SUMMARY"
# You can add more instructions and examples in the description
# or you can put it in the prompt in `messages=[...]`
class QuestionClassification(BaseModel):
    """
    Predict the type of question that is being asked.
    Here are some tips on how to predict the question type:
    CONTACT: Searches for some contact information.
    TIMELINE_QUERY: "When did something happen?"
    DOCUMENT_SEARCH: "Find me a document"
    COMPARE_CONTRAST: "Compare and contrast two things"
    EMAIL: "Find me an email, search for an email"
    PHOTOS: "Find me a photo, search for a photo"
    SUMMARY: "Summarize a large amount of data"
    """

    # The result is a list, so a single question may receive several labels.
    # If you want only one classification, just change it to
    # `classification: QuestionType` rather than `classification: list[QuestionType]`
    chain_of_thought: str = Field(
        ..., description="The chain of thought that led to the classification"
    )
    classification: list[QuestionType] = Field(
        description=f"An accuracy and correct prediction predicted class of question. Only allowed types: {[t.value for t in QuestionType]}, should be used",
    )

    @field_validator("classification", mode="before")
    @classmethod
    def validate_classification(cls, v):
        """Wrap a lone value in a list before the field itself is validated."""
        # sometimes the API returns a single value, just make sure it's a list
        if not isinstance(v, list):
            v = [v]
        return v
async def classify(data: str):
    """Classify a single question.

    Args:
        data: The question text; it is embedded into the user message.

    Returns:
        A ``(data, result)`` tuple, where ``result`` is what the client
        returns for ``response_model=QuestionClassification``. Returning the
        input alongside the result lets callers pair them up even when
        results arrive out of order.
    """
    async with sem:  # some simple rate limiting
        return data, await client.chat.completions.create(
            model="qwen-turbo",
            response_model=QuestionClassification,
            max_retries=2,
            messages=[
                {
                    "role": "user",
                    "content": f"Classify the following question: {data}",
                },
            ],
        )
async def main(questions: list[str], *, path_to_jsonl: str = None):
    """Classify every question concurrently and report results as they finish.

    Args:
        questions: Question texts to classify.
        path_to_jsonl: Optional file path. When given, each result is appended
            to it as one JSON object per line.

    Each result is printed as a dict with the keys ``question`` and
    ``classification`` (a list of label strings). Results appear in
    completion order, which may differ from the input order.
    """
    tasks = [classify(question) for question in questions]
    for task in asyncio.as_completed(tasks):
        question, label = await task
        resp = {
            "question": question,
            "classification": [c.value for c in label.classification],
        }
        print(resp)
        if path_to_jsonl:
            # Append per result so finished work is kept if a later task
            # fails. Explicit UTF-8 plus ensure_ascii=False keeps non-ASCII
            # questions readable and independent of the platform default.
            with open(path_to_jsonl, "a", encoding="utf-8") as f:
                f.write(json.dumps(resp, ensure_ascii=False) + "\n")
代码解释
- 初始化设置:
client = instructor.from_openai(
    AsyncOpenAI(api_key="...", base_url="..."),
    mode=instructor.Mode.TOOLS,
)
sem = asyncio.Semaphore(5)  # limit the number of concurrent requests to 5
- 问题类型定义:
class QuestionType(Enum):
    CONTACT = "CONTACT"
    TIMELINE_QUERY = "TIMELINE_QUERY"
    # ... (remaining members omitted in this excerpt)
使用枚举定义了7种不同的问题类型,便于分类和管理。
- 分类模型定义:
class QuestionClassification(BaseModel):
    chain_of_thought: str = Field(...)  # reasoning behind the classification
    classification: list[QuestionType]  # list of predicted question types
- 使用 Pydantic 模型定义输出格式
- 包含推理过程和分类结果两个字段
- 通过 `field_validator` 确保分类结果始终为列表
- 分类函数:
async def classify(data: str):
    async with sem:  # the semaphore limits concurrency
        return data, await client.chat.completions.create(...)
- 异步处理单个问题的分类
- 使用信号量控制并发请求数
- 返回原始问题和分类结果
- 主函数:
async def main(questions: list[str], *, path_to_jsonl: str = None):
    tasks = [classify(question) for question in questions]  # one coroutine per question
    for task in asyncio.as_completed(tasks):  # yields results in completion order
        question, label = await task
        resp = {
            "question": question,
            "classification": [c.value for c in label.classification],
        }
        print(resp)  # print the result
        if path_to_jsonl:  # optionally persist the result
            # Explicit UTF-8 and ensure_ascii=False keep non-ASCII text readable.
            with open(path_to_jsonl, "a", encoding="utf-8") as f:
                f.write(json.dumps(resp, ensure_ascii=False) + "\n")
- 并发处理多个问题
- 使用 `asyncio.as_completed` 按完成顺序处理任务
- 支持将结果保存到 JSONL 文件
这段代码的主要特点:
- 使用 instructor 实现结构化输出
- 异步并发处理提高效率
- 使用信号量控制并发数
- 支持批量处理和结果保存
- 类型安全的数据处理
示例
import asyncio

questions = [
    "What was that ai app that i saw on the news the other day?",
    "Can you find the trainline booking email?",
    "What was the book I saw on amazon yesturday?",
    "Can you speak german?",
    "Do you have access to the meeting transcripts?",
    "what are the recent sites I visited?",
    "what did I do on Monday?",
    "Tell me about todays meeting and how it relates to the email on Monday",
]

try:
    # Succeeds only when an event loop is already running (e.g. a notebook).
    loop = asyncio.get_running_loop()
except RuntimeError:
    # Plain script: no loop is running, so let asyncio create and close one.
    # NOTE(review): on Python < 3.10, asyncio primitives created at import
    # time (such as `sem`) bind to a loop at creation; confirm the target
    # Python version if this path is used.
    asyncio.run(main(questions))
else:
    # A loop is already running: nest_asyncio makes run_until_complete
    # re-entrant so the coroutine can be driven from inside it.
    import nest_asyncio

    nest_asyncio.apply()
    loop.run_until_complete(main(questions))
{'question': 'Can you speak german?', 'classification': ['COMPARE_CONTRAST']}
{'question': 'Can you find the trainline booking email?', 'classification': ['EMAIL']}
{'question': 'What was that ai app that i saw on the news the other day?', 'classification': ['DOCUMENT_SEARCH']}
{'question': 'what are the recent sites I visited?', 'classification': ['DOCUMENT_SEARCH']}
{'question': 'What was the book I saw on amazon yesturday?', 'classification': ['DOCUMENT_SEARCH']}
{'question': 'Do you have access to the meeting transcripts?', 'classification': ['DOCUMENT_SEARCH']}
{'question': 'what did I do on Monday?', 'classification': ['TIMELINE_QUERY']}
{'question': 'Tell me about todays meeting and how it relates to the email on Monday', 'classification': ['COMPARE_CONTRAST']}
类似例子
import json
import instructor
import asyncio
from openai import AsyncOpenAI
from pydantic import BaseModel, Field, field_validator
from enum import Enum
# Initialise the client: the async OpenAI-compatible client is created and
# wrapped with instructor in a single expression.
client = instructor.from_openai(
    AsyncOpenAI(
        api_key="your api key",
        base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
    ),
    mode=instructor.Mode.TOOLS,
)
# Concurrency control: at most five holders at a time.
sem = asyncio.Semaphore(5)
# Enumeration of task types; each member's value equals its name.
class TaskType(Enum):
    DEBUGGING = "DEBUGGING"
    OPTIMIZATION = "OPTIMIZATION"
    IMPLEMENTATION = "IMPLEMENTATION"
    DOCUMENTATION = "DOCUMENTATION"
    CODE_REVIEW = "CODE_REVIEW"
    TESTING = "TESTING"
    RESEARCH = "RESEARCH"
# Task classification model.
# NOTE(review): the class docstring is left in Chinese on purpose. It is
# presumably forwarded to the model as the schema description, which would
# make it prompt text rather than plain documentation; confirm before
# translating it.
class TaskClassification(BaseModel):
    """
    预测编程任务的类型
    示例类型说明:
    DEBUGGING: 修复代码中的错误
    OPTIMIZATION: 提高代码性能
    IMPLEMENTATION: 实现新功能
    DOCUMENTATION: 编写文档
    CODE_REVIEW: 代码审查
    TESTING: 编写或执行测试
    RESEARCH: 技术调研
    """

    chain_of_thought: str = Field(
        ..., description="分类的思考过程"
    )
    classification: list[TaskType] = Field(
        description=f"预测的任务类型,可选值: {[t.value for t in TaskType]}"
    )

    @field_validator("classification", mode="before")
    @classmethod
    def validate_classification(cls, v):
        """Wrap a lone value in a list before the field itself is validated."""
        if not isinstance(v, list):
            v = [v]
        return v
# Classification function
async def classify_task(description: str):
    """Classify a single programming-task description.

    Args:
        description: The task text; it is embedded into the user message.

    Returns:
        A ``(description, result)`` tuple, where ``result`` is what the
        client returns for ``response_model=TaskClassification``.
    """
    async with sem:  # cap the number of in-flight requests
        return description, await client.chat.completions.create(
            model="qwen-turbo",
            response_model=TaskClassification,
            max_retries=2,
            messages=[
                {
                    "role": "user",
                    "content": f"分类以下编程任务: {description}"
                }
            ]
        )
# Entry point
async def main(tasks: list[str], output_file: str = None):
    """Classify all task descriptions concurrently and optionally save them.

    Args:
        tasks: Task descriptions to classify.
        output_file: Optional path. When given, the full result list is
            written to it as one JSON array after every task has finished.

    Each result is printed as a dict with the keys ``task`` and ``type``
    (a list of label strings), in completion order.
    """
    results = []
    # Use a separate name so the `tasks` parameter is not shadowed.
    pending = [classify_task(task) for task in tasks]
    for finished in asyncio.as_completed(pending):
        description, classification = await finished
        result = {
            "task": description,
            "type": [t.value for t in classification.classification],
        }
        print(result)
        results.append(result)
    if output_file:
        # UTF-8 with ensure_ascii=False keeps the Chinese task text readable
        # in the file instead of \uXXXX escapes.
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(results, f, indent=2, ensure_ascii=False)
sample_tasks = [
    "修复这个导致内存泄漏的Python函数",
    "将这段C++代码的运行时间减少50%",
    "为我们的API添加用户认证功能",
    "为这个模块编写使用说明文档",
    "审查同事提交的Pull Request",
    "为登录功能编写单元测试",
    "调研最适合我们项目的数据库技术",
]

try:
    # Succeeds only when an event loop is already running (e.g. a notebook).
    loop = asyncio.get_running_loop()
except RuntimeError:
    # Plain script: no loop is running, so let asyncio create and close one.
    asyncio.run(main(sample_tasks))
else:
    # A loop is already running: nest_asyncio makes run_until_complete
    # re-entrant so the coroutine can be driven from inside it.
    import nest_asyncio

    nest_asyncio.apply()
    loop.run_until_complete(main(sample_tasks))
{'task': '为我们的API添加用户认证功能', 'type': ['IMPLEMENTATION']}
{'task': '为这个模块编写使用说明文档', 'type': ['DOCUMENTATION']}
{'task': '修复这个导致内存泄漏的Python函数', 'type': ['DEBUGGING']}
{'task': '将这段C++代码的运行时间减少50%', 'type': ['OPTIMIZATION']}
{'task': '审查同事提交的Pull Request', 'type': ['CODE_REVIEW']}
{'task': '为登录功能编写单元测试', 'type': ['TESTING']}
{'task': '调研最适合我们项目的数据库技术', 'type': ['RESEARCH']}
参考链接:https://github.com/instructor-ai/instructor/tree/main