MCoT在医疗AI工程化编程的实践手册(中)

2.4 核心代码实现
A. 定义事件(使用Avro/Protobuf Schema)
// inference_events.proto
syntax = "proto3";message InferenceRequest {string request_id = 1;string patient_id_hash = 2;repeated DataSource data_sources = 3;string workflow_definition_id = 4; // e.g., "dr_diagnosis_v1"
}message TaskCompleted {string task_id = 1;string request_id = 2;string worker_id = 3;string stage_name = 4; // "localization"string artifact_uri = 5; // s3://evidence-bucket/loc/123.jsonbool success = 6;string error_message = 7;string model_version = 8;
}message ValidationResult {string task_id = 1;string request_id = 2;bool passed = 3;map<string, double> metrics = 4; // e.g., {"dice_score": 0.82}string decision = 5; // "PROCEED", "FALLBACK", "RETRY"
}
B. Worker示例(C++ + ONNX Runtime)
// localization_worker.cpp
#include <onnxruntime_cxx_api.h>
#include <opencv2/opencv.hpp>
#include <kafkacpp/kafka_producer.h> // A hypothetical Kafka C++ libraryvoid run_localization_task(const std::string& task_json) {// 1. Parse task, download image from S3/MinIOTask task = parse_task(task_json);cv::Mat image = download_image(task.get_input_image_uri());// 2. Preprocesscv::Mat resized_img;cv::resize(image, resized_img, cv::Size(224, 224));// ... other preprocessing steps// 3. Run ONNX ModelOrt::Env env(ORT_LOGGING_LEVEL_WARNING, "LocalizationWorker");Ort::Session session(env, L"localization_model_v1.onnx", Ort::SessionOptions{});// ... create input tensor, run session, get output tensor ...// 4. Postprocess results (e.g., NMS, convert to bbox)LocalizationResult result = postprocess_bboxes(output_tensor);// 5. Save intermediate artifactstd::string artifact_uri = save_artifact_to_s3(result.to_json(), task.request_id);// 6. Publish TaskCompleted eventTaskCompletedEvent event;event.set_task_id(task.id);event.set_request_id(task.request_id);event.set_stage_name("localization");event.set_artifact_uri(artifact_uri);event.set_success(true);event.set_model_version("localization_model_v1.onnx");KafkaProducer producer("task_completed_topic");producer.produce(event.serialize());
}
C. 验证网关
# validation_gateway.py
from scipy.spatial.distance import dice
import numpy as npdef validate_localization(task_completed_event: TaskCompleted):"""Validates a localization task by comparing against a ground truthif available, or using other heuristics."""ground_truth_uri = get_ground_truth_uri(task_completed_event.request_id)