当前位置: 首页 > news >正文

【HUSTOJ 判题机源码解读系列03】judge.cc 源码详细注释

【HUSTOJ 判题机源码解读系列03】judge.cc 源码详细注释

本文是 HUSTOJ 中一个比较重要的模块——judge.cc 源代码文件的全文注释版本,其中较为直观的代码不再逐行注释。

做个复习:

  • judge.cc 是用来调度判题的
  • judge_client.cc 是真正执行判题逻辑的
/*
 * Copyright 2008 sempr <iamsempr@gmail.com>
 *
 * Refacted and modified by zhblue<newsclan@gmail.com>
 * Bug report email newsclan@gmail.com
 *
 * This file is part of HUSTOJ.
 *
 * HUSTOJ is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * HUSTOJ is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HUSTOJ. if not, see <http://www.gnu.org/licenses/>.
 */
#include <time.h>
#include <stdio.h>
#include <string.h>
#include <ctype.h>
#include <stdlib.h>
#include <unistd.h>
#include <syslog.h>
#include <errno.h>
#include <fcntl.h>
#include <stdarg.h>
#include <sys/wait.h>
#include <sys/stat.h>
#include <signal.h>
#include <sys/resource.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#ifdef OJ_USE_MYSQL
#include <mysql/mysql.h>
#endif

// Size in bytes of the fixed-length text buffers used throughout this file.
#define BUFFER_SIZE 1024
// Default lock-file path; main() overwrites lock_file with "<oj_home>/etc/judge.pid".
#define LOCKFILE "/var/run/judged.pid"
// Permission bits used when creating the lock file: owner read/write, group and others read.
#define LOCKMODE (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)
// Number of bytes in one mebibyte (long long so shifted limits do not overflow int).
#define STD_MB 1048576LL

// Numeric values stored in the `result` column of the solution table.
// In this file: values below 2 (OJ_WT0, OJ_WT1) are selected as pending jobs,
// and work() marks a dispatched solution with OJ_CI.
// NOTE(review): the remaining names presumably mean RI = running, AC = accepted,
// PE = presentation error, WA = wrong answer, TL/ML/OL = time/memory/output limit,
// RE = runtime error, CE = compile error, CO = compile OK — confirm against judge_client.
#define OJ_WT0 0
#define OJ_WT1 1
#define OJ_CI 2
#define OJ_RI 3
#define OJ_AC 4
#define OJ_PE 5
#define OJ_WA 6
#define OJ_TL 7
#define OJ_ML 8
#define OJ_OL 9
#define OJ_RE 10
#define OJ_CE 11
#define OJ_CO 12
// Path of the PID/lock file actually used; main() sets it to "<oj_home>/etc/judge.pid".
static char lock_file[BUFFER_SIZE + 32] = LOCKFILE;
// Database connection settings read from judge.conf
// (OJ_HOST_NAME, OJ_USER_NAME, OJ_PASSWORD, OJ_DB_NAME).
static char host_name[BUFFER_SIZE];
static char user_name[BUFFER_SIZE];
static char password[BUFFER_SIZE];
static char db_name[BUFFER_SIZE];
// Judge home directory: argv[1], or "/home/judge" when no argument is given.
static char oj_home[BUFFER_SIZE];
// Comma-separated language ids, inserted verbatim into the SQL "language in (...)" filter (OJ_LANG_SET).
static char oj_lang_set[BUFFER_SIZE];
static int port_number;  // database port (OJ_PORT_NUMBER, default 3306)
static int max_running;  // maximum number of judge clients running in parallel (OJ_RUNNING)
static int sleep_time;   // seconds to sleep / UDP receive timeout when idle (OJ_SLEEP_TIME)
static int sleep_tmp;    // copy of sleep_time taken at the end of init_judge_conf()
// With several judged instances, this one only handles solutions where
// solution_id % oj_tot == oj_mod (OJ_TOTAL / OJ_MOD).
static int oj_tot;
static int oj_mod;
// Non-zero: fetch and check out jobs over HTTP (via wget) instead of MySQL (OJ_HTTP_JUDGE).
static int http_judge = 0;
static char http_baseurl[BUFFER_SIZE];
static char http_apipath[BUFFER_SIZE];
static char http_loginpath[BUFFER_SIZE];
static char http_username[BUFFER_SIZE];
static char http_password[BUFFER_SIZE];
static int prefetch = 80;  // jobs fetched per available client: each fetch asks for prefetch * max_running ids

// UDP wake-up channel (OJ_UDP_ENABLE / OJ_UDP_SERVER / OJ_UDP_PORT): when idle,
// main() blocks on this socket instead of sleeping.
static int oj_udp = 0;
static char oj_udpserver[BUFFER_SIZE];
static int oj_udpport = 1536;
static int oj_udp_fd;

// Redis job queue settings (OJ_REDISENABLE, OJ_REDISSERVER, OJ_REDISPORT, OJ_REDISAUTH, OJ_REDISQNAME).
static int oj_redis = 0;
static char oj_redisserver[BUFFER_SIZE];
static int oj_redisport;
static char oj_redisauth[BUFFER_SIZE];
static char oj_redisqname[BUFFER_SIZE];
// OJ_TURBO_MODE; the value 2 makes main() call the `sync_result` stored procedure (see turbo_mode2()).
static int turbo_mode = 0;
static int use_docker = 0;  // non-zero: run judge_client inside a docker container (OJ_USE_DOCKER)
static char docker_path[BUFFER_SIZE];  // docker executable, default "/usr/bin/docker" (OJ_DOCKER_PATH)
// Selects which judge_client path is executed inside the container (OJ_INTERNAL_CLIENT).
static int internal_client = 1;
// OJ_DEDICATED; when set, work() mostly reaps children with WNOHANG instead of blocking.
static int oj_dedicated = 0;

