医疗AI时代的生物医学Go编程:高性能计算与精准医疗的案例分析(三)
3.3.3 状态管理与检查点(State Manager & Checkpointing)
StateManager负责维护流水线全局状态,处理任务结果,触发新任务调度,并实现检查点机制。
// state_manager.go
package govarpipe

import (
	"encoding/json"
	"fmt"
	"os"
	"sync"
	"time"
)

// StateManager maintains the global pipeline state, processes task results,
// triggers scheduling of follow-up tasks, and carries the checkpoint settings.
// All access to pipelineState must go through mu.
type StateManager struct {
	pipelineState      *PipelineState
	mu                 sync.RWMutex // guards concurrent access to pipelineState
	scheduler          *Scheduler
	checkpointFile     string
	checkpointInterval time.Duration
}

// PipelineState is the JSON-serializable state of the whole pipeline.
type PipelineState struct {
	Samples map[string]*SampleState `json:"samples"` // SampleID -> SampleState
	Tasks   map[string]*Task        `json:"tasks"`   // TaskID -> Task
	// Other global state (e.g. start time, end time) can be added here.
}

// SampleState is the JSON-serializable state of a single sample.
type SampleState struct {
	ID      string   `json:"id"`
	Status  string   `json:"status"`   // e.g., "Running", "Completed", "Failed"
	TaskIDs []string `json:"task_ids"` // IDs of all tasks that belong to this sample
}

// NewStateManager returns a StateManager whose sample and task maps are
// initialized and empty. scheduler, checkpointFile and interval are stored
// as-is; nothing is started or written by this constructor.
func NewStateManager(scheduler *Scheduler, checkpointFile string, interval time.Duration) *StateManager {
	return &StateManager{
		pipelineState: &PipelineState{
			Samples: make(map[string]*SampleState),
			Tasks:   make(map[string]*Task),
		},
		scheduler:          scheduler,
		checkpointFile:     checkpointFile,
		checkpointInterval: interval,
	}
}

// AddTask registers task in the state manager under task.ID (typically called
// by the Controller during initialization). An existing entry with the same
// ID is overwritten. A nil task is ignored with a warning instead of panicking
// on the task.ID dereference.
func (sm *StateManager) AddTask(task *Task) {
	if task == nil {
		fmt.Println("Warning: AddTask called with nil task; ignoring")
		return
	}

	sm.mu.Lock()
	defer sm.mu.Unlock()
	sm.pipelineState.Tasks[task.ID] = task
}

// UpdateTaskStatus sets the status of the task identified by taskID (triggered
// by a Worker or the Result Collector).
//
// If the task is unknown, a warning is printed and nothing changes. When
// status is StatusFailed and errorMsg is non-empty, the message is printed.
// When status is StatusSuccess, triggerDependentTasks is called for taskID.
func (sm *StateManager) UpdateTaskStatus(taskID string, status TaskStatus, errorMsg string) {
	sm.mu.Lock()
	defer sm.mu.Unlock()

	task, exists := sm.pipelineState.Tasks[taskID]
	if !exists {
		fmt.Printf("Warning: Task %s not found in state\n", taskID)
		return
	}
	task.Status = status

	if status == StatusFailed && errorMsg != "" {
		// Record the error message on the task struct or in the log.
		fmt.Printf("Task %s failed: %s\n", taskID, errorMsg)
	}

	// If the task succeeded, check for and trigger the tasks that depend on it.
	if status == StatusSuccess {
		// NOTE(review): this runs while sm.mu is write-locked; sync.RWMutex is
		// not reentrant, so triggerDependentTasks must not lock sm.mu itself —
		// confirm against its implementation.
		sm.triggerDependentTasks(taskID)
	}
}

// ProcessTaskResult handles a task result returned by a Worker.
func (sm *StateManager) ProcessTaskResult(result *TaskResult) {if result.Status == StatusSuccess {sm.UpdateTaskStatus(result.TaskID, StatusSuccess, "")// 更新任务输出文件路径等信息 (如果需要)} else {sm.UpdateTaskStatus(result.TaskID, StatusFailed, result.Error.Error())// 处理重试逻辑 (如果配置允许)task := sm.GetTask(result.TaskID)if task != nil && task.RetryCount < Max