WEBKT

C++并发编程避坑指南:死锁场景分析与解决方案

216 0 0 0

C++并发编程避坑指南:死锁场景分析与解决方案

并发编程是C++中一个强大但复杂的领域。利用多线程可以显著提高程序的性能,但同时也引入了新的挑战,其中最令人头疼的就是死锁。想象一下,你的程序就像一群争抢资源的哲学家,如果处理不当,他们可能会陷入永久的等待,谁也无法继续工作,这就是死锁的真实写照。

本文将深入探讨C++并发编程中常见的死锁场景,并通过具体的代码示例和调试技巧,帮助你理解死锁的本质,并掌握避免死锁的有效策略。目标读者是需要编写高性能并发C++程序的开发者,无论你是初学者还是有一定经验的工程师,相信都能从中受益。

什么是死锁?

死锁是指两个或多个线程互相等待对方释放资源,导致所有线程都无法继续执行的情况。它就像交通堵塞,每辆车都在等待其他车辆让路,结果谁也走不了。

死锁的发生必须满足以下四个必要条件,也称为 Coffman 条件

  1. 互斥条件(Mutual Exclusion): 资源必须以独占方式被持有。一个线程占用资源后,其他线程必须等待,直到该资源被释放。
  2. 占有并等待条件(Hold and Wait): 线程必须占有至少一个资源,并且等待获取其他线程占有的资源。
  3. 不可剥夺条件(No Preemption): 线程已经获得的资源,在未使用完毕之前,不能被其他线程强行剥夺,只能由占有它的线程主动释放。
  4. 循环等待条件(Circular Wait): 存在一个线程等待资源的环路,例如,线程A等待线程B占有的资源,线程B等待线程C占有的资源,线程C等待线程A占有的资源。

只有当这四个条件同时满足时,才会发生死锁。因此,要避免死锁,只需要破坏其中一个或多个条件即可。

常见的死锁场景

接下来,我们分析几个C++并发编程中常见的死锁场景,并提供相应的代码示例。

1. 哲学家就餐问题

这是一个经典的并发问题,可以很好地说明死锁的原理。假设有五位哲学家围坐在一张圆桌旁,每两位哲学家之间都放有一根筷子。哲学家们思考一段时间后会饿,饿了就要拿起左右两边的筷子才能吃饭。吃完饭后,他们会放下筷子继续思考。

问题在于,如果所有哲学家同时拿起自己左边的筷子,那么他们每个人都在等待右边的哲学家放下筷子,导致所有人都在互相等待,谁也无法吃饭,这就是死锁。

#include <iostream>
#include <thread>
#include <mutex>
#include <vector>

class Philosopher {
public:
    Philosopher(int id, std::mutex& left_chopstick, std::mutex& right_chopstick) :
        id_(id), left_chopstick_(left_chopstick), right_chopstick_(right_chopstick) {}

    void eat() {
        // 模拟思考
        std::this_thread::sleep_for(std::chrono::milliseconds(100));

        // 尝试拿起左边的筷子
        left_chopstick_.lock();
        std::cout << "Philosopher " << id_ << " picked up left chopstick." << std::endl;

        // 模拟思考
        std::this_thread::sleep_for(std::chrono::milliseconds(100));

        // 尝试拿起右边的筷子
        right_chopstick_.lock();
        std::cout << "Philosopher " << id_ << " picked up right chopstick." << std::endl;

        // 吃饭
        std::cout << "Philosopher " << id_ << " is eating." << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));

        // 放下筷子
        right_chopstick_.unlock();
        std::cout << "Philosopher " << id_ << " put down right chopstick." << std::endl;
        left_chopstick_.unlock();
        std::cout << "Philosopher " << id_ << " put down left chopstick." << std::endl;
    }

private:
    int id_;
    std::mutex& left_chopstick_;
    std::mutex& right_chopstick_;
};

