๋์์ฑ vs ๋ณ๋ ฌ์ฑ
๊ตฌ๋ถ | ๋์์ฑ (Concurrency) | ๋ณ๋ ฌ์ฑ (Parallelism) |
์๋ฏธ | ์ฌ๋ฌ ์์ ์ ๋ฒ๊ฐ์๊ฐ๋ฉฐ ์ฒ๋ฆฌ (๊ฒ์ผ๋ก๋ ๋์์) | ์ฌ๋ฌ ์์ ์ ์ค์ ๋ก ๋์์ ์ฒ๋ฆฌ |
์ฒ๋ฆฌ ๋ฐฉ์ | ํ๋์ ์ฝ์ด๊ฐ ์์ ์ ๋น ๋ฅด๊ฒ ์ ํํ๋ฉฐ ์ฒ๋ฆฌ | ์ฌ๋ฌ ์ฝ์ด๊ฐ ๊ฐ ์์ ์ ๋์์ ์คํ |
์์ | ์นดํ์์ ํผ์ ์ฃผ๋ฌธ ๋ฐ๊ณ , ๊ณ์ฐํ๊ณ , ์๋ฃ ๋ง๋ค๊ธฐ | ์นดํ์์ ์ธ ๋ช ์ด ๊ฐ๊ฐ ์ฃผ๋ฌธ, ๊ณ์ฐ, ์๋ฃ ๋ง๋ค๊ธฐ |
๋ชฉ์ | ์๋ต์ฑ ํฅ์, I/O ๋๊ธฐ ์๊ฐ ํ์ฉ | ์ฒ๋ฆฌ ์๋ ํฅ์ |
ํ์ด์ฌ | ์ค๋ ๋, ์ฝ๋ฃจํด (asyncio) | ํ์ ํ๋ก์ธ์ค, C ํ์ฅ, ์์คํ ์ฝ |
์๋ฐ์คํฌ๋ฆฝํธ | ์ด๋ฒคํธ ๋ฃจํ, ํ๋ก๋ฏธ์ค, async/await | Web Workers, Node.js์ Worker Threads |
GIL (Global Interupt Lock)
- ํ์ด์ฌ์๋ GIL์ด๋ผ๋ ์ ์ญ ์ธํฐํ๋ฆฌํฐ ๋ฝ์ด ์กด์ฌํจ
- ํ๋์ ์ค๋ ๋๋ง ํ์ด์ฌ ๋ฐ์ดํธ์ฝ๋๋ฅผ ์คํํ ์ ์์
- -> ๋ฉํฐ์ค๋ ๋๋ ๋์์ฑ์ ๊ฐ๋ฅํ์ง๋ง ๋ณ๋ ฌ์ฑ์ด ์ ํ์
- ๋ณ๋ ฌ ์ฒ๋ฆฌ๋ฅผ ์ํ๋ฉด ํ์ ํ๋ก์ธ์ค(์์ ํ๋ก์ธ์ค), C ํ์ฅ, ์์คํ ์ฝ์ ํ์ฉํ ์ ์๋ค.
Better way 52 ์์ ํ๋ก์ธ์ค๋ฅผ ๊ด๋ฆฌํ๊ธฐ ์ํด subprocess๋ฅผ ์ฌ์ฉํ๋ผ
- subprocess ๋ชจ๋์ ์ฌ์ฉํด ์์ ํ๋ก์ธ์ค๋ฅผ ์คํํ๊ณ ์ ๋ ฅ๊ณผ ์ถ๋ ฅ ์คํธ๋ฆผ์ ๊ด๋ฆฌ ํ ์ ์๋ค.
- ์์ ํ๋ก์ธ์ค๋ ํ์ด์ฌ ์ธํฐํ๋ฆฌํฐ์ ๋ณ๋ ฌ๋ก ์คํ๋๊ธฐ์ CPU ์ฝ์ด๋ฅผ ํ์ฉํ ์ ์๋ค.
- subprocess.run()์ ๋จ์ ์คํ
- subprocess.Popen()์ ํ์ดํ๋ผ์ธ ์ฐ๊ฒฐ ๋ฑ ๊ณ ๊ธ ์ ์ด
import subprocess
# ๋จ์ํ ๋ช
๋ น ์คํ - subprocess.run()
result = subprocess.run(['ls', '-l'], capture_output=True, text=True)
print("stdout:", result.stdout)
print("stderr:", result.stderr)
print("returncode:", result.returncode)
import subprocess
# ls์ grep์ ํ์ดํ๋ก ์ฐ๊ฒฐ
ls_proc = subprocess.Popen(['ls'], stdout=subprocess.PIPE)
grep_proc = subprocess.Popen(['grep', 'py'], stdin=ls_proc.stdout, stdout=subprocess.PIPE)
ls_proc.stdout.close() # ํ์ดํ ์ข
๋ฃ ์ ํธ
output, _ = grep_proc.communicate()
print(output.decode())
Better way 53 ๋ธ๋กํน I/O์ ๊ฒฝ์ฐ ์ค๋ ๋๋ฅผ ์ฌ์ฉํ๊ณ ๋ณ๋ ฌ์ฑ์ ํผํ๋ผ
import threading
import requests
def download(url):
response = requests.get(url)
print(f"{url}: {len(response.content)} bytes")
thread = threading.Thread(target=download, args=("https://example.com",))
thread.start()
Better way 54 ์ค๋ ๋์์ ๋ฐ์ดํฐ ๊ฒฝํฉ์ ํผํ๊ธฐ ์ํด Lock์ ์ฌ์ฉํ๋ผ
GIL (Global Interpreter Lock) ์ด ์กด์ฌํด์, ํ ์์ ์ ํ๋์ ์ค๋ ๋๋ง ๋ฐ์ดํธ์ฝ๋๋ฅผ ์คํํ ์ ์๋ค.
ํ์ง๋ง ์ฌ์ ํ ๊ณต์ ์์์ ๋ํ Race condition์ ๋ฐ์ํ ์ ์๋ค.
class Counter:
def __init__(self):
self.counter = 0
def increment(self, offset): # ์ฌ๊ธฐ์์ race condition์ด ๋ฐ์
value = self.counter
time.sleep(0.00001) # ๋๋ ์ด ์ถ๊ฐ
self.counter = value + offset
def worker(sensor_index, how_many, counter):
for _ in range(how_many):
counter.increment(1)
how_many = 1000 # ๋ฐ๋ณต ์ ๊ฒ
counter = Counter()
threads = []
for i in range(5):
thread = threading.Thread(target=worker, args=(i, how_many, counter))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
print(f"Counter should be {5 * how_many}, found {counter.counter}")
# ์ค์ ์ถ๋ ฅ๊ฐ - Counter should be 5000, found 1001
import threading
import time
class Counter:
def __init__(self):
self.counter = 0
self.lock = threading.Lock() # Lock ์ฌ์ฉ
def increment(self, offset):
with self.lock: # ๊ณต์ ์์ ์ ๊ทผ ์ Lock์ผ๋ก ๋ณดํธ
value = self.counter
time.sleep(0.00001) # ๋๋ ์ด ์์ด๋ ์์
self.counter = value + offset
def worker(sensor_index, how_many, counter):
for _ in range(how_many):
counter.increment(1)
how_many = 1000
counter = Counter()
threads = []
for i in range(5):
thread = threading.Thread(target=worker, args=(i, how_many, counter))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
print(f"Counter should be {5 * how_many}, found {counter.counter}")
# ์ค์ ์ถ๋ ฅ๊ฐ - Counter should be 5000, found 5000
Better Way 55 ~ 59 : ํ์ด์ฌ ๋ด๋ถ์ ๋์์ฑ ํจํค์ง
ํฌ์ธ (Fan-in)
- ์ฌ๋ฌ ์์ฐ์(source)๋ค์ด ๋ฐ์ดํฐ๋ฅผ ๋ชจ์์ ํ๋์ ์๋น์(consumer)์๊ฒ ์ ๋ฌํ๋ ๊ตฌ์กฐ
- ์ฌ๋ฌ ์ผ์ ๋ฐ์ดํฐ → ํ๋์ ์ฒ๋ฆฌ๊ธฐ
ํฌ์์ (Fan-in)
- ํ๋์ ์์ฐ์๊ฐ ์ฌ๋ฌ ์๋น์์๊ฒ ์์ ์ ๋ถ๋ฐฐํ๋ ๊ตฌ์กฐ
- ํฌ๋กค๋ฌ๊ฐ URL ๋ชฉ๋ก์ ๋ฐ์ ์ฌ๋ฌ ์ค๋ ๋/์์ ์ผ๋ก ๋ถ๋ฐฐ ์ฒ๋ฆฌ
์์ฝ
- 55 Queue๋ฅผ ์ฌ์ฉํด ์ค๋ ๋ ์ฌ์ด์ ์์
์ ์กฐ์จํ๋ผ
- queue.Queue ์ฌ์ฉํด ์ค๋ ๋ ๊ฐ ์์ ํ๊ฒ ๋ฐ์ดํฐ ํต์ ์ ํ ์ ์๋ค.
- 56 ์ธ์ ๋์์ฑ์ด ํ์ํ ์ง ์ธ์ํ๋ ๋ฐฉ๋ฒ์ ์์๋๋ผ
- I/O vs CPU-bound
- 57 ์๊ตฌ์ ๋ฐ๋ผ ํฌ์์์ ์งํํ๋ ค๋ฉด ์๋ก์ด ์ค๋ ๋๋ฅผ ์์ฑํ์ง ๋ง๋ผ
- ์์ฒญ๋ง๋ค ์ค๋ ๋ ์์ฑํ์ง ๋ง๊ณ , ThreadPoolExecutor๋ก ์ ํํ๋ผ
- 58 ๋์์ฑ๊ณผ Queue๋ฅผ ์ฌ์ฉํ๊ธฐ ์ํด ์ฝ๋๋ฅผ ์ด๋ป๊ฒ ๋ฆฌํฉํฐ๋งํด์ผ ํ๋์ง ์ดํดํ๋ผ
- Queue๋ฅผ ํ์ฉํ ์์ ๋ถ๋ฆฌ ๊ตฌ์กฐ ๋ฆฌํฉํฐ๋ง
- 59 ๋์์ฑ์ ์ํด ์ค๋ ๋๊ฐ ํ์ํ ๊ฒฝ์ฐ์๋ ThreadpoolExecutor๋ฅผ ์ฌ์ฉํ๋ผ
- ๊ฐ๋จํ ๋์์ฑ ์ฒ๋ฆฌ์๋ ThreadPoolExecutor๊ฐ ํจ๊ณผ์
queue.Queue ๋ฅผ ํ์ฉํ์ฌ ํฌ์ธ/ํฌ์์ ๊ตฌํ
import threading
from queue import Queue
import time
def worker(name, q):
while True:
task = q.get()
if task is None:
break
print(f"{name} processing {task}")
time.sleep(0.1)
q.task_done()
q = Queue()
# ํฌ์์: ํ๋์ ํ์ ์ฌ๋ฌ ์๋น์
threads = []
for i in range(3):
t = threading.Thread(target=worker, args=(f"Worker-{i}", q))
t.start()
threads.append(t)
# ํฌ์ธ: ์ฌ๋ฌ ์์ฐ์๊ฐ ํ์ ์์
์ ๋ฌ
for i in range(10):
q.put(f"task-{i}")
q.join()
for _ in range(3):
q.put(None)
for t in threads:
t.join()
ThreadPoolExecutor ๋ฅผ ํ์ฉํ์ฌ ํฌ์ธ/ํฌ์์ ๊ตฌํ
from concurrent.futures import ThreadPoolExecutor
def task(n):
print(f"Processing {n}")
return n * 2
with ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(task, range(10)))
Better Way 60 ~ 63 : asyncio (async - await) ์ฝ๋ฃจํด์ ์ฌ์ฉํด๋ผ
์ด๋ฒคํธ ๋ฃจํ
- asyncio๋ ๋จ์ผ ์ค๋ ๋ ๊ธฐ๋ฐ ์ด๋ฒคํธ ๋ฃจํ์์ ์ฝ๋ฃจํด์ ์์๋๋ก ์คํํ๋ฉฐ I/O ํ์ด๋ฐ์ ํจ์จ์ ์ผ๋ก ์ฒ๋ฆฌํจ
- "๋น๋๊ธฐ ํจ์ → ์์ ๋ฑ๋ก → ๋ฃจํ๊ฐ ํ์ด๋ฐ ๋ง์ถฐ ์คํ" ๊ตฌ์กฐ
- async def, await, asyncio.run()์ ๋ชจ๋ ์ด ์ด๋ฒคํธ ๋ฃจํ ๊ธฐ๋ฐ์์ ์๋ํจ
Better way 60 I/O๋ฅผ ํ ๋๋ ์ฝ๋ฃจํด์ ์ฌ์ฉํด ๋์์ฑ์ ๋์ฌ๋ผ
import aiohttp
import asyncio
async def fetch(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return await resp.text()
async def main():
html = await fetch("https://example.com")
print(html)
asyncio.run(main())
Better way 61 ์ค๋ ๋๋ฅผ ์ฌ์ฉํ I/O๋ฅผ ์ด๋ป๊ฒ asyncio๋ก ํฌํ ํ ์ ์๋์ง ์์๋๋ผ
Better way 62 asyncio๋ก ์ฝ๊ฒ ์ฎ๊ฒจ๊ฐ ์ ์๋๋ก ์ค๋ ๋์ ์ฝ๋ฃจํด์ ํจ๊ป ์ฌ์ฉํ๋ผ
Better way 63 ์๋ต์ฑ์ ์ต๋๋ก ๋์ด๋ ค๋ฉด asyncio ์ด๋ฒคํธ ๋ฃจํ๋ฅผ ๋ธ๋กํ์ง ๋ง๋ผ
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
# ๊ธฐ์กด์ ๋ธ๋กํน I/O ํจ์ (์: ํ์ผ ์ฝ๊ธฐ, DB ์ ๊ทผ ๋ฑ)
def legacy_blocking_io(task_id):
print(f"[Blocking] Task {task_id} start")
time.sleep(2) # โ ๋ธ๋กํน ํจ์ (event loop ๋ง์ง ์๊ฒ executor์์ ์คํํด์ผ ํจ)
print(f"[Blocking] Task {task_id} done")
return f"result-{task_id}"
# asyncio ๊ธฐ๋ฐ์ ๋น๋๊ธฐ ์์
(์: I/O ๋ชจ๋๊ณผ ํจ๊ป ์ฐ์ด๋ ํจ์)
async def async_io_task(task_id):
print(f"[Async I/O] Task {task_id} start")
await asyncio.sleep(1) # โ
๋
ผ๋ธ๋กํน ๋๊ธฐ
print(f"[Async I/O] Task {task_id} done")
return f"async-result-{task_id}"
# ๋ธ๋กํน ํจ์๋ฅผ ์ฝ๋ฃจํด์ฒ๋ผ ์คํ (Better way 62)
async def wrapped_blocking_io(task_id, executor):
loop = asyncio.get_running_loop()
return await loop.run_in_executor(executor, legacy_blocking_io, task_id)
async def main():
executor = ThreadPoolExecutor(max_workers=3)
tasks = []
# ํผํฉ๋ ์์
๋ฆฌ์คํธ (๋ธ๋กํน + ๋น๋๊ธฐ)
for i in range(5):
if i % 2 == 0:
# ๋ธ๋กํน ํจ์๋ run_in_executor๋ก ์คํ
tasks.append(wrapped_blocking_io(i, executor))
else:
# ๋น๋๊ธฐ ํจ์๋ ๊ทธ๋๋ก await
tasks.append(async_io_task(i))
results = await asyncio.gather(*tasks)
print("\nFinal Results:")
for res in results:
print(res)
asyncio.run(main())
[Blocking] Task 0 start
[Async I/O] Task 1 start
[Blocking] Task 2 start
[Async I/O] Task 3 start
[Blocking] Task 4 start
[Async I/O] Task 1 done
[Async I/O] Task 3 done
[Blocking] Task 0 done
[Blocking] Task 2 done
[Blocking] Task 4 done
Final Results:
result-0
async-result-1
result-2
async-result-3
result-4
Better way 64 ์ง์ ํ ๋ณ๋ ฌ์ฑ์ ์ด๋ฆฌ๋ ค๋ฉด concurrent.futures๋ฅผ ์ฌ์ฉํ๋ผ
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def cpu_bound_task(n):
return sum(i * i for i in range(n))
numbers = [10**7] * 8 # 8๊ฐ์ ์์
def run_with_executor(executor_class, label):
start = time.time()
with executor_class() as executor:
results = list(executor.map(cpu_bound_task, numbers))
duration = time.time() - start
print(f"{label} took {duration:.2f} seconds")
if __name__ == "__main__":
run_with_executor(ThreadPoolExecutor, "ThreadPoolExecutor")
run_with_executor(ProcessPoolExecutor, "ProcessPoolExecutor")
# ์ถ๋ ฅ ๊ฒฐ๊ณผ
# ThreadPoolExecutor took 2.71 seconds
# ProcessPoolExecutor took 0.72 seconds
ํญ๋ชฉ | ThreadPoolExecutor | ProcessPoolExecutor |
๋ณ๋ ฌ์ฑ ๋ฐฉ์ | ์ค๋ ๋ (GIL ์ ํ ์์) | ํ๋ก์ธ์ค (๋ฉํฐ์ฝ์ด ๋ณ๋ ฌ ์คํ ๊ฐ๋ฅ) |
CPU-bound ์์ ์ฒ๋ฆฌ | โ ๋๋ฆผ | โ ๋น ๋ฆ (์ง์ ํ ๋ณ๋ ฌ์ฑ) |
I/O-bound ์์ ์ฒ๋ฆฌ | โ ํจ์จ์ | โ ์คํ๋ ค ๋๋ฆผ (ํ๋ก์ธ์ค ๊ด๋ฆฌ ๋น์ฉ ํผ) |
'๐ Python' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
์ดํํฐ๋ธ ํ์ด์ฌ 6์ฅ - ๋ฉํํด๋์ค์ ์ ํธ๋ฆฌ๋ทฐํธ (0) | 2025.03.30 |
---|---|
์ดํํฐ๋ธ ํ์ด์ฌ 5์ฅ - ํด๋์ค์ ์ธํฐํ์ด์ค (0) | 2025.03.21 |
์ดํํฐ๋ธ ํ์ด์ฌ 4์ฅ - ์ปดํ๋ฆฌํจ์ ๊ณผ ์ ๋๋ ์ดํฐ (0) | 2025.03.18 |
์ดํํฐ๋ธ ํ์ด์ฌ 3์ฅ - ํจ์ (0) | 2025.03.17 |
์ดํํฐ๋ธ ํ์ด์ฌ 2์ฅ - ๋ฆฌ์คํธ์ ๋์ ๋๋ฆฌ (0) | 2025.03.12 |