원문: Effective Scala - Twitter's standard libraries
Locals
Util의 Local은 특정 Future 디스패치 트리에 국한된 참조 셀을 제공합니다. Local의 값을 설정하면, 동일한 스레드에서 Future에 의해 지연된 모든 계산에서 해당 값을 사용할 수 있습니다. 이들은 스레드 로컬과 유사하지만, 범위가 Java 스레드가 아닌 "Future 스레드"의 트리입니다.
trait User {
def name: String
def incrCost(points: Int)
}
val user = new Local[User]
...
user() = currentUser
rpc() ensure {
user().incrCost(10)
}
위 코드에서 ensure 블록의 user()는 콜백이 추가된 시점의 user 로컬 값을 참조합니다.
스레드 로컬과 마찬가지로 로컬은 매우 편리할 수 있지만, 피할 수 있다면 항상 피해야 합니다. 데이터 전달이 다소 번거롭더라도, 명시적으로 데이터를 전달하는 방식으로 문제를 해결할 수 있는지 확인하세요.
로컬은 RPC trace를 연결하거나, Monitor를 전달하거나, Future 콜백에 대한 "스택 추적"을 생성하는 등 핵심 라이브러리에서 매우 일반적인 문제에 효과적으로 사용됩니다. 이러한 상황에서는 다른 솔루션이 사용자에게 과도한 부담을 줄 수 있습니다. 그러나 이와 다른 대부분의 경우에는 로컬을 사용하는 것이 적절하지 않습니다.
Offer / Broker
동시성 시스템은 공유 데이터와 자원에 대한 접근을 조율해야 하기 때문에 매우 복잡합니다. 액터(akka 라이브러리)는 이러한 복잡성을 단순화하는 한 가지 전략으로, 각 액터는 자체 상태와 자원을 유지하며 다른 액터들과 메시지를 통해 데이터를 공유합니다. 공유 데이터는 액터 간의 통신을 필요로 합니다.
Offer / Broker는 이 개념을 세 가지 중요한 방식으로 확장합니다.
1. 커뮤니케이션 채널(Brokers)도 일급 객체입니다. 즉, 브로커를 통해 메시지를 전달하지, 액터로 바로 메시지를 전달하지 않습니다.
2. Offer / Broker는 동기 매커니즘입니다. 통신은 곧 동기화를 의미합니다. 즉, 브로커를 조정 메커니즘으로 사용한다는 것입니다. 만약 프로세스 a가 프로세스 b로 메시지를 보낸다면, a와 b는 현재의 시스템 상태에 대해 동의하게 됩니다.
3. 통신은 선택적으로 동작하게 됩니다. 프로세스는 여러 개의 통신을 제안할 수 있으며, 그 중 정확히 하나가 이루어질 수 있습니다.
선택적 통신과 기타 조합을 일반적으로 지원하려면, 통신의 설명과 통신의 행위를 분리해야 합니다. 이것이 Offer가 하는 역할입니다 — Offer는 통신을 설명하는 지속적인 값이며, 이 통신을 수행하려면 sync() 메서드를 통해 동기화합니다.
trait Offer[T] {
def sync(): Future[T]
}
이는 통신이 성사될 때 교환된 값을 포함한 Future[T]를 반환합니다.
Broker는 Offer를 통해 값을 교환하는 것을 조정하는 역할을 합니다. 즉, 통신 채널입니다.
trait Broker[T] {
def send(msg: T): Offer[Unit]
val recv: Offer[T]
}
다음처럼 두 개의 offer를 만들고 양 쪽에서 sync를 호출하면 서로 1이라는 값이 통신됩니다.
val b: Broker[Int]
val sendOf = b.send(1)
val recvOf = b.recv
// process 1
sendOf.sync()
// process 2
recvOf.sync()
선택적 통신은 여러 Offer를 Offer.choose로 결합하여 수행합니다.
def choose[T](ofs: Offer[T]*): Offer[T]
이는 새로운 Offer를 반환하며, 동기화될 때 ofs 중 정확히 하나를 얻습니다 — 선택되는 것은, 가장 먼저 사용 가능한 Offer입니다. 여러 개가 즉시 사용 가능한 경우 무작위로 하나가 선택됩니다.
Offer 객체는 Broker와 결합하여 사용할 수 있는 여러 가지 일회용 Offer를 정의합니다.
// 주어진 기간 후에 활성화되는 Offer
Offer.timeout(duration): Offer[Unit]
// 절대 활성화되지 않는 Offer
Offer.never
// 주어진 값을 즉시 얻는 Offer
Offer.const(value)
이들은 선택적 통신을 통해 조합할 때 유용합니다. 예를 들어, 전송 작업에 타임아웃을 적용하려면 다음과 같이 사용할 수 있습니다.
Offer.choose(
Offer.timeout(10.seconds),
broker.send("my value")
).sync()
Offer/Broker의 사용을 SynchronousQueue와 비교할 수 있지만, 미묘하고 중요한 차이점이 있습니다. Offer는 그러한 큐로는 불가능한 방식으로 조합될 수 있습니다. 예를 들어, Brokers로 표현된 큐의 집합을 고려해 봅시다.
val q0 = new Broker[Int]
val q1 = new Broker[Int]
val q2 = new Broker[Int]
이제 읽기 위한 병합된 큐를 생성해 봅시다.
val anyq: Offer[Int] = Offer.choose(q0.recv, q1.recv, q2.recv)
anyq는 첫 번째로 사용 가능해진 큐에서 읽는 Offer입니다. anyq는 여전히 동기적이며, 기본 큐의 의미를 유지하고 있습니다. 이러한 조합은 단순한 큐만으로는 불가능합니다.
Example: A Simple Connection Pool
커넥션 풀은 네트워크 어플리케이션에서 흔히 사용되며, 구현이 까다로운 경우가 많습니다. 예를 들어, 다양한 클라이언트가 서로 다른 지연 시간 요구사항을 가지기 때문에 풀에서 커넥션을 획득할 때 타임아웃을 적용하는 것이 바람직할 때가 많습니다. 풀의 원리는 간단합니다. 사용 가능한 커넥션의 큐를 유지하고, 대기자가 들어올 때 이를 만족시키는 것입니다. 전통적인 동기화를 구현한다면 보통 두 개의 큐가 필요합니다. 하나는 커넥션이 없을 때 대기자들을 위한 큐, 다른 하나는 대기자가 없을 때 커넥션을 저장하는 큐입니다.
Offer/Broker를 사용하면 이러한 문제를 자연스레 표현할 수 있습니다.
class Pool(conns: Seq[Conn]) {
private[this] val waiters = new Broker[Conn]
private[this] val returnConn = new Broker[Conn]
val get: Offer[Conn] = waiters.recv
def put(c: Conn) { returnConn ! c } // !는 actor에서 tell, 메시지의 전달을 의미합니다
private[this] def loop(connq: Queue[Conn]) {
Offer.choose(
if (connq.isEmpty) Offer.never
else {
val (head, rest) = connq.dequeue()
waiters.send(head) map { _ => loop(rest) }
},
returnConn.recv map { c => loop(connq.enqueue(c)) }
).sync()
}
loop(Queue.empty ++ conns)
}
loop는 항상 커넥션을 제공하기를 원하지만, 큐가 비어 있지 않을 때만 하나의 커넥션을 제공합니다. 영속적인 큐를 사용하면 이러한 동작을 더 쉽게 이해할 수 있습니다. 풀의 인터페이스 역시 Offer를 통해 이루어지므로, 호출자가 타임아웃을 적용하고자 할 때 조합을 사용하여 이를 간단히 구현할 수 있습니다.
val conn: Future[Option[Conn]] = Offer.choose(
pool.get map { conn => Some(conn) },
Offer.timeout(1.second) map { _ => None }
).sync()
타임아웃을 구현하기 위해 별도의 추가 작업이 필요하지 않습니다. 이는 Offer의 의미론 덕분입니다. 만약 Offer.timeout이 선택되면, 더 이상 풀에서 커넥션을 받을 수 있는 제안이 존재하지 않게 됩니다. 즉, 풀과 호출자는 대기자 브로커에서 각각 동시에 수신 및 송신에 동의하지 않은 상태가 됩니다.
Example: Sieve of Eratosthenes (에라토스테네스의 체)
동시성 프로그램을 순차적인 프로세스로 구조화하여 동기적으로 통신하도록 하는 것은 종종 유용하고, 때로는 훨씬 단순하게 구현되도록 해줍니다. Offer와 Broker는 이를 간단하고 일관되게 만드는 도구 세트를 제공합니다. 사실, 이들의 응용은 전통적인 "고전적" 동시성 문제를 초월합니다. Offer/Broker를 사용한 동시성 프로그래밍은 하위 루틴, 클래스, 모듈과 마찬가지로 유용한 구조화 도구로, CSP(통신 순차 프로세스)의 중요한 개념 중 하나입니다.
이러한 구조화의 예로 에라토스테네스의 체가 있습니다. 이는 정수 스트림에 대한 필터의 연속적인 적용으로 구조화할 수 있습니다. 먼저, 정수의 소스를 정의합니다.
def integers(from: Int): Offer[Int] = {
val b = new Broker[Int]
def gen(n: Int): Unit = b.send(n).sync() ensure gen(n + 1)
gen(from)
b.recv
}
integers(n)은 n부터 시작하는 연속된 정수의 Offer입니다. 다음으로 필터가 필요합니다.
def filter(in: Offer[Int], prime: Int): Offer[Int] = {
val b = new Broker[Int]
def loop() {
in.sync() onSuccess { i =>
if (i % prime != 0) b.send(i).sync() ensure loop()
else loop()
}
}
loop()
b.recv
}
filter(in, p)는 입력 in에서 소수 p의 배수를 제거하는 Offer를 반환합니다. 마지막으로, 체를 정의합니다.
def sieve = {
val b = new Broker[Int]
def loop(of: Offer[Int]) {
for (prime <- of.sync(); _ <- b.send(prime).sync())
loop(filter(of, prime))
}
loop(integers(2))
b.recv
}
loop()는 단순히 of에서 다음 (소수) 숫자를 읽고, 이 소수를 제외하는 필터를 of에 적용합니다. loop가 재귀적으로 호출되면서 연속적인 소수들이 필터링되어 체가 형성됩니다. 이제 처음 10,000개의 소수를 출력할 수 있습니다.
val primes = sieve
0 until 10000 foreach { _ =>
println(primes.sync()())
}
이 접근 방식은 간단하고 독립적인 구성 요소로 구조화되었을 뿐만 아니라, 스트리밍 방식의 체를 제공합니다. 관심 있는 소수의 집합을 사전에 계산할 필요가 없어, 모듈성이 더욱 향상됩니다.