当前位置: 首页 > news >正文

新一代请求库niquests使用入门

新一代请求库niquests

    • 1. 依赖库的安装
      • 1.1 版本验证
    • 2. 具体的代码示例
    • 3. 异步使用 async/await!
    • 4. 基础使用01, 包含同步和异步
      • 4.1 get的同步和异步的写法
      • 4.2 post的 同步和异步的写法
      • 其它的方式的同步和异步写法
    • 5. 基础使用02,使用异步
    • 6. 基础使用
    • 7. DNS over HTTPS
    • 7. 流式下载
      • 7.1 同步和异步保存文件的写法
    • 8. 使用 Niquests 实现 WebSocket 连接:
    • 10. 流式上传
      • 10.1 异步流式上传
      • 10.2 post请求
      • 10.3 加入身份验证
      • 10.4 支持patch方式
      • 10.5 支持delete方式
      • 10.6 HEAD 请求来获取标头
    • 11. post支持生成器的传入写法
      • 11.1 post发送多个文件
    • 12. 流式请求
    • 13. 使用代理
      • 13.1 使用代理02
    • 14. 支持 SOCKS,需要安装依赖
      • 14.1 具体的代码如下:
    • 15. 自动重试
    • 16. 超时
    • 17. 跟踪真实下载速度
    • 18. 异步的写法
    • 19. cookie的写法
    • 20. 重定向和历史
    • 21. 异步会话
    • 22. AsyncResponse for streams
    • 23. WebSockets 链接
      • 23.1 WebSocket 服务器通信的同步和异步的读取和写入
    • 24. 具体的代码示例请参考

1. 依赖库的安装

uv add niquestspip install niquests安装版本的选择
1. 如果支持 ws
python -m pip install niquests[ws]2. 如果支持 socks
python -m pip install niquests[socks]3. 如果支持 http3
python -m pip install niquests[http3]4. 多选择的安装pip install niquests[socks,ws]5. 使用全部安装
python -m pip install niquests[full]

1.1 版本验证

import niquests
print(niquests.__version__)

2. 具体的代码示例

>>> import niquests
>>> r = niquests.get('https://one.one.one.one')
>>> r.status_code
200
>>> r.headers['content-type']
'application/json; charset=utf-8'
>>> r.oheaders.content_type.charset
'utf-8'
>>> r.encoding
'utf-8'
>>> r.text
'{"authenticated": true, ...'
>>> r.json()
{'authenticated': True, ...}
>>> r
<Response HTTP/2 [200]>
>>> r.ocsp_verified
True
>>> r.conn_info.established_latency
datetime.timedelta(microseconds=38)

3. 异步使用 async/await!

import niquests
import asyncioasync def main() -> None:r = await niquests.aget('https://one.one.one.one', stream=True)print(r)  # Output: <Response HTTP/2 [200]>payload = await r.text  # we await text because we set `stream=True`!print(payload)  # Output: <html>...# or... without stream=Truer = await niquests.aget('https://one.one.one.one')print(r)  # Output: <Response HTTP/3 [200]>payload = r.text  # we don't need to away anything, it's already loaded!print(payload)  # Output: <html>...asyncio.run(main())

4. 基础使用01, 包含同步和异步

4.1 get的同步和异步的写法


import niquests# 同步的方法
r = niquests.get('https://api.github.com/events')
print(r.status_code)
print(r.text)---> 传入params
payload = {'key1': 'value1', 'key2': 'value2'}
r = niquests.get('https://httpbin.org/get', params=payload)# 异步的方法
r = await niquests.aget('https://api.github.com/events')---> 传入params
payload = {'key1': 'value1', 'key2': 'value2'}
r = await niquests.aget('https://httpbin.org/get', params=payload)----直接使用json()
>>> r = niquests.get(r.url + '/comments')
>>> r.status_code
200>>> comments = r.json()>>> print(comments[0].keys())
['body', 'url', 'created_at', 'updated_at', 'user', 'id']>>> print(comments[2]['body'])
Probably in the "advanced" section

4.2 post的 同步和异步的写法

# 同步的方法
r = niquests.post('https://httpbin.org/post', data={'key': 'value'})# 异步的方法
r = await niquests.apost('https://httpbin.org/post', data={'key': 'value'})

其它的方式的同步和异步写法

PUT、DELETE、HEAD 和 OPTIONS?这些都同样简单

# 同步的方法
r = niquests.put('https://httpbin.org/put', data={'key': 'value'})
r = niquests.delete('https://httpbin.org/delete')
r = niquests.head('https://httpbin.org/get')
r = niquests.options('https://httpbin.org/get')# 异步的方法
r = await niquests.aput('https://httpbin.org/put', data={'key': 'value'})
r = await niquests.adelete('https://httpbin.org/delete')
r = await niquests.ahead('https://httpbin.org/get')
r = await niquests.aoptions('https://httpbin.org/get')

