주요글: 도커 시작하기
반응형
개인적으로 관심을 가지고 지켜보던 Akka 프로젝트가 1.0 버전이 되었다. 평소에 병행 처리와 분산 처리에 관심이 많았는데, Akka는 이를 보다 쉽게 구현할 수 있도록 도와주는 프로젝트이다. 본 글에서는 Akka가 무엇인지 간단하게 설명하고 실제 Akka를 이용해서 액터를 생성하고 실행하는 방법을 살펴볼 것이다.

Akka란?

Akka는 병행(concurrent) 및 분산 처리를 위한 오픈 소스 프로젝트로서 액터(Actor) 모델을 이용하고 있다. 필자가 액터 모델 자체에 대한 이해가 완전하지 않지만, 액터 모델을 간단하게 설명하면 다음의 특징을 갖는다.

  • 액터들은 상태를 공유하지 않는다.
  • 액터들 간의 통신은 메시지 전달을 통해서 이루어진다. (이벤트 기반 모델)
  • 액터간의 통신은 비동기로 이루어진다.
  • 각 액터는 전달받은 메시지를 큐에 보관하며, 메시지를 순차적으로 처리한다.
  • 액터는 일종의 경량 프로세서다.
위와 같은 특징은 병행 처리 코드를 보다 쉽게 구현할 수 있도록 도와준다. 실제로 다중 쓰레드 프로그래밍을 해 본 개발자 중에서는 올바르지 못한 동기화 처리로 쓰레드 블럭킹 되는 등의 문제로 고생한 경험이 한 두 번씩은 존재할 것이다. 액터는 애초에 데이터를 서로 공유하지 않는 것을 원칙으로 하기 때문에, 데드락이나 락에 대한 고민을 줄여주고 병행 처리 그 자체에 집중할 수 있도록 도와준다.

Akka는 액터 모델을 통해서 병행 처리를 쉽게 할 수 있도록 도와줄 뿐만 아니라, 리모트 노드에 존재하는 액터를 마치 로컬에 존재하는 액터처럼 사용할 수 있도록 해 주고 있다. 개발자는 통신 프로토콜에 대해 고민할 필요 없이 리모트 액터를 사용할 수 있기 때문에 분산 처리 코드를 손쉽게 작성할 수 있게 된다.

Akka는 스카라(Scala)와 자바(Java)의 두 언어에 대한 API를 제공하고 있는데, 필자가 스카라 언어 자체에 대해서는 아직 잘 모르고 국내에서도 스카라 언어에 대한 관심이 적은 관계로 자바 API를 기준으로 Akka 사용법을 살펴볼 것이다.

Akka 코어 사용

Akka를 사용하려면 http://akka.io 사이트에서 필요한 파일을 다운로드 받으면 되는데, Maven을 사용하고 있다면 pom.xml 파일에 다음과 같이 리포지토리 설정과 의존 설정을 추가해주면 Akka가 제공하는 액터 기능을 사용할 수 있다.

<repositories>
    <repository>
        <id>Akka</id>
        <name>Akka Maven2 Repository</name>
        <url>http://akka.io/repository/</url>
    </repository>

    <repository>
        <id>Multiverse</id>
        <name>Multiverse Maven2 Repository</name>
        <url>http://multiverse.googlecode.com/svn/maven-repository/releases/</url>
    </repository>

    <repository>
        <id>GuiceyFruit</id>
        <name>GuiceyFruit Maven2 Repository</name>
        <url>http://guiceyfruit.googlecode.com/svn/repo/releases/</url>
    </repository>

    <repository>
        <id>JBoss</id>
        <name>JBoss Maven2 Repository</name>
        <url>http://repository.jboss.org/nexus/content/groups/public/</url>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>se.scalablesolutions.akka</groupId>
        <artifactId>akka-actor</artifactId>
        <version>1.0</version>
    </dependency>
</dependencies>

[주의]
Akka는 Scala로 만들어졌기 때문에, 이클립스에서 Akka 모듈을 사용할 때 코드 어시스트가 제대로 동작하려면 Scala-IDE 등을 이용해서 개발환경을 구축해 주어야 한다. 그렇지 않으면, 코드 어시스트가 안 되서 짜증나는 시간을 보내게 될 것이다.

액터 클래스 및 사용

