Linux 多线程

如何看待地址空间和页表?

  • 地址空间是进程能看到的资源窗口

  • 页表决定进程拥有资源的情况

  • 合理的对地址空间+页表进行资源的划分。这样就可以对一个进程的所有资源进行分类

虚拟地址到物理地址的映射过程

虚拟地址的分段

一个虚拟地址在32位架构下通常是32位的二进制数(64位架构下则是64位),可以分为三个部分:

  • 页目录索引(Page Directory Index):高10位,用于在页目录中查找页表的地址。

  • 页表索引(Page Table Index):中间10位,用于在页表中查找页框(Page Frame)的地址。

  • 页内偏移(Offset):低12位,用于定位页框中的具体字节位置。

页目录(Page Directory)

  • 页目录是一个数据结构,用于记录页表的地址。

  • 页目录的每一项(目录项)包含一个页表的物理地址和一些标志位(如有效位、读/写权限等)。

  • 进程的页目录存储在内存中,页目录的起始地址由CPU的CR3寄存器指定。

页表(Page Table)

  • 页表是一个数据结构,用于记录物理内存页框的地址。

  • 页表的每一项(表项)包含一个页框的物理地址和一些标志位(如有效位、读/写权限等)。

  • 每个进程都有自己独立的页表,用于映射其虚拟地址空间。

页框(Page Frame)

  • 页框(页帧)是物理内存中的一个固定大小的块,通常为4KB。

  • 页表指向的页框是实际存储数据的地方。

虚拟地址到物理地址的转换

1.页目录((包含一个页表的物理地址))查找->CPU首先使用虚拟地址的高10位作为页目录索引,在页目录中找到相应的目录项

2.页表((含一个页框的物理地址))查找->CPU使用虚拟地址的中间10位作为页表索引,在找到的页表中查找相应的页表项

3.计算物理地址->CPU将页表项中的页框地址与虚拟地址的低12位(页内偏移)组合,得到最终的物理地址

什么叫做多线程

进程和线程的定义

  • 进程是操作系统资源分配的基本单位,每个进程有独立的地址空间和系统资源(如文件描述符、内存等)。进程管理着这些资源,并且是系统中最重要的资源分配实体。

  • 线程是 CPU 调度的基本单位,是进程中的一个执行流,线程共享进程的资源(地址空间、文件描述符等),但也有自己的私有资源(如线程 ID、栈、寄存器集合等)。一个进程可以包含多个线程,这些线程共同执行任务。

Linux 中线程和进程的实现

Linux 线程模型

  • 在 Linux 内核中,没有真正意义上的线程概念,所有的线程都是通过轻量级进程(LWP,Light Weight Process)模拟的。每个线程在内核中都是一个独立的任务控制块(Task Control Block),类似于进程的 PCB(Process Control Block)。

  • 由于线程与进程有很多重叠的地方,Linux 直接复用了进程的 PCB 来实现线程。这样做的好处是简化了实现,降低了维护成本。

线程和进程切换的差异

  • 进程切换:涉及切换内存地址空间、更新 CPU 的页表寄存器、刷新缓存(cache)等。

  • 线程切换:只需要切换线程的私有资源(如寄存器、栈指针),无需切换内存地址空间和(刷新缓存),因此开销较小。

线程的使用场景

  • CPU 密集型任务:可以将任务分解为多个线程,利用多核处理器的并行性提高效率。

  • I/O 密集型任务:可以通过多线程实现 I/O 操作的重叠,提高程序响应速度,例如同时执行文件下载和数据处理。

很多情况下线程有自己的私有资源,什么资源是线程私有的呢?

  • PCB属性

  • 独立的栈

  • 一组寄存器(用于保存线程的上下文)

  • 错误码(errno)

Linux 中进程和线程的区别

资源管理

  • 进程:拥有独立的内存地址空间和系统资源,无法与其他进程共享这些资源。

  • 线程:共享进程的内存空间和大部分系统资源(如文件描述符、地址空间等),但有独立的栈和寄存器。

