线程池

linux基础

线程池的原理

我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:如果并发得线程数量很多,并且每个线程都是执行一个时间很短得任务结束了,这样频繁得创建线程会降低系统的效率,因为频繁创建线程和销毁线程需要时间。

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都可以默认得堆栈大小,以默认优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲,那么线程池则插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池内的线程都处于繁忙状态,但是任务队列中仍然后任务在挂起等待,那么线程池则创建另一个辅助线程,但数目不会大于最大得线程数。

  • 线程池的组成主要分为3个部分,这三个部分配合工作就可以得到一个完整的线程池:
    • 任务队列

      • 通过任务池提供的API函数,将一个待处理的任务添加到任务队列,或者从任务队列中删除

      • 已处理的任务会从任务队列中删除

      • 线程池的使用者,也就是调用进程池函数往任务队列中添加任务的线程就是生产者线程

    • 工作的线程(任务队列的消费者)

      • 线程池中维护了一定数量的工作线程,他们的作用是不停的读任务队列,从里面取出来任务并处理

      • 工作的线程相当于任务队列的消费者角色

      • 如果任务队列为空,工作的线程将会被阻塞

      • 如果阻塞之后有了新任务,有生产者将阻塞解除,工作线程继续工作

    • 管理者线程

      • 他的任务是周期性的对任务队列中的任务数量以及处于忙状态的工作线程个数进行检测
        • 当任务过多的时候,可以适当的创建一些新的工作线程

        • 当任务过少的时候,可以适当的销毁一些工作的线程

代码

threadpool.c

#include "threadpool.h"

// 创建线程池并初始化
ThreadPool *threadPoolCreate(int min, int max, int queuesize)
{
    // 申请进程池的空间
    ThreadPool *tpool = (ThreadPool *)malloc(sizeof(ThreadPool));

    do {
        if(tpool == NULL) {
            printf("error :malloc tpool\n");
            break;
        }

        // 分配最大线程的空间
        tpool->workerID = (pthread_t *)malloc(sizeof(pthread_t) * max);
        if(tpool->workerID  == NULL) {
            printf("error :malloc wokerID\n");
            break;
        }
        memset(tpool->workerID, 0, sizeof(pthread_t)* max);

        // 设置线程管理
        tpool->minNumThread = min;
        tpool->maxNumThread = max;
        tpool->busyNumThread = 0;
        tpool->liveNumThread = min;
        tpool->exitNumThread = 0;
        
        if(pthread_mutex_init(&tpool->threadpool_mutex, NULL) != 0 ||
            pthread_mutex_init(&tpool->busythread_mutex, NULL) != 0 ||
            pthread_cond_init(&tpool->notFull, NULL) != 0||
            pthread_cond_init(&tpool->notEmpty, NULL) != 0 ){
                printf("error :mutex or cond init\n");
                break;    
        }
        
        // 设置任务队列
        tpool->taskQ = (Task*)malloc(sizeof(Task) * queuesize);
        tpool->queueCapacity = queuesize;
        tpool->queueFront = 0;
        tpool->queueRear = 0;
        tpool->queueSize = 0;
        
        tpool->shut_down = 0;

        // 创建线程
        pthread_create(&tpool->managerID, NULL, manager, NULL);
        for(int i = 0; i < min; i++) {
            pthread_create(&tpool->workerID[i], NULL, worker, tpool);
        }
        
        return tpool;
    } while(0);        // 用break-while+break的方式进行错误处理
    
    if(tpool && tpool->workerID) free(tpool->workerID);
    if(tpool && tpool->taskQ) free(tpool->taskQ);
    if(tpool) free(tpool);
    
    return NULL;
}

// 销毁线程池
int threadPoolDestroy(ThreadPool *tpool)
{
    if(tpool == NULL)
        return -1;
    // 等待管理者线程
    tpool->shut_down = 1;
    pthread_join(tpool->managerID, NULL);
    // 唤醒阻塞的消费者线程
    for(int i = 0; i < tpool->liveNumThread; i++) {
        pthread_cond_signal(&tpool->notEmpty);
    }
    // 释放堆内存
    if(tpool->taskQ) {
        free(tpool->taskQ);
    }
    if(tpool->workerID) {
        free(tpool->workerID);
    }
    
    // 释放IPC
    pthread_mutex_destroy(&tpool->threadpool_mutex);
    pthread_mutex_destroy(&tpool->busythread_mutex);
    pthread_cond_destroy(&tpool->notEmpty);
    pthread_cond_destroy(&tpool->notFull);
    
    free(tpool);
    tpool = NULL;

    return 0;
}

