WEBKT

C++并发编程避坑指南:死锁场景分析与解决方案

55 0 0 0

C++并发编程避坑指南:死锁场景分析与解决方案

什么是死锁?

常见的死锁场景

1. 哲学家就餐问题

2. 嵌套锁

3. 资源竞争与顺序问题

如何避免死锁?

1. 避免循环等待:资源排序

2. 避免占有并等待:一次性获取所有资源

3. 使用超时机制:try_lock

4. 使用锁的 RAII 封装:std::lock_guard 和 std::unique_lock

5. 死锁检测和恢复

调试死锁的技巧

总结

C++并发编程避坑指南:死锁场景分析与解决方案

并发编程是C++中一个强大但复杂的领域。利用多线程可以显著提高程序的性能,但同时也引入了新的挑战,其中最令人头疼的就是死锁。想象一下,你的程序就像一群争抢资源的哲学家,如果处理不当,他们可能会陷入永久的等待,谁也无法继续工作,这就是死锁的真实写照。

本文将深入探讨C++并发编程中常见的死锁场景,并通过具体的代码示例和调试技巧,帮助你理解死锁的本质,并掌握避免死锁的有效策略。目标读者是需要编写高性能并发C++程序的开发者,无论你是初学者还是有一定经验的工程师,相信都能从中受益。

什么是死锁?

死锁是指两个或多个线程互相等待对方释放资源,导致所有线程都无法继续执行的情况。它就像交通堵塞,每辆车都在等待其他车辆让路,结果谁也走不了。

死锁的发生必须满足以下四个必要条件,也称为 Coffman 条件

  1. 互斥条件(Mutual Exclusion): 资源必须以独占方式被持有。一个线程占用资源后,其他线程必须等待,直到该资源被释放。
  2. 占有并等待条件(Hold and Wait): 线程必须占有至少一个资源,并且等待获取其他线程占有的资源。
  3. 不可剥夺条件(No Preemption): 线程已经获得的资源,在未使用完毕之前,不能被其他线程强行剥夺,只能由占有它的线程主动释放。
  4. 循环等待条件(Circular Wait): 存在一个线程等待资源的环路,例如,线程A等待线程B占有的资源,线程B等待线程C占有的资源,线程C等待线程A占有的资源。

只有当这四个条件同时满足时,才会发生死锁。因此,要避免死锁,只需要破坏其中一个或多个条件即可。

常见的死锁场景

接下来,我们分析几个C++并发编程中常见的死锁场景,并提供相应的代码示例。

1. 哲学家就餐问题

这是一个经典的并发问题,可以很好地说明死锁的原理。假设有五位哲学家围坐在一张圆桌旁,每两位哲学家之间都放有一根筷子。哲学家们思考一段时间后会饿,饿了就要拿起左右两边的筷子才能吃饭。吃完饭后,他们会放下筷子继续思考。

问题在于,如果所有哲学家同时拿起自己左边的筷子,那么他们每个人都在等待右边的哲学家放下筷子,导致所有人都在互相等待,谁也无法吃饭,这就是死锁。

#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
class Philosopher {
public:
Philosopher(int id, std::mutex& left_chopstick, std::mutex& right_chopstick) :
id_(id), left_chopstick_(left_chopstick), right_chopstick_(right_chopstick) {}
void eat() {
// 模拟思考
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// 尝试拿起左边的筷子
left_chopstick_.lock();
std::cout << "Philosopher " << id_ << " picked up left chopstick." << std::endl;
// 模拟思考
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// 尝试拿起右边的筷子
right_chopstick_.lock();
std::cout << "Philosopher " << id_ << " picked up right chopstick." << std::endl;
// 吃饭
std::cout << "Philosopher " << id_ << " is eating." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// 放下筷子
right_chopstick_.unlock();
std::cout << "Philosopher " << id_ << " put down right chopstick." << std::endl;
left_chopstick_.unlock();
std::cout << "Philosopher " << id_ << " put down left chopstick." << std::endl;
}
private:
int id_;
std::mutex& left_chopstick_;
std::mutex& right_chopstick_;
};
int main() {
int num_philosophers = 5;
std::vector<std::mutex> chopsticks(num_philosophers);
std::vector<Philosopher> philosophers;
// 创建哲学家,注意最后一个哲学家的左右筷子顺序
for (int i = 0; i < num_philosophers; ++i) {
philosophers.emplace_back(i, chopsticks[i], chopsticks[(i + 1) % num_philosophers]);
}
std::vector<std::thread> threads;
for (int i = 0; i < num_philosophers; ++i) {
threads.emplace_back(&Philosopher::eat, &philosophers[i]);
}
for (auto& thread : threads) {
thread.join();
}
return 0;
}

