Harbor: Understanding the Image Cleanup Logic from the Source Code
1. Background
Our internal Harbor registry recently suffered a major incident: starting at some point in the early morning, a flood of PostgreSQL connections overwhelmed the database, and many users then reported that they could no longer pull images. That prompted a code-level review of Harbor's image cleanup logic.
2. Architecture
A quick note on Harbor's overall architecture. The main modules are harbor-nginx, harbor-core, harbor-jobservice, harbor-chartmuseum, harbor-portal, harbor-trivy, and registry, responsible respectively for the nginx proxy, the core control service, the job engine, the chart repository, the portal UI, Trivy vulnerability scanning, and the registry image store. In practice Harbor is a collection of cooperating services.

3. Cleanup implementation
3.1 Adding a policy
Once a user creates a retention policy and a schedule under their own project in the UI, that project has a cleanup task.

3.2 jobservice inspection
3.2.1 bootstrap startup
Start with jobservice's main function: main calls the bootstrap's LoadAndRun to load and run the bootstrap program.
#harbor/src/jobservice/main.go
func main() {
    ......
    // Start
    if err := runtime.JobService.LoadAndRun(ctx, cancel); err != nil {
        logger.Fatal(err)
    }
}
3.2.2 Registering the job workers
In the bootstrap's LoadAndRun, since our backend is redis, loadAndRunRedisWorkerPool is called to run the redis backend worker.
loadAndRunRedisWorkerPool registers the jobs and starts the redis worker.
#harbor/src/jobservice/runtime/bootstrap.go
func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc) (err error) {
    ......
    if cfg.PoolConfig.Backend == config.JobServicePoolBackendRedis {
        ......
        // Start the backend worker
        backendWorker, err = bs.loadAndRunRedisWorkerPool(
            rootContext,
            namespace,
            workerNum,
            redisPool,
            lcmCtl,
        )
        if err != nil {
            return errors.Errorf("load and run worker error: %s", err)
        }
    }
}

func (bs *Bootstrap) loadAndRunRedisWorkerPool(
    ctx *env.Context,
    ns string,
    workers uint,
    redisPool *redis.Pool,
    lcmCtl lcm.Controller,
) (worker.Interface, error) {
    redisWorker := cworker.NewWorker(ctx, ns, workers, redisPool, lcmCtl)
    workerPoolID = redisWorker.GetPoolID()

    // Register jobs here
    if err := redisWorker.RegisterJobs(map[string]interface{}{
        // Only for debugging and testing purpose
        job.SampleJob: (*sample.Job)(nil),
        // Functional jobs
        job.ImageScanJob:           (*scan.Job)(nil),
        job.GarbageCollection:      (*gc.GarbageCollector)(nil),
        job.Replication:            (*replication.Replication)(nil),
        job.Retention:              (*retention.Job)(nil),
        scheduler.JobNameScheduler: (*scheduler.PeriodicJob)(nil),
        job.WebhookJob:             (*notification.WebhookJob)(nil),
        job.SlackJob:               (*notification.SlackJob)(nil),
        job.P2PPreheat:             (*preheat.Job)(nil),
        // In v2.2 we migrate the scheduled replication, garbage collection and scan all to
        // the scheduler mechanism, the following three jobs are kept for the legacy jobs
        // and they can be removed after several releases
        "IMAGE_REPLICATE": (*legacy.ReplicationScheduler)(nil),
        "IMAGE_GC":        (*legacy.GarbageCollectionScheduler)(nil),
        "IMAGE_SCAN_ALL":  (*legacy.ScanAllScheduler)(nil),
    }); err != nil {
        // exit
        return nil, err
    }

    if err := redisWorker.Start(); err != nil {
        return nil, err
    }

    return redisWorker, nil
}
Before the worker starts, the job workers are registered: RegisterJobs registers each job and puts it into the gocraft worker pool, so the retention job that cleans up images is loaded and started through this registration.
#harbor/src/jobservice/runtime/bootstrap.go
// Load and run the worker worker
func (bs *Bootstrap) loadAndRunRedisWorkerPool(
    ctx *env.Context,
    ns string,
    workers uint,
    redisPool *redis.Pool,
    lcmCtl lcm.Controller,
) (worker.Interface, error) {
    redisWorker := cworker.NewWorker(ctx, ns, workers, redisPool, lcmCtl)
    workerPoolID = redisWorker.GetPoolID()

    // Register jobs here
    if err := redisWorker.RegisterJobs(map[string]interface{}{
        ......
        job.Retention: (*retention.Job)(nil),
        ......
    }); err != nil {
        return nil, err
    }

    if err := redisWorker.Start(); err != nil {
        return nil, err
    }

    return redisWorker, nil
}
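The map above binds each job name to a typed nil pointer; the worker only needs the type so that it can instantiate jobs and call a small set of methods on them. As a purely illustrative sketch (only the methods this walkthrough actually exercises are shown: MaxFails and MaxCurrency, which registerJob reads below, and Run, which RedisJob.Run invokes later; Harbor's real job.Interface has more methods than this), a registrable job looks roughly like this:
package example

// retentionLikeJob is a toy stand-in for retention.Job. It only models the
// methods referenced in this article and is NOT Harbor's actual job.Interface.
type retentionLikeJob struct{}

// MaxFails is read by registerJob to fill work.JobOptions.MaxFails.
func (j *retentionLikeJob) MaxFails() uint { return 3 }

// MaxCurrency is read by registerJob to fill work.JobOptions.MaxConcurrency.
func (j *retentionLikeJob) MaxCurrency() uint { return 1 }

// Run is what eventually gets called as runningJob.Run(execContext, j.Args).
// Context and parameter types are simplified to keep this sketch self-contained.
func (j *retentionLikeJob) Run(ctx interface{}, params map[string]interface{}) error {
    // The real cleanup logic lives in harbor/src/pkg/retention/job.go.
    return nil
}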
#harbor/src/jobservice/worker/cworker/c_worker.go
// RegisterJobs is used to register multiple jobs to worker.
func (w *basicWorker) RegisterJobs(jobs map[string]interface{}) error {
    if jobs == nil || len(jobs) == 0 {
        // Do nothing
        return nil
    }

    for name, j := range jobs {
        if err := w.registerJob(name, j); err != nil {
            return err
        }
    }

    return nil
}

func (w *basicWorker) registerJob(name string, j interface{}) (err error) {
    ......
    // Put into the pool
    w.pool.JobWithOptions(
        name,
        work.JobOptions{
            MaxFails:       theJ.MaxFails(),
            MaxConcurrency: theJ.MaxCurrency(),
            Priority:       job.Priority().For(name),
            SkipDead:       true,
        },
        // Use generic handler to handle as we do not accept context with this way.
        func(job *work.Job) error {
            return redisJob.Run(job)
        },
    )
    ......
}
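registerJob hands each job to the gocraft/work pool via JobWithOptions. Outside of Harbor, the same mechanism boils down to the following standalone sketch (the namespace, job name, handler body and Redis address are illustrative; NewWorkerPool, JobWithOptions, Start, NewEnqueuer and Enqueue are gocraft/work's public API):
package main

import (
    "time"

    "github.com/gocraft/work"
    "github.com/gomodule/redigo/redis"
)

// jobCtx is the per-job context type gocraft/work requires for a worker pool.
type jobCtx struct{}

func main() {
    redisPool := &redis.Pool{
        Dial: func() (redis.Conn, error) { return redis.Dial("tcp", "127.0.0.1:6379") },
    }

    // A worker pool bound to a namespace, similar to what basicWorker wraps.
    pool := work.NewWorkerPool(jobCtx{}, 10, "demo_namespace", redisPool)

    // Register a named handler, analogous to registerJob putting "RETENTION" into the pool.
    pool.JobWithOptions("RETENTION", work.JobOptions{MaxFails: 3}, func(j *work.Job) error {
        // In Harbor this wrapper calls redisJob.Run(j), which dispatches to retention.Job.Run.
        return nil
    })

    pool.Start()

    // In Harbor the periodic enqueuer pushes scheduled executions; a one-off enqueue looks like this.
    enqueuer := work.NewEnqueuer("demo_namespace", redisPool)
    _, _ = enqueuer.Enqueue("RETENTION", work.Q{"repository": "library/nginx"})

    // Give the pool a moment to process the job in this demo, then shut down cleanly.
    time.Sleep(5 * time.Second)
    pool.Stop()
}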
3.2.3 Starting the scheduler
When basicWorker starts, it launches the periodic scheduler via w.scheduler.Start().
#harbor/src/jobservice/worker/cworker/c_worker.go
func (w *basicWorker) Start() error {
    ......
    // Start the periodic scheduler
    w.scheduler.Start()
    ......
}
basicScheduler's Start kicks things off directly: it clears dirty jobs once and then starts the enqueuer.
#harbor/src/jobservice/period/basic_scheduler.go
// Start the periodic scheduling process
func (bs *basicScheduler) Start() {
    // Run once clean
    // Try best to do
    go bs.clearDirtyJobs()

    // start enqueuer
    bs.enqueuer.start()
}
The enqueuer starts an infinite for loop that repeatedly calls checkAndEnqueue; once the check acquires the lock, enqueue() is executed.
enqueue() calls the Load function, which loads all of the policy and schedule data.
#harbor/src/jobservice/period/enqueuer.go
// Blocking call
func (e *enqueuer) start() {
    go e.loop()
    logger.Info("Scheduler: periodic enqueuer is started")
}

func (e *enqueuer) loop() {
    defer func() {
        logger.Info("Scheduler: periodic enqueuer is stopped")
    }()

    // Do enqueue immediately when starting
    isHit := e.checkAndEnqueue()

    // Begin reaping periodically
    timer := time.NewTimer(e.nextTurn(isHit, e.lastEnqueueErr != nil))
    defer timer.Stop()

    for {
        select {
        case <-e.context.Done():
            return // exit
        case <-timer.C:
            // Check and enqueue.
            // Set next turn with lower priority to balance workload with long
            // round time if it hits.
            isHit = e.checkAndEnqueue()
            timer.Reset(e.nextTurn(isHit, e.lastEnqueueErr != nil))
        }
    }
}

func (e *enqueuer) enqueue() {
    conn := e.pool.Get()
    defer func() {
        _ = conn.Close()
    }()

    // Reset error track
    e.lastEnqueueErr = nil

    // Load policies and schedule next jobs for them
    pls, err := Load(e.namespace, conn)
    if err != nil {
        // Log error
        logger.Errorf("%s:%s", err, "enqueue error: enqueuer")
        return
    }

    for _, p := range pls {
        e.scheduleNextJobs(p, conn)
    }
}
Load queries all the entries under its namespace from redis, deserializes them into Policy structures, and returns them.
#harbor/src/jobservice/period/policy_store.go
// Load all the policies from the backend storage.
func Load(namespace string, conn redis.Conn) ([]*Policy, error) {
    bytes, err := redis.Values(conn.Do("ZRANGE", rds.KeyPeriodicPolicy(namespace), 0, -1, "WITHSCORES"))
    if err != nil {
        return nil, err
    }
    ......
    return policies, nil
}
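The elided part of Load turns the ZRANGE ... WITHSCORES reply into Policy structs. As a rough standalone sketch of that decoding step (the field names ID, JobName and CronSpec are the ones scheduleNextJobs uses below; the JSON tags, the JobParameters field and the key name are assumptions), it could look like this:
package example

import (
    "encoding/json"
    "fmt"

    "github.com/gomodule/redigo/redis"
)

// Policy mirrors the fields referenced in scheduleNextJobs; JSON tags are assumed.
type Policy struct {
    ID            string                 `json:"id"`
    JobName       string                 `json:"job_name"`
    CronSpec      string                 `json:"cron_spec"`
    JobParameters map[string]interface{} `json:"job_params,omitempty"`
}

// loadPolicies decodes a ZRANGE ... WITHSCORES reply into Policy values.
func loadPolicies(conn redis.Conn, key string) ([]*Policy, error) {
    values, err := redis.Values(conn.Do("ZRANGE", key, 0, -1, "WITHSCORES"))
    if err != nil {
        return nil, err
    }

    policies := make([]*Policy, 0, len(values)/2)
    // WITHSCORES returns member/score pairs, so step by 2 and keep only the members.
    for i := 0; i < len(values); i += 2 {
        raw, ok := values[i].([]byte)
        if !ok {
            return nil, fmt.Errorf("unexpected reply element at index %d", i)
        }
        p := &Policy{}
        if err := json.Unmarshal(raw, p); err != nil {
            return nil, err
        }
        policies = append(policies, p)
    }

    return policies, nil
}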
enqueue() then iterates over all the policies and schedules them: it parses the cron field of the schedule table into a cron schedule, computes the upcoming execution times, and for every policy that hits a scheduled time within the next 4 minutes it creates an execution for that time point and pushes it onto the redis queue with ZADD.
#harbor/src/jobservice/period/enqueuer.go
// scheduleNextJobs schedules job for next time slots based on the policy
func (e *enqueuer) scheduleNextJobs(p *Policy, conn redis.Conn) {
    // Follow UTC time spec
    nowTime := time.Unix(time.Now().UTC().Unix(), 0).UTC()
    horizon := nowTime.Add(enqueuerHorizon)

    // Parse the cron field of the schedule table in the database
    schedule, err := cron.Parse(p.CronSpec)
    if err != nil {
        // The cron spec should be already checked at upper layers.
        // Just in cases, if error occurred, ignore it
        e.lastEnqueueErr = err
        logger.Errorf("Invalid corn spec in periodic policy %s %s: %s", p.JobName, p.ID, err)
    } else {
        // Compute the upcoming execution time points
        for t := schedule.Next(nowTime); t.Before(horizon); t = schedule.Next(t) {
            epoch := t.Unix()
            ......
            execution := e.createExecution(p, epoch)
            ......
            // Put job to the scheduled job queue
            _, err = conn.Do("ZADD", rds.RedisKeyScheduled(e.namespace), epoch, rawJSON)
            ......
            logger.Debugf("Scheduled execution for periodic job %s:%s at %d", j.Name, p.ID, epoch)
        }
    }
}
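The key piece is schedule.Next walking forward from now until the 4-minute horizon. A minimal sketch of that loop, assuming robfig/cron v1 (which exposes the cron.Parse used above) and the 4-minute enqueuerHorizon described earlier:
package example

import (
    "time"

    "github.com/robfig/cron"
)

// enqueuerHorizon mirrors the 4-minute look-ahead described above.
const enqueuerHorizon = 4 * time.Minute

// nextSlots returns every execution time the cron spec hits within the horizon,
// i.e. the epochs that scheduleNextJobs would ZADD to the scheduled queue.
func nextSlots(spec string) ([]int64, error) {
    // robfig/cron v1 Parse accepts the 6-field (with seconds) specs Harbor stores.
    schedule, err := cron.Parse(spec)
    if err != nil {
        return nil, err
    }

    now := time.Now().UTC()
    horizon := now.Add(enqueuerHorizon)

    var epochs []int64
    for t := schedule.Next(now); t.Before(horizon); t = schedule.Next(t) {
        epochs = append(epochs, t.Unix())
    }
    return epochs, nil
}
For a spec such as "0 * * * * *" (second 0 of every minute) this yields roughly four upcoming minute boundaries, each of which becomes one scheduled execution.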
At this point jobservice's background inspection is in place: it loops in the background and keeps adding the tasks that need to be processed.
3.3 jobservice processing
When the gocraft framework picks a job up from redis, it executes the corresponding Run function, which eventually calls runningJob.Run.
#harbor/src/jobservice/runner/redis.go
// Run the job
func (rj *RedisJob) Run(j *work.Job) (err error) {
    ......
    // Do operation based on the job status
    jStatus := job.Status(tracker.Job().Info.Status)
    switch jStatus {
    case job.PendingStatus, job.ScheduledStatus:
        // do nothing now
        break
    case job.StoppedStatus:
        // Probably jobs has been stopped by directly mark status to stopped.
        // Directly exit and no retry
        return nil
    case job.RunningStatus, job.ErrorStatus:
        // The failed jobs can be put into retry queue and the in progress jobs may be
        // interrupted by a sudden service crash event, all those jobs can be rescheduled.
        // Reset job info.
        if err = tracker.Reset(); err != nil {
            // Log error and return the original error if existing
            tracelib.RecordError(span, err, "reset job failed")
            err = errors.Wrap(err, fmt.Sprintf("retrying %s job %s:%s failed", jStatus.String(), j.Name, j.ID))
            if len(j.LastErr) > 0 {
                err = errors.Wrap(err, j.LastErr)
            }
            return
        }
        logger.Infof("Retrying job %s:%s, revision: %d", j.Name, j.ID, tracker.Job().Info.Revision)
        break
    case job.SuccessStatus:
        // do nothing
        return nil
    default:
        err = errors.Errorf("mismatch status for running job: expected %s/%s but got %s", job.PendingStatus, job.ScheduledStatus, jStatus.String())
        tracelib.RecordError(span, err, "status mismatch")
        return err
    }

    // Build job context
    if execContext, err = rj.context.JobContext.Build(tracker); err != nil {
        return
    }
    ......
    // Run the job
    err = runningJob.Run(execContext, j.Args)
    ......
}
For a retention job, the corresponding Run method is executed. It evaluates the pull/push times of the repository's artifacts and produces the final list of candidates to act on.
#harbor/src/pkg/retention/job.go
// Run the job
func (pj *Job) Run(ctx job.Context, params job.Parameters) error {
    // logger for logging
    myLogger := ctx.GetLogger()

    // Parameters have been validated, ignore error checking
    repo, _ := getParamRepo(params)
    liteMeta, _ := getParamMeta(params)
    isDryRun, _ := getParamDryRun(params)

    // Log stage: start
    repoPath := fmt.Sprintf("%s/%s", repo.Namespace, repo.Name)
    myLogger.Infof("Run retention process.\n Repository: %s \n Rule Algorithm: %s \n Dry Run: %v", repoPath, liteMeta.Algorithm, isDryRun)

    // Stop check point 1:
    if isStopped(ctx) {
        logStop(myLogger)
        return nil
    }

    // Retrieve all the candidates under the specified repository
    allCandidates, err := dep.DefaultClient.GetCandidates(repo)
    if err != nil {
        return logError(myLogger, err)
    }

    // Log stage: load candidates
    myLogger.Infof("Load %d candidates from repository %s", len(allCandidates), repoPath)

    // Build the processor
    builder := policy.NewBuilder(allCandidates)
    processor, err := builder.Build(liteMeta, isDryRun)
    if err != nil {
        return logError(myLogger, err)
    }

    // Stop check point 2:
    if isStopped(ctx) {
        logStop(myLogger)
        return nil
    }

    // Run the flow
    results, err := processor.Process(ctx.SystemContext(), allCandidates)
    if err != nil {
        return logError(myLogger, err)
    }

    // Log stage: results with table view
    logResults(myLogger, allCandidates, results)

    // Save retain and total num in DB
    return saveRetainNum(ctx, results, allCandidates, isDryRun)
}
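The processor built above applies the configured rule templates to the candidates. Stripped of all Harbor plumbing, the core idea of a push-time-based rule such as "retain the latest K pushed artifacts" can be sketched as follows (the struct and function are purely illustrative and not Harbor's rule implementation):
package example

import "sort"

// toyCandidate is an illustrative stand-in for Harbor's selector.Candidate;
// only the fields needed by this toy rule are modeled.
type toyCandidate struct {
    Digest     string
    PushedTime int64 // unix seconds
}

// latestPushedK keeps the k most recently pushed candidates and returns the
// rest as deletion candidates, which is the essence of the push-time rule.
func latestPushedK(all []toyCandidate, k int) (retain, remove []toyCandidate) {
    sorted := make([]toyCandidate, len(all))
    copy(sorted, all)

    // Newest pushed first.
    sort.Slice(sorted, func(i, j int) bool {
        return sorted[i].PushedTime > sorted[j].PushedTime
    })

    if k > len(sorted) {
        k = len(sorted)
    }
    return sorted[:k], sorted[k:]
}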
The candidates selected for deletion are finally removed by calling harbor-core:
// DeleteRepository deletes the specified repository
func (bc *basicClient) DeleteRepository(repo *selector.Repository) error {
    if repo == nil {
        return errors.New("repository is nil")
    }
    switch repo.Kind {
    case selector.Image:
        return bc.coreClient.DeleteArtifactRepository(repo.Namespace, repo.Name)
    /*
        case art.Chart:
            return bc.coreClient.DeleteChartRepository(repo.Namespace, repo.Name)
    */
    default:
        return fmt.Errorf("unsupported repository kind: %s", repo.Kind)
    }
}

// Deletes the specified candidate
func (bc *basicClient) Delete(candidate *selector.Candidate) error {
    if candidate == nil {
        return errors.New("candidate is nil")
    }
    switch candidate.Kind {
    case selector.Image:
        return bc.coreClient.DeleteArtifact(candidate.Namespace, candidate.Repository, candidate.Digest)
    /*
        case art.Chart:
            return bc.coreClient.DeleteChart(candidate.Namespace, candidate.Repository, candidate.Tag)
    */
    default:
        return fmt.Errorf("unsupported candidate kind: %s", candidate.Kind)
    }
}
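DeleteArtifact ultimately translates into a call against harbor-core's REST API. As a hedged illustration (the endpoint shape follows Harbor's v2.0 API; host, credentials, project, repository and digest are placeholders, and nested repository names need the URL encoding described in the API docs), deleting one candidate over HTTP looks roughly like this:
package example

import (
    "fmt"
    "net/http"
)

// deleteArtifact sketches the plain HTTP call behind coreClient.DeleteArtifact.
// The path follows Harbor's v2.0 REST API; every argument is a placeholder.
func deleteArtifact(host, project, repository, digest, user, password string) error {
    // Repository names containing "/" must be URL-encoded per Harbor's API docs;
    // a flat repository name is assumed here to keep the sketch simple.
    endpoint := fmt.Sprintf("%s/api/v2.0/projects/%s/repositories/%s/artifacts/%s",
        host, project, repository, digest)

    req, err := http.NewRequest(http.MethodDelete, endpoint, nil)
    if err != nil {
        return err
    }
    req.SetBasicAuth(user, password)

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        return fmt.Errorf("unexpected status deleting artifact: %s", resp.Status)
    }
    return nil
}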
With that, jobservice's entire processing flow is complete.
References:
https://github.com/gocraft/work