static bool STOP = false;  // set by call_for_exit() in debug mode to end the main loop
static int DEBUG = 0;      // set when more than one command-line argument is given
static int ONCE = 0;       // set when more than two arguments are given: exit once no work is left
#ifdef _mysql_h
static MYSQL *conn;        // current database connection, NULL when not connected
static MYSQL_RES *res;     // result set of the most recent job query
static MYSQL_ROW row;      // row cursor used while reading res
// static FILE *fp_log;
static char query[BUFFER_SIZE * 4];  // job-fetching SQL, built once in init_judge_conf()
#endif
/**
 * Wait for one datagram on the UDP socket and echo its text to stdout.
 *
 * main() uses this as a wake-up call while idle. The call returns when a
 * datagram arrives or when recvfrom() fails (main() configures a receive
 * timeout on the socket, so a timeout is reported as a failure here).
 *
 * @param fd  bound UDP socket descriptor
 */
void wait_udp_msg(int fd)
{
    char buf[BUFFER_SIZE];         // receive buffer
    struct sockaddr_in clent_addr; // filled with the sender's address
    socklen_t len = sizeof(clent_addr);
    int count;

    memset(buf, 0, BUFFER_SIZE);
    // Read at most BUFFER_SIZE - 1 bytes so the zero-filled buffer always
    // keeps a terminating NUL for the "%s" below.
    count = recvfrom(fd, buf, BUFFER_SIZE - 1, 0, (struct sockaddr *)&clent_addr, &len);
    if (count == -1)
    {
        printf("recieve data fail!\n");
        return;
    }
    printf("udp client:%s\n", buf);
}

// Signal handler that main() installs for SIGQUIT, SIGINT and SIGTERM.
//
// In debug mode it sets STOP so the main loop ends after the current round.
// Otherwise the signal is ignored apart from printing a notice, so the daemon
// can only be terminated with an uncatchable signal (e.g. `pkill -9 judged`).
//
// Only async-signal-safe calls are used: the message is emitted with write()
// rather than printf(), and errno is preserved for the interrupted code.
// NOTE(review): STOP is a plain bool written from a signal handler;
// `volatile sig_atomic_t` would be the strictly conforming type.
void call_for_exit(int s)
{
    static const char stopping[] = "Stopping judged...\n";
    static const char refusing[] = "HUSTOJ Refusing to stop...\n Please use kill -9 !\n";
    const int saved_errno = errno;
    const char *msg;
    size_t len;

    (void)s; // the same reaction applies to every handled signal

    if (DEBUG)
    {
        STOP = true;
        msg = stopping;
        len = sizeof(stopping) - 1;
    }
    else
    {
        msg = refusing;
        len = sizeof(refusing) - 1;
    }
    if (write(STDOUT_FILENO, msg, len) < 0)
    {
        // Nothing useful can be done about a failed notice inside a handler.
    }
    errno = saved_errno;
}

// printf-style logging helper.
//
// Appends the formatted message plus a newline to "<oj_home>/log/client.log"
// and, in debug mode, also prints it to stdout. Messages longer than the
// 4096-byte buffer are truncated instead of overflowing it. If the log file
// cannot be opened, an error is reported on stderr, the current directory is
// printed as a hint, and the file write is skipped.
void write_log(const char *fmt, ...)
{
    va_list ap;
    char buffer[4096];
    FILE *fp;

    // The buffer first holds the log path, then is reused for the message.
    snprintf(buffer, sizeof(buffer), "%s/log/client.log", oj_home);
    fp = fopen(buffer, "ae+"); // append; 'e' asks glibc for close-on-exec
    if (fp == NULL)
    {
        fprintf(stderr, "openfile error!\n");
        system("pwd");
    }

    va_start(ap, fmt);
    vsnprintf(buffer, sizeof(buffer), fmt, ap);
    va_end(ap);

    if (fp != NULL)
    {
        fprintf(fp, "%s\n", buffer);
        fclose(fp);
    }
    if (DEBUG)
        printf("%s\n", buffer);
}

// Return the index of the character that follows the first '=' in c.
//
// Example: after_equal("key=value") returns 4, the index of 'v'.
// When c contains no '=', the index of the terminating NUL is returned, so
// c + after_equal(c) is always a valid (possibly empty) string and never
// points past the end of the buffer.
int after_equal(const char *c)
{
    int i = 0;
    while (c[i] != '\0' && c[i] != '=')
        i++;
    if (c[i] == '=')
        i++; // step over the '=' itself
    return i;
}

// Reduce c, in place, to its first whitespace-delimited token.
//
// Leading whitespace is skipped and the string is cut at the first whitespace
// character after the token (or at its end when there is none):
//   "  hello \n" -> "hello",  "hello" -> "hello",  "a b" -> "a",  "   " -> "".
// The scan stops at the terminating NUL, and the work is done inside c itself,
// so no fixed-size temporary buffer can overflow.
void trim(char *c)
{
    char *start = c;
    char *end;

    // Cast to unsigned char: passing a negative char to isspace() is undefined.
    while (isspace((unsigned char)*start))
        start++;

    end = start;
    while (*end != '\0' && !isspace((unsigned char)*end))
        end++;
    *end = '\0';

    if (start != c)
        memmove(c, start, (size_t)(end - start) + 1); // include the NUL
}

// Extract the value of a "key=value" configuration line.
//
// If buf starts with key, everything after the first '=' is copied into
// value, trimmed with trim(), echoed to stdout in debug mode, and the function
// reports success. Otherwise value is left untouched.
// The key is matched as a prefix of the line, so a key that is a prefix of a
// longer key matches lines of the longer key as well.
//
// Example: read_buf("username = eling", "username", value) returns true and
// leaves "eling" in value.
bool read_buf(char *buf, const char *key, char *value)
{
    const size_t key_len = strlen(key);

    if (strncmp(buf, key, key_len) != 0)
        return false;

    const char *raw_value = buf + after_equal(buf);
    strcpy(value, raw_value);
    trim(value);
    if (DEBUG)
        printf("%s\n", value);
    return true;
}

// read_int 从 buf 中读取 key=value 格式的整数值,并存入 value 变量
// 例如:
// 调用:read_buf("size = 12", "size", value);
// 执行结果:函数返回 1, value = 12
void read_int(char *buf, const char *key, int *value)
{
    char buf2[BUFFER_SIZE];
    if (read_buf(buf, key, buf2))
        sscanf(buf2, "%d", value);
}

