[05์ฃผ์ฐจ] ๋๊ธฐํ ์ฐ์ฐ & CAS
- Lock-free ์๋ฃ๊ตฌ์กฐ
: ์ฌ๋ฌ ๊ฐ์ ์ฐ๋ ๋์์ ๋์์ ํธ์ถํ์ ๋์๋ ์ ํด์ง ๋จ์ ์๊ฐ๋ง๋ค ์ ์ด๋ ํ ๊ฐ์ ํธ์ถ์ด ์๋ฃ๋๋ ์๊ณ ๋ฆฌ์ฆ
: Non-blocking์ด ๋ณด์ฅ๋์ด์ผ ํจ
: ์๊ณ ๋ฆฌ์ฆ ๋ด์ Lock์ด ์์ผ๋ฉด ๋น์ฐํ Lock-free X, Lock์ด ์๋ค๊ณ ํด๋ ๋ฌด์กฐ๊ฑด Lock-free์ธ ๊ฒ์ ์๋
: Lock์ด ์กด์ฌํ๋ ๊ฒฝ์ฐ ๋ค๋ฅธ ์ฐ๋ ๋๋ฅผ ์คํํ ์ ์์ด ์ฑ๊ธ ์ฐ๋ ๋์ ๋ค๋ฅผ ๋ฐ๊ฐ ์์
→ wait-free๋ฅผ ์ ์งํ๋ฉฐ, ๋ฉ๋ชจ๋ฆฌ๋ก Atomic Memory๋ฅผ ๋ง๋ค ์ ์๋ ์๊ณ ๋ฆฌ์ฆ ์กด์ฌ?
: ์กด์ฌ, ์ด ๊ฐ์์์ ์๋ต.
→ wait-free๋ฅผ ์ ์งํ๋ฉฐ, ๊ธฐ์กด ์ฝ๊ธ ์ค๋ ๋ ์๋ฃ๊ตฌ์กฐ๋ Atomic Memory๋ฅผ ์ฌ์ฉํด ๋ฉํฐ์ฐ๋ ๋ ์๋ฃ๊ตฌ์กฐ๋ก ๋ณํ ๊ฐ๋ฅ?
: ๋ถ๊ฐ๋ฅํ๋ค. ๋์ค์ ์์ธํ ๋ค๋ฃธ.
: Atomic memory๋ฅผ ์ฌ์ฉ๋งํด์๋ ๋ ผ๋ธ๋กํน ์๋ฃ๊ตฌ์กฐ๋ฅผ ๋ง๋ค ์ ์์
→ wait-free๋ฅผ ์ ์งํ๋ฉฐ, ์ผ๋ฐ์ ์ธ ์๋ฃ๊ตฌ์กฐ๋ฅผ ๋ฉํฐ์ฐ๋ ๋ ์๋ฃ๊ตฌ์กฐ๋ก ๋ณํํ๊ธฐ ์ํด Atomic-memory๋ง๊ณ ๋ฌด์์ด ๋
ํ์?
: CompareAndSet() ์ฐ์ฐ์ด๋ฉด ์ถฉ๋ถํจ
- CompareAndSet
: ๋๊ธฐํ ์ฐ์ฐ์ ์ผ์ข
→ ๋๊ธฐํ ์ฐ์ฐ
: ๊ณต์ ๋ฉ๋ชจ๋ฆฌ์ ๋ํ ์ฐ์ฐ (ํ word์ ๋ํ ์ฐ์ฐ - ์ฝ๊ณ ์ฐ๊ธฐ ๋ฑ)
: ๋ค๋ฅธ ์ฐ๋ ๋์ ํต์ ํ๊ธฐ ์ํ ๊ธฐ๋ณธ ๊ธฐ๋ฅ
: CPU์ ๋ช ๋ น์ด๋ก ๊ตฌํ + Wait-free๋ก ๊ตฌํ๋์ด์ผ ํจ (๊ทธ๋ ์ง ์์ผ๋ฉด Non-Blocking ์๊ณ ๋ฆฌ์ฆ์์ ์ฌ์ฉํ ์ ์์)
• Load (wait-free)
• Store (wait-free)
• Lock/Unlock (blocking)
• atomic_thread_fence (wait-free)
• +=, fetch_and_add (wait-free)
+ CAS(Compare And Set, wait-free) ์ถ๊ฐ ํ์
: bool ๋ฉ๋ชจ๋ฆฌ::CAS(expected, update)
: ๋ฉ๋ชจ๋ฆฌ์ ๊ฐ์ด exptected๋ฉด update๋ก ๋ฐ๊พธ๊ณ true, ๋ง์ฝ expected๊ฐ ์๋๋ฉด false๋ฅผ return.
bool ๋ฉ๋ชจ๋ฆฌ::CAS(expected, update)
{
if (๋ฉ๋ชจ๋ฆฌ.value == expected) {
๋ฉ๋ชจ๋ฆฌ.value = update;
return true;
} else return false;
}
: atomic์ load/store๋ก ๊ตฌํํ ์ ์์ - wait-free ์กฐ๊ฑด ๋๋ฌธ
→ CAS๋ wait-free๋ฅผ ์ ์งํ๋ฉฐ ์ผ๋ฐ์ ์ธ ์๋ฃ๊ตฌ์กฐ๋ฅผ ๋ฉํฐ์ฐ๋ ๋ ์๋ฃ๊ตฌ์กฐ๋ก ์ด๋ป๊ฒ ๋ณํํ๋๊ฐ?
: ๋ชจ๋ ๊ธฐ์กด ์๋ฃ๊ตฌ์กฐ๋ฅผ wait-free multithread๋ก ๋ณํํด์ฃผ๋ ์๊ณ ๋ฆฌ์ฆ์ด ์กด์ฌ
: ํ์ค ) ๋นํจ์จ์
: ํจ์จ์ ์ธ ๋ณํ์ ์๋นํ ๋ ธ๋ ฅ์ ํ์๋ก ํจ
• ์ค์ CAS์ ๊ตฌํ
: atomic_compare_and_set ์๊ณ , atomic_compare_exchange๋ฅผ ๋์ ์ฌ์ฉ
: ๋ ผ๋ธ๋กํน ์๋ฃ๊ตฌ์กฐ๋ฅผ ๋ง๋๋ ๋ฐ ํ์ ์ฐ์ฐ
bool CAS(atomic_int *addr, int expected, int new_val)
{
return atomic_compare_exchange_strong(addr, &expected, new_val);
}
bool CAS(volatile int *addr, int expected, int new_val)
{
return atomic_compare_exchange_strong(reinterpret_cast<volatile atomic_int *>(addr),
&expected, new_val);
}
[ CAS๋ฅผ ์ด์ฉํ lock๊ณผ unlock๊ตฌํ, 1์ต ๋ง๋ค๊ธฐ ํ๋ก๊ทธ๋จ ]
volatile int X;
volatile int sum;
bool g_ready = false;
int g_data = 0;
std::mutex srm;
bool CAS(volatile int* addr, int expected, int new_value)
{
return std::atomic_compare_exchange_strong(reinterpret_cast<volatile std::atomic_int*>(addr), &expected, new_value);
}
void cas_lock()
{
while (false == CAS(&X, 0, 1));
}
// X = 1 ๋๊ฐ lock์ ์ป์๋ค. 0์ด๋ฉด ์๋ฌด๋ lock์ ์ป์ง ์์๋ค.
void cas_unlock()
{
X = 0;
}
void thread_func(int num_threads, int th_id)
{
for (int i = 0; i < 500'0000 / num_threads; ++i){
cas_lock();
sum += 2;
cas_unlock();
}
}
int main()
{
for (int num_threads = 1; num_threads <= 16; num_threads *= 2) {
sum = 0;
auto start_t = std::chrono::high_resolution_clock::now();
std::vector<std::thread> threads;
for (int i = 0; i < num_threads; ++i)
threads.emplace_back(thread_func, num_threads, i);
for (auto& th : threads)
th.join();
auto exec_t = std::chrono::high_resolution_clock::now() - start_t;
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(exec_t).count();
std::cout << "Thread : " << num_threads << ", sum = " << sum << ", Time = " << ms << "ms" << std::endl;
}
}
: CAS๋ฅผ ์คํํ ๋ ์ค์ง ํ๋์ ์ฐ๋ ๋์์๋ง true๋ฅผ ๋ฐํํ๋ฉด ๋จ
: ์ฐ๋ ๋๊ฐ ๋์ด๋ ์๋ก ์ฑ๋ฅ์ ๋จ์ด์ง๋ค, ์?
1) CAS๋ฅผ ์คํจํ์ ๊ฒฝ์ฐ ๊ณ์ํด์ CAS ํจ (mutex๋ณด๋ค๋ ์๋์ง๋ง, ์ค๋ฒํค๋๊ฐ ์กด์ฌํ๋ค)
2) ์ฐ๋ ๋๊ฐ ๋์ด๋๋ ๋งํผ ๊ณ์ํด์ CAS๋ฅผ ์งํํ๋ค