调度单位

  • 进程:是资源分配的基本单位。

  • 线程:是 CPU 调度的基本单位,进程中的多个线程共享该进程的资源。

创建开销

  • 进程:创建进程需要分配独立的资源和内存地址空间,因此代价较大。

  • 线程:创建线程的代价较小,因为线程共享进程的大部分资源,不需要额外的内存空间分配。

切换代价

  • 进程:进程切换涉及切换内存地址空间、页表等操作,开销较大。

  • 线程:线程切换无需切换内存地址空间,只需切换私有资源(如寄存器、栈指针),因此开销较小。

线程异常与多线程影响

在多线程环境中,一个线程出现异常一般不会直接影响其他线程。然而,如果在多线程程序中调用了 exit() 函数,它会终止整个进程,导致所有线程结束。为避免这一问题,POSIX 线程库提供了 pthread_exit 函数来单独终止某个线程而不影响其他线程:

#include <pthread.h>
[[noreturn]] void pthread_exit(void *retval);

线程等待与资源回收

与进程类似,线程的退出也需要进行回收,以避免资源泄漏。例如,使用 pthread_join 函数可以等待特定线程完成并获取它的退出信息:

#include <pthread.h>
int pthread_join(pthread_t thread, void **retval);
  • pthread_t thread: 表示目标线程的 ID。

  • void **retval: 指向一个指针的指针,用于接收目标线程的返回值。

pthread_join 会阻塞当前线程,直到目标线程结束,从而确保内核资源如 PCB(进程控制块)得到回收,防止内存泄漏。

void *void ** 的通用性

  • void *:表示通用指针类型,允许转换为任何数据类型。线程函数的参数 (void *args) 支持任意类型的参数传递。

  • void **:在 pthread_join 中用于接收目标线程的返回值,指向 pthread_exit 返回的值的地址。

(void *) 的转换用法

  • 整数转换:如 (void *)100 标识特定的退出状态码,作为简洁的标识方式。

  • 指针转换:可以返回动态分配的内存地址(如 malloc 返回的地址),使得 pthread_join 在获取返回值后能够访问该数据。

可重入性

若一个函数可以同时被多个线程调用而不会出现数据冲突,则该函数称为可重入函数。函数中的局部变量通常为每个线程分配独立的内存空间,以避免数据共享冲突。

多线程常用接口

创建线程:pthread_create

用于创建新的线程,使程序可以并行执行任务:

int pthread_create(pthread_t *restrict thread, const pthread_attr_t *restrict attr,
                   void *(*start_routine)(void *), void *restrict arg);
  • thread: 指向 pthread_t 变量的指针,保存新线程的 ID。

  • attr: 线程属性指针;为 NULL 时使用默认属性。

  • start_routine: 新线程启动时调用的函数。

  • arg: 传递给 start_routine 的参数。

线程退出:pthread_exit

用于结束一个线程,不影响其他线程:

[[noreturn]] void pthread_exit(void *retval);

线程等待:pthread_join

阻塞当前线程直到目标线程结束,从而确保资源回收。

获取线程 ID:pthread_self

获取调用线程的线程 ID:

pthread_t pthread_self(void);

分离线程:pthread_detach

默认情况下,新创建的线程是可 join 的(joinable),需要 pthread_join 回收资源。

不需要返回值的情况下 使用线程 分离 join 相当

分离线程可在其退出时自动释放资源,避免内存泄漏:

int pthread_detach(pthread_t thread);

自分离线程示例:pthread_detach(pthread_self());

线程取消:pthread_cancel

用于请求取消目标线程的执行。若线程被取消,退出码为 PTHREAD_CANCELED

int pthread_cancel(pthread_t thread);

__thread 修饰的变量称为 线程局部存储变量(Thread-Local Storage,TLS)。这种变量在每个线程中拥有独立的实例,不同线程对同一个 __thread 变量的访问不会互相影响,适用于需要每个线程独立保存的数据。

创建一批线程Demo 代码