// 读取 /home/judge/src/etc/judge.conf 配置文件
// 然后将这些配置赋值给上面定义的那些全局变量
void init_judge_conf()
{
    FILE *fp = NULL;
    char buf[BUFFER_SIZE];
    host_name[0] = 0;
    user_name[0] = 0;
    password[0] = 0;
    db_name[0] = 0;
    port_number = 3306;
    max_running = 3;
    sleep_time = 1;
    oj_tot = 1;
    oj_mod = 0;
    strcpy(oj_lang_set, "0,1,3,6");
    strcpy(oj_udpserver, "127.0.0.1");
    strcpy(docker_path, "/usr/bin/docker");
    fp = fopen("./etc/judge.conf", "r");
    if (fp != NULL)
    {
        while (fgets(buf, BUFFER_SIZE - 1, fp))
        {
            read_buf(buf, "OJ_HOST_NAME", host_name);
            read_buf(buf, "OJ_USER_NAME", user_name);
            read_buf(buf, "OJ_PASSWORD", password);
            read_buf(buf, "OJ_DB_NAME", db_name);
            read_int(buf, "OJ_PORT_NUMBER", &port_number);
            read_int(buf, "OJ_RUNNING", &max_running);
            read_int(buf, "OJ_SLEEP_TIME", &sleep_time);
            read_int(buf, "OJ_TOTAL", &oj_tot);
            read_int(buf, "OJ_DEDICATED", &oj_dedicated);

            read_int(buf, "OJ_MOD", &oj_mod);

            read_int(buf, "OJ_HTTP_JUDGE", &http_judge);
            read_buf(buf, "OJ_HTTP_BASEURL", http_baseurl);
            read_buf(buf, "OJ_HTTP_APIPATH", http_apipath);
            read_buf(buf, "OJ_HTTP_LOGINPATH", http_loginpath);
            read_buf(buf, "OJ_HTTP_USERNAME", http_username);
            read_buf(buf, "OJ_HTTP_PASSWORD", http_password);
            read_buf(buf, "OJ_LANG_SET", oj_lang_set);

            read_int(buf, "OJ_UDP_ENABLE", &oj_udp);
            read_buf(buf, "OJ_UDP_SERVER", oj_udpserver);
            read_int(buf, "OJ_UDP_PORT", &oj_udpport);

            read_int(buf, "OJ_REDISENABLE", &oj_redis);
            read_buf(buf, "OJ_REDISSERVER", oj_redisserver);
            read_int(buf, "OJ_REDISPORT", &oj_redisport);
            read_buf(buf, "OJ_REDISAUTH", oj_redisauth);
            read_buf(buf, "OJ_REDISQNAME", oj_redisqname);
            read_int(buf, "OJ_TURBO_MODE", &turbo_mode);
            read_int(buf, "OJ_USE_DOCKER", &use_docker);
            read_int(buf, "OJ_INTERNAL_CLIENT", &internal_client);

            read_buf(buf, "OJ_DOCKER_PATH", docker_path);
        }
#ifdef _mysql_h
        if (oj_tot == 1)
        {
            sprintf(query,
                    "SELECT solution_id FROM solution WHERE language in (%s) and result<2 ORDER BY result, solution_id  limit %d",
                    oj_lang_set, prefetch * max_running);
        }
        else
        {
            sprintf(query,
                    "SELECT solution_id FROM solution WHERE language in (%s) and result<2 and MOD(solution_id,%d)=%d ORDER BY result, solution_id ASC limit %d",
                    oj_lang_set, oj_tot, oj_mod, prefetch * max_running);
        }
#endif
        sleep_tmp = sleep_time;
        fclose(fp);
    }
}

// 启动 judge_client 进程判题
void run_client(int runid, int clientid)
{
    // 使用 setrlimit 系统调用
    // 能够限制进程使用的 CPU、内存、硬盘等计算机的资源
    // 有很多条件编译语句,针对不同的平台,做不同的限制
    char buf[BUFFER_SIZE], runidstr[BUFFER_SIZE];
    struct rlimit LIM;
    LIM.rlim_max = 800;
    LIM.rlim_cur = 800;
    setrlimit(RLIMIT_CPU, &LIM);

    LIM.rlim_max = 1024 * STD_MB;
    LIM.rlim_cur = 1024 * STD_MB;
    setrlimit(RLIMIT_FSIZE, &LIM);
#ifdef __mips__
    LIM.rlim_max = STD_MB << 12;
    LIM.rlim_cur = STD_MB << 12;
#endif
#ifdef __arm__
    LIM.rlim_max = STD_MB << 11;
    LIM.rlim_cur = STD_MB << 11;
#endif
#ifdef __aarch64__
    LIM.rlim_max = STD_MB << 15;
    LIM.rlim_cur = STD_MB << 15;
#endif
#ifdef __i386
    LIM.rlim_max = STD_MB << 11;
    LIM.rlim_cur = STD_MB << 11;
#endif
#ifdef __x86_64__
    LIM.rlim_max = STD_MB << 15;
    LIM.rlim_cur = STD_MB << 15;
#endif
    setrlimit(RLIMIT_AS, &LIM);

    LIM.rlim_cur = LIM.rlim_max = 800 * max_running;
    setrlimit(RLIMIT_NPROC, &LIM);

    // buf[0]=clientid+'0'; buf[1]=0;
    sprintf(runidstr, "%d", runid);
    sprintf(buf, "%d", clientid);

    // write_log("sid=%s\tclient=%s\toj_home=%s\n",runidstr,buf,oj_home);
    // sprintf(err,"%s/run%d/error.out",oj_home,clientid);
    // freopen(err,"a+",stderr);
    //	char * const envp[]={(char * const )"PYTHONIOENCODING=utf-8",
    //			     (char * const )"LANG=zh_CN.UTF-8",
    //			     (char * const )"LANGUAGE=zh_CN.UTF-8",
    //			     (char * const )"LC_ALL=zh_CN.UTF-8",NULL};
    // if (!DEBUG)
    if (use_docker)
    {
        char docker_v[BUFFER_SIZE * 3];
        sprintf(docker_v, "%s:/home/judge", oj_home);
        if (internal_client)
            execl(docker_path, docker_path, "container", "run", "--pids-limit", "100", "--rm", "--cap-add", "SYS_PTRACE", "--net=host",
                  "-v", docker_v, "hustoj", "/usr/bin/judge_client", runidstr, buf, (char *)NULL);
        else
            execl(docker_path, docker_path, "container", "run", "--pids-limit", "100", "--rm", "--cap-add", "SYS_PTRACE", "--net=host",
                  "-v", docker_v, "hustoj", "/home/judge/src/core/judge_client/judge_client", runidstr, buf, (char *)NULL);
    }
    else
    {
        // 使用 execl 系统调用运行 /usr/bin/judge_client 系统调用
        execl("/usr/bin/judge_client", "/usr/bin/judge_client", runidstr, buf,
              oj_home, (char *)NULL);
    }
    // else
    //	execl("/usr/bin/judge_client", "/usr/bin/judge_client", runidstr, buf,
    //			oj_home, "debug", (char *) NULL);
    if (use_docker)
    {

        printf("DOCKER IS DOWN!\n");
    }
    // exit(0);
}
#ifdef _mysql_h
/**
 * Run one SQL statement on the shared connection.
 *
 * On failure the error is logged (debug mode only), the function pauses two
 * seconds, and the connection is closed and reset to NULL so that the next
 * init_mysql() call reconnects. Closing the handle before dropping it avoids
 * leaking one connection per failed statement.
 *
 * @param sql  NUL-terminated SQL statement
 * @return 0 on success, 1 on failure or when there is no open connection
 */
