python基础14 gRPC的流式(stream)传输(二)
一、前言
上一章节的内容,描述了使用gRPC完成基础的请求和返回。在很多场景下,我们需要用RPC的方式,传输较大的数据流,如文件的上传和下载。为了满足这些需求,gRPC提供了stream的方式。从字面意思来理解,stream是一种数据流,用来进行源源不断或连续的数据推送。适用于服务端和客户端进行较大数据量的传输,如上传和下载大数据量的文件。
二、构建proto文件
我们使用文件的上传和下载来实现双向的流式传输。.proto文件定义如下:
syntax = "proto3";
service FileSrv {
//定义上传RPC
rpc UploadFile(stream UploadRequest) returns (StringResponse) {}
//定义下载RPC
rpc DownloadFile(FileHeader) returns (stream FileResponse) {}
}
//文件头定义
message FileHeader {
string filename = 1;
string extension = 2;
}
//上传请求
message UploadRequest {
oneof request {
FileHeader header = 1;
bytes chunk_data = 2;
}
}
//上传结束后返回
message StringResponse {
int32 code = 1;
string message = 2;
}
//下载返回
message FileResponse {
bytes chunk_data = 1;
}
关键字说明:
stream:表明请求或返回以流的方式(连续/持续)进行。
oneof:oneof类型可以同时包含多个类型的字段,但是同时只能有一个字段被设置。示例中,使用的要么是FileHeader,要么使用的是bytes数据流。oneof中的字段以上一级同级,故不可和上一级字段同名。
当前目录结构说明:
工作目录
|--protos
| |-file.proto
在命令行工具中,进入当前工作目录,执行命令行:
python -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. ./protos/file.proto
完成后的目录:
接下来,我们将按照如下的目录结构,创建服务单和客户端:
工作目录
|--protos
| |-file.proto
| |-file_pb2.py
| |-file_pb2_grpc.py
|--server.py
|--client.py
|--test.dcm
特别说明:
修改当前文件目录为工作目录
os.chdir(os.path.dirname(__file__))
三、服务端构建
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from concurrent import futures
import os
import grpc
from protos import file_pb2, file_pb2_grpc
def get_filepath(filename, extension):
return f'{filename}_recv{extension}'
class Greeter(file_pb2_grpc.FileSrv):
#因为流式请求的定义:stream UploadRequest),所以该处的请求参数是一个迭代器
#迭代器处理完成,表示一个文件所有的数据接收完毕
def UploadFile(self, request_iterator, context):
data = bytearray()
filepath = 'dummy'
for request in request_iterator:
if request.header.filename and request.header.extension:
filepath = get_filepath(request.header.filename, request.header.extension)
continue
data.extend(request.chunk_data)
with open(filepath, 'wb') as f:
f.write(data)
return file_pb2.StringResponse(message='Success!')
#下载服务接收的参数是FileHeader
#读取的文件按照定义的块大小进行传输
def DownloadFile(self, request, context):
chunk_size = 1024*1024
filepath = f'{request.filename}{request.extension}'
if os.path.exists(filepath):
with open(filepath, mode="rb") as f:
while True:
chunk = f.read(chunk_size)
if chunk:
entry_response = file_pb2.FileResponse(chunk_data=chunk)
yield entry_response
else: # The chunk was empty, which means we're at the end of the file
return
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=4))
file_pb2_grpc.add_FileSrvServicer_to_server(Greeter(), server)
server.add_insecure_port('[::]:50051')
server.start()
server.wait_for_termination()
if __name__ == '__main__':
os.chdir(os.path.dirname(__file__))
serve()
yield:带yield的函数是一个生成器,next开始的地方是接着上一次的next停止的地方执行。示例中,使用yield,会依次生成一序列读取到的数据组成的FileResponse。
四、客户端构建
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import grpc
from protos import file_pb2, file_pb2_grpc
def get_filepath(filename, extension):
return f'{filename}_download{extension}'
def read_iterfile(filepath, chunk_size=1024*1024):
split_data = os.path.splitext(filepath)
filename = split_data[0]
extension = split_data[1]
metadata = file_pb2.FileHeader(filename=filename, extension=extension)
yield file_pb2.UploadRequest(header=metadata)
with open(filepath, mode="rb") as f:
while True:
chunk = f.read(chunk_size)
if chunk:
entry_request = file_pb2.UploadRequest(chunk_data=chunk)
yield entry_request
else: # The chunk was empty, which means we're at the end of the file
return
def run():
with grpc.insecure_channel('localhost:50051', options=(('grpc.enable_http_proxy', 0),) ) as channel:
stub = file_pb2_grpc.FileSrvStub(channel)
response = stub.UploadFile(read_iterfile('test.dcm'))
print("client received: " + response.message)
filename = 'test'
extension = '.dcm'
filepath = get_filepath(filename, extension)
for entry_response in stub.DownloadFile(file_pb2.FileHeader(filename=filename, extension=extension)):
with open(filepath, mode="ab") as f:
f.write(entry_response.chunk_data)
if __name__ == '__main__':
os.chdir(os.path.dirname(__file__))
run()
yield:示例中,使用yield,会依次生成一序列读取到的数据组成的request。第一次出现的地方,生成包含header的request,接下来生成包含bytes数据的request。
五、执行结果
如果客户端待上传文件不存在,则会有如下的错误输出(保证输入正确的路径即可解决,本文中,通过设置当前工作路径解决):
Traceback (most recent call last):
File "e:\workdir\python-study\test\gRPC_File\client.py", line 45, in <module>
File "e:\workdir\python-study\test\gRPC_File\client.py", line 33, in run
File "D:\Python\Python39\lib\site-packages\grpc\_channel.py", line 1536, in __call__
File "D:\Python\Python39\lib\site-packages\grpc\_channel.py", line 1006, in _end_unary_response_blocking
raise _InactiveRpcError(state) # pytype: disable=not-instantiable
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.UNKNOWN
details = "Exception iterating requests!"
debug_error_string = "None"
正常执行后的结果:
六、异步调用
上述示例中,我们在服务端和客户端都使用了同步调用的方式,实现了文件的上传和下载。
gRPC也支持异步方式的调用,主要通过gRPC asyncio api实现。
6.1 异步方式实现服务端改造
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from concurrent import futures
import os
import grpc
from protos import file_pb2, file_pb2_grpc
import asyncio
def get_filepath(filename, extension):
return f'{filename}_recv{extension}'
class Greeter(file_pb2_grpc.FileSrv):
#因为流式请求的定义:stream UploadRequest),所以该处的请求参数是一个迭代器
#迭代器处理完成,表示一个文件所有的数据接收完毕
def UploadFile(self, request_iterator, context):
data = bytearray()
filepath = 'dummy'
for request in request_iterator:
if request.header.filename and request.header.extension:
filepath = get_filepath(request.header.filename, request.header.extension)
continue
data.extend(request.chunk_data)
with open(filepath, 'wb') as f:
f.write(data)
return file_pb2.StringResponse(message='Success!')
#下载服务接收的参数是FileHeader
#读取的文件按照定义的块大小进行传输
def DownloadFile(self, request, context):
chunk_size = 1024*1024
filepath = f'{request.filename}{request.extension}'
if os.path.exists(filepath):
with open(filepath, mode="rb") as f:
while True:
chunk = f.read(chunk_size)
if chunk:
entry_response = file_pb2.FileResponse(chunk_data=chunk)
yield entry_response
else: # The chunk was empty, which means we're at the end of the file
return
async def serve():
server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=4))
file_pb2_grpc.add_FileSrvServicer_to_server(Greeter(), server)
server.add_insecure_port('[::]:50051')
await server.start()
# since server.start() will not block,
# a sleep-loop is added to keep alive
try:
await server.wait_for_termination()
except KeyboardInterrupt:
await server.stop(None)
if __name__ == '__main__':
os.chdir(os.path.dirname(__file__))
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([serve()]))
loop.close()
要点:
- 使用异步aio来启动服务。
- 定义启动服务函数为异步函数。
6.2 异步方式实现客户端改造
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import asyncio
import grpc
from protos import file_pb2, file_pb2_grpc
def get_filepath(filename, extension):
return f'{filename}_download{extension}'
def read_iterfile(filepath, chunk_size=1024*1024):
split_data = os.path.splitext(filepath)
filename = split_data[0]
extension = split_data[1]
metadata = file_pb2.FileHeader(filename=filename, extension=extension)
yield file_pb2.UploadRequest(header=metadata)
with open(filepath, mode="rb") as f:
while True:
chunk = f.read(chunk_size)
if chunk:
entry_request = file_pb2.UploadRequest(chunk_data=chunk)
yield entry_request
else: # The chunk was empty, which means we're at the end of the file
return
async def run():
async with grpc.aio.insecure_channel('localhost:50051') as channel:
stub = file_pb2_grpc.FileSrvStub(channel)
response =await stub.UploadFile(read_iterfile('test.dcm'))
print("client received: " + response.message)
filename = 'test'
extension = '.dcm'
filepath = get_filepath(filename, extension)
'''
#分步骤
stub = file_pb2_grpc.FileSrvStub(channel)
request = file_pb2.FileHeader(filename=filename, extension=extension)
response_stream = stub.DownloadFile(request)
async for res in response_stream:
'''
#简写
async for entry_response in stub.DownloadFile(file_pb2.FileHeader(filename=filename, extension=extension)):
with open(filepath, mode="ab") as f:
f.write(entry_response.chunk_data)
if __name__ == '__main__':
os.chdir(os.path.dirname(__file__))
asyncio.run(run())
要点:
- 使用asyncio.run执行异步函数。
- 在异步函数使用async和await来执行请求和获取返回。
6.3 执行结果
通过执行,可以得到和同步方法一样的结果:
七、高阶使用
在上述案例中,我们实现了单个文件的上传和下载。如果要实现多个文件的传输该如何实现?初步构想:
7.1 改造FileResponse
message FileResponse {
oneof data {
FileHeader header = 1;
bytes chunk_data = 2;
}
}
服务端在发送文件之前,先发送header,然后再发送bytes数据。
7.2 改造客户端
客户端解析数据时,判断是否为header。如有header,则表示当前为新文件,上一个文件数据已传递完成,接下来的数据需要写入新文件。