액터를 생성하고 사용하려면, 먼저 Akka가 제공하는 기반 클래스를 이용해서 액터 역할을 수행할 클래스를 구현해 주어야 한다. Akka는 akka.actor.UntypedActor 클래스를 제공하고 있으며, 이 클래스를 상속받아서 액터를 구현할 수 있다.

import akka.actor.UntypedActor;

public class PrintActor extends UntypedActor {

    @Override
    public void onReceive(Object message) throws Exception {
        Thread.sleep(500); // 테스트를 위해 0.5초 sleep
        System.out.println(message);
    }

}

UntypedActor를 상속받은 클래스는 onReceive() 메서드를 구현해 주어야 하는데, onReceive() 메서드는 액터에 전달된 메시지를 처리하게 된다.

액터 클래스를 만들었다면, 액터를 생성한 뒤에 액터에 메시지를 전달할 수 있다. 액터를 생성할 때에는 akka.actor.Actors 클래스의 static 메서드인 actorOf() 메서드를 사용한다. Actors.actorOf() 메서드는 액터를 구현한 클래스의 Class를 전달받으며, 액터를 생성한다.

ActorRef actor = Actors.actorOf(PrintActor.class);
actor.start();
// actor를 이용해서 액터에 메시지를 전달

ActorRef의 start() 메서드는 액터를 시작하며, 액터가 시작된 이후부터 액터에 메시지를 전달할 수 있게 된다.

액터에 메시지 전달하기

Actors.actorOf()를 이용해서 액터를 생성했다면, 이후 ActorRef가 제공하는 메서드를 이용해서 액터에 메시지를 전달할 수 있다. 다음의 세 가지 방법으로 액터에 메시지를 전달할 수 있다.
  • Fire-And-Forget: 메시지를 전달하고 메시지에 대한 응답을 기다리지 않는다. 병행 및 확장에 적합한 메시지 전달 방식이다.
  • Send-And-Receive-Eventually: 메시지를 전달하고 응답을 받는다. 응답을 받을 때 까지 블록킹된다.
  • Send-And-Receive-Future: 메시지를 전달하고 응답을 받기 위한 Future를 리턴한다.
sendOneWay() 메서드를 이용한 Fire-And-Forget 방식 메시지 전달

ActorRef.sendOneWay() 메서드는 메시지를 액터에 전달할 때 사용된다. sendOneWay()라는 이름에서 알 수 있듯이 이 메서드는 액터로부터 어떤 값도 받지 않으며, 액터로부터 응답을 기다리지 않고 곧 바로 리턴한다.

ActorRef actor = Actors.actorOf(PrintActor.class);
actor.start();
actor.sendOneWay("받아라"); // actor에 "받아라" 메시지를 전달하고 바로 리턴.
actor.sendOneWay("받아라2");
actor.sendOneWay("받아라3");
System.out.println("비동기로 실행");

메시지를 받은 액터는 내부적으로 사용하는 큐에 메시지를 보관한 뒤, 차례대로 액터의 onReceive(Object message) 메서드에 메시지를 전달한다. PrintActor의 onReceive() 메서드는 0.5초후에 전달받은 메시지를 출력하고 sendOneWay()메서드는 응답 대기 없이 바로 리턴하므로, 위 코드가 실행되면 콘솔에는 다음과 같은 순서로 문자열이 출력된다.

비동기로 실행
받아라
받아라2
받아라3

sendOneWay() 메서드는 다음의 두 가지를 제공된다.
  • sendOneWay(Object message)
  • sendOneWay(Object message, ActorRef sender) : 메시지를 전송하면서 메시지를 보낸 액터로 sender를 지정한다.

sendRequestReply() 메서드를 이용한 Send-And-Receive-Eventually 방식 메시지 전달

ActorRef.sendRequestReply() 메서드는 액터에 메시지를 전달하고, 그 메시지에 대한 응답이 올 때 까지 대기하고 싶을 때 사용된다. 액터 구현 클래스는 getContext().replyUnsafe() 메서드를 이용해서 메시지에 대해 응답할 수 있는데, ActorRef.sendRequestReply() 메서드는 이 응답을 리턴하게 된다. 예를 들어, 다음과 같이 메시지에 대해 응답하는 액터가 있다고 하자.