int executesql(const char *sql)
{
    if (conn == NULL)
        return 1; // nothing to run on; the caller must reconnect first

    if (mysql_real_query(conn, sql, strlen(sql)))
    {
        if (DEBUG)
            write_log("%s", mysql_error(conn));
        sleep(2);
        mysql_close(conn);
        conn = NULL;
        return 1;
    }
    return 0;
}
#endif

#ifdef _mysql_h
/**
 * Make sure the shared database connection is usable.
 *
 * Without a connection, a new one is opened (30 second connect timeout) and
 * switched to utf8. With an existing connection, a "commit" is issued, which
 * also serves as a liveness check because executesql() drops the connection
 * when the statement fails.
 *
 * @return 0 when the connection is ready, 1 on failure (after a 2 second pause)
 */
int init_mysql()
{
    if (conn != NULL)
        return executesql("commit");

    conn = mysql_init(NULL); // allocate the connection handle
    if (conn == NULL)
    {
        if (DEBUG)
            write_log("mysql_init failed");
        sleep(2);
        return 1;
    }

    // MYSQL_OPT_CONNECT_TIMEOUT expects a pointer to unsigned int (seconds).
    // The cast keeps the call valid for client headers that declare the
    // argument as const char * as well as those using const void *.
    const unsigned int timeout = 30;
    mysql_options(conn, MYSQL_OPT_CONNECT_TIMEOUT, (const char *)&timeout);

    if (!mysql_real_connect(conn, host_name, user_name, password, db_name,
                            port_number, 0, 0))
    {
        if (DEBUG)
            write_log("%s", mysql_error(conn));
        sleep(2);
        // Release the unusable handle so the next call starts from scratch
        // instead of issuing "commit" on a handle that never connected.
        mysql_close(conn);
        conn = NULL;
        return 1;
    }
    return executesql("set names utf8");
}
#endif
/**
 * Format a shell command printf-style and open a pipe for reading its output.
 *
 * @param fmt  printf-style format of the command line, followed by its arguments
 * @return stream to be closed with pclose(), or NULL when the formatted
 *         command does not fit the internal buffer or popen() fails
 */
FILE *read_cmd_output(const char *fmt, ...)
{
    char cmd[BUFFER_SIZE * 2];
    va_list ap;
    int len;

    va_start(ap, fmt);
    len = vsnprintf(cmd, sizeof(cmd), fmt, ap);
    va_end(ap);

    // Never run a truncated command line; report failure instead.
    if (len < 0 || (size_t)len >= sizeof(cmd))
        return NULL;

    return popen(cmd, "r");
}
/**
 * Read the first line of a stream and parse it as a decimal integer.
 *
 * @param f  stream to read from; may be NULL
 * @return the parsed number, or 0 when f is NULL, nothing could be read, or
 *         the line does not start with a number
 */
int read_int_http(FILE *f)
{
    char buf[BUFFER_SIZE];

    // Without this check atoi() would parse an uninitialized buffer on EOF.
    if (f == NULL || fgets(buf, BUFFER_SIZE - 1, f) == NULL)
        return 0;
    return atoi(buf);
}
/**
 * Ask the web API whether the stored session cookie is still logged in.
 *
 * Posts "checklogin=1" with wget (using the "cookie" file in the current
 * directory) and interprets the first line of the reply as a number.
 *
 * @return true when the reply is a positive number; false otherwise,
 *         including when the command could not be started
 */
bool check_login()
{
    const char *cmd =
        "wget --post-data=\"checklogin=1\" --load-cookies=cookie --save-cookies=cookie --keep-session-cookies -q -O - \"%s%s\"";
    int ret = 0;

    FILE *fjobs = read_cmd_output(cmd, http_baseurl, http_apipath);
    if (fjobs == NULL)
        return false; // pclose(NULL) is undefined; treat as "not logged in"

    ret = read_int_http(fjobs);
    pclose(fjobs);

    return ret > 0;
}
/**
 * Log in to the web API unless the current cookie session is still valid.
 *
 * The credentials are posted with wget and the resulting session is stored in
 * the "cookie" file that the other wget calls load.
 */
