
Open WebUI Source Code Analysis (8): Pipes

We can picture Open WebUI as a plumbing system in which data flows through pipes and valves. A pipe is an Open WebUI plugin: it opens a new path for the data and lets you apply custom logic and processing to it. Valves are the configurable parts of a pipe and control how the pipe behaves as data flows through it. A pipe can be understood as a user-defined model: it is appended to the model list visible to users, and in a chat the user can select a pipe exactly as they would select a regular LLM.
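Before the full example, here is a minimal sketch of what a pipe function looks like. The id, name, valve field and return text are illustrative; as the source analysis below shows, the framework accepts a plain string, a dict or a generator as the pipe's return value:

from pydantic import BaseModel, Field


class Pipe:
    # Valves: admin-configurable settings for this pipe
    class Valves(BaseModel):
        greeting: str = Field(default="Hello")  # illustrative valve

    def __init__(self):
        self.id = "my_pipe"      # shown as the model id
        self.name = "My Pipe"    # shown as the model name
        self.valves = self.Valves()

    # body is the chat-completion request; the return value becomes the answer
    def pipe(self, body: dict) -> str:
        question = body.get("messages", [{}])[-1].get("content", "")
        return f"{self.valves.greeting}! You asked: {question}"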

A pipe is registered in the same way as the Action and Filter functions described earlier. The example below is a pipe built on LangChain: before the request reaches the LLM, it prepends the system message {'role': 'system', 'content': 'You are a helpful bot'} to the conversation via LangChain's ChatPromptTemplate. The code is as follows:

"""
title: LangChain Pipe Function
author: Colby Sawyer @ Attollo LLC (mailto:colby.sawyer@attollodefense.com)
author_url: https://github.com/ColbySawyer7
version: 0.1.0

This module defines a Pipe class that utilizes LangChain
"""

from typing import Optional, Callable, Awaitable
from pydantic import BaseModel, Field
import os
import time

# import LangChain dependencies
from langchain_core.prompts import ChatPromptTemplate
from langchain.schema import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_community.llms import Ollama
# Uncomment to use OpenAI and FAISS
#from langchain_openai import ChatOpenAI
#from langchain_community.vectorstores import FAISS

class Pipe:
    class Valves(BaseModel):
        base_url: str = Field(default="http://localhost:11434")
        ollama_embed_model: str = Field(default="nomic-embed-text")
        ollama_model: str = Field(default="llama3.1")
        openai_api_key: str = Field(default="...")
        openai_model: str = Field(default="gpt3.5-turbo")
        emit_interval: float = Field(
            default=2.0, description="Interval in seconds between status emissions"
        )
        enable_status_indicator: bool = Field(
            default=True, description="Enable or disable status indicator emissions"
        )

    def __init__(self):
        self.type = "pipe"
        self.id = "langchain_pipe"
        self.name = "LangChain Pipe"
        self.valves = self.Valves()
        self.last_emit_time = 0

    async def emit_status(
        self,
        __event_emitter__: Callable[[dict], Awaitable[None]],
        level: str,
        message: str,
        done: bool,
    ):
        current_time = time.time()
        if (
            __event_emitter__
            and self.valves.enable_status_indicator
            and (
                current_time - self.last_emit_time >= self.valves.emit_interval or done
            )
        ):
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "status": "complete" if done else "in_progress",
                        "level": level,
                        "description": message,
                        "done": done,
                    },
                }
            )
            self.last_emit_time = current_time

    async def pipe(
        self,
        body: dict,
        __user__: Optional[dict] = None,
        __event_emitter__: Callable[[dict], Awaitable[None]] = None,
        __event_call__: Callable[[dict], Awaitable[dict]] = None,
    ) -> Optional[dict]:
        await self.emit_status(
            __event_emitter__, "info", "/initiating Chain", False
        )

        # ==================================================================
        # MODEL SETUP
        # ==================================================================
        # Setup the model for generating responses
        # ------------------------------------------------------------------
        # Ollama Usage
        _model = Ollama(
            model=self.valves.ollama_model,
            base_url=self.valves.base_url,
        )
        # ------------------------------------------------------------------
        # OpenAI Usage
        # _model = ChatOpenAI(
        #     openai_api_key=self.valves.openai_api_key,
        #     model=self.valves.openai_model
        # )
        # ------------------------------------------------------------------

        # Example usage of FAISS for retrieval
        # vectorstore = FAISS.from_texts(
        #     texts, embedding=OpenAIEmbeddings(openai_api_key=self.valves.openai_api_key)
        # )

        # ==================================================================
        # PROMPTS SETUP
        # ==================================================================
        _prompt = ChatPromptTemplate.from_messages([
            ("system", "You are a helpful bot"),
            ("human", "{question}"),
        ])
        # ==================================================================
        # CHAIN SETUP
        # ==================================================================
        # Basic Chain
        chain = (
            {"question": RunnablePassthrough()}
            | _prompt
            | _model
            | StrOutputParser()
        )
        # ==================================================================
        # Langchain Calling
        # ==================================================================
        await self.emit_status(
            __event_emitter__, "info", "Starting Chain", False
        )
        messages = body.get("messages", [])
        # Verify a message is available
        if messages:
            question = messages[-1]["content"]
            try:
                # Invoke Chain
                response = chain.invoke(question)
                # Append the pipe's answer to the messages in the request body
                body["messages"].append({"role": "assistant", "content": response})
            except Exception as e:
                await self.emit_status(
                    __event_emitter__,
                    "error",
                    f"Error during sequence execution: {str(e)}",
                    True,
                )
                return {"error": str(e)}
        # If no message is available alert user
        else:
            response = "No messages found in the request body"
            await self.emit_status(__event_emitter__, "error", response, True)
            body["messages"].append({"role": "assistant", "content": response})

        await self.emit_status(__event_emitter__, "info", "Complete", True)
        return response
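Besides Valves, a pipe may also define a UserValves model for per-user settings; as the analysis of get_function_params below shows, Open WebUI looks up the stored per-user values and injects them into __user__["valves"] before calling pipe. A minimal sketch (the style field and the returned text are illustrative):

from typing import Optional
from pydantic import BaseModel, Field


class Pipe:
    class Valves(BaseModel):
        base_url: str = Field(default="http://localhost:11434")

    # Per-user configuration, edited by each user in the UI
    class UserValves(BaseModel):
        style: str = Field(default="concise")  # illustrative per-user setting

    def __init__(self):
        self.valves = self.Valves()

    def pipe(self, body: dict, __user__: Optional[dict] = None) -> str:
        user_valves = (__user__ or {}).get("valves")
        style = user_valves.style if user_valves else "concise"
        question = body.get("messages", [{}])[-1].get("content", "")
        return f"({style}) You asked: {question}"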

Once the pipe has been added successfully, it appears in the model list on the chat page, and the user can select and use it there just like any other model.

1. Request Payload

In the request data, the model field is the id of the newly added pipe, langchain_pipe (displayed in the UI as LangChain Pipe). The payload looks like this:

{
    "stream": true,
    "model": "langchain_pipe",
    "messages": [
        {
            "role": "user",
            "content": "床前明月光的下一句"
        }
    ],
    "params": {},
    "tool_servers": [],
    "features": {
        "image_generation": false,
        "code_interpreter": false,
        "web_search": false,
        "memory": false
    },
    "variables": {
        "{{USER_NAME}}": "acaluis",
        "{{USER_LOCATION}}": "Unknown",
        "{{CURRENT_DATETIME}}": "2025-08-22 18:27:04",
        "{{CURRENT_DATE}}": "2025-08-22",
        "{{CURRENT_TIME}}": "18:27:04",
        "{{CURRENT_WEEKDAY}}": "Friday",
        "{{CURRENT_TIMEZONE}}": "Etc/GMT-8",
        "{{USER_LANGUAGE}}": "zh-CN"
    },
    "model_item": {
        "id": "langchain_pipe",
        "name": "LangChain Pipe",
        "object": "model",
        "created": 1755857251,
        "owned_by": "openai",
        "pipe": {
            "type": "pipe"
        },
        "actions": [],
        "filters": [],
        "tags": []
    },
    "session_id": "Hh2Tyy7FYSQz-I1SAAAg",
    "chat_id": "b80cadeb-0389-4464-9fd9-270193f4f3a0",
    "id": "618d361e-d70c-4fec-b945-a4be549bd699",
    "background_tasks": {
        "title_generation": true,
        "tags_generation": true,
        "follow_up_generation": true
    }
}

2. Source Code Analysis

In the main chat flow, the entry point for pipe handling is the generate_chat_completion method. The relevant code:

async def generate_chat_completion(
    request: Request,
    form_data: dict,
    user: Any,
    bypass_filter: bool = False,
):

    ……

    if model.get("pipe"):  # Note: the model item in the request carries a pipe attribute, so this branch is taken.
        return await generate_function_chat_completion(  # analyzed in detail below
            request, form_data, user=user, models=models
        )

    ……

The code of generate_function_chat_completion is shown below.

The method's processing flow is as follows:

1) Determine the model to use, which here is langchain_pipe.

2) Put some of the arguments for the pipe method into extra_params.

3) Obtain the pipe function's pipe method from its module and assemble its parameters.

4) Execute the pipe method asynchronously.

5) Wrap the pipe method's response in the OpenAI-compatible format and return it as a stream (see the sketch after this list).
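For orientation, the stream produced in step 5 is a standard SSE stream of OpenAI-style chunks; roughly what the client receives is sketched below (the id, timestamp and answer text are illustrative; the exact fields are produced by openai_chat_chunk_message_template, shown further down):

data: {"id": "langchain_pipe-<uuid>", "created": 1755857251, "model": "langchain_pipe", "choices": [{"index": 0, "logprobs": null, "finish_reason": null, "delta": {"content": "<pipe answer>"}}]}

data: {"id": "langchain_pipe-<uuid>", "created": 1755857251, "model": "langchain_pipe", "choices": [{"index": 0, "logprobs": null, "finish_reason": "stop", "delta": {"content": ""}}]}

data: [DONE]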

async def generate_function_chat_completion(
    request, form_data, user, models: dict = {}
):
    async def execute_pipe(pipe, params):  # invokes the pipe's pipe method, awaiting it if it is a coroutine
        if inspect.iscoroutinefunction(pipe):
            return await pipe(**params)
        else:
            return pipe(**params)

    async def get_message_content(res: str | Generator | AsyncGenerator) -> str:
        if isinstance(res, str):
            return res
        if isinstance(res, Generator):
            return "".join(map(str, res))
        if isinstance(res, AsyncGenerator):
            return "".join([str(stream) async for stream in res])

    def process_line(form_data: dict, line):
        if isinstance(line, BaseModel):
            line = line.model_dump_json()
            line = f"data: {line}"
        if isinstance(line, dict):
            line = f"data: {json.dumps(line)}"

        try:
            line = line.decode("utf-8")
        except Exception:
            pass

        if line.startswith("data:"):
            return f"{line}\n\n"
        else:
            line = openai_chat_chunk_message_template(form_data["model"], line)
            return f"data: {json.dumps(line)}\n\n"

    def get_pipe_id(form_data: dict) -> str:
        pipe_id = form_data["model"]
        if "." in pipe_id:
            pipe_id, _ = pipe_id.split(".", 1)
        return pipe_id

    # Builds the arguments for the pipe function's pipe method from form_data,
    # user and extra_params.
    def get_function_params(function_module, form_data, user, extra_params=None):
        if extra_params is None:
            extra_params = {}

        pipe_id = get_pipe_id(form_data)

        # Get the signature of the function
        sig = inspect.signature(function_module.pipe)  # inspect the pipe's signature

        '''
        body is set to the request form; __event_emitter__, __event_call__ and
        __user__ come from extra_params (only keys that actually appear in the
        signature are passed through).
        '''
        params = {"body": form_data} | {
            k: v for k, v in extra_params.items() if k in sig.parameters
        }

        '''
        If the parameters include __user__ and the pipe module defines UserValves,
        look up the user's valve values by pipe_id and user id and store them in
        params["__user__"]["valves"].
        '''
        if "__user__" in params and hasattr(function_module, "UserValves"):
            user_valves = Functions.get_user_valves_by_id_and_user_id(pipe_id, user.id)
            try:
                params["__user__"]["valves"] = function_module.UserValves(**user_valves)
            except Exception as e:
                log.exception(e)
                params["__user__"]["valves"] = function_module.UserValves()

        return params

    # The next two lines resolve the model from the request's model field; here
    # it resolves to the pipe we registered above.
    model_id = form_data.get("model")
    model_info = Models.get_model_by_id(model_id)

    '''
    model_info currently contains:
    {
        "id": "langchain_pipe",
        "name": "LangChain Pipe",
        "object": "model",
        "created": 1755827527,
        "owned_by": "openai",
        "pipe": {"type": "pipe"},
    }
    '''

    metadata = form_data.pop("metadata", {})

    files = metadata.get("files", [])
    tool_ids = metadata.get("tool_ids", [])
    # tool_ids handling; not our focus here
    if tool_ids is None:
        tool_ids = []

    __event_emitter__ = None
    __event_call__ = None
    __task__ = None
    __task_body__ = None

    if metadata:  # if metadata is present, check whether it contains session_id, chat_id and message_id
        # if it does, create __event_emitter__ and __event_call__
        if all(k in metadata for k in ("session_id", "chat_id", "message_id")):
            __event_emitter__ = get_event_emitter(metadata)
            __event_call__ = get_event_call(metadata)
            __task__ = metadata.get("task", None)
            __task_body__ = metadata.get("task_body", None)

    extra_params = {  # fill extra_params; these supply the arguments passed to the pipe method
        "__event_emitter__": __event_emitter__,
        "__event_call__": __event_call__,
        "__chat_id__": metadata.get("chat_id", None),
        "__session_id__": metadata.get("session_id", None),
        "__message_id__": metadata.get("message_id", None),
        "__task__": __task__,
        "__task_body__": __task_body__,
        "__files__": files,
        "__user__": user.model_dump() if isinstance(user, UserModel) else {},
        "__metadata__": metadata,
        "__request__": request,
    }
    extra_params["__tools__"] = get_tools(  # populate extra_params["__tools__"] from tool_ids
        request,
        tool_ids,
        user,
        {
            **extra_params,
            "__model__": models.get(form_data["model"], None),
            "__messages__": form_data["messages"],
            "__files__": files,
        },
    )

    if model_info:
        if model_info.base_model_id:  # not relevant here
            form_data["model"] = model_info.base_model_id

        params = model_info.params.model_dump()

        if params:  # not relevant here
            system = params.pop("system", None)
            form_data = apply_model_params_to_body_openai(params, form_data)
            form_data = apply_model_system_prompt_to_body(
                system, form_data, metadata, user
            )

    pipe_id = get_pipe_id(form_data)  # get the pipe's function id from the form
    function_module = get_function_module_by_id(request, pipe_id)  # load the module by id

    pipe = function_module.pipe  # the pipe method of this pipe function

    # Assemble all arguments for the pipe method from the form data, the user
    # and extra_params.
    params = get_function_params(function_module, form_data, user, extra_params)

    if form_data.get("stream", False):  # the usual case: a streaming response

        async def stream_content():  # run the pipe method and handle the streamed reply
            try:
                '''
                Important: the data returned by Ollama inside the pipe is ndjson.
                ndjson is a data-interchange format in which every line is an
                independent JSON object; all the lines have to be merged into a
                single result. LangChain already performs this merge, so what the
                pipe returns here is the complete answer text.
                '''
                res = await execute_pipe(pipe, params)

                # Directly return if the response is a StreamingResponse
                if isinstance(res, StreamingResponse):
                    async for data in res.body_iterator:
                        yield data
                    return
                if isinstance(res, dict):
                    yield f"data: {json.dumps(res)}\n\n"
                    return

            except Exception as e:
                log.error(f"Error: {e}")
                yield f"data: {json.dumps({'error': {'detail':str(e)}})}\n\n"
                return

            if isinstance(res, str):  # the normal case takes this branch
                '''
                Wrap the model's answer in the standard OpenAI chunk format.
                The emitted chunk looks like:

                data: {
                    "id": "langchain_pipe-<uuid>",
                    "created": <current time>,
                    "model": "langchain_pipe",
                    "choices": [
                        {
                            "index": 0,
                            "logprobs": None,
                            "finish_reason": None,
                            "delta": {"content": "<pipe response>"}
                        }
                    ]
                }
                '''
                message = openai_chat_chunk_message_template(form_data["model"], res)
                yield f"data: {json.dumps(message)}\n\n"

            if isinstance(res, Iterator):
                for line in res:
                    yield process_line(form_data, line)

            if isinstance(res, AsyncGenerator):
                async for line in res:
                    yield process_line(form_data, line)

            if isinstance(res, str) or isinstance(res, Generator):
                finish_message = openai_chat_chunk_message_template(
                    form_data["model"], ""
                )
                finish_message["choices"][0]["finish_reason"] = "stop"
                yield f"data: {json.dumps(finish_message)}\n\n"
                yield "data: [DONE]"

        # stream_content() is returned as a streaming response, which is later
        # consumed by process_chat_response.
        return StreamingResponse(stream_content(), media_type="text/event-stream")
    else:  # non-streaming request; not our focus here
        try:
            res = await execute_pipe(pipe, params)  # run the pipe method

        except Exception as e:
            log.error(f"Error: {e}")
            return {"error": {"detail": str(e)}}

        if isinstance(res, StreamingResponse) or isinstance(res, dict):
            return res
        if isinstance(res, BaseModel):
            return res.model_dump()

        message = await get_message_content(res)
        return openai_chat_completion_message_template(form_data["model"], message)
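To make the ndjson remark in stream_content concrete, here is a small illustration (this is not Open WebUI code; the field name "response" follows Ollama's streaming format and the fragments are made up): each line of the stream is an independent JSON object, and concatenating the text fragments yields the single answer string that LangChain hands back to the pipe.

import json

# Illustrative ndjson stream: one JSON object per line
ndjson = (
    '{"response": "data ", "done": false}\n'
    '{"response": "flows through ", "done": false}\n'
    '{"response": "pipes", "done": true}\n'
)

# Merge the per-line fragments into one answer string
answer = "".join(
    json.loads(line)["response"] for line in ndjson.splitlines() if line.strip()
)
print(answer)  # data flows through pipes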

The logic of process_chat_response was analyzed in part 3 of this series and is not repeated here.
