新一代请求库niquests使用入门
新一代请求库niquests
- 1. 依赖库的安装
- 1.1 版本验证
- 2. 具体的代码示例
- 3. 异步使用 async/await!
- 4. 基础使用01, 包含同步和异步
- 4.1 get的同步和异步的写法
- 4.2 post的 同步和异步的写法
- 其它的方式的同步和异步写法
- 5. 基础使用02,使用异步
- 6. 基础使用
- 7. DNS over HTTPS
- 7. 流式下载
- 7.1 同步和异步保存文件的写法
- 8. 使用 Niquests 实现 WebSocket 连接:
- 10. 流式上传
- 10.1 异步流式上传
- 10.2 post请求
- 10.3 加入身份验证
- 10.4 支持patch方式
- 10.5 支持delete方式
- 10.6 HEAD 请求来获取标头
- 11. post支持生成器的传入写法
- 11.1 post发送多个文件
- 12. 流式请求
- 13. 使用代理
- 13.1 使用代理02
- 14. 支持 SOCKS,需要安装依赖
- 14.1 具体的代码如下:
- 15. 自动重试
- 16. 超时
- 17. 跟踪真实下载速度
- 18. 异步的写法
- 19. cookie的写法
- 20. 重定向和历史
- 21. 异步会话
- 22. AsyncResponse for streams
- 23. WebSockets 链接
- 23.1 WebSocket 服务器通信的同步和异步的读取和写入
- 24. 具体的代码示例请参考
1. 依赖库的安装
uv add niquests
pip install niquests
安装版本的选择
1. 如果支持 ws
python -m pip install niquests[ws]
2. 如果支持 socks
python -m pip install niquests[socks]
3. 如果支持 http3
python -m pip install niquests[http3]
4. 多选择的安装
pip install niquests[socks,ws]
5. 使用全部安装
python -m pip install niquests[full]
1.1 版本验证
# Print the version string of the installed niquests package.
import niquests
print(niquests.__version__)
2. 具体的代码示例
>>> import niquests
>>> r = niquests.get('https://one.one.one.one')
>>> r.status_code
200
>>> r.headers['content-type']
'application/json; charset=utf-8'
>>> r.oheaders.content_type.charset
'utf-8'
>>> r.encoding
'utf-8'
>>> r.text
'{"authenticated": true, ...'
>>> r.json()
{'authenticated': True, ...}
>>> r
<Response HTTP/2 [200]>
>>> r.ocsp_verified
True
>>> r.conn_info.established_latency
datetime.timedelta(microseconds=38)
3. 异步使用 async/await!
import asyncio

import niquests


async def main() -> None:
    """Fetch the same URL twice: once as a stream, once fully loaded."""
    r = await niquests.aget('https://one.one.one.one', stream=True)
    print(r)  # Output: <Response HTTP/2 [200]>
    payload = await r.text  # we await text because we set `stream=True`!
    print(payload)  # Output: <html>...

    # or... without stream=True
    r = await niquests.aget('https://one.one.one.one')
    print(r)  # Output: <Response HTTP/3 [200]>
    payload = r.text  # we don't need to await anything, it's already loaded!
    print(payload)  # Output: <html>...


asyncio.run(main())
4. 基础使用01, 包含同步和异步
4.1 get的同步和异步的写法
import niquests

# 同步的方法
r = niquests.get('https://api.github.com/events')
print(r.status_code)
print(r.text)

# ---> 传入params
payload = {'key1': 'value1', 'key2': 'value2'}
r = niquests.get('https://httpbin.org/get', params=payload)

# 异步的方法
r = await niquests.aget('https://api.github.com/events')

# ---> 传入params
payload = {'key1': 'value1', 'key2': 'value2'}
r = await niquests.aget('https://httpbin.org/get', params=payload)

