首页
登录 | 注册

C++11 编写可复用多线程任务池 (开源OEasyPool)

引言

C++之父说,C++11是一门新的语言。关于 C++11 特性想必大家早已经听说过他的强大。我一直秉性着应用见真知的原则。今天我们使用线程池,来学习部分 C++11 特性。C++11已经基本普及,来吧,我已经迫不及待了。

版权所有:_ OE _, 转载请注明出处:http://blog.csdn.net/csnd_ayo

简介

操作系统:windows 7
编程环境:VS2013
最后编辑:2017年6月13日

  • 引言
  • 简介
  • 涉及内容
  • 线程池的由来
    • 养鱼的大雄
    • 大雄的线程池
    • 结语
  • 使用介绍
    • 类的功能
    • Main函数
    • 配置
    • 扩展
  • 讲解
    • 结构图
    • 任务抽象基类
    • 任务队列
    • 线程池
  • 总结

涉及内容

  • 智能指针

  • 原子类型

  • 无序Map

  • 线程

  • 条件变量

  • 锁机制


  • 2017年6月13日

    1. 优化 OETaskQueue 成模板型

线程池的由来

养鱼的大雄

  机器猫每次看着大雄买鱼回家,还都是带着一只鱼缸,大雄跟我机器猫讲解,是因为一只鱼缸里只能装一只鱼,家里没有多余的鱼缸了。

  机器猫骂骂咧咧的说 “你每天晚上搞什么鬼,在那里不停的砸鱼缸,啪啪啪的,吓坏宝宝了。”

  大雄温声细语的讲解到:“这样还不是为了节省空间,让哆啦a梦你有更多的地方可以蹦跶呀。”

  “大雄,一看你就是个傻逼,我们的房子有2048平方米,鱼缸往大了算才0.4平米不到,你节省个毛线啊。你每天晚上在那里砸鱼缸就算了,第二天又买了个新的鱼缸回来是什么鬼!天天买,天天砸,你你你,你搞事情啊!!”哆啦a梦又骂骂咧咧的讲道。

  大雄在使劲的拍了下自己的脑袋,发现真的是那么一回事儿,但是内心小懒虫又耐不住寂寞。

  “可是鱼缸不好管理,我鱼买的多的时候,要加鱼缸,鱼买的少了的时候,又得把鱼缸丢掉。” 大雄支支吾吾的嘀咕着。买鱼缸的时候,还得想家里鱼缸足不足够。丢鱼缸的时候还要想明天会不会买鱼。

  C++11 编写可复用多线程任务池 (开源OEasyPool)

  “关键的时候还得靠我,你就只会偷懒。” 机器猫带着一点点嫌弃的口吻讲道。

  C++11 编写可复用多线程任务池 (开源OEasyPool)

  自豪的掏出了宝物:”自动鱼池~”

  他是一个自动控制大小的鱼池,他会根据鱼的数量来决定鱼池的大小,你可以设置鱼池的最大大小和最小大小,若鱼太多的时候,它就会等其它的鱼死了,再把新的鱼放入鱼池。他还有很多功能呢。

大雄的线程池

  原来线程池是这么来的,我也很惊讶。是大雄的愚蠢,才让我们见识到这么厉害的宝贝。

  有人说,大雄拿到的明明是自动鱼池,我们的是线程池,压根是两个东西。
  
  其实,这也多亏了大雄,因为大雄最近也在学编程。

  至于大雄学编程的故事,我们下回再讲。^_^.

  大雄不甘寂寞写了一份线程池分享给大家 [点击免费下载]

结语

  线程池是为了避免创建和回收,以减少不必要的资源开销,维护特定的线程资源,在需要的时候使用,不需要的时候等待需要,长时间处于闲置的线程,则主动释放。这样的运行机制,在请求频繁的系统中广为应用。更高级的用法还有多线程池的方法。

使用介绍

接下来,我将以逻辑顺序的方式讲解线程池的制作过程。(逻辑方式便于理解)若没有源码的朋友,可以翻到上面,下载源码后,跟着文章一起阅读。

