๋ณธ๋ฌธ ๋ฐ”๋กœ๊ฐ€๊ธฐ
๐Ÿ Python

์ดํŽ™ํ‹ฐ๋ธŒ ํŒŒ์ด์ฌ 7์žฅ - ๋™์‹œ์„ฑ๊ณผ ๋ณ‘๋ ฌ์„ฑ

by dev.py 2025. 4. 1.

๋™์‹œ์„ฑ vs ๋ณ‘๋ ฌ์„ฑ

๊ตฌ๋ถ„ ๋™์‹œ์„ฑ (Concurrency) ๋ณ‘๋ ฌ์„ฑ (Parallelism)
์˜๋ฏธ ์—ฌ๋Ÿฌ ์ž‘์—…์„ ๋ฒˆ๊ฐˆ์•„๊ฐ€๋ฉฐ ์ฒ˜๋ฆฌ (๊ฒ‰์œผ๋กœ๋Š” ๋™์‹œ์—) ์—ฌ๋Ÿฌ ์ž‘์—…์„ ์‹ค์ œ๋กœ ๋™์‹œ์— ์ฒ˜๋ฆฌ
์ฒ˜๋ฆฌ ๋ฐฉ์‹ ํ•˜๋‚˜์˜ ์ฝ”์–ด๊ฐ€ ์ž‘์—…์„ ๋น ๋ฅด๊ฒŒ ์ „ํ™˜ํ•˜๋ฉฐ ์ฒ˜๋ฆฌ ์—ฌ๋Ÿฌ ์ฝ”์–ด๊ฐ€ ๊ฐ ์ž‘์—…์„ ๋™์‹œ์— ์‹คํ–‰
์˜ˆ์‹œ ์นดํŽ˜์—์„œ ํ˜ผ์ž ์ฃผ๋ฌธ ๋ฐ›๊ณ , ๊ณ„์‚ฐํ•˜๊ณ , ์Œ๋ฃŒ ๋งŒ๋“ค๊ธฐ ์นดํŽ˜์—์„œ ์„ธ ๋ช…์ด ๊ฐ๊ฐ ์ฃผ๋ฌธ, ๊ณ„์‚ฐ, ์Œ๋ฃŒ ๋งŒ๋“ค๊ธฐ
๋ชฉ์  ์‘๋‹ต์„ฑ ํ–ฅ์ƒ, I/O ๋Œ€๊ธฐ ์‹œ๊ฐ„ ํ™œ์šฉ ์ฒ˜๋ฆฌ ์†๋„ ํ–ฅ์ƒ
ํŒŒ์ด์ฌ ์Šค๋ ˆ๋“œ, ์ฝ”๋ฃจํ‹ด (asyncio) ํ•˜์œ„ ํ”„๋กœ์„ธ์Šค, C ํ™•์žฅ, ์‹œ์Šคํ…œ ์ฝœ
์ž๋ฐ”์Šคํฌ๋ฆฝํŠธ ์ด๋ฒคํŠธ ๋ฃจํ”„, ํ”„๋กœ๋ฏธ์Šค, async/await  Web Workers, Node.js์˜ Worker Threads

 

GIL (Global Interupt Lock)

  • ํŒŒ์ด์ฌ์—๋Š” GIL์ด๋ผ๋Š” ์ „์—ญ ์ธํ„ฐํ”„๋ฆฌํ„ฐ ๋ฝ์ด ์กด์žฌํ•จ
  • ํ•˜๋‚˜์˜ ์Šค๋ ˆ๋“œ๋งŒ ํŒŒ์ด์ฌ ๋ฐ”์ดํŠธ์ฝ”๋“œ๋ฅผ ์‹คํ–‰ํ•  ์ˆ˜ ์žˆ์Œ
  • -> ๋ฉ€ํ‹ฐ์Šค๋ ˆ๋“œ๋Š” ๋™์‹œ์„ฑ์€ ๊ฐ€๋Šฅํ•˜์ง€๋งŒ ๋ณ‘๋ ฌ์„ฑ์ด ์ œํ•œ์ 
  • ๋ณ‘๋ ฌ ์ฒ˜๋ฆฌ๋ฅผ ์›ํ•˜๋ฉด ํ•˜์œ„ ํ”„๋กœ์„ธ์Šค(์ž์‹ ํ”„๋กœ์„ธ์Šค), C ํ™•์žฅ, ์‹œ์Šคํ…œ ์ฝœ์„ ํ™œ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค.