#include <iostream>  // 包含输入输出流的头文件
#include <pthread.h> // POSIX线程库头文件,用于多线程编程
#include <vector>    // 包含向量容器的头文件
#include <cstdio>    // 包含C标准库输入输出函数的头文件
#include <unistd.h>  // UNIX标准头文件,包含POSIX操作系统API
​
using namespace std; // 使用标准命名空间
​
// 定义一个结构体,用于存储线程数据
class TreadData {
public:
    long long number; // 线程编号
    pthread_t tid;   // 线程ID
    char nameBuffer[64]; // 用于存储线程名称的缓冲区
};
​
// 定义线程执行的函数
void *start_Thread(void *td) {
    TreadData *ttd = (TreadData *)td; // 将void指针转换为TreadData指针
​
    while (true) { // 无限循环,线程将持续运行
        cout << "New Thread Success: " << ttd->tid << ":" << ttd->nameBuffer << endl;
        sleep(1); // 休眠1秒,减少CPU占用
    }
​
    return td; // 返回传入的线程数据指针
}
​
#define NUM 10 // 定义线程数量常量
​
int main() {
    // 创建一个向量,用于存储线程数据指针
    vector<TreadData *> threads;
    for (int i = 0; i < NUM; i++) {
        TreadData *td = new TreadData(); // 创建新的线程数据对象
​
        td->number = i + 1; // 设置线程编号
        snprintf(td->nameBuffer, sizeof(td->nameBuffer), "Thread ID: %d", td->number); // 设置线程名称
        pthread_create(&td->tid, nullptr, start_Thread, td); // 创建线程,传入线程数据
        cout << "----------------------------" << td->tid << " make successful" << endl; // 输出线程创建成功的信息
    }
​
    while (true) { // 主线程也进入无限循环
        cout << "new thread create success, name: main thread" << endl;
        sleep(1); // 休眠1秒,减少CPU占用
    }
​
    return 0; // 程序正常退出返回值
}

创建出新线程谁先运行呢?不确定

C++11线程接口

#include <iostream>
#include <unistd.h>
#include <thread>
​
​
using namespace std;
void Tread_Run()
{
    while(true)
    {
        std::cout << " 我是新线程" <<endl;
        sleep(1);
    }
}
int main()
{
    //任何语言 要是在Linux 中使用多线程 必须要使用othread库 
    // 而C++ 11中的多线程库是本质上对pthread的封装 而pthread 更加暴力
    thread T1(Tread_Run); // 创建新线程
​
    while(true) // 死循环  正常来说这边是跳不出来的  但是这里面有多线程
    {
​
        cout << "我是 main Tread " << endl;
        sleep(1);
    }
    T1.join();
    return 0;
}

用户线程 : 内核线程 = 1:1,

用户级线程ID 是什么?

是一个分配给线程的唯一标识符,用来区分每个线程。它通常在用户空间内由线程库维护,不涉及操作系统内核的管理。

1:1 模型下,每一个用户线程直接对应一个内核线程,因此每创建一个用户线程,都会同步创建一个对应的内核线程。这一模型的特点在于线程调度和管理直接依赖操作系统的内核,从而能充分利用多核CPU。

封装Thread()库

makefile

myThread:myThread.cc
	g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
	rm -rf myThread

Thread.hpp

#pragma once
#include <iostream>
#include <cstdio>
#include <string>
#include <cstring>
#include <cassert>
#include <functional>
​
class Thread;
​
class Context
{
public:
    Thread *this_;
    void * arges_;
public:
    Context():this_(nullptr),arges_(nullptr)
    {}
    ~Context()
    {}
};
​
class Thread
{
public:
    // using func_t = std::function<void*(void*)>;
    typedef std::function<void*(void*)> func_t;
    const int num = 1024;
public:
    Thread(func_t func,void *args = nullptr,int number=0):func_(func),args_(args)
    {
        char buffer[num];
        snprintf(buffer,sizeof buffer , "thread-%d",number);
        name_ = buffer;
​
        start();
    }
    // 在类内创建线程  想让线程执行对应的方法 将方法设置成 static
    static void * start_routine(void * args) // 类内 成员
    {
        Context *ctx = static_cast<Context*>(args);
        void *ret = ctx->this_->run(ctx->arges_);
        delete ctx;
        return ret;
    }
​
    void *run(void * args)
    {
        return func_(args);
    }
​
    void start()
    {
        Context *cxt = new Context();
        cxt->this_ = this;
        cxt->arges_ = args_;
        int n = pthread_create(&tid_,nullptr,start_routine,cxt);
        assert(n == 0);
        (void)n;
    }
​
    void join()
    {
        int n = pthread_join(tid_,nullptr);
        assert(n == 0);
        (void)n;
    }
    ~Thread()
    {
​
    }
​
private:
    std::string name_;
    // static func_t func_;
    // static void *args_;
​
    func_t func_;
    void *args_;
​
    pthread_t tid_;
};

