Linux 跨进程同步方案
一、信号量(Semaphores)
-
原理与适用场景
信号量通过 PV 操作(原子增减操作)控制资源访问权限,适用于需要严格互斥的场景(如共享内存、文件等资源的并发访问)。- System V 信号量:通过唯一键值(
key
)标识,支持跨进程同步。 - POSIX 信号量:提供更现代接口(
sem_open
),兼容性需结合系统环境。
- System V 信号量:通过唯一键值(
-
实现步骤
- 创建信号量集:
int semid = semget(key, nsems, IPC_CREAT | 0666); semctl(semid, 0, SETVAL, 1); // 初始化信号量值为1
- PV 操作:
struct sembuf p = {0, -1, 0}; // P操作(获取锁) struct sembuf v = {0, 1, 0}; // V操作(释放锁) semop(semid, &p, 1); // 加锁 semop(semid, &v, 1); // 解锁
- 适用场景:多进程共享资源的互斥访问(如共享内存缓冲区)。
#include <stdio.h> #include <stdlib.h> #include <sys/ipc.h> #include <sys/sem.h> #include <sys/types.h> #include <unistd.h> #define SEM_KEY 0x1234 // 自定义信号量键值 #define NSEMS 1 // 信号量集合中信号量的数量 // 手动定义 semun 联合体(某些 Linux 版本需显式声明) union semun { int val; struct semid_ds *buf; unsigned short *array; }; //初始化信号量 int init_semaphore() { // 创建信号量集 int semid = semget(SEM_KEY, NSEMS, IPC_CREAT | 0666); // 权限设为 0666 if (semid == -1) { perror("semget failed"); exit(EXIT_FAILURE); } // 初始化信号量值为1(互斥锁) union semun arg; arg.val = 1; if (semctl(semid, 0, SETVAL, arg) == -1) { // 对第一个信号量(索引0)设置初始值 perror("semctl(SETVAL) failed"); exit(EXIT_FAILURE); } return semid; } //P/V 操作 void sem_p(int semid) { struct sembuf op = {0, -1, SEM_UNDO}; // 索引0的信号量减1(P操作) if (semop(semid, &op, 1) == -1) { perror("semop(P) failed"); exit(EXIT_FAILURE); } } void sem_v(int semid) { struct sembuf op = {0, 1, SEM_UNDO}; // 索引0的信号量加1(V操作) if (semop(semid, &op, 1) == -1) { perror("semop(V) failed"); exit(EXIT_FAILURE); } } //进程逻辑(模拟临界区操作) void process_job(int semid) { printf("Process %d waiting for semaphore...\n", getpid()); sem_p(semid); // 进入临界区前获取信号量 printf("Process %d entered critical section.\n", getpid()); sleep(2); // 模拟临界区操作 sem_v(semid); // 离开临界区后释放信号量 printf("Process %d released semaphore.\n", getpid()); } int main() { int semid = init_semaphore(); pid_t pid = fork(); if (pid == 0) { // 子进程 process_job(semid); exit(0); } else { // 父进程 process_job(semid); wait(NULL); // 等待子进程结束 // 删除信号量集 if (semctl(semid, 0, IPC_RMID) == -1) { // 清理资源 perror("semctl(IPC_RMID) failed"); exit(EXIT_FAILURE); } } return 0; }
- 创建信号量集:
二、共享内存 + 同步机制
-
互斥锁(Mutex)
- 原理:在共享内存中定义互斥锁(
pthread_mutex_t
),通过pthread_mutex_lock
和pthread_mutex_unlock
实现原子操作。
默认pthread_mutex_t
仅支持单进程内线程同步,需通过 PTHREAD_PROCESS_SHARED
属性启用跨进程支持。pthread_mutexattr_t attr; pthread_mutexattr_init(&attr); pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED); pthread_mutex_init(mutex, &attr);
- 实现要点:
// 共享内存结构体定义 typedef struct { pthread_mutex_t mutex; char data[1024]; } SharedData; pthread_mutex_init(&shared_mem->mutex, NULL); // 初始化锁
- 原理:在共享内存中定义互斥锁(
-
条件变量(Condition Variable)
- 原理:与互斥锁配合,允许进程在条件不满足时挂起(
pthread_cond_wait
)或唤醒其他进程(pthread_cond_signal
)。 - 适用场景:生产者-消费者模型,需等待特定条件触发的场景。
- 原理:与互斥锁配合,允许进程在条件不满足时挂起(
- 示例代码1
#include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <sys/shm.h> #include <sys/ipc.h> int main() { key_t key = ftok("/tmp", 0x01); int shmid = shmget(key, sizeof(pthread_mutex_t), IPC_CREAT | 0666); pthread_mutex_t *mutex = shmat(shmid, NULL, 0); // 初始化互斥锁属性 pthread_mutexattr_t attr; pthread_mutexattr_init(&attr); pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED); pthread_mutex_init(mutex, &attr); pid_t pid = fork(); if (pid == 0) { // 子进程加锁 pthread_mutex_lock(mutex); printf("Child process entered critical section.\n"); sleep(1); pthread_mutex_unlock(mutex); exit(0); } else { // 父进程加锁 pthread_mutex_lock(mutex); printf("Parent process entered critical section.\n"); sleep(1); pthread_mutex_unlock(mutex); wait(NULL); } // 清理资源 shmdt(mutex); shmctl(shmid, IPC_RMID, NULL); return 0; }
- 示例代码2
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> #include <sys/mman.h> #include <sys/stat.h> #include <fcntl.h> #include <sys/wait.h> // 定义共享内存中的数据结构 typedef struct { pthread_mutex_t mutex; // 跨进程互斥锁 pthread_cond_t cond; // 跨进程条件变量 int data_ready; // 数据是否就绪(条件判断标志) int data; // 共享数据 } SharedData; // 创建或附加共享内存 SharedData* init_shared_memory() { int fd = shm_open("/shared_mem", O_CREAT | O_RDWR, 0666); ftruncate(fd, sizeof(SharedData)); // 设置共享内存大小 // 映射共享内存 SharedData* shm = mmap(NULL, sizeof(SharedData), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); close(fd); // 初始化互斥锁和条件变量(仅由父进程执行一次) static int initialized = 0; if (!initialized) { pthread_mutexattr_t mutex_attr; pthread_condattr_t cond_attr; // 设置互斥锁为跨进程共享 pthread_mutexattr_init(&mutex_attr); pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED); pthread_mutex_init(&shm->mutex, &mutex_attr); // 设置条件变量为跨进程共享 pthread_condattr_init(&cond_attr); pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED); pthread_cond_init(&shm->cond, &cond_attr); shm->data_ready = 0; // 初始化条件标志 initialized = 1; } return shm; } //生产者进程 void producer(SharedData* shm) { pthread_mutex_lock(&shm->mutex); // 生产数据 shm->data = rand() % 100; printf("[Producer] Generated data: %d\n", shm->data); // 标记数据已就绪,并通知消费者 shm->data_ready = 1; pthread_cond_signal(&shm->cond); // 唤醒等待的进程 pthread_mutex_unlock(&shm->mutex); } //消费者进程 void consumer(SharedData* shm) { pthread_mutex_lock(&shm->mutex); // 等待数据就绪(使用 while 防止虚假唤醒) while (shm->data_ready == 0) { pthread_cond_wait(&shm->cond, &shm->mutex); } // 消费数据 printf("[Consumer] Received data: %d\n", shm->data); shm->data_ready = 0; // 重置条件标志 pthread_mutex_unlock(&shm->mutex); } int main() { SharedData* shm = init_shared_memory(); pid_t pid = fork(); if (pid == 0) { // 子进程作为消费者 consumer(shm); exit(0); } else { // 父进程作为生产者 producer(shm); wait(NULL); // 等待子进程结束 // 清理资源 pthread_mutex_destroy(&shm->mutex); pthread_cond_destroy(&shm->cond); munmap(shm, sizeof(SharedData)); shm_unlink("/shared_mem"); // 删除共享内存对象 } return 0; }
三、管道(Pipe)与命名管道(FIFO)
-
匿名管道
- 原理:通过
pipe
创建,仅适用于父子进程或兄弟进程,利用read
/write
阻塞特性实现同步。 - 示例:
int fd; pipe(fd); if (fork() == 0) { read(fd, buf, size); // 子进程阻塞读取 } else { write(fd, data, size); // 父进程写入数据 }
- 原理:通过
-
命名管道(FIFO)
- 原理:通过
mkfifo
创建具名管道文件,支持无亲缘关系进程通信,操作方式与匿名管道类似。
- 原理:通过
四、文件锁(File Locking)
- 原理:使用
fcntl
或flock
对文件区域加锁,确保同一时刻仅一个进程可访问指定资源。- 示例:
struct flock lock = { .l_type = F_WRLCK, .l_whence = SEEK_SET, .l_start = 0, .l_len = 0 }; fcntl(fd, F_SETLKW, &lock); // 阻塞式加锁
- 适用场景:跨进程文件读写同步。
- 示例代码
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <fcntl.h> #include <sys/file.h> #define LOCK_FILE "/tmp/flock_example.file" void process_job() { int fd = open(LOCK_FILE, O_RDWR | O_CREAT, 0666); if (fd == -1) { perror("open() failed"); exit(EXIT_FAILURE); } // 加锁(阻塞式) if (flock(fd, LOCK_EX) == -1) { // LOCK_SH 为共享锁 perror("flock() failed"); exit(EXIT_FAILURE); } printf("Process %d acquired lock.\n", getpid()); // 临界区操作 char buf[256]; snprintf(buf, sizeof(buf), "Data from process %d\n", getpid()); write(fd, buf, strlen(buf)); sleep(2); // 解锁 flock(fd, LOCK_UN); printf("Process %d released lock.\n", getpid()); close(fd); } int main() { process_job(); return 0; }
- 示例:
五、其他方案
-
消息队列(Message Queue)
- 通过
msgget
和msgsnd/msgrcv
传递结构化消息,支持进程间异步通信。//生产者进程 #include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/ipc.h> #include <sys/msg.h> // 定义消息结构体(必须包含长整型 `mtype`) struct message { long mtype; // 消息类型(必须 > 0) char data[256]; // 消息数据 }; int main() { key_t key = ftok("/tmp", 'A'); // 生成唯一键值 int msgid = msgget(key, IPC_CREAT | 0666); // 创建消息队列 if (msgid == -1) { perror("msgget failed"); exit(EXIT_FAILURE); } // 发送消息 struct message msg; msg.mtype = 1; // 消息类型设为1(接收端需匹配此类型) strcpy(msg.data, "Hello from sender process!"); if (msgsnd(msgid, &msg, sizeof(msg.data), 0) == -1) { // 发送消息(忽略 `mtype` 长度) perror("msgsnd failed"); exit(EXIT_FAILURE); } printf("Message sent: %s\n", msg.data); return 0; }
//消费者进程 #include <stdio.h> #include <stdlib.h> #include <sys/ipc.h> #include <sys/msg.h> struct message { long mtype; char data[256]; }; int main() { key_t key = ftok("/tmp", 'A'); // 使用相同键值 int msgid = msgget(key, 0666); // 获取已存在的消息队列 if (msgid == -1) { perror("msgget failed"); exit(EXIT_FAILURE); } // 接收消息 struct message msg; if (msgrcv(msgid, &msg, sizeof(msg.data), 1, 0) == -1) { // 接收类型为1的消息 perror("msgrcv failed"); exit(EXIT_FAILURE); } printf("Message received: %s\n", msg.data); // 删除消息队列 if (msgctl(msgid, IPC_RMID, NULL) == -1) { perror("msgctl(IPC_RMID) failed"); exit(EXIT_FAILURE); } return 0; }
- 通过
-
套接字(Socket)
- 本地套接字(
AF_UNIX
)支持双向通信,结合send
/recv
实现同步。
- 本地套接字(