int main() {
    int num_philosophers = 5;
    std::vector<std::mutex> chopsticks(num_philosophers);
    std::vector<Philosopher> philosophers;

    // 创建哲学家,注意最后一个哲学家的左右筷子顺序
    for (int i = 0; i < num_philosophers; ++i) {
        philosophers.emplace_back(i, chopsticks[i], chopsticks[(i + 1) % num_philosophers]);
    }

    std::vector<std::thread> threads;
    for (int i = 0; i < num_philosophers; ++i) {
        threads.emplace_back(&Philosopher::eat, &philosophers[i]);
    }

    for (auto& thread : threads) {
        thread.join();
    }

    return 0;
}

分析:

在这个例子中,每个哲学家对应一个线程,每根筷子对应一个互斥锁。每个哲学家需要同时获得左右两边的筷子才能吃饭。如果所有哲学家同时尝试拿起左边的筷子,就会形成一个循环等待的局面,导致死锁。

运行结果:

你可能会发现程序卡住,没有任何输出,这就是死锁的现象。

2. 嵌套锁

当一个线程已经持有一个锁,并且尝试再次获取同一个锁时,就会发生死锁。这种情况通常发生在递归函数中,或者在同一个线程的不同部分重复使用同一个锁。

#include <iostream>
#include <thread>
#include <mutex>

std::mutex mtx;

void recursive_function(int n) {
    mtx.lock();
    std::cout << "Entering recursive_function with n = " << n << std::endl;
    if (n > 0) {
        recursive_function(n - 1);
    }
    std::cout << "Exiting recursive_function with n = " << n << std::endl;
    mtx.unlock();
}

int main() {
    std::thread t1(recursive_function, 5);
    t1.join();
    return 0;
}

分析:

在这个例子中,recursive_function 是一个递归函数,它在每次调用时都会尝试获取互斥锁 mtx。当 n 大于 0 时,函数会递归调用自身,但由于 mtx 已经被当前线程持有,因此递归调用会一直等待 mtx 被释放,从而导致死锁。

运行结果:

程序会卡在第一次 mtx.lock() 调用之后,没有任何输出。

3. 资源竞争与顺序问题

当多个线程需要访问多个共享资源时,如果它们获取资源的顺序不一致,也可能导致死锁。例如,线程A先获取资源1,再获取资源2;线程B先获取资源2,再获取资源1。如果线程A获取了资源1,线程B获取了资源2,那么线程A就会等待线程B释放资源2,线程B就会等待线程A释放资源1,从而导致死锁。

#include <iostream>
#include <thread>
#include <mutex>

std::mutex mtx1, mtx2;

void thread_A() {
    mtx1.lock();
    std::cout << "Thread A: acquired mtx1" << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟一些工作
    mtx2.lock();
    std::cout << "Thread A: acquired mtx2" << std::endl;

    // ... 使用资源 ...

    mtx2.unlock();
    mtx1.unlock();
}

void thread_B() {
    mtx2.lock();
    std::cout << "Thread B: acquired mtx2" << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟一些工作
    mtx1.lock();
    std::cout << "Thread B: acquired mtx1" << std::endl;

    // ... 使用资源 ...

    mtx1.unlock();
    mtx2.unlock();
}

int main() {
    std::thread t1(thread_A);
    std::thread t2(thread_B);

    t1.join();
    t2.join();

    return 0;
}

分析:

线程A和线程B分别尝试获取 mtx1mtx2 两个互斥锁,但是它们获取锁的顺序相反。如果线程A先获取了 mtx1,线程B先获取了 mtx2,那么线程A就会等待线程B释放 mtx2,线程B就会等待线程A释放 mtx1,从而导致死锁。

运行结果:

程序可能会卡住,或者输出类似以下信息:

Thread A: acquired mtx1
Thread B: acquired mtx2

然后程序就停止响应了。

如何避免死锁?

既然我们已经了解了死锁的成因和常见场景,那么接下来就讨论如何避免死锁。以下是一些常用的策略:

1. 避免循环等待:资源排序

破坏循环等待条件是避免死锁的常用方法。最简单的方法是对所有资源进行排序,并要求所有线程按照固定的顺序获取资源。这样可以避免线程之间形成循环等待的环路。