分析:

在这个例子中,每个哲学家对应一个线程,每根筷子对应一个互斥锁。每个哲学家需要同时获得左右两边的筷子才能吃饭。如果所有哲学家同时尝试拿起左边的筷子,就会形成一个循环等待的局面,导致死锁。

运行结果:

你可能会发现程序卡住,没有任何输出,这就是死锁的现象。

2. 嵌套锁

当一个线程已经持有一个锁,并且尝试再次获取同一个锁时,就会发生死锁。这种情况通常发生在递归函数中,或者在同一个线程的不同部分重复使用同一个锁。

#include <iostream>
#include <thread>
#include <mutex>
std::mutex mtx;
void recursive_function(int n) {
mtx.lock();
std::cout << "Entering recursive_function with n = " << n << std::endl;
if (n > 0) {
recursive_function(n - 1);
}
std::cout << "Exiting recursive_function with n = " << n << std::endl;
mtx.unlock();
}
int main() {
std::thread t1(recursive_function, 5);
t1.join();
return 0;
}

分析:

在这个例子中,recursive_function 是一个递归函数,它在每次调用时都会尝试获取互斥锁 mtx。当 n 大于 0 时,函数会递归调用自身,但由于 mtx 已经被当前线程持有,因此递归调用会一直等待 mtx 被释放,从而导致死锁。

运行结果:

程序会卡在第一次 mtx.lock() 调用之后,没有任何输出。

3. 资源竞争与顺序问题

当多个线程需要访问多个共享资源时,如果它们获取资源的顺序不一致,也可能导致死锁。例如,线程A先获取资源1,再获取资源2;线程B先获取资源2,再获取资源1。如果线程A获取了资源1,线程B获取了资源2,那么线程A就会等待线程B释放资源2,线程B就会等待线程A释放资源1,从而导致死锁。

#include <iostream>
#include <thread>
#include <mutex>
std::mutex mtx1, mtx2;
void thread_A() {
mtx1.lock();
std::cout << "Thread A: acquired mtx1" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟一些工作
mtx2.lock();
std::cout << "Thread A: acquired mtx2" << std::endl;
// ... 使用资源 ...
mtx2.unlock();
mtx1.unlock();
}
void thread_B() {
mtx2.lock();
std::cout << "Thread B: acquired mtx2" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟一些工作
mtx1.lock();
std::cout << "Thread B: acquired mtx1" << std::endl;
// ... 使用资源 ...
mtx1.unlock();
mtx2.unlock();
}
int main() {
std::thread t1(thread_A);
std::thread t2(thread_B);
t1.join();
t2.join();
return 0;
}

分析:

线程A和线程B分别尝试获取 mtx1mtx2 两个互斥锁,但是它们获取锁的顺序相反。如果线程A先获取了 mtx1,线程B先获取了 mtx2,那么线程A就会等待线程B释放 mtx2,线程B就会等待线程A释放 mtx1,从而导致死锁。

运行结果:

程序可能会卡住,或者输出类似以下信息:

Thread A: acquired mtx1
Thread B: acquired mtx2

然后程序就停止响应了。

如何避免死锁?

