Open WebUI Source Code Analysis 8: Pipes
You can picture Open WebUI as a plumbing system in which data flows through pipes and valves. A pipe is an Open WebUI plugin: it builds a new path for the data and lets you apply custom logic to process it. Valves are a pipe's configurable parts and control how the pipe behaves as data flows through it. A pipe can be thought of as a user-defined model: it is appended to the model list visible to users, and when chatting a user can select a pipe exactly as they would select a regular LLM.
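Before looking at a full example, here is a minimal sketch (not taken from the Open WebUI source; the prefix valve is made up) of the shape every pipe shares: a Pipe class with an optional Valves model and a pipe method that receives the request body and returns the answer:

# Minimal pipe sketch (illustrative only).
from pydantic import BaseModel, Field


class Pipe:
    class Valves(BaseModel):
        # Valves are the pipe's configurable knobs, editable from the admin UI.
        prefix: str = Field(default="[demo] ")  # hypothetical valve

    def __init__(self):
        self.valves = self.Valves()

    def pipe(self, body: dict) -> str:
        # body is the chat-completion request; the return value becomes the reply.
        question = body.get("messages", [{}])[-1].get("content", "")
        return self.valves.prefix + question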
A pipe is registered the same way as the Action and Filter functions described earlier. The example below is a pipe built on LangChain; before sending the request to the LLM it injects the system message {'role': 'system', 'content': 'You are a helpful bot'} into the prompt. The code is as follows:
"""
title: LangChain Pipe Function
author: Colby Sawyer @ Attollo LLC (mailto:colby.sawyer@attollodefense.com)
author_url: https://github.com/ColbySawyer7
version: 0.1.0This module defines a Pipe class that utilizes LangChain
"""from typing import Optional, Callable, Awaitable
from pydantic import BaseModel, Field
import os
import time# import LangChain dependencies
from langchain_core.prompts import ChatPromptTemplate
from langchain.schema import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_community.llms import Ollama
# Uncomment to use OpenAI and FAISS
#from langchain_openai import ChatOpenAI
#from langchain_community.vectorstores import FAISSclass Pipe:
class Valves(BaseModel):
base_url: str = Field(default="http://localhost:11434")
ollama_embed_model: str = Field(default="nomic-embed-text")
ollama_model: str = Field(default="llama3.1")
openai_api_key: str = Field(default="...")
openai_model: str = Field(default="gpt3.5-turbo")
emit_interval: float = Field(
default=2.0, description="Interval in seconds between status emissions"
)
enable_status_indicator: bool = Field(
default=True, description="Enable or disable status indicator emissions"
)def __init__(self):
self.type = "pipe"
self.id = "langchain_pipe"
self.name = "LangChain Pipe"
self.valves = self.Valves()
self.last_emit_time = 0
passasync def emit_status(
self,
__event_emitter__: Callable[[dict], Awaitable[None]],
level: str,
message: str,
done: bool,
):
current_time = time.time()
if (
__event_emitter__
and self.valves.enable_status_indicator
and (
current_time - self.last_emit_time >= self.valves.emit_interval or done
)
):
await __event_emitter__(
{
"type": "status",
"data": {
"status": "complete" if done else "in_progress",
"level": level,
"description": message,
"done": done,
},
}
)
self.last_emit_time = current_timeasync def pipe(self, body: dict,
__user__: Optional[dict] = None,
__event_emitter__: Callable[[dict], Awaitable[None]] = None,
__event_call__: Callable[[dict], Awaitable[dict]] = None,
) -> Optional[dict]:
await self.emit_status(
__event_emitter__, "info", "/initiating Chain", False
)# ======================================================================================================================================
# MODEL SETUP
# ======================================================================================================================================
# Setup the model for generating responses
# ==========================================================================
# Ollama Usage
_model = Ollama(
model=self.valves.ollama_model,
base_url=self.valves.base_url
)
# ==========================================================================
# OpenAI Usage
# _model = ChatOpenAI(
# openai_api_key=self.valves.openai_api_key,
# model=self.valves.openai_model
# )
# ==========================================================================# Example usage of FAISS for retreival
# vectorstore = FAISS.from_texts(
# texts, embedding=OpenAIEmbeddings(openai_api_key=self.valves.openai_api_key)
# )# ======================================================================================================================================
# PROMPTS SETUP
# ==========================================================================
_prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful bot"),
("human", "{question}")
])
# ======================================================================================================================================
# CHAIN SETUP
# ==========================================================================
# Basic Chain
chain = (
{"question": RunnablePassthrough()}
| _prompt
| _model
| StrOutputParser()
)
# ======================================================================================================================================
# Langchain Calling
# ======================================================================================================================================
await self.emit_status(
__event_emitter__, "info", "Starting Chain", False
)
messages = body.get("messages", [])
# Verify a message is available
if messages:
question = messages[-1]["content"]
try:
# Invoke Chain
response = chain.invoke(question)
# 把调用管道的应答数据追加到表单的messages中
body["messages"].append({"role": "assistant", "content": response})
except Exception as e:
await self.emit_status(__event_emitter__, "error", f"Error during sequence execution: {str(e)}", True)
return {"error": str(e)}
# If no message is available alert user
else:
await self.emit_status(__event_emitter__, "error", "No messages found in the request body", True)
body["messages"].append({"role": "assistant", "content": "No messages found in the request body"})await self.emit_status(__event_emitter__, "info", "Complete", True)
return response
Once the pipe has been added, it appears in the model list on the chat page and can be selected there, as shown in the figure below:
1. Request Payload
The model field in the request data is the id of the newly added pipe, langchain_pipe (display name: LangChain Pipe):
{
"stream": true,
"model": "langchain_pipe",
"messages": [
{
"role": "user",
"content": "床前明月光的下一句"
}
],
"params": {},
"tool_servers": [],
"features": {
"image_generation": false,
"code_interpreter": false,
"web_search": false,
"memory": false
},
"variables": {
"{{USER_NAME}}": "acaluis",
"{{USER_LOCATION}}": "Unknown",
"{{CURRENT_DATETIME}}": "2025-08-22 18:27:04",
"{{CURRENT_DATE}}": "2025-08-22",
"{{CURRENT_TIME}}": "18:27:04",
"{{CURRENT_WEEKDAY}}": "Friday",
"{{CURRENT_TIMEZONE}}": "Etc/GMT-8",
"{{USER_LANGUAGE}}": "zh-CN"
},
"model_item": {
"id": "langchain_pipe",
"name": "LangChain Pipe",
"object": "model",
"created": 1755857251,
"owned_by": "openai",
"pipe": {
"type": "pipe"
},
"actions": [],
"filters": [],
"tags": []
},
"session_id": "Hh2Tyy7FYSQz-I1SAAAg",
"chat_id": "b80cadeb-0389-4464-9fd9-270193f4f3a0",
"id": "618d361e-d70c-4fec-b945-a4be549bd699",
"background_tasks": {
"title_generation": true,
"tags_generation": true,
"follow_up_generation": true
}
}
2. Source Code Analysis
In the main conversation flow, the entry point for pipe handling is the generate_chat_completion method. The relevant code is:
async def generate_chat_completion(
    request: Request,
    form_data: dict,
    user: Any,
    bypass_filter: bool = False,
):
    ……
    if model.get("pipe"):  # Key branch: the selected model has a pipe attribute, so this branch is taken.
        return await generate_function_chat_completion(  # the method analyzed in detail below
            request, form_data, user=user, models=models
        )
    ……
The code of generate_function_chat_completion is shown below. Its processing flow is:
1) Determine the model to use, which here is langchain_pipe.
2) Put some of the pipe method's arguments into extra_params.
3) Retrieve the pipe function's pipe method and its parameters from the loaded module.
4) Execute the pipe method asynchronously.
5) Wrap the pipe method's response in an OpenAI-compatible format and return it as a stream.
async def generate_function_chat_completion(
    request, form_data, user, models: dict = {}
):
    async def execute_pipe(pipe, params):
        # Invoke the pipe function's pipe method, awaiting it when it is a coroutine
        if inspect.iscoroutinefunction(pipe):
            return await pipe(**params)
        else:
            return pipe(**params)

    async def get_message_content(res: str | Generator | AsyncGenerator) -> str:
        if isinstance(res, str):
            return res
        if isinstance(res, Generator):
            return "".join(map(str, res))
        if isinstance(res, AsyncGenerator):
            return "".join([str(stream) async for stream in res])

    def process_line(form_data: dict, line):
        if isinstance(line, BaseModel):
            line = line.model_dump_json()
            line = f"data: {line}"
        if isinstance(line, dict):
            line = f"data: {json.dumps(line)}"

        try:
            line = line.decode("utf-8")
        except Exception:
            pass

        if line.startswith("data:"):
            return f"{line}\n\n"
        else:
            line = openai_chat_chunk_message_template(form_data["model"], line)
            return f"data: {json.dumps(line)}\n\n"

    def get_pipe_id(form_data: dict) -> str:
        pipe_id = form_data["model"]
        if "." in pipe_id:
            pipe_id, _ = pipe_id.split(".", 1)
        return pipe_id

    # Collect the pipe method's arguments from form_data, user and extra_params
    def get_function_params(function_module, form_data, user, extra_params=None):
        if extra_params is None:
            extra_params = {}

        pipe_id = get_pipe_id(form_data)

        # Get the signature of the function
        sig = inspect.signature(function_module.pipe)
        '''
        body is set to the form data; __event_emitter__, __event_call__ and __user__
        come from extra_params (only keys that appear in the signature are kept)
        '''
        params = {"body": form_data} | {
            k: v for k, v in extra_params.items() if k in sig.parameters
        }

        '''
        If the parameters include __user__ and the pipe function defines UserValves,
        look up the user's valve values by pipe_id and user_id and store them in
        params["__user__"]["valves"]
        '''
        if "__user__" in params and hasattr(function_module, "UserValves"):
            user_valves = Functions.get_user_valves_by_id_and_user_id(pipe_id, user.id)
            try:
                params["__user__"]["valves"] = function_module.UserValves(**user_valves)
            except Exception as e:
                log.exception(e)
                params["__user__"]["valves"] = function_module.UserValves()

        return params
    # The two lines below resolve the model from the request's model field;
    # here it resolves to the pipe defined above
    model_id = form_data.get("model")
    model_info = Models.get_model_by_id(model_id)
    '''
    At this point model_info looks like:
    {
        "id": "langchain_pipe",
        "name": "LangChain Pipe",
        "object": "model",
        "created": 1755827527,
        "owned_by": "openai",
        "pipe": {"type": "pipe"},
    }
    '''
    metadata = form_data.pop("metadata", {})

    files = metadata.get("files", [])
    tool_ids = metadata.get("tool_ids", [])
    # tool_ids handling, not our focus here
    if tool_ids is None:
        tool_ids = []

    __event_emitter__ = None
    __event_call__ = None
    __task__ = None
    __task_body__ = None

    # If metadata is not empty and contains session_id, chat_id and message_id,
    # create __event_emitter__ and __event_call__
    if metadata:
        if all(k in metadata for k in ("session_id", "chat_id", "message_id")):
            __event_emitter__ = get_event_emitter(metadata)
            __event_call__ = get_event_call(metadata)

        __task__ = metadata.get("task", None)
        __task_body__ = metadata.get("task_body", None)

    # Populate extra_params; it supplies the arguments used when calling the pipe method
    extra_params = {
        "__event_emitter__": __event_emitter__,
        "__event_call__": __event_call__,
        "__chat_id__": metadata.get("chat_id", None),
        "__session_id__": metadata.get("session_id", None),
        "__message_id__": metadata.get("message_id", None),
        "__task__": __task__,
        "__task_body__": __task_body__,
        "__files__": files,
        "__user__": user.model_dump() if isinstance(user, UserModel) else {},
        "__metadata__": metadata,
        "__request__": request,
    }

    # Set extra_params["__tools__"] based on tool_ids
    extra_params["__tools__"] = get_tools(
        request,
        tool_ids,
        user,
        {
            **extra_params,
            "__model__": models.get(form_data["model"], None),
            "__messages__": form_data["messages"],
            "__files__": files,
        },
    )

    if model_info:
        if model_info.base_model_id:  # not considered here
            form_data["model"] = model_info.base_model_id

        params = model_info.params.model_dump()
        if params:  # not considered here
            system = params.pop("system", None)
            form_data = apply_model_params_to_body_openai(params, form_data)
            form_data = apply_model_system_prompt_to_body(
                system, form_data, metadata, user
            )

    pipe_id = get_pipe_id(form_data)  # the function id of the pipe, taken from the form data
    function_module = get_function_module_by_id(request, pipe_id)  # load the module by id

    pipe = function_module.pipe  # the pipe method of this pipe function
    # Assemble all arguments for the pipe method from the form data, the user and extra_params
    params = get_function_params(function_module, form_data, user, extra_params)

    if form_data.get("stream", False):  # usually a streaming response
        async def stream_content():  # run the pipe method and handle the streaming response
            try:
                '''
                Important: the data returned by Ollama inside the pipe is ndjson.
                ndjson is an interchange format in which every line is an independent
                JSON object, so the lines have to be merged into one result. LangChain
                already performs this merging, so the value returned by the pipe is
                the complete answer text.
                '''
                res = await execute_pipe(pipe, params)

                # Directly return if the response is a StreamingResponse
                if isinstance(res, StreamingResponse):
                    async for data in res.body_iterator:
                        yield data
                    return
                if isinstance(res, dict):
                    yield f"data: {json.dumps(res)}\n\n"
                    return

            except Exception as e:
                log.error(f"Error: {e}")
                yield f"data: {json.dumps({'error': {'detail':str(e)}})}\n\n"
                return

            if isinstance(res, str):  # the normal case takes this branch
                '''
                Convert the model's answer into a standard OpenAI chunk.
                The emitted data has the following shape:
                {
                    "id": "langchain_pipe-{uuid}",
                    "created": <current time>,
                    "model": "langchain_pipe",
                    "choices": [
                        {
                            "index": 0,
                            "logprobs": None,
                            "finish_reason": None,
                            "delta": {
                                "content": "<content returned by the pipe>"
                            }
                        }
                    ]
                }
                '''
                message = openai_chat_chunk_message_template(form_data["model"], res)
                yield f"data: {json.dumps(message)}\n\n"

            if isinstance(res, Iterator):
                for line in res:
                    yield process_line(form_data, line)

            if isinstance(res, AsyncGenerator):
                async for line in res:
                    yield process_line(form_data, line)

            if isinstance(res, str) or isinstance(res, Generator):
                finish_message = openai_chat_chunk_message_template(
                    form_data["model"], ""
                )
                finish_message["choices"][0]["finish_reason"] = "stop"
                yield f"data: {json.dumps(finish_message)}\n\n"
                yield "data: [DONE]"

        # Return stream_content() as a streaming response for process_chat_response to handle
        return StreamingResponse(stream_content(), media_type="text/event-stream")
    else:  # non-streaming request, not considered here
        try:
            res = await execute_pipe(pipe, params)  # run the pipe method
        except Exception as e:
            log.error(f"Error: {e}")
            return {"error": {"detail": str(e)}}

        if isinstance(res, StreamingResponse) or isinstance(res, dict):
            return res
        if isinstance(res, BaseModel):
            return res.model_dump()

        message = await get_message_content(res)
        return openai_chat_completion_message_template(form_data["model"], message)
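As a side note, get_pipe_id only does something interesting when the model id contains a dot, which typically happens when a single pipe function exposes several sub-models; here is a small standalone illustration of the splitting logic (the second id below is hypothetical):

# Standalone illustration of get_pipe_id (hypothetical model ids).
def get_pipe_id(form_data: dict) -> str:
    pipe_id = form_data["model"]
    if "." in pipe_id:
        pipe_id, _ = pipe_id.split(".", 1)
    return pipe_id


print(get_pipe_id({"model": "langchain_pipe"}))         # -> langchain_pipe
print(get_pipe_id({"model": "my_manifold.llama3.1"}))   # -> my_manifold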
The handling logic of process_chat_response was analyzed in part 3 of this series and is not repeated here.