线程池原理和简单实现
1. 为什么需要线程池
当我们的程序需要同时处理多个业务的时候,我们就需要使用多线程来进行编程。尤其界面程序,如果采用单线程,当点击UI控件后,如果事件处理函数耗费较长时间,就会照成UI卡死无法继续响应的情况。此外,如果程序需要不断地进行I/O操作等耗时操作时,也可以将其放入一个专门的线程中。
上述中我们谈到了线程的作用。线程是CPU调度程序的最小单元,CPU通过调度机制分配给每个线程时间片,从而让线程得以运行。假如我们需要动态地创建大量线程,并且线程所处理的任务耗时较少,就会照成一定的CPU资源浪费了,因为创建线程、销毁线程对于CPU来说也是一个不小的开支。这种场景下线程池就能发挥它的作用了。线程池的概念是事先创建一定数目的线程,并处在阻塞状态下,等待任务的到来;当任务到来时,线程池会释放一个信号量,某个线程就会抢到信号量并处理该任务。
下面我们就将实现一个简单的线程池,系统为Win10,IDE为VisualStudio2013。
2. 如何实现线程池
根据线程池的功能,我们可以创建三个类,任务类、线程池类、线程信息类。
1) 任务类AbstractTask
任务接口类,需要用户继承该类并实现run函数。
2) 线程池类
负责线程的创建、任务的添加、信号的创建和控制、锁对象的创建和管理、线程池的销毁等任务。通过信号量来实现多线程之间的同步,并在多线程之间利用胡策锁来保护成员对象。
3) 线程参数类
该类主要是作为创建线程时的参数,传递给线程处理函数,从而进行相应的处理。
u 需要注意的问题:
我们都知道,创建线程时需要指定线程处理函数,该函数必须时静态函数,而不能是类的非静态成员函数,所以ThreadPool类中定义为:
//线程处理函数
static voidThreadFunc(void* arg);
但同时我们也知道,在类的静态函数中不能“直接”调用类的非静态成员。那在线程池中我们又必须在静态函数中访问线程池中的非静态成员,这该如何办呢?你仔细想一想…
对,注意到arg参数了吗?创建线程时,将线程池的指针利用参数传入到静态函数中,从而就可以利用该线程池指针访问非静态成员了。你想到了吗?
其实在Win32编程中也会遇到相似的问题,当我们创建窗口类时,必须指定窗口处理函数,该函数同样必须是静态的。如果我们封装了一个窗口类,并且想在窗口处理函数中访问该类的非静态成员时,也可以将该窗口的指针通过参数传入到窗口处理函数中,从而达到我们的目的,学习要融会贯通哦!
----------------------------------------------------------------------------
#include <Windows.h>
#include <list>
#include <vector>
using std::list;
using std::vector;
class Task;
class ThreadPara;//传递给线程的参数
{
public:
ThreadPool();
ThreadPool(int maxNum);
~ThreadPool();
//线程池初始化
void init();
//线程池销毁
void release();
void addTask(Task* task);
static void ThreadFunc(void* arg);
private:
typedef list<Task*> TaskList;
TaskList m_taskList; //未处理的任务池
HANDLE m_mutexTaskList;//任务池锁
HANDLE m_semNewTask; //增加新任务信号
HANDLE m_semDoneTask; //完成任务信号
ThreadList m_threadList;//线程集合
HANDLE m_mutexThreadPools;//线程池锁
int m_threadNum;//线程数目
};
---------------------------------------------------------------------------------------
#include "ThreadPara.h"
#include "Task.h"
#include <errno.h>
#include <process.h>
{
}
m_threadNum = maxNum;
}
{
}
if (isInit){
return;
}
//创建信号量和锁对象
m_mutexTaskList = CreateMutex(NULL, FALSE, L"m_mutexTaskList");//如果是true,则当前线程已经获取该锁对象,一定要注意
m_mutexThreadPools = CreateMutex(NULL, FALSE, L"m_mutexThreadPools");
m_semDoneTask = CreateSemaphore(NULL, 0, 100, L"m_semDoneTask");
m_semDestoryThreadPool = CreateSemaphore(NULL, 0, 100, L"m_semDestoryThreadPool");
if (m_mutexTaskList == 0
|| m_mutexThreadPools == 0
|| m_semNewTask == 0
|| m_semDoneTask == 0
|| m_semDestoryThreadPool == 0){
printf("Fatal Error:创建信号量和锁失败!\n");
return;
}
m_threadList.reserve(m_threadNum);
for (int i = 0; i < m_threadNum; i++)
{
ThreadPara thradPara;
thradPara.id = i+1;
thradPara.pool = this;
thradPara.run = true;
m_threadList.push_back(thradPara);
m_threadList[i].hid = _beginthread(ThreadFunc, 0, (void*)para);
}
}
{
WaitForSingleObject(m_mutexTaskList, INFINITE);
m_taskList.push_back(task);
printf("add task%d\n", task->id);
ReleaseMutex(m_mutexTaskList);
ReleaseSemaphore(m_semNewTask, 1, NULL);
}
void ThreadPool::ThreadFunc(void* arg){
//参数信息
ThreadPara* threadPara = (ThreadPara*)arg;
ThreadPool* pool = threadPara->pool;
printf("线程%d[%d]启动,pool=%x...\n", threadPara->id, threadPara->hid, pool);
HANDLE hWaitHandle[2];
hWaitHandle[0] = pool->m_semDestoryThreadPool;
hWaitHandle[1] = pool->m_semNewTask;
while (threadPara->run)
{
//等待信号
DWORD val = WaitForMultipleObjects(2, hWaitHandle, false, INFINITE);
if (val == WAIT_OBJECT_0){//线程池销毁
printf("线程%d退出...\n", threadPara->id);
break;
}
WaitForSingleObject(pool->m_mutexTaskList, INFINITE);
Task* task = *pool->m_taskList.begin();
pool->m_taskList.pop_front();
ReleaseMutex(pool->m_mutexTaskList);
task->run();
delete task;
//释放信号量
//ReleaseSemaphore(pool->m_semDoneTask, 1, NULL);
}
}
ReleaseSemaphore(m_semDestoryThreadPool, m_threadNum, NULL);
m_threadList.clear();
delete *iter;
}
m_taskList.clear();
CloseHandle(m_mutexThreadPools);
CloseHandle(m_semDestoryThreadPool);
CloseHandle(m_semDoneTask);
CloseHandle(m_semNewTask);
}
---------------------------------------------------------------------------------------
---------------------------------------------------------------------------------------
class Task
{
public:
Task(int id);
~Task();
virtual void run() = 0;
int id;
};
---------------------------------------------------------------------------------------
---------------------------------------------------------------------------------------
{
this->id = id;
}
Task::~Task()
{
}
---------------------------------------------------------------------------------------
---------------------------------------------------------------------------------------
<ThreadPara.h>
#pragma once
class ThreadPool;
class ThreadPara{
public:
int id;//线程序号
int hid;//线程句柄号
bool run;//运行标志
ThreadPool* pool;//线程池
public:
ThreadPara():id(0), hid(0), run(false), pool(nullptr){}
~ThreadPara(){}
};
---------------------------------------------------------------------------------------
---------------------------------------------------------------------------------------
<Test.cpp>
#include"ThreadPool.h"
#include<stdio.h>
{
public:
MyTask(int id):Task(id){}
~MyTask(){}
};
void main(){
ThreadPool threadPool(10);
threadPool.init();
for (int i = 0; i < 20; i++){
Task* task = new MyTask(i+1);
threadPool.addTask(task);
}
threadPool.release();
printf("release threadpool\n");
getchar();
}