myThread.cc

#include "Thread.hpp"

#include <memory>
#include <cstdio>
#include <iostream>
#include <string>
#include <unistd.h>

void *Thread_run(void *args)
{
    std::string Work = static_cast<const char*>(args);
    while(true)
    {
        std::cout << "I am new Tread :" << Work << std::endl;
        sleep(1);
    }
}

int main()
{
    std::unique_ptr<Thread> thread1(new Thread(Thread_run,(void*)"hello Hread1",1));
    std::unique_ptr<Thread> thread2(new Thread(Thread_run,(void*)"hello Hread2",2));
    std::unique_ptr<Thread> thread3(new Thread(Thread_run,(void*)"hello Hread3",3));
    thread1->join();
    thread2->join();
    thread3->join();
}

std::function<void*(void*)> 定义了一个可调用对象的类型,该对象接受一个 void* 参数并返回一个 void*

typedef std::function<void*(void*)> func_t;using func_t = std::function<void*(void*)>; 都是起别名的方式。

线程互斥与安全性问题

  • 全局变量与数据安全:多个线程交替执行时,访问未保护的全局变量可能导致数据不一致,因此需要采取保护措施。

  • 线程交叉执行:通过调度器让多个线程频繁切换执行,使各线程交叉运行,从而优化资源的使用。

  • 线程切换的条件

    • 时间片用完。

    • 更高优先级的线程到来。

    • 线程进入等待状态。

  • 内核态到用户态切换:线程在从内核态返回用户态时,调度程序会检测调度状态并可能触发进程切换。

关键概念

  • 临界资源:被多个线程共享的资源,称为临界资源。

  • 临界区:访问临界资源的代码块,称为临界区。

  • 互斥:确保在任意时刻只有一个线程可以进入临界区。

  • 原子性:操作不可被打断,要么完全执行完,要么不执行。

互斥量 (Mutex)

  • 互斥量的作用:在临界区代码中加入互斥锁,防止其他线程进入临界区,确保临界资源的安全。

  • 共享变量:在多线程之间共享的变量需要通过互斥量进行保护,以避免数据竞争和不一致。

使用互斥量保护代码流程

  1. 非临界区(可以执行并发代码)

  2. 加锁lock

  3. 临界区(仅允许一个线程进入,其他线程被阻止)

  4. 解锁unlock

  5. 返回非临界区

互斥量的初始化方式

静态分配

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
  • 适用于全局变量。

  • 静态初始化的互斥量无需销毁。

动态分配

int pthread_mutex_init(pthread_mutex_t *mutex, NULL);
  • 适用于需要手动初始化的互斥量。

销毁互斥量

  • 无需销毁:使用 PTHREAD_MUTEX_INITIALIZER 初始化的互斥量无需手动销毁。

  • 注意事项

    • 不能销毁一个已加锁的互斥量。

    • 销毁的互斥量不能再被任何线程加锁。

加锁与解锁

  • 加锁

    int pthread_mutex_lock(pthread_mutex_t *mutex);
    • 如果互斥量处于未锁状态,则该函数会将互斥量锁定,返回成功。

    • 如果互斥量已被其他线程锁定,则调用该函数会阻塞,等待互斥量解锁。

  • 解锁

    int pthread_mutex_unlock(pthread_mutex_t *mutex);
    • 解锁后,其他阻塞的线程可以尝试重新获取锁,进入临界区。