类的功能

  • Task (任务基类)

    该类主要实现一个任务类
    virtual int doWork() = 0;

  • TaskQueue (任务队列)

    该类主要针对任务的存储、删除、撤回等状态做管理

  • ThreadPool (线程池)

    整个线程池的核心业务处理类

Main函数

我们来讲解第一个示例Demo test_base

  • ThreadPoolDemo.cpp

    
    #include <time.h>
    
    
    #include <iostream>
    
    
    #include <memory>
    
    
    
    #include "ThreadPool.h"
    
    
    #include "../test/TaskTest.h"
    
    using namespace std;
    
    int main(void)
    {
        OEThreadPool::ThreadPoolConfig threadPoolConfig;
        threadPoolConfig.nMaxThreadsNum = 100;
        threadPoolConfig.nMinThreadsNum = 5;
        threadPoolConfig.dbTaskAddThreadRate = 3;
        threadPoolConfig.dbTaskSubThreadRate = 0.5;
        clock_t start = clock();
        {
    
            std::shared_ptr<OEThreadPool> threadPool(new OEThreadPool);
            threadPool->init(threadPoolConfig);
    
            int nID = 0;
            while (true)
            {
                std::shared_ptr<OETaskTest> request = std::shared_ptr<OETaskTest>(new OETaskTest());
    
                threadPool->addTask(request);
                if (request->getID() == 101000) {
                    break;
                }
            }
    
            threadPool->release();
        }
        clock_t finish = clock();
        std::cout << "duration:" << finish - start << "ms"<< std::endl;
        getchar();
        return 0;
    }

    这里主要做了一些线程池的运行测试工作,我们从这个Demo,可以了解到对这个线程池基本的操作。

    1. 线程池的初始化
    2. 创建任务类
    3. 添加任务到线程池当中
    4. 线程池异步处理
    5. 清理线程池资源

配置

关于线程池的配置,我们也留出了接口。
通过 OEThreadPool 类中的 tagThreadPoolConfig 结构体与 init 函数,进行线程池的资源配置。

代码中已经有了比较详尽的注释,但是在这里我还是想贴出来再强调一遍,虽然他很简单,但是看起来非常重要。

/// 线程池配置参数
typedef struct tagThreadPoolConfig {
    int nMaxThreadsNum;         /// 最大线程数量
    int nMinThreadsNum;         /// 最小线程数量
    double dbTaskAddThreadRate;   /// 增 最大线程任务比 (任务数量与线程数量,什么比例的时候才加)
    double dbTaskSubThreadRate;   /// 减 最小线程任务比 (任务数量与线程数量,什么比例的时候才减)
} ThreadPoolConfig;

线程池在没有合理的init之前,是不会开展工作的。

扩展

关于任务线程池的扩展,我们已经提供了很好的解决方案。
例如继承 Task 类,来完成线程任务的扩展。

例如案例中 TaskTest.h 中使用的那样,将主要的业务逻辑放在 doWork 中即可。

讲解

从关联的角度讲解各个类的实现

OETask –> OETaskQueue –> OEThreadPool

结构图

C++11 编写可复用多线程任务池 (开源OEasyPool)

主要从任务基类开始做延伸扩展。

C++11 编写可复用多线程任务池 (开源OEasyPool)

点击查看大图

任务抽象基类

文件名:Task.h

#ifndef __OETASK_H__
#define __OETASK_H__

#include <atomic>

// 任务基类
class OETask
{

protected:

    // 任务的唯一标识
    int id_;

private:
    static int nRequestID_;
    // 任务取消状态
    std::atomic<bool>  bIsCancelRequired_;

public:
    OETask() :id_(nRequestID_++), bIsCancelRequired_(false) {};
    virtual ~OETask() {};

public:
    // 任务类虚接口,继承这个类的必须要实现这个接口
    virtual int doWork() = 0;

    // 任务已取消回调
    virtual int onCanceled(){ return 1; }
    // 任务已完成
    virtual int onCompleted(int){ return 1; }

    // 获取任务ID
    int getID(){ return id_; }
    // 设置ID
    void setID(int nID){ id_ = nID; }
    // 获取任务取消状态
    bool isCancelRequired(){ return bIsCancelRequired_; }
    // 设置任务取消状态
    void setCancelRequired(){ bIsCancelRequired_ = true; }

};

