์ง๋ ์๊ฐ์์๋ ์ค๋ ๋ ์ฌ์ฉํ ๋ ์ฃผ์ํ ์ ๊ณผ ์ค๋ ๋์ ์ค์ ์ฌ์ฉ ์์๋ฅผ ์ดํด๋ดค์ต๋๋ค. [C++] 21. ์ค๋ ๋ ๊ฒฝ์ ์ํ(Race condition), ๋ฎคํ ์ค(mutex), ๋ฐ๋๋ฝ, std::conditional_variable, std::async
์ด๋ฒ ์๊ฐ์๋ ์ค๋ ๋ ํจํค์ง์ ๋ง์ง๋ง ์๊ฐ, ์ค๋ ๋ ํ์ ๋ํด์ ์ดํด๋ณด์์ฃ !
์์ฐ์-์๋น์ ํจํด(Producer-Consumer)
Producer-Consumer ํจํด์ ์ํํธ์จ์ด ๋์์ธ์์ ์ฌ์ฉ๋๋ ์ค์ํ ๋์์ธ ํจํด ์ค ํ๋๋ก, ๋ค๋ฅธ ์ปดํฌ๋ํธ ๊ฐ์ ์์
์ ๋ถ๋ฆฌํ๊ณ ์กฐ์ ํ๋ ๋ฐ ์ฌ์ฉ๋ฉ๋๋ค. ์ด ํจํด์ ๋ค์์ ์์ฐ์(Producer)์ ์๋น์(Consumer) ๊ฐ์ ํ๋ ฅ์ ํจ๊ณผ์ ์ผ๋ก ๊ด๋ฆฌํ๋ ๋ฐ ์ฌ์ฉ๋ฉ๋๋ค.
์์ฐ์ ์๋น์ ํจํด์ ๋ฐ์ดํฐ์ ์ง์
(income)๊ณผ ์ฒ๋ฆฌ(process)๊ฐ ๋ณํ์ ์ผ๋ก ์งํ๋ ๋ ์ฌ์ฉํฉ๋๋ค. ์๋ฅผ ๋ค์ด, ํ ์ค๋ ๋์์ ๋ฐ์ดํฐ๋ฅผ ์์ฐ(or ์ด๋ฒคํธ ์์ )ํ๊ณ ๋ค๋ฅธ ์ค๋ ๋์์ ์ด ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๋ ๊ฒฝ์ฐ์ ์ ์ฉํฉ๋๋ค.
์์ฐ์-์๋น์ ํจํด์ ๊ตฌ์ฑ ์์์ ์ญํ
1. ์์ฐ์(Producer): ๋ฐ์ดํฐ๋ฅผ ์์ฑํ๊ณ ๊ณต์ ๋ฐ์ดํฐ ๊ตฌ์กฐ(์: ํ ๋๋ ๋ฒํผ)์ ๋ฐ์ดํฐ๋ฅผ ๋ฃ์ต๋๋ค.
2. ์๋น์(Consumer): ๊ณต์ ๋ฐ์ดํฐ ๊ตฌ์กฐ์์ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์ ์ฒ๋ฆฌํฉ๋๋ค.
3. ๊ณต์ ๋ฐ์ดํฐ ๊ตฌ์กฐ(Shared Data Structure): ์์ฐ์์ ์๋น์๊ฐ ๋ฐ์ดํฐ๋ฅผ ์ฃผ๊ณ ๋ฐ๋ ๊ณตํต์ ์ ์ฅ์์
๋๋ค. ์ด ๊ตฌ์กฐ๋ ์ค๋ ๋ ๊ฐ ๋๊ธฐํ๋ฅผ ๊ด๋ฆฌํ๊ณ ๋ฐ์ดํฐ๋ฅผ ์์ ํ๊ฒ ๋ณด๊ดํ๋ ์ญํ ์ ํฉ๋๋ค.
4. ๋๊ธฐํ ๋ฉ์ปค๋์ฆ(Synchronization Mechanism): ์ค๋ ๋ ๊ฐ์ ์์ ํ ๋ฐ์ดํฐ ๊ณต์ ๋ฅผ ์ํด ์ฌ์ฉ๋ฉ๋๋ค. ๋ํ์ ์ผ๋ก ๋ฎคํ
์ค(mutex)๋ ์ธ๋งํฌ์ด(semaphore)์ ๊ฐ์ ๋๊ธฐํ ๋๊ตฌ๋ฅผ ํ์ฉํฉ๋๋ค.
์์ฐ์-์๋น์ ํจํด์ ์ฅ์
- ์ฑ๋ฅ ํฅ์: ๋ฐ์ดํฐ ์์ฑ๊ณผ ์ฒ๋ฆฌ๋ฅผ ๋ณ๋ ฌ๋ก ์คํํ์ฌ ์์คํ
์ฑ๋ฅ์ ํฅ์์ํต๋๋ค.
- ๋ฐ์ดํฐ ์ฒ๋ฆฌ ๋ถ๋ฆฌ: ๋ฐ์ดํฐ ์์ฑ๊ณผ ์ฒ๋ฆฌ ๋ก์ง์ ๋ถ๋ฆฌํจ์ผ๋ก์จ ์ฝ๋ ์ ์ง ๊ด๋ฆฌ์ ํ์ฅ์ฑ์ ํฅ์์ํต๋๋ค.
- ์์ ๊ด๋ฆฌ: ๊ณต์ ์์์ ๋ํ ์์ ํ ์ ๊ทผ์ ๋ณด์ฅํ์ฌ ๊ฒฝํฉ ์กฐ๊ฑด(race condition) ๋ฐ ๋ฐ๋๋ฝ(deadlock) ๊ฐ์ ๋ค์ค ์ค๋ ๋ ๋ฌธ์ ๋ฅผ ๋ฐฉ์งํฉ๋๋ค.
Producer-Consumer ํจํด์ ๋ค์ํ ์ธ์ด์ ํ๊ฒฝ์์ ๊ตฌํ๋ ์ ์์ผ๋ฉฐ, ๋ฉํฐ์ค๋ ๋ฉ, ๋ฉํฐํ๋ก์ธ์ฑ, ๋ถ์ฐ ์์คํ
๋ฑ์์ ์ฌ์ฉ๋ฉ๋๋ค. ์ด ํจํด์ ์ ์ดํดํ๊ณ ํ์ฉํ๋ฉด ๋ณต์กํ ๋ค์ค ์ค๋ ๋ ํ๊ฒฝ์์๋ ์์ ์ ์ด๊ณ ํจ์จ์ ์ธ ์ํํธ์จ์ด๋ฅผ ๊ฐ๋ฐํ๋ ๋ฐ ๋์์ด ๋ฉ๋๋ค.
์ค๋ ๋ํ(Thread Pool)
์ค๋ ๋ ํ(Thread Pool)์ ๋ค์ค ์ค๋ ๋ ํ๊ฒฝ์์ ์์
์ ๊ด๋ฆฌํ๊ณ ์คํํ๋ ๋ฐ ์ฌ์ฉ๋๋ ์ํํธ์จ์ด ์คํฌ์
๋๋ค. ์ฃผ๋ก ๋ฉํฐ์ค๋ ๋ ์๋ฒ ์ ํ๋ฆฌ์ผ์ด์
๊ณผ ๊ฐ์ด ๋์์ ์ฌ๋ฌ ์์
์ ์ฒ๋ฆฌํด์ผ ํ๋ ์ํฉ์์ ์ ์ฉํ๊ฒ ํ์ฉ๋ฉ๋๋ค. ์์ ์๊ฐํ ์์ฐ์-์๋น์ ํจํด์ ์ค๋ ๋ํ๋ก ๊ตฌํํ ์์ ์
๋๋ค.
์ค๋ ๋ ํ์ ๊ตฌ์ฑ ์์
1. ์ค๋ ๋ ์งํฉ(workers): ์ค๋ ๋ ํ์ ๋ฏธ๋ฆฌ ์์ฑ๋ ์ค๋ ๋ ์งํฉ(workers)์ ๊ฐ์ง๊ณ ์์ต๋๋ค. ์ด ์ค๋ ๋๋ค์ ์์
์ ์คํํ ์ค๋น๊ฐ ๋์ด ์์ต๋๋ค. ์ค๋ ๋ ํ์ ํฌ๊ธฐ๋ ์ผ๋ฐ์ ์ผ๋ก ์ค์ ๊ฐ๋ฅํ๋ฉฐ, ์ด ํฌ๊ธฐ๋ ์์คํ
์์๊ณผ ์์
๋ถํ์ ๋ฐ๋ผ ์กฐ์ ํ ์ ์์ต๋๋ค.
2. ์์
ํ(job queue): ์ค๋ ๋ ํ์ ์์
์ ์ ์ฅํ๋ ํ๋ฅผ ๊ด๋ฆฌํฉ๋๋ค. ์ด ํ๋ ์ค๋ ๋ ํ์ ์ ์ถ๋ ์์
๋ค์ ์ ์ฅํ๊ณ ๋๊ธฐ์ํต๋๋ค. ์์
์ ํ์ ์ถ๊ฐ๋๋ฉด ์ค๋ ๋ ํ ๋ด์ ์ค๋ ๋๋ค์ด ์์๋๋ก ์์
์ ๊ฐ์ ธ์ ์คํํฉ๋๋ค.
3. ์์
์ ์ถ ์ธํฐํ์ด์ค(inqueue): ํด๋ผ์ด์ธํธ ์ ํ๋ฆฌ์ผ์ด์
์ ์ค๋ ๋ ํ์ ์์
์ ์ ์ถํ๋ ์ธํฐํ์ด์ค๋ฅผ ์ฌ์ฉํ์ฌ ์์
์ ์ค๋ ๋ ํ์ ์ถ๊ฐํ ์ ์์ต๋๋ค. ์์
์ ์ค๋ ๋ ํ์ ์ ์ถ๋๋ฉด ํ์ ์ถ๊ฐ๋์ด ์ค๋ ๋ ํ ๋ด์ ์ค๋ ๋๋ค์ด ์ฐจ๋ก๋๋ก ์ฒ๋ฆฌํฉ๋๋ค.
์ค๋ ๋ ํ ๊ตฌํ
๋จผ์ ์ค๋ ๋ ํ์ ๊ตฌํํ๊ธฐ ์ํ ๊ธฐ์ ์ std::thread, std::condition_variable, std::mutex, std::queue, std::vector, template ์ ๋๊ฐ ์์ต๋๋ค. ์ด๋ณด์๋ถ๋ค์ ๊ตฌํํ๊ธฐ ์ฝ์ง ์์ง๋ง ์ค๋ ๋ ํ์ ๊ฒฝ์ฐ ํ ๋ฒ๋ง ์ดํดํ๊ณ ์์ฑํ๋ค๋ฉด ์ด๋์๋ ์ฌ์ฉ ๊ฐ๋ฅํ ๋ฌด๊ธฐ๊ฐ ๋ ๊ฒ์ ๋๋ค.
#include <iostream>
#include <thread>
#include <vector>
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
class ThreadPool {
public:
ThreadPool(size_t numThreads) : stop(false) {
for (size_t i = 0; i < numThreads; ++i) {
workers.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queueMutex);
condition.wait(lock, [this] { return stop || !tasks.empty(); });
if (stop && tasks.empty()) {
return;
}
task = std::move(tasks.front());
tasks.pop();
}
task();
}
});
}
}
template <class F>
void enqueue(F&& func) {
{
std::unique_lock<std::mutex> lock(queueMutex);
tasks.emplace(std::forward<F>(func));
}
condition.notify_one();
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queueMutex);
stop = true;
}
condition.notify_all();
for (std::thread& worker : workers) {
worker.join();
}
}
private:
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex queueMutex;
std::condition_variable condition;
bool stop;
};
int main() {
ThreadPool pool(10); // 10๊ฐ์ ์ค๋ ๋๋ฅผ ๊ฐ์ง ์ค๋ ๋ ํ ์์ฑ
// ์์
์ ์ค๋ ๋ ํ์ ์ถ๊ฐ
for (int i = 0; i < 30; ++i) {
pool.enqueue([i] {
std::cout << "Task " << i << " executed by thread " << std::this_thread::get_id() << std::endl;
});
}
// ๋ชจ๋ ์์
์ด ์๋ฃ๋ ๋๊น์ง ๋๊ธฐ
std::this_thread::sleep_for(std::chrono::seconds(2));
return 0;
}
๊ฐ๋จํ ์ค๋ ๋ํ์ ๊ตฌํํ ์์์ ๋๋ค. ์ค๋ ๋ํ์ ์์ฑ์์๋ ์์ฑํ ์ค๋ ๋์ ๊ฐ์๋ฅผ ์ ๋ ฅ๋ฐ์์ ๊ฐ์๋งํผ ์ค๋ ๋๋ฅผ ๋ง๋ญ๋๋ค. ์ค๋ ๋๊ฐ ์คํ๋๋ ํจ์๋ ๋๋คํจ์๋ก ์ฆ์ ๋ง๋ค์ด์ ํธ์ถํ์ต๋๋ค. ์์ฑ๋ ์ค๋ ๋์์๋ notify๋ฅผ ๋๊ธฐํ๊ณ ์๋ค๊ฐ notify๊ฐ ๋๋ฉด ์กํ(Job-queue)์์ task๋ฅผ ๋นผ์(front์ pop) ์ฒ๋ฆฌํ๋ ๋ฐฉ์์ ๋๋ค. ์์ฐ์ ๋ถ๋ถ(enqueue ํจ์)์์๋ ํ์ ์คํ๋ ๋ด์ฉ์ ๋ฃ๊ณ ๊ธฐ๋ค๋ฆฌ๊ณ ์๋ workers(๋๊ธฐ ์ค๋ ๋)์๊ฒ notify ํต์ง๋ฅผ ๋ ๋ฆฝ๋๋ค.
์ค๋ ๋ ํ์ ์๋ฉธ์์์๋ ๋๊ธฐํ๊ณ ์๋ ๋ชจ๋ ์ค๋ ๋๋ค์ ๊นจ์์ joinํ๊ธฐ ์ํด stopํ๋๊ทธ๋ฅผ ์ผฌ๊ณผ ๋์์ notify_all์ ์คํํฉ๋๋ค. ์ดํ ๋ชจ๋ workers์ ๋ํด join์ผ๋ก ์ค๋ ๋๋ฅผ ์๋ฉธ์ํต๋๋ค. ์ ์์ ์ Task์์๋ ๊ฐ๋จํ ์ค๋ ๋ id์ ํจ๊ป ๋ฉ์ธ์ง๋ฅผ ์ถ๋ ฅํ์ต๋๋ค. 10๊ฐ์ ์ค๋ ๋๋ฅผ ์์ฑ์์ผฐ๊ณ , 30๊ฐ์ task๋ฅผ ์คํ์์ผฐ์ต๋๋ค.
๋ง์ฝ ์ฌ๋ฌ๋ถ์ด ๋๊ท๋ชจ ์๋น์ค๋ฅผ ์ด์ํ๋ค๊ณ ๊ฐ์ ํ์ ๋, ๋์์ ๋ค์ด์ค๋ ์์ฒญ์ด ๋ง์ ๊ฒ์ ๋๋ค. ์ด์ ๋ํด์ ๋น ๋ฅด๊ฒ ์ฒ๋ฆฌํ๊ธฐ ์ํด์๋ ๋ณ๋ ฌ์ฒ๋ฆฌ๋ ํ์์ ๋๋ค. ๊ทธ๋ฆฌ๊ณ ํ๋ก์ธ์ค์ ์์์ ๊ณ ๋ คํ์ ๋ ์ค๋ ๋ ํ์ด ์ด๋ค ๊ฒ ๋ณด๋ค๋ ํจ๊ณผ์ ์ ๋๋ค. ๋๊ท๋ชจ ์์ฒญ์ด ๋ค์ด์ค๋๋ผ๋, ์์ฐ์๋ Task๋ฅผ ํ์ ๋ฃ๊ธฐ๋ง ํ๊ณ , ์๋น์(workers)๊ฐ Task๋ฅผ ์ฒ๋ฆฌํจ์ผ๋ก์จ ์ด๋ฅผ ์ํํ ์ ์์ต๋๋ค. ์ด์ฒ๋ผ ์ค๋ ๋ํ์ ํ๋ก๊ทธ๋จ์ ์ฑ๋ฅ์ ํฅ์์ํค๊ณ ์์คํ ์์ ์ฑ์ ์ ์งํ๋ ๋ฐ ๋์์ด ๋ ์ ์์ต๋๋ค.
๋ค์ ํฌ์คํ ์์๋ ํ ํ๋ฆฟ(Template)์ ๋ํด ์์๋ณด๊ฒ ์ต๋๋ค^^
๋์์ด ๋์ จ๊ธธ ๋ฐ๋๊ป์!
๋๊ธ