当前位置: 首页 > news >正文

python基础14 gRPC的流式(stream)传输(二)

一、前言

上一章节的内容,描述了使用gRPC完成基础的请求和返回。在很多场景下,我们需要用RPC的方式,传输较大的数据流,如文件的上传和下载。为了满足这些需求,gRPC提供了stream的方式。从字面意思来理解,stream是一种数据流,用来进行源源不断或连续的数据推送。适用于服务端和客户端进行较大数据量的传输,如上传和下载大数据量的文件。

二、构建proto文件

我们使用文件的上传和下载来实现双向的流式传输。.proto文件定义如下:


syntax = "proto3";

service FileSrv {
  //定义上传RPC
 rpc UploadFile(stream UploadRequest) returns (StringResponse) {}
   //定义下载RPC
 rpc DownloadFile(FileHeader) returns (stream FileResponse) {}
}


//文件头定义
message FileHeader {
  string filename = 1;
  string extension = 2;
}

//上传请求
message UploadRequest {
  oneof request {
    FileHeader header = 1;
    bytes chunk_data = 2;
  }
}

//上传结束后返回
message StringResponse {
  int32 code = 1;
  string message = 2;
}

//下载返回
message FileResponse {
  bytes chunk_data = 1;
}

关键字说明:

stream:表明请求或返回以流的方式(连续/持续)进行。

oneof:oneof类型可以同时包含多个类型的字段,但是同时只能有一个字段被设置。示例中,使用的要么是FileHeader,要么使用的是bytes数据流。oneof中的字段以上一级同级,故不可和上一级字段同名。

当前目录结构说明:

工作目录
|--protos
|  |-file.proto

在命令行工具中,进入当前工作目录,执行命令行:

python -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. ./protos/file.proto

完成后的目录:

接下来,我们将按照如下的目录结构,创建服务单和客户端:

工作目录
|--protos
|  |-file.proto
|  |-file_pb2.py
|  |-file_pb2_grpc.py
|--server.py
|--client.py
|--test.dcm

    特别说明:

修改当前文件目录为工作目录

os.chdir(os.path.dirname(__file__))

三、服务端构建

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from concurrent import futures
import os
import grpc
from protos import file_pb2, file_pb2_grpc


def get_filepath(filename, extension):
    return f'{filename}_recv{extension}'


class Greeter(file_pb2_grpc.FileSrv):

    #因为流式请求的定义:stream UploadRequest),所以该处的请求参数是一个迭代器
    #迭代器处理完成,表示一个文件所有的数据接收完毕
    def UploadFile(self, request_iterator, context):
        data = bytearray()
        filepath = 'dummy'

        for request in request_iterator:
            if request.header.filename and request.header.extension:
                filepath = get_filepath(request.header.filename, request.header.extension)
                continue
            data.extend(request.chunk_data)
        with open(filepath, 'wb') as f:
            f.write(data)
        return file_pb2.StringResponse(message='Success!')

    #下载服务接收的参数是FileHeader
    #读取的文件按照定义的块大小进行传输
    def DownloadFile(self, request, context):
        chunk_size = 1024*1024

        filepath = f'{request.filename}{request.extension}'
        if os.path.exists(filepath):
            with open(filepath, mode="rb") as f:
                while True:
                    chunk = f.read(chunk_size)
                    if chunk:
                        entry_response = file_pb2.FileResponse(chunk_data=chunk)
                        yield entry_response
                    else:  # The chunk was empty, which means we're at the end of the file
                        return


