Linux 进程间通信

什么是通信?

  • 数据传输

  • 资源共享

  • 进程控制

  • 通知事件

为什么要通信?

多进程协同完成某种任务内容

进程通信来的模式

  • 管道 基于文件系统的通信

    匿名管道 pipe

    命名管道

  • System V 进程间通讯 聚焦本地通信

    System V 消息队列

    System V 共享内存

    System V 信号量

  • POSIX 进程间通讯 通讯过程中可以跨主机

    消息队列

    共享内存

    信号量

    互斥量

    条件变量

    读写锁

两个进程都是的独立的个体 每个进程通讯的时候不能直接访问对方 OS 需要直接或者间接的给通讯双方进程提供“内存空间” 要通讯的进程需要看到一份公共的资源

管道

我们把从一个进程连接到另一个进程的一个数据流称为一个“管道“

管道只能单向数据通讯

匿名管道

Standard C library (libc, -lc)

#include <unistd.h>

int pipe(int pipefd[2]); // 输出型参数

pipe

简单的测试代码

#include <iostream>
#include <cassert>
#include <string>
#include <cstring>
#include <unistd.h>
​
#include <sys/types.h>
#include <sys/wait.h>
​
#include <cstdio>
using namespace std;
int main()
{
    // 创建管道文件 打开读写端
    int fds[2];
    int n = pipe(fds);
    assert(n==0);
    // 0 1 2 被占用  
    // 分配的临时文件描述符是3,4
    // cout << fds[0] << " " << fds[1] << endl;
    // 3 是 读取  4 是 写
    // Fork  
    pid_t id = fork();
    assert(id >= 0);
​
    if(id == 0)
    {
        // 紫禁城
        // 紫禁城的通讯代码
​
        // 紫禁城进行写入
        close(fds[0]);  // 关闭读取
​
        int cnt = 0;
        while(true)
        {
            cnt ++;
            char buffer[1024];
            snprintf(buffer,sizeof buffer,"child->parent say : %s| [%d] | PID[%d] ",\
            "what can I say",cnt,getpid());
            write(fds[1],buffer,strlen(buffer));
            sleep(1);
        }
        exit(0);
    }
    //父进程的通讯代码
    // 父进程进行读取
    close(fds[1]);
    while(true)
    {
        char buffer[1024];
        ssize_t s = read(fds[0],buffer,sizeof(buffer)  -1 );
        if(s > 0)
            buffer[s] = 0;
        cout << "# " << buffer <<"PID:"<< getpid() <<endl;
    }
    n = waitpid(id,nullptr,0);
    assert(n == id);
    close(fds[0]);
​
    exit(0);
​
    return 0;
}

xinchen@chen:~/testc/testc/pipe$ ./mypipe #child->parent say : what can I say| [1] | PID[2563369] PID:2563368 #child->parent say : what can I say| [2] | PID[2563369] PID:2563368 #child->parent say : what can I say| [3] | PID[2563369] PID:2563368 #child->parent say : what can I say| [4] | PID[2563369] PID:2563368 #child->parent say : what can I say| [5] | PID[2563369] PID:2563368

管道的四种情况

父进程读,子进程写

  • 父进程从管道中读取数据,子进程向管道中写入数据。

  • 如果写入速度快于读取速度,可能导致管道阻塞。

读慢写快

  • 当子进程写数据的速度快于父进程读取的速度时,管道会被填满。此时,写进程会阻塞,等待读进程消耗掉一些数据,腾出空间。

读快写慢

  • 当父进程读取数据的速度快于子进程写入的速度时,管道会很快被清空。此时,读进程会阻塞,等待写进程写入更多数据。

写端关闭,读端读到0

  • 如果子进程关闭写端或者终止,那么父进程在读取时会读到0,表示EOF(文件结束)。

读端关闭,写进程被kill

  • 如果父进程关闭读端或者终止,那么子进程在写入时会收到SIGPIPE信号,从而导致写进程被终止。