void login()
{
    if (check_login())
        return; // session still valid

    char cmd[BUFFER_SIZE * 5];
    sprintf(cmd,
            "wget --post-data=\"user_id=%s&password=%s\" --load-cookies=cookie --save-cookies=cookie --keep-session-cookies -q -O - \"%s%s\"",
            http_username, http_password, http_baseurl, http_loginpath);
    system(cmd);
}
/**
 * Fetch pending solution ids from the web API.
 *
 * Every whitespace-separated token of the reply that parses to a positive
 * number is stored as a solution id. At most max_running * prefetch ids are
 * stored, and the rest of jobs (up to and including index
 * max_running * prefetch) is zero-filled as the end marker.
 *
 * @param jobs  output array with room for max_running * prefetch + 1 entries
 * @return number of ids stored (0 when the command could not be started)
 */
int _get_jobs_http(int *jobs)
{
    const int capacity = max_running * prefetch;
    const char *cmd =
        "wget --post-data=\"getpending=1&oj_lang_set=%s&max_running=%d\" --load-cookies=cookie --save-cookies=cookie --keep-session-cookies -q -O - \"%s%s\"";
    char buf[BUFFER_SIZE];
    char token_fmt[16];
    int count = 0;

    login();

    // "%<BUFFER_SIZE-1>s": bound each token so a long reply cannot overflow buf.
    snprintf(token_fmt, sizeof(token_fmt), "%%%ds", BUFFER_SIZE - 1);

    FILE *fjobs = read_cmd_output(cmd, oj_lang_set, max_running, http_baseurl, http_apipath);
    if (fjobs != NULL)
    {
        // Stop at capacity so an oversized reply cannot write past jobs[].
        while (count < capacity && fscanf(fjobs, token_fmt, buf) == 1)
        {
            int sid = atoi(buf);
            if (sid > 0)
                jobs[count++] = sid;
        }
        pclose(fjobs);
    }

    for (int i = count; i <= capacity; i++)
        jobs[i] = 0;
    return count;
}
#ifdef _mysql_h
/**
 * Fetch the ids of solutions waiting to be judged from the database.
 *
 * Runs the statement prepared in `query` by init_judge_conf(), for example:
 *   SELECT solution_id FROM solution WHERE language in (...) and result<2
 *   and MOD(solution_id,N)=M ORDER BY result, solution_id ASC limit K
 * The ids are written to jobs and the rest of the array (up to and including
 * index max_running * prefetch) is zero-filled as the end marker. If the
 * result cannot be stored or the following "commit" fails, the fetched ids
 * are discarded. The result set is always released before returning.
 *
 * @param jobs  output array with room for max_running * prefetch + 1 entries
 * @return number of ids stored; 0 on any database error
 */
int _get_jobs_mysql(int *jobs)
{
    const int capacity = max_running * prefetch;
    int count = 0;

    if (mysql_real_query(conn, query, strlen(query)))
    {
        if (DEBUG)
            write_log("%s", mysql_error(conn));
        sleep(2);
        return 0;
    }

    res = mysql_store_result(conn);
    if (res != NULL)
    {
        // The query's LIMIT already caps the row count; the bound is a safety net.
        while (count < capacity && (row = mysql_fetch_row(res)) != NULL)
        {
            if (row[0] != NULL)
                jobs[count++] = atoi(row[0]);
        }
        // Free unconditionally: keeping res around when the commit failed
        // leaked it as soon as the next fetch overwrote the pointer.
        mysql_free_result(res);
        res = NULL;

        if (executesql("commit"))
            count = 0; // commit failed: discard this batch
    }

    for (int i = count; i <= capacity; i++)
        jobs[i] = 0;
    return count;
}
#endif
/**
 * Pop pending solution ids from the redis queue, one `redis-cli rpop` call per id.
 *
 * Popping stops at the first reply that is not a number (empty queue) or when
 * redis-cli cannot be started. The rest of jobs (up to and including index
 * max_running * prefetch) is zero-filled as the end marker.
 * NOTE(review): the loop condition `ret <= max_running` allows
 * max_running + 1 pops per call; kept as in the original — confirm whether
 * `<` was intended.
 *
 * @param jobs  output array with room for max_running * prefetch + 1 entries
 * @return number of ids stored
 */
int _get_jobs_redis(int *jobs)
{
    int ret = 0;
    const char *cmd = "redis-cli -h %s -p %d -a %s --raw rpop %s";

    while (ret <= max_running)
    {
        FILE *fjobs = read_cmd_output(cmd, oj_redisserver, oj_redisport, oj_redisauth, oj_redisqname);
        if (fjobs == NULL)
            break; // could not start redis-cli; fscanf(NULL) would crash

        int parsed = fscanf(fjobs, "%d", &jobs[ret]);
        pclose(fjobs);
        if (parsed != 1)
            break;
        ret++;
    }

    for (int i = ret; i <= max_running * prefetch; i++)
        jobs[i] = 0;
    if (DEBUG)
        printf("redis return %d jobs", ret);
    return ret;
}

/**
 * Fetch pending solution ids from the configured job source.
 *
 * Priority: HTTP API when http_judge is set, then the redis queue when
 * oj_redis is set, otherwise MySQL (only when MySQL support is compiled in).
 *
 * @param jobs  output array, see the individual _get_jobs_* functions
 * @return number of ids fetched; 0 when no backend is available
 */
int get_jobs(int *jobs)
{
    if (http_judge)
        return _get_jobs_http(jobs);

    if (oj_redis)
        return _get_jobs_redis(jobs);

#ifdef _mysql_h
    return _get_jobs_mysql(jobs);
#else
    return 0;
#endif
}

#ifdef _mysql_h
/**
 * Claim a pending solution in the database by setting its result code.
 *
 * The UPDATE only matches while the solution is still pending (result<2), so
 * the claim succeeds exactly when a row was changed.
 *
 * @param solution_id  solution to claim
 * @param result       result code to store (work() passes OJ_CI)
 * @return true when one row was updated; false when there is no connection,
 *         the statement failed, or the solution was no longer pending
 */