def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=4))
    file_pb2_grpc.add_FileSrvServicer_to_server(Greeter(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    server.wait_for_termination()


if __name__ == '__main__':
    os.chdir(os.path.dirname(__file__))
    serve()

yield:带yield的函数是一个生成器,next开始的地方是接着上一次的next停止的地方执行。示例中,使用yield,会依次生成一序列读取到的数据组成的FileResponse。

四、客户端构建

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import grpc
from protos import file_pb2, file_pb2_grpc
 
def get_filepath(filename, extension):
    return f'{filename}_download{extension}'
 
 
def read_iterfile(filepath, chunk_size=1024*1024):
    split_data = os.path.splitext(filepath)
    filename = split_data[0]
    extension = split_data[1]
 
    metadata = file_pb2.FileHeader(filename=filename, extension=extension)
    yield file_pb2.UploadRequest(header=metadata)
    
    with open(filepath, mode="rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if chunk:
                entry_request = file_pb2.UploadRequest(chunk_data=chunk)
                yield entry_request
            else:  # The chunk was empty, which means we're at the end of the file
                return
    

 
 
def run():
    with grpc.insecure_channel('localhost:50051', options=(('grpc.enable_http_proxy', 0),) ) as channel:
        stub = file_pb2_grpc.FileSrvStub(channel)
 
        response = stub.UploadFile(read_iterfile('test.dcm'))
        print("client received: " + response.message)
 
        filename = 'test'
        extension = '.dcm'
        filepath = get_filepath(filename, extension)
        for entry_response in stub.DownloadFile(file_pb2.FileHeader(filename=filename, extension=extension)):
            with open(filepath, mode="ab") as f:
                f.write(entry_response.chunk_data)
 
 
if __name__ == '__main__':
    os.chdir(os.path.dirname(__file__))
    run()

yield:示例中,使用yield,会依次生成一序列读取到的数据组成的request。第一次出现的地方,生成包含header的request,接下来生成包含bytes数据的request。

五、执行结果

如果客户端待上传文件不存在,则会有如下的错误输出(保证输入正确的路径即可解决,本文中,通过设置当前工作路径解决):

Traceback (most recent call last):
  File "e:\workdir\python-study\test\gRPC_File\client.py", line 45, in <module>
  File "e:\workdir\python-study\test\gRPC_File\client.py", line 33, in run
  File "D:\Python\Python39\lib\site-packages\grpc\_channel.py", line 1536, in __call__
  File "D:\Python\Python39\lib\site-packages\grpc\_channel.py", line 1006, in _end_unary_response_blocking
    raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
        status = StatusCode.UNKNOWN
        details = "Exception iterating requests!"
        debug_error_string = "None"

正常执行后的结果:

六、异步调用

上述示例中,我们在服务端和客户端都使用了同步调用的方式,实现了文件的上传和下载。

gRPC也支持异步方式的调用,主要通过gRPC asyncio api实现。

6.1 异步方式实现服务端改造

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from concurrent import futures
import os
import grpc
from protos import file_pb2, file_pb2_grpc
import asyncio


def get_filepath(filename, extension):
    return f'{filename}_recv{extension}'


class Greeter(file_pb2_grpc.FileSrv):

    #因为流式请求的定义:stream UploadRequest),所以该处的请求参数是一个迭代器
    #迭代器处理完成,表示一个文件所有的数据接收完毕
    def UploadFile(self, request_iterator, context):
        data = bytearray()
        filepath = 'dummy'

        for request in request_iterator:
            if request.header.filename and request.header.extension:
                filepath = get_filepath(request.header.filename, request.header.extension)
                continue
            data.extend(request.chunk_data)
        with open(filepath, 'wb') as f:
            f.write(data)
        return file_pb2.StringResponse(message='Success!')

    #下载服务接收的参数是FileHeader
    #读取的文件按照定义的块大小进行传输
    def DownloadFile(self, request, context):
        chunk_size = 1024*1024

        filepath = f'{request.filename}{request.extension}'
        if os.path.exists(filepath):
            with open(filepath, mode="rb") as f:
                while True:
                    chunk = f.read(chunk_size)
                    if chunk:
                        entry_response = file_pb2.FileResponse(chunk_data=chunk)
                        yield entry_response
                    else:  # The chunk was empty, which means we're at the end of the file
                        return

async def serve():
    server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=4))
    file_pb2_grpc.add_FileSrvServicer_to_server(Greeter(), server)
    server.add_insecure_port('[::]:50051')
    await server.start()

        # since server.start() will not block,
        # a sleep-loop is added to keep alive
    try:
        await server.wait_for_termination()
    except KeyboardInterrupt:
        await server.stop(None)


if __name__ == '__main__':
    os.chdir(os.path.dirname(__file__))
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait([serve()]))
    loop.close()

 要点:

  • 使用异步aio来启动服务。
  • 定义启动服务函数为异步函数。