5. 基础使用02,使用异步

import niquests
import asyncioasync def main():response = await niquests.aget('https://one.one.one.one')print(response.status_code)print(await response.text())asyncio.run(main())

6. 基础使用

import niquests# 创建一个Niquests会话
session = niquests.Session()# 设置代理
session.proxies = {'http': 'http://10.10.1.10:3128','https': 'http://10.10.1.10:1080',
}# 现在所有的请求都会通过代理
response = session.get('http://httpbin.org/ip')
print(response.text)

7. DNS over HTTPS

import niquestsresponse = niquests.get('https://one.one.one.one', dns_over_https=True)
print(response.status_code)

7. 流式下载

import niquestsresponse = niquests.get('https://example.com/largefile.zip', stream=True)
with open('largefile.zip', 'wb') as f:for chunk in response.iter_content(chunk_size=8192):f.write(chunk)# 同步的写法
r = niquests.get('https://api.github.com/events', stream=True)r.raw
# <urllib3.response.HTTPResponse object at 0x101194810>r.raw.read(10)
# b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\x03'#异步的写法
r = await niquests.aget('https://api.github.com/events', stream=True)r.raw
# <urllib3._async.response.AsyncHTTPResponse object at 0x101194810>await r.raw.read(10)
# b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\x03'

7.1 同步和异步保存文件的写法

# 同步的写法
with open(filename, 'wb') as fd:for chunk in r.iter_content(chunk_size=128):fd.write(chunk)#异步的写法
with open(filename, 'wb') as fd:async for chunk in await r.iter_content(chunk_size=128):fd.write(chunk)

8. 使用 Niquests 实现 WebSocket 连接:

import niquestsws = niquests.WebSocket('wss://example.com/socket')
ws.connect()
ws.send('Hello, WebSocket!')
print(ws.recv())
ws.close()

10. 流式上传

with open('massive-body', 'rb') as f:niquests.post('http://some.url/streamed', data=f)# HTTPX
with open('file.bin', 'rb') as f:httpx.post(url, files={'file': f})# Niquests
with open('file.bin', 'rb') as f:niquests.post(url, files={'file': f})

10.1 异步流式上传

import niquests
import asyncio
import aiofileasync def upload() -> None:async with niquests.AsyncSession() as s:async with aiofile.async_open("massive-body", "rb") as afp:r = await s.post("https://httpbingo.org/post", data=afp)if __name__ == "__main__":asyncio.run(upload())

10.2 post请求

>>> body = json.dumps({u"body": u"Sounds great! I'll get right on it!"})
>>> url = u"https://api.github.com/repos/psf/requests/issues/482/comments">>> r = niquests.post(url=url, data=body)
>>> r.status_code
404

10.3 加入身份验证


>>> from niquests.auth import HTTPBasicAuth
>>> auth = HTTPBasicAuth('fake@example.com', 'not_a_real_password')>>> r = niquests.post(url=url, data=body, auth=auth)
>>> r.status_code
201>>> content = r.json()
>>> print(content['body'])
Sounds great! I'll get right on it.

10.4 支持patch方式

>>> print(content[u"id"])
5804413>>> body = json.dumps({u"body": u"Sounds great! I'll get right on it once I feed my cat."})
>>> url = u"https://api.github.com/repos/psf/requests/issues/comments/5804413">>> r = niquests.patch(url=url, data=body, auth=auth)
>>> r.status_code
200

10.5 支持delete方式

>>> r = niquests.delete(url=url, auth=auth)
>>> r.status_code
204
>>> r.headers['status']
'204 No Content'

10.6 HEAD 请求来获取标头

>>> r = niquests.head(url=url, auth=auth)
>>> print(r.headers)
...
'x-ratelimit-remaining': '4995'
'x-ratelimit-limit': '5000'
...

11. post支持生成器的传入写法

def gen():yield 'hi'yield 'there'niquests.post('http://some.url/chunked', data=gen())

11.1 post发送多个文件

>>> url = 'https://httpbin.org/post'
>>> multiple_files = [
...     ('images', ('foo.png', open('foo.png', 'rb'), 'image/png')),
...     ('images', ('bar.png', open('bar.png', 'rb'), 'image/png'))]
>>> r = niquests.post(url, files=multiple_files)
>>> r.text
{...'files': {'images': 'data:image/png;base64,iVBORw ....'}'Content-Type': 'multipart/form-data; boundary=3131623adb2043caaeb5538cc7aa0b3a',...
}