bool _check_out_mysql(int solution_id, int result)
{
    char sql[BUFFER_SIZE];

    // Check the connection before it is used, not after.
    if (conn == NULL)
        return false;

    snprintf(sql, sizeof(sql),
             "UPDATE solution SET result=%d,time=0,memory=0,judgetime=NOW() WHERE solution_id=%d and result<2 LIMIT 1",
             result, solution_id);
    if (mysql_real_query(conn, sql, strlen(sql)))
    {
        syslog(LOG_ERR | LOG_DAEMON, "%s", mysql_error(conn));
        return false;
    }
    return mysql_affected_rows(conn) > 0ul;
}
#endif

/**
 * Claim a pending solution through the web API.
 *
 * Posts "checkout=1&sid=<id>&result=<code>" with wget and reads a number from
 * the reply.
 *
 * @param solution_id  solution to claim
 * @param result       result code to store
 * @return true when the reply is a non-zero number; false otherwise, including
 *         when the command could not be started or the reply is not a number
 */
bool _check_out_http(int solution_id, int result)
{
    login();
    const char *cmd =
        "wget --post-data=\"checkout=1&sid=%d&result=%d\" --load-cookies=cookie --save-cookies=cookie --keep-session-cookies -q -O - \"%s%s\"";
    int ret = 0;

    FILE *fjobs = read_cmd_output(cmd, solution_id, result, http_baseurl, http_apipath);
    if (fjobs == NULL)
        return false; // fscanf/pclose on NULL would crash

    if (fscanf(fjobs, "%d", &ret) != 1)
        ret = 0;
    pclose(fjobs);

    return ret;
}

/**
 * Claim a solution for judging by storing the given result code.
 *
 * When jobs come from redis, or several judged instances share the work
 * (oj_tot > 1), no claim is made and the call always succeeds. Otherwise the
 * claim goes through the HTTP API or MySQL, depending on http_judge.
 *
 * @return true when the solution may be judged by this process
 */
bool check_out(int solution_id, int result)
{
    const bool claim_not_needed = oj_redis || oj_tot > 1;
    if (claim_not_needed)
        return true;

    if (http_judge)
        return _check_out_http(solution_id, result);

#ifdef _mysql_h
    return _check_out_mysql(solution_id, result);
#else
    return 0;
#endif
}
// Number of judge_client children currently counted as running:
// incremented in work() right before fork(), decremented when a child is reaped.
static int workcnt = 0;

/**
 * Fetch pending solutions and hand each one to a forked child that runs
 * run_client(); then reap finished children.
 *
 * @return int number of children reaped during this call
 *         (0 when no job could be fetched)
 */
int work()
{
    //      char buf[1024];
    static int error = 0; // consecutive failures to find a client slot while docker is in use
    int retcnt = 0;  // return value: children reaped in this call
    int i = 0;
    // PIDs of the forked children, indexed by client id; 0 marks a free slot.
    // NOTE(review): fixed at 100 entries while the loops below run up to
    // max_running — out of bounds if max_running > 100.
    static pid_t ID[100];
    int runid = 0;

    // jobs holds the ids of the solutions waiting to be judged.
    // It has max_running * prefetch + 1 entries, where max_running is the
    // configured number of parallel clients; fetching more ids than there are
    // clients reduces the number of round trips to the job source.
    // A fetch usually returns fewer ids than the array can hold, so the array
    // looks like:
    //   [1001, 1002, 1003, ..., 1009, 0, 0, 0, ..., 0]
    // Entries greater than 0 are solution ids, the zeros are padding. That is
    // why the dispatch loop below runs while jobs[j] > 0.
    int jobs[max_running * prefetch + 1];

    pid_t tmp_pid = 0;

    // Clear the jobs array.
    for (i = 0; i < max_running * prefetch + 1; i++)
        jobs[i] = 0;

    // Fetch the pending jobs; when nothing was fetched, return 0 right away.
    if (!get_jobs(jobs)) {
        return 0;
    }

    // Note: the loop condition is jobs[j] > 0 (stop at the first padding zero).
    for (int j = 0; jobs[j] > 0; j++)
    {
        // runid is the solution_id
        runid = jobs[j];
        // Skip ids that belong to another judged instance
        // (only ids with runid % oj_tot == oj_mod are handled here).
        if (runid % oj_tot != oj_mod)
            continue;

        // workcnt is the number of judge tasks currently running.
        // When it has reached max_running, try to reap one finished child:
        // 1. waitpid(-1, NULL, WNOHANG) collects an exited child if there is
        //    one; it does not block, so it may find none.
        // 2. If the returned pid is found in ID, that slot is cleared and the
        //    counters are adjusted: workcnt-- (one fewer running) and
        //    retcnt++ (one more finished).
        // 3. With use_docker set, not finding a slot counts as an error
        //    (error++); otherwise error is reset to 0. The counter is checked
        //    further down to detect a hung docker service.
        if (workcnt >= max_running)
        {
            // reap a finished child             // if no more client can running
            tmp_pid = waitpid(-1, NULL, WNOHANG); // wait 4 one child exit
            if (DEBUG)
                printf("try get one tmp_pid=%d\n", tmp_pid);
            for (i = 0; i < max_running; i++)
            {   // get the client id
                if (ID[i] == tmp_pid)
                {
                    workcnt--;
                    retcnt++;
                    ID[i] = 0;
                    // Key point: i is now the id of a free client slot.
                    break; // got the client id
                }
            }
            if (use_docker && (i == max_running || ID[i] != 0))
                error++;
            else
                error = 0; // check if docker service is hanged up
        }
        else
        { // have free client
            for (i = 0; i < max_running; i++) // find the client id
                if (ID[i] == 0)
                {
                    // Same as above: i is the id of a free client slot.
                    // It is passed to run_client() as the client id.
                    // NOTE(review): presumably it selects the run0, run1, ...
                    // sandbox directory under the judge home — confirm in judge_client.
                    break; // got the client id
                }
        }
        if (i < max_running)
        {
            // check_out(runid, OJ_CI) claims the solution;
            // on success the job is dispatched to a child.
            if (workcnt < max_running && check_out(runid, OJ_CI))
            {
                workcnt++;
                // NOTE(review): a failed fork() (-1) is stored in ID[i] and not handled.
                ID[i] = fork(); // start to fork
                if (ID[i] == 0)
                {
                    // The child runs run_client(), which normally never
                    // returns because it exec's judge_client.
                    if (DEBUG)
                    {
                        write_log("Judging solution %d", runid);
                        write_log("<<=sid=%d===clientid=%d==>>\n", runid, i);
                    }
                    run_client(runid, i); // if the process is the son, run it
                    // Only reached when the exec failed; this changes the
                    // child's own copy of workcnt just before it exits.
                    workcnt--;
                    exit(0);
                }
            }
            else
            {
                //	ID[i] = 0;
                if (DEBUG)
                {
                    if (workcnt < max_running)
                        printf("check out failure ! runid:%d pid:%d \n", i, ID[i]);
                    else
                        printf("workcnt:%d max_running:%d ! \n", workcnt, max_running);
                }
                usleep(5000);
            }
        }
        if (DEBUG)
            printf("workcnt:%d max_running:%d ! \n", workcnt, max_running);
        if (use_docker && (error >= max_running * prefetch))
        { // reboot docker
            if (DEBUG)
                printf("---------------------------------------------restarting--------------------------------------\n");
            // system("/usr/sbin/service docker restart");
#ifdef _mysql_h
            // Put solutions with result 2 or 3 back to result 1 so they are fetched again.
            executesql("update solution set result=1 where result >1 and result <4 ");
#endif
            sleep(1);
            error = 0;
        }
    }
    int NOHANG = 0;
    // In dedicated mode, poll without blocking in roughly 79% of the calls
    // (original author's note: keeps CPU usage around 80% instead of saturating it).
    if (oj_dedicated && (rand() % 100 > 20))
        NOHANG = WNOHANG;
    // Reap the children forked above and update the bookkeeping. With
    // NOHANG == 0 this blocks until every child has exited; with WNOHANG it
    // only collects those that have already finished.
    while ((tmp_pid = waitpid(-1, NULL, NOHANG)) > 0)
    { // if run dedicated judge using WNOHANG
        for (i = 0; i < max_running; i++)
        { // get the client id
            if (ID[i] == tmp_pid)
            {
                workcnt--;
                retcnt++;
                ID[i] = 0;
                break; // got the client id
            }
        }
        printf("tmp_pid = %d\n", tmp_pid);
    }
    if (!http_judge)
    {
#ifdef _mysql_h
        if (res != NULL)
        {
            mysql_free_result(res); // free the memory
            res = NULL;
        }
        executesql("commit");
#endif
    }
    if (DEBUG && retcnt)
        write_log("<<%ddone!>>", retcnt);
    // free(ID);
    // free(jobs);
    return retcnt;
}

