Linux 进程间通信
什么是通信?
数据传输
资源共享
进程控制
通知事件
为什么要通信?
多进程协同完成某种任务内容
进程通信来的模式
管道 基于文件系统的通信
匿名管道 pipe
命名管道
System V 进程间通讯 聚焦本地通信System V 消息队列
System V 共享内存
System V 信号量
POSIX 进程间通讯 通讯过程中可以跨主机
消息队列
共享内存
信号量
互斥量
条件变量
读写锁
两个进程都是的独立的个体 每个进程通讯的时候不能直接访问对方 OS 需要直接或者间接的给通讯双方进程提供“内存空间” 要通讯的进程需要看到一份公共的资源
管道
?我们把从一个进程连接到另一个进程的一个数据流称为一个“管道“
管道只能单向数据通讯
匿名管道
Standard C library (libc, -lc)
#include <unistd.h>
int pipe(int pipefd[2]); // 输出型参数
pipe
简单的测试代码
#include <iostream>
#include <cassert>
#include <string>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <cstdio>
using namespace std;
int main()
{
// 创建管道文件 打开读写端
int fds[2];
int n = pipe(fds);
assert(n==0);
// 0 1 2 被占用
// 分配的临时文件描述符是3,4
// cout << fds[0] << " " << fds[1] << endl;
// 3 是 读取 4 是 写
// Fork
pid_t id = fork();
assert(id >= 0);
if(id == 0)
{
// 紫禁城
// 紫禁城的通讯代码
// 紫禁城进行写入
close(fds[0]); // 关闭读取
int cnt = 0;
while(true)
{
cnt ++;
char buffer[1024];
snprintf(buffer,sizeof buffer,"child->parent say : %s| [%d] | PID[%d] ",\
"what can I say",cnt,getpid());
write(fds[1],buffer,strlen(buffer));
sleep(1);
}
exit(0);
}
//父进程的通讯代码
// 父进程进行读取
close(fds[1]);
while(true)
{
char buffer[1024];
ssize_t s = read(fds[0],buffer,sizeof(buffer) -1 );
if(s > 0)
buffer[s] = 0;
cout << "# " << buffer <<"PID:"<< getpid() <<endl;
}
n = waitpid(id,nullptr,0);
assert(n == id);
close(fds[0]);
exit(0);
return 0;
}
xinchen@chen:~/testc/testc/pipe$ ./mypipe #child->parent say : what can I say| [1] | PID[2563369] PID:2563368 #child->parent say : what can I say| [2] | PID[2563369] PID:2563368 #child->parent say : what can I say| [3] | PID[2563369] PID:2563368 #child->parent say : what can I say| [4] | PID[2563369] PID:2563368 #child->parent say : what can I say| [5] | PID[2563369] PID:2563368
管道的四种情况
父进程读,子进程写
父进程从管道中读取数据,子进程向管道中写入数据。
如果写入速度快于读取速度,可能导致管道阻塞。
读慢写快
当子进程写数据的速度快于父进程读取的速度时,管道会被填满。此时,写进程会阻塞,等待读进程消耗掉一些数据,腾出空间。
读快写慢
当父进程读取数据的速度快于子进程写入的速度时,管道会很快被清空。此时,读进程会阻塞,等待写进程写入更多数据。
写端关闭,读端读到0
如果子进程关闭写端或者终止,那么父进程在读取时会读到0,表示EOF(文件结束)。
读端关闭,写进程被kill
如果父进程关闭读端或者终止,那么子进程在写入时会收到
SIGPIPE
信号,从而导致写进程被终止。
管道的特征
管道是生命周期进程
管道可以用来进行具有血缘关系的进程之间进行通讯,常用和父子进程
管道是面向字节流的(网络)
半双工 -- 单项通讯机制
互斥与同步机制 -- 堆共享资源进行保护的方案
Demo 匿名管道 进程均衡分配
#include <iostream>
#include <cassert>
#include <cstdlib>
#include <unistd.h>
#include <cstdio>
#include <ctime>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <vector>
#define PROCSS_NUM 5 // 创建5个进程
#define MAKE_SEED() srand((unsigned long)time(nullptr) ^ getpid() ^ 0x1712212 ^ rand()%1234)
// 函数指针
typedef void(*func)();
// OR
// #include <functional>
//std::function<void()> f = xxxxx
class subEp{
public:
subEp(pid_t subId,int writeFd)
:subId_(subId),writeFd_(writeFd)
{
char namebuffer[1024];; //
snprintf(namebuffer,sizeof namebuffer,"poroacess-%d[PID:%d] | [FD:%d]",nums++,subId_,writeFd_);
name_ = namebuffer;
}
public:
static int nums;
std::string name_;
pid_t subId_;
int writeFd_;
};
///////////////////////////////////////////// 要完成的任务 ///////////////////////////////////////////////////////////
void download_Task()
{
std:: cout << getpid() << " : download_Task.... " << std::endl;
sleep(1);
}
void OI_Task()
{
std::cout << getpid() <<" : OI_Task ..... " << std::endl;
sleep(1);
}
void flush_Task()
{
std::cout << getpid() << " : flush_Task....." << std::endl;
sleep(1);
}
void loadTaskFunc(std::vector<func> * out)
{
assert(out); //notnull
out->push_back(download_Task);
out->push_back(OI_Task);
out->push_back(flush_Task);
}
//////////////////////////////////////////////// 创建 子 进程////////////////////////////////////////
int recvTask( int readFd)
{
int code = 0;
size_t s = read(readFd,&code , sizeof code);
// assert(s == sizeof(int));
if(s == sizeof(int))
return code;
else if(s <= 0) return -1;
else return 0;
}
void createSubProcess(std::vector<subEp> * subs,std::vector<func> &funcs)
{
for(int i = 0;i < PROCSS_NUM; i ++)
{
int fds[2];
int n = pipe(fds);
assert(n == 0); // 已经知道会出现的错误 使用断言 未知debug调试使用条件
(void)n; // 防止warning
// 这可能会有问题 复杂 !!
pid_t id = fork();
if(id == 0)
{
// 子进程
// 让子进程读取指令 关闭写
close(fds[1]);
// 获取命令码 没有发送 子进程应该阻塞
while(true)
{
// 获取任务码
int commandCode = recvTask(fds[0]);
// 完成任务
if(commandCode >=0 && commandCode < funcs.size())
{
funcs[commandCode]();
}
else if(commandCode == -1){
break;
}
}
exit(0); // 正常退出进程
}
// 父进程 关闭写
close(fds[0]);
subEp sub(id,fds[1]);
subs->push_back(sub);
}
}
void sendTask(const subEp & process,int taskNum)
{
std::cout << "send task num: " << taskNum << " send to" << process.name_ << std::endl;
int n = write(process.writeFd_,& taskNum,sizeof(taskNum));
assert(n == sizeof(int));
(void)n;
}
void loadBlanceContrl(std::vector<subEp> &subs,std::vector<func> &funcMap,int taskCnt)
{
int processnum = subs.size();
int tasknum = funcMap.size();
bool quit = false;
if(taskCnt == 0)quit = true;
while(quit || taskCnt)
{
// 选择一个子进程 std::vector<subEp> -> index
int subIdx = rand() % processnum;
// 选择一个 任务 std::vector<func> -> index
int taskIdx = rand() % tasknum;
// 任务发送给选择的进程
sendTask(subs[subIdx],taskIdx);
sleep(1);
taskCnt --;
}
}
void waitProcess( std::vector<subEp> processes)
{
int processnum = processes.size();
for(int i = 0;i < processnum;i ++)
{
waitpid(processes[i].writeFd_,nullptr,0);
std::cout << "wait proc " << i << "sussesful " << std::endl;
}
}
////////////////////////////////////////////// Main函数 /////////////////////////////////////////////
int subEp::nums = 0; // 类外初始化 static 类型
int main()
{
std::vector<func> funcMap; // all 任务
std::vector<subEp> subs; // 进程
// init Task TODO
MAKE_SEED();
loadTaskFunc(&funcMap);
// 创建子进程 维护通讯 信道
createSubProcess(&subs,funcMap);
// 父进程处理
int taskCnt = 5; // 0代表永远执行
loadBlanceContrl(subs,funcMap,taskCnt);
// 回收子进程信息
waitProcess(subs);
return 0;
}
命名管道
管道应用的一个限制就是只能在具有共同祖先(具有亲缘关系)的进程间通信。
如果我们想在不相关的进程之间交换数据, 可以使用 FIFO 文件夹来完成这项工作,经常被称为命名管道
是特殊类型的文件
匿名管道与命名管道的区别
匿名管道由pipe函数创建并打开。
命名管道由mkfifo函数创建,打开用open
FIFO(命名管道)与pipe(匿名管道)之间唯一的区别在它们创建与打开的方式不同,一但这些工作完 成之后,它们具有相同的语义。
bash 创建命名管道
mkfifo filename
xinchen@chen:~/testc$ mkfifo test xinchen@chen:~/testc$ ls 'Client&ServerDemo' processpool pytest test testc xinchen@chen:~/testc$ ls -l 总计 16 drwxrwxr-x 2 xinchen xinchen 4096 10月 23 13:56 'Client&ServerDemo' drwxrwxr-x 2 xinchen xinchen 4096 10月 23 13:15 processpool drwxrwxr-x 2 xinchen xinchen 4096 10月 23 10:11 pytest prw-rw-r-- 1 xinchen xinchen 0 10月 23 13:59 test drwxrwxr-x 3 xinchen xinchen 4096 10月 22 17:00 testc
以 P开头的都是 管道文件
创建管道的OS也有相应的接口
int mkfifo(const char * filename,mode_t mode);
使用命名管道实现server & client 通讯
.PHONY: all
all: clientPipe serverPipe
clientPipe:clientPipe.cpp
g++ -o $@ $^ -std=c++11 -g
serverPipe:serverPipe.cpp
g++ -o $@ $^ -std=c++11 -g
.PHONY:clean
clean:
rm -rf clientPipe serverPipe
comm.hpp
#include <iostream>
#include <string>
#include <cstring>
#include <cerrno>
#include <sys/types.h>
#include <sys/stat.h>
#include <cassert>
#include <fcntl.h>
#include <unistd.h>
#define NAME_PATH "/tmp/test"
bool createFifo(const std::string & path)
{
int n = mkfifo(path.c_str(),0666);
if(n == 0)return true;
else {
std::cout << "Error : " << errno << "err String : "<< strerror(errno) << std::endl;
return false;
}
}
void removeFifo(const std::string & path)
{
int n = unlink(path.c_str());
assert(n == 0);
(void)n;
}
serverPipe.cpp
#include "comm.hpp"
int main()
{
bool r = createFifo(NAME_PATH);
assert(r);
(void)r;
int rfd = open(NAME_PATH,O_RDONLY);
if(rfd < 0) exit(1);
// Read
while(true)
{
char buffer[1024];
ssize_t s = read(rfd,buffer,sizeof(buffer) - 1);
if(s > 0)
{
buffer[s] = 0;
std::cout << " client # ";
printf("%s",buffer);
}
else if(s == 0)
{
std::cout<< "client quit, me too! " << std::endl;
break;
}else{
std::cout << "error" << errno << ":"<<strerror(errno) << std::endl;
break;
}
}
close(rfd);
removeFifo(NAME_PATH);
return 0;
}
clientPipe.cpp
#include "comm.hpp"
int main()
{
std::cout <<" # Hello Client" << std::endl;
int wfd = open(NAME_PATH,O_WRONLY);
if(wfd < 0) exit(1);
// Write
char buffer[1024];
while(true)
{
std::cout << "Plase Say#" << std::endl;
fgets(buffer,sizeof(buffer),stdin);
ssize_t n = write(wfd,buffer,strlen(buffer));
assert(n == strlen(buffer));
(void)n;
}
return 0;
}
共享内存
共享内存的原理
task_struck (进程结构体)指向 进程地址空间 在进程地址空间通过页表获取到物理内存 而共享内存就是 两个进程通过页表获取到的是同一个物理内存
什么是共享内存
通过让不同的进程看到同一个内存块的方式: 共享内存
共享内存是一种通讯方式
共享内存接口函数
fotk 函数
key_t ftok(const char *pathname, int proj_id);
通过一定的运算方法返回/ 创建可以唯一标识的id
shmget函数 用来创建共享内存
int shmget(key_t key, size_t size, int shmflg);
key 共享内存段的名字
size 共享内存大小
shmflg 由9个权限标志构成
IPC_CREAT 不存在创建 /存在获取值
IPC_CREAT | IPC_EXCL 不存在创建 /存在出错(使用者的角度看拿到的永远都是新的)
key 是 代表进入共享内存时候设定的 用来标识内核中的唯一性
shmid 和 key 的额 关系 就像 fd 和 inode
共享内存的生命周期是随OS 的并不是 随进程 的
shmat函数 将共享内存段 连接到进程地址空间
void *shmat(int shmid, const void *_Nullable shmaddr, int shmflg);
shmid 共享内存标识
smaddr 指定链接的地址
shmflg:它的两个可能取值是SHM_RND|SHM_RDONLY
返回 返回指针指向共享内存子hi个字节
shmdt函数 将共享内存和当前进程脱离
int shmdt(const void *shmaddr);
shmaddr 由 shmat所返回的指针
成功返回 0 失败返回 -1
shmctl 函数 控制共享内存
int shmctl(int shmid, int cmd, struct shmid_ds *buf);
shmid 由shmget返回的共享内存标识码
cmd 代表将要采取的动作
IPC_STAT 将 shmid_ds 数据结构中数据设置为当前关联值
IPC_RMID 删除共享 内存段
buf 一个保存着共享内存的模式状态和访问权限的数据结构
返回值 成功返回 0 失败返回 -1
使用命令查看
ipcs -m
xinchen@chen:~/testc/shm$ ipcs -m
------------ 共享内存段 -------------- 键 shmid 拥有者 权限 字节 连接数 状态 0x00000000 229378 xinchen 600 67108864 2 目标 0x00000000 4 xinchen 600 16384 1 目标 0x00000000 589833 xinchen 600 4194304 2 目标 0x00000000 589837 xinchen 600 16384000 2 目标 0x00000000 589848 xinchen 600 5046272 2 目标 0x00000000 622639 xinchen 600 524288 2 目标 0x00000000 294971 xinchen 600 4194304 2 目标
nattch 是 连接数 (这里是中文)
ipcrm -m xxxxxx
删除 shm ipc资源
共享内存模拟代码
makefile
.PHONY:all
all: shm_client shm_server
shm_client:shm_client.cc
g++ -o $@ $^ -std=c++11
shm_server:shm_server.cc
g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
rm -rf shm_client shm_server
shm_client.cc
#include "comm.hpp" // 包含一个自定义的头文件,可能包含了一些共享内存操作的函数声明
#include <unistd.h> // 包含Unix标准函数定义,如sleep函数
using namespace std; // 使用标准命名空间
int main() // 主函数入口
{
key_t k = getKey(); // 调用getKey函数获取一个key值,这个key用于创建或访问共享内存
printf("0x%x\n",k); // 打印获取到的key值,通常是一个十六进制数
int shmid = getShm(k); // 使用上面获取的key值,调用getShm函数来获取共享内存的标识符(ID)
printf("0x%x\n",shmid); // 打印共享内存的ID
char * start = (char *)attchShm(shmid); // 将共享内存附加到进程的地址空间,并返回起始地址
printf("attach success, address start : %p \n",start); // 打印共享内存的起始地址
const char * message = "我是另一个我 :: can you see me ?"; // 定义一个要写入共享内存的消息字符串
int cnt = 0; // 定义一个计数器,用于追踪消息的发送次数
while(true) // 无限循环,持续发送消息
{
sleep(1); // 每次循环等待1秒
snprintf(start, MAX_SIZE, "%s [pid:%d][messageID:%d] ", message, getpid(), cnt++); // 格式化字符串并写入共享内存
}
// 正常情况下,这里不应该直接退出程序,因为会导致共享内存无法正确释放
// 应该在适当的时候调用detach函数来分离共享内存
// detach(start); // 分离共享内存,释放资源
// delShm(shmid); // 删除共享内存,释放资源
// return 0; // 程序正常退出返回值
}
shm_server.cc
#include "comm.hpp" // 包含一个自定义的头文件,可能包含了一些共享内存操作的函数声明
#include <unistd.h> // 包含Unix标准函数定义,如sleep函数
using namespace std; // 使用标准命名空间
int main() // 主函数入口
{
key_t k = getKey(); // 调用getKey函数获取一个key值,这个key用于创建或访问共享内存
printf("0x%x\n",k); // 打印获取到的key值,通常是一个十六进制数
int shmid = getShm(k); // 使用上面获取的key值,调用getShm函数来获取共享内存的标识符(ID)
printf("0x%x\n",shmid); // 打印共享内存的ID
char * start = (char *)attchShm(shmid); // 将共享内存附加到进程的地址空间,并返回起始地址
printf("attach success, address start : %p \n",start); // 打印共享内存的起始地址
const char * message = "我是另一个我 :: can you see me ?"; // 定义一个要写入共享内存的消息字符串
int cnt = 0; // 定义一个计数器,用于追踪消息的发送次数
while(true) // 无限循环,持续发送消息
{
sleep(1); // 每次循环等待1秒
snprintf(start, MAX_SIZE, "%s [pid:%d][messageID:%d] ", message, getpid(), cnt++); // 格式化字符串并写入共享内存
}
// 正常情况下,这里不应该直接退出程序,因为会导致共享内存无法正确释放
// 应该在适当的时候调用detach函数来分离共享内存
// detach(start); // 分离共享内存,释放资源
// delShm(shmid); // 删除共享内存,释放资源
// return 0; // 程序正常退出返回值
}
comm.hpp
#ifndef _COMM_HPP_ // 预处理指令,防止头文件内容被重复包含
#define _COMM_HPP_
#include <cstdio> // 包含C语言标准输入输出库
#include <iostream> // 包含C++标准输入输出流库
#include <cstring> // 包含C语言字符串处理库
#include <cerrno> // 包含错误号定义
#include <sys/shm.h> // 包含系统共享内存操作的函数和数据结构定义
#include <sys/ipc.h> // 包含系统IPC(进程间通信)操作的数据结构和宏定义
#define PATHNAME "." // 定义ftok函数使用的路径名,这里使用当前目录
#define PROJ_ID 0x66 // 定义ftok函数使用的项目ID
#define MAX_SIZE 4096 // 定义共享内存的最大大小为4096字节
// 错误处理函数,打印错误信息并退出程序
void puts_Error()
{
std::cerr << errno << " : " << strerror(errno) << std::endl;
exit(1);
}
// 获取key值的函数,使用ftok函数生成一个唯一的key
key_t getKey()
{
key_t k = ftok(PATHNAME,PROJ_ID);
if(k<0)puts_Error(); // 如果ftok失败,调用错误处理函数
return k;
}
// 辅助函数,用于创建或获取共享内存
int getShmHelper(int key,int flags)
{
int shmid = shmget(key,MAX_SIZE,flags); // 调用shmget函数创建或获取共享内存
if(shmid < 0)puts_Error(); // 如果shmget失败,调用错误处理函数
return shmid;
}
// 获取共享内存的函数,使用IPC_CREAT标志创建共享内存
int getShm(key_t k)
{
return getShmHelper(k,IPC_CREAT); // 调用辅助函数,创建共享内存
}
// 创建共享内存的函数,使用IPC_CREAT|IPC_EXCL|0600标志创建共享内存
int createShm(key_t k)
{
return getShmHelper(k,IPC_CREAT|IPC_EXCL|0600); // 调用辅助函数,创建共享内存
}
// 附加共享内存到进程地址空间的函数
void *attchShm(int shmid)
{
void *mem = shmat(shmid,nullptr,0); // 调用shmat函数将共享内存附加到进程地址空间
if((long long )mem == -1) // 如果shmat失败
puts_Error(); // 调用错误处理函数
return mem; // 返回共享内存的起始地址
}
// 分离共享内存的函数
void detach(void *start)
{
if(shmdt(start) == -1) // 如果shmdt失败
{
puts_Error(); // 调用错误处理函数
}
}
// 删除共享内存的函数
void delShm(int shmid)
{
if(shmctl(shmid,IPC_RMID,nullptr) == -1) // 如果shmctl失败
puts_Error(); // 调用错误处理函数
}
#endif // _COMM_HPP_
什么是进程互斥和进程同步?
进程互斥:
进程互斥是指多个进程在访问共享资源时,确保同一时刻只有一个进程可以访问这些资源,以避免数据不一致的情况。那些在同一时刻只允许一个进程使用的资源称为临界资源(或互斥资源)。访问这些资源的程序段称为临界区。
进程互斥可以通过使用锁(例如信号量、互斥量、条件变量)来实现,确保在一个进程执行临界区代码时,其他进程无法进入临界区。
进程同步:
进程同步是为了使多个进程能够协调执行,按照某种顺序来共享资源或通信。它确保进程之间的执行有序,解决生产者-消费者问题等需要数据和资源有序访问的情况。
进程同步常通过信号量、事件、条件变量等机制来实现,以确保进程间的相互依赖和协调。
2. 为什么共享内存没有进行同步和互斥,而管道有?
共享内存:
共享内存是一种高效的进程间通信(IPC)方式,因为内存是直接访问的,速度非常快。但由于共享内存本身不具备同步和互斥功能,多个进程同时对共享内存进行读写操作时可能导致数据竞争问题。因此,需要使用额外的同步机制(如信号量、互斥量)来控制对共享内存的访问,确保数据一致性。
管道:
管道是进程间通信的一种机制,系统会自动管理同步,防止进程间的数据竞争。当一个进程向管道写入数据时,另一个进程可以从管道读取数据,并且这些操作是有序的,从而避免了并发读写的冲突问题。