6.2 异步方式实现客户端改造

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import asyncio
import grpc
from protos import file_pb2, file_pb2_grpc

def get_filepath(filename, extension):
    return f'{filename}_download{extension}'
 
 
def read_iterfile(filepath, chunk_size=1024*1024):
    split_data = os.path.splitext(filepath)
    filename = split_data[0]
    extension = split_data[1]
 
    metadata = file_pb2.FileHeader(filename=filename, extension=extension)
    yield file_pb2.UploadRequest(header=metadata)
    
    with open(filepath, mode="rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if chunk:
                entry_request = file_pb2.UploadRequest(chunk_data=chunk)
                yield entry_request
            else:  # The chunk was empty, which means we're at the end of the file
                return
    
async def run():
        async with grpc.aio.insecure_channel('localhost:50051') as channel:
            stub = file_pb2_grpc.FileSrvStub(channel)
    
            response =await stub.UploadFile(read_iterfile('test.dcm'))
            print("client received: " + response.message)
    
            filename = 'test'
            extension = '.dcm'
            filepath = get_filepath(filename, extension)
            '''
            #分步骤
            stub = file_pb2_grpc.FileSrvStub(channel)
            request = file_pb2.FileHeader(filename=filename, extension=extension)
            response_stream = stub.DownloadFile(request)
            async for res in response_stream:            
            '''
            #简写
            async for entry_response in stub.DownloadFile(file_pb2.FileHeader(filename=filename, extension=extension)):
                with open(filepath, mode="ab") as f:
                    f.write(entry_response.chunk_data)
 
 
if __name__ == '__main__':
    os.chdir(os.path.dirname(__file__))
    asyncio.run(run())

要点:

  • 使用asyncio.run执行异步函数。
  • 在异步函数使用async和await来执行请求和获取返回。

 6.3 执行结果

通过执行,可以得到和同步方法一样的结果:

七、高阶使用

在上述案例中,我们实现了单个文件的上传和下载。如果要实现多个文件的传输该如何实现?初步构想:

7.1 改造FileResponse

message FileResponse {
  oneof data {
    FileHeader header = 1;
    bytes chunk_data = 2;
  }
}

服务端在发送文件之前,先发送header,然后再发送bytes数据。 

7.2 改造客户端

客户端解析数据时,判断是否为header。如有header,则表示当前为新文件,上一个文件数据已传递完成,接下来的数据需要写入新文件。 

相关文章:

  • 网络互连与互联网2
  • 镜像端口及观察端口的配置
  • WebPages 对象
  • 复习防火墙(二)
  • 【KWDB 创作者计划】_二进制安装部署 KWDB 踩过的坑和经验
  • 苍穹外卖|第二篇
  • Vue学习笔记 - 插件
  • js day5
  • JAVA实战开源项目:智能无人仓库管理系统 (Vue+SpringBoot) 附源码
  • 在 M1 芯片的 Mac 电脑上安装 Redis 可以通过 ​​Homebrew​​ 快速完成
  • 提高课:数据结构之树状数组
  • 注解方式实现主类属性和组合子类属性递归Valid校验
  • 八大定位UI
  • Ubuntu在桌面缺少图标
  • Android Studio - 解决 Please Select Android SDK
  • 【3dSwap】3D-Aware Face Swapping
  • Linux安装postgresql17
  • spring boot大文件与多文件下载
  • 超级码科技发布镂空AI保险胶带,重塑包装防伪新标准
  • 全国产V7-690T核心板/算法验证板/FPGA开发板
  • 工商核名在哪个网站/合肥今天的最新消息
  • 厦门长实建设有限公司网站/网站优化检测
  • 网站建设查询/做seo网页价格
  • 网站建设一般多钱/揭阳百度seo公司
  • 网站管理是什么工作/合肥seo关键词排名
  • 做app和做网站相同和区别/免费推广app平台有哪些