C++ 多线程同步问题?

#include <iostream>
#include <thread>
#include <string>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <map>
#include <ctime>
#include <random>
#include <chrono>
#include <unistd.h>

#define NUM_OF_SAMPLES 50
#define MAX_NUM_OF_THREADS 6

using namespace std;

class Sensor {
private:
    string sensorType;

public:
    Sensor(string& type) : sensorType(type){}

    //Declare a virtual method to be overridden by derived classes:
    virtual double getValue() = 0;

    //Declare non-virtual method:
    string getType() {
        return sensorType;
    }
};

class TempSensor : public Sensor {
public:
    TempSensor(string& s) : Sensor(s) {}

    double getValue() {
        return (double)rand() / ((double)RAND_MAX / 20) + 10;
    }
};

class PressureSensor : public Sensor {
public:
    PressureSensor(string& type) : Sensor(type) {}

    double getValue() {
        return (double)rand() / ((double)RAND_MAX / 10) + 95;
    }
};

class CapacitiveSensor : public Sensor {
public:
    CapacitiveSensor(string& type) : Sensor(type) {}

    double getValue() {
        return (double)rand() / ((double)RAND_MAX / 4) + 1;
    }
};

class BC {
private:
    bool lock = false; //'false' means that the BC is not locked
    vector<Sensor *>& theSensors; //reference to vector of Sensor pointers
    mutex BC_mu;
    condition_variable cv;
    map<int, string> sensorMap;
    map<int, int> countMap;

public:
    BC(vector<Sensor *>& sensors) : theSensors(sensors) {
        sensorMap.insert(pair<int, string>(0, "temperature"));
        sensorMap.insert(pair<int, string>(1, "pressure"));
        sensorMap.insert(pair<int, string>(2, "capacitive"));
    }

    void requestBC(int index) {
        unique_lock<mutex> lk(BC_mu);
        if (lock) {
            cout << "BusController is locked, thread " << index << " is about to suspend.." << endl;
            cv.wait(lk);
        }
        lock = true;
        cout << " BusController locked by thread " << index << endl;
        return;
    }

    double getSensorValue(int selector) {
        countMap[selector]++;
        return theSensors[selector]->getValue();
    }

    void releaseBC(int index) {
        unique_lock<mutex> lk(BC_mu);
        lock = false;
        cout << "BusController unlock by thread " << index << endl;
        cv.notify_one();
    }

    string getSensorType(int selector) {
        return sensorMap[selector];
    }

    void printCntSensorType() {
        for (auto it = countMap.begin(); it != countMap.end(); it++) {
            cout << sensorMap[it->first] << " accessed " << it->second << " times" << endl;
        }
    }
};


void run(BC& theBC, int index) {
    random_device rd;
    mt19937 gen(rd());
    uniform_int_distribution<> dis1(0, 2);
    uniform_int_distribution<> dis2(1000, 10000);

    for (int i = 0; i < NUM_OF_SAMPLES; i++) {
        theBC.requestBC(index);
        int sensorTypeIndex = dis1(gen);
        string sensorStr = theBC.getSensorType(sensorTypeIndex);
        double randSensorValue = theBC.getSensorValue(sensorTypeIndex);
        cout << "       sample value from thread " << index << " from " << sensorStr << " sensor = " << randSensorValue << endl;
        theBC.releaseBC(index);
        usleep(dis2(gen));
     }
}

int main() {
    vector<Sensor *> sensors;
    string temp = "temperature", pressure = "pressure", capa = "capacitive";
    sensors.push_back(new TempSensor(temp));
    sensors.push_back(new PressureSensor(pressure));
    sensors.push_back(new CapacitiveSensor(capa));

    BC theBC(ref(sensors));
    thread theThreads[MAX_NUM_OF_THREADS];
    for (int i = 0; i < MAX_NUM_OF_THREADS; i++) {
        theThreads[i] = thread(run, ref(theBC), i);
    }
    for (int i = 0; i < MAX_NUM_OF_THREADS; i++) {
        theThreads[i].join();
    }
    theBC.printCntSensorType();
    return 0;
}

