信号量和线程池
什么是同步和互斥 ?
同步:确保多个线程按照特定顺序执行,以协调它们之间的操作,处理依赖关系。
互斥:确保在同一时刻只有一个线程能够访问共享资源,以防止数据竞争和不一致性。
信号量
信号量用于同步操作,确保对共享资源的无冲突访问。POSIX信号量和SystemV信号量的功能相同,但POSIX信号量可以用于线程间同步。
关键概念
临界资源:在访问临界资源时,必须确保资源满足特定条件。资源的可用性在操作前不可直接得知。
加锁与检测:
为了访问临界资源,线程首先需要加锁。
加锁后,线程可以检测资源状态,但这本身也算是对临界资源的访问。
资源可能尚未准备就绪,因此需要在加锁后进行状态检测,根据结果决定后续操作。
并发访问:允许多个线程并发访问公共资源的不同区域,只要这些访问不冲突。
信号量的定义
本质:信号量是一种计数器,用于衡量临界资源的数量。
申请机制:访问公共资源前,线程必须申请信号量,这类似于预定机制,确保未来可拥有资源的一部分。
资源状态:
申请成功:表示有可用资源。
申请失败:表示条件不满足,线程需等待。
计数器操作
信号量的增减操作(sem++ / sem--)必须保证原子性。
资源申请使用P操作(等待)和资源归还使用V操作(释放),这两个操作也称为PV原语。
基于环形队列 的生产消费模型
MAKEFILE
main:main.cc
g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
rm -rf main
main.cc
#include "RingQueue.hpp"
#include <ctime>
#include <unistd.h>
#include <string>
#include <iostream>
#include "Task.hpp"
std::string SelfName()
{
return "thread[0x" + std::to_string(pthread_self()) +"]";
}
void *ProductRoutine(void * rq)
{
RingQueue<Task> * ringqueue = static_cast<RingQueue<Task> *>(rq);
while(true)
{
int x = rand() % 10;
int y = rand() % 5;
char op = oper[rand() % oper.size()];
Task t(x,y,op,mymath);
ringqueue->push(t);
std::cout << SelfName() <<",OK Create :" << t.toTaskString() << std::endl;
sleep(2);
}
}
void *ConsumerRoutine(void * rq)
{
RingQueue<Task> * ringqueue = static_cast<RingQueue<Task> *>(rq);
while(true)
{
Task t;
ringqueue->pop(&t);
std::string result = t();
std::cout << SelfName() << ", make:ojbk :" << result << std::endl;
}
}
int main(){
srand((unsigned int)time(nullptr) ^ getpid() ^ pthread_self() ^ 0x12345);
RingQueue<Task> * rq = new RingQueue<Task>(); // 看到同一份资源
pthread_t p[8],c[4];
for(int i = 0;i < 8;i ++)pthread_create(p+i,nullptr,ProductRoutine,rq);
for(int i = 0;i < 4;i ++)pthread_create(p+i,nullptr,ConsumerRoutine,rq);
for(int i = 0;i < 8;i ++)pthread_join(p[i],nullptr);
for(int i = 0;i < 4;i ++)pthread_join(c[i],nullptr);
delete rq;
return 0;
}
RingQueue.hpp
#pragma once
#include <iostream>
#include <vector>
#include <cassert>
#include <semaphore.h>
#include <pthread.h>
static const int gcap = 5;
template<class T>
class RingQueue
{
private:
void P(sem_t &sem)
{
int n = sem_wait(&sem);
assert(n == 0);
(void)n;
}
void V(sem_t & sem)
{
int n = sem_post(&sem);
assert(n == 0);
(void)n;
}
public:
RingQueue(const int & cap = gcap):_queue(cap),_cap(cap)
{
int n = sem_init(&_spaceSem,0,_cap); // 初始空间为 cap
assert(n == 0);
n = sem_init(&_dataSem,0,0); // 初始资源为0
assert(n == 0);
_productorStep = _consumerStep = 0; // l r init = 0
}
~RingQueue(){
sem_destroy(&_spaceSem);
sem_destroy(&_dataSem);
pthread_mutex_destroy(&_pmutex);
pthread_mutex_destroy(&_cmutex);
}
// 生产者将数据推入队列
void push(const T & in)
{
// 等待空间信号量,确保有足够的空间进行生产
P(_spaceSem);
// 加锁,确保对共享资源(队列)的互斥访问
pthread_mutex_lock(&_pmutex);
// 将输入数据放入队列,更新生产者指针
_queue[_productorStep++] = in;
// 处理环形队列,确保生产者指针不超过队列容量
_productorStep %= _cap;
// 解锁,允许其他线程访问队列
pthread_mutex_unlock(&_pmutex);
// 释放数据信号量,表示队列中有新数据可供消费
V(_dataSem);
}
// 消费者从队列中取出数据
void pop(T *out)
{
// 等待数据信号量,确保队列中有数据可供消费
P(_dataSem);
// 加锁,确保对共享资源(队列)的互斥访问
pthread_mutex_lock(&_cmutex);
// 从队列中取出数据并存储到输出参数
*out = _queue[_consumerStep++];
// 处理环形队列,确保消费者指针不超过队列容量
_consumerStep %= _cap;
// 解锁,允许其他线程访问队列
pthread_mutex_unlock(&_cmutex);
// 释放空间信号量,表示队列中有空间可以生产新数据
V(_spaceSem);
}
private:
std::vector<T> _queue;
int _cap; // 最大大小
sem_t _spaceSem; // 生产者 要生产 需要空间资源
sem_t _dataSem; // 消费者 要消费 需要数据资源
int _productorStep; // 环形队列指针 | 生产者
int _consumerStep; // 环形队列指针 | 消费者
pthread_mutex_t _pmutex; // 加锁保护多个生产者
pthread_mutex_t _cmutex; // 加锁保护多个消费者
};
Task.hpp
#pragma once
#include <iostream>
#include <string>
#include <cstdio>
#include <functional>
class Task
{
using func_t = std::function<int(int ,int ,char)>;
public:
Task(){}
Task(int x,int y,char op,func_t func)
:_x(x),_y(y),_op(op),_callback(func)
{}
std::string operator()(){
int result = _callback(_x,_y,_op);
char buffer[1024];
snprintf(buffer,sizeof buffer,"%d %c %d = %d",_x,_op,_y,result);
return buffer;
}
std::string toTaskString()
{
char buffer[1024];
snprintf(buffer,sizeof buffer,"%d %c %d = ???",_x,_op,_y);
return buffer;
}
private:
int _x;
int _y ;
char _op;
func_t _callback;
};
const std::string oper = "+-*/%";
int mymath(int x, int y, char op)
{
int result = 0;
switch (op)
{
case '+':
result = x + y;
break;
case '-':
result = x - y;
break;
case '*':
result = x * y;
break;
case '/':
{
if (y == 0)
{
std::cerr << "div zero error!" << std::endl;
result = -1;
}
else
result = x / y;
}
break;
case '%':
{
if (y == 0)
{
std::cerr << "mod zero error!" << std::endl;
result = -1;
}
else
result = x % y;
}
break;
default:
// do nothing
break;
}
return result;
}
什么是池化技术?
池化技术是一种优化资源管理的方法,通过预先创建和管理资源的集合(池),以便在需要时重复利用这些资源,而不是频繁地创建和销毁。
例子 : 在 STL Vetcor 容器申请内存的时候扩容2倍 或者 1.5 用来备用
线程池:管理多个线程,避免频繁创建和销毁线程,提高性能。
连接池:管理数据库连接,减少连接建立和关闭的开销,提高数据库访问效率。
对象池:重用对象实例,减少内存分配和垃圾回收的压力。
池化技术可以显著提高系统的性能和资源利用率。
线程池
定义
线程池是一种线程使用模式,维护多个线程以等待管理者分配可并发执行的任务。通过线程池,可以避免在处理短时间任务时频繁创建和销毁线程的开销,从而提高性能并充分利用系统资源。
优点
减少线程调度开销,提升缓存局部性和整体性能。
防止过度调度,避免产生大量线程。
可用线程数量应根据可用的处理器、内核、内存、网络连接等资源进行调整。
应用场景
短时间任务:适合需要大量线程快速完成的小任务(如WEB服务器处理网页请求)。在这种情况下,任务数量巨大,而单个任务时间较短。
性能要求苛刻的应用:对服务器响应速度要求高的场合。
突发性大量请求:应对短时间内大量请求而不导致系统产生过多线程,避免内存溢出错误。
线程池种类
固定数量线程池:创建固定数量的线程,循环从任务队列中获取任务对象并执行。
示例
创建固定数量的线程池,循环从任务队列中获取任务对象。
获取到任务对象后,执行任务对象中的任务接口。
makefile
main:main.cc
g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
rm -rf main
main.cc
#include <iostream>
#include "Task.hpp"
#include "ThreadPool.hpp"
int main()
{
/// ---------------------------------
//运行到这的时候才会创建 (饿汉模式)
ThreadPool<Task>::getInstance()->run();
// ////////////////////////
int x,y;
char op;
while(1)
{
std:: cout << " input | x | y | z " << std::endl;
std:: cin >> x >> op >> y ;
Task t(x,y,op,mymath);
ThreadPool<Task>::getInstance()->push(t);
sleep(1);
}
return 0;
}
Thread.hpp
#pragma once
#include <iostream>
#include <cstdio>
#include <string>
#include <cstring>
#include <cassert>
#include <functional>
namespace TreadNS
{
using func_t = std::function<void*(void*)>;
const int num = 1024;
class Thread
{
private:
// 在类内创建线程 想让线程执行对应的方法 将方法设置成 static
static void *start_routine(void *args) // 类内 成员
{
Thread *_this = static_cast<Thread*>(args);
return _this->callback();
}
public:
Thread()
{
char namebuffer[num];
snprintf(namebuffer, sizeof namebuffer, "thread-%d", threadnum++);
name_ = namebuffer;
}
void start(func_t func,void *args = nullptr)
{
func_ = func;
args_ = args;
int n = pthread_create(&tid_, nullptr, start_routine, this);
assert(n == 0);
(void)n;
}
void join()
{
int n = pthread_join(tid_, nullptr);
assert(n == 0);
(void)n;
}
std::string threadname()
{
return name_;
}
~Thread()
{
}
void *callback()
{
return func_(args_);
}
private:
std::string name_;
func_t func_;
void *args_;
pthread_t tid_;
static int threadnum;
};
int Thread::threadnum = 1;
} /// endl namespace Thread
Task.hpp
#pragma once
#include <cstdio>
#include <iostream>
#include <string>
#include <functional>
class Task
{
using func_t = std::function<int(int,int,char)>;
public:
Task()
{}
Task(int x,int y,char op,func_t func)
:_x(x),_y(y),_op(op),_callback(func)
{}
~Task()
{}
std::string operator()()
{
int result = _callback(_x,_y,_op);
char buffer[1024];
snprintf(buffer,sizeof buffer,"%d %c %d = %d",_x,_op,_y,result);
return buffer;
}
std::string toTaskString()
{
char buffer[1024];
snprintf(buffer,sizeof buffer,"%d %c %d = ?",_x,_op,_y);
return buffer;
}
private:
int _x;
int _y;
char _op;
func_t _callback;
};
int mymath(int x, int y, char op)
{
int result = 0;
switch (op)
{
case '+':
result = x + y;
break;
case '-':
result = x - y;
break;
case '*':
result = x * y;
break;
case '/':
{
if (y == 0)
{
std::cerr << "div zero error!" << std::endl;
result = -1;
}
else
result = x / y;
}
break;
case '%':
{
if (y == 0)
{
std::cerr << "mod zero error!" << std::endl;
result = -1;
}
else
result = x % y;
}
break;
default:
// do nothing
break;
}
return result;
}
LockGuard.hpp
#pragma once
#include <iostream>
#include <pthread.h>
class Mutex
{
public:
Mutex(pthread_mutex_t *lock_p = nullptr):lock_p_(lock_p)
{}
~Mutex()
{}
void lock()
{
if(lock_p_)pthread_mutex_lock(lock_p_);
}
void unlock()
{
if(lock_p_)pthread_mutex_unlock(lock_p_);
}
private:
pthread_mutex_t *lock_p_;
};
class LockGuard // RAII
{
private:
Mutex mutex_;
public:
LockGuard(pthread_mutex_t *mutex): mutex_(mutex)
{
mutex_.lock();
}
~LockGuard()
{
mutex_.unlock();
}
};
线程安全设计模式
单例模式
单例模式的特点
唯一性:某些类需要确保只生成一个对象实例,例如服务器只需一个类实例来管理大数据。
应用场景:在服务器开发中,为了在内存中加载大量数据(上百 GB),常使用单例模式来集中管理这些数据。
单例模式的特点
唯一性:某些类需要确保只生成一个对象实例,例如一个男人只能有一个媳妇,或者服务器只需一个类实例来管理大数据。
应用场景:在服务器开发中,为了在内存中加载大量数据(上百 GB),常使用单例模式来集中管理这些数据。
饿汉式与懒汉式实现方式
饿汉方式 在对象创建时立即实例化单例,适用于不考虑延时加载的场景。
例子:吃完饭立刻洗碗,下次吃饭时可以直接拿碗吃饭。
代码实现:
template <typename T> class Singleton { static T data; public: static T* GetInstance() { return &data; } };
实例在类加载时创建,线程安全。
懒汉方式 仅在首次使用时实例化对象,以延迟加载来优化服务器启动速度。
例子:吃完饭先不洗碗,等到下次吃饭需要时再洗碗。
代码实现:
template <typename T> class Singleton { static T* inst; public: static T* GetInstance() { if (inst == NULL) { inst = new T(); } return inst; } };
懒汉方式在多线程中不安全,可能会导致多个实例的创建。
线程安全的懒汉方式
在多线程场景下,实现线程安全的懒汉式单例通常使用双重检查锁定来优化性能。
template <typename T>
class Singleton {
volatile static T* inst; // volatile 防止编译器优化
static std::mutex lock;
public:
static T* GetInstance() {
if (inst == NULL) {
lock.lock();
if (inst == NULL) {
inst = new T();
}
lock.unlock();
}
return inst;
}
};
加锁位置:需在实例创建前后加锁,以保证线程安全。
双重检查:在加锁前后都进行 if
判断,避免不必要的锁竞争,提升性能。
volatile 关键字:防止编译器优化导致的多线程不安全问题。
STL中容器是否是安全的 ?
不是 STL的设计初衷是性能。一旦涉及到加锁保证线程安全,对性能造成巨大影响。
对不同容器的加锁的方式也是不同的。
所以STL 默认不是线程安全的,如果在多线程下使用则需要自己保持线程安全。
智能指针是否是线程安全的?
unique_ptr
特点:
unique_ptr
实现了独占所有权,即同一时间内只能有一个unique_ptr
指向一个对象。线程安全性:由于
unique_ptr
只能在当前代码块中生效,且不允许拷贝(只能通过std::move
转移所有权),因此不存在共享引用的问题,也就不涉及线程安全问题。
shared_ptr
特点:
shared_ptr
允许多个智能指针共享一个对象的所有权,并通过内部的引用计数来跟踪当前对象的使用情况。线程安全性:由于 shared_ptr需要维护一个引用计数(use_count),在多线程环境下,当多个 shared_ptr同时对同一个对象增加或减少引用计数时,会存在线程安全问题。
解决方案:标准库的
shared_ptr
实现了原子操作(例如 CAS, Compare-And-Swap),保证了引用计数的增减操作是线程安全的。效率:通过使用原子操作,
shared_ptr
的引用计数操作既线程安全又高效。
自旋锁 (Spinlock)
自旋锁是一种低开销、轻量级的锁机制,允许线程在等待资源时进行自旋,即在锁上不断循环检查是否可以获取资源,而不是进入休眠状态等待唤醒。
适用场景
自旋锁通常适用于临界区非常短的情况,即线程占用临界资源的时间非常短,不需要长时间等待的情况。
在多核处理器上,当线程可以在不同的核上自旋时,自旋锁的性能较好。
挂起等待与自旋
挂起等待和自旋是两种不同的等待策略。
挂起等待:当资源不可用时,线程会被挂起,等待其他线程释放资源。系统会将挂起的线程放入等待队列,并在资源释放时唤醒该线程。
自旋等待:线程不被挂起,而是不断地循环检查锁状态,直到资源变为可用。
等待方式的选择 - 什么决定了最终的等待方式?
临界区持有时间长短:如果预期线程持有临界资源的时间较长(如文件I/O操作或网络请求),使用挂起等待更为高效,因为自旋会消耗CPU资源。在这种情况下,挂起线程可以将CPU资源让给其他任务,从而提高整体效率。
系统负载与资源竞争:在多核处理器系统中,如果竞争资源的线程数量多、系统负载较高时,挂起线程可以避免自旋造成的CPU资源浪费。而在轻量级、负载低的系统上,自旋锁可能会更合适。
上下文切换开销:挂起线程通常伴随上下文切换的开销。因此,如果预期等待时间较短(如毫秒级别),自旋等待可能更高效,因为自旋避免了上下文切换的成本。
等待时间长短问题
决定自旋等待时间与挂起时间的一个关键因素是线程持有临界区资源的时间长短。(由程序员自己测试决定)
时间阈值判断:设置一个时间阈值,当线程在锁前自旋等待超过阈值后,则进入挂起状态。例如,可以使用一种自适应的方式,首先进行短时间的自旋,超过阈值时再进入挂起等待。
动态调整:根据历史资源等待时间、系统负载等参数动态调整自旋和挂起的策略,以最小化CPU资源浪费和等待时间。
不同等待方式的应用场景
挂起等待 当一个线程需要等待的资源是磁盘I/O、网络资源等外部设备时,线程持有锁的时间通常较长,其他线程应该进入挂起等待,以节省CPU资源。
自旋等待 如果线程只是访问一个轻量级的内存数据结构(如更新一个共享变量),且预期锁的持有时间极短,则自旋锁能避免不必要的上下文切换开销。
#include <pthread.h>
#include <pthread.h>
int pthread_spin_init(pthread_spinlock_t *lock, int pshared);
int pthread_spin_destroy(pthread_spinlock_t *lock);
int pthread_spin_lock(pthread_spinlock_t *lock); // 用于获取自旋锁。如果锁已被其他线程持有,调用线程会一直自旋等待,直到锁被释放,然后获取锁。
int pthread_spin_trylock(pthread_spinlock_t *lock); // 尝试一次性获取自旋锁,如果锁已经被其他线程持有,函数立即返回而不进行等待。
int pthread_spin_unlock(pthread_spinlock_t *lock); //释放自旋锁,使其他等待的线程可以获取该锁。
读写锁
读写锁是一种常用的同步机制,用于保护共享资源的访问,以允许多个线程并发读取,但确保写操作是互斥的。即:
写独占:同一时刻只能有一个写者线程进入临界区。
读共享:多个读者线程可以同时读取,不会相互阻塞。
读优先:读者优先级更高。当有读者正在读取时,写者需等待所有读者释放读锁才能获得写权限。
读写锁适用于读多写少的场景,因为多个读者可以并发访问,从而提高性能。
读写锁的特性
读-写互斥:读操作和写操作不能同时进行,读写者间必须互斥。
写-写互斥:写者之间是互斥的,确保同一时间只有一个写操作。
读-读并行:多个读者之间可以并行访问,不会阻塞彼此。
读写锁与生产者-消费者模型的对比
在生产者-消费者模型中,消费者会拿走数据(消费数据),而读者只是读取数据,不会改变数据状态。
写者相当于“生产者”,会更新数据内容,因此写者与读者、写者与写者都必须互斥。
读者之间不必互斥,可以并行读取数据。
相应接口
#include <pthread.h>
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwloc