OpenAI 实现额外传参
文章目录
- 1. OpenAI 接口说明
- 1.1. 功能说明
- 1.2. 源码分析
- 1.2.1. chat.completions.create 接口定义
- 1.2.2. 处理 extra_headers、extra_query、extra_body
- 1.2.3. 处理 extra_json
- 2. 实例
- 2.1. 客户端
- 2.2. 服务器
1. OpenAI 接口说明
1.1. 功能说明
chat.completions.create 接口如果要想传递额外参数,需要用到以下三个字段。
# Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.# 如果需要向API传递无法通过kwargs获得的其他参数,请使用以下参数。# The extra values given here take precedence over values defined on the client or passed to this method.# 此处给出的额外值优先于客户端上定义的值或传递给此方法的值。extra_headers: Headers | None = None,extra_query: Query | None = None,extra_body: Body | None = None,
openai 源码会对 extra_headers、extra_query、extra_body 三个参数转换一次:
- extra_headers 会被解析到 request.headers 中输出;
- extra_query 会被解析到 request.query_params 中输出;
- extra_body 会被解析到接口参数输出,extra_body 参数类型为 Body 实际为 dict。
extra_body 会被重新命名为 extra_json={'extra_param1': {'hello': 'world'}, 'extra_param2': {'hello': 'world'}};(也不知道为啥要重新命名)
然后接收参数时,不能使用 extra_body 也不能使用 extra_json;会根据 body 体的 key extra_param1 和 extra_param2 接收参数,参考实例。(这个地方有点奇怪)
1.2. 源码分析
1.2.1. chat.completions.create 接口定义
class Completions(SyncAPIResource):
    @required_args(["messages", "model"], ["messages", "model", "stream"])
    def create(
        self,
        *,
        messages: Iterable[ChatCompletionMessageParam],
        model: Union[str, ChatModel],
        audio: Optional[ChatCompletionAudioParam] | NotGiven = NOT_GIVEN,
        frequency_penalty: Optional[float] | NotGiven = NOT_GIVEN,
        function_call: completion_create_params.FunctionCall | NotGiven = NOT_GIVEN,
        functions: Iterable[completion_create_params.Function] | NotGiven = NOT_GIVEN,
        logit_bias: Optional[Dict[str, int]] | NotGiven = NOT_GIVEN,
        logprobs: Optional[bool] | NotGiven = NOT_GIVEN,
        max_completion_tokens: Optional[int] | NotGiven = NOT_GIVEN,
        max_tokens: Optional[int] | NotGiven = NOT_GIVEN,
        metadata: Optional[Metadata] | NotGiven = NOT_GIVEN,
        modalities: Optional[List[Literal["text", "audio"]]] | NotGiven = NOT_GIVEN,
        n: Optional[int] | NotGiven = NOT_GIVEN,
        parallel_tool_calls: bool | NotGiven = NOT_GIVEN,
        prediction: Optional[ChatCompletionPredictionContentParam] | NotGiven = NOT_GIVEN,
        presence_penalty: Optional[float] | NotGiven = NOT_GIVEN,
        prompt_cache_key: str | NotGiven = NOT_GIVEN,
        reasoning_effort: Optional[ReasoningEffort] | NotGiven = NOT_GIVEN,
        response_format: completion_create_params.ResponseFormat | NotGiven = NOT_GIVEN,
        safety_identifier: str | NotGiven = NOT_GIVEN,
        seed: Optional[int] | NotGiven = NOT_GIVEN,
        service_tier: Optional[Literal["auto", "default", "flex", "scale", "priority"]] | NotGiven = NOT_GIVEN,
        stop: Union[Optional[str], List[str], None] | NotGiven = NOT_GIVEN,
        store: Optional[bool] | NotGiven = NOT_GIVEN,
        stream: Optional[Literal[False]] | Literal[True] | NotGiven = NOT_GIVEN,
        stream_options: Optional[ChatCompletionStreamOptionsParam] | NotGiven = NOT_GIVEN,
        temperature: Optional[float] | NotGiven = NOT_GIVEN,
        tool_choice: ChatCompletionToolChoiceOptionParam | NotGiven = NOT_GIVEN,
        tools: Iterable[ChatCompletionToolUnionParam] | NotGiven = NOT_GIVEN,
        top_logprobs: Optional[int] | NotGiven = NOT_GIVEN,
        top_p: Optional[float] | NotGiven = NOT_GIVEN,
        user: str | NotGiven = NOT_GIVEN,
        verbosity: Optional[Literal["low", "medium", "high"]] | NotGiven = NOT_GIVEN,
        web_search_options: completion_create_params.WebSearchOptions | NotGiven = NOT_GIVEN,
        # Use the following arguments if you need to pass additional parameters
        # to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on
        # the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
    ) -> ChatCompletion | Stream[ChatCompletionChunk]:
        """POST the chat-completions request, threading the extra_* params through."""
        validate_response_format(response_format)
        return self._post(
            "/chat/completions",
            body=maybe_transform(
                {
                    "messages": messages,
                    "model": model,
                    "audio": audio,
                    "frequency_penalty": frequency_penalty,
                    "function_call": function_call,
                    "functions": functions,
                    "logit_bias": logit_bias,
                    "logprobs": logprobs,
                    "max_completion_tokens": max_completion_tokens,
                    "max_tokens": max_tokens,
                    "metadata": metadata,
                    "modalities": modalities,
                    "n": n,
                    "parallel_tool_calls": parallel_tool_calls,
                    "prediction": prediction,
                    "presence_penalty": presence_penalty,
                    "prompt_cache_key": prompt_cache_key,
                    "reasoning_effort": reasoning_effort,
                    "response_format": response_format,
                    "safety_identifier": safety_identifier,
                    "seed": seed,
                    "service_tier": service_tier,
                    "stop": stop,
                    "store": store,
                    "stream": stream,
                    "stream_options": stream_options,
                    "temperature": temperature,
                    "tool_choice": tool_choice,
                    "tools": tools,
                    "top_logprobs": top_logprobs,
                    "top_p": top_p,
                    "user": user,
                    "verbosity": verbosity,
                    "web_search_options": web_search_options,
                },
                completion_create_params.CompletionCreateParamsStreaming
                if stream
                else completion_create_params.CompletionCreateParamsNonStreaming,
            ),
            # extra_headers / extra_query / extra_body are folded into the
            # request options here (see make_request_options below).
            options=make_request_options(
                extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout
            ),
            cast_to=ChatCompletion,
            stream=stream or False,
            stream_cls=Stream[ChatCompletionChunk],
        )