ํŒŒ์ด์ฌ์—์„œ์˜ ๋ฉ€ํ‹ฐ์“ฐ๋ ˆ๋”ฉ ์ž‘์—…

 

 

 

Better way 52 ์ž์‹ ํ”„๋กœ์„ธ์Šค๋ฅผ ๊ด€๋ฆฌํ•˜๊ธฐ ์œ„ํ•ด subprocess๋ฅผ ์‚ฌ์šฉํ•˜๋ผ

  • subprocess ๋ชจ๋“ˆ์„ ์‚ฌ์šฉํ•ด ์ž์‹ ํ”„๋กœ์„ธ์Šค๋ฅผ ์‹คํ–‰ํ•˜๊ณ  ์ž…๋ ฅ๊ณผ ์ถœ๋ ฅ ์ŠคํŠธ๋ฆผ์„ ๊ด€๋ฆฌ ํ•  ์ˆ˜ ์žˆ๋‹ค.
  • ์ž์‹ ํ”„๋กœ์„ธ์Šค๋Š” ํŒŒ์ด์ฌ ์ธํ„ฐํ”„๋ฆฌํ„ฐ์™€ ๋ณ‘๋ ฌ๋กœ ์‹คํ–‰๋˜๊ธฐ์— CPU ์ฝ”์–ด๋ฅผ ํ™œ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค.
  • subprocess.run()์€ ๋‹จ์ˆœ ์‹คํ–‰
  • subprocess.Popen()์€ ํŒŒ์ดํ”„๋ผ์ธ ์—ฐ๊ฒฐ ๋“ฑ ๊ณ ๊ธ‰ ์ œ์–ด
import subprocess

# ๋‹จ์ˆœํ•œ ๋ช…๋ น ์‹คํ–‰ - subprocess.run()
result = subprocess.run(['ls', '-l'], capture_output=True, text=True)

print("stdout:", result.stdout)
print("stderr:", result.stderr)
print("returncode:", result.returncode)

 

import subprocess

# ls์™€ grep์„ ํŒŒ์ดํ”„๋กœ ์—ฐ๊ฒฐ
ls_proc = subprocess.Popen(['ls'], stdout=subprocess.PIPE)
grep_proc = subprocess.Popen(['grep', 'py'], stdin=ls_proc.stdout, stdout=subprocess.PIPE)

ls_proc.stdout.close()  # ํŒŒ์ดํ”„ ์ข…๋ฃŒ ์‹ ํ˜ธ
output, _ = grep_proc.communicate()

print(output.decode())

 

 

Better way 53 ๋ธ”๋กœํ‚น I/O์˜ ๊ฒฝ์šฐ ์Šค๋ ˆ๋“œ๋ฅผ ์‚ฌ์šฉํ•˜๊ณ  ๋ณ‘๋ ฌ์„ฑ์„ ํ”ผํ•˜๋ผ

import threading
import requests

def download(url):
    response = requests.get(url)
    print(f"{url}: {len(response.content)} bytes")

thread = threading.Thread(target=download, args=("https://example.com",))
thread.start()

 

Better way 54 ์Šค๋ ˆ๋“œ์—์„œ ๋ฐ์ดํ„ฐ ๊ฒฝํ•ฉ์„ ํ”ผํ•˜๊ธฐ ์œ„ํ•ด Lock์„ ์‚ฌ์šฉํ•˜๋ผ

