线程同步的概念
多个线程对内存中的共享资源访问时,让线程进行线性的方式,有顺序的访问。线程对内存的这种访问方式就是线程同步。
下面是一个两个线程同时对变量num,进行加1的操作的demo,但是最终结果与预想结果,有很大差异。下面我们将分析并解决线程同步的问题。
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
/* 创建一个全局变量当作共享资源 */
int sum;
/* 线程回调函数用于访问共享资源 */
void *threada_callback(void *arg)
{
int i, num;
for(int i = 0; i < 100; i++) {
num = sum;
printf("thread A sum = %d\n", num);
usleep(10); // 挂起10ms用于进行上下文切换
num ++;
sum = num;
}
pthread_exit(NULL); // 线程退出
}
void *threadb_callback(void *arg)
{
int i, num;
for(int i = 0; i < 100; i++) {
num = sum;
printf("thread B sum = %d\n", num);
usleep(10); // 挂起10ms用于进行上下文切换
num ++;
sum = num;
}
pthread_exit(NULL); // 线程退出
}
int main()
{
/* 创建两个子线程 */
pthread_t tida, tidb;
int i, ret;
sum = 0;
ret = pthread_create(&tidb, NULL, threada_callback, NULL);
if(ret == 0) {
printf("create pid%d success \n", i);
}
ret = pthread_create(&tida, NULL, threadb_callback, NULL);
if(ret == 0) {
printf("create pid%d success \n", i);
}
pthread_join(tida, NULL);
pthread_join(tidb, NULL);
return 0;
}
多线程工作时为分时共享时间片,并且测试程序中调用sleep()导致线程的CPU时间片用完就被迫挂起阻塞了,这样就能让CPU进行上下文切换的操作更加频繁,更容易出现数据混乱的现象。
CPU对应寄存器、一级缓存、二级缓存、三级缓存都是独占的,用于存储处理的数据和线程的状态信息,数据被CPU处理完成需要再次被写入的物理内存中,物理内存数据也可以通过文件IO操作写入到磁盘中。
同步方式
对于多个线程访问共享资源出现数据混乱的问题,需要进行线程同步。常用的线程同步方式有四种:互斥锁、读写锁、条件变量、信号量。共享资源就是多个线程共同访问的变量,这些变量通常为全局数据区变量或者堆区变量。这些资源被称为“临界资源”。
找到临界资源之后,确认临界资源相关的代码,即成为临界区。在临界区部分进行线程同步了。大概思路为:
- 在临界区上方加入添加加锁函数
- 那个线程调用这句代码,就会把这项锁锁上,其他线程就只能阻塞在锁上
- 在临界区代码下面添加解锁函数
- 出临界区的线程会把锁定的锁打开,其他抢到锁的线程就可以进入到临界区了
- 通过锁机制能保证临界区代码最多只能同时有一个线程访问,这样并行访问就变为串行访问了
互斥锁
互斥锁函数
互斥锁是线程同步最常用的一种方式,通过互斥锁可以锁定一个代码块,被锁定的这个代码块,所有的线程只能顺序的执行(不能并行处理),这样多线程访问共享资源数据混乱的问题就可以解决了,需要付出的代价就是执行效率降低,因为默认临界区每个线程是可以并行处理的吗。
在Linux中互斥锁的数据类型为pthread_mutex_t,创建一个这种类型的变量就得到了一把互斥锁。
在创建的锁对象中保存了当前这把锁的状态信息:锁定还是打开,如果锁定状态还记录了这把锁加锁的线程信息。一个互斥锁变量只能被一个线程锁定,被锁定之后其他线程再对互斥锁变量加锁就会阻塞,直到这把互斥锁被解锁,被解锁的线程才能被解除阻塞。
int pthread_mutex_init(pthread_mutex_t *resitrict mutex, // 初始化互斥锁
const pthread_mutexattr_t *resitrict attr);
int pthread_mutex_destory(pthread_mutex_t *mutex); // 释放互斥锁变量
- 参数:
- mutex:互斥锁变量的地址
- attr:互斥锁的属性,一般使用默认属性即可,指定为NULL
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex *mutex);
上面两个函数均为判断参数中的mutex互斥锁中的状态是不是锁定状态。
对于第一个函数
- 没有被锁定,是打开的,这个线程可以加锁成功,这个锁会记录是被当前线程加锁的
- 如果已经被锁定,那么其他线程加锁失败,这些线程会进入阻塞状态,等待这把锁的释放
- 当这把锁解开后,这些阻塞在互斥锁上的线程会解除阻塞,通过竞争的方式对互斥锁进行加锁操作,这一部分是一个概率性问题
对于第二个函数,相比第一个函数不同之处在于当互斥锁已经被加锁后,调用这个函数的线程不会阻塞,而是直接返回错误号。
不管如何加锁,都是用pthread_mutex_unlock进行解锁。那个线程加的锁必须那个线程解锁才可以。
int pthread_mutex_unlock(pthread_mutex_t *mutex);
互斥锁的使用
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <string.h>
/* 创建一个全局变量当作共享资源 */
int sum;
pthread_mutex_t mutex;
/* 线程回调函数用于访问共享资源 */
void* threada_callback(void *arg)
{
int i, num;
for(int i = 0; i < 100; i++) {
// 获取互斥锁
pthread_mutex_lock(&mutex);
num = sum;
num ++;
sum = num;
// 解锁互斥锁
pthread_mutex_unlock(&mutex);
printf("thread A sum = %d\n", num);
usleep(10); // 挂起10ms用于进行上下文切换
}
return NULL;
}
void* threadb_callback(void *arg)
{
int i, num;
for(int i = 0; i < 100; i++) {
// 获取互斥锁
pthread_mutex_lock(&mutex);
num = sum;
num ++;
sum = num;
// 解锁互斥锁
pthread_mutex_unlock(&mutex);
printf("thread B sum = %d\n", num);
usleep(10); // 挂起10ms用于进行上下文切换
}
return NULL;
}
int main()
{
/* 创建两个子线程 */
pthread_t tida, tidb;
int i, ret;
sum = 0;
/* 创建互斥量 */
pthread_mutex_init(&mutex, NULL);
ret = pthread_create(&tidb, NULL, threada_callback, NULL);
if(ret == 0) {
printf("create pid%d success \n", i);
}
ret = pthread_create(&tida, NULL, threadb_callback, NULL);
if(ret == 0) {
printf("create pid%d success \n", i);
}
pthread_join(tida, NULL);
pthread_join(tidb, NULL);
pthread_mutex_destroy(&mutex);
return 0;
}
死锁
当多个线程访问共享资源时候,需要加锁,如果锁使用不当,就会造成死锁情况。如果线程死锁造成的后果是:所有的线程都阻塞,常见场景是:
- 线程上锁后忘记解锁
- 上锁后线程函数退出,没有解锁(return 之前要解锁)
- 重复加锁,常见发生在递归中
- 在程序中有多个共享资源,因此有多个锁,随意加锁导致互相被阻塞(两个线程等待不同的资源)
在进行多线程编程时候,如何避免死锁?
- 避免多次锁定,多检查
- 对共享资源访问完成后,一定要解锁,或者加锁时候使用trylock方法
- 如果程序中有多把锁,可以控制对锁的访问顺序(顺序访问共享资源),或者对互斥锁进行加锁操作之前,先释放当前线程拥有的互斥锁
- 项目程序中可以引入一些专门用于死锁检测的模块
读写锁
读写锁函数
读写锁是互斥锁的升级,一把锁但是可以分别对读和写进行锁定。在做读操作的时候可以提高程序的执行效率,如果所有的线程都是做读操作,那么读是可以实现并行的。(互斥锁,读和写都是串行)
当对临界有大量读操作的时候,可以使用读写锁。
互斥锁的类型为pthread_rwlock_t rwlock。
之所以称为读写锁,是因为这把锁既可以锁定读操作,也可以锁定写操作。为了方便理解,可以大致认为把锁中记录了一些信息:
- 锁的状态
- 锁定的什么操作:读/写 使用读写锁锁定了读操作,需要先解锁才能锁定写操作,反之一样
- 是由那个线程锁的
读写锁的使用方式与互斥锁的使用方式基本相同的:找到共享资源——确定临界区,在临界区上下进行操作。
因为一把锁可以锁定读或者锁定写操作,下面是读写锁的特点:
- 使用读写锁的读锁锁定了临界区,线程对临界区的访问是并行的,可以实现共享。
- 使用读写锁的写锁锁定了临界区,线程对临界区的访问时串行的,写锁是独占的
- 使用读写锁分别对两个临界区加了读锁和写锁,两个线程同时访问临界区,此时访问读锁的临界区的线程阻塞,因为 写锁比读锁的优先级高
// 初始化读写锁
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
// 释放读写锁占用的系统资源
int pthread_rwlock_destory(pthread_rwlock_t *rwlock);
- 参数
- relock 读写锁的地址
- attr 读写锁的属性,一般使用默认属性,指定为NULL
// 在程序中进行加读锁,锁定的是读操作
int pthread_rwlock_rdlock(pthread_rwlock_t *lock);
// 避免死锁的加读锁操作,不会阻塞,直接返回错误号
int pthread_rwlock_tryrdlock(pthread_rwlock_t *lock);
调用上面两个函数,如果读写锁是打开的,那么加锁成功;如果读写锁已经锁定了读操作,调用这个函数可以可以加锁,因为读锁是共享的;如果读写锁已经进行了写锁,那么调用第一个函数会阻塞。
// 对读写锁进行加写锁,锁定的是写操作
int pthread_rwlock_wrlock(pthread_rwlock_t *lock);
// 避免死锁的加写锁操作,不会阻塞,直接返回错误号
int pthread_rwlock_wrlock(pthread_rwlock_t *lock);
调用上面两个函数,如果读写锁是打开的,那么加锁成功;如果读写锁是锁定了(不管锁定读还是写)调用第一个函数的线程就会阻塞。
读写锁的使用
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <string.h>
/* 创建一个全局变量当作共享资源 */
int sum;
pthread_rwlock_t rwlock;
/* 线程回调函数用于访问共享资源 */
void* rdthread_callback(void *arg)
{
int i, num;
for(int i = 0; i < 100; i++) {
// 访问读写的读锁
pthread_rwlock_rdlock(&rwlock);
num = sum;
// 释放读写锁的读锁
pthread_rwlock_unlock(&rwlock);
printf("Read thread sum = %d\n", num);
usleep(rand()%5); // 挂起一段时间用于进行上下文切换
}
return NULL;
}
void* wrthread_callback(void *arg)
{
int i, num;
for(int i = 0; i < 90; i++) {
// 访问读写锁的写锁
pthread_rwlock_wrlock(&rwlock);
num = sum;
num ++;
sum = num;
// 解锁互斥锁
pthread_rwlock_unlock(&rwlock);
printf("Write thread sum = %d\n", num);
usleep(rand()%5); // 挂起一段时间用于进行上下文切换
}
return NULL;
}
int main()
{
/* 创建多个子线程 */
pthread_t tid1[2], tid2[3];
int i, ret;
sum = 0;
/* 创建一个读写锁 */
pthread_rwlock_init(&rwlock, NULL);
/* 创建2访问写锁的线程 */
for(i = 0; i < 2; i++) {
ret = pthread_create(&tid1[i], NULL, wrthread_callback, NULL);
}
/* 创建3访问读锁的线程 */
for(i = 0; i < 3; i++) {
ret = pthread_create(&tid2[i], NULL, rdthread_callback, NULL);
}
/* 释放子线程 */
for(i = 0; i < 2; i++) {
pthread_join(tid1[i], NULL);
}
for(i = 0; i < 3; i++) {
pthread_join(tid2[i], NULL);
}
pthread_rwlock_destroy(&rwlock);
return 0;
}
条件变量
条件变量函数
条件变量主要用于线程的阻塞。如果在多线程程序中只使用条件变量是无法实现线程的同步,必须要配合互斥锁来实现,虽然条件变量和互斥锁都能阻塞线程,但是二者效果不一样:
- 互斥锁实现多个线程同时访问共享资源的时候,实现串行访问的方式
- 条件变量只有在满足某种条件下才可以让线程阻塞,否则还是可以多个线程并行访问共享资源的,这样的话共享资源还会出现数据混乱
条件变量主要出现在生产者与消费者模型上,并且配合互斥锁使用,条件变量的类型为pthread_cond_t。
被条件变量阻塞的线程的线程信息会被记录,以便解除阻塞的时候使用。
// 初始化条件变量
int pthread_conf_init(pthread_cond_t *restrict cond, pthread_condattr_t *restrict attr);
// 销毁环境变量
int pthread_conf_destory(pthread_cond_t cond);
- 参数:
- cond 条件变量的地址
- attr 条件变量的属性,一般使用默认属性,指定NULL
// 线程阻塞函数,调用这个函数,线程就会被阻塞
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t * restrict mutex);
该函数阻塞的时候,需要一个互斥锁参数,这个互斥锁主要功能是进行线程同步,让线程串行的方式进入临界区,避免共享资源数据混乱。阻塞函数会进行一个操作:
- 当阻塞线程时候,如果线程已经对互斥锁进行上锁操作,那么会解锁,避免进入死锁
- 当线程被环形的始胡,函数内部会再次将这个互斥锁进行上锁操作,继续访问临界区
// 防止死锁的阻塞,阻塞固定时间
int pthread_cond_timewait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, struct timespec *restrict abstime);
该函数与上面的函数前面两个参数是一样的,但是第三个参数为一个时间结构体,时间结构体的内容为:
struct timespec {
time_v tv_sec; // 秒
long tv_nsec; // 纳秒
}
// 这里是表示从 1971年1.1到某个时间节点所用到的是时长,总长度为秒+纳秒表示。首先我们要先知道当前时间距离1971多少秒?
// 使用time(NULL)获取当前时间, 并在返回值基础上加要等待的时间
time_t now_time = time(NULL);
struct time_spec clock;
clock.tv_nsec = 0;
clock.tv_sec = time(NULL) + 100; //线程阻塞100s+0ns
// 唤醒阻塞在条件变量上的线程,至少由一个被解锁阻塞
int pthread_cond_signal(pthread_cond_t *cond);
// 唤醒全部阻塞在条件变量上的线程
int pthread_cond_broadcast(pthread_cond_t *cond);
调用上面两个函数,都可以唤醒被阻塞的线程,区别在于signal是唤醒至少一个被阻塞的线程,总的个数不确定;而boradcase(广播)及唤醒所有阻塞在条件变量上的线程。
条件变量的使用
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
/* 创建数据链表 */
struct node {
int num;
struct node *next;
};
// 定义条件变量
pthread_cond_t cond;
// 定义互斥锁变量
pthread_mutex_t mutex;
// 定义链表头节点
struct node *head = NULL;
/* 生产者的回调函数 */
void *producer_callback(void *arg)
{
while(1) {
// 访问互斥锁并上锁
pthread_mutex_lock(&mutex);
/* 临界区 */
struct node* new = (struct node *)malloc(sizeof(struct node));
new->num = rand()%100;
new->next = head; // 使用头插法的方式
head = new;
printf("producer num = %d, tid = %ld\n", new->num, pthread_self());
// 释放互斥锁
pthread_mutex_unlock(&mutex);
sleep(rand()%3); // 挂起,用于进行上下文切换
}
return NULL;
}
/* 消费者的回调函数 */
void *consumer_callback(void *arg)
{
while(1) {
// 访问互斥锁并上锁
pthread_mutex_lock(&mutex);
while(head == NULL) { // 当头节点为空时,表示没有商品
pthread_cond_wait(&cond, &mutex);
}
struct node *getn = head;
head = getn->next;
printf("consumer num = %d, tid = %ld\n", getn->num, pthread_self());
free(getn); // 释放获取的节点
// 释放互斥锁
pthread_mutex_unlock(&mutex);
sleep(rand()%3);
}
return NULL;
}
int main()
{
int i;
// 定义生产者与消费者的线程
pthread_t ptid[5];
pthread_t ctid[5];
// 初始化互斥锁
pthread_mutex_init(&mutex, NULL);
// 初始化条件变量
pthread_cond_init(&cond, NULL);
// 初始化生产者与消费者线程
for(i = 0; i < 5; i++) {
pthread_create(&ptid[i], NULL, producer_callback, NULL);
pthread_create(&ctid[i], NULL, consumer_callback, NULL);
}
// 等待数据结束
for(i = 0; i < 5; i++) {
pthread_join(ptid[i], NULL);
pthread_join(ctid[i], NULL);
}
// 销毁条件变量
pthread_cond_destroy(&cond);
// 销毁互斥锁
pthread_mutex_destroy(&mutex);
return 0;
}
信号量
信号量函数
信号量用在多线程多任务同步的,一个线程完成了某一个动作就通过信号量告诉别的线程,别的线程再进行某些操作。信号量不一定是锁定某个线程。
信号量理解为信号灯,与互斥量不同的点在于“灯“的概念,灯亮表示资源可能,信号量主要阻塞线程,但不能保证线程的绝对安全,所以一般信号量配置互斥量来实现。
信号量和条件变量一样,一般用于消费者与生产者模型,用于阻塞生产者线程或者消费者线程的运行,信号量的数据类型为sem_t。
#include <semaphore.h>
// 初始化信号量
sem_t sem;
int sem_init(sem_t *sem, int pshared, unsigned int value);
// 销毁信号量
sem_destory(sem_t *sem);
- 参数
- sem:表示信号量变量地址
- pshared:0表示线程同步,1表示进程同步
- value:表示初始化当前信号量拥有的资源数(>= 0),如果资源为0那么线程/进程就阻塞了。
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
int sem_timewair(sem_t *sem, struct timespec *abs_timeout);
上面三个函数均使用来访问信号量的,如果sem的资源>0,线程不会阻塞,线程会占用一个资源,因此信号量-1;如果sem为0,第一个函数会阻塞,第二个函数不会阻塞,直接返回错误号,第三个函数会堵塞一定的时长。
// 释放信号量,使信号量+1
int sem_post(sem_t *sem);
// 获取当前信号量的资源数
int sem_getvalue(sem_t *sem, int *sval);
通过上面函数可以查看sem拥有的资源个数,通过第二个参数sval将数据传出。
信号量的使用
当信号量初始化资源为1时
使用两个信号量,一个表示消费者,一个表示生产者的资源
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
/* 创建数据链表 */
struct node {
int num;
struct node *next;
};
// 定义信号量
sem_t psem, csem; // 生产者、消费者信号量
// 定义互斥锁变量
pthread_mutex_t mutex;
// 定义链表头节点
struct node *head = NULL;
/* 生产者的回调函数 */
void *producer_callback(void *arg)
{
while(1) {
// 获取信号量资源
sem_wait(&psem);
/* 临界区 */
struct node* new = (struct node *)malloc(sizeof(struct node));
new->num = rand()%100;
new->next = head; // 使用头插法的方式
head = new;
printf("producer num = %d, tid = %ld\n", new->num, pthread_self());
// 释放消费者资源
sem_post(&csem);
sleep(rand()%3); // 挂起,用于进行上下文切换
}
return NULL;
}
/* 消费者的回调函数 */
void *consumer_callback(void *arg)
{
while(1) {
// 获取信号量资源
sem_wait(&csem);
struct node *getn = head;
head = getn->next;
printf("consumer num = %d, tid = %ld\n", getn->num, pthread_self());
free(getn); // 释放获取的节点
// 释放生产者资源,让其继续生产
sem_post(&psem);
sleep(rand()%3);
}
return NULL;
}
int main()
{
int i;
// 定义生产者与消费者的线程
pthread_t ptid[5];
pthread_t ctid[5];
// 初始化互斥锁(先不删)
// pthread_mutex_init(&mutex, NULL);
// 初始化信号量
sem_init(&psem, 0, 1); // 初始化生产者的信号量为1
sem_init(&csem, 0, 0); // 初始化消费者的信号量为0
// 初始化生产者与消费者线程
for(i = 0; i < 5; i++) {
pthread_create(&ptid[i], NULL, producer_callback, NULL);
pthread_create(&ctid[i], NULL, consumer_callback, NULL);
}
// 等待数据结束
for(i = 0; i < 5; i++) {
pthread_join(ptid[i], NULL);
pthread_join(ctid[i], NULL);
}
// 销毁信号量
sem_destroy(&psem);
sem_destroy(&csem);
// 销毁互斥锁
// pthread_mutex_destroy(&mutex);
return 0;
}
当信号量初始化资源大于1时
这里要使用信号量+互斥锁的方式,并且要注意方式死锁。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
/* 创建数据链表 */
struct node {
int num;
struct node *next;
};
// 定义信号量
sem_t psem, csem; // 生产者、消费者信号量
// 定义互斥锁变量
pthread_mutex_t mutex;
// 定义链表头节点
struct node *head = NULL;
/* 生产者的回调函数 */
void *producer_callback(void *arg)
{
while(1) {
// 获取信号量资源
sem_wait(&psem);
// 获取互斥锁
// 这是互斥锁必须放在信号量的下面,因为先获取互斥锁的时候,但信号量资源为0的时候挂起了,这时候就会陷入死锁
pthread_mutex_lock(&mutex);
/* 临界区 */
struct node* new = (struct node *)malloc(sizeof(struct node));
new->num = rand()%100;
new->next = head; // 使用头插法的方式
head = new;
printf("producer num = %d, tid = %ld\n", new->num, pthread_self());
// 释放互斥锁
pthread_mutex_unlock(&mutex);
// 释放消费者资源
sem_post(&csem);
sleep(rand()%3); // 挂起,用于进行上下文切换
}
return NULL;
}
/* 消费者的回调函数 */
void *consumer_callback(void *arg)
{
while(1) {
// 获取信号量资源
sem_wait(&csem);
// 获取互斥锁
// 这是互斥锁必须放在信号量的下面,因为先获取互斥锁的时候,但信号量资源为0的时候挂起了,这时候就会陷入死锁
pthread_mutex_lock(&mutex);
struct node *getn = head;
head = getn->next;
printf("consumer num = %d, tid = %ld\n", getn->num, pthread_self());
free(getn); // 释放获取的节点
// 释放互斥锁
pthread_mutex_unlock(&mutex);
// 释放生产者资源,让其继续生产
sem_post(&psem);
sleep(rand()%3);
}
return NULL;
}
int main()
{
int i;
// 定义生产者与消费者的线程
pthread_t ptid[5];
pthread_t ctid[5];
// 初始化互斥锁(先不删)
pthread_mutex_init(&mutex, NULL);
// 初始化信号量
sem_init(&psem, 0, 5); // 初始化生产者的信号量为1
sem_init(&csem, 0, 0); // 初始化消费者的信号量为0
// 初始化生产者与消费者线程
for(i = 0; i < 5; i++) {
pthread_create(&ptid[i], NULL, producer_callback, NULL);
pthread_create(&ctid[i], NULL, consumer_callback, NULL);
}
// 等待数据结束
for(i = 0; i < 5; i++) {
pthread_join(ptid[i], NULL);
pthread_join(ctid[i], NULL);
}
// 销毁信号量
sem_destroy(&psem);
sem_destroy(&csem);
// 销毁互斥锁
pthread_mutex_destroy(&mutex);
return 0;
}
鸣谢苏丙榅 大丙老师