当前位置: 首页 > news >正文

langchain 实现 任务分解器

需要使用任务分解器的情况:

1.复杂长期目标:当任务过于复杂,无法一步完成时

例如:“制定一个完整的产品发布计划”
需要分解为市场调研、产品设计、测试、营销策略等子任务

2.多步骤依赖关系:任务间存在明确的先后顺序

例如:软件开发流程(需求分析 → 设计 → 编码 → 测试 → 部署)

3.并行处理需求:某些子任务可以同时进行

例如:内容创作项目中的文本撰写和图片设计可以并行

4.资源分配优化:需要合理分配计算资源或时间

大型数据处理任务可以分解为多个批次处理


from langchain.prompts import PromptTemplatefrom langchain.schema import BaseOutputParser
from pydantic import BaseModel, Field
from typing import List, Dict
import json
import re
from llm.llm import get_chatmodelclass SubTask(BaseModel):"""子任务数据模型"""task_name: str = Field(description="任务名称")task_description: str = Field(description="任务详细描述")task_steps: List[str] = Field(description="执行步骤列表")dependencies: List[str] = Field(default=[], description="依赖的其他任务")priority: int = Field(default=1, description="优先级(1-5)")class TaskDecomposition(BaseModel):"""任务分解结果"""original_request: str = Field(description="原始用户请求")total_tasks: int = Field(description="总任务数")sub_tasks: List[SubTask] = Field(description="子任务列表")class TaskDecompositionParser(BaseOutputParser):"""自定义解析器"""def parse(self, text: str) -> TaskDecomposition:try:# 清理文本,提取JSON部分json_match = re.search(r'\{.*\}', text, re.DOTALL)if json_match:json_str = json_match.group()data = json.loads(json_str)return TaskDecomposition(**data)except:pass# 如果JSON解析失败,使用规则解析return self._rule_based_parse(text)def _rule_based_parse(self, text: str) -> TaskDecomposition:"""基于规则的解析方法"""lines = text.strip().split('\n')sub_tasks = []current_task = Nonefor line in lines:line = line.strip()if line.startswith('任务') or line.startswith('Task'):if current_task:sub_tasks.append(current_task)current_task = {'task_name': line,'task_description': '','task_steps': [],'dependencies': [],'priority': 1}elif line.startswith('步骤') or line.startswith('Step'):if current_task:current_task['task_steps'].append(line)if current_task:sub_tasks.append(current_task)return TaskDecomposition(original_request="解析的用户请求",total_tasks=len(sub_tasks),sub_tasks=[SubTask(**task) for task in sub_tasks])def create_task_decomposition_chain():"""创建任务分解链"""template = """你是一个智能任务规划助手。请将用户的复杂请求分解为具体的子任务。用户请求: {user_request}请按照以下JSON格式返回结果:{{"original_request": "用户的原始请求","total_tasks": 子任务总数,"sub_tasks": [{{"task_name": "任务名称","task_description": "详细描述这个任务要做什么","task_steps": ["步骤1", "步骤2", "步骤3"],"dependencies": ["依赖的其他任务名称"],"priority": 优先级数字(1-5)}}]}}分解原则:1. 每个子任务应该是独立可执行的2. 任务之间的依赖关系要清晰3. 步骤要具体可操作4. 优先级要合理安排"""prompt = PromptTemplate(template=template,input_variables=["user_request"])get_chatmodel()parser = TaskDecompositionParser()llm = get_chatmodel()chain = prompt | llm | parserreturn chainif __name__ == "__main__":chain = create_task_decomposition_chain()user_request = "查询我申报了哪些补贴,并查询补贴的状态"result = chain.invoke(user_request)print(result)

相关文章:

  • 【基础知识】QSPI的命令
  • Oracle中如何解决LATCH:CACHE BUFFERS LRU CHAIN
  • Windows 上配置 Docker,Docker 的基本原理和用途,以及如何在 Docker 中运行程序
  • JVM 性能问题排查实战10连击
  • 静态代理有哪些优势
  • 第二届帕鲁杯时间循环的信使
  • Vortex GPGPU的github流程跑通与功能模块波形探索(三)
  • CAN总线
  • 开源情报搜集系统:科研创新的强大引擎
  • 电网中窃电分析:概念、算法与应用
  • 深度解析 HDFS与Hive的关系
  • HarmonyOS NEXT~鸿蒙系统与mPaaS三方框架集成指南
  • 电商虚拟户:重构资金管理逻辑,解锁高效归集与智能分账新范式
  • 基于springboot3 VUE3 火车订票系统前后端分离项目适合新手学习的项目包含 智能客服 换乘算法
  • Qt调用librdkafka
  • Android 中拖拽从一个组件到另外一个组件的写法(跨容器拖拽)
  • 封装POD与PinMap文件总结学习-20250516
  • 【AS32X601驱动系列教程】MCU启动详解
  • java接口自动化初识
  • 在 Azure OpenAI 上使用 Elastic 优化支出和内容审核
  • 牛商网做网站/厦门关键词优化平台
  • 移动端首页/天津seo网站排名优化公司
  • 门户网站建设技术要求/推广
  • 安徽建设工程信息网固镇县/网站seo综合诊断
  • 织梦做商城网站/代写文章接单平台
  • 做国外单的网站叫什么名字/seo渠道是什么意思