管道的特征

  • 管道是生命周期进程

  • 管道可以用来进行具有血缘关系的进程之间进行通讯,常用和父子进程

  • 管道是面向字节流的(网络)

  • 半双工 -- 单项通讯机制

  • 互斥与同步机制 -- 堆共享资源进行保护的方案

Demo 匿名管道 进程均衡分配

#include <iostream>
#include <cassert>
#include <cstdlib>
#include <unistd.h>
#include <cstdio>
#include <ctime>
​
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/wait.h>
​
#include <vector>
​
#define PROCSS_NUM 5   // 创建5个进程
#define MAKE_SEED() srand((unsigned long)time(nullptr) ^ getpid() ^ 0x1712212 ^ rand()%1234)
​
// 函数指针
typedef void(*func)();   
// OR
// #include <functional>
//std::function<void()> f = xxxxx
​
class subEp{
​
    public:
        subEp(pid_t subId,int writeFd)
            :subId_(subId),writeFd_(writeFd)
        {
            char namebuffer[1024];; // 
            snprintf(namebuffer,sizeof namebuffer,"poroacess-%d[PID:%d] | [FD:%d]",nums++,subId_,writeFd_);
            name_ = namebuffer;
        }
​
    public:
        static int nums;
        std::string name_;
        pid_t subId_;
        int writeFd_;
​
};
​
​
/////////////////////////////////////////////    要完成的任务   ///////////////////////////////////////////////////////////
​
void download_Task()
{
    std:: cout << getpid() << " : download_Task.... " << std::endl;
    sleep(1);
}
​
void OI_Task()
{
    std::cout << getpid() <<" : OI_Task ..... " << std::endl;
    sleep(1);
}
​
void flush_Task()
{
    std::cout << getpid() << " : flush_Task....." << std::endl;
    sleep(1);
}
​
void loadTaskFunc(std::vector<func> * out)
{
    assert(out); //notnull
    out->push_back(download_Task);
    out->push_back(OI_Task);
    out->push_back(flush_Task);
}
​
​
//////////////////////////////////////////////// 创建 子 进程////////////////////////////////////////
​
int recvTask( int readFd)
{
    int code = 0;
    size_t s = read(readFd,&code , sizeof code); 
    // assert(s == sizeof(int));
​
    if(s == sizeof(int))
        return code;
    else if(s <= 0) return -1;
    else return 0;
}
void createSubProcess(std::vector<subEp> * subs,std::vector<func> &funcs)
{
    for(int i = 0;i < PROCSS_NUM; i ++)
    {
        int fds[2];
        int n = pipe(fds);
        assert(n == 0);     // 已经知道会出现的错误 使用断言 未知debug调试使用条件
        (void)n; // 防止warning
​
​
        // 这可能会有问题   复杂 !! 
        pid_t id  = fork();
​
        if(id == 0)
        {
            
            // 子进程
            // 让子进程读取指令 关闭写
            close(fds[1]);
            // 获取命令码 没有发送 子进程应该阻塞
​
            while(true)
            {
                // 获取任务码 
                int commandCode = recvTask(fds[0]);
                // 完成任务
​
                if(commandCode >=0  && commandCode < funcs.size())
                {
                    funcs[commandCode]();
                }
                else if(commandCode == -1){
                    break;
                }
​
            }
​
            exit(0); // 正常退出进程
        }
        //  父进程  关闭写
        close(fds[0]); 
​
        subEp sub(id,fds[1]);
        subs->push_back(sub);
    }
}
​
​
void sendTask(const subEp & process,int taskNum)
{
    std::cout << "send task num: " << taskNum << " send to" << process.name_  << std::endl;
    int n = write(process.writeFd_,& taskNum,sizeof(taskNum));
    assert(n == sizeof(int));
    (void)n;
}  
​
void loadBlanceContrl(std::vector<subEp> &subs,std::vector<func> &funcMap,int taskCnt)
{
    int processnum = subs.size();
    int tasknum = funcMap.size();
    bool quit = false;
    if(taskCnt  ==  0)quit = true;
    while(quit || taskCnt)
    {
        // 选择一个子进程  std::vector<subEp>  -> index
        int subIdx = rand() % processnum;
        // 选择一个 任务   std::vector<func>  -> index 
        int taskIdx = rand() % tasknum;
        // 任务发送给选择的进程
        sendTask(subs[subIdx],taskIdx);
​
        sleep(1); 
        taskCnt --;
    }
}
​
void waitProcess( std::vector<subEp> processes)
{
    int processnum  = processes.size();
    for(int i = 0;i < processnum;i ++)
    {
        waitpid(processes[i].writeFd_,nullptr,0);
        std::cout << "wait proc " << i << "sussesful " << std::endl;
    }
​
}
//////////////////////////////////////////////     Main函数     /////////////////////////////////////////////
​
int subEp::nums = 0; // 类外初始化  static 类型
​
int main()
{   
    std::vector<func> funcMap; // all 任务
    std::vector<subEp> subs;   // 进程
    // init Task TODO
    MAKE_SEED();
    loadTaskFunc(&funcMap);
     // 创建子进程  维护通讯 信道
    createSubProcess(&subs,funcMap);
    // 父进程处理
    int taskCnt = 5;  // 0代表永远执行
    loadBlanceContrl(subs,funcMap,taskCnt);
    // 回收子进程信息
    waitProcess(subs);
    return 0;
}