GIL (Global Interpreter Lock) ์ด ์กด์žฌํ•ด์„œ, ํ•œ ์‹œ์ ์— ํ•˜๋‚˜์˜ ์Šค๋ ˆ๋“œ๋งŒ ๋ฐ”์ดํŠธ์ฝ”๋“œ๋ฅผ ์‹คํ–‰ํ•  ์ˆ˜ ์žˆ๋‹ค.

ํ•˜์ง€๋งŒ ์—ฌ์ „ํžˆ ๊ณต์œ  ์ž์›์— ๋Œ€ํ•œ Race condition์€ ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ๋‹ค. 

class Counter:
    def __init__(self):
        self.counter = 0

    def increment(self, offset): # ์—ฌ๊ธฐ์—์„œ race condition์ด ๋ฐœ์ƒ
        value = self.counter
        time.sleep(0.00001)  # ๋”œ๋ ˆ์ด ์ถ”๊ฐ€
        self.counter = value + offset


def worker(sensor_index, how_many, counter):
    for _ in range(how_many):
        counter.increment(1)


how_many = 1000  # ๋ฐ˜๋ณต ์ ๊ฒŒ
counter = Counter()

threads = []
for i in range(5):
    thread = threading.Thread(target=worker, args=(i, how_many, counter))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

print(f"Counter should be {5 * how_many}, found {counter.counter}")

# ์‹ค์ œ ์ถœ๋ ฅ๊ฐ’ - Counter should be 5000, found 1001

 

import threading
import time

class Counter:
    def __init__(self):
        self.counter = 0
        self.lock = threading.Lock() # Lock ์‚ฌ์šฉ

    def increment(self, offset):
        with self.lock:  # ๊ณต์œ  ์ž์› ์ ‘๊ทผ ์‹œ Lock์œผ๋กœ ๋ณดํ˜ธ
            value = self.counter
            time.sleep(0.00001)  # ๋”œ๋ ˆ์ด ์žˆ์–ด๋„ ์•ˆ์ „
            self.counter = value + offset


def worker(sensor_index, how_many, counter):
    for _ in range(how_many):
        counter.increment(1)


how_many = 1000
counter = Counter()

threads = []
for i in range(5):
    thread = threading.Thread(target=worker, args=(i, how_many, counter))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

print(f"Counter should be {5 * how_many}, found {counter.counter}")

# ์‹ค์ œ ์ถœ๋ ฅ๊ฐ’ - Counter should be 5000, found 5000

 

 

 

 

 

 

Better Way 55 ~ 59 : ํŒŒ์ด์ฌ ๋‚ด๋ถ€์˜ ๋™์‹œ์„ฑ ํŒจํ‚ค์ง€

ํŒฌ์ธ (Fan-in)

  • ์—ฌ๋Ÿฌ ์ƒ์‚ฐ์ž(source)๋“ค์ด ๋ฐ์ดํ„ฐ๋ฅผ ๋ชจ์•„์„œ ํ•˜๋‚˜์˜ ์†Œ๋น„์ž(consumer)์—๊ฒŒ ์ „๋‹ฌํ•˜๋Š” ๊ตฌ์กฐ
  • ์—ฌ๋Ÿฌ ์„ผ์„œ ๋ฐ์ดํ„ฐ → ํ•˜๋‚˜์˜ ์ฒ˜๋ฆฌ๊ธฐ

ํŒฌ์•„์›ƒ (Fan-in)

  • ํ•˜๋‚˜์˜ ์ƒ์‚ฐ์ž๊ฐ€ ์—ฌ๋Ÿฌ ์†Œ๋น„์ž์—๊ฒŒ ์ž‘์—…์„ ๋ถ„๋ฐฐํ•˜๋Š” ๊ตฌ์กฐ
  • ํฌ๋กค๋Ÿฌ๊ฐ€ URL ๋ชฉ๋ก์„ ๋ฐ›์•„ ์—ฌ๋Ÿฌ ์Šค๋ ˆ๋“œ/์ž‘์—…์œผ๋กœ ๋ถ„๋ฐฐ ์ฒ˜๋ฆฌ

 