函数是可重入的,那就是线程安全的 函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题 如果一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的。

对Mutex进行封装

#pragma once
​
#include <iostream>
#include <pthread.h>
​
using namespace std;
class Mutex
{
public:
    Mutex(pthread_mutex_t * lock_p = nullptr)
        :lock_p_(lock_p)
    {}
​
    void lock()
    {
        if(lock_p_) pthread_mutex_lock(lock_p_);
    }
    void unlock()
    {
        if(lock_p_) pthread_mutex_unlock(lock_p_);
    }
    ~Mutex(){
​
    }
private:
    pthread_mutex_t * lock_p_;
};
​
class LockGuard
{
private:
    Mutex mutex_;
public:
    LockGuard(pthread_mutex_t *mutex):mutex_(mutex)
    {     
        // RAII 的设计模式
        mutex_.lock();  // 构造函数中进行枷锁
    }
    ~LockGuard()
    {
        mutex_.unlock();   // 在析构函数中进行解锁
    }
};


如何看待锁?

锁本身就是一个共享资源!全局的变量是要被保护的。锁是用来保护全局资源的,所本身也是全局资源,锁的安全是谁来保护的呢?

在lock 和 unlock 加锁的过程都是原子操作

如果申请成功继续执行 锁申请失败 执行流就会阻塞

谁有锁 谁才能进入临界区


线程死锁

在多线程环境中,当一个线程持有自己的锁却请求其他线程的锁,而其他线程持有其自己的锁并同时请求第一个线程的锁时,可能会导致死锁。死锁会使各线程相互等待,阻止线程释放资源,从而陷入无限阻塞状态。

使用锁的必要性:多线程访问共享资源时,使用锁可以保护临界资源,防止数据不一致。但如果锁的使用不当,会导致死锁现象。

死锁的四个必要条件

  1. 互斥:资源每次只能被一个线程使用。

  2. 请求与保持:一个线程因请求资源而被阻塞时,保持已获得的资源不释放。

  3. 不剥夺:线程获得的资源在未使用完之前不会被强行夺走。

  4. 环路等待:线程之间形成一个环状等待关系,彼此相互等待对方持有的资源。

避免死锁的策略

  1. 破坏死锁的必要条件:在设计系统时,可以通过打破死锁的四个条件来避免死锁。

  2. 加锁顺序一致:确保所有线程按相同的顺序请求和释放资源,避免形成循环等待。

  3. 避免锁未释放的情况:确保每个锁的加锁与解锁成对出现,防止资源长时间被持有。

  4. 资源一次性分配:在开始时一次性分配所有必要资源,避免线程在执行过程中再次请求新资源。


Linux线程同步

通过 pthread_cond_wait 保持线程同步运行demo

#include <iostream>
#include <pthread.h>
#include <string> 
#include <unistd.h>
#include <cstring>
​
using namespace std;
​
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
​
int  tickets = 1000;
​
void * start_Routine(void *args)
{
​
    std::string name = static_cast<const char *>(args);
    while(true)
    {
        pthread_mutex_lock(&mutex);
        pthread_cond_wait(&cond,&mutex);
        cout <<  name << "->" <<  tickets << std::endl;
        tickets --;
        pthread_mutex_unlock(&mutex);
    }
}
int main()
{
    pthread_t t[5];
    
    for(int i = 0;i < 5;i ++)
    {
        string* s = new string("Thread" + to_string(i + 1));
        pthread_create(&t[i], nullptr, start_Routine, (void*)s->c_str());
​
        // char * name = new char[64];
        // snprintf(name,sizeof name ,"Tread %d :",i+1);
        // pthread_create(&t[i],nullptr,start_Routine,name);
​
​
        cout << "Tread " << i+1 << "is Creat!" << endl; 
    }
    while(true)
    {
        sleep(1);
        pthread_cond_signal(&cond);
        cout << " main thread ................." << endl;
    }
​
    for(int i = 0;i , 5;i ++)
    {
        pthread_join(t[i],nullptr);
​
    }
    return 0;
}