/**
 * Try to take an exclusive (write) lock on the whole file without blocking.
 *
 * @param fd  open file descriptor
 * @return the result of fcntl(F_SETLK): -1 with errno set when the lock
 *         cannot be taken, any other value on success
 */
int lockfile(int fd)
{
    struct flock whole_file;

    // Offset 0 from the start with length 0 means "through end of file".
    whole_file.l_whence = SEEK_SET;
    whole_file.l_start = 0;
    whole_file.l_len = 0;
    whole_file.l_type = F_WRLCK;

    return fcntl(fd, F_SETLK, &whole_file);
}

/**
 * Single-instance guard based on a locked PID file.
 *
 * Opens (creating if needed) the file named by lock_file and tries to lock
 * it. When another process holds the lock, 1 is returned. Otherwise the file
 * is rewritten with this process's pid and 0 is returned; the descriptor is
 * deliberately left open so the lock stays held while the process lives.
 * Failing to open or lock the file for any other reason terminates the
 * process with exit(1).
 *
 * @return 1 if another instance holds the lock, 0 if this process now owns it
 */
int already_running()
{
    int fd;
    char buf[16];

    fd = open(lock_file, O_RDWR | O_CREAT, LOCKMODE);
    if (fd < 0)
    {
        // Report the path that was actually used, not the LOCKFILE default.
        syslog(LOG_ERR | LOG_DAEMON, "can't open %s: %s", lock_file,
               strerror(errno));
        exit(1);
    }
    if (lockfile(fd) < 0)
    {
        if (errno == EACCES || errno == EAGAIN)
        {
            close(fd);
            return 1;
        }
        syslog(LOG_ERR | LOG_DAEMON, "can't lock %s: %s", lock_file,
               strerror(errno));
        exit(1);
    }

    // Recording the pid is best effort: the lock itself is what guards
    // against a second instance, so failures are logged but not fatal.
    if (ftruncate(fd, 0) < 0)
        syslog(LOG_ERR | LOG_DAEMON, "can't truncate %s: %s", lock_file,
               strerror(errno));
    snprintf(buf, sizeof(buf), "%d", (int)getpid());
    if (write(fd, buf, strlen(buf) + 1) < 0)
        syslog(LOG_ERR | LOG_DAEMON, "can't write %s: %s", lock_file,
               strerror(errno));
    return (0);
}

// Turn the calling process into a daemon, following the usual sequence:
// fork and let the parent exit, start a new session, change to oj_home, clear
// the umask, and point stdin/stdout/stderr at /dev/null.
//
// Returns 0 in the daemonized child and -1 when fork() fails; the original
// parent process never returns from this function.
int daemon_init(void)
{
    const pid_t pid = fork();

    if (pid < 0)
        return (-1);
    if (pid != 0)
        exit(0); /* parent exit */

    /* child continues */
    setsid();       /* become session leader */
    chdir(oj_home); /* change working directory */
    umask(0);       /* clear file mode creation mask */

    /* close stdin, stdout and stderr */
    for (int std_fd = 0; std_fd <= 2; std_fd++)
        close(std_fd);

    /* reopen all three on /dev/null */
    const int null_fd = open("/dev/null", O_RDWR);
    for (int std_fd = 0; std_fd <= 2; std_fd++)
        dup2(null_fd, std_fd);
    if (null_fd > 2)
        close(null_fd);

    return (0);
}