// 给线程池添加任务
void threadPoolAddTask(ThreadPool *tpool, void(*func)(void*), void *arg)
{
    // 访问线程池的锁
    pthread_mutex_lock(&tpool->threadpool_mutex);

    while(tpool->queueSize == tpool->queueCapacity && (!tpool->shut_down)) {
        pthread_cond_wait(&tpool->notFull, &tpool->threadpool_mutex);
    }

    if(tpool->shut_down) {
        pthread_mutex_unlock(&tpool->threadpool_mutex);
        return ;
    }

    // 添加任务
    tpool->taskQ[tpool->queueRear].function = func;
    tpool->taskQ[tpool->queueRear].arg = arg;
    tpool->queueRear = (tpool->queueRear + 1) % tpool->queueCapacity;
    tpool->queueSize++;

    pthread_cond_signal(&tpool->notEmpty);
    pthread_mutex_unlock(&tpool->threadpool_mutex);
}

// 获取线程池当前处于工作状态的线程
int threadPoolBusyNum(ThreadPool *tpool)
{
    // 访问之前先上锁
    pthread_mutex_lock(&tpool->busythread_mutex);
    int num = tpool->busyNumThread;
    pthread_mutex_unlock(&tpool->busythread_mutex);
    return num;
}

// 获取当前线程池存活的线程个数
int threadPoolAliveNum(ThreadPool *tpool)
{
    // 访问之前先上锁
    pthread_mutex_lock(&tpool->threadpool_mutex);
    int num = tpool->liveNumThread;
    pthread_mutex_unlock(&tpool->threadpool_mutex);
    return num;
}

// 工作的线程任务函数
void *worker(void *arg)
{
    ThreadPool *tpool = (ThreadPool *)arg;
    if(tpool == NULL) {
        return NULL;
    }
    // 访问线程池锁
    while(1) {
        pthread_mutex_lock(&tpool->threadpool_mutex);
        
        // 阻塞工作线程
        while(tpool->queueSize == 0 && !tpool->shut_down) {
            pthread_cond_wait(&tpool->notEmpty, &tpool->threadpool_mutex);
            
            // 判断是否要删除线程
            if(tpool->exitNumThread > 0) {
                tpool->exitNumThread--;
                if(tpool->liveNumThread > tpool->minNumThread) {
                    tpool->liveNumThread--;
                    pthread_mutex_unlock(&tpool->threadpool_mutex);
                    threadExit(tpool);
                }
            }
        }
            
        // 判断进程池是否要关闭
        if(tpool->shut_down) {
            pthread_mutex_unlock(&tpool->threadpool_mutex);
            threadExit(tpool);            
        }

        // 从任务队列中拿出一任务来
        Task task;
        task.function = tpool->taskQ[tpool->queueFront].function;
        task.arg = tpool->taskQ[tpool->queueFront].arg;
        // 移动队列
        tpool->queueFront = (tpool->queueFront + 1) % tpool->queueCapacity;
        tpool->queueSize--;
        // 解锁
        pthread_cond_signal(&tpool->notFull);
        pthread_mutex_unlock(&tpool->threadpool_mutex);

        // 修改busy
        printf("thread %ld start working...\n", pthread_self());
        pthread_mutex_lock(&tpool->busythread_mutex);
        tpool->busyNumThread++;
        pthread_mutex_unlock(&tpool->busythread_mutex);
        task.function(task.arg);
        free(task.arg);
        task.arg = NULL;

        printf("thread %ld finsh working..\n", pthread_self());
        pthread_mutex_lock(&tpool->busythread_mutex);
        tpool->busyNumThread--;        
        pthread_mutex_unlock(&tpool->busythread_mutex);
    }
    return NULL;
}

// 管理者线程任务函数
void *manager(void *arg)
{
    ThreadPool *tpool = (ThreadPool *)arg;

    while(!tpool->shut_down) {
        // 间隔3s
        sleep(3);
        // 取出队列的数量,存活线程,忙线程
        pthread_mutex_lock(&tpool->threadpool_mutex);
        int queuesize = tpool->queueSize;
        int livenum = tpool->liveNumThread;
        pthread_mutex_unlock(&tpool->threadpool_mutex);
        
        pthread_mutex_lock(&tpool->busythread_mutex);
        int busynum = tpool->busyNumThread;
        pthread_mutex_unlock(&tpool->busythread_mutex);

        // 添加线程
        if(queuesize > livenum && livenum < tpool->maxNumThread) {
            pthread_mutex_lock(&tpool->threadpool_mutex);
            int count  = 0;
            for(int i = 0; i  < tpool->maxNumThread &&  count < CNUM && tpool->liveNumThread < tpool->maxNumThread; i++) {
                if(tpool->workerID[i] == 0) {
                    pthread_create(&tpool->workerID[i], NULL, worker, tpool);
                    tpool->liveNumThread++;
                    count++;
                }
            }
            
            pthread_mutex_unlock(&tpool->threadpool_mutex);
        }
        
        // 销毁线程
        if(queuesize * 2 < livenum && livenum > tpool->minNumThread) {
            pthread_mutex_lock(&tpool->threadpool_mutex);
            tpool->exitNumThread = CNUM;
            pthread_mutex_unlock(&tpool->threadpool_mutex);
            
            // 工作的线程自杀
            for(int i = 0; i < CNUM; i++) {
                pthread_cond_signal(&tpool->notEmpty);
            }        
        }
    }
    return NULL;
}