命名管道

  • 管道应用的一个限制就是只能在具有共同祖先(具有亲缘关系)的进程间通信。

  • 如果我们想在不相关的进程之间交换数据, 可以使用 FIFO 文件夹来完成这项工作,经常被称为命名管道

  • 是特殊类型的文件

匿名管道与命名管道的区别

  • 匿名管道由pipe函数创建并打开。

  • 命名管道由mkfifo函数创建,打开用open

  • FIFO(命名管道)与pipe(匿名管道)之间唯一的区别在它们创建与打开的方式不同,一但这些工作完 成之后,它们具有相同的语义。

bash 创建命名管道

mkfifo filename

xinchen@chen:~/testc$ mkfifo test xinchen@chen:~/testc$ ls 'Client&ServerDemo' processpool pytest test testc xinchen@chen:~/testc$ ls -l 总计 16 drwxrwxr-x 2 xinchen xinchen 4096 10月 23 13:56 'Client&ServerDemo' drwxrwxr-x 2 xinchen xinchen 4096 10月 23 13:15 processpool drwxrwxr-x 2 xinchen xinchen 4096 10月 23 10:11 pytest prw-rw-r-- 1 xinchen xinchen 0 10月 23 13:59 test drwxrwxr-x 3 xinchen xinchen 4096 10月 22 17:00 testc

以 P开头的都是 管道文件

创建管道的OS也有相应的接口

int mkfifo(const char * filename,mode_t mode);

使用命名管道实现server & client 通讯

.PHONY: all
all: clientPipe serverPipe 
​
clientPipe:clientPipe.cpp
    g++ -o $@ $^ -std=c++11 -g
serverPipe:serverPipe.cpp
    g++ -o $@ $^ -std=c++11 -g
​
.PHONY:clean
clean:
    rm -rf clientPipe serverPipe

comm.hpp

#include <iostream>
​
#include <string>
#include <cstring>
#include  <cerrno>
#include <sys/types.h>
#include <sys/stat.h>
#include <cassert>
​
​
#include <fcntl.h>
#include <unistd.h>
#define NAME_PATH "/tmp/test" 
​
bool createFifo(const std::string & path)
{
    int n = mkfifo(path.c_str(),0666);
​
    if(n == 0)return true;
    else {
​
        std::cout << "Error : " << errno <<  "err String : "<< strerror(errno) << std::endl;
        return false;
    }
}
​
void removeFifo(const std::string & path)
{
    int n = unlink(path.c_str());
    assert(n == 0);
    (void)n;
}

serverPipe.cpp