例如,在哲学家就餐问题中,我们可以给筷子编号,并规定哲学家必须先拿起编号较小的筷子,再拿起编号较大的筷子。这样可以保证不会出现循环等待的情况。

#include <iostream>
#include <thread>
#include <mutex>
#include <vector>

class Philosopher {
public:
    Philosopher(int id, std::mutex& first_chopstick, std::mutex& second_chopstick) :
        id_(id), first_chopstick_(first_chopstick), second_chopstick_(second_chopstick) {}

    void eat() {
        // 模拟思考
        std::this_thread::sleep_for(std::chrono::milliseconds(100));

        // 尝试拿起第一根筷子
        first_chopstick_.lock();
        std::cout << "Philosopher " << id_ << " picked up first chopstick." << std::endl;

        // 模拟思考
        std::this_thread::sleep_for(std::chrono::milliseconds(100));

        // 尝试拿起第二根筷子
        second_chopstick_.lock();
        std::cout << "Philosopher " << id_ << " picked up second chopstick." << std::endl;

        // 吃饭
        std::cout << "Philosopher " << id_ << " is eating." << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));

        // 放下筷子
        second_chopstick_.unlock();
        std::cout << "Philosopher " << id_ << " put down second chopstick." << std::endl;
        first_chopstick_.unlock();
        std::cout << "Philosopher " << id_ << " put down first chopstick." << std::endl;
    }

private:
    int id_;
    std::mutex& first_chopstick_;
    std::mutex& second_chopstick_;
};

int main() {
    int num_philosophers = 5;
    std::vector<std::mutex> chopsticks(num_philosophers);
    std::vector<Philosopher> philosophers;

    // 创建哲学家,注意最后一个哲学家的左右筷子顺序
    for (int i = 0; i < num_philosophers; ++i) {
        // 保证每个哲学家先拿起编号较小的筷子
        if (i < (i + 1) % num_philosophers) {
            philosophers.emplace_back(i, chopsticks[i], chopsticks[(i + 1) % num_philosophers]);
        } else {
            philosophers.emplace_back(i, chopsticks[(i + 1) % num_philosophers], chopsticks[i]);
        }
    }

    std::vector<std::thread> threads;
    for (int i = 0; i < num_philosophers; ++i) {
        threads.emplace_back(&Philosopher::eat, &philosophers[i]);
    }

    for (auto& thread : threads) {
        thread.join();
    }

    return 0;
}

在资源竞争与顺序问题中,也可以采用资源排序的方法:

#include <iostream>
#include <thread>
#include <mutex>

std::mutex mtx1, mtx2;

void thread_A() {
    // 始终先获取 mtx1,再获取 mtx2
    mtx1.lock();
    std::cout << "Thread A: acquired mtx1" << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟一些工作
    mtx2.lock();
    std::cout << "Thread A: acquired mtx2" << std::endl;

    // ... 使用资源 ...

    mtx2.unlock();
    mtx1.unlock();
}

void thread_B() {
    // 始终先获取 mtx1,再获取 mtx2
    mtx1.lock();
    std::cout << "Thread B: acquired mtx1" << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟一些工作
    mtx2.lock();
    std::cout << "Thread B: acquired mtx2" << std::endl;

    // ... 使用资源 ...

    mtx2.unlock();
    mtx1.unlock();
}

int main() {
    std::thread t1(thread_A);
    std::thread t2(thread_B);

    t1.join();
    t2.join();

    return 0;
}

2. 避免占有并等待:一次性获取所有资源

另一种避免死锁的方法是破坏占有并等待条件。线程在执行任务之前,一次性获取所有需要的资源。如果无法获取所有资源,就释放已经获取的资源,稍后再尝试。这样可以避免线程在持有部分资源的情况下,等待其他资源而导致死锁。

例如,在哲学家就餐问题中,可以修改策略,让哲学家在拿起筷子之前,先检查左右两边的筷子是否都可用。如果都可用,则同时拿起;否则,什么也不拿,稍后再尝试。

#include <iostream>
#include <thread>
#include <mutex>
#include <vector>

