基于 Python 构建的安全 gRPC 服务——TLS、mTLS 与 Casbin 授权实战
在分布式系统中,服务之间的通信如果没有经过加密和认证,就如同在公共 Wi-Fi 上传输明文密码——任何人都能拦截和伪造请求。
本文带大家一步步实现一个完整的安全通信系统(完整代码附在文末),包括:
✅ TLS 加密传输 ✅ mTLS 双向认证 ✅ Casbin 权限控制 ✅ gRPC 高性能通信。
一、背景
在真实的微服务环境中,我们通常需要确保:
- 通信加密(TLS)——防止中间人攻击;
- 身份认证(mTLS)——双方都能验证对方身份;
- 权限控制(ACL/RBAC)——限制每个客户端的操作范围;
- 高性能通信协议——在服务间传输结构化数据(使用 gRPC)。
本文实现的系统是一个 安全的日志服务 Secure Log Service,包含以下功能:
- 客户端可以向服务端发送日志(
Produce
) - 客户端可以读取日志(
Consume
) - 双方通信全程加密,必须使用受信任的证书
- Casbin 授权控制哪个客户端能执行哪些操作
二、系统架构
整体设计如下:
模块说明:
模块 | 功能 |
---|---|
gRPC Server | 提供 Produce / Consume RPC 接口 |
mTLS 层 | 使用客户端和服务端证书双向验证 |
Casbin | 负责 ACL 授权判断 |
LogStore | 简单的内存存储(可替换为数据库) |
三、证书体系:从信任根出发
1️⃣ 生成根 CA
使用 CFSSL 工具生成根证书:
cfssl gencert -initca ca-csr.json | cfssljson -bare ca
这一步生成:
ca.pem
根证书(公钥)ca-key.pem
根私钥
2️⃣ 签发服务器证书
cfssl gencert -ca=ca.pem -ca-key=ca-key.pem \-config=ca-config.json -profile=server server-csr.json | cfssljson -bare server
生成:
server.pem
服务端证书server-key.pem
服务端私钥
3️⃣ 签发客户端证书
cfssl gencert -ca=ca.pem -ca-key=ca-key.pem \-config=ca-config.json -profile=client client-csr.json | cfssljson -bare client
生成:
client.pem
客户端证书client-key.pem
客户端私钥
客户端证书的 CN(Common Name) 决定它的身份,如:
{ "CN": "demo-client" }
Casbin 就用这个 CN 做权限判断。
四、gRPC 服务端:安全通信与访问控制
服务端代码逻辑如下:
class LogService(log_pb2_grpc.LogServicer):def __init__(self, authorizer):self.store = LogStore()self.authorizer = authorizerdef _get_subject(self, context):# 从客户端证书中提取 CNssl_context = context.auth_context()cert_cn = ssl_context.get("x509_common_name", [b""])[0].decode()return cert_cndef Produce(self, request, context):sub = self._get_subject(context)if not self.authorizer.authorize(sub, "log", "produce"):context.abort(grpc.StatusCode.PERMISSION_DENIED, "permission denied")offset = self.store.append(request.record.value)return log_pb2.ProduceResponse(offset=offset)
🔐 安全配置
creds = grpc.ssl_server_credentials([(private_key, certificate_chain)],root_certificates=root_certs,require_client_auth=True, # 开启双向认证
)
server.add_secure_port("[::]:8443", creds)
五、Casbin 授权模型:谁能干什么
Casbin 使用两个配置文件:
1️⃣ model.conf
[request_definition]
r = sub, obj, act[policy_definition]
p = sub, obj, act[policy_effect]
e = some(where (p.eft == allow))[matchers]
m = r.sub == p.sub && r.obj == p.obj && r.act == p.act
含义:
sub
:主体(客户端 CN)obj
:资源(这里是"log"
)act
:行为(produce
/consume
)
2️⃣ policy.csv
p, demo-client, log, produce
p, demo-client, log, consume
即:
demo-client
能写入日志;demo-client
能读取日志。
六、客户端:携带证书调用
creds = grpc.ssl_channel_credentials(root_certificates=root_certs,private_key=private_key,certificate_chain=certificate_chain,
)
with grpc.secure_channel(addr, creds) as channel:stub = log_pb2_grpc.LogStub(channel)resp = stub.Produce(log_pb2.ProduceRequest(record=log_pb2.Record(value=b"hello")))
运行结果:
✅ Produced offset=0
✅ Consumed value=hello via mtls
如果客户端换了证书(比如 CN 不是 demo-client
),则:
PERMISSION_DENIED: permission denied
七、代码目录与运行
secure_services_py/
├─ server.py
├─ client.py
├─ requirements.txt
├─ Makefile
├─ authz/
│ ├─ authorizer.py
│ ├─ model.conf
│ └─ policy.csv
├─ proto/log.proto
├─ certs/
│ ├─ ca-config.json
│ ├─ ca-csr.json
│ ├─ client-csr.json
│ └─ server-csr.json
启动流程:
make gencert # 生成证书
make proto # 生成 gRPC 代码
python server.py # 启动服务端
python client.py --method produce --value "hello"
八、安全机制串联:从握手到授权
下面的时序图直观展示了从客户端发起请求、通过 TLS 认证、再由 Casbin 权限系统决策放行或拒绝的整个过程。
💬 图解说明
步骤 | 阶段 | 说明 |
---|---|---|
1–4 | TLS 握手阶段 | 客户端和服务器通过 mTLS 验证身份。服务器验证客户端证书是否由同一 CA 签发。 |
5–7 | Casbin 权限判断 | 服务器从客户端证书中提取 CN(Common Name)作为用户标识,调用 enforce(sub, obj, act) 进行权限匹配。 |
8–9 | 业务逻辑阶段 | 若授权成功,执行日志写入;否则返回 PERMISSION_DENIED 。 |
📘 配置文件对应关系
功能 | 文件 | 示例 |
---|---|---|
CA/证书链 | ~/.proglog/ca.pem / server.pem / client.pem | 负责加密与身份验证 |
权限模型 | authz/model.conf | 定义 r=sub,obj,act 逻辑结构 |
权限策略 | authz/policy.csv | 定义谁能访问什么:p, demo-client, log, produce |
gRPC 服务 | server.py / client.py | 实现日志读写操作 |
客户端发起 gRPC 请求 → TLS 验证身份(CN=demo-client) → 服务端使用 Casbin 检查该 CN 是否允许访问指定资源和操作 → 若匹配 policy.csv
中规则 → 执行操作;否则拒绝访问。
安全链条:
- mTLS:确保通信双方身份可信;
- TLS:确保传输层加密;
- Casbin:确保授权严格;
- gRPC:确保结构化高效通信。
九、可扩展方向
方向 | 思路 |
---|---|
持久化日志 | 用数据库代替内存 LogStore |
动态策略管理 | 使用 Casbin Adapter(MySQL / PostgreSQL) |
角色权限模型 (RBAC) | 在 model.conf 增加 [role_definition] 与 g 规则 |
审计日志 | 记录每次访问的 subject、method、结果 |
服务网格集成 | 把该逻辑嵌入 Istio / Linkerd / Envoy Filter |
🔚 总结
通过本文的完整示例,读者可以了解如何用 Python 构建一个:
- 使用 mTLS 保证通信安全;
- 使用 Casbin 管理访问控制;
- 使用 gRPC 提供高效结构化通信接口;
- 并且结构清晰、可扩展、可部署的微服务基础模块。
项目源码:
server.py
import grpc
from concurrent import futures
import log_pb2
import log_pb2_grpc
import ssl
import os
from authz.authorizer import AuthorizerCONFIG_PATH = os.path.expanduser("~/.proglog")
CA_FILE = f"{CONFIG_PATH}/ca.pem"
SERVER_CERT = f"{CONFIG_PATH}/server.pem"
SERVER_KEY = f"{CONFIG_PATH}/server-key.pem"class LogStore:def __init__(self):self.records = []def append(self, value):offset = len(self.records)self.records.append(log_pb2.Record(value=value, offset=offset))return offsetdef read(self, offset):if offset >= len(self.records):raise IndexError("offset out of range")return self.records[offset]class LogService(log_pb2_grpc.LogServicer):def __init__(self, authorizer):self.store = LogStore()self.authorizer = authorizerdef _get_subject(self, context):# 从客户端证书中提取 CNssl_context = context.auth_context()cert_cn = ssl_context.get("x509_common_name", [b""])[0].decode()return cert_cndef Produce(self, request, context):sub = self._get_subject(context)if not self.authorizer.authorize(sub, "log", "produce"):context.abort(grpc.StatusCode.PERMISSION_DENIED, "permission denied")offset = self.store.append(request.record.value)return log_pb2.ProduceResponse(offset=offset)def Consume(self, request, context):sub = self._get_subject(context)if not self.authorizer.authorize(sub, "log", "consume"):context.abort(grpc.StatusCode.PERMISSION_DENIED, "permission denied")try:rec = self.store.read(request.offset)except IndexError:context.abort(grpc.StatusCode.OUT_OF_RANGE, "offset out of range")return log_pb2.ConsumeResponse(record=rec)def serve(port=50051):server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))log_pb2_grpc.add_LogServicer_to_server(LogService(Authorizer("authz/model.conf", "authz/policy.csv")),server,)with open(SERVER_KEY, "rb") as f:private_key = f.read()with open(SERVER_CERT, "rb") as f:certificate_chain = f.read()with open(CA_FILE, "rb") as f:root_certs = f.read()creds = grpc.ssl_server_credentials([(private_key, certificate_chain)],root_certificates=root_certs,require_client_auth=True,)server.add_secure_port(f"[::]:{port}", creds)print(f"✅ Server listening on port {port} with mTLS+ACL")server.start()server.wait_for_termination()if __name__ == "__main__":serve()
client.py
import grpc
import log_pb2
import log_pb2_grpc
import ssl
import os
import argparseCONFIG_PATH = os.path.expanduser("~/.proglog")
CA_FILE = f"{CONFIG_PATH}/ca.pem"
CLIENT_CERT = f"{CONFIG_PATH}/client.pem"
CLIENT_KEY = f"{CONFIG_PATH}/client-key.pem"def run(addr, method, value, offset):with open(CLIENT_KEY, "rb") as f:private_key = f.read()with open(CLIENT_CERT, "rb") as f:certificate_chain = f.read()with open(CA_FILE, "rb") as f:root_certs = f.read()creds = grpc.ssl_channel_credentials(root_certificates=root_certs,private_key=private_key,certificate_chain=certificate_chain,)with grpc.secure_channel(addr, creds) as channel:stub = log_pb2_grpc.LogStub(channel)if method == "produce":resp = stub.Produce(log_pb2.ProduceRequest(record=log_pb2.Record(value=value.encode())))print(f"✅ Produced offset={resp.offset}")elif method == "consume":resp = stub.Consume(log_pb2.ConsumeRequest(offset=offset))print(f"✅ Consumed value={resp.record.value.decode()}")else:print("Unknown method")if __name__ == "__main__":parser = argparse.ArgumentParser()parser.add_argument("--addr", default="localhost:50051")parser.add_argument("--method", default="produce")parser.add_argument("--value", default="hello via mtls")parser.add_argument("--offset", type=int, default=0)args = parser.parse_args()run(args.addr, args.method, args.value, args.offset)
certs/ca-config.json
{"signing": {"default": {"expiry": "87600h"},"profiles": {"server": {"usages": ["signing","key encipherment","server auth"],"expiry": "87600h"},"client": {"usages": ["signing","key encipherment","client auth"],"expiry": "87600h"}}}
}
certs/ca-csr.json
{"CN": "Demo Root CA","key": {"algo": "rsa","size": 2048},"names": [{"C": "CN","L": "Beijing","ST": "Beijing","O": "DemoOrg","OU": "CA"}]
}
certs/client-csr.json
{"CN": "another-client","key": {"algo": "rsa","size": 2048},"names": [{"C": "CN","L": "Beijing","ST": "Beijing","O": "DemoOrg","OU": "Client"}]
}
certs/server-csr.json
{"CN": "127.0.0.1","hosts": ["127.0.0.1","localhost"],"key": {"algo": "rsa","size": 2048},"names": [{"C": "CN","L": "Beijing","ST": "Beijing","O": "DemoOrg","OU": "Server"}]
}
Makefile
SHELL := /bin/bash
CONFIG_PATH ?= $(HOME)/.progloginit:mkdir -p $(CONFIG_PATH)gencert: initcd certs && \cfssl gencert -initca ca-csr.json | cfssljson -bare ca && \cfssl gencert -ca=ca.pem -ca-key=ca-key.pem -config=ca-config.json -profile=server server-csr.json | cfssljson -bare server && \cfssl gencert -ca=ca.pem -ca-key=ca-key.pem -config=ca-config.json -profile=client client-csr.json | cfssljson -bare client && \mv *.pem *.csr $(CONFIG_PATH)proto:python -m grpc_tools.protoc -I proto --python_out=. --grpc_python_out=. proto/log.proto
proto/log.proto
syntax = "proto3";
package api.v1;service Log {rpc Produce(ProduceRequest) returns (ProduceResponse);rpc Consume(ConsumeRequest) returns (ConsumeResponse);
}message Record {bytes value = 1;uint64 offset = 2;
}message ProduceRequest { Record record = 1; }
message ProduceResponse { uint64 offset = 1; }message ConsumeRequest { uint64 offset = 1; }
message ConsumeResponse { Record record = 1; }
authz/authorizer.py
import casbinclass Authorizer:def __init__(self, model_file, policy_file):self.enforcer = casbin.Enforcer(model_file, policy_file)def authorize(self, sub, obj, act):return self.enforcer.enforce(sub, obj, act)
authz/model.conf
[request_definition]
r = sub, obj, act[policy_definition]
p = sub, obj, act[policy_effect]
e = some(where (p.eft == allow))[matchers]
m = r.sub == p.sub && r.obj == p.obj && r.act == p.act
authz/policy.csv
p, demo-client, log, produce
p, demo-client, log, consume
p, another-client, log, consume
p, another-client, log, produce