์š”์•ฝ

  • 55 Queue๋ฅผ ์‚ฌ์šฉํ•ด ์Šค๋ ˆ๋“œ ์‚ฌ์ด์˜ ์ž‘์—…์„ ์กฐ์œจํ•˜๋ผ
    • queue.Queue ์‚ฌ์šฉํ•ด ์Šค๋ ˆ๋“œ ๊ฐ„ ์•ˆ์ „ํ•˜๊ฒŒ ๋ฐ์ดํ„ฐ ํ†ต์‹ ์„ ํ•  ์ˆ˜ ์žˆ๋‹ค.
  • 56 ์–ธ์ œ ๋™์‹œ์„ฑ์ด ํ•„์š”ํ• ์ง€ ์ธ์‹ํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ์•Œ์•„๋‘๋ผ 
    • I/O vs CPU-bound
  • 57 ์š”๊ตฌ์— ๋”ฐ๋ผ ํŒฌ์•„์›ƒ์„ ์ง„ํ–‰ํ•˜๋ ค๋ฉด ์ƒˆ๋กœ์šด ์Šค๋ ˆ๋“œ๋ฅผ ์ƒ์„ฑํ•˜์ง€ ๋ง๋ผ
    • ์š”์ฒญ๋งˆ๋‹ค ์Šค๋ ˆ๋“œ ์ƒ์„ฑํ•˜์ง€ ๋ง๊ณ , ThreadPoolExecutor๋กœ ์ œํ•œํ•˜๋ผ
  • 58 ๋™์‹œ์„ฑ๊ณผ Queue๋ฅผ ์‚ฌ์šฉํ•˜๊ธฐ ์œ„ํ•ด ์ฝ”๋“œ๋ฅผ ์–ด๋–ป๊ฒŒ ๋ฆฌํŒฉํ„ฐ๋งํ•ด์•ผ ํ•˜๋Š”์ง€ ์ดํ•ดํ•˜๋ผ
    • Queue๋ฅผ ํ™œ์šฉํ•œ ์ž‘์—… ๋ถ„๋ฆฌ ๊ตฌ์กฐ ๋ฆฌํŒฉํ„ฐ๋ง
  • 59 ๋™์‹œ์„ฑ์„ ์œ„ํ•ด ์Šค๋ ˆ๋“œ๊ฐ€ ํ•„์š”ํ•œ ๊ฒฝ์šฐ์—๋Š” ThreadpoolExecutor๋ฅผ ์‚ฌ์šฉํ•˜๋ผ
    • ๊ฐ„๋‹จํ•œ ๋™์‹œ์„ฑ ์ฒ˜๋ฆฌ์—๋Š” ThreadPoolExecutor๊ฐ€ ํšจ๊ณผ์ 

 

queue.Queue ๋ฅผ ํ™œ์šฉํ•˜์—ฌ ํŒฌ์ธ/ํŒฌ์•„์›ƒ ๊ตฌํ˜„

import threading
from queue import Queue
import time

def worker(name, q):
    while True:
        task = q.get()
        if task is None:
            break
        print(f"{name} processing {task}")
        time.sleep(0.1)
        q.task_done()

q = Queue()

# ํŒฌ์•„์›ƒ: ํ•˜๋‚˜์˜ ํ์— ์—ฌ๋Ÿฌ ์†Œ๋น„์ž
threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(f"Worker-{i}", q))
    t.start()
    threads.append(t)

# ํŒฌ์ธ: ์—ฌ๋Ÿฌ ์ƒ์‚ฐ์ž๊ฐ€ ํ์— ์ž‘์—… ์ „๋‹ฌ
for i in range(10):
    q.put(f"task-{i}")

q.join()

for _ in range(3):
    q.put(None)
for t in threads:
    t.join()

 

ThreadPoolExecutor ๋ฅผ ํ™œ์šฉํ•˜์—ฌ ํŒฌ์ธ/ํŒฌ์•„์›ƒ ๊ตฌํ˜„

from concurrent.futures import ThreadPoolExecutor

def task(n):
    print(f"Processing {n}")
    return n * 2