既然我们已经了解了死锁的成因和常见场景,那么接下来就讨论如何避免死锁。以下是一些常用的策略:

1. 避免循环等待:资源排序

破坏循环等待条件是避免死锁的常用方法。最简单的方法是对所有资源进行排序,并要求所有线程按照固定的顺序获取资源。这样可以避免线程之间形成循环等待的环路。

例如,在哲学家就餐问题中,我们可以给筷子编号,并规定哲学家必须先拿起编号较小的筷子,再拿起编号较大的筷子。这样可以保证不会出现循环等待的情况。

#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
class Philosopher {
public:
Philosopher(int id, std::mutex& first_chopstick, std::mutex& second_chopstick) :
id_(id), first_chopstick_(first_chopstick), second_chopstick_(second_chopstick) {}
void eat() {
// 模拟思考
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// 尝试拿起第一根筷子
first_chopstick_.lock();
std::cout << "Philosopher " << id_ << " picked up first chopstick." << std::endl;
// 模拟思考
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// 尝试拿起第二根筷子
second_chopstick_.lock();
std::cout << "Philosopher " << id_ << " picked up second chopstick." << std::endl;
// 吃饭
std::cout << "Philosopher " << id_ << " is eating." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// 放下筷子
second_chopstick_.unlock();
std::cout << "Philosopher " << id_ << " put down second chopstick." << std::endl;
first_chopstick_.unlock();
std::cout << "Philosopher " << id_ << " put down first chopstick." << std::endl;
}
private:
int id_;
std::mutex& first_chopstick_;
std::mutex& second_chopstick_;
};
int main() {
int num_philosophers = 5;
std::vector<std::mutex> chopsticks(num_philosophers);
std::vector<Philosopher> philosophers;
// 创建哲学家,注意最后一个哲学家的左右筷子顺序
for (int i = 0; i < num_philosophers; ++i) {
// 保证每个哲学家先拿起编号较小的筷子
if (i < (i + 1) % num_philosophers) {
philosophers.emplace_back(i, chopsticks[i], chopsticks[(i + 1) % num_philosophers]);
} else {
philosophers.emplace_back(i, chopsticks[(i + 1) % num_philosophers], chopsticks[i]);
}
}
std::vector<std::thread> threads;
for (int i = 0; i < num_philosophers; ++i) {
threads.emplace_back(&Philosopher::eat, &philosophers[i]);
}
for (auto& thread : threads) {
thread.join();
}
return 0;
}

在资源竞争与顺序问题中,也可以采用资源排序的方法:

#include <iostream>
#include <thread>
#include <mutex>
std::mutex mtx1, mtx2;
void thread_A() {
// 始终先获取 mtx1,再获取 mtx2
mtx1.lock();
std::cout << "Thread A: acquired mtx1" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟一些工作
mtx2.lock();
std::cout << "Thread A: acquired mtx2" << std::endl;
// ... 使用资源 ...
mtx2.unlock();
mtx1.unlock();
}
void thread_B() {
// 始终先获取 mtx1,再获取 mtx2
mtx1.lock();
std::cout << "Thread B: acquired mtx1" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟一些工作
mtx2.lock();
std::cout << "Thread B: acquired mtx2" << std::endl;
// ... 使用资源 ...
mtx2.unlock();
mtx1.unlock();
}
int main() {
std::thread t1(thread_A);
std::thread t2(thread_B);
t1.join();
t2.join();
return 0;
}

2. 避免占有并等待:一次性获取所有资源

另一种避免死锁的方法是破坏占有并等待条件。线程在执行任务之前,一次性获取所有需要的资源。如果无法获取所有资源,就释放已经获取的资源,稍后再尝试。这样可以避免线程在持有部分资源的情况下,等待其他资源而导致死锁。

例如,在哲学家就餐问题中,可以修改策略,让哲学家在拿起筷子之前,先检查左右两边的筷子是否都可用。如果都可用,则同时拿起;否则,什么也不拿,稍后再尝试。