/**
 * In turbo mode 2, ask the database to run the `sync_result` stored procedure.
 *
 * Does nothing unless turbo_mode == 2, MySQL support is compiled in, and a
 * connection is open. main() calls this even after the connection has been
 * dropped, so the NULL check prevents querying through a NULL handle. A failed
 * call is logged in debug mode and otherwise ignored.
 */
void turbo_mode2()
{
#ifdef _mysql_h
    if (turbo_mode != 2 || conn == NULL)
        return;

    static const char sql[] = " CALL `sync_result`();";
    if (mysql_real_query(conn, sql, strlen(sql)) && DEBUG)
        write_log("%s", mysql_error(conn));
#endif
}
/**
 * TRUBO_MODE = 2 放弃用户表和问题表的数据一致性,以在大型比赛中添加更多的判题机来提高判题速度。
 */
int main(int argc, char **argv)
{
    int oj_udp_ret = 0;
    DEBUG = (argc > 2);
    ONCE = (argc > 3);
    if (argc > 1)
        strcpy(oj_home, argv[1]);
    else
        strcpy(oj_home, "/home/judge");
    chdir(oj_home); // change the dir

    sprintf(lock_file, "%s/etc/judge.pid", oj_home);
    if (!DEBUG)
        daemon_init();
    if (already_running())
    {
        syslog(LOG_ERR | LOG_DAEMON,
               "This daemon program is already running!\n");
        printf("%s already has one judged on it!\n", oj_home);
        return 1;
    }
    if (!DEBUG)
        system("/sbin/iptables -A OUTPUT -m owner --uid-owner judge -j DROP");
    //	struct timespec final_sleep;
    //	final_sleep.tv_sec=0;
    //	final_sleep.tv_nsec=500000000;
    init_judge_conf(); // set the database info
    if (oj_udp)
    {
        oj_udp_fd = socket(AF_INET, SOCK_DGRAM, 0);
        if (oj_udp_fd < 0)
            printf("udp fd open failed! \n");
        struct sockaddr_in ser_addr;
        memset(&ser_addr, 0, sizeof(ser_addr));
        ser_addr.sin_family = AF_INET;
        ser_addr.sin_addr.s_addr = inet_addr(oj_udpserver);
        ser_addr.sin_port = htons(oj_udpport);
        struct timeval timeOut;
        timeOut.tv_sec = sleep_time; //..5s..
        timeOut.tv_usec = 0;
        if (setsockopt(oj_udp_fd, SOL_SOCKET, SO_RCVTIMEO, &timeOut, sizeof(timeOut)) < 0)
        {
            printf("time out setting failed\n");
        }
        oj_udp_ret = bind(oj_udp_fd, (struct sockaddr *)&ser_addr, sizeof(ser_addr));
        if (oj_udp_ret < 0)
            printf("udp fd open failed! \n");
    }
    signal(SIGQUIT, call_for_exit);
    signal(SIGINT, call_for_exit);
    signal(SIGTERM, call_for_exit);

    int j = 1;
    int n = 0;
    while (!STOP)
    { // start to run until call for exit
        n = 0;
        while (j && (http_judge
#ifdef _mysql_h
                     || !init_mysql()
#endif
                         ))
        {
            // 查看 work
            j = work();
            n += j;
            // 如果启用加速模式 并且
            if (turbo_mode == 2 && (n > max_running * 10 || j < max_running))
            {
                turbo_mode2();
                n = 0;
            }

            if (ONCE && j == 0)
                break;
        }

        turbo_mode2();

        if (ONCE && j == 0)
        {
            break;
        }

        if (n == 0)
        {
            printf("workcnt:%d\n", workcnt);
            if (oj_udp && oj_udp_ret == 0)
            {
                if (STOP)
                    return 1;
                wait_udp_msg(oj_udp_fd);
                if (DEBUG)
                    printf("udp job ... \n");
            }
            else
            {
                sleep(sleep_time);
                if (DEBUG)
                    printf("sleeping ... %ds \n", sleep_time);
            }
        }
        j = 1;
    }
#ifdef _mysql_h
    mysql_close(conn);
#endif
    return 0;
}

至此,HUSTOJ 判题机的 judge.cc 文件(即判题的调度模块)就解读完毕了。后续文章将开始解读真正执行判题的 judge_client.cc 的运行机制和源代码。

相关文章:

  • 端到端测试利器:Playwright入门教程
  • 力扣 66.加一 (Java实现)
  • ROS应用之SwarmSim在ROS 中的协同路径规划
  • Baklib全场景云平台:一站式知识管理赋能企业效能升级
  • C++11 thread
  • 大模型应用开发书籍推荐
  • 对项目交接的一些思考
  • 通用知识库问答流程
  • vue2.0接入海康威视控件包V3.3.0——海康威视摄像头接入前端页面(webSDK包)模式
  • 【STM32】外部时钟|红外反射光电开关
  • 40、【OS】【Nuttx】OSTest分析(4):内存监控(二)
  • Python进制转换
  • inline关键字
  • effective-Objective-C第六章阅读笔记
  • hive:分区>>静态分区,动态分区,混合分区
  • 信通院:政府数字化转型发展研究报告(2024年)
  • 解锁 Java 回调函数:异步编程与事件处理的利器
  • Python实现AWS Fargate自动化部署系统
  • enum class与enum
  • 【Python 学习 / 4】基本数据结构之 字符串 与 集合
  • 850亿元!2025年中央金融机构注资特别国债(一期)拟第一次续发行
  • 讲座预告|全球贸易不确定情况下企业创新生态构建
  • 河南省省长王凯在郑州调研促消费工作,走访蜜雪冰城总部
  • “20后”比“60后”更容易遭遇极端气候事件
  • 不主动上门检查,上海已制定14个细分领域“企业白名单”甄别规则
  • 鸿蒙电脑正式亮相,五年布局积累超2700项核心专利