with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(task, range(10)))

 

Better Way 60 ~ 63 : asyncio (async - await) ์ฝ”๋ฃจํ‹ด์„ ์‚ฌ์šฉํ•ด๋ผ

์ด๋ฒคํŠธ ๋ฃจํ”„

 

  • asyncio๋Š” ๋‹จ์ผ ์Šค๋ ˆ๋“œ ๊ธฐ๋ฐ˜ ์ด๋ฒคํŠธ ๋ฃจํ”„์—์„œ ์ฝ”๋ฃจํ‹ด์„ ์ˆœ์„œ๋Œ€๋กœ ์‹คํ–‰ํ•˜๋ฉฐ I/O ํƒ€์ด๋ฐ์„ ํšจ์œจ์ ์œผ๋กœ ์ฒ˜๋ฆฌํ•จ
  • "๋น„๋™๊ธฐ ํ•จ์ˆ˜ → ์ž‘์—… ๋“ฑ๋ก → ๋ฃจํ”„๊ฐ€ ํƒ€์ด๋ฐ ๋งž์ถฐ ์‹คํ–‰" ๊ตฌ์กฐ
  • async def, await, asyncio.run()์€ ๋ชจ๋‘ ์ด ์ด๋ฒคํŠธ ๋ฃจํ”„ ๊ธฐ๋ฐ˜์—์„œ ์ž‘๋™ํ•จ

 

 

 

 

Better way 60 I/O๋ฅผ ํ•  ๋•Œ๋Š” ์ฝ”๋ฃจํ‹ด์„ ์‚ฌ์šฉํ•ด ๋™์‹œ์„ฑ์„ ๋†’์—ฌ๋ผ

import aiohttp
import asyncio

async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()

async def main():
    html = await fetch("https://example.com")
    print(html)

asyncio.run(main())

 

 

 

 

Better way 61 ์Šค๋ ˆ๋“œ๋ฅผ ์‚ฌ์šฉํ•œ I/O๋ฅผ ์–ด๋–ป๊ฒŒ asyncio๋กœ ํฌํŒ…ํ•  ์ˆ˜ ์žˆ๋Š”์ง€ ์•Œ์•„๋‘๋ผ

Better way 62 asyncio๋กœ ์‰ฝ๊ฒŒ ์˜ฎ๊ฒจ๊ฐˆ ์ˆ˜ ์žˆ๋„๋ก ์Šค๋ ˆ๋“œ์™€ ์ฝ”๋ฃจํ‹ด์„ ํ•จ๊ป˜ ์‚ฌ์šฉํ•˜๋ผ

Better way 63 ์‘๋‹ต์„ฑ์„ ์ตœ๋Œ€๋กœ ๋†’์ด๋ ค๋ฉด asyncio ์ด๋ฒคํŠธ ๋ฃจํ”„๋ฅผ ๋ธ”๋กํ•˜์ง€ ๋ง๋ผ

 

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

# ๊ธฐ์กด์˜ ๋ธ”๋กœํ‚น I/O ํ•จ์ˆ˜ (์˜ˆ: ํŒŒ์ผ ์ฝ๊ธฐ, DB ์ ‘๊ทผ ๋“ฑ)
def legacy_blocking_io(task_id):
    print(f"[Blocking] Task {task_id} start")
    time.sleep(2)  # โŒ ๋ธ”๋กœํ‚น ํ•จ์ˆ˜ (event loop ๋ง‰์ง€ ์•Š๊ฒŒ executor์—์„œ ์‹คํ–‰ํ•ด์•ผ ํ•จ)
    print(f"[Blocking] Task {task_id} done")
    return f"result-{task_id}"

# asyncio ๊ธฐ๋ฐ˜์˜ ๋น„๋™๊ธฐ ์ž‘์—… (์˜ˆ: I/O ๋ชจ๋“ˆ๊ณผ ํ•จ๊ป˜ ์“ฐ์ด๋Š” ํ•จ์ˆ˜)
async def async_io_task(task_id):
    print(f"[Async I/O] Task {task_id} start")
    await asyncio.sleep(1)  # โœ… ๋…ผ๋ธ”๋กœํ‚น ๋Œ€๊ธฐ
    print(f"[Async I/O] Task {task_id} done")
    return f"async-result-{task_id}"