public class PingActor extends UntypedActor {

    @Override
    public void onReceive(Object message) throws Exception {
        getContext().replyUnsafe("응답: "+ message); // 메시지 sender에 응답
    }

}

이 경우 다음과 같이 sendRequestReply() 메서드를 이용함으로써 액터에 전달한 메시지에 대한 응답이 도착할 때 까지 대기할 수 있다.

ActorRef actor = Actors.actorOf(PingActor.class);
actor.start();
Object res = actor.sendRequestReply("헬로우"); // 액터로부터 응답이 도착할 때 까지 대기

sendRequestReply() 메서드는 일정 시간 동안 액터로부터 응답이 없을 경우 akka.actor.ActorTimeoutException 예외를 발생시킨다. 별도 설정을 하지 않은 경우 기본 타입 시간은 5초이며, sendRequestReply() 메서드를 호출할 때 타임아웃을 지정할 수도 있다.

Object res = actor.sendRequestReply("헬로우", 1000, null); // 1초간 응답 대기

sendRequestReply() 메서드는 다음의 세 가지가 존재한다.
  • sendRequestReply(Object message)
  • sendRequestReply(Object message, ActorRef sender): 메시지를 보낸 액터로 sender를 지정한다.
  • sendRequestReply(Object message, long timeout, ActorRef sender)

sendRequestReplyFuture() 메서드를 이용한 Send-And-Receive-Future 방식 메시지 전달

sendRequestReplyFuture() 메서드는 메시지를 전달한 뒤 응답을 받기 위한 Future를 리턴한다. Future는 자바가 제공하는 Future가 아닌 Akka가 제공하는 akka.dispatch.Future 타입이다. Future는 주로 다음과 같은 형식으로 주로 사용된다.

Future future = actor.sendRequestReplyFuture("하이");
future.await(); // 응답을 대기. 대기 시간을 초과하면 예외 발생
if (future.isCompleted()) { // 완료되었다면
    Option resultOption = future.result(); // 응답 구함
    if (resultOption.isDefined()) { // 응답 데이터가 있다면,
        Object result = resultOption.get(); // 응답 데이터 구함
        System.out.println(result);
    }
}

sendRequestReplyFuture()가 리턴한 Future의 await() 메서드는 시간이 초과될 때 까지 대기한다. 시간이 초과되기 전에 응답이 도착하면 다음으로 넘어가고, 시간이 초과되면 ActorTimeoutException 예외를 발생시킨다.

sendRequestReplyFuture() 메서드는 다음의 세 가지가 존재한다.
  • sendRequestReplyFuture(Object message)
  • sendRequestReplyFuture(Object message, ActorRef sender) : 메시지를 보낸 액터로 sender를 지정한다.
  • sendRequestReplyFuture(Object message, long timeout, ActorRef sender)
두 개 이상의 액터에 메시지를 전달한 후 액터로부터의 응답이 모두 도착할 때 까지 대기해야 한다면, Futures.awaitAll(Future[] futures) 메서드를 사용하면 된다.

