저작권 안내: 저작권자표시 Yes 상업적이용 No 컨텐츠변경 No

스프링5 입문

JSP 2.3

JPA 입문

DDD Start

인프런 객체 지향 입문 강의
특정 노드에 갑자기 장애가 발생했다. 이럴 때 가장 먼저 개발자들이 선택하는 장애 대처 방법은? 아마도 관련 프로세스를 재시작하는 방법이 가장 많이 사용될 것이다. 웹서버를 재시작하거나 로그 수신기를 재시작하는 등 뭔가 문제가 발생한 영역의 프로세스나 쓰레드를 재시작함으로써 서비스 다운타임을 줄이는 것이 장애 발생 시 첫 번째로 수행하는 작업이다. 일단, 재시작해서 서비스가 살아나도록 만든 뒤, 그 다음에 원인 분석을 하게 된다.

Akka도 이와 비슷한 방법으로 액터의 장애에 대응할 수 있는 방법을 제공하고 있다. 액터는 자신을 관리하는 Supervisor를 가질 수 있으며, Supervisor는 액터가 다운될 경우 재시작함으로써 다운타임을 최소화하도록 해 준다. 이 방식은 얼랭(erlang)으로부터 빌려온 방식으로서 Akka는 Supervisor를 통해 무정지 서비스를 구현할 수 있도록 하고 있다.

액터의 두 가지 라이프 사이클: permanent, temporary

액터는 다음의 두 가지 라이프 사이클을 가진다.
  • permanent: 메시지 처리 과정에서 예외가 발생해도 액터가 살아 있음.
  • temporary: 메시지 처리 과정에서 예외가 발생하면 액터가 죽음.
액터를 permanent로 설정할 지 temporary로 설정할 지의 여부는 다음과 같이 설정할 수 있다.

import akka.config.Supervision;