# ๋ธ”๋กœํ‚น ํ•จ์ˆ˜๋ฅผ ์ฝ”๋ฃจํ‹ด์ฒ˜๋Ÿผ ์‹คํ–‰ (Better way 62)
async def wrapped_blocking_io(task_id, executor):
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, legacy_blocking_io, task_id)

async def main():
    executor = ThreadPoolExecutor(max_workers=3)

    tasks = []

    # ํ˜ผํ•ฉ๋œ ์ž‘์—… ๋ฆฌ์ŠคํŠธ (๋ธ”๋กœํ‚น + ๋น„๋™๊ธฐ)
    for i in range(5):
        if i % 2 == 0:
            # ๋ธ”๋กœํ‚น ํ•จ์ˆ˜๋Š” run_in_executor๋กœ ์‹คํ–‰
            tasks.append(wrapped_blocking_io(i, executor))
        else:
            # ๋น„๋™๊ธฐ ํ•จ์ˆ˜๋Š” ๊ทธ๋Œ€๋กœ await
            tasks.append(async_io_task(i))

    results = await asyncio.gather(*tasks)

    print("\nFinal Results:")
    for res in results:
        print(res)

asyncio.run(main())

 

[Blocking] Task 0 start
[Async I/O] Task 1 start
[Blocking] Task 2 start
[Async I/O] Task 3 start
[Blocking] Task 4 start
[Async I/O] Task 1 done
[Async I/O] Task 3 done
[Blocking] Task 0 done
[Blocking] Task 2 done
[Blocking] Task 4 done

Final Results:
result-0
async-result-1
result-2
async-result-3
result-4

 

Better way 64 ์ง„์ •ํ•œ ๋ณ‘๋ ฌ์„ฑ์„ ์‚ด๋ฆฌ๋ ค๋ฉด concurrent.futures๋ฅผ ์‚ฌ์šฉํ•˜๋ผ

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def cpu_bound_task(n):
    return sum(i * i for i in range(n))

numbers = [10**7] * 8  # 8๊ฐœ์˜ ์ž‘์—…

def run_with_executor(executor_class, label):
    start = time.time()
    with executor_class() as executor:
        results = list(executor.map(cpu_bound_task, numbers))
    duration = time.time() - start
    print(f"{label} took {duration:.2f} seconds")

if __name__ == "__main__":
    run_with_executor(ThreadPoolExecutor, "ThreadPoolExecutor")
    run_with_executor(ProcessPoolExecutor, "ProcessPoolExecutor")

# ์ถœ๋ ฅ ๊ฒฐ๊ณผ
# ThreadPoolExecutor took 2.71 seconds
# ProcessPoolExecutor took 0.72 seconds

 

ํ•ญ๋ชฉ ThreadPoolExecutor ProcessPoolExecutor
๋ณ‘๋ ฌ์„ฑ ๋ฐฉ์‹ ์Šค๋ ˆ๋“œ (GIL ์ œํ•œ ์žˆ์Œ) ํ”„๋กœ์„ธ์Šค (๋ฉ€ํ‹ฐ์ฝ”์–ด ๋ณ‘๋ ฌ ์‹คํ–‰ ๊ฐ€๋Šฅ)
CPU-bound ์ž‘์—… ์ฒ˜๋ฆฌ โŒ ๋Š๋ฆผ โœ… ๋น ๋ฆ„ (์ง„์ •ํ•œ ๋ณ‘๋ ฌ์„ฑ)
I/O-bound ์ž‘์—… ์ฒ˜๋ฆฌ โœ… ํšจ์œจ์  โŒ ์˜คํžˆ๋ ค ๋Š๋ฆผ (ํ”„๋กœ์„ธ์Šค ๊ด€๋ฆฌ ๋น„์šฉ ํผ)