#include "comm.hpp"
​
int main()
{
    
    bool r = createFifo(NAME_PATH);
    assert(r);
    (void)r;
​
    int rfd = open(NAME_PATH,O_RDONLY);
​
    if(rfd < 0) exit(1);
​
    // Read 
​
    while(true)
    {
        char  buffer[1024];
        ssize_t s = read(rfd,buffer,sizeof(buffer) - 1);
        if(s > 0)
        {
            buffer[s] = 0;
            std::cout << " client # ";
            printf("%s",buffer);
        }
        else if(s == 0)
        {
            std::cout<< "client quit, me too! " << std::endl;
            break;
        }else{
            std::cout << "error" << errno << ":"<<strerror(errno) << std::endl;
            break;
        }
​
    }
​
    close(rfd);
    removeFifo(NAME_PATH);
    return 0;
}
​
​

clientPipe.cpp

#include "comm.hpp"
​
​
int main()
{
​
    std::cout <<" # Hello Client" << std::endl;
​
​
    int wfd = open(NAME_PATH,O_WRONLY);
    if(wfd < 0) exit(1);
​
    // Write
​
    char buffer[1024];
    while(true)
    {
        std::cout << "Plase Say#" << std::endl;
        fgets(buffer,sizeof(buffer),stdin);
        ssize_t n = write(wfd,buffer,strlen(buffer));
        assert(n == strlen(buffer));
        (void)n;
    }
​
​
    
​
    return 0;
}
​

共享内存

共享内存的原理

task_struck (进程结构体)指向 进程地址空间 在进程地址空间通过页表获取到物理内存 而共享内存就是 两个进程通过页表获取到的是同一个物理内存

什么是共享内存

通过让不同的进程看到同一个内存块的方式: 共享内存

共享内存是一种通讯方式

共享内存接口函数

fotk 函数

key_t ftok(const char *pathname, int proj_id);

通过一定的运算方法返回/ 创建可以唯一标识的id

shmget函数 用来创建共享内存

int shmget(key_t key, size_t size, int shmflg);

key 共享内存段的名字

size 共享内存大小

shmflg 由9个权限标志构成

  • IPC_CREAT 不存在创建 /存在获取值

  • IPC_CREAT | IPC_EXCL 不存在创建 /存在出错(使用者的角度看拿到的永远都是新的)

key 是 代表进入共享内存时候设定的 用来标识内核中的唯一性

shmid 和 key 的额 关系 就像 fd 和 inode

共享内存的生命周期是随OS 的并不是 随进程 的

shmat函数 将共享内存段 连接到进程地址空间

void *shmat(int shmid, const void *_Nullable shmaddr, int shmflg);

shmid 共享内存标识

smaddr 指定链接的地址

shmflg:它的两个可能取值是SHM_RND|SHM_RDONLY

返回 返回指针指向共享内存子hi个字节

shmdt函数 将共享内存和当前进程脱离

int shmdt(const void *shmaddr);

shmaddr 由 shmat所返回的指针

成功返回 0 失败返回 -1

shmctl 函数 控制共享内存

int shmctl(int shmid, int cmd, struct shmid_ds *buf);

shmid 由shmget返回的共享内存标识码

cmd 代表将要采取的动作

  • IPC_STAT 将 shmid_ds 数据结构中数据设置为当前关联值

  • IPC_RMID 删除共享 内存段

buf 一个保存着共享内存的模式状态和访问权限的数据结构

返回值 成功返回 0 失败返回 -1

使用命令查看

ipcs -m

xinchen@chen:~/testc/shm$ ipcs -m

------------ 共享内存段 -------------- 键 shmid 拥有者 权限 字节 连接数 状态 0x00000000 229378 xinchen 600 67108864 2 目标 0x00000000 4 xinchen 600 16384 1 目标 0x00000000 589833 xinchen 600 4194304 2 目标 0x00000000 589837 xinchen 600 16384000 2 目标 0x00000000 589848 xinchen 600 5046272 2 目标 0x00000000 622639 xinchen 600 524288 2 目标 0x00000000 294971 xinchen 600 4194304 2 目标

nattch 是 连接数 (这里是中文)

ipcrm -m xxxxxx 删除 shm ipc资源