# ----直接使用json()
>>> r = niquests.get(r.url + '/comments')
>>> r.status_code
200
>>> comments = r.json()
>>> print(comments[0].keys())
['body', 'url', 'created_at', 'updated_at', 'user', 'id']
>>> print(comments[2]['body'])
Probably in the "advanced" section
4.2 post的 同步和异步的写法
# 同步的方法
r = niquests.post('https://httpbin.org/post', data={'key': 'value'})

# 异步的方法
r = await niquests.apost('https://httpbin.org/post', data={'key': 'value'})
其它的方式的同步和异步写法
PUT、DELETE、HEAD 和 OPTIONS?这些都同样简单
# 同步的方法
r = niquests.put('https://httpbin.org/put', data={'key': 'value'})
r = niquests.delete('https://httpbin.org/delete')
r = niquests.head('https://httpbin.org/get')
r = niquests.options('https://httpbin.org/get')

# 异步的方法
r = await niquests.aput('https://httpbin.org/put', data={'key': 'value'})
r = await niquests.adelete('https://httpbin.org/delete')
r = await niquests.ahead('https://httpbin.org/get')
r = await niquests.aoptions('https://httpbin.org/get')
5. 基础使用02,使用异步
import asyncio

import niquests


async def main():
    """Issue one asynchronous GET and print its status code and body."""
    response = await niquests.aget('https://one.one.one.one')
    print(response.status_code)
    # Without stream=True the body is already loaded, so `text` is a plain
    # attribute: it is neither called nor awaited.
    print(response.text)


asyncio.run(main())
6. 基础使用
import niquests

# Proxy endpoints, keyed by the scheme of the outgoing request.
proxy_map = {
    'http': 'http://10.10.1.10:3128',
    'https': 'http://10.10.1.10:1080',
}

# Create a Niquests session and attach the proxies to it.
session = niquests.Session()
session.proxies = proxy_map

# Every request sent through this session now goes through the proxy.
response = session.get('http://httpbin.org/ip')
print(response.text)
7. DNS over HTTPS
import niquests

# DNS over HTTPS is selected through the session's `resolver` argument;
# `niquests.get()` does not take a `dns_over_https` keyword.
with niquests.Session(resolver="doh+google://") as session:
    response = session.get('https://one.one.one.one')
    print(response.status_code)
7. 流式下载
import niquests

response = niquests.get('https://example.com/largefile.zip', stream=True)
with open('largefile.zip', 'wb') as f:
    for chunk in response.iter_content(chunk_size=8192):
        f.write(chunk)

# 同步的写法
r = niquests.get('https://api.github.com/events', stream=True)
r.raw
# <urllib3.response.HTTPResponse object at 0x101194810>
r.raw.read(10)
# b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\x03'

#异步的写法
r = await niquests.aget('https://api.github.com/events', stream=True)
r.raw
# <urllib3._async.response.AsyncHTTPResponse object at 0x101194810>
await r.raw.read(10)
# b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\x03'
7.1 同步和异步保存文件的写法
# 同步的写法
with open(filename, 'wb') as fd:
    for chunk in r.iter_content(chunk_size=128):
        fd.write(chunk)

#异步的写法
with open(filename, 'wb') as fd:
    async for chunk in await r.iter_content(chunk_size=128):
        fd.write(chunk)
8. 使用 Niquests 实现 WebSocket 连接:
from niquests import Session

# Niquests has no `niquests.WebSocket` class: a WebSocket is opened with an
# ordinary request to a `wss://` URL and then driven through `resp.extension`.
with Session() as s:
    resp = s.get('wss://example.com/socket')
    resp.extension.send_payload('Hello, WebSocket!')
    print(resp.extension.next_payload())
    resp.extension.close()  # release the connection
10. 流式上传
with open('massive-body', 'rb') as f:
    niquests.post('http://some.url/streamed', data=f)

# HTTPX
with open('file.bin', 'rb') as f:
    httpx.post(url, files={'file': f})

# Niquests
with open('file.bin', 'rb') as f:
    niquests.post(url, files={'file': f})
10.1 异步流式上传
import asyncio

import aiofile
import niquests