12. 流式请求

import json
import niquestsr = niquests.get('https://httpbin.org/stream/20', stream=True)for line in r.iter_lines():# filter out keep-alive new linesif line:decoded_line = line.decode('utf-8')print(json.loads(decoded_line))

13. 使用代理

import niquestsproxies = {'http': 'http://10.10.1.10:3128','https': 'http://10.10.1.10:1080',
}niquests.get('http://example.org', proxies=proxies)

13.1 使用代理02

import niquestsproxies = {'http': 'http://10.10.1.10:3128','https': 'http://10.10.1.10:1080',
}
session = niquests.Session()
session.proxies.update(proxies)session.get('http://example.org')

14. 支持 SOCKS,需要安装依赖

$ python -m pip install niquests[socks]

14.1 具体的代码如下:

import niquests
proxies = {'http': 'socks5://user:pass@host:port','https': 'socks5://user:pass@host:port'
}niquests.get('http://example.org', proxies=proxies)

15. 自动重试

from niquests.packages.urllib3.util import Retry
from niquests import Sessionretries = Retry(total=3,backoff_factor=0.1,status_forcelist=[502, 503, 504],allowed_methods={'POST'},
)s = Session(retries=retries)
s.get("https://1.1.1.1")

16. 超时

r = niquests.get('https://github.com', timeout=5)
或者
r = niquests.get('https://github.com', timeout=(3.05, 27))
或者
r = niquests.get('https://github.com', timeout=None)

17. 跟踪真实下载速度

import niquestswith niquests.Session() as s:with s.get("https://ash-speed.hetzner.com/100MB.bin", stream=True) as r:for chunk in r.iter_content():# do anything you want with chunkprint(r.download_progress.total)  # this actually contain the amt of bytes (raw) downloaded from the socket.

18. 异步的写法

session = niquests.Session()
改写为:
session = niquests.AsyncSession()resp = niquests.get(...)
改写为:
resp = await niquests.aget(...)

19. cookie的写法

 >>> url = 'https://httpbin.org/cookies'>>> cookies = dict(cookies_are='working')>>> r = niquests.get(url, cookies=cookies)>>> r.text'{"cookies": {"cookies_are": "working"}}'

20. 重定向和历史

>>> r = niquests.get('http://github.com/', allow_redirects=False)
>>> r.status_code
301
>>> r.history

21. 异步会话

import asyncio
from niquests import AsyncSession, Responseasync def fetch(url: str) -> Response:async with AsyncSession() as s:return await s.get(url)async def main() -> None:tasks = []for _ in range(10):tasks.append(asyncio.create_task(fetch("https://httpbingo.org/delay/1")))responses = await asyncio.gather(*tasks)print(responses)if __name__ == "__main__":asyncio.run(main())

22. AsyncResponse for streams

import niquests
import asyncioasync def main() -> None:async with niquests.AsyncSession() as s:r = await s.get("https://httpbingo.org/get", stream=True)async for chunk in await r.iter_content(16):print(chunk)if __name__ == "__main__":asyncio.run(main())-->或者使用01import niquests
import asyncioasync def main() -> None:async with niquests.AsyncSession() as s:r = await s.get("https://httpbingo.org/get", stream=True)async for chunk in r.iter_line():print(chunk)if __name__ == "__main__":asyncio.run(main())-->或者使用02import niquests
import asyncioasync def main() -> None:async with niquests.AsyncSession() as s:r = await s.get("https://httpbingo.org/get", stream=True)payload = await r.json()if __name__ == "__main__":asyncio.run(main())

23. WebSockets 链接

# 同步写法
from niquests import Sessionwith Session() as s:resp = s.get("wss://echo.websocket.org",)print(resp.status_code)  # it says "101", for "Switching Protocol"print(resp.extension.next_payload())  # unpack the next message from serverresp.extension.send_payload("Hello World")  # automatically sends a text message to the serverprint(resp.extension.next_payload() == "Hello World")  # output True!resp.extension.close()  # don't forget this call to release the connection!# 异步写法
from niquests import AsyncSession
import asyncioasync def main() -> None:async with AsyncSession() as s:resp = await s.get("wss://echo.websocket.org")# ...print(await resp.extension.next_payload())  # unpack the next message from serverawait resp.extension.send_payload("Hello World")  # automatically sends a text message to the serverprint((await resp.extension.next_payload()) == "Hello World")  # output True!await resp.extension.close()

23.1 WebSocket 服务器通信的同步和异步的读取和写入

