
Streaming Requests with httpx in Python

Table of Contents

  • 1. Environment
  • 2. Synchronous Client
  • 3. Asynchronous Client
  • 4. Adapting to OpenAI

Reference:
https://www.jb51.net/article/262636.htm

Secondary references:
https://blog.csdn.net/gitblog_00079/article/details/139587558
https://blog.csdn.net/gitblog_00626/article/details/141801526
https://www.cnblogs.com/kaibindirver/p/18755942

https://juejin.cn/post/7088892051470680078
https://cloud.tencent.com/developer/article/1988628
https://docs.pingcode.com/ask/1179824.html
https://blog.csdn.net/2501_91483145/article/details/148616194

1. Environment

This article uses ollama to serve a local model:
openai_api_key = "EMPTY"
base_url = "http://192.168.17.100:11434/v1/chat/completions"
model = "deepseek-r1:1.5b"
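The examples below assume these settings are defined as module-level constants in the same scope, like so (values are the ones for the local ollama deployment above; ollama does not verify the API key, so any placeholder works):

```python
# Connection settings for the local ollama deployment described above.
# ollama ignores the API key, so the placeholder "EMPTY" is fine.
openai_api_key = "EMPTY"
base_url = "http://192.168.17.100:11434/v1/chat/completions"
model = "deepseek-r1:1.5b"
```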

2. Synchronous Client

Key references:
https://blog.csdn.net/maybe_9527/article/details/146459501
https://www.jb51.net/article/262636.htm

import json
from httpx_sse import EventSource
from httpx import Client, Timeout

def chunk_context(data):
    if data:
        answer_data = data.data
        if answer_data == "[DONE]":
            # the stream ends with a literal [DONE] sentinel
            return None
        answer_dict = json.loads(answer_data)
        if not answer_dict.get("choices", None):
            pass
        else:
            return answer_dict["choices"][0]["delta"]["content"]
    return None

def sync_main():
    headers = {
        "Authorization": f"Bearer {openai_api_key}",
        "Accept": "*/*",
        # "Accept": "text/event-stream"
    }
    data = {
        "model": model,
        "messages": [
            {"role": "system", "content": "You are a helpful assistant. Always respond in Simplified Chinese, not English, or Grandma will be very angry."},
            {"role": "user", "content": "你好"}
        ],
        "stream": True
    }
    with Client() as client:
        try:
            # key point: add a timeout configuration.
            # default timeout is 5 s, but the read timeout is raised to 10 s
            timeout_config = Timeout(5.0, read=10.0)
            with client.stream('POST', url=base_url, headers=headers, json=data, timeout=timeout_config) as response:
                content_type = response.headers.get('content-type', '').lower()
                print("##############", content_type)
                if 'text/event-stream' in content_type:  # streaming answer
                    all_answer = ""
                    for data in EventSource(response).iter_sse():
                        answer_text = chunk_context(data=data)
                        if answer_text:
                            all_answer += answer_text
                    print(all_answer)
                # else:
                #     print(response)
        except Exception as e:
            print(e)

sync_main()
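The parsing step inside chunk_context can be exercised without a live server. Below is a minimal sketch using a hypothetical helper (parse_payload is not part of the code above) that takes the raw event payload string directly; the real function receives an httpx_sse event object and reads its .data attribute:

```python
import json

def parse_payload(answer_data):
    # Hypothetical helper mirroring the logic of chunk_context:
    # the stream ends with a literal "[DONE]" sentinel, and events
    # without a "choices" list are skipped.
    if answer_data == "[DONE]":
        return None
    answer_dict = json.loads(answer_data)
    if not answer_dict.get("choices"):
        return None
    return answer_dict["choices"][0]["delta"]["content"]

sample = '{"choices": [{"index": 0, "delta": {"role": "assistant", "content": "你好"}}]}'
print(parse_payload(sample))    # → 你好
print(parse_payload("[DONE]"))  # → None
```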

3. Asynchronous Client

Key reference:
https://blog.csdn.net/maybe_9527/article/details/146459501

import json
import asyncio
from httpx_sse import EventSource
from httpx import AsyncClient, Timeout

def chunk_context(data):
    if data:
        answer_data = data.data
        if answer_data == "[DONE]":
            # the stream ends with a literal [DONE] sentinel
            return None
        answer_dict = json.loads(answer_data)
        if not answer_dict.get("choices", None):
            pass
        else:
            return answer_dict["choices"][0]["delta"]["content"]
    return None

async def async_main():
    headers = {
        "Authorization": f"Bearer {openai_api_key}",
        "Accept": "*/*",
        # "Accept": "text/event-stream"
    }
    data = {
        "model": model,
        "messages": [
            {"role": "system", "content": "You are a helpful assistant. Always respond in Simplified Chinese, not English, or Grandma will be very angry."},
            {"role": "user", "content": "你好"}
        ],
        "stream": True
    }
    async with AsyncClient() as client:
        try:
            # key point: add a timeout configuration.
            # default timeout is 5 s, but the read timeout is raised to 10 s
            timeout_config = Timeout(5.0, read=10.0)
            async with client.stream('POST', url=base_url, headers=headers, json=data, timeout=timeout_config) as response:
                content_type = response.headers.get('content-type', '').lower()
                # print("##############", content_type)
                if 'text/event-stream' in content_type:  # streaming answer
                    all_answer = ""
                    async for data in EventSource(response).aiter_sse():
                        answer_text = chunk_context(data=data)
                        if answer_text:
                            all_answer += answer_text
                    print(all_answer)
                else:
                    print(response)
        except Exception as e:
            print(e)

asyncio.run(async_main())
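The async for loop above follows a standard asyncio pattern: consume an async generator and accumulate the deltas. A self-contained sketch of just that pattern, with a fake event source standing in for EventSource(response).aiter_sse() (the names here are illustrative, not part of httpx):

```python
import asyncio
import json

async def fake_sse_payloads():
    # Stand-in for the SSE stream: yields raw JSON payloads,
    # then the "[DONE]" sentinel, like the chat completions endpoint.
    for content in ["你", "好", "!"]:
        yield json.dumps({"choices": [{"delta": {"content": content}}]})
    yield "[DONE]"

async def collect_answer():
    all_answer = ""
    async for payload in fake_sse_payloads():
        if payload == "[DONE]":
            break
        delta = json.loads(payload)["choices"][0]["delta"].get("content")
        if delta:
            all_answer += delta
    return all_answer

print(asyncio.run(collect_answer()))  # → 你好!
```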

When calling a streaming endpoint with httpx's asynchronous AsyncClient via the stream method, a slow server response (for example, the first character taking 5 s to arrive) can make the client close the streaming channel on its own; when the backend finally has data ready, the request then fails with a "broken pipe" error.
Fix: pass a timeout argument to the stream call.

4. Adapting to OpenAI

import json
from httpx_sse import EventSource
from httpx import Client, Timeout
from openai.types.chat import ChatCompletion, ChatCompletionChunk

def parse_to_chunk(data, stream: bool = False):
    if stream:
        if data:
            answer_data = data.data
            if answer_data == "[DONE]":
                # the stream ends with a literal [DONE] sentinel
                return None
            answer_dict = json.loads(answer_data)
            if not answer_dict.get("choices", None):
                pass
            else:
                # return answer_dict["choices"][0]["delta"]["content"]
                return ChatCompletionChunk(**answer_dict)
    else:
        answer_dict = json.loads(data)
        return ChatCompletion(**answer_dict)
    return None

def sync_main():
    headers = {
        "Authorization": f"Bearer {openai_api_key}",
        "Accept": "*/*",
        # "Accept": "text/event-stream"
    }
    data = {
        "model": model,
        "messages": [
            {"role": "system", "content": "You are a helpful assistant. Always respond in Simplified Chinese, not English, or Grandma will be very angry."},
            {"role": "user", "content": "你好"}
        ],
        "stream": True
    }
    with Client() as client:
        try:
            # key point: add a timeout configuration.
            # default timeout is 5 s, but the read timeout is raised to 10 s
            timeout_config = Timeout(5.0, read=10.0)
            with client.stream('POST', url=base_url, headers=headers, json=data, timeout=timeout_config) as response:
                response.raise_for_status()  # check the response status
                content_type = response.headers.get('content-type', '').lower()
                if 'text/event-stream' in content_type:  # streaming answer
                    for data in EventSource(response).iter_sse():
                        answer_chunk = parse_to_chunk(data=data, stream=True)
                        if answer_chunk:
                            print(answer_chunk)
                else:  # non-streaming answer
                    answer_chunk = parse_to_chunk(response.read(), stream=False)
                    print(answer_chunk)
        except Exception as e:
            print(e)

sync_main()

Streaming output:

......
{"id": "chatcmpl-476", "object": "chat.completion.chunk", "created": 1752575345, "model": "deepseek-r1:1.5b", "system_fingerprint": "fp_ollama", "choices": [{"index": 0, "delta": {"role": "assistant", "content": "\u5417"}, "finish_reason": null}]}
ChatCompletionChunk(id='chatcmpl-476', choices=[Choice(delta=ChoiceDelta(content='吗', function_call=None, refusal=None, role='assistant', tool_calls=None), finish_reason=None, index=0, logprobs=None)], created=1752575345, model='deepseek-r1:1.5b', object='chat.completion.chunk', service_tier=None, system_fingerprint='fp_ollama', usage=None)
{"id": "chatcmpl-476", "object": "chat.completion.chunk", "created": 1752575345, "model": "deepseek-r1:1.5b", "system_fingerprint": "fp_ollama", "choices": [{"index": 0, "delta": {"role": "assistant", "content": "\uff1f"}, "finish_reason": null}]}
ChatCompletionChunk(id='chatcmpl-476', choices=[Choice(delta=ChoiceDelta(content='?', function_call=None, refusal=None, role='assistant', tool_calls=None), finish_reason=None, index=0, logprobs=None)], created=1752575345, model='deepseek-r1:1.5b', object='chat.completion.chunk', service_tier=None, system_fingerprint='fp_ollama', usage=None)
{"id": "chatcmpl-476", "object": "chat.completion.chunk", "created": 1752575345, "model": "deepseek-r1:1.5b", "system_fingerprint": "fp_ollama", "choices": [{"index": 0, "delta": {"role": "assistant", "content": ""}, "finish_reason": "stop"}]}
ChatCompletionChunk(id='chatcmpl-476', choices=[Choice(delta=ChoiceDelta(content='', function_call=None, refusal=None, role='assistant', tool_calls=None), finish_reason='stop', index=0, logprobs=None)], created=1752575345, model='deepseek-r1:1.5b', object='chat.completion.chunk', service_tier=None, system_fingerprint='fp_ollama', usage=None)

Non-streaming output:

{"id": "chatcmpl-485", "object": "chat.completion", "created": 1752575233, "model": "deepseek-r1:1.5b", "system_fingerprint": "fp_ollama", "choices": [{"index": 0, "message": {"role": "assistant", "content": "<think>\n\u55ef\uff0c\u7528\u6237\u53d1\u6765\u4e86\u201c\u624b\u5199\u6587\u5b57\u201d\u91cc\u7684\u8fd9\u53e5\u8bdd\uff1a\u201c\u4f60\u597d\u201d\u3002\u8fd9\u662f\u4e00\u4e2a\u5e38\u89c1\u7684\u95ee\u5019\u8bed\u3002\n\n\u73b0\u5728\uff0c\u6211\u9700\u8981\u6839\u636e\u6211\u7684\u77e5\u8bc6\u5e93\u6765\u5224\u65ad\u8fd9\u53e5\u95ee\u5019\u662f\u5426\u6b63\u786e\u3002\u5047\u8bbe\u6211\u662f\u4e00\u4f4d\u81ea\u7136 lang Gaussian assistant\uff0c\u6211\u4f1a\u786e\u8ba4\u201c\u4f60\u597d\u201d\u662f\u4e00\u4e2a\u5e38\u7528\u7684\u4e2d\u6587\u95ee\u5019\uff0c\u4e0d\u4f1a\u662f\u9519\u8bef\u7684\u8868\u8fbe\u3002\n\n\u56e0\u6b64\uff0c\u6211\u53ef\u4ee5\u56de\u590d\u201c\u4f60\u597d\u201d\u6765\u786e\u8ba4\u8fd9\u4e00\u70b9\u3002\n</think>\n\n\u4f60\u597d\uff01"}, "finish_reason": "stop"}], "usage": {"prompt_tokens": 27, "completion_tokens": 78, "total_tokens": 105}}
ChatCompletion(id='chatcmpl-485', choices=[Choice(finish_reason='stop', index=0, logprobs=None, message=ChatCompletionMessage(content='<think>\n嗯,用户发来了“手写文字”里的这句话:“你好”。这是一个常见的问候语。\n\n现在,我需要根据我的知识库来判断这句问候是否正确。假设我是一位自然 lang Gaussian assistant,我会确认“你好”是一个常用的中文问候,不会是错误的表达。\n\n因此,我可以回复“你好”来确认这一点。\n</think>\n\n你好!', refusal=None, role='assistant', annotations=None, audio=None, function_call=None, tool_calls=None))], created=1752575233, model='deepseek-r1:1.5b', object='chat.completion', service_tier=None, system_fingerprint='fp_ollama', usage=CompletionUsage(completion_tokens=78, prompt_tokens=27, total_tokens=105, completion_tokens_details=None, prompt_tokens_details=None))