共享内存模拟代码

makefile

.PHONY:all
all: shm_client  shm_server

shm_client:shm_client.cc
	g++ -o $@ $^ -std=c++11
shm_server:shm_server.cc
	g++ -o $@ $^ -std=c++11 


.PHONY:clean
clean:
	rm -rf shm_client shm_server

shm_client.cc

#include "comm.hpp"  // 包含一个自定义的头文件,可能包含了一些共享内存操作的函数声明
#include <unistd.h>  // 包含Unix标准函数定义,如sleep函数

using namespace std;  // 使用标准命名空间

int main()  // 主函数入口
{
    key_t k = getKey();  // 调用getKey函数获取一个key值,这个key用于创建或访问共享内存
    printf("0x%x\n",k);  // 打印获取到的key值,通常是一个十六进制数

    int shmid = getShm(k);  // 使用上面获取的key值,调用getShm函数来获取共享内存的标识符(ID)
    printf("0x%x\n",shmid);  // 打印共享内存的ID

    char * start = (char *)attchShm(shmid);  // 将共享内存附加到进程的地址空间,并返回起始地址
    printf("attach success, address start : %p \n",start);  // 打印共享内存的起始地址

    const char * message = "我是另一个我 :: can you see me ?";  // 定义一个要写入共享内存的消息字符串

    int cnt = 0;  // 定义一个计数器,用于追踪消息的发送次数
    while(true)  // 无限循环,持续发送消息
    {
        sleep(1);  // 每次循环等待1秒
        snprintf(start, MAX_SIZE, "%s [pid:%d][messageID:%d] ", message, getpid(), cnt++);  // 格式化字符串并写入共享内存
    }

    // 正常情况下,这里不应该直接退出程序,因为会导致共享内存无法正确释放
    // 应该在适当的时候调用detach函数来分离共享内存
    // detach(start);  // 分离共享内存,释放资源

    // delShm(shmid);  // 删除共享内存,释放资源
    // return 0;  // 程序正常退出返回值
}

shm_server.cc

#include "comm.hpp"  // 包含一个自定义的头文件,可能包含了一些共享内存操作的函数声明
#include <unistd.h>  // 包含Unix标准函数定义,如sleep函数

using namespace std;  // 使用标准命名空间

int main()  // 主函数入口
{
    key_t k = getKey();  // 调用getKey函数获取一个key值,这个key用于创建或访问共享内存
    printf("0x%x\n",k);  // 打印获取到的key值,通常是一个十六进制数

    int shmid = getShm(k);  // 使用上面获取的key值,调用getShm函数来获取共享内存的标识符(ID)
    printf("0x%x\n",shmid);  // 打印共享内存的ID

    char * start = (char *)attchShm(shmid);  // 将共享内存附加到进程的地址空间,并返回起始地址
    printf("attach success, address start : %p \n",start);  // 打印共享内存的起始地址

    const char * message = "我是另一个我 :: can you see me ?";  // 定义一个要写入共享内存的消息字符串

    int cnt = 0;  // 定义一个计数器,用于追踪消息的发送次数
    while(true)  // 无限循环,持续发送消息
    {
        sleep(1);  // 每次循环等待1秒
        snprintf(start, MAX_SIZE, "%s [pid:%d][messageID:%d] ", message, getpid(), cnt++);  // 格式化字符串并写入共享内存
    }

    // 正常情况下,这里不应该直接退出程序,因为会导致共享内存无法正确释放
    // 应该在适当的时候调用detach函数来分离共享内存
    // detach(start);  // 分离共享内存,释放资源

    // delShm(shmid);  // 删除共享内存,释放资源
    // return 0;  // 程序正常退出返回值
}

comm.hpp

#ifndef _COMM_HPP_  // 预处理指令,防止头文件内容被重复包含
#define _COMM_HPP_

#include <cstdio>  // 包含C语言标准输入输出库
#include <iostream>  // 包含C++标准输入输出流库
#include <cstring>  // 包含C语言字符串处理库
#include <cerrno>  // 包含错误号定义
#include <sys/shm.h>  // 包含系统共享内存操作的函数和数据结构定义
#include <sys/ipc.h>  // 包含系统IPC(进程间通信)操作的数据结构和宏定义

#define PATHNAME "."  // 定义ftok函数使用的路径名,这里使用当前目录
#define PROJ_ID 0x66  // 定义ftok函数使用的项目ID
#define MAX_SIZE 4096  // 定义共享内存的最大大小为4096字节

// 错误处理函数,打印错误信息并退出程序
void puts_Error()
{
    std::cerr << errno << " : " << strerror(errno) << std::endl;
    exit(1);
}

// 获取key值的函数,使用ftok函数生成一个唯一的key
key_t getKey()
{
    key_t k = ftok(PATHNAME,PROJ_ID);
    if(k<0)puts_Error();  // 如果ftok失败,调用错误处理函数
    return k;
}

// 辅助函数,用于创建或获取共享内存
int getShmHelper(int key,int flags)
{
    int shmid = shmget(key,MAX_SIZE,flags);  // 调用shmget函数创建或获取共享内存
    if(shmid < 0)puts_Error();  // 如果shmget失败,调用错误处理函数
    return shmid;
}

// 获取共享内存的函数,使用IPC_CREAT标志创建共享内存
int getShm(key_t k)
{
    return getShmHelper(k,IPC_CREAT);  // 调用辅助函数,创建共享内存
}

// 创建共享内存的函数,使用IPC_CREAT|IPC_EXCL|0600标志创建共享内存
int createShm(key_t k)
{
    return getShmHelper(k,IPC_CREAT|IPC_EXCL|0600);  // 调用辅助函数,创建共享内存
}

// 附加共享内存到进程地址空间的函数
void *attchShm(int shmid)
{
    void *mem = shmat(shmid,nullptr,0);  // 调用shmat函数将共享内存附加到进程地址空间
    if((long long )mem == -1)  // 如果shmat失败
        puts_Error();  // 调用错误处理函数
    return mem;  // 返回共享内存的起始地址
}

// 分离共享内存的函数
void detach(void *start)
{
    if(shmdt(start) == -1)  // 如果shmdt失败
    {
        puts_Error();  // 调用错误处理函数
    }
}

// 删除共享内存的函数
void delShm(int shmid)
{
    if(shmctl(shmid,IPC_RMID,nullptr)  == -1)  // 如果shmctl失败
        puts_Error();  // 调用错误处理函数
}

#endif // _COMM_HPP_

什么是进程互斥和进程同步?

进程互斥:
  • 进程互斥是指多个进程在访问共享资源时,确保同一时刻只有一个进程可以访问这些资源,以避免数据不一致的情况。那些在同一时刻只允许一个进程使用的资源称为临界资源(或互斥资源)。访问这些资源的程序段称为临界区

  • 进程互斥可以通过使用锁(例如信号量、互斥量、条件变量)来实现,确保在一个进程执行临界区代码时,其他进程无法进入临界区。

进程同步:
  • 进程同步是为了使多个进程能够协调执行,按照某种顺序来共享资源或通信。它确保进程之间的执行有序,解决生产者-消费者问题等需要数据和资源有序访问的情况。

  • 进程同步常通过信号量、事件、条件变量等机制来实现,以确保进程间的相互依赖和协调。

2. 为什么共享内存没有进行同步和互斥,而管道有?

  • 共享内存:

    • 共享内存是一种高效的进程间通信(IPC)方式,因为内存是直接访问的,速度非常快。但由于共享内存本身不具备同步和互斥功能,多个进程同时对共享内存进行读写操作时可能导致数据竞争问题。因此,需要使用额外的同步机制(如信号量、互斥量)来控制对共享内存的访问,确保数据一致性。

  • 管道:

    • 管道是进程间通信的一种机制,系统会自动管理同步,防止进程间的数据竞争。当一个进程向管道写入数据时,另一个进程可以从管道读取数据,并且这些操作是有序的,从而避免了并发读写的冲突问题