class Philosopher {
public:
    Philosopher(int id, std::mutex& left_chopstick, std::mutex& right_chopstick) :
        id_(id), left_chopstick_(left_chopstick), right_chopstick_(right_chopstick) {}

    void eat() {
        // 模拟思考
        std::this_thread::sleep_for(std::chrono::milliseconds(100));

        while (true) {
            // 尝试同时获取左右两边的筷子
            std::unique_lock<std::mutex> left_lock(left_chopstick_, std::defer_lock);
            std::unique_lock<std::mutex> right_lock(right_chopstick_, std::defer_lock);

            if (std::try_lock(left_lock, right_lock)) {
                std::cout << "Philosopher " << id_ << " picked up both chopsticks." << std::endl;

                // 吃饭
                std::cout << "Philosopher " << id_ << " is eating." << std::endl;
                std::this_thread::sleep_for(std::chrono::milliseconds(100));

                // 放下筷子
                right_lock.unlock();
                left_lock.unlock();
                std::cout << "Philosopher " << id_ << " put down both chopsticks." << std::endl;
                break;
            } else {
                // 获取筷子失败,稍后再尝试
                std::cout << "Philosopher " << id_ << " failed to pick up both chopsticks, retrying." << std::endl;
                std::this_thread::sleep_for(std::chrono::milliseconds(50));
            }
        }
    }

private:
    int id_;
    std::mutex& left_chopstick_;
    std::mutex& right_chopstick_;
};

int main() {
    int num_philosophers = 5;
    std::vector<std::mutex> chopsticks(num_philosophers);
    std::vector<Philosopher> philosophers;

    // 创建哲学家
    for (int i = 0; i < num_philosophers; ++i) {
        philosophers.emplace_back(i, chopsticks[i], chopsticks[(i + 1) % num_philosophers]);
    }

    std::vector<std::thread> threads;
    for (int i = 0; i < num_philosophers; ++i) {
        threads.emplace_back(&Philosopher::eat, &philosophers[i]);
    }

    for (auto& thread : threads) {
        thread.join();
    }

    return 0;
}

在这个修改后的版本中,我们使用了 std::try_lock 来尝试同时获取左右两边的筷子。如果获取成功,哲学家就可以吃饭;否则,哲学家会释放已经获取的筷子,并稍后再尝试。这样可以避免哲学家在只持有一根筷子的情况下等待另一根筷子,从而避免死锁。

3. 使用超时机制:try_lock

C++标准库提供了 std::mutex::try_lock 方法,它允许线程尝试获取锁,如果在指定的时间内无法获取锁,则立即返回,而不是一直等待。这可以有效地避免死锁。

#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>

std::mutex mtx1, mtx2;

void thread_A() {
    if (mtx1.try_lock_for(std::chrono::milliseconds(100))) {
        std::cout << "Thread A: acquired mtx1" << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟一些工作
        if (mtx2.try_lock_for(std::chrono::milliseconds(100))) {
            std::cout << "Thread A: acquired mtx2" << std::endl;

            // ... 使用资源 ...

            mtx2.unlock();
        } else {
            std::cout << "Thread A: failed to acquire mtx2" << std::endl;
        }
        mtx1.unlock();
    } else {
        std::cout << "Thread A: failed to acquire mtx1" << std::endl;
    }
}

void thread_B() {
    if (mtx2.try_lock_for(std::chrono::milliseconds(100))) {
        std::cout << "Thread B: acquired mtx2" << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟一些工作
        if (mtx1.try_lock_for(std::chrono::milliseconds(100))) {
            std::cout << "Thread B: acquired mtx1" << std::endl;

            // ... 使用资源 ...

            mtx1.unlock();
        } else {
            std::cout << "Thread B: failed to acquire mtx1" << std::endl;
        }
        mtx2.unlock();
    } else {
        std::cout << "Thread B: failed to acquire mtx2" << std::endl;
    }
}

int main() {
    std::thread t1(thread_A);
    std::thread t2(thread_B);

    t1.join();
    t2.join();

    return 0;
}