__declspec(selectany) int OETask::nRequestID_ = 100000;
#endif // __OETASK_H__

这是一个抽象的任务基类, 他的声明是如此简单,是不是觉得有些鸡肋? 其实不是的,你可以根据这个基类,做一些自己的扩展。

例如,我做了以下扩展,来帮助我查询任务的执行状态,有效的异步监控任务。


class SKTask :
    public OETask
{
public:

    SKTask() :OETask() {
        CommonData::setTaskStatus(id_, MODULE_TASK_CREATE);
    }

    virtual ~SKTask() {
    }

    virtual int doWork() {
        CommonData::setTaskStatus(id_, MODULE_TASK_PROCESS);
        return 0;
    }

    virtual int onCanceled(void) {
        CommonData::setTaskStatus(id_, MODULE_TASK_CANCEL);
        return 0;
    }
    // 任务已完成
    virtual int onCompleted(int code) {
        CommonData::setTaskStatus(id_, code);
        return 0;
    }
};

这里有必要着重讲一下的是 onCompleted 函数,这个函数的参数是doWork的返回值。

当然 你也可以做很多你自己的扩展,例如添加几个成员函数,再或者使用一下C++11 的 bind 函数,是的这个函数非常棒,至少我很喜欢。如果你觉得这个函数很面生,我想你有必要好好的了解下他。

任务队列

文件名:taskqueue.h

#ifndef __OETASKQUEUE_H__
#define __OETASKQUEUE_H__

#include <deque>
#include <mutex>
#include <condition_variable>
#include <unordered_map>
#include <memory>

#include "Task.h"

/// 任务队列类
class OETaskQueue
{
public:
    OETaskQueue();
    ~OETaskQueue();

private:
    /// 就绪任务队列
    std::deque<std::shared_ptr<OETask> > queue_;
    /// 运行任务map
    std::unordered_map<int, std::shared_ptr<OETask>> mapDoingTask_;
    /// 互斥量
    std::mutex mutex_;
    /// 条件变量
    std::condition_variable conditPut_;

public:
    /**
    * @brief :向队列的末尾插入任务
    * @param :task 任务类
    */
    void put_back(std::shared_ptr<OETask> task);
    /**
    * @brief :向队列的头部插入任务
    * @param :task 任务类
    */
    void put_front(std::shared_ptr<OETask> task);
    /**
    * @brief :获取队首(并将任务加到运行任务列表中)
    * @return:任务类
    */
    std::shared_ptr<OETask> get(void);
    /**
    * @brief :获取整个双向链表的大小
    * @return:大小
    */
    size_t size(void);

    /**
    * @brief :释放队列
    */
    void release(void);
    /**
    * @brief :删除任务(从就绪队列删除,如果就绪队列没有,则看执行队列有没有,有的话置下取消状态位)
    * @param :nID 任务的编号
    * @return:成功返回0 失败返回非0
    */
    int deleteTask(int nID);
    /**
    * @brief :删除所有任务
    * @return:成功返回0 失败返回非0
    */
    int deleteAllTasks(void);
    /**
    * @brief :任务完成回调(从运行列表中删除指定任务)
    * @param :nID 任务的编号
    * @return:成功返回0 失败返回非0
    */
    int onTaskFinished(int nID);
    /**
    * @brief :判断任务是否执行完毕
    * @param :nID 任务的编号
    * @return:任务类
    */
    std::shared_ptr<OETask> isTaskProcessed(int nId);

    /**
    * @brief :等待有任务到达(带超时:超时自动唤醒)
    * @param :millsec 超时时间(毫秒)
    * @return:成功返回true 失败返回false
    */
    bool wait(std::chrono::milliseconds millsec);

};

#endif // __OETASKQUEUE_H__