代码如上所示,主要实现的功能是六个线程去抢占BC类,然后选择不同的sensor进行输出,但是BC类中关于线程的同步机制觉得存在问题,求大神指导。

https://blog.csdn.net/qq_42189368/article/details/80690948

线程池设计思路

线程池是什么

我们先来打个比方,线程池就好像一个工具箱,我们每次需要拧螺丝的时候都要从工具箱里面取出一个螺丝刀来,有时候需要取出一个来拧,有时候螺丝多的时候需要多个人取出多个来拧,拧完自己的螺丝那么就会把螺丝刀再放回去,然后别人下次用的时候再取出来用。也许我的例子不是太完美,但是我想我已经基本阐述清楚了线程池。说白了线程池就是相当于提前申请了一些资源也就是线程,需要的时候就从线程池中取出线程来处理一些事情,处理完毕之后再把线程放回去

线程池介绍
线程池介绍

为什么需要线程池

我们来思考一个问题,为什么需要线程池呢?假如没有线程池的话我们每次调用线程是什么样子的?显然首先是先创建一个线程,然后再把任务交给这个线程,最后再把这个线程销毁掉。那么如果我们改用线程池的话,我们在程序运行的时候就会首先创建一批线程,然后交给线程池来管理。有需要的时候我们把线程拿出去处理任务,不需要的时候我们再回收到线程池中,这样是不是就避免了每次都需要创建和销毁线程这种消耗时间的操作。有人会说你使用线程池一开始就消耗了一些内存,之后一直不释放这些内存,这样岂不是有点浪费。其实这是类似于空间换时间的概念,我们确实多占用了一点内存但是这些内存和我们珍惜出来的这些时间相比,是非常划算的。

池的概念是一种非常常见的空间换时间的概念,除了有线程池之外还有进程池、内存池等等。其实他们的思想都是一样的就是我先申请一批资源出来,然后就随用随拿,不用再放回来。听到这儿是不是有种云计算的思想了,他们道理都是一样的。

如何设计线程池

现在硬核的知识要开始了,请坐稳扶好、抓紧扶手~

二话不说,先上图看看,我们要设计的线程池长什么样子!

线程池的设计
线程池的设计

设计思路

我们需要一个线程池类,那么线程池类中都需要哪些东西呢?我们庖丁解牛来看一看

  • 我们需要存放我们创建好的线程,因此我们需要一个容器专门放线程
  • 需要一个容器来存放我们的任务,每次把任务放到这个容器里面
  • 由于是多线程的读取任务,所以必不可少的我们需要锁,每次读取任务需要加锁和解锁
  • 我们需要判断什么时候终止,因此还需要一个判断终止的变量
  • 为了避免轮询的判断任务集装箱里面是不是空的,这样效率太低了,因此我们这里采用条件变量

这里来说明一下什么是条件变量。条件变量是并发编程中的一种同步机制,条件变量使得线程能够阻塞到等待某个条件发生后,再继续执行,期间还会把之前拿到的锁先释放掉,不影响其它人拿这把锁。因此条件变量十分强大而高效。(条件变量和锁将会在我多线程文章中详细讲解,这里不是重点,所以不再展开细讲)

接下来我们来研究一下线程池中需要有哪些操作呢?

  • 将任务添加到线程池中的操作,并且这时应该通知线程可以来取任务来执行了
  • 一个循环操作,不断地等待任务集装箱里面有数据来执行,也就是初始化完毕后需要做的事情
  • 通过改变终止变量来让上面循环停止的操作

好了,到此已经详细的把设计思路写清楚了,接下来该看具体的实现了

线程池的实现

接下来先来看一看线程池类是怎么实现的,注释已经很详细了,就不多说了直接上代码。