async def upload() -> None:
    """Send the local file "massive-body" as the body of an async POST."""
    async with niquests.AsyncSession() as s:
        # The async file handle itself is handed to `data=`.
        async with aiofile.async_open("massive-body", "rb") as afp:
            r = await s.post("https://httpbingo.org/post", data=afp)


if __name__ == "__main__":
    asyncio.run(upload())
10.2 post请求
>>> body = json.dumps({u"body": u"Sounds great! I'll get right on it!"})
>>> url = u"https://api.github.com/repos/psf/requests/issues/482/comments"
>>> r = niquests.post(url=url, data=body)
>>> r.status_code
404
10.3 加入身份验证
>>> from niquests.auth import HTTPBasicAuth
>>> auth = HTTPBasicAuth('fake@example.com', 'not_a_real_password')
>>> r = niquests.post(url=url, data=body, auth=auth)
>>> r.status_code
201
>>> content = r.json()
>>> print(content['body'])
Sounds great! I'll get right on it.
10.4 支持patch方式
>>> print(content[u"id"])
5804413
>>> body = json.dumps({u"body": u"Sounds great! I'll get right on it once I feed my cat."})
>>> url = u"https://api.github.com/repos/psf/requests/issues/comments/5804413"
>>> r = niquests.patch(url=url, data=body, auth=auth)
>>> r.status_code
200
10.5 支持delete方式
>>> r = niquests.delete(url=url, auth=auth)
>>> r.status_code
204
>>> r.headers['status']
'204 No Content'
10.6 HEAD 请求来获取标头
>>> r = niquests.head(url=url, auth=auth)
>>> print(r.headers)
...
'x-ratelimit-remaining': '4995'
'x-ratelimit-limit': '5000'
...
11. post支持生成器的传入写法
def gen():
    """Yield the request body in two chunks: 'hi', then 'there'."""
    yield 'hi'
    yield 'there'


# The generator object is passed directly as the request body.
niquests.post('http://some.url/chunked', data=gen())
11.1 post发送多个文件
>>> url = 'https://httpbin.org/post'
>>> multiple_files = [
... ('images', ('foo.png', open('foo.png', 'rb'), 'image/png')),
... ('images', ('bar.png', open('bar.png', 'rb'), 'image/png'))]
>>> r = niquests.post(url, files=multiple_files)
>>> r.text
{
  ...
  'files': {'images': ' ....'}
  'Content-Type': 'multipart/form-data; boundary=3131623adb2043caaeb5538cc7aa0b3a',
  ...
}
12. 流式请求
import json

import niquests

r = niquests.get('https://httpbin.org/stream/20', stream=True)

# Each non-empty line is decoded as UTF-8 and parsed as a JSON document.
for line in r.iter_lines():
    # filter out keep-alive new lines
    if line:
        decoded_line = line.decode('utf-8')
        print(json.loads(decoded_line))
13. 使用代理
import niquests

# Proxy endpoints, keyed by the scheme of the outgoing request.
proxies = {
    'http': 'http://10.10.1.10:3128',
    'https': 'http://10.10.1.10:1080',
}

# The proxies are supplied for this single request.
niquests.get('http://example.org', proxies=proxies)
13.1 使用代理02
import niquests

# Proxy endpoints, keyed by the scheme of the outgoing request.
proxies = {
    'http': 'http://10.10.1.10:3128',
    'https': 'http://10.10.1.10:1080',
}

# Merge the proxies into the session's proxy mapping, then send the request
# through that session.
session = niquests.Session()
session.proxies.update(proxies)

session.get('http://example.org')
14. 支持 SOCKS,需要安装依赖
$ python -m pip install niquests[socks]
14.1 具体的代码如下:
import niquests

# NOTE: 'user:pass@host:port' is a placeholder; substitute the real
# credentials and address of the SOCKS5 proxy.
proxies = {
    'http': 'socks5://user:pass@host:port',
    'https': 'socks5://user:pass@host:port',
}

niquests.get('http://example.org', proxies=proxies)
15. 自动重试
from niquests.packages.urllib3.util import Retry
from niquests import Session

# `allowed_methods` limits which HTTP methods are retried for the statuses in
# `status_forcelist`. GET is listed because the request below is a GET; with
# only POST listed, a 502/503/504 answer to it would not be retried.
retries = Retry(
    total=3,
    backoff_factor=0.1,
    status_forcelist=[502, 503, 504],
    allowed_methods={'GET', 'POST'},
)

s = Session(retries=retries)
s.get("https://1.1.1.1")
16. 超时
r = niquests.get('https://github.com', timeout=5)
或者
r = niquests.get('https://github.com', timeout=(3.05, 27))
或者
r = niquests.get('https://github.com', timeout=None)
17. 跟踪真实下载速度
import niquests

with niquests.Session() as s:
    with s.get("https://ash-speed.hetzner.com/100MB.bin", stream=True) as r:
        for chunk in r.iter_content():
            # do anything you want with chunk
            # `download_progress.total` is the amount of raw bytes downloaded
            # from the socket so far.
            print(r.download_progress.total)
18. 异步的写法
session = niquests.Session()
改写为:
session = niquests.AsyncSession()

resp = niquests.get(...)
改写为:
resp = await niquests.aget(...)
19. cookie的写法
>>> url = 'https://httpbin.org/cookies'
>>> cookies = dict(cookies_are='working')
>>> r = niquests.get(url, cookies=cookies)
>>> r.text
'{"cookies": {"cookies_are": "working"}}'
20. 重定向和历史
>>> r = niquests.get('http://github.com/', allow_redirects=False)
>>> r.status_code
301
>>> r.history
[]
21. 异步会话
import asyncio

from niquests import AsyncSession, Response


async def fetch(url: str) -> Response:
    """GET *url* through a short-lived AsyncSession and return the response."""
    async with AsyncSession() as s:
        return await s.get(url)


async def main() -> None:
    """Start ten fetches as tasks, wait for all of them and print the results."""
    tasks = [
        asyncio.create_task(fetch("https://httpbingo.org/delay/1"))
        for _ in range(10)
    ]
    responses = await asyncio.gather(*tasks)
    print(responses)


if __name__ == "__main__":
    asyncio.run(main())
22. AsyncResponse for streams
import asyncio

import niquests


async def main() -> None:
    """Stream the response body and print it in 16-byte chunks."""
    async with niquests.AsyncSession() as s:
        r = await s.get("https://httpbingo.org/get", stream=True)

        async for chunk in await r.iter_content(16):
            print(chunk)


if __name__ == "__main__":
    asyncio.run(main())

# --> Alternative 01: iterate over the body line by line.
import asyncio

import niquests


async def main() -> None:
    """Stream the response body and print it line by line."""
    async with niquests.AsyncSession() as s:
        r = await s.get("https://httpbingo.org/get", stream=True)

        # NOTE(review): method name corrected from `iter_line` to
        # `iter_lines`; confirm against the installed niquests version.
        async for chunk in r.iter_lines():
            print(chunk)


if __name__ == "__main__":
    asyncio.run(main())

# --> Alternative 02: await the decoded JSON payload of the streamed response.
import asyncio

import niquests


async def main() -> None:
    """Request with stream=True and await the JSON-decoded body."""
    async with niquests.AsyncSession() as s:
        r = await s.get("https://httpbingo.org/get", stream=True)
        payload = await r.json()


if __name__ == "__main__":
    asyncio.run(main())
23. WebSockets 链接
# 同步写法
from niquests import Session

with Session() as s:
    resp = s.get(
        "wss://echo.websocket.org",
    )

    print(resp.status_code)  # it says "101", for "Switching Protocol"

    print(resp.extension.next_payload())  # unpack the next message from server

    resp.extension.send_payload("Hello World")  # automatically sends a text message to the server

    print(resp.extension.next_payload() == "Hello World")  # output True!

    resp.extension.close()  # don't forget this call to release the connection!