基于阻塞队列(blockingQueue)的 生产者消费者模型

[chen@RE blockQueue]$ tree .

├── BlockQueue.hpp

├── log.txt

├── mainCp

├── mainCp.cc

├── makefile

└── Task.hpp

makefile

mainCp:mainCp.cc
    g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
    rm -rf mainCp

mainCp.cc

#include "BlockQueue.hpp"
#include <unistd.h>
#include <sys/types.h>
#include <ctime>
#include "Task.hpp"
​
​
​
template<class C,class S>
class BlockQueues
{
public:
    BlockQueue<C> *c_bq;
    BlockQueue<S> *s_bq;
};
​
void *productor(void * bqs_)
{
    BlockQueue<CalTask> *bq = (static_cast<BlockQueues<CalTask,SaveTask> *>(bqs_))->c_bq;
    // BlockQueue<SaveTask> *save_bq = (static_cast<BlockQueues<CalTask,SaveTask> *>(bqs_))->s_bq;
​
​
    while(true)
    {
​
        sleep(3);
        // 生产活动
        int x = rand() % 100 + 1;
        int y = rand() % 10;
        int operCode = rand() % oper.size();
        CalTask t(x,y,oper[operCode],mymath);
    
​
​
        bq->push(t);
        std::cout << "productor Thread 生产计算任务" << t.toTaskString() << std::endl;
​
        // sleep(1);
       
    }
    
    return nullptr;
}
​
​
​
void * consumer(void * bqs_)
{
    BlockQueue<CalTask> *bq = (static_cast<BlockQueues<CalTask,SaveTask> *>(bqs_))->c_bq;
    BlockQueue<SaveTask> *save_bq = (static_cast<BlockQueues<CalTask,SaveTask> *>(bqs_))->s_bq;
​
​
    while(true)
    {
        CalTask t;
        bq->pop(&t);
​
        sleep(1);
​
        std::string result = t();
        std::cout << "Cal Tread, 完成计算任务 : " << result << "...///down" << std::endl;
​
        SaveTask save(result,Save);
        save_bq->push(save);
        std::cout << "Cal Thread 任务推送完成 :" << std::endl;
         
    }
    return nullptr;
​
}
void * saver(void * bqs_)
{
​
    BlockQueue<SaveTask> * save_bq = (static_cast<BlockQueues<CalTask,SaveTask>*>(bqs_)->s_bq);
​
    while(true)
    {
        SaveTask st;
        save_bq->pop(&st);
        st();
        std::cout << "Save Thread, 保存任务完成" << std::endl;
    }
​
    return nullptr;
​
}
​
​
​
int main()
{
​
​
    srand((unsigned long)time(nullptr) ^ getpid());
​
    BlockQueues<CalTask,SaveTask> bqs;
    bqs.c_bq = new BlockQueue<CalTask>();
    bqs.s_bq = new BlockQueue<SaveTask>();
​
    pthread_t c[2],p[3],s;
    pthread_create(p,nullptr,productor,&bqs);
    pthread_create(p+1,nullptr,productor,&bqs);
    pthread_create(p+2,nullptr,productor,&bqs);
    pthread_create(c,nullptr,consumer,&bqs);
    pthread_create(c+1,nullptr,consumer,&bqs);
    
    pthread_create(&s,nullptr,saver,&bqs);
​
    pthread_join(c[0],nullptr);
    pthread_join(c[1],nullptr);
    pthread_join(p[0],nullptr);
    pthread_join(p[1],nullptr);
    pthread_join(p[2],nullptr);
    pthread_join(s,nullptr);
​
    delete bqs.c_bq;
    delete bqs.s_bq;
​
    return 0;
}

BlockQueue.hpp

#pragma once
​
#include <iostream>
#include <queue>
#include <pthread.h>
​
​
​
const int gmaxcap = 30;
​
​
template<class  T>
class BlockQueue
{
public:
    BlockQueue(const int &maxcap = gmaxcap):_maxcap(maxcap)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_pcond,nullptr);
        pthread_cond_init(&_ccond,nullptr);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    
    }