class CThreadMangerPool
{

public:
 CThreadMangerPool(void):is_runing(false){};
 bool init(int threadnum);//初始化函数
 ~CThreadMangerPool(void);
 void Run(void);  //执行函数
 void stop(void)//用来终止循环的函数
 void addTask(ThreadTask* task);//向任务集装箱中添加任务的函数
private:
 bool CreateThreads(int threadnum = 5);
 std::vector<std::shared_ptr<std::thread>> threadsPool;    //线程集装箱,用来存放线程
 std::list<std::shared_ptr<ThreadTask>>    threadTaskList; //任务集装箱,用来存放线程执行的任务
 std::condition_variable       threadPool_cv;  //条件变量
 std::mutex          threadMutex;   //互斥锁
 //std::vector<std::shared_ptr<CTcpClient>>  tcpClients;
 bool is_runing; //终止变量
};

我们来几个重点的函数实现~

在Run函数中,我们设计了一个循环,不断地执行等待并取出任务执行,如果没有的任务可以执行的话就睡眠等待(用之前提到的条件变量来实现)

注意这里使用了一个手法,我们用while来判断任务集装箱中的数据是不是空的,是因为类似于进程的惊群现象,这里出现条件变量的虚假唤醒。(在这里并不是重点就不展开讲了,会在我文章的多线程处详细讲解)

void CThreadMangerPool::Run(){
 std::shared_ptr<ThreadTask> task; 
 while(true){ //处在循环中

  std::unique_lock<std::mutex> guard(threadMutex);//利用RALL来管理锁,不用手动释放

  while(threadTaskList.empty()){ // 这里防止条件变量的虚假唤醒,所以不用if判断
   if (!is_runing)
    break;
   threadPool_cv.wait(guard); //条件变量的使用
  }
  if (!is_runing)  //同上 都是判断如果未启动或者调用了stop函数都会退出循环
   break;

  task = threadTaskList.front(); //取出任务
  threadTaskList.pop_front(); //把任务从容器中拿走

  if (task == NULL)
   continue;

  task->DoIt(); //执行任务处理函数
  task.reset(); //重置指针
 }

}

接下来看看增加任务的函数是怎么实现的

void CThreadMangerPool::addTask(ThreadTask* task){
 std::shared_ptr<ThreadTask> ptr; //创建一个指向任务的智能指针
 ptr.reset(task);
 {
  std::lock_guard<std::mutex> guard(threadMutex);  //同样是用RALL来管理锁,免去手动释放
  threadTaskList.push_back(ptr); //往任务集装箱中添加任务
 }
 threadPool_cv.notify_all(); //通知线程可以执行了,就是唤醒刚才在条件变量处睡眠的条件
 
}

好了,重点函数已经看完了,其他的轻松就可以实现包括初始化函数终止函数等等

完结撒花~

这些代码出自我的后端框架Ratel,感兴趣的童鞋可以去看看

github地址:https://github.com/hailong666/Ratel

往期回顾:

http://mp.weixin.qq.com/s?__biz=MzUxMTk4MDkzOA==&mid=2247483729&idx=1&sn=774022250827ea0a6aa07664627dce92&chksm=f96a25b4ce1daca2e74662efa0ccd32d5110fe49a5486caea410296d80d8cdcc24620d9743d0#rd

http://mp.weixin.qq.com/s?__biz=MzUxMTk4MDkzOA==&mid=2247483712&idx=1&sn=6902c270a6db9d06a0e97a8c7b264b5a&chksm=f96a25a5ce1dacb3d64c0154df9fb80ef672a98ce5dcad4f75391a92727fc571d12eafb35c6b#rd

http://mp.weixin.qq.com/s?__biz=MzUxMTk4MDkzOA==&mid=2247483939&idx=1&sn=76eeb6db0c832bcb913663c7bc26874c&chksm=f96a26c6ce1dafd0f6f51836c97ff6132b56550d3bb1f6b1cc354de06577eb540cf7284e8683#rd

http://mp.weixin.qq.com/s?__biz=MzUxMTk4MDkzOA==&mid=2247483660&idx=1&sn=dadc9e4b515b377a745de15a42180850&chksm=f96a25e9ce1dacff51ac9f3309e750da6db6d27f3a5b8e5e0c973fecfdaf6c065710bd388291#rd