
Implementing Caching with the instructor Library

Contents

    • Code
    • Code Explanation
      • 1. Basic Setup
      • 2. Client Initialization
      • 3. Data Model Definition
      • 4. Cache Setup
      • 5. Cache Decorator
      • 6. Example Functions
      • Workflow
    • Examples
    • Similar Examples

Code

import functools
import inspect
import instructor
import diskcache

from openai import OpenAI, AsyncOpenAI
from pydantic import BaseModel

client = instructor.from_openai(OpenAI(api_key="your api key",
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"))

aclient = instructor.from_openai(AsyncOpenAI(api_key="your api key",
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"))

model_name = "qwen-turbo"

class UserDetail(BaseModel):
    name: str
    age: int


cache = diskcache.Cache("./my_cache_directory")


def instructor_cache(func):
    """Cache a function that returns a Pydantic model"""
    return_type = inspect.signature(func).return_annotation
    # Guard against a missing annotation, which would make issubclass raise TypeError
    if not (isinstance(return_type, type) and issubclass(return_type, BaseModel)):
        raise ValueError("The return type must be a Pydantic model")

    is_async = inspect.iscoroutinefunction(func)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        key = f"{func.__name__}-{functools._make_key(args, kwargs, typed=False)}"
        # Check if the result is already cached
        if (cached := cache.get(key)) is not None:
            # Deserialize from JSON based on the return type
            if issubclass(return_type, BaseModel):
                return return_type.model_validate_json(cached)

        # Call the function and cache its result
        result = func(*args, **kwargs)
        serialized_result = result.model_dump_json()
        cache.set(key, serialized_result)

        return result

    @functools.wraps(func)
    async def awrapper(*args, **kwargs):
        key = f"{func.__name__}-{functools._make_key(args, kwargs, typed=False)}"
        # Check if the result is already cached
        if (cached := cache.get(key)) is not None:
            # Deserialize from JSON based on the return type
            if issubclass(return_type, BaseModel):
                return return_type.model_validate_json(cached)

        # Call the function and cache its result
        result = await func(*args, **kwargs)
        serialized_result = result.model_dump_json()
        cache.set(key, serialized_result)

        return result

    return wrapper if not is_async else awrapper


@instructor_cache
def extract(data) -> UserDetail:
    return client.chat.completions.create(
        model=model_name,
        response_model=UserDetail,
        messages=[
            {"role": "user", "content": data},
        ],
    )  # type: ignore


@instructor_cache
async def aextract(data) -> UserDetail:
    return await aclient.chat.completions.create(
        model=model_name,
        response_model=UserDetail,
        messages=[
            {"role": "user", "content": data},
        ],
    )  # type: ignore

Code Explanation

1. Basic Setup

import functools
import inspect
import instructor
import diskcache
from openai import OpenAI, AsyncOpenAI
from pydantic import BaseModel

This section imports the required libraries:

  • diskcache: disk-based caching
  • instructor: augments the OpenAI client with structured-output support
  • pydantic: data validation and serialization

2. Client Initialization

client = instructor.from_openai(OpenAI(...))
aclient = instructor.from_openai(AsyncOpenAI(...))

Two clients are created:

  • the synchronous client client
  • the asynchronous client aclient

3. Data Model Definition

class UserDetail(BaseModel):
    name: str
    age: int

Defines the user-detail data model, with name and age fields.
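The cache stores results as JSON strings, so it relies on Pydantic's serialization round trip. A minimal sketch, assuming Pydantic v2 (which provides `model_dump_json` and `model_validate_json`):

```python
from pydantic import BaseModel

class UserDetail(BaseModel):
    name: str
    age: int

user = UserDetail(name="John", age=29)
serialized = user.model_dump_json()              # compact JSON: '{"name":"John","age":29}'
restored = UserDetail.model_validate_json(serialized)
print(restored == user)                          # the round trip is lossless
```

This is exactly the pair of calls the decorator below uses when writing to and reading from the cache.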

4. Cache Setup

cache = diskcache.Cache("./my_cache_directory")

Creates a disk cache instance used to store function call results.

5. Cache Decorator

def instructor_cache(func):

This is the core cache decorator. Its main responsibilities:

  • verify that the return type is a Pydantic model
  • pick the synchronous or asynchronous wrapper depending on whether the function is a coroutine
  • implement the caching logic

The caching logic:

  1. Build a cache key
  2. Check whether the result is already cached
  3. If so, return the cached result
  4. Otherwise, call the function and cache its result
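Stripped of the Pydantic and diskcache details, this control flow is the classic "check, compute, store" pattern. A stdlib-only sketch (the in-memory dict and the `expensive` function are illustrative stand-ins, not part of the article's code):

```python
import functools

_cache: dict[str, str] = {}  # stand-in for diskcache.Cache

def simple_cache(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # 1. build a key from the function name and its arguments
        key = f"{func.__name__}-{args!r}-{kwargs!r}"
        # 2./3. return the cached value on a hit
        if key in _cache:
            return _cache[key]
        # 4. on a miss: compute, store, and return
        result = func(*args, **kwargs)
        _cache[key] = result
        return result
    return wrapper

calls = 0

@simple_cache
def expensive(x):
    global calls
    calls += 1
    return x.upper()

expensive("hi"); expensive("hi")
print(calls)  # -> 1: the body ran only once
```

The real decorator adds two things on top of this skeleton: JSON (de)serialization so Pydantic models can live in a disk cache, and an `async` variant of the wrapper.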

6. Example Functions

@instructor_cache
def extract(data) -> UserDetail:
    return client.chat.completions.create(...)

@instructor_cache
async def aextract(data) -> UserDetail:
    return await aclient.chat.completions.create(...)

Two example functions:

  • extract: synchronous
  • aextract: asynchronous

Both use the cache decorator and extract user information from text.

Workflow

  1. When either function is called, the decorator first checks the cache
  2. On a cache hit, the cached result is returned directly
  3. On a miss, the OpenAI API is called to produce the result
  4. The result is written to the disk cache and returned

This caching mechanism:

  • reduces the number of API calls
  • lowers cost
  • speeds up responses
  • avoids repeated computation

Examples

# Synchronous test
import time

start = time.perf_counter()
model = extract("John introduced himself as a 29-year-old developer")
print(f"Time taken: {time.perf_counter() - start}")
print(f"model:{model}")
print("-" * 80)

start = time.perf_counter()
model = extract("John introduced himself as a 29-year-old developer")
print(f"Time taken: {time.perf_counter() - start}")
print(f"model_cache:{model}")
print("-" * 80)

Time taken: 1.371705400000792
model:name='John' age=29
--------------------------------------------------------------------------------
Time taken: 0.00026250000519212335
model_cache:name='John' age=29
--------------------------------------------------------------------------------
async def atest_extract():

    start = time.perf_counter()
    model = await aextract("John introduced himself as a 29-year-old developer")

    print(f"Time taken: {time.perf_counter() - start}")
    print(f"model:{model}")
    print("-" * 80)
    start = time.perf_counter()
    model = await aextract("John introduced himself as a 29-year-old developer")

    print(f"Time taken: {time.perf_counter() - start}")
    print(f"model_cache:{model}")
    print("-" * 80)


await atest_extract()
Time taken: 1.3256608999945456
model:name='John' age=29
--------------------------------------------------------------------------------
Time taken: 4.610000178217888e-05
model_cache:name='John' age=29
--------------------------------------------------------------------------------

Similar Examples

from pydantic import BaseModel, Field
import instructor
from openai import OpenAI

class ArticleSummary(BaseModel):
    title: str
    summary: str
    key_points: list[str]
    word_count: int

@instructor_cache
def extract_summary(article: str) -> ArticleSummary:
    return client.chat.completions.create(
        model="qwen-turbo",
        response_model=ArticleSummary,
        messages=[
            {"role": "user", "content": f"Summarize this article: {article}"}
        ]
    )

article = """
人工智能正在改变我们的生活方式。从智能手机助手到自动驾驶汽车,
AI技术已经渗透到各个领域。然而,这也带来了一些挑战,
比如隐私保护和就业问题需要我们认真思考。
"""
summary = extract_summary(article)
print(summary)
title='人工智能的发展与挑战' summary='人工智能正在深刻地改变我们的生活方式,其技术已广泛应用于各个领域。然而,这种快速发展也伴随着一些挑战,特别是隐私保护和就业问题需要我们深入思考和解决。' key_points=['人工智能改变生活方式', 'AI技术广泛应用', '面临的挑战如隐私保护和就业问题'] word_count=30
class SentimentAnalysis(BaseModel):
    text: str
    sentiment: str = Field(description="positive, negative, or neutral")
    confidence: float = Field(ge=0, le=1)
    keywords: list[str]

@instructor_cache
async def analyze_sentiment(text: str) -> SentimentAnalysis:
    return await aclient.chat.completions.create(
        model="qwen-turbo",
        response_model=SentimentAnalysis,
        messages=[
            {"role": "user", "content": f"Analyze the sentiment: {text}"}
        ]
    )

# Sentiment analysis
async def analyze():
    result = await analyze_sentiment("这个产品非常好用,超出我的预期!")
    print(result)

await analyze()
text='这个产品非常好用,超出我的预期!' sentiment='positive' confidence=0.99 keywords=['产品', '好用', '超出预期']
class ProductInfo(BaseModel):
    name: str
    price: float
    category: str
    features: list[str]
    rating: float = Field(ge=0, le=5)

@instructor_cache
def extract_product(description: str) -> ProductInfo:
    return client.chat.completions.create(
        model="qwen-turbo",
        response_model=ProductInfo,
        messages=[
            {"role": "user", "content": f"Extract product information: {description}"}
        ]
    )

product_desc = "最新款iPhone 15 Pro,支持5G,售价999美元,深空黑色,256GB存储"
product = extract_product(product_desc)
print(product)
name='iPhone 15 Pro' price=999.0 category='Electronics' features=['Latest model', '5G support', 'Deep Space Black color', '256GB storage'] rating=4.5
class Address(BaseModel):
    country: str
    province: str
    city: str
    street: str
    postal_code: str = Field(default="")

@instructor_cache
async def parse_address(address_text: str) -> Address:
    return await aclient.chat.completions.create(
        model="qwen-turbo",
        response_model=Address,
        messages=[
            {"role": "user", "content": f"Parse this address: {address_text}"}
        ]
    )

await parse_address("北京市海淀区中关村大街1号")


Address(country='中国', province='北京市', city='北京市', street='中关村大街1号', postal_code='')

Reference: https://github.com/instructor-ai/instructor/tree/main