# Asynchronous version
from niquests import AsyncSession
import asyncio


async def main() -> None:
    """Open a WebSocket, echo one text message through it and close it."""
    async with AsyncSession() as s:
        resp = await s.get("wss://echo.websocket.org")

        # ...

        print(await resp.extension.next_payload())  # unpack the next message from server

        await resp.extension.send_payload("Hello World")  # automatically sends a text message to the server

        print((await resp.extension.next_payload()) == "Hello World")  # output True!

        await resp.extension.close()


# Run the coroutine; without this call `main` is defined but never executed.
if __name__ == "__main__":
    asyncio.run(main())
23.1 WebSocket 服务器通信的同步和异步的读取和写入
# 同步写法
from __future__ import annotations

from niquests import Session, Response, ReadTimeout
from threading import Thread
from time import sleep


def pull_message_from_server(my_response: Response) -> None:
    """Print incoming WebSocket messages until the connection is closed.

    Loops while ``my_response.extension.closed`` is False, returns early when
    the server sends ``None``, and sends a ping every fourth iteration.
    """
    iteration_counter = 0

    while my_response.extension.closed is False:
        try:
            # will block for 1s top
            message = my_response.extension.next_payload()

            if message is None:  # server just closed the connection. exit.
                print("received goaway from server")
                return

            print(f"received message: '{message}'")
        except ReadTimeout:  # if no message received within 1s
            pass

        sleep(1)  # let some time for the write part to acquire the lock
        iteration_counter += 1

        # send a ping every four iteration
        if iteration_counter % 4 == 0:
            my_response.extension.ping()
            print("ping sent")


if __name__ == "__main__":
    with Session() as s:
        # connect to websocket server "echo.websocket.org" with timeout of 1s (both read and connect)
        resp = s.get("wss://echo.websocket.org", timeout=1)

        if resp.status_code != 101:
            exit(1)

        t = Thread(target=pull_message_from_server, args=(resp,))
        t.start()

        # send messages here
        for i in range(30):
            to_send = f"Hello World {i}"
            resp.extension.send_payload(to_send)
            print(f"sent message: '{to_send}'")
            sleep(1)  # let some time for the read part to acquire the lock

        # exit gently!
        resp.extension.close()

        # wait for thread proper exit.
        t.join()

        print("program ended!")

# Asynchronous version
import asyncio
from niquests import AsyncSession, ReadTimeout, Response


async def read_from_ws(my_response: Response) -> None:
    """Print incoming WebSocket messages until the connection is closed.

    Async counterpart of ``pull_message_from_server``: same loop, with the
    payload read, the sleep and the ping awaited.
    """
    iteration_counter = 0

    while my_response.extension.closed is False:
        try:
            # will block for 1s top
            message = await my_response.extension.next_payload()

            if message is None:  # server just closed the connection. exit.
                print("received goaway from server")
                return

            print(f"received message: '{message}'")
        except ReadTimeout:  # if no message received within 1s
            pass

        await asyncio.sleep(1)  # let some time for the write part to acquire the lock
        iteration_counter += 1

        # send a ping every four iteration
        if iteration_counter % 4 == 0:
            await my_response.extension.ping()
            print("ping sent")


async def main() -> None:
    """Send thirty messages while a background task prints what comes back."""
    async with AsyncSession() as s:
        resp = await s.get("wss://echo.websocket.org", timeout=1)

        print(resp)

        task = asyncio.create_task(read_from_ws(resp))

        for i in range(30):
            to_send = f"Hello World {i}"
            await resp.extension.send_payload(to_send)
            print(f"sent message: '{to_send}'")
            await asyncio.sleep(1)  # let some time for the read part to acquire the lock

        # exit gently!
        await resp.extension.close()
        await task


if __name__ == "__main__":
    asyncio.run(main())
24. 具体的代码示例请参考
- https://gitcode.com/gh_mirrors/ni/niquests/blob/main/tests
- https://niquests.readthedocs.io/en/latest/user/advanced.html