// 单个线程退出
void threadExit(ThreadPool *pool)
{
    pthread_t tid = pthread_self();
    for(int i = 0; i < pool->maxNumThread; i++) {
        if(pool->workerID[i] == tid) {
            pool->workerID[i] = 0;
            printf("threadExit() called, %ld exiting ...\n", tid);
            break;
        }
    }
    pthread_exit(NULL);
}

threadpool.h

#ifndef __THREADPOOL_H
#define __THREADPOOL_H

#include <stdio.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <string.h>
#include <stdlib.h>

#define CNUM 2

// 任务结构体
typedef struct{
    void (*function)(void *arg);
    void *arg;
}Task;

// 线程池定义
struct ThreadPool {
    // 任务队列
    Task *taskQ;
    int queueCapacity;    // 容量
    int queueSize;    // 当前任务个数
    int queueFront;    // 队头
    int queueRear;    // 队尾

    // 线程管理
    pthread_t managerID;    // 管理者线程ID
    pthread_t *workerID;    // 工作的线程ID
    int minNumThread;        // 最小的线程数量
    int maxNumThread;        // 最大的线程数量
    int busyNumThread;        // 在工作的线程数量
    int liveNumThread;        // 存活的线程数量
    int exitNumThread;        // 要销毁的线程个数
    pthread_mutex_t threadpool_mutex;    // 锁整个线程池的锁
    pthread_mutex_t busythread_mutex;// 锁忙的线程个数的锁

    pthread_cond_t notFull;        // 任务队列是不是已经满了
    pthread_cond_t notEmpty;    // 任务队列是不是是为空

    int shut_down;        // 要不要销毁整个线程池
};

typedef struct ThreadPool ThreadPool;

/* 函数声明 */
// 创建线程池并初始化
ThreadPool *threadPoolCreate(int min, int max, int queuesize);

// 销毁线程池
int threadPoolDestroy(ThreadPool *tpool);

// 给线程池添加任务
void threadPoolAddTask(ThreadPool *tpool, void(*func)(void*), void *arg);

// 获取线程池当前处于工作状态的线程
int threadPoolBusyNum(ThreadPool *tpool);

// 获取当前线程池存活的线程个数
int threadPoolAliveNum(ThreadPool *tpool);

// 工作的线程任务函数
void  *worker(void *arg);

// 管理者线程任务函数
void *manager(void *arg);

// 单个线程退出
void threadExit(ThreadPool  *pool);

#endif 

threadpooldemo.h

#include <stdio.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <string.h>

#include "./threadpool.h"

void taskFunc(void* arg)
{
    int num = *(int*)arg;
    printf("thread %ld is working, number = %d\n",
    pthread_self(), num);
    sleep(1);
}

int main()
{
    // 创建线程池
    ThreadPool* pool = threadPoolCreate(3, 10, 100);
    for (int i = 0; i < 100; ++i)
    {
         int* num = (int*)malloc(sizeof(int));
        *num = i + 100;
        threadPoolAddTask(pool, taskFunc, num);
    }

    sleep(30);

    threadPoolDestroy(pool);
    return 0;
}

相关文章

迈进Makefile的世界(入门)

简介 Linux的`make`程序用来自动话编译大型源码,实现只需要一个`make`执行就可以全自动完成。 `make`能自动化完成,是因为项目路径下提供了一个`Makefile`文件,由该文件负责告诉`make`,应该去编译和链接该项目程序。 `make`起初只针对C语言开发,但它实际应用并不限定C语言,而是执行Linux命令去应用到任意项目,甚至不是编程语言。 >此外`make`...

linux基础

线程同步

线程同步的概念 多个线程对内存中的共享资源访问时,让线程进行线性的方式,有顺序的访问。线程对内存的这种访问方式就是线程同步。 下面是一个两个线程同时对变量num,进行加1的操作的demo,但是最终结果与预想结果,有很大差异。下面我们将分析并解决线程同步的问题。 ``` #include <pthread.h> #include <stdio.h> #include <unistd.h> #...

linux基础

多线程与线程同步

多线程特点 线程是轻量级的进程,在Linux环境下线程的本质仍是进程。在计算机上运行的程序是一组指令及指令参数的组合,指令按照既定的逻辑控制计算机运行。操作系统会以进程为单位,分配系统资源。 可以理解为:进程是资源分配的最小单位,线程是操作系统调度的最小单位。 在概念上了解线程与进程的区别: - 进程有自己独立的地址空间,多个线程共用一个地址空间 - 线程更加节省系统资源,效率不...

linux基础