​
public:
    void push(const T& in)
    {
        pthread_mutex_lock(&_mutex);
​
        // 使用while 判断
        while(is_full())
        {
            ///  pthread_cond_wait 函数的第二个参数是我们正在使用的互斥锁
            /// 调用函数的时候 会使用原子性的方式将锁释放,将自己挂起
            ///  pthread_cond_wait: 函数被唤醒的时候 会自动重新获取传入的锁
            pthread_cond_wait(&_pcond,&_mutex);    // 生产条件不满足,无法生产 我们的生产者进行等待
​
        }
        // 走到这里一定没有满
        _q.push(in);
        // 阻塞队列里面一定有数据
        // pthread_cond_signal()  这个函数可以放在临界区 也可以放在外部
        
        //当一个线程完成了某些工作,并且需要通知其他线程继续执行时,
        // 它会调用 pthread_cond_signal 来唤醒等待该条件变量的线程。
        pthread_cond_signal(&_ccond);
        pthread_mutex_unlock(&_mutex);        
​
    }
​
    void pop(T * out)
    {
        pthread_mutex_lock(&_mutex);
​
        while(is_empty())
        {
            pthread_cond_wait(&_ccond,&_mutex);
        }
        
        *out = _q.front();
        _q.pop();
​
        pthread_cond_signal(&_pcond);
        pthread_mutex_unlock(&_mutex);
    }
​
private:
    bool is_empty()
    {
        return _q.empty();
    }
    bool is_full()
    {
        return _q.size() == _maxcap;
    }
​
​
private:
    std::queue<T> _q;
    int _maxcap;
    pthread_mutex_t _mutex;
    pthread_cond_t _pcond;
    pthread_cond_t _ccond;
};
​
​
​

Task.hpp

#pragma once
#include <iostream>
#include <string>
#include <cstdio>
#include <functional>
​
class CalTask
{
    // using func_t = std::function<int(int,int ,char)>;  // 表示 指向int func(int, int, char); 类型的函数
    typedef std::function<int(int,int,char)> func_t;
​
public:
    CalTask()
    {}
    CalTask(int x,int y,char op,func_t func)
        :_x(x),_y(y),_op(op),_callback(func)
    {}
​
    std::string operator()()
    {
        int result = _callback(_x,_y,_op);  // 调用回调函数
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d = %d",_x,_op,_y,result);
        return buffer;
    
    }
​
    std::string toTaskString()
    {
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d = ?",_x,_op,_y);
        return buffer;    
    }
​
​
private:
    int _x;
    int _y;
    char _op;
    func_t _callback;
};
​
const std::string oper = "+-x/%";
​
​
int mymath(int x,int y,char op)
{
    int result = 0;
​
    switch (op)
    {
        case '+':
            result = x + y;
            break;
        case '-':
            result = x - y;
            break;
        case 'x':
            result = x * y;
            break;
        case '/':
            {
                if(y == 0){
                    std::cout << " err" << std::endl;
                    result = -999;
                }
                else 
                    result = x / y;
            }
            
            break;
        case '%':
            {
                if(y == 0){
                    std::cout << " err" << std::endl;
                    result = -999;
                }
                else 
                    result = x % y;
            }
            
            break;
        default:
            // do nothing
            break;
    }
​
​
​
    return result;
​
​
}
​
​
​
class SaveTask
{
​
typedef std::function<void(const std::string&)> func_t;
​
public:
​
    SaveTask()
    {}
    SaveTask(const std::string & massage, func_t func)
        :_massage(massage),_func(func)
    {}
​
    void operator()()
    {
        _func(_massage);
    }
​
private:
    func_t _func;
    std::string _massage;
};
​
​
​
void Save(const std::string & massage){
​
    const std::string target = "./log.txt";
​
    FILE * fp = fopen(target.c_str(),"a+");
​
    if(!fp)
    {
        std::cerr << " fopen error" << std::endl;
        return;
    }
​
    fputs(massage.c_str(),fp);
    fputs("\n",fp);
    fclose(fp);
​
}