在这个例子中,我们使用了 mtx1.try_lock_formtx2.try_lock_for 来尝试获取锁,并设置了超时时间为 100 毫秒。如果线程在 100 毫秒内无法获取锁,try_lock_for 会返回 false,线程可以根据返回值来判断是否获取锁成功,并采取相应的措施,例如释放已经获取的锁,或者稍后再尝试。

4. 使用锁的 RAII 封装:std::lock_guard 和 std::unique_lock

C++标准库提供了 std::lock_guardstd::unique_lock 两种锁的 RAII 封装,它们可以在构造函数中自动获取锁,在析构函数中自动释放锁。这可以有效地避免忘记释放锁而导致的死锁。

std::lock_guard 是一种简单的 RAII 封装,它在构造函数中获取锁,在析构函数中释放锁。它不允许手动释放锁,因此使用起来非常简单。

#include <iostream>
#include <thread>
#include <mutex>

std::mutex mtx;

void thread_function() {
    std::lock_guard<std::mutex> lock(mtx); // 自动获取锁
    std::cout << "Thread: acquired lock" << std::endl;

    // ... 使用资源 ...

    // 锁在函数结束时自动释放
}

int main() {
    std::thread t1(thread_function);
    t1.join();
    return 0;
}

std::unique_lock 是一种更灵活的 RAII 封装,它也提供了自动获取和释放锁的功能,但它还允许手动释放锁,以及延迟获取锁等高级功能。

#include <iostream>
#include <thread>
#include <mutex>

std::mutex mtx;

void thread_function() {
    std::unique_lock<std::mutex> lock(mtx); // 自动获取锁
    std::cout << "Thread: acquired lock" << std::endl;

    // ... 使用资源 ...

    lock.unlock(); // 手动释放锁
    std::cout << "Thread: released lock" << std::endl;

    // ... 执行其他操作 ...

    lock.lock(); // 再次获取锁
    std::cout << "Thread: acquired lock again" << std::endl;

    // 锁在函数结束时自动释放
}

int main() {
    std::thread t1(thread_function);
    t1.join();
    return 0;
}

5. 死锁检测和恢复

即使采取了各种预防措施,死锁仍然可能发生。因此,在一些复杂的系统中,需要实现死锁检测和恢复机制。死锁检测是指定期检查系统中是否存在死锁,如果检测到死锁,则采取一些措施来恢复系统,例如杀死一个或多个线程,或者回滚事务。

死锁检测和恢复的实现比较复杂,需要根据具体的系统架构和业务逻辑进行设计。通常可以使用图论算法来检测循环等待,并使用事务回滚或者资源抢占等技术来恢复系统。

调试死锁的技巧

当程序发生死锁时,调试起来可能会比较困难。以下是一些常用的调试技巧:

  1. 使用调试器: 使用GDB等调试器可以查看线程的状态,以及它们正在等待的资源。这可以帮助你快速定位死锁发生的位置。
  2. 添加日志: 在关键的代码段中添加日志,记录线程获取和释放锁的信息。这可以帮助你了解线程的执行顺序,以及它们之间的资源竞争关系。
  3. 使用可视化工具: 一些可视化工具可以帮助你更直观地了解线程之间的关系,以及它们正在等待的资源。例如,可以使用 Intel Inspector 等工具来检测死锁。
  4. 简化问题: 如果死锁发生在复杂的代码中,可以尝试将问题简化,例如减少线程的数量,或者减少资源的数量。这可以帮助你更容易地定位死锁的根源。

总结

死锁是并发编程中一个常见但棘手的问题。理解死锁的成因和常见场景,掌握避免死锁的有效策略,以及熟练掌握调试死锁的技巧,是编写健壮的并发程序的关键。希望本文能够帮助你更好地理解和解决死锁问题,写出更高效、更可靠的C++并发程序。

记住,预防胜于治疗。在编写并发程序时,一定要时刻注意潜在的死锁风险,并采取相应的预防措施。只有这样,才能避免死锁的发生,保证程序的稳定性和可靠性。

并发老司机 C++并发编程死锁

评论点评