1.2.2. 处理 extra_headers、extra_query、extra_body
def make_request_options(
    *,
    query: Query | None = None,
    extra_headers: Headers | None = None,
    extra_query: Query | None = None,
    extra_body: Body | None = None,
    idempotency_key: str | None = None,
    timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
    post_parser: PostParser | NotGiven = NOT_GIVEN,
) -> RequestOptions:
    """Create a dict of type RequestOptions without keys of NotGiven values."""
    options: RequestOptions = {}
    # extra_headers goes straight into the request headers.
    if extra_headers is not None:
        options["headers"] = extra_headers
    # extra_body is renamed to "extra_json" for the transport layer.
    if extra_body is not None:
        options["extra_json"] = cast(AnyMapping, extra_body)
    if query is not None:
        options["params"] = query
    # extra_query is merged on top of any existing query params.
    if extra_query is not None:
        options["params"] = {**options.get("params", {}), **extra_query}
    if not isinstance(timeout, NotGiven):
        options["timeout"] = timeout
    if idempotency_key is not None:
        options["idempotency_key"] = idempotency_key
    if is_given(post_parser):
        # internal
        options["post_parser"] = post_parser  # type: ignore
    # Debug print added by the article author to show the built options.
    print("000000", options)
    return options
1.2.3. 处理 extra_json
class BaseClient(Generic[_HttpxClientT, _DefaultStreamT]):
    # ... (other members elided in the article) ...

    def _build_request(
        self,
        options: FinalRequestOptions,
        *,
        retries_taken: int = 0,
    ) -> httpx.Request:
        """Build the httpx request; merges options.extra_json into the body."""
        if log.isEnabledFor(logging.DEBUG):
            log.debug("Request options: %s", model_dump(options, exclude_unset=True))
        kwargs: dict[str, Any] = {}
        json_data = options.json_data
        print("111111", json_data)
        # The extra_json mapping (i.e. the caller's extra_body) is merged into
        # the top level of the JSON body, not nested under its own key.
        if options.extra_json is not None:
            if json_data is None:
                json_data = cast(Body, options.extra_json)
            elif is_mapping(json_data):
                json_data = _merge_mappings(json_data, options.extra_json)
            else:
                raise RuntimeError(f"Unexpected JSON data type, {type(json_data)}, cannot merge with `extra_body`")
        print("222222", json_data)
        headers = self._build_headers(options, retries_taken=retries_taken)
        params = _merge_mappings(self.default_query, options.params)
        # ... (rest of the method elided in the article) ...
2. 实例
2.1. 客户端
# Demo client: pass extra_headers / extra_query / extra_body through
# chat.completions.create and stream the answer back.
from openai import OpenAI
from openai import AsyncOpenAI

api_key = "111111"
base_url = "http://127.0.0.1:8080/v1"
model = "deepseek-r1:1.5b"

# client = AsyncOpenAI(  # the async client takes the same arguments
client = OpenAI(
    api_key=api_key,
    base_url=base_url,
)

response = client.chat.completions.create(
    model=model,
    messages=[
        {"role": "user", "content": "郑州去北京怎么走才最快"},
    ],
    functions=None,
    temperature=1,
    top_p=0,
    # max_tokens=20,
    stream=True,
    # ends up in the HTTP request headers
    extra_headers={"extra_header": "test_header"},
    # ends up in the URL query string
    # extra_query={"extra_query1": {"hello":"world"}, "extra_query2": {"hello":"world"}, },
    extra_query={"extra_query1": "hello1", "extra_query2": "hello2"},
    # merged into the top level of the JSON body
    extra_body={"extra_param1": {"hello": "world"}, "extra_param2": {"hello": "world"}},
)
print(f"##### response:{response}")