// actor들에 메시지 전달(작업 전달)
for (ActorRef actor : actors) {
    futureList.add(actor.sendRequestReplyFuture(someWork);
}
Future[] futures =  futureList.toArray();
Future.awaitAll(futures); // 모든 액터로부터 응답이 (작업 결과가) 올 때 까지 대기.
// futures로부터 응답 구해서 처리


액터에서 메시지 받아 처리하기

ActorRef의 send*() 메서드를 통해서 전달된 메시지는 UntypedActor 클랫를 상속받은 액터 구현 클래스의 onReceive(Object message)에 차례대로 전달된다. (TypedActor를 사용하면 인터페이스를 이용해서 메시지를 전달받을 메서드를 정의할 수도 있는데, 이에 대한 내용은 다음에 살펴볼 것이다.)

onReceive() 메서드는 다음과 같이 메시지의 타입을 확인한 뒤 메시지 타입에 맞는 동작을 수행하도록 구현하는 것이 보통이다.

public class ActorImpl extends UntypedActor {
    public void onReceive(Object message) throws Exception {
        if (message instanceof String) {
            // 메시지 처리
        }
        ...
    }
}


메시지에 응답하기

replyUnsafe()/replySafe()를 이용한 응답

액터 구현 클래스는 getContext()를 이용해서 해당 액터에 대한 ActorRef를 구할 수 있는데, ActorRef의 replyUnsafe() 또는 replySafe() 메서드를 이용해서 메시지에 대한 응답을 전달할 수 있다. replyUnsafe() 메서드는 응답 실패시 예외를 발생시키는 반면에 replySafe() 메서드는 응답에 실패할 경우 false를 리턴한다.

public class PingActor extends UntypedActor {

    public void onReceive(Object message) throws Exception {
        if (message.equals("ping")) {
            if (! getContext.replySafe("pong")) {
                // 실패에 대한 처리
            }
        }
    }
}

액터에 메시지를 전달할 때 sendRequestReply() 메서드나 sendRequestReplyFuture() 메서드를 사용한 경우, replyUnsafe()와 replySafe()를 이용해서 응답한 데이터를 리턴 값으로 받게 된다.
 
메시지를 전달한 액터에 메시지로 응답하기

액터가 다른 액터에게 메시지를 전달하기도 한다. 이때 sendRequestReply*() 메서드에 대한 응답이 아니라 메시지를 전달한 액터에 메시지를 전달하는 방법으로 응답할 수도 있을 것이다. 이렇게 메시지를 전달한 액터에 응답으로 메시지를 전송하고 싶다면, getContext().getSender() 메서드를 이용해서 메시지를 보낸 액터에 대한 ActorRef를 구한 뒤, 그 ActorRef의 send*() 메서드를 이용해서 응답을 전달하면 된다.

public class PongActor extends UntypedActor {

    @Override
    public void onReceive(Object message) throws Exception {
        if (message.equals("ping")) {
            if (getContext().getSender().isDefined()) {
                ActorRef sender = getContext().getSender().get();
                sender.sendOneWay("pong", getContext());
            } else {
                getContext().replyUnsafe("pong");
            }
        }
    }
}


액터의 라이프사이클

액터의 라이프 사이클은 다음과 같다.
  • NEW: 액터가 만들어졌을 때. 메시지를 수신하지 못한다.
  • STARTED: start()가 호출되었을 때. 메시지를 수신할 수 있다.
  • SHUTDOWN: exit()나 stop()이 호출되었을 때. 어떤 것도 하지 못한다.
ActorRef는 start(), stop() 메서드를 제공하고 있으며, 이들 메서드를 이용해서 액터를 시작하고, 중지할 수 있다. 아래 코드는 전형적인 액터의 사용방법을 보여주고 있다.

actor.start(); // 액터를 시작
// 필요한 만큼 액터에 메시지 전달
actor.sendOneWay(msg);
...
actor.stop(); // 액터 종료

start() 메서드는 액터와 메시지 큐를 시작하고, stop() 메서드는 액터의 디스패처와 메시지 큐를 포함한 액터를 종료시킨다.

모든 액터를 종료시키고 싶다면, 다음과 같은 코드를 사용하면 된다.

Actors.registry().shtudownAll();


UntypedActor 클래스의 라이프 사이클 관련 콜백 메서드

UntypedActor 클래스는 라이프 사이클과 관련해서 시작/중지 이벤트를 처리할 수 있는 콜백 메서드를 제공하고 있다.
  • preStart(): 액터 시작 전에 호출된다.
  • postStop(): 액터 종료 후에 호출된다.
  • preRestart(Throwable reason): 액터 재시작 전에 호출된다. (무정지 액터 기능과 관련됨)
  • postRestart(Throwable reason): 액터 재시작 후에 호출된다. (무정지 액터 기능과 관련됨)

설정 파일 지정 및 기본 값

Akka는 다음의 세 가지 방법 중 한가지를 이용해서 설정 파일을 찾는다.
  • akka.config 시스템 프로퍼티로 지정한 파일 (java -Dakka.config=... )
  • 클래스패스에 위치한 akka.config 파일
  • AKKA_HOME 환경변수 존재 시, '$AKKA_HOME/config 디렉터리의 설정 파일 사용. (또는 akka.home 시스템 프로퍼티를 AKKA_HOME 환경 변수 대신 사용)
각 설정 정보 및 기본 값은 http://doc.akka.io/configuration 참고하기 바란다.

참고자료

+ Recent posts