Python asyncio 동시성 제어 완벽 가이드: Semaphore, Lock, Queue로 안전한 비동기 프로그래밍 구현하기

Updated Feb 6, 2026

asyncio 동시성 제어가 필요한 이유

비동기 프로그래밍은 높은 성능을 제공하지만, 여러 코루틴이 동시에 실행되면서 공유 자원에 접근하거나 동시 실행 수를 제한해야 하는 상황이 발생합니다. 이때 적절한 동시성 제어 없이는 데이터 손실, 레이스 컨디션, 리소스 고갈 등의 문제가 발생할 수 있습니다.

핵심: asyncio는 단일 스레드에서 동작하지만, 여러 코루틴이 번갈아 실행되기 때문에 원자성(atomicity)이 보장되지 않는 작업에서는 동기화가 필수입니다.

Semaphore: 동시 실행 수 제한

개념

Semaphore는 동시에 실행 가능한 코루틴의 수를 제한하는 동기화 도구입니다. 내부적으로 카운터를 관리하며, 카운터가 0이 되면 대기 상태로 전환됩니다.

실무 활용 예시

API 요청 시 서버 과부하를 방지하기 위해 동시 요청 수를 제한하는 경우:

import asyncio
import aiohttp

async def fetch_data(session, url, semaphore):
    async with semaphore:  # 세마포어 획득
        async with session.get(url) as response:
            return await response.text()

async def main():
    semaphore = asyncio.Semaphore(5)  # 최대 5개 동시 실행
    urls = [f"https://api.example.com/data/{i}" for i in range(100)]

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_data(session, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)

    print(f"총 {len(results)}개 요청 완료")

asyncio.run(main())

Semaphore vs BoundedSemaphore

구분 Semaphore BoundedSemaphore
카운터 증가 무제한 증가 가능 초기값 초과 불가
사용 사례 일반적인 동시성 제한 버그 방지 (릴리스 과다 호출 감지)
안전성 실수로 릴리스 과다 시 문제 ValueError 발생으로 에러 조기 발견

Lock: 상호 배제 (Mutual Exclusion)

개념

Lock은 한 번에 하나의 코루틴만 임계 영역에 접근하도록 보장합니다. 공유 자원을 수정하는 작업에서 필수적입니다.

실무 활용 예시

여러 코루틴이 동시에 카운터를 증가시키는 경우:

import asyncio

class Counter:
    def __init__(self):
        self.value = 0
        self.lock = asyncio.Lock()

    async def increment(self):
        async with self.lock:  # Lock 획득
            # 임계 영역: 원자적 실행 보장
            current = self.value
            await asyncio.sleep(0.001)  # I/O 시뮬레이션
            self.value = current + 1

async def worker(counter, name):
    for _ in range(1000):
        await counter.increment()
    print(f"{name} 완료")

async def main():
    counter = Counter()
    await asyncio.gather(
        worker(counter, "Worker-1"),
        worker(counter, "Worker-2"),
        worker(counter, "Worker-3")
    )
    print(f"최종 값: {counter.value}")  # 3000 (정확함)

asyncio.run(main())

주의: Lock 없이 실행하면 레이스 컨디션으로 인해 최종 값이 3000보다 작을 수 있습니다.

Queue: 비동기 작업 큐

개념

Queue는 Producer-Consumer 패턴을 구현할 때 사용하며, 작업을 안전하게 전달하고 처리 순서를 관리합니다.

Queue 종류

클래스 특징 사용 사례
Queue FIFO (First-In-First-Out) 순서대로 처리
LifoQueue LIFO (Last-In-First-Out) 스택 구조 필요 시
PriorityQueue 우선순위 기반 중요도에 따라 처리

실무 활용 예시

웹 크롤러에서 URL을 큐에 추가하고 워커가 처리하는 패턴:

import asyncio
import aiohttp

async def producer(queue, urls):
    """URL을 큐에 추가"""
    for url in urls:
        await queue.put(url)
        print(f"큐에 추가: {url}")

    # 종료 신호 (워커 수만큼)
    for _ in range(3):
        await queue.put(None)

async def consumer(queue, name):
    """큐에서 URL을 가져와 처리"""
    async with aiohttp.ClientSession() as session:
        while True:
            url = await queue.get()
            if url is None:  # 종료 신호
                queue.task_done()
                break

            try:
                async with session.get(url) as response:
                    print(f"[{name}] {url} 처리 완료 (상태: {response.status})")
            finally:
                queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=10)  # 최대 10개 항목
    urls = [f"https://httpbin.org/delay/{i}" for i in range(20)]

    # Producer 1개, Consumer 3개 동시 실행
    await asyncio.gather(
        producer(queue, urls),
        consumer(queue, "Worker-1"),
        consumer(queue, "Worker-2"),
        consumer(queue, "Worker-3")
    )

    await queue.join()  # 모든 작업 완료 대기
    print("모든 작업 완료")

asyncio.run(main())

Queue 주요 메서드

  • put(item): 큐에 항목 추가 (큐가 가득 차면 대기)
  • get(): 큐에서 항목 가져오기 (큐가 비어 있으면 대기)
  • task_done(): 항목 처리 완료 표시
  • join(): 모든 항목이 처리될 때까지 대기

동시성 제어 도구 비교

도구 목적 주요 사용 사례 동시 접근 수
Semaphore 동시 실행 수 제한 API 요청 제한, DB 커넥션 풀 N개 (설정 가능)
Lock 상호 배제 공유 자원 수정, 파일 쓰기 1개
Queue 작업 분배 Producer-Consumer, 작업 큐 무제한 (큐 크기 제한 가능)

조합 활용: 안전하고 효율적인 크롤러

실무에서는 여러 도구를 조합하여 사용합니다:

import asyncio
import aiohttp

class SafeCrawler:
    def __init__(self, max_concurrent=5):
        self.queue = asyncio.Queue()
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.results_lock = asyncio.Lock()
        self.results = []

    async def fetch(self, session, url):
        async with self.semaphore:  # 동시 요청 제한
            async with session.get(url) as response:
                data = await response.text()

                async with self.results_lock:  # 결과 저장 시 Lock
                    self.results.append({"url": url, "size": len(data)})

    async def worker(self, session):
        while True:
            url = await self.queue.get()
            if url is None:
                self.queue.task_done()
                break
            try:
                await self.fetch(session, url)
            finally:
                self.queue.task_done()

    async def crawl(self, urls, num_workers=3):
        async with aiohttp.ClientSession() as session:
            # 워커 시작
            workers = [asyncio.create_task(self.worker(session)) 
                      for _ in range(num_workers)]

            # URL 큐에 추가
            for url in urls:
                await self.queue.put(url)

            # 종료 신호
            for _ in range(num_workers):
                await self.queue.put(None)

            await self.queue.join()
            await asyncio.gather(*workers)

        return self.results

async def main():
    crawler = SafeCrawler(max_concurrent=3)
    urls = [f"https://httpbin.org/delay/{i%3}" for i in range(10)]
    results = await crawler.crawl(urls, num_workers=2)
    print(f"총 {len(results)}개 페이지 크롤링 완료")

asyncio.run(main())

성능 최적화 팁

  1. Semaphore 값 조정: 너무 크면 서버 과부하, 너무 작으면 성능 저하
    – 실험을 통해 최적값 찾기 (일반적으로 10~50)

  2. Lock 사용 최소화: 임계 영역을 최대한 작게 유지
    “`python
    # 나쁜 예
    async with self.lock:
    data = await fetch_data() # I/O 작업 포함
    self.results.append(data)

# 좋은 예
data = await fetch_data() # Lock 밖에서 I/O
async with self.lock:
self.results.append(data) # 최소한의 작업만
“`

  1. Queue 크기 제한: 메모리 사용량 제어를 위해 maxsize 설정

마무리

Python asyncio의 동시성 제어 도구는 각각 명확한 용도가 있습니다:

  • Semaphore: 동시 실행 수를 제한하여 리소스 고갈 방지
  • Lock: 공유 자원 접근 시 상호 배제로 데이터 무결성 보장
  • Queue: Producer-Consumer 패턴으로 작업 분배 및 순서 관리

실무에서는 이 세 가지를 조합하여 안전하고 효율적인 비동기 시스템을 구축할 수 있습니다. API 크롤러, 대용량 데이터 처리, 실시간 스트리밍 등 다양한 시나리오에서 적절한 도구를 선택하여 활용하세요.

Did you find this helpful?

☕ Buy me a coffee

Comments

Leave a Reply

Your email address will not be published. Required fields are marked *

TODAY 433 | TOTAL 2,656