Harbor: Understanding the Image Cleanup Logic from the Source Code

1. Background:

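Our company's Harbor registry recently suffered a major outage. Starting at some point in the early morning, a flood of PostgreSQL connections overwhelmed the database, and many users then reported that they could not pull images. This prompted a code-level investigation of Harbor's image cleanup logic.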

2. Architecture:

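Some background on Harbor's overall architecture is needed here. Its main modules are harbor-nginx, harbor-core, harbor-jobservice, harbor-chartmuseum, harbor-portal, harbor-trivy, and registry. They provide, respectively, the nginx proxy, the core control service, the job engine, the chart repository, the portal UI, Trivy vulnerability scanning, and the image registry. In effect, Harbor is a collection of cooperating services.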

3. Cleanup implementation:

3.1 Adding a policy

Once a user creates a retention policy under their own project in the UI and sets a schedule for it, that project has a cleanup task.

3.2 jobservice inspection
3.2.1 bootstrap startup

Start with the main function of jobservice: main calls LoadAndRun on the bootstrap object to load and run the service.

#harbor/src/jobservice/main.go
func main() {
	......
	// Start
	if err := runtime.JobService.LoadAndRun(ctx, cancel); err != nil {
		logger.Fatal(err)
	}
}

3.2.2 Registering job workers

In the bootstrap's LoadAndRun, because the backend we use is redis, the redis backend worker is run through loadAndRunRedisWorkerPool.

loadAndRunRedisWorkerPool registers the jobs and starts the redis worker.

#harbor/src/jobservice/runtime/bootstrap.go
func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc) (err error) {
	......
	if cfg.PoolConfig.Backend == config.JobServicePoolBackendRedis {
		......
		// Start the backend worker
		backendWorker, err = bs.loadAndRunRedisWorkerPool(
			rootContext,
			namespace,
			workerNum,
			redisPool,
			lcmCtl,
		)
		if err != nil {
			return errors.Errorf("load and run worker error: %s", err)
		}
	}
}

func (bs *Bootstrap) loadAndRunRedisWorkerPool(
	ctx *env.Context,
	ns string,
	workers uint,
	redisPool *redis.Pool,
	lcmCtl lcm.Controller,
) (worker.Interface, error) {
	redisWorker := cworker.NewWorker(ctx, ns, workers, redisPool, lcmCtl)
	workerPoolID = redisWorker.GetPoolID()

	// Register jobs here
	if err := redisWorker.RegisterJobs(
		map[string]interface{}{
			// Only for debugging and testing purpose
			job.SampleJob: (*sample.Job)(nil),
			// Functional jobs
			job.ImageScanJob:           (*scan.Job)(nil),
			job.GarbageCollection:      (*gc.GarbageCollector)(nil),
			job.Replication:            (*replication.Replication)(nil),
			job.Retention:              (*retention.Job)(nil),
			scheduler.JobNameScheduler: (*scheduler.PeriodicJob)(nil),
			job.WebhookJob:             (*notification.WebhookJob)(nil),
			job.SlackJob:               (*notification.SlackJob)(nil),
			job.P2PPreheat:             (*preheat.Job)(nil),
			// In v2.2 we migrate the scheduled replication, garbage collection and scan all to
			// the scheduler mechanism, the following three jobs are kept for the legacy jobs
			// and they can be removed after several releases
			"IMAGE_REPLICATE": (*legacy.ReplicationScheduler)(nil),
			"IMAGE_GC":        (*legacy.GarbageCollectionScheduler)(nil),
			"IMAGE_SCAN_ALL":  (*legacy.ScanAllScheduler)(nil),
		}); err != nil {
		// exit
		return nil, err
	}

	if err := redisWorker.Start(); err != nil {
		return nil, err
	}

	return redisWorker, nil
}

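The jobs are registered before the worker starts. RegisterJobs calls registerJob for each entry, and registerJob puts a handler for that job into the gocraft worker pool under the job's name. The handler runs whenever a job with that name is taken from the queue, so this registration is how the retention job that cleans up images gets loaded.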

#harbor/src/jobservice/runtime/bootstrap.go
// Load and run the worker worker
func (bs *Bootstrap) loadAndRunRedisWorkerPool(
	ctx *env.Context,
	ns string,
	workers uint,
	redisPool *redis.Pool,
	lcmCtl lcm.Controller,
) (worker.Interface, error) {
	redisWorker := cworker.NewWorker(ctx, ns, workers, redisPool, lcmCtl)
	workerPoolID = redisWorker.GetPoolID()

	// Register jobs here
	if err := redisWorker.RegisterJobs(
		......
		job.Retention: (*retention.Job)(nil),
		......
	}

	if err := redisWorker.Start(); err != nil {
		return nil, err
	}

	return redisWorker, nil
}

#harbor/src/jobservice/worker/cworker/c_worker.go
// RegisterJobs is used to register multiple jobs to worker.
func (w *basicWorker) RegisterJobs(jobs map[string]interface{}) error {
	if jobs == nil || len(jobs) == 0 {
		// Do nothing
		return nil
	}

	for name, j := range jobs {
		if err := w.registerJob(name, j); err != nil {
			return err
		}
	}

	return nil
}

func (w *basicWorker) registerJob(name string, j interface{}) (err error) {
	......
	// Put into the pool
	w.pool.JobWithOptions(
		name,
		work.JobOptions{
			MaxFails:       theJ.MaxFails(),
			MaxConcurrency: theJ.MaxCurrency(),
			Priority:       job.Priority().For(name),
			SkipDead:       true,
		},
		// Use generic handler to handle as we do not accept context with this way.
		func(job *work.Job) error {
			return redisJob.Run(job)
		},
	)
	......
}
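
The registration step above boils down to a name-to-handler map that is consulted when a job arrives. The following is a minimal sketch of that idea using only the standard library. The Job interface, Registry type, and retentionJob are hypothetical names made up for this illustration and are not Harbor or gocraft APIs.

```go
package main

import (
	"errors"
	"fmt"
)

// Job is a hypothetical, simplified stand-in for a jobservice job.
type Job interface {
	Run(args map[string]interface{}) error
}

// retentionJob is a toy job used only for this illustration.
type retentionJob struct{}

func (retentionJob) Run(args map[string]interface{}) error {
	fmt.Printf("retention job running for repository %v\n", args["repository"])
	return nil
}

// Registry maps a job name to a factory that creates a fresh job instance.
type Registry struct {
	factories map[string]func() Job
}

func NewRegistry() *Registry {
	return &Registry{factories: make(map[string]func() Job)}
}

// Register stores a factory under a name; nothing is executed at this point.
func (r *Registry) Register(name string, factory func() Job) error {
	if name == "" || factory == nil {
		return errors.New("job name and factory are required")
	}
	if _, exists := r.factories[name]; exists {
		return fmt.Errorf("job %q is already registered", name)
	}
	r.factories[name] = factory
	return nil
}

// Dispatch looks up the job by name and runs a new instance of it.
func (r *Registry) Dispatch(name string, args map[string]interface{}) error {
	factory, ok := r.factories[name]
	if !ok {
		return fmt.Errorf("unknown job %q", name)
	}
	return factory().Run(args)
}

func main() {
	reg := NewRegistry()
	if err := reg.Register("RETENTION", func() Job { return retentionJob{} }); err != nil {
		panic(err)
	}
	args := map[string]interface{}{"repository": "library/nginx"}
	if err := reg.Dispatch("RETENTION", args); err != nil {
		panic(err)
	}
}
```

Registering only records the handler; the work happens later, when a job with that name is dispatched.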
3.2.3 Starting the scheduler

When basicWorker starts, it starts the periodic scheduler through w.scheduler.Start().

#harbor/src/jobservice/worker/cworker/c_worker.go
func (w *basicWorker) Start() error {
	......
	// Start the periodic scheduler
	w.scheduler.Start()
	......
}

When basicScheduler starts, it directly starts the enqueuer.

#harbor/src/jobservice/period/basic_scheduler.go
// Start the periodic scheduling process
func (bs *basicScheduler) Start() {
	// Run once clean
	// Try best to do
	go bs.clearDirtyJobs()

	// start enqueuer
	bs.enqueuer.start()
}

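The enqueuer starts an endless for loop (loop) that repeatedly calls checkAndEnqueue. Once the check acquires the lock, it executes enqueue().

enqueue calls the Load function, which loads all the policy and schedule data.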

#harbor/src/jobservice/period/enqueuer.go
// Blocking call
func (e *enqueuer) start() {
	go e.loop()
	logger.Info("Scheduler: periodic enqueuer is started")
}

func (e *enqueuer) loop() {
	defer func() {
		logger.Info("Scheduler: periodic enqueuer is stopped")
	}()

	// Do enqueue immediately when starting
	isHit := e.checkAndEnqueue()

	// Begin reaping periodically
	timer := time.NewTimer(e.nextTurn(isHit, e.lastEnqueueErr != nil))
	defer timer.Stop()

	for {
		select {
		case <-e.context.Done():
			return // exit
		case <-timer.C:
			// Check and enqueue.
			// Set next turn with lower priority to balance workload with long
			// round time if it hits.
			isHit = e.checkAndEnqueue()
			timer.Reset(e.nextTurn(isHit, e.lastEnqueueErr != nil))
		}
	}
}

func (e *enqueuer) enqueue() {
	conn := e.pool.Get()
	defer func() {
		_ = conn.Close()
	}()

	// Reset error track
	e.lastEnqueueErr = nil

	// Load policies and schedule next jobs for them
	pls, err := Load(e.namespace, conn)
	if err != nil {
		// Log error
		logger.Errorf("%s:%s", err, "enqueue error: enqueuer")
		return
	}

	for _, p := range pls {
		e.scheduleNextJobs(p, conn)
	}
}

The Load function queries redis for all the data stored under the namespace and deserializes it into Policy structs, which it returns.

#harbor/src/jobservice/period/policy_store.go
// Load all the policies from the backend storage.
func Load(namespace string, conn redis.Conn) ([]*Policy, error) {
	bytes, err := redis.Values(conn.Do("ZRANGE", rds.KeyPeriodicPolicy(namespace), 0, -1, "WITHSCORES"))
	if err != nil {
		return nil, err
	}
	......
	return policies, nil
}
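
A ZRANGE ... WITHSCORES reply is a flat list that alternates member and score. The part elided in the snippet above is not reproduced here. As a rough sketch of what decoding such a reply can look like, the example below works on plain strings with the standard library. The Policy fields, the JSON tags, and the choice to skip undecodable entries are all assumptions of this sketch, not Harbor's actual definitions.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// Policy is a simplified, hypothetical periodic policy.
// The field names and JSON tags are assumptions made for this example.
type Policy struct {
	ID       string `json:"id"`
	JobName  string `json:"job_name"`
	CronSpec string `json:"cron_spec"`
	// Score is the sorted-set score stored next to the member.
	Score int64 `json:"-"`
}

// decodePolicies converts a flat [member, score, member, score, ...] reply
// into policies. Entries that cannot be decoded are skipped (an assumption
// of this sketch), while an odd number of values is treated as an error.
func decodePolicies(reply []string) ([]*Policy, error) {
	if len(reply)%2 != 0 {
		return nil, fmt.Errorf("expected member/score pairs, got %d values", len(reply))
	}

	policies := make([]*Policy, 0, len(reply)/2)
	for i := 0; i < len(reply); i += 2 {
		p := &Policy{}
		if err := json.Unmarshal([]byte(reply[i]), p); err != nil {
			continue // malformed member
		}
		score, err := strconv.ParseInt(reply[i+1], 10, 64)
		if err != nil {
			continue // malformed score
		}
		p.Score = score
		policies = append(policies, p)
	}
	return policies, nil
}

func main() {
	reply := []string{
		`{"id":"p1","job_name":"RETENTION","cron_spec":"0 0 2 * * *"}`, "1700000000",
		`not-json`, "1700000001",
	}
	policies, err := decodePolicies(reply)
	if err != nil {
		panic(err)
	}
	for _, p := range policies {
		fmt.Printf("%s %s %q score=%d\n", p.ID, p.JobName, p.CronSpec, p.Score)
	}
}
```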

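enqueue then schedules every policy in a for loop. For each one it parses the cron field (from the schedule table) into a cron schedule and computes the upcoming execution times. For every time slot that falls within the next 4 minutes, it creates an execution for that slot and adds it to the scheduled queue in redis with ZADD.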

#harbor/src/jobservice/period/enqueuer.go
// scheduleNextJobs schedules job for next time slots based on the policy
func (e *enqueuer) scheduleNextJobs(p *Policy, conn redis.Conn) {
	// Follow UTC time spec
	nowTime := time.Unix(time.Now().UTC().Unix(), 0).UTC()
	horizon := nowTime.Add(enqueuerHorizon)

	// Parse the cron field of the schedule table in the database
	schedule, err := cron.Parse(p.CronSpec)
	if err != nil {
		// The cron spec should be already checked at upper layers.
		// Just in cases, if error occurred, ignore it
		e.lastEnqueueErr = err
		logger.Errorf("Invalid corn spec in periodic policy %s %s: %s", p.JobName, p.ID, err)
	} else {
		// Determine the next execution time points
		for t := schedule.Next(nowTime); t.Before(horizon); t = schedule.Next(t) {
			epoch := t.Unix()
			......
			execution := e.createExecution(p, epoch)
			......
			// Put job to the scheduled job queue
			_, err = conn.Do("ZADD", rds.RedisKeyScheduled(e.namespace), epoch, rawJSON)
			......
			logger.Debugf("Scheduled execution for periodic job %s:%s at %d", j.Name, p.ID, epoch)
		}
	}
}

At this point jobservice has finished setting up its watchdog: a background loop keeps adding the tasks that are due for processing.

3.3 jobservice processing

After the gocraft framework receives a job from redis, it executes the corresponding Run function, which in turn calls runningJob.Run.

#harbor/src/jobservice/runner/redis.go
// Run the job
func (rj *RedisJob) Run(j *work.Job) (err error) {
	......
	// Do operation based on the job status
	jStatus := job.Status(tracker.Job().Info.Status)
	switch jStatus {
	case job.PendingStatus, job.ScheduledStatus:
		// do nothing now
		break
	case job.StoppedStatus:
		// Probably jobs has been stopped by directly mark status to stopped.
		// Directly exit and no retry
		return nil
	case job.RunningStatus, job.ErrorStatus:
		// The failed jobs can be put into retry queue and the in progress jobs may be
		// interrupted by a sudden service crash event, all those jobs can be rescheduled.
		// Reset job info.
		if err = tracker.Reset(); err != nil {
			// Log error and return the original error if existing
			tracelib.RecordError(span, err, "reset job failed")
			err = errors.Wrap(err, fmt.Sprintf("retrying %s job %s:%s failed", jStatus.String(), j.Name, j.ID))
			if len(j.LastErr) > 0 {
				err = errors.Wrap(err, j.LastErr)
			}
			return
		}
		logger.Infof("Retrying job %s:%s, revision: %d", j.Name, j.ID, tracker.Job().Info.Revision)
		break
	case job.SuccessStatus:
		// do nothing
		return nil
	default:
		err = errors.Errorf("mismatch status for running job: expected %s/%s but got %s", job.PendingStatus, job.ScheduledStatus, jStatus.String())
		tracelib.RecordError(span, err, "status mismatch")
		return err
	}

	// Build job context
	if execContext, err = rj.context.JobContext.Build(tracker); err != nil {
		return
	}
	......
	// Run the job
	err = runningJob.Run(execContext, j.Args)
	......
}
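
The status switch above decides one of three things for an incoming job: run it, reset it and then run it, or skip it. The sketch below restates that decision as a small pure function so the mapping is easy to see and test. The Status and Action types and their string values are hypothetical and only mirror the cases shown in the snippet.

```go
package main

import "fmt"

// Status is a hypothetical job status type used only in this sketch.
type Status string

const (
	StatusPending   Status = "Pending"
	StatusScheduled Status = "Scheduled"
	StatusRunning   Status = "Running"
	StatusError     Status = "Error"
	StatusStopped   Status = "Stopped"
	StatusSuccess   Status = "Success"
)

// Action describes what the runner should do with a job.
type Action string

const (
	ActionRun         Action = "run"
	ActionResetAndRun Action = "reset-and-run"
	ActionSkip        Action = "skip"
)

// decide mirrors the switch in the snippet: pending and scheduled jobs run,
// running and failed jobs are reset and retried, stopped and successful jobs
// are skipped, and anything else is reported as a status mismatch.
func decide(s Status) (Action, error) {
	switch s {
	case StatusPending, StatusScheduled:
		return ActionRun, nil
	case StatusRunning, StatusError:
		return ActionResetAndRun, nil
	case StatusStopped, StatusSuccess:
		return ActionSkip, nil
	default:
		return "", fmt.Errorf("mismatch status for running job: got %q", s)
	}
}

func main() {
	for _, s := range []Status{StatusPending, StatusError, StatusSuccess, "Unknown"} {
		action, err := decide(s)
		if err != nil {
			fmt.Println(s, "->", err)
			continue
		}
		fmt.Println(s, "->", action)
	}
}
```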

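If the job is a retention job, the corresponding Run method is executed. It evaluates the pull and push times of the repository's artifacts and finally selects the list of candidates to act on.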

#harbor/src/pkg/retention/job.go
// Run the job
func (pj *Job) Run(ctx job.Context, params job.Parameters) error {
	// logger for logging
	myLogger := ctx.GetLogger()

	// Parameters have been validated, ignore error checking
	repo, _ := getParamRepo(params)
	liteMeta, _ := getParamMeta(params)
	isDryRun, _ := getParamDryRun(params)

	// Log stage: start
	repoPath := fmt.Sprintf("%s/%s", repo.Namespace, repo.Name)
	myLogger.Infof("Run retention process.\n Repository: %s \n Rule Algorithm: %s \n Dry Run: %v", repoPath, liteMeta.Algorithm, isDryRun)

	// Stop check point 1:
	if isStopped(ctx) {
		logStop(myLogger)
		return nil
	}

	// Retrieve all the candidates under the specified repository
	allCandidates, err := dep.DefaultClient.GetCandidates(repo)
	if err != nil {
		return logError(myLogger, err)
	}

	// Log stage: load candidates
	myLogger.Infof("Load %d candidates from repository %s", len(allCandidates), repoPath)

	// Build the processor
	builder := policy.NewBuilder(allCandidates)
	processor, err := builder.Build(liteMeta, isDryRun)
	if err != nil {
		return logError(myLogger, err)
	}

	// Stop check point 2:
	if isStopped(ctx) {
		logStop(myLogger)
		return nil
	}

	// Run the flow
	results, err := processor.Process(ctx.SystemContext(), allCandidates)
	if err != nil {
		return logError(myLogger, err)
	}

	// Log stage: results with table view
	logResults(myLogger, allCandidates, results)

	// Save retain and total num in DB
	return saveRetainNum(ctx, results, allCandidates, isDryRun)
}

Finally, the artifacts selected for deletion are cleaned up by calling harbor-core.

// DeleteRepository deletes the specified repository
func (bc *basicClient) DeleteRepository(repo *selector.Repository) error {
	if repo == nil {
		return errors.New("repository is nil")
	}

	switch repo.Kind {
	case selector.Image:
		return bc.coreClient.DeleteArtifactRepository(repo.Namespace, repo.Name)
	/*
		case art.Chart:
			return bc.coreClient.DeleteChartRepository(repo.Namespace, repo.Name)
	*/
	default:
		return fmt.Errorf("unsupported repository kind: %s", repo.Kind)
	}
}

// Deletes the specified candidate
func (bc *basicClient) Delete(candidate *selector.Candidate) error {
	if candidate == nil {
		return errors.New("candidate is nil")
	}

	switch candidate.Kind {
	case selector.Image:
		return bc.coreClient.DeleteArtifact(candidate.Namespace, candidate.Repository, candidate.Digest)
	/*
		case art.Chart:
			return bc.coreClient.DeleteChart(candidate.Namespace, candidate.Repository, candidate.Tag)
	*/
	default:
		return fmt.Errorf("unsupported candidate kind: %s", candidate.Kind)
	}
}

This completes the whole jobservice processing flow.

References:

https://github.com/gocraft/work
