동시성 Concurrency

Rust의 동시성 모델은 "Fearless Concurrency" — 컴파일러가 데이터 레이스를 원천 차단한다. 그 원리와 패턴을 깊이 살펴본다.

1. Send & Sync 트레이트

Rust의 스레드 안전성은 마커 트레이트 두 개로 작동한다.

Send — 값의 소유권을 다른 스레드로 넘길 수 있다.
Sync — 값의 참조를 여러 스레드가 동시에 가질 수 있다. (T: Sync iff &T: Send)

대부분의 타입은 컴파일러가 자동으로 구현해준다. 직접 구현할 일은 거의 없다.

// Rc는 Send/Sync 아님 — 참조 카운팅이 원자적이지 않음
let rc = Rc::new(5);
// thread::spawn(move || { drop(rc); }); // 컴파일 에러!

// Arc는 Send + Sync — 원자적 카운팅
let arc = Arc::new(5);
let arc2 = arc.clone();
thread::spawn(move || {
    println!("{}", arc2); // OK
}).join().unwrap();

Send ✓ / Sync ✓

  • Arc<T> (T: Send+Sync)
  • Mutex<T>
  • 기본 타입들 (i32, bool, …)

Send ✗ / Sync ✗

  • Rc<T> — 비원자적 refcount
  • RefCell<T> — Sync 아님
  • raw pointer *mut T
// unsafe impl — 직접 보장하겠다는 선언
// 실수하면 UB 발생. FFI 래퍼 등 극히 드문 경우에만.
struct MyHandle(*mut u8);
unsafe impl Send for MyHandle {}
unsafe impl Sync for MyHandle {}

2. Arc<Mutex<T>> 패턴

공유 상태의 표준 패턴. Arc로 소유권을 공유하고, Mutex로 접근을 직렬화한다.

use std::sync::{Arc, Mutex};
use std::thread;

let counter = Arc::new(Mutex::new(0u32));
let mut handles = vec![];

for _ in 0..8 {
    let c = Arc::clone(&counter);
    handles.push(thread::spawn(move || {
        let mut guard = c.lock().unwrap(); // MutexGuard — Drop 시 자동 해제
        *guard += 1;
    }));
}

for h in handles { h.join().unwrap(); }
println!("{}", *counter.lock().unwrap()); // 8
Deadlock 조심 — 여러 Mutex를 잠글 때 항상 같은 순서로. lock을 잡은 채 다른 lock을 잡으면 데드락.
// 위험: A → B 순서와 B → A 순서가 섞이면 데드락
// 해결: 항상 알파벳 순서 등 일관된 순서 유지

RwLock — 읽기 많고 쓰기 드물 때

use std::sync::RwLock;

let cache = Arc::new(RwLock::new(HashMap::new()));

// 읽기: 여러 스레드 동시 가능
let r = cache.read().unwrap();
println!("{:?}", r.get("key"));
drop(r); // 명시적 drop으로 빨리 해제

// 쓰기: 단독 접근
let mut w = cache.write().unwrap();
w.insert("key", "value");

Poisoned Lock 처리

let m = Arc::new(Mutex::new(0));

// lock 보유 중 panic → mutex가 poisoned 상태가 됨
let _ = std::panic::catch_unwind(|| {
    let _g = m.lock().unwrap();
    panic!("oh no");
});

// 이후 lock() → Err(PoisonError)
match m.lock() {
    Ok(g) => println!("{}", *g),
    Err(poisoned) => {
        // into_inner()로 값 회수 가능
        let g = poisoned.into_inner();
        println!("recovered: {}", *g);
    }
}

3. 채널 (mpsc)

공유 상태 없이 메시지 전달로 통신하는 패턴. Go의 채널과 유사하지만 소유권 이동이 보장된다.

use std::sync::mpsc; // multi-producer, single-consumer

let (tx, rx) = mpsc::channel::();

// Sender는 clone 가능 → 여러 생산자
let tx2 = tx.clone();

thread::spawn(move || tx.send("worker 1".to_string()).unwrap());
thread::spawn(move || tx2.send("worker 2".to_string()).unwrap());