public class WorkerActor extends UntypedActor {
    public WorkerActor() {
        getContext().setLifeCycle(Supervision.temporary());
        ...
    }


Akka의 액터 예외 대응 방식: Let it Crash

다중 쓰레드를 이용해서 병행 처리 코드를 작성할 경우, 병행 처리 코드에서 예외가 발생했을 때 이를 알 수 있는 방법은 예외 추적 메시지를 확인하는 방법 뿐이다. (또는 try-catch로 모든 예외를 잡아서 알림해 주는 기능을 넣는 방법 뿐이다.) 예외가 발생해서 병행 처리 쓰레드가 종료된 경우 이를 복구하는 방법은 재시작해주는 것 외에 특별한 방법이 없다.

Akka는 액터가 더 이상 정상적으로 동작할 수 없는 상태가 되어 메시지 처리 과정 중 예외를 발생시키면, 해당 액터를 재시작하는 방법으로 장애에 대응한다. 복구할 수 없는 예외 상황 발생시 액터가 뭔가 하지 않고 그냥 죽도록 놔두고 안정된 상태로 초기화하고 재시작하기 때문에, 이 방식을 "Let it Crash"라고 부른다.

수버바이저를 이용한 액터 관리

Akka는 수퍼바이저를 이용해서 액터를 관리한다. 수퍼바이저는 다른 액터를 모니터링하는 액터로서, 수퍼바이저 액터에서 다른 액터를 연결(link)함으로써 수퍼바이저가 다른 액터를 관리하게 된다.

수퍼바이저는 연결된 액터가 죽었을 때 다음의 두 가지 방식을 이용해서 연결된 액터를 재시작한다. 참고로 permanent 모드의 액터만 재시작되며, temporary 액터는 재시작되지 않는다.
  • One-For-One
  • All-For-One
One-For-One은 수퍼바이저와 연결된 액터가 죽으면, 죽은 액터만 재시작하고 나머지 연결된 액터는 그대로 유지한다. (아래 그림 참고)


[발췌: http://doc.akka.io/fault-tolerance-java]

반면 All-For-One은 수퍼바이저와 연결된 액터 중 하나가 죽으면, 연결된 모든 액터를 재시작한다. (아래 그림 참고) 이는 수퍼바이저에 의해 관리되는 액터 중 하나로도 비정상적으로 동작하면 나머지 액터들도 영향을 받아서 비정상적으로 동작하게 될 때에 사용된다.


[발췌: http://doc.akka.io/fault-tolerance-java]

수퍼바이저(Supervisor) 액터 만들기

수퍼바이저 액터는 일반 액터와 동일한 액터로서, 다음의 두 가지 방법을 이용해서 만들 수 있다.
  • link() 메서드를 이용
  • Supervisor 클래스를 이용해서 생성

link()를 이용한 액터 연결 및 관리

액터는 다른 액터를 연결함으로써 수퍼바이저 액터가 될 수 있으며, 연결할 때에는 link() 메서드를 사용한다. link()를 이용해서 액터를 관리할 경우 다음과 같은 방법으로 개발을 진행하면 된다.
  1. 수퍼바이저 액터로 동작할 클래스의 생성자에 FaultHandler를 지정한다. FaultHandler는 관리되는 액터가 죽었을 때, 그 액터만 재시작할 지 아니면 관리되는 모든 액터를 재시작할 지의 여부를 지정한다.
  2. 수퍼바이저 액터를 생성한 뒤, 관리할 액터를 link()로 연결한다.

1번, 수퍼바이저 액터를 직접 구현할 경우 다음과 같이 수퍼바이저 액터 생성자에서 재시작 전략을 지정해 주어야 한다.

import akka.actor.UntypedActor;
import akka.config.Supervision.OneForOneStrategy;

public class MasterActor extends UntypedActor {

    public MasterActor() {
        getContext().setFaultHandler(
                new OneForOneStrategy(
                        new Class[] { RuntimeException.class }, 3, 1000));
    }

    @Override
    public void onReceive(Object message) throws Exception {
        System.out.println("Master가 받은 메시지: " + message);
    }

}

위 코드에서 MasterActor는 관리하는 액터가 죽으면 해당 액터만 재시작하도록 설정하였다. OneForOneStrategy 객체를 생성할 때 첫 번째 파라미터는 액터를 재시작할 예외 타입을 지정한다. 위 코드는 모니터링 대상 액터의 onReceive() 메서드에서 RuntimeException이 발생하면 액터를 재시작한다는 것을 의미한다. 뒤의 두 숫자에 대해서는 뒤에서 다시 설명하겠다.

관리되는 액터가 죽을 때 관리되는 다른 액터들도 함께 재시작하고 싶은 경우에는 AllForOneStrategy 클래스를 사용하면 된다. 생성자에 전달되는 파라미터 목록은 OneForOneStrategy 클래스와 동일하다.

2번, 수퍼바이저 액터를 알맞게 구현했다면 그 다음으로 할 작업은 link() 메서드를 이용해서 수퍼바이저에 관리할 액터를 연결해 주는 것이다. 아래 코드는 예를 보여주고 있다.

ActorRef master = Actors.actorOf(MasterActor.class);
master.start();

ActorRef worker1 = Actors.actorOf(WorkerActor.class);
worker1.start();

master.link(worker1);

테스트를 위해 WorkerActor가 "die"라는 메시지를 받으면 RuntimeException을 발생시키도록 구현해 보았다.

@SuppressWarnings("unchecked")
public class WorkerActor extends UntypedActor {
    private static int num = 1;
   
    private int id;
    public WorkerActor() {
        id = num++;
        System.out.println("액터 생성됨: " + id);
    }
   
    @Override
    public void onReceive(Object message) throws Exception {
        if (message.equals("die")) {
            throw new RuntimeException("고의적 DIE");
        }
        System.out.println("Worker " + id + ": " + message);
    }
   
    @Override
    public void preRestart(Throwable cause) {
        System.out.println("Worker " + id + ": 재시작 전처리");
    }
   
    @Override
    public void postRestart(Throwable cause) {
        System.out.println("Worker " + id + ": 재시작 후처리");
    }
   
}

WorkerActor는 preRestart() 메서드와 postRestart() 메서드를 구현하고 있는데, 이 두 메서드는 각각 액터가 재시작하기 전/후에 호출된다. WorkerActor가 생성될 때 마다 1씩 증가된 id 값을 할당하는데 id 값을 새로 부여한 이유는 액터가 재시작할 때 액터 객체를 새로 생성하는 지의 여부를 확인하기 위해서다.

ActorRef master = Actors.actorOf(MasterActor.class);
master.start();

ActorRef worker1 = Actors.actorOf(WorkerActor.class);
worker1.start();
ActorRef worker2 = Actors.actorOf(WorkerActor.class);
worker2.start();

master.link(worker1); // master에 worker1 액터 연결
master.link(worker2); // master에 worker2 액터 연결

worker1.sendOneWay("메시지1-1");
worker2.sendOneWay("메시지2-1");
worker1.sendOneWay("메시지1-2");
worker2.sendOneWay("메시지2-2");

worker1.sendOneWay("die"); // worker1 액터 죽임!
worker1.sendOneWay("메시지1-3"); // worker1 액터에 메시지 전달
worker2.sendOneWay("메시지2-3");

위 코드는 중간에 worker1에 "die" 메시지를 보냄으로써 worker1을 죽인다. worker1 액터는 "die" 메시지를 받으면 RuntimeException을 발생시키는데, MasterWorker는 RuntimeException이 발생할 경우 해당 액터를 재시작하라고 설정하고 있다. 따라서, worker1 액터는 "die" 메시지를 받는 순간 RuntimeException을 발생시키며 죽지만 곧이어 재시작하게 되고, 따라서 죽은 이후에 받은 "메시지1-3" 메시지를 재시작한 액터가 처리하게 된다.

실제 위 코드의 실행 결과는 다음과 같다. (Akka가 출력하는 로그 메시지 중 중요한 것만 남기고 나머지는 생략하였다.)

액터 생성됨: 1
액터 생성됨: 2
16:43:40.843 [main] DEBUG akka.actor.Actor$ - Linking actor [Actor[tuto3.WorkerActor:46f67fb0-506a-11e0-a0e5-001d92ad4c1a]] to actor [Actor[tuto3.MasterActor:46f1c4c0-506a-11e0-a0e5-001d92ad4c1a]]
16:43:40.843 [main] DEBUG akka.actor.Actor$ - Linking actor [Actor[tuto3.WorkerActor:46f67fb1-506a-11e0-a0e5-001d92ad4c1a]] to actor [Actor[tuto3.MasterActor:46f1c4c0-506a-11e0-a0e5-001d92ad4c1a]]
Worker 1: 메시지1-1
Worker 1: 메시지1-2
16:43:40.875 [akka:event-driven:dispatcher:global-1] ERROR akka.actor.Actor$ - Exception when invoking
    actor [Actor[tuto3.WorkerActor:46f67fb0-506a-11e0-a0e5-001d92ad4c1a]]
    with message [die]
16:43:40.875 [akka:event-driven:dispatcher:global-1] ERROR akka.actor.Actor$ - Problem
java.lang.RuntimeException: 고의적 DIE
    at tuto3.WorkerActor.onReceive(WorkerActor.java:18) ~[classes/:na]
    ...
Worker 2: 메시지2-1
Worker 2: 메시지2-2
Worker 2: 메시지2-3
16:43:40.968 [akka:event-driven:dispatcher:global-3] INFO  akka.actor.Actor$ - Restarting actor [tuto3.WorkerActor] configured as PERMANENT.
16:43:40.968 [akka:event-driven:dispatcher:global-3] DEBUG akka.actor.Actor$ - Invoking 'preRestart' for failed actor instance [tuto3.WorkerActor].
Worker 1: 재시작 전처리
액터 생성됨: 3
16:43:40.968 [akka:event-driven:dispatcher:global-3] DEBUG akka.actor.Actor$ - Invoking 'postRestart' for new actor instance [tuto3.WorkerActor].
Worker 3: 재시작 후처리
16:43:40.968 [akka:event-driven:dispatcher:global-3] DEBUG akka.actor.Actor$ - Restart: true for [tuto3.WorkerActor].
16:43:40.968 [akka:event-driven:dispatcher:global-3] DEBUG a.d.Dispatchers$globalExecutorBasedEventDrivenDispatcher$ - Resuming 46f67fb0-506a-11e0-a0e5-001d92ad4c1a
16:43:40.984 [akka:event-driven:dispatcher:global-4] DEBUG akka.dispatch.MonitorableThread - Created thread akka:event-driven:dispatcher:global-4
Worker 3: 메시지1-3

위 실행 결과를 보면 다음의 사실을 확인할 수 있다.
  • 재시작 전처리는 1번 Worker가 수행한다.
  • 전처리 후, worker1에 해당하는 새로운 액터 객체를 생성한다. (액터 생성된: 3)
  • 재시작 후처리는 3번 Worker가 수행한다.
  • 이후 worker1은 3번 Worker와 연결되며, "메시지1-3" 메시지는 3번 Worker가 수행하게 된다.
즉, 액터가 죽으면 그 액터 객체를 재사용하는 것이 아니라 새로운 액터 객체를 생성하는 방법으로 재시작하는 것을 알 수 있다.

Supervisor 클래스를 이용한 수퍼바이저 액터 생성

수퍼바이저 액터에서 직접 관리할 액터를 생성하는 경우가 아니면 수퍼바이저 액터를 별도로 구현하기 보다는 Akka가 제공하는 Supervisor 클래스를 이용하는 것이 편리하다.


import akka.actor.Supervisor;
import akka.actor.SupervisorFactory;
import akka.config.Supervision;
import akka.config.Supervision.OneForOneStrategy;
import akka.config.Supervision.Supervise;
import akka.config.Supervision.SupervisorConfig;

...

ActorRef worker1 = Actors.actorOf(WorkerActor.class);
ActorRef worker2 = Actors.actorOf(WorkerActor.class);

Supervise[] supervises = new Supervise[2];
supervises[0] = new Supervise(worker1, Supervision.permanent());
supervises[1] = new Supervise(worker2, Supervision.permanent());

OneForOneStrategy strategy = new OneForOneStrategy(
        new Class[] {RuntimeException.class}, 3, 3000);
SupervisorConfig config = new SupervisorConfig(strategy, supervises);

Supervisor supervisor = new SupervisorFactory(config).newInstance();
// supervisor 생성 시점에서 내부적으로 생성한 SupervisorActor와 worker1가 worker2가 시작됨

worker1.sendOneWay("메시지");

SupervisorFactory를 통해서 Supervisor를 생성하면, 내부적으로 SupervisorActor 타입의 액터를 생성하고, 그 액터에 SupervisorConfig에 지정된 모든 액터를 연결(link)하고, 각 액터를 시작(start) 한다.

내부적으로 생성한 SupervisorActor에 접근하고 싶다면, 다음과 같이 supervisor() 메서드를 사용하면 된다.

Supervisor supervisor = ...;
ActorRef supervisorActor = supervisor.supervisor();


재시작 횟수 제한

OneForOneStrategy나 AllForOneStrategy를 생성할 때 두 번째/세 번째 파라미터는 각각 제한된 시간 내의 최대 재시작 시도 회수와 제한을 시간 의미한다.  예를 들어, 아래 코드는 1초 이내에 최대 3번의 재시작 시도를 시도한다는 것을 의미한다. (1초 안에 재시작을 3번까지 허용한다는 의미가 아니다.)

new OneForOneStrategy(new Class[] { RuntimeException.class }, 3, 1000);

1초 안에 액터 재시작을 3번 실패하면 (예를 들어, postRestart() 메서드에서 런타임 예외가 발생해서 실패), 해당 액터에 대해 재시작 시도를 하지 않으며 더 이상 액터를 사용할 수 없게 된다.

액터 생성/연결 한번에 하기

ActorRef는 액터를 생성하고 관리하기 위한 메서드를 제공하고 있으며, 이들 메서드는 다음과 같다.
  • ActorRef spawn(Class clazz): 액터를 생성하고 시작한다.
  • ActorRef spawnLink(Class clazz): 액터를 생성하고 시작하고, 연결한다.
  • ActorRef spawnRemote(Class clazz, String host, int port, long timeout): 리모트 액터를 생성하고 시작한다.
  • void startLink(ActorRef actor): 액터를 시작하고 연결한다.
위 메서드를 이용하면 다음과 같이 코드를 조금 더 간결하게 작성할 수 있다.

ActorRef master = Actors.actorOf(MasterActor.class);
master.start();
ActorRef worker1 = master.spawnLink(WorkerActor.class); // worker1 액터 시작/연결 됨

ActorRef worker2 = Actors.actorOf(WorkerActor.class);
master.startLink(worker2); // worker2 시작/연결 됨


참고자료


Posted by 최범균 madvirus

댓글을 달아 주세요

개인적으로 관심을 가지고 지켜보던 Akka 프로젝트가 1.0 버전이 되었다. 평소에 병행 처리와 분산 처리에 관심이 많았는데, Akka는 이를 보다 쉽게 구현할 수 있도록 도와주는 프로젝트이다. 본 글에서는 Akka가 무엇인지 간단하게 설명하고 실제 Akka를 이용해서 액터를 생성하고 실행하는 방법을 살펴볼 것이다.

Akka란?

Akka는 병행(concurrent) 및 분산 처리를 위한 오픈 소스 프로젝트로서 액터(Actor) 모델을 이용하고 있다. 필자가 액터 모델 자체에 대한 이해가 완전하지 않지만, 액터 모델을 간단하게 설명하면 다음의 특징을 갖는다.

  • 액터들은 상태를 공유하지 않는다.
  • 액터들 간의 통신은 메시지 전달을 통해서 이루어진다. (이벤트 기반 모델)
  • 액터간의 통신은 비동기로 이루어진다.
  • 각 액터는 전달받은 메시지를 큐에 보관하며, 메시지를 순차적으로 처리한다.
  • 액터는 일종의 경량 프로세서다.
위와 같은 특징은 병행 처리 코드를 보다 쉽게 구현할 수 있도록 도와준다. 실제로 다중 쓰레드 프로그래밍을 해 본 개발자 중에서는 올바르지 못한 동기화 처리로 쓰레드 블럭킹 되는 등의 문제로 고생한 경험이 한 두 번씩은 존재할 것이다. 액터는 애초에 데이터를 서로 공유하지 않는 것을 원칙으로 하기 때문에, 데드락이나 락에 대한 고민을 줄여주고 병행 처리 그 자체에 집중할 수 있도록 도와준다.

Akka는 액터 모델을 통해서 병행 처리를 쉽게 할 수 있도록 도와줄 뿐만 아니라, 리모트 노드에 존재하는 액터를 마치 로컬에 존재하는 액터처럼 사용할 수 있도록 해 주고 있다. 개발자는 통신 프로토콜에 대해 고민할 필요 없이 리모트 액터를 사용할 수 있기 때문에 분산 처리 코드를 손쉽게 작성할 수 있게 된다.

Akka는 스카라(Scala)와 자바(Java)의 두 언어에 대한 API를 제공하고 있는데, 필자가 스카라 언어 자체에 대해서는 아직 잘 모르고 국내에서도 스카라 언어에 대한 관심이 적은 관계로 자바 API를 기준으로 Akka 사용법을 살펴볼 것이다.

Akka 코어 사용

Akka를 사용하려면 http://akka.io 사이트에서 필요한 파일을 다운로드 받으면 되는데, Maven을 사용하고 있다면 pom.xml 파일에 다음과 같이 리포지토리 설정과 의존 설정을 추가해주면 Akka가 제공하는 액터 기능을 사용할 수 있다.

<repositories>
    <repository>
        <id>Akka</id>
        <name>Akka Maven2 Repository</name>
        <url>http://akka.io/repository/</url>
    </repository>

    <repository>
        <id>Multiverse</id>
        <name>Multiverse Maven2 Repository</name>
        <url>http://multiverse.googlecode.com/svn/maven-repository/releases/</url>
    </repository>

    <repository>
        <id>GuiceyFruit</id>
        <name>GuiceyFruit Maven2 Repository</name>
        <url>http://guiceyfruit.googlecode.com/svn/repo/releases/</url>
    </repository>

    <repository>
        <id>JBoss</id>
        <name>JBoss Maven2 Repository</name>
        <url>http://repository.jboss.org/nexus/content/groups/public/</url>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>se.scalablesolutions.akka</groupId>
        <artifactId>akka-actor</artifactId>
        <version>1.0</version>
    </dependency>
</dependencies>

[주의]
Akka는 Scala로 만들어졌기 때문에, 이클립스에서 Akka 모듈을 사용할 때 코드 어시스트가 제대로 동작하려면 Scala-IDE 등을 이용해서 개발환경을 구축해 주어야 한다. 그렇지 않으면, 코드 어시스트가 안 되서 짜증나는 시간을 보내게 될 것이다.

액터 클래스 및 사용

액터를 생성하고 사용하려면, 먼저 Akka가 제공하는 기반 클래스를 이용해서 액터 역할을 수행할 클래스를 구현해 주어야 한다. Akka는 akka.actor.UntypedActor 클래스를 제공하고 있으며, 이 클래스를 상속받아서 액터를 구현할 수 있다.

import akka.actor.UntypedActor;

public class PrintActor extends UntypedActor {

    @Override
    public void onReceive(Object message) throws Exception {
        Thread.sleep(500); // 테스트를 위해 0.5초 sleep
        System.out.println(message);
    }

}

UntypedActor를 상속받은 클래스는 onReceive() 메서드를 구현해 주어야 하는데, onReceive() 메서드는 액터에 전달된 메시지를 처리하게 된다.

액터 클래스를 만들었다면, 액터를 생성한 뒤에 액터에 메시지를 전달할 수 있다. 액터를 생성할 때에는 akka.actor.Actors 클래스의 static 메서드인 actorOf() 메서드를 사용한다. Actors.actorOf() 메서드는 액터를 구현한 클래스의 Class를 전달받으며, 액터를 생성한다.

ActorRef actor = Actors.actorOf(PrintActor.class);
actor.start();
// actor를 이용해서 액터에 메시지를 전달

ActorRef의 start() 메서드는 액터를 시작하며, 액터가 시작된 이후부터 액터에 메시지를 전달할 수 있게 된다.

액터에 메시지 전달하기

Actors.actorOf()를 이용해서 액터를 생성했다면, 이후 ActorRef가 제공하는 메서드를 이용해서 액터에 메시지를 전달할 수 있다. 다음의 세 가지 방법으로 액터에 메시지를 전달할 수 있다.
  • Fire-And-Forget: 메시지를 전달하고 메시지에 대한 응답을 기다리지 않는다. 병행 및 확장에 적합한 메시지 전달 방식이다.
  • Send-And-Receive-Eventually: 메시지를 전달하고 응답을 받는다. 응답을 받을 때 까지 블록킹된다.
  • Send-And-Receive-Future: 메시지를 전달하고 응답을 받기 위한 Future를 리턴한다.
sendOneWay() 메서드를 이용한 Fire-And-Forget 방식 메시지 전달

ActorRef.sendOneWay() 메서드는 메시지를 액터에 전달할 때 사용된다. sendOneWay()라는 이름에서 알 수 있듯이 이 메서드는 액터로부터 어떤 값도 받지 않으며, 액터로부터 응답을 기다리지 않고 곧 바로 리턴한다.

ActorRef actor = Actors.actorOf(PrintActor.class);
actor.start();
actor.sendOneWay("받아라"); // actor에 "받아라" 메시지를 전달하고 바로 리턴.
actor.sendOneWay("받아라2");
actor.sendOneWay("받아라3");
System.out.println("비동기로 실행");

메시지를 받은 액터는 내부적으로 사용하는 큐에 메시지를 보관한 뒤, 차례대로 액터의 onReceive(Object message) 메서드에 메시지를 전달한다. PrintActor의 onReceive() 메서드는 0.5초후에 전달받은 메시지를 출력하고 sendOneWay()메서드는 응답 대기 없이 바로 리턴하므로, 위 코드가 실행되면 콘솔에는 다음과 같은 순서로 문자열이 출력된다.

비동기로 실행
받아라
받아라2
받아라3

sendOneWay() 메서드는 다음의 두 가지를 제공된다.
  • sendOneWay(Object message)
  • sendOneWay(Object message, ActorRef sender) : 메시지를 전송하면서 메시지를 보낸 액터로 sender를 지정한다.

sendRequestReply() 메서드를 이용한 Send-And-Receive-Eventually 방식 메시지 전달

ActorRef.sendRequestReply() 메서드는 액터에 메시지를 전달하고, 그 메시지에 대한 응답이 올 때 까지 대기하고 싶을 때 사용된다. 액터 구현 클래스는 getContext().replyUnsafe() 메서드를 이용해서 메시지에 대해 응답할 수 있는데, ActorRef.sendRequestReply() 메서드는 이 응답을 리턴하게 된다. 예를 들어, 다음과 같이 메시지에 대해 응답하는 액터가 있다고 하자.

public class PingActor extends UntypedActor {

    @Override
    public void onReceive(Object message) throws Exception {
        getContext().replyUnsafe("응답: "+ message); // 메시지 sender에 응답
    }

}

이 경우 다음과 같이 sendRequestReply() 메서드를 이용함으로써 액터에 전달한 메시지에 대한 응답이 도착할 때 까지 대기할 수 있다.

ActorRef actor = Actors.actorOf(PingActor.class);
actor.start();
Object res = actor.sendRequestReply("헬로우"); // 액터로부터 응답이 도착할 때 까지 대기

sendRequestReply() 메서드는 일정 시간 동안 액터로부터 응답이 없을 경우 akka.actor.ActorTimeoutException 예외를 발생시킨다. 별도 설정을 하지 않은 경우 기본 타입 시간은 5초이며, sendRequestReply() 메서드를 호출할 때 타임아웃을 지정할 수도 있다.

Object res = actor.sendRequestReply("헬로우", 1000, null); // 1초간 응답 대기

sendRequestReply() 메서드는 다음의 세 가지가 존재한다.
  • sendRequestReply(Object message)
  • sendRequestReply(Object message, ActorRef sender): 메시지를 보낸 액터로 sender를 지정한다.
  • sendRequestReply(Object message, long timeout, ActorRef sender)

sendRequestReplyFuture() 메서드를 이용한 Send-And-Receive-Future 방식 메시지 전달

sendRequestReplyFuture() 메서드는 메시지를 전달한 뒤 응답을 받기 위한 Future를 리턴한다. Future는 자바가 제공하는 Future가 아닌 Akka가 제공하는 akka.dispatch.Future 타입이다. Future는 주로 다음과 같은 형식으로 주로 사용된다.

Future future = actor.sendRequestReplyFuture("하이");
future.await(); // 응답을 대기. 대기 시간을 초과하면 예외 발생
if (future.isCompleted()) { // 완료되었다면
    Option resultOption = future.result(); // 응답 구함
    if (resultOption.isDefined()) { // 응답 데이터가 있다면,
        Object result = resultOption.get(); // 응답 데이터 구함
        System.out.println(result);
    }
}

sendRequestReplyFuture()가 리턴한 Future의 await() 메서드는 시간이 초과될 때 까지 대기한다. 시간이 초과되기 전에 응답이 도착하면 다음으로 넘어가고, 시간이 초과되면 ActorTimeoutException 예외를 발생시킨다.

sendRequestReplyFuture() 메서드는 다음의 세 가지가 존재한다.
  • sendRequestReplyFuture(Object message)
  • sendRequestReplyFuture(Object message, ActorRef sender) : 메시지를 보낸 액터로 sender를 지정한다.
  • sendRequestReplyFuture(Object message, long timeout, ActorRef sender)
두 개 이상의 액터에 메시지를 전달한 후 액터로부터의 응답이 모두 도착할 때 까지 대기해야 한다면, Futures.awaitAll(Future[] futures) 메서드를 사용하면 된다.

// actor들에 메시지 전달(작업 전달)
for (ActorRef actor : actors) {
    futureList.add(actor.sendRequestReplyFuture(someWork);
}
Future[] futures =  futureList.toArray();
Future.awaitAll(futures); // 모든 액터로부터 응답이 (작업 결과가) 올 때 까지 대기.
// futures로부터 응답 구해서 처리


액터에서 메시지 받아 처리하기

ActorRef의 send*() 메서드를 통해서 전달된 메시지는 UntypedActor 클랫를 상속받은 액터 구현 클래스의 onReceive(Object message)에 차례대로 전달된다. (TypedActor를 사용하면 인터페이스를 이용해서 메시지를 전달받을 메서드를 정의할 수도 있는데, 이에 대한 내용은 다음에 살펴볼 것이다.)

onReceive() 메서드는 다음과 같이 메시지의 타입을 확인한 뒤 메시지 타입에 맞는 동작을 수행하도록 구현하는 것이 보통이다.

public class ActorImpl extends UntypedActor {
    public void onReceive(Object message) throws Exception {
        if (message instanceof String) {
            // 메시지 처리
        }
        ...
    }
}


메시지에 응답하기

replyUnsafe()/replySafe()를 이용한 응답

액터 구현 클래스는 getContext()를 이용해서 해당 액터에 대한 ActorRef를 구할 수 있는데, ActorRef의 replyUnsafe() 또는 replySafe() 메서드를 이용해서 메시지에 대한 응답을 전달할 수 있다. replyUnsafe() 메서드는 응답 실패시 예외를 발생시키는 반면에 replySafe() 메서드는 응답에 실패할 경우 false를 리턴한다.

public class PingActor extends UntypedActor {

    public void onReceive(Object message) throws Exception {
        if (message.equals("ping")) {
            if (! getContext.replySafe("pong")) {
                // 실패에 대한 처리
            }
        }
    }
}

액터에 메시지를 전달할 때 sendRequestReply() 메서드나 sendRequestReplyFuture() 메서드를 사용한 경우, replyUnsafe()와 replySafe()를 이용해서 응답한 데이터를 리턴 값으로 받게 된다.
 
메시지를 전달한 액터에 메시지로 응답하기

액터가 다른 액터에게 메시지를 전달하기도 한다. 이때 sendRequestReply*() 메서드에 대한 응답이 아니라 메시지를 전달한 액터에 메시지를 전달하는 방법으로 응답할 수도 있을 것이다. 이렇게 메시지를 전달한 액터에 응답으로 메시지를 전송하고 싶다면, getContext().getSender() 메서드를 이용해서 메시지를 보낸 액터에 대한 ActorRef를 구한 뒤, 그 ActorRef의 send*() 메서드를 이용해서 응답을 전달하면 된다.

public class PongActor extends UntypedActor {

    @Override
    public void onReceive(Object message) throws Exception {
        if (message.equals("ping")) {
            if (getContext().getSender().isDefined()) {
                ActorRef sender = getContext().getSender().get();
                sender.sendOneWay("pong", getContext());
            } else {
                getContext().replyUnsafe("pong");
            }
        }
    }
}


액터의 라이프사이클

액터의 라이프 사이클은 다음과 같다.
  • NEW: 액터가 만들어졌을 때. 메시지를 수신하지 못한다.
  • STARTED: start()가 호출되었을 때. 메시지를 수신할 수 있다.
  • SHUTDOWN: exit()나 stop()이 호출되었을 때. 어떤 것도 하지 못한다.
ActorRef는 start(), stop() 메서드를 제공하고 있으며, 이들 메서드를 이용해서 액터를 시작하고, 중지할 수 있다. 아래 코드는 전형적인 액터의 사용방법을 보여주고 있다.

actor.start(); // 액터를 시작
// 필요한 만큼 액터에 메시지 전달
actor.sendOneWay(msg);
...
actor.stop(); // 액터 종료

start() 메서드는 액터와 메시지 큐를 시작하고, stop() 메서드는 액터의 디스패처와 메시지 큐를 포함한 액터를 종료시킨다.

모든 액터를 종료시키고 싶다면, 다음과 같은 코드를 사용하면 된다.

Actors.registry().shtudownAll();


UntypedActor 클래스의 라이프 사이클 관련 콜백 메서드

UntypedActor 클래스는 라이프 사이클과 관련해서 시작/중지 이벤트를 처리할 수 있는 콜백 메서드를 제공하고 있다.
  • preStart(): 액터 시작 전에 호출된다.
  • postStop(): 액터 종료 후에 호출된다.
  • preRestart(Throwable reason): 액터 재시작 전에 호출된다. (무정지 액터 기능과 관련됨)
  • postRestart(Throwable reason): 액터 재시작 후에 호출된다. (무정지 액터 기능과 관련됨)

설정 파일 지정 및 기본 값

Akka는 다음의 세 가지 방법 중 한가지를 이용해서 설정 파일을 찾는다.
  • akka.config 시스템 프로퍼티로 지정한 파일 (java -Dakka.config=... )
  • 클래스패스에 위치한 akka.config 파일
  • AKKA_HOME 환경변수 존재 시, '$AKKA_HOME/config 디렉터리의 설정 파일 사용. (또는 akka.home 시스템 프로퍼티를 AKKA_HOME 환경 변수 대신 사용)
각 설정 정보 및 기본 값은 http://doc.akka.io/configuration 참고하기 바란다.

참고자료

Posted by 최범균 madvirus

댓글을 달아 주세요

  1. 고광현 2016.10.25 17:41 신고  댓글주소  수정/삭제  댓글쓰기

    메시지에 응답하기
    replyUnsafe()/replySafe()를 이용한 응답 예제에서 실패에 대한처리 부분에 질문이 있는데요
    if (! getContext.replySafe("pong")) {

    실패시 처리인데 아직 해당 actor가 작업을 마치지 않았늗네 결과를 알수가 있나요?

    • 최범균 madvirus 2016.10.27 08:13 신고  댓글주소  수정/삭제

      제가 akka 내부를 잘 모르지만, reply라는 게 메시지를 보내는 용도니 메시지 보내는데 실패했다를 의미합니다.
      메시지를 수신한 것과, 수신한 메시지를 처리하는 건 구별된다고 알고 있습니다.