think_flag = 0
think_content = ""
chunk_content = ""
output = ""

for chunk in response:
    piece = chunk.choices[0].delta.content or ""
    print(piece)
    # Route tokens between the "thinking" section and the answer,
    # keyed on the <think> ... </think> markers in the stream.
    if piece == "<think>":
        think_flag = 1
        continue
    if piece == "</think>":
        think_flag = 2
        continue
    if think_flag == 1:
        think_content += piece
    else:
        chunk_content += piece

print(f"think_content:{think_content}, chunk_content:{chunk_content}")
运行结果:
> python.exe .\client.py
000000 {'headers': {'extra_header': 'test_header'}, 'extra_json': {'extra_param1': {'hello': 'world'}, 'extra_param2': {'hello': 'world'}}, 'params': {'extra_query1': 'hello1', 'extra_query2': 'hello2'}}
000111 method='post' url='/chat/completions' params={'extra_query1': 'hello1', 'extra_query2': 'hello2'} headers={'extra_header': 'test_header'} max_retries=NOT_GIVEN timeout=NOT_GIVEN files=None idempotency_key=None post_parser=NOT_GIVEN follow_redirects=None json_data={'messages': [{'role': 'user', 'content': '郑州去北京怎么走才最快'}], 'model': 'deepseek-r1:1.5b', 'functions': None, 'stream': True, 'temperature': 1, 'top_p': 0} extra_json={'extra_param1': {'hello': 'world'}, 'extra_param2': {'hello': 'world'}}
111111 {'messages': [{'role': 'user', 'content': '郑州去北京怎么走才最快'}], 'model': 'deepseek-r1:1.5b', 'functions': None, 'stream': True, 'temperature': 1, 'top_p': 0}
222222 {'messages': [{'role': 'user', 'content': '郑州去北京怎么走才最快'}], 'model': 'deepseek-r1:1.5b', 'functions': None, 'stream': True, 'temperature': 1, 'top_p': 0, 'extra_param1': {'hello': 'world'}, 'extra_param2': {'hello': 'world'}}
##### response:<openai.Stream object at 0x0000022146FC43A0>
2.2. 服务器
# Demo server: shows where extra_headers / extra_query / extra_body arrive.
from fastapi import FastAPI, Query, Path, Request
import uvicorn
from pydantic import BaseModel
from typing import Optional, List, Dict, Any


class OpenArgs(BaseModel):
    """Subset of the OpenAI chat-completions request body.

    Values sent via the client's ``extra_body`` cannot be received as an
    ``extra_body`` or ``extra_json`` field -- the SDK merges them into the
    top level of the JSON body, so they must be declared by their own keys.
    """

    model: str
    messages: List = []
    functions: Optional[str]
    temperature: int = 0
    top_p: float = 0
    # max_tokens=20,
    stream: Optional[bool] = True
    # extra_body keys arrive as top-level body fields:
    extra_param1: Any = None  # key1
    extra_param2: Any = None  # key2


app = FastAPI()


@app.post("/v1/chat/completions")
async def v1_chat_completions(request: Request, args: OpenArgs):
    """Log where each kind of extra parameter shows up, then reply."""
    # print(request.body())
    print("header", request.headers.items())       # extra_headers land here
    print("params", request.query_params.items())  # extra_query lands here
    # print(request.json())
    print("openai", args)
    # BUGFIX: the original printed the undefined name `extra_json`, which
    # raised NameError at request time; the merged extra_body values are
    # already parsed onto `args`.
    print("extra", args.extra_param1, args.extra_param2)
    return "hello"
    # return EventSourceResponse(async_process(), media_type="text/event-stream")


if __name__ == '__main__':
    # 2. start the web service
    uvicorn.run(app, host="0.0.0.0", port=8080)
运行结果
> python.exe .\server.py
INFO: Started server process [21984]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit)
header [('host', '127.0.0.1:8080'), ('accept-encoding', 'gzip, deflate'), ('connection', 'keep-alive'), ('accept', 'application/json'), ('content-type', 'application/json'), ('user-agent', 'OpenAI/Python 1.100.2'), ('x-stainless-lang', 'python'), ('x-stainless-package-version', '1.100.2'), ('x-stainless-os', 'Windows'), ('x-stainless-arch', 'other:amd64'), ('x-stainless-runtime', 'CPython'), ('x-stainless-runtime-version', '3.10.11'), ('authorization', 'Bearer 111111'), ('x-stainless-async', 'false'), ('extra_header', 'test_header'), ('x-stainless-retry-count', '0'), ('x-stainless-read-timeout', '600'), ('content-length', '226')]
params dict_items([('extra_query1', 'hello1'), ('extra_query2', 'hello2')])
openai model='deepseek-r1:1.5b' messages=[{'role': 'user', 'content': '郑州去北京怎么走才最快'}] functions=None temperature=1 top_p=0.0 stream=True extra_param1={'hello': 'world'} extra_param2={'hello': 'world'}
INFO: 127.0.0.1:55352 - "POST /v1/chat/completions?extra_query1=hello1&extra_query2=hello2 HTTP/1.1" 200 OK