#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
class Philosopher {
public:
Philosopher(int id, std::mutex& left_chopstick, std::mutex& right_chopstick) :
id_(id), left_chopstick_(left_chopstick), right_chopstick_(right_chopstick) {}
void eat() {
// 模拟思考
std::this_thread::sleep_for(std::chrono::milliseconds(100));
while (true) {
// 尝试同时获取左右两边的筷子
std::unique_lock<std::mutex> left_lock(left_chopstick_, std::defer_lock);
std::unique_lock<std::mutex> right_lock(right_chopstick_, std::defer_lock);
if (std::try_lock(left_lock, right_lock)) {
std::cout << "Philosopher " << id_ << " picked up both chopsticks." << std::endl;
// 吃饭
std::cout << "Philosopher " << id_ << " is eating." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// 放下筷子
right_lock.unlock();
left_lock.unlock();
std::cout << "Philosopher " << id_ << " put down both chopsticks." << std::endl;
break;
} else {
// 获取筷子失败,稍后再尝试
std::cout << "Philosopher " << id_ << " failed to pick up both chopsticks, retrying." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
}
}
private:
int id_;
std::mutex& left_chopstick_;
std::mutex& right_chopstick_;
};
int main() {
int num_philosophers = 5;
std::vector<std::mutex> chopsticks(num_philosophers);
std::vector<Philosopher> philosophers;
// 创建哲学家
for (int i = 0; i < num_philosophers; ++i) {
philosophers.emplace_back(i, chopsticks[i], chopsticks[(i + 1) % num_philosophers]);
}
std::vector<std::thread> threads;
for (int i = 0; i < num_philosophers; ++i) {
threads.emplace_back(&Philosopher::eat, &philosophers[i]);
}
for (auto& thread : threads) {
thread.join();
}
return 0;
}

在这个修改后的版本中,我们使用了 std::try_lock 来尝试同时获取左右两边的筷子。如果获取成功,哲学家就可以吃饭;否则,哲学家会释放已经获取的筷子,并稍后再尝试。这样可以避免哲学家在只持有一根筷子的情况下等待另一根筷子,从而避免死锁。

3. 使用超时机制:try_lock

C++标准库提供了 std::mutex::try_lock 方法,它允许线程尝试获取锁,如果在指定的时间内无法获取锁,则立即返回,而不是一直等待。这可以有效地避免死锁。

#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
std::mutex mtx1, mtx2;
void thread_A() {
if (mtx1.try_lock_for(std::chrono::milliseconds(100))) {
std::cout << "Thread A: acquired mtx1" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟一些工作
if (mtx2.try_lock_for(std::chrono::milliseconds(100))) {
std::cout << "Thread A: acquired mtx2" << std::endl;
// ... 使用资源 ...
mtx2.unlock();
} else {
std::cout << "Thread A: failed to acquire mtx2" << std::endl;
}
mtx1.unlock();
} else {
std::cout << "Thread A: failed to acquire mtx1" << std::endl;
}
}
void thread_B() {
if (mtx2.try_lock_for(std::chrono::milliseconds(100))) {
std::cout << "Thread B: acquired mtx2" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟一些工作
if (mtx1.try_lock_for(std::chrono::milliseconds(100))) {
std::cout << "Thread B: acquired mtx1" << std::endl;
// ... 使用资源 ...
mtx1.unlock();
} else {
std::cout << "Thread B: failed to acquire mtx1" << std::endl;
}
mtx2.unlock();
} else {
std::cout << "Thread B: failed to acquire mtx2" << std::endl;
}
}
int main() {
std::thread t1(thread_A);
std::thread t2(thread_B);
t1.join();
t2.join();
return 0;
}

在这个例子中,我们使用了 mtx1.try_lock_formtx2.try_lock_for 来尝试获取锁,并设置了超时时间为 100 毫秒。如果线程在 100 毫秒内无法获取锁,try_lock_for 会返回 false,线程可以根据返回值来判断是否获取锁成功,并采取相应的措施,例如释放已经获取的锁,或者稍后再尝试。

4. 使用锁的 RAII 封装:std::lock_guard 和 std::unique_lock

C++标准库提供了 std::lock_guardstd::unique_lock 两种锁的 RAII 封装,它们可以在构造函数中自动获取锁,在析构函数中自动释放锁。这可以有效地避免忘记释放锁而导致的死锁。

std::lock_guard 是一种简单的 RAII 封装,它在构造函数中获取锁,在析构函数中释放锁。它不允许手动释放锁,因此使用起来非常简单。

#include <iostream>
#include <thread>
#include <mutex>
std::mutex mtx;
void thread_function() {
std::lock_guard<std::mutex> lock(mtx); // 自动获取锁
std::cout << "Thread: acquired lock" << std::endl;
// ... 使用资源 ...
// 锁在函数结束时自动释放
}
int main() {
std::thread t1(thread_function);
t1.join();
return 0;
}

std::unique_lock 是一种更灵活的 RAII 封装,它也提供了自动获取和释放锁的功能,但它还允许手动释放锁,以及延迟获取锁等高级功能。

#include <iostream>
#include <thread>
#include <mutex>
std::mutex mtx;
void thread_function() {
std::unique_lock<std::mutex> lock(mtx); // 自动获取锁
std::cout << "Thread: acquired lock" << std::endl;
// ... 使用资源 ...
lock.unlock(); // 手动释放锁
std::cout << "Thread: released lock" << std::endl;
// ... 执行其他操作 ...
lock.lock(); // 再次获取锁
std::cout << "Thread: acquired lock again" << std::endl;
// 锁在函数结束时自动释放
}
int main() {
std::thread t1(thread_function);
t1.join();
return 0;
}

5. 死锁检测和恢复

即使采取了各种预防措施,死锁仍然可能发生。因此,在一些复杂的系统中,需要实现死锁检测和恢复机制。死锁检测是指定期检查系统中是否存在死锁,如果检测到死锁,则采取一些措施来恢复系统,例如杀死一个或多个线程,或者回滚事务。

死锁检测和恢复的实现比较复杂,需要根据具体的系统架构和业务逻辑进行设计。通常可以使用图论算法来检测循环等待,并使用事务回滚或者资源抢占等技术来恢复系统。

调试死锁的技巧

当程序发生死锁时,调试起来可能会比较困难。以下是一些常用的调试技巧:

  1. 使用调试器: 使用GDB等调试器可以查看线程的状态,以及它们正在等待的资源。这可以帮助你快速定位死锁发生的位置。
  2. 添加日志: 在关键的代码段中添加日志,记录线程获取和释放锁的信息。这可以帮助你了解线程的执行顺序,以及它们之间的资源竞争关系。
  3. 使用可视化工具: 一些可视化工具可以帮助你更直观地了解线程之间的关系,以及它们正在等待的资源。例如,可以使用 Intel Inspector 等工具来检测死锁。
  4. 简化问题: 如果死锁发生在复杂的代码中,可以尝试将问题简化,例如减少线程的数量,或者减少资源的数量。这可以帮助你更容易地定位死锁的根源。

总结

死锁是并发编程中一个常见但棘手的问题。理解死锁的成因和常见场景,掌握避免死锁的有效策略,以及熟练掌握调试死锁的技巧,是编写健壮的并发程序的关键。希望本文能够帮助你更好地理解和解决死锁问题,写出更高效、更可靠的C++并发程序。

记住,预防胜于治疗。在编写并发程序时,一定要时刻注意潜在的死锁风险,并采取相应的预防措施。只有这样,才能避免死锁的发生,保证程序的稳定性和可靠性。

并发老司机 C++并发编程死锁

评论点评

打赏赞助
sponsor

感谢您的支持让我们更好的前行

分享

QRcode

https://www.webkt.com/article/9290