# 同步写法
from __future__ import annotationsfrom niquests import Session, Response, ReadTimeout
from threading import Thread
from time import sleepdef pull_message_from_server(my_response: Response) -> None:"""Read messages here."""iteration_counter = 0while my_response.extension.closed is False:try:# will block for 1s topmessage = my_response.extension.next_payload()if message is None:  # server just closed the connection. exit.print("received goaway from server")returnprint(f"received message: '{message}'")except ReadTimeout:  # if no message received within 1spasssleep(1)  # let some time for the write part to acquire the lockiteration_counter += 1# send a ping every four iterationif iteration_counter % 4 == 0:my_response.extension.ping()print("ping sent")if __name__ == "__main__":with Session() as s:# connect to websocket server "echo.websocket.org" with timeout of 1s (both read and connect)resp = s.get("wss://echo.websocket.org", timeout=1)if resp.status_code != 101:exit(1)t = Thread(target=pull_message_from_server, args=(resp,))t.start()# send messages herefor i in range(30):to_send = f"Hello World {i}"resp.extension.send_payload(to_send)print(f"sent message: '{to_send}'")sleep(1)  # let some time for the read part to acquire the lock# exit gently!resp.extension.close()# wait for thread proper exit.t.join()print("program ended!")# 异步写法
import asyncio
from niquests import AsyncSession, ReadTimeout, Responseasync def read_from_ws(my_response: Response) -> None:iteration_counter = 0while my_response.extension.closed is False:try:# will block for 1s topmessage = await my_response.extension.next_payload()if message is None:  # server just closed the connection. exit.print("received goaway from server")returnprint(f"received message: '{message}'")except ReadTimeout:  # if no message received within 1spassawait asyncio.sleep(1)  # let some time for the write part to acquire the lockiteration_counter += 1# send a ping every four iterationif iteration_counter % 4 == 0:await my_response.extension.ping()print("ping sent")async def main() -> None:async with AsyncSession() as s:resp = await s.get("wss://echo.websocket.org", timeout=1)print(resp)task = asyncio.create_task(read_from_ws(resp))for i in range(30):to_send = f"Hello World {i}"await resp.extension.send_payload(to_send)print(f"sent message: '{to_send}'")await asyncio.sleep(1)  # let some time for the read part to acquire the lock# exit gently!await resp.extension.close()await taskif __name__ == "__main__":asyncio.run(main())

24. 具体的代码示例请参考

  • https://gitcode.com/gh_mirrors/ni/niquests/blob/main/tests
  • https://niquests.readthedocs.io/en/latest/user/advanced.html
http://www.dtcms.com/a/199362.html

相关文章:

  • CI/CD 实践:实现可灰度、可监控、可回滚的现代部署体系
  • 10.15 LangChain v0.3重磅升级:Tool Calling技术颠覆大模型工具调用,效率飙升300%!
  • 第 25 届中国全电展即将启幕,构建闭环能源生态系统推动全球能源转型
  • Easyi3C 新产品发布:I2C Host Adapter
  • 关于百度地图JSAPI自定义标注的图标显示不完整的问题(其实只是因为图片尺寸问题)
  • 交叉引用、多个参考文献插入、跨文献插入word/wps中之【插入[1-3]、连续文献】
  • AcWing 223. 阿九大战朱最学——扩展欧几里得算法
  • 高性能锁机制 CAS:Java 并发编程中的深度剖析
  • 一、内存调优
  • 低功耗:XILINX FPGA如何优化功耗?
  • 人工智能(AI)与BIM:建筑业创新实践的深度融合
  • 供应商管理有哪些风险点?
  • C++11特性
  • HTML向四周扩散背景
  • DriveGenVLM:基于视觉-语言模型的自动驾驶真实世界视频生成
  • C#中的ThreadStart委托
  • 代理IP高可用性与稳定性方案:负载均衡、节点健康监测与智能切换策略
  • LLaMA-Factory:了解webUI参数
  • 【hive】hive内存dump导出hprof文件
  • 虚幻引擎5-Unreal Engine笔记之什么时候新建GameMode,什么时候新建关卡?
  • solidity智能合约-知识点
  • 开源音视频转文字工具:基于 Vosk 和 Whisper 的多语言语音识别项目
  • B/S架构和C/S架构的介绍与分析
  • 如何在LVGL之外的线程更新UI内容
  • 从纸质契约到智能契约:AI如何改写信任规则与商业效率?​——从智能合约到监管科技,一场颠覆传统商业逻辑的技术革命
  • Unreal 从入门到精通之SceneCaptureComponent2D实现UI层3D物体360°预览
  • 学习VS2022离线安装包的下载方法
  • STC-ISP烧录过程中一直显示“正在检测单片机”的解决办法
  • WebSphere Application Server(WAS)8.5.5教程第五讲
  • 解释加密中的加盐操作