这个队列针对任务的存储、删除、撤回等状态做了一系列的管理。使之有序的被执行(我反正是按顺序派发的任务,至于异步执行而导致的时间片的占用效率问题,我可管不了

  • 智能锁

    这里对异步任务的提取工作,一定要注意,所以我使用了 mutex 做了一些线程安全方面的保护,对于程序而言, mutex 还是显得格格不入,我利用 std::unique_lock<std::mutex> 让这个 mutex 显得有些生机。

    如果你经常使用多线程编程,而且线程管理范围达几十个,我想你会喜欢 C++11 的,更准确的说,你会喜欢 C++11 的智能锁。

线程池

文件名:threadpool.h

#ifndef __OETHREADPOOL_H__
#define __OETHREADPOOL_H__

#include "Task.h"
#include "TaskQueue.h"

/// 任务管理类
class OEThreadPool
{
public:
    /// 线程池配置参数
    typedef struct tagThreadPoolConfig {
        int nMaxThreadsNum;         /// 最大线程数量
        int nMinThreadsNum;         /// 最小线程数量
        double dbTaskAddThreadRate;   /// 增 最大线程任务比 (任务数量与线程数量,什么比例的时候才加)
        double dbTaskSubThreadRate;   /// 减 最小线程任务比 (任务数量与线程数量,什么比例的时候才减)
    } ThreadPoolConfig;

private:
    /// 任务队列
    OETaskQueue taskQueue_;

    /// 线程池配置(如果最小线程数量为1,则表示需要一个常驻的处理线程)
    ThreadPoolConfig threadPoolConfig_;
    /// 线程池是否被要求结束
    std::atomic<bool> atcWorking_;
    /// 当前线程个数
    std::atomic<int>  atcCurTotalThrNum_;
    /// 互斥量
    std::mutex mutex_;

public:
    OEThreadPool(void);
    ~OEThreadPool(void);

    /**
    * @brief :线程池资源配置初始化
    * @param :config 初始化的配置信息
    * @return:0 执行成功  非0 执行失败
    */
    int init(const ThreadPoolConfig& config);
    /**
    * @brief :释放资源(释放线程池、释放任务队列)
    * @return:true 执行成功  false 执行失败
    */
    bool release(void);

    /**
    * @brief :添加任务
    * @param :taskptr 任务类
    * @param :priority 是否有限处理 true:优先处理
    * @return:0 执行成功  非0 执行失败
    */
    int addTask(std::shared_ptr<OETask> taskptr, bool priority = false);

    /**
    * @brief :删除任务(从就绪队列删除,如果就绪队列没有,则看执行队列有没有,有的话置下取消状态位)
    * @param :nID 任务编号
    * @return:0 执行成功  非0 执行失败
    */
    int deleteTask(int nID);

    /**
    * @brief :删除所有任务
    * @return:0 执行成功  非0 执行失败
    */
    inline int deleteAllTasks(void);
    /**
    * @brief :判断任务是否执行完毕
    * @param :nID 任务编号
    * @return:执行完毕,执行完返回null,否则返回任务指针
    */
    inline std::shared_ptr<OETask> isTaskProcessed(int nId);

private:
    /**
    * @brief :获取当前线程任务比
    * @return:线程任务比
    */
    double getThreadTaskRate(void);
    /**
    * @brief :当前线程是否需要结束
    * @return:true:可以结束 false:不可以结束
    * @note  :已考虑到最小线程数量
    */
    bool shouldEnd(void);
    /**
    * @brief :添加指定数量的处理线程 
    * @param :nThreadsNum 添加的线程数量
    * @return:0 执行成功  非0 执行失败
    */
    int addProThreads(int nThreadsNum);
    /**
    * @brief :释放线程池
    * @return:true 执行成功  false 执行失败
    */
    bool releaseThreadPool(void);
    /**
    * @brief :任务处理线程函数
    */
    void taskProcThread(void);

};

extern OEThreadPool SystemThreadPool;

#endif // __OETHREADPOOL_H__

总结

这里的注释都是遵循 doxygen 的文档系统的注释规则,可以使用 doxygen 一键生成,可供查阅的文档。

  • 我们的收获

    1. 学习并制作了一个可复用的线程池
    2. 初步运用C++11的线程锁、条件变量、无序Map、智能指针等


2020 jeepxie.net webmaster#jeepxie.net
10 q. 0.010 s.
京ICP备10005923号