当前位置: 首页 > news >正文

使用 instructor 库实现 batch(批量)请求

目录

    • 代码
    • 代码解释
    • 示例
    • 类似例子

代码

import json
import instructor
import asyncio

from openai import AsyncOpenAI
from pydantic import BaseModel, Field, field_validator
from enum import Enum



# Async OpenAI-style client pointed at the DashScope compatible-mode endpoint.
# "your api key" is a placeholder and must be replaced before running.
client = AsyncOpenAI(api_key = "your api key",
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1")
# Rebind `client` to the instructor-wrapped client (TOOLS mode); `classify`
# relies on this wrapper when it passes `response_model` / `max_retries`.
client = instructor.from_openai(client, mode=instructor.Mode.TOOLS)
# Shared semaphore acquired by `classify`: at most 5 requests run at once.
sem = asyncio.Semaphore(5)


class QuestionType(Enum):
    """Closed set of labels a question can be classified as.

    Each member's value is the same string as its name; those values are
    what `main` writes into its output records.
    """

    CONTACT = "CONTACT"
    TIMELINE_QUERY = "TIMELINE_QUERY"
    DOCUMENT_SEARCH = "DOCUMENT_SEARCH"
    COMPARE_CONTRAST = "COMPARE_CONTRAST"
    EMAIL = "EMAIL"
    PHOTOS = "PHOTOS"
    SUMMARY = "SUMMARY"


# You can add more instructions and examples in the description
# or you can put it in the prompt in `messages=[...]`
class QuestionClassification(BaseModel):
    """
    Predict the type of question that is being asked.
    Here are some tips on how to predict the question type:
    CONTACT: Searches for some contact information.
    TIMELINE_QUERY: "When did something happen?"
    DOCUMENT_SEARCH: "Find me a document"
    COMPARE_CONTRAST: "Compare and contrast two things"
    EMAIL: "Find me an email, search for an email"
    PHOTOS: "Find me a photo, search for a photo"
    SUMMARY: "Summarize a large amount of data"
    """

    # NOTE: per the comment above the class, the docstring acts as the
    # description/instructions for the model, so it is written as prompt text
    # rather than as developer documentation.

    # If you want only one classification, just change the annotation to
    #   `classification: QuestionType` rather than `classification: list[QuestionType]`
    chain_of_thought: str = Field(
        ..., description="The chain of thought that led to the classification"
    )
    classification: list[QuestionType] = Field(
        description=f"An accuracy and correct prediction predicted class of question. Only allowed types: {[t.value for t in QuestionType]}, should be used",
    )

    @field_validator("classification", mode="before")
    @classmethod
    def validate_classification(cls, v):
        """Normalize the raw `classification` input to a list.

        Runs in "before" mode, i.e. on the raw value prior to pydantic's own
        validation of `list[QuestionType]`.

        Args:
            v: Raw value supplied for `classification`.

        Returns:
            `v` unchanged when it already is a list, otherwise `[v]`.
        """
        # sometimes the API returns a single value, just make sure it's a list
        if not isinstance(v, list):
            v = [v]
        return v


async def classify(data: str):
    """Classify one question and pair it with its prediction.

    Args:
        data: The question text to classify.

    Returns:
        A `(data, prediction)` tuple, where `prediction` is whatever the
        client returns for `response_model=QuestionClassification`.
    """
    prompt = f"Classify the following question: {data}"
    request = [{"role": "user", "content": prompt}]
    # Hold the shared semaphore only for the duration of the request
    # (simple rate limiting).
    async with sem:
        prediction = await client.chat.completions.create(
            model="qwen-turbo",
            response_model=QuestionClassification,
            max_retries=2,
            messages=request,
        )
    return data, prediction


async def main(questions: list[str], *, path_to_jsonl: str = None):
    """Classify all questions concurrently and report each result as it lands.

    Args:
        questions: Question texts; each one is handed to `classify`.
        path_to_jsonl: Optional path. When truthy, every result is appended
            to this file as one JSON object per line.
    """
    pending = [classify(item) for item in questions]
    # Results are handled in completion order, not in input order.
    for finished in asyncio.as_completed(pending):
        question, label = await finished
        labels = [kind.value for kind in label.classification]
        resp = {"question": question, "classification": labels}
        print(resp)
        if not path_to_jsonl:
            continue
        # Append mode: the file is reopened for every finished result.
        with open(path_to_jsonl, "a") as sink:
            sink.write(json.dumps(resp) + "\n")

代码解释

  1. 初始化设置
# Abridged excerpt: "..." stands in for the real API key and base URL.
client = AsyncOpenAI(api_key = "...", base_url="...")
client = instructor.from_openai(client, mode=instructor.Mode.TOOLS)
sem = asyncio.Semaphore(5)  # limit the number of concurrent requests to 5
  2. 问题类型定义
class QuestionType(Enum):
    CONTACT = "CONTACT"
    TIMELINE_QUERY = "TIMELINE_QUERY"
    # ... (remaining members omitted in this excerpt)

使用枚举定义了7种不同的问题类型,便于分类和管理。

  3. 分类模型定义
class QuestionClassification(BaseModel):
    chain_of_thought: str = Field(...)  # reasoning behind the classification
    classification: list[QuestionType]   # list of predicted question types
  • 使用 Pydantic 模型定义输出格式
  • 包含推理过程和分类结果两个字段
  • 通过 field_validator 确保分类结果始终为列表
  4. 分类函数
async def classify(data: str):
    async with sem:  # the semaphore limits concurrency
        return data, await client.chat.completions.create(...)
  • 异步处理单个问题的分类
  • 使用信号量控制并发请求数
  • 返回原始问题和分类结果
  5. 主函数
async def main(questions: list[str], *, path_to_jsonl: str = None):
    tasks = [classify(question) for question in questions]  # build the task list
    for task in asyncio.as_completed(tasks):  # handle results as they complete
        question, label = await task
        resp = {
            "question": question,
            "classification": [c.value for c in label.classification],
        }
        print(resp)  # print the result
        if path_to_jsonl:  # optionally persist the result
            with open(path_to_jsonl, "a") as f:
                json_dump = json.dumps(resp)
                f.write(json_dump + "\n")
  • 并发处理多个问题
  • 使用 asyncio.as_completed 处理完成的任务
  • 支持将结果保存到 JSONL 文件

这段代码的主要特点:

  • 使用 instructor 实现结构化输出
  • 异步并发处理提高效率
  • 使用信号量控制并发数
  • 支持批量处理和结果保存
  • 类型安全的数据处理

示例

import asyncio

# Sample inputs for `main`.
questions = [
    "What was that ai app that i saw on the news the other day?",
    "Can you find the trainline booking email?",
    "What was the book I saw on amazon yesturday?",
    "Can you speak german?",
    "Do you have access to the meeting transcripts?",
    "what are the recent sites I visited?",
    "what did I do on Monday?",
    "Tell me about todays meeting and how it relates to the email on Monday",
]

# asyncio.get_running_loop() raises RuntimeError when no event loop is
# running (e.g. a plain `python script.py`), so only take the nest_asyncio
# route when a loop really is active (e.g. inside a notebook).
try:
    loop = asyncio.get_running_loop()
except RuntimeError:
    # No running loop: let asyncio create and manage one.
    # NOTE(review): on Python < 3.10 asyncio primitives created at import
    # time may be tied to a different loop - confirm on the target version.
    asyncio.run(main(questions))
else:
    # A loop is already running; nest_asyncio allows run_until_complete()
    # to be called re-entrantly on it.
    import nest_asyncio

    nest_asyncio.apply()
    loop.run_until_complete(main(questions))
{'question': 'Can you speak german?', 'classification': ['COMPARE_CONTRAST']}
{'question': 'Can you find the trainline booking email?', 'classification': ['EMAIL']}
{'question': 'What was that ai app that i saw on the news the other day?', 'classification': ['DOCUMENT_SEARCH']}
{'question': 'what are the recent sites I visited?', 'classification': ['DOCUMENT_SEARCH']}
{'question': 'What was the book I saw on amazon yesturday?', 'classification': ['DOCUMENT_SEARCH']}
{'question': 'Do you have access to the meeting transcripts?', 'classification': ['DOCUMENT_SEARCH']}
{'question': 'what did I do on Monday?', 'classification': ['TIMELINE_QUERY']}
{'question': 'Tell me about todays meeting and how it relates to the email on Monday', 'classification': ['COMPARE_CONTRAST']}

类似例子

import json
import instructor
import asyncio
from openai import AsyncOpenAI
from pydantic import BaseModel, Field, field_validator
from enum import Enum

# Initialize the client ("your api key" is a placeholder to be replaced)
client = AsyncOpenAI(api_key = "your api key",
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1")
    
# Rebind `client` to the instructor-wrapped client (TOOLS mode); `classify_task`
# relies on this wrapper when it passes `response_model` / `max_retries`.
client = instructor.from_openai(client, mode=instructor.Mode.TOOLS)
sem = asyncio.Semaphore(5)  # concurrency control: at most 5 holders at once

# Enumeration of task types
class TaskType(Enum):
    """Closed set of labels a programming task can be classified as.

    Each member's value is the same string as its name; those values are
    what `main` puts into the "type" list of each result.
    """

    DEBUGGING = "DEBUGGING"
    OPTIMIZATION = "OPTIMIZATION"
    IMPLEMENTATION = "IMPLEMENTATION"
    DOCUMENTATION = "DOCUMENTATION"
    CODE_REVIEW = "CODE_REVIEW"
    TESTING = "TESTING"
    RESEARCH = "RESEARCH"

# Task classification model
class TaskClassification(BaseModel):
    """
    预测编程任务的类型
    示例类型说明:
    DEBUGGING: 修复代码中的错误
    OPTIMIZATION: 提高代码性能
    IMPLEMENTATION: 实现新功能
    DOCUMENTATION: 编写文档
    CODE_REVIEW: 代码审查
    TESTING: 编写或执行测试
    RESEARCH: 技术调研
    """

    # NOTE(review): the docstring above and the Field descriptions below look
    # like prompt text for the model (they mirror the Chinese request in
    # `classify_task`), so they are deliberately left untranslated - confirm.

    # Free-text reasoning the model gives before its answer.
    chain_of_thought: str = Field(
        ..., description="分类的思考过程"
    )
    # Predicted task types; the description lists every allowed value.
    classification: list[TaskType] = Field(
        description=f"预测的任务类型,可选值: {[t.value for t in TaskType]}"
    )

    @field_validator("classification", mode="before")
    @classmethod
    def validate_classification(cls, v):
        """Normalize the raw `classification` input to a list.

        Runs in "before" mode, i.e. on the raw value prior to pydantic's own
        validation of `list[TaskType]`.

        Args:
            v: Raw value supplied for `classification`.

        Returns:
            `v` unchanged when it already is a list, otherwise `[v]`.
        """
        if not isinstance(v, list):
            v = [v]
        return v

# Classification function
async def classify_task(description: str):
    """Classify one programming-task description.

    Args:
        description: The task text to classify.

    Returns:
        A `(description, prediction)` tuple, where `prediction` is whatever
        the client returns for `response_model=TaskClassification`.
    """
    request = [
        {"role": "user", "content": f"分类以下编程任务: {description}"},
    ]
    # Hold the shared semaphore only while the request is in flight.
    async with sem:
        prediction = await client.chat.completions.create(
            model="qwen-turbo",
            response_model=TaskClassification,
            max_retries=2,
            messages=request,
        )
    return description, prediction

# Entry point
async def main(tasks: list[str], output_file: str = None):
    """Classify every task description concurrently and collect the results.

    Each result is printed as soon as its classification finishes, so the
    output order follows completion order rather than input order.

    Args:
        tasks: Task descriptions; each one is handed to `classify_task`.
        output_file: Optional path. When truthy, all results are written to
            it as a single JSON array once every classification is done.
    """
    # Keep the `tasks` parameter intact instead of rebinding it to the
    # coroutine list.
    pending = [classify_task(description) for description in tasks]

    results = []
    for finished in asyncio.as_completed(pending):
        description, prediction = await finished
        result = {
            "task": description,
            "type": [t.value for t in prediction.classification],
        }
        print(result)
        results.append(result)

    if output_file:
        # Explicit UTF-8 plus ensure_ascii=False keeps non-ASCII task text
        # human-readable in the file instead of \uXXXX escapes; the parsed
        # JSON content is the same.
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(results, f, indent=2, ensure_ascii=False)


# Sample inputs for `main`.
sample_tasks = [
    "修复这个导致内存泄漏的Python函数",
    "将这段C++代码的运行时间减少50%",
    "为我们的API添加用户认证功能",
    "为这个模块编写使用说明文档",
    "审查同事提交的Pull Request",
    "为登录功能编写单元测试",
    "调研最适合我们项目的数据库技术"
]


# asyncio.get_running_loop() raises RuntimeError when no event loop is
# running (e.g. a plain `python script.py`), so only take the nest_asyncio
# route when a loop really is active (e.g. inside a notebook).
try:
    loop = asyncio.get_running_loop()
except RuntimeError:
    # No running loop: let asyncio create and manage one.
    # NOTE(review): on Python < 3.10 asyncio primitives created at import
    # time may be tied to a different loop - confirm on the target version.
    asyncio.run(main(sample_tasks))
else:
    # A loop is already running; nest_asyncio allows run_until_complete()
    # to be called re-entrantly on it.
    import nest_asyncio

    nest_asyncio.apply()
    loop.run_until_complete(main(sample_tasks))
{'task': '为我们的API添加用户认证功能', 'type': ['IMPLEMENTATION']}
{'task': '为这个模块编写使用说明文档', 'type': ['DOCUMENTATION']}
{'task': '修复这个导致内存泄漏的Python函数', 'type': ['DEBUGGING']}
{'task': '将这段C++代码的运行时间减少50%', 'type': ['OPTIMIZATION']}
{'task': '审查同事提交的Pull Request', 'type': ['CODE_REVIEW']}
{'task': '为登录功能编写单元测试', 'type': ['TESTING']}
{'task': '调研最适合我们项目的数据库技术', 'type': ['RESEARCH']}

参考链接:https://github.com/instructor-ai/instructor/tree/main

相关文章:

  • 基础数学:图论与信息论
  • 前端性能测试工具 —— WebPageTest
  • AI——使用pandas
  • 深入解析嵌入模型Embedding :从理论到实践的全方位指南
  • 微服务的服务调用详解以及常见解决方案对比
  • 软件测试——测试概念
  • CCLK IE转EtherCAT凭借网关模块实现三菱PLC与汇川伺服精准通讯的实用案例​
  • 107.二叉树的层序遍历II- 力扣(LeetCode)
  • opencv(C++)用直方图统计像素
  • 动态多目标优化:基于可学习预测的动态多目标进化算法(DIP-DMOEA)求解CEC2018(DF1-DF14),提供MATLAB代码
  • pair与tuple
  • 缓存与数据库一致性:从问题到解决方案全解析
  • 04-微服务 面试题-mk
  • 斐波那契数列 (Fibonacci Sequence) C++
  • 0.DockerCE起步之Linux相关【完善中】
  • 提示词 (Prompt)
  • 树上搜索 第32次CCF-CSP计算机软件能力认证
  • 激光院董事长龚赤坤到北京研发中心检查指导工作
  • 深入解析 Spring AI ChatClient:构建高效 AI 应用的终极指南
  • 2025年3月 Scratch图形化四级 真题解析 中国电子学会全国青少年软件编程等级考试
  • 兰州企业网络优化方案/自己怎么给网站做优化排名
  • 毕业设计网站只做前端行不行/百度销售系统
  • 做网站的书知乎/2021近期时事新闻热点事件
  • c 网站开发 pdf/广州广告推广公司
  • 创建全国文明城市建议简短/电脑优化大师官方免费下载
  • 临沂网站建设厂家/营销计划怎么写