// tx들이 모두 drop되면 rx.recv()가 Err 반환 → 루프 종료 신호
for msg in rx { // rx는 Iterator 구현
    println!("{}", msg);
}

Bounded vs Unbounded

// unbounded: 버퍼 무제한, send가 즉시 반환
let (tx, rx) = mpsc::channel::();

// sync_channel: bounded, 버퍼 가득 차면 send가 블록
let (tx, rx) = mpsc::sync_channel::(100); // 버퍼 100개

워커 풀 패턴

use std::sync::mpsc;

type Job = Box;

struct ThreadPool {
    workers: Vec>,
    sender: mpsc::Sender,
}

impl ThreadPool {
    fn new(size: usize) -> Self {
        let (tx, rx) = mpsc::channel::();
        let rx = Arc::new(Mutex::new(rx)); // 수신자 공유

        let workers = (0..size).map(|_| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || loop {
                let job = rx.lock().unwrap().recv();
                match job {
                    Ok(f) => f(),
                    Err(_) => break, // 채널 닫힘 → 종료
                }
            })
        }).collect();

        ThreadPool { workers, sender: tx }
    }

    fn execute(&self, f: impl FnOnce() + Send + 'static) {
        self.sender.send(Box::new(f)).unwrap();
    }
}

4. Rayon — 데이터 병렬처리

use rayon::prelude::*;

// 한 글자만 바꾸면 병렬화
let sum: i64 = (0..1_000_000i64)
    .into_par_iter()           // par_iter()
    .filter(|&x| x % 2 == 0)
    .map(|x| x * x)
    .sum();
Work-stealing 스케줄러: Rayon은 CPU 코어 수만큼 스레드 풀을 만들고, 작업 큐가 비면 다른 스레드 큐에서 훔쳐온다. 작업 크기가 고르지 않아도 부하가 균등하게 분산된다.

Rayon이 빠를 때

  • CPU bound 작업 (수치 계산, 압축)
  • 원소 수 수천 개 이상
  • 각 원소 처리 비용이 비쌀 때

Rayon이 느릴 수도 있을 때

  • I/O bound (async가 나음)
  • 원소 수 적음 (스레드 오버헤드)
  • 원소당 처리가 매우 가볍고 빠를 때

5. async/await

OS 스레드 없이 수만 개의 동시 작업을 처리하는 모델. 핵심 개념은 Future.

// async fn은 컴파일 시 Future를 구현하는 익명 구조체로 변환됨
async fn fetch(url: &str) -> String {
    // await 지점에서 잠시 다른 태스크에 CPU 양보
    let body = reqwest::get(url).await.unwrap().text().await.unwrap();
    body
}

// Future는 poll() 될 때까지 아무것도 안 함 (lazy)
// tokio::main이 런타임(executor) 역할
#[tokio::main]
async fn main() {
    // join!: 여러 Future 동시 실행
    let (a, b) = tokio::join!(
        fetch("https://example.com/a"),
        fetch("https://example.com/b"),
    );

    // select!: 먼저 완료된 것만 처리
    tokio::select! {
        r = fetch("https://fast.com") => println!("fast: {r}"),
        r = fetch("https://slow.com") => println!("slow: {r}"),
    }
}
// async fn 내부 컴파일 결과 (개념적으로)
enum FetchStateMachine {
    Start,
    WaitingForResponse(/* future */),
    WaitingForText(/* future */),
    Done,
}

impl Future for FetchStateMachine {
    type Output = String;
    fn poll(&mut self, cx: &mut Context) -> Poll {
        // 각 await 지점이 상태 전환 포인트
        ...
    }
}

thread::spawn

  • OS 스레드 1개 = ~수 MB 스택
  • CPU bound에 적합
  • 블로킹 코드 그대로 사용 가능

tokio::spawn

  • Green thread (태스크) = ~수 KB
  • I/O bound에 적합
  • await 포인트에서만 다른 태스크 실행

6. 동시성 패턴 비교

방식공유통신적합한 상황
thread::spawnArc+MutexmpscCPU 병렬, 독립 작업
Rayon자동 관리내부 처리컬렉션 병렬 처리
mpsc 채널불필요메시지파이프라인, 워커 풀
async/tokioArc+Mutextokio::syncI/O 집약, 수만 연결