저작권 안내: 저작권자표시 Yes 상업적이용 No 컨텐츠변경 No

스프링5 입문

JSP 2.3

JPA 입문

DDD Start

인프런 객체 지향 입문 강의

java에서 동영상의 스틸컷을 추출하기 위해 ffmpeg을 Runtime.exec()로 실행하는데, ffmpeg이 실행이 종료되지 않고 뭄추는 현상이 발생했다. 확인해 본 결과 ffmpeg이 쏫아내는 에러 출력 메시지 때문이었다. Runtime.exec()로 ffmpeg Processor를 생성한 뒤에 아래 코드와 같이 에러 출력 스트림으로부터 데이터를 읽어오기만 하면 블록킹 없이 ffmpeg이 실행된다.


public File extractImage(File videoFile, int position,

File creatingImageFile) {

try {

int seconds = position % 60;

int minutes = (position - seconds) / 60;

int hours = (position - minutes * 60 - seconds) / 60 / 60;


String videoFilePath = videoFile.getAbsolutePath();

String imageFilePath = creatingImageFile.getAbsolutePath();


String[] commands = { "ffmpeg", "-ss",

String.format("%02d:%02d:%02d", hours, minutes, seconds),

"-i", videoFilePath, "-an", "-vframes", "1", "-y",

imageFilePath };


Process processor = Runtime.getRuntime().exec(commands);


String line1 = null;

BufferedReader error = new BufferedReader(new InputStreamReader(

processor.getErrorStream()));

while ((line1 = error.readLine()) != null) {

logger.debug(line1);

}

processor.waitFor();

int exitValue = processor.exitValue();

if (exitValue != 0) {

throw new RuntimeException("exit code is not 0 [" + exitValue

+ "]");

}

return creatingImageFile;

} catch (IOException e) {

throw new RuntimeException(e);

} catch (InterruptedException e) {

throw new RuntimeException(e);

}

}


참고로, 위 코드는 동영상으로부터 특정 시점의 썸네일 이미지를 추출하는 코드이다.


Posted by 최범균 madvirus

댓글을 달아 주세요

  1. 2013.04.24 02:46  댓글주소  수정/삭제  댓글쓰기

    비밀댓글입니다

많은 입문서에서 캡슐화(encapsulation)를 단순히 private을 사용해서 필드를 외부에 감추는 것 정도로 소개하고 넘어가는 경우가 많아, 많은 입문자들에게 오해를 심어주는 것 같다. 그래서 집필 중인 자바 프로그래밍 입문서에서 캡슐화와 관련된 부분을 블로그를 통해 공개하고자 한다. 아래 내용부터는 (아직 집필중인) 책의 캡슐화 부분을 발체한 내용이다.

* 본 문서는 책의 일부 내용을 발췌한 것으로서 온오프라인 상의 무단 배포를 금합니다.

캡슐화(encapsulation)

지금까지 자바에서 클래스를 만드는 방법을 살펴봤다. 필드를 이용해서 객체의 상태를 보관하고, 메서드를 이용해서 기능을 구현하고, 생성자를 이용해서 객체를 생성하는 방법을 공부 했다. 또한, public, protected, private을 이용한 접근 제어 방법도 살펴봤다.

그럼, 왜 클래스를 이용하는 걸까? C 언어의 구조체와 함수를 이용해도 자바의 클래스와 비슷한 구현이 가능할 것 같은데 말이다. 클래스를 사용하는 이유는 객체 지향 방식으로 구현하기 위해서이다. 자바는 클래스를 이용해서 객체를 표현하고 있으며, 자바 언어로 객체 지향을 잘 하기 위해서는 객체 지향 원칙에 따라 클래스를 작성해 주어야 한다.

그렇다면, 객체 지향의 가장 기본이 되는 원칙은 무엇일까? 바로 이 절의 제목인 '캡슐화(encapsulation)'가 객체 지향의 가장 기본이 되는 대원칙이다. 본 절에서는 완벽하게는 아니어도 조금이나마 독자가 캡슐화가 무엇이고 왜 캡슐화가 중요한 지 느낄 수 있도록 할 것이다.

흔히 객체 지향과 반대되는 개념으로 절차 지향을 언급하는데, 본 절에서는 아주 간단한 스톱워치 기능을 구현하는 과정을 통해서 절차 지향의 단점을 파악하고 캡슐화를 통해 어떻게 이 문제를 해결할 수 있는 지 살펴 볼 것이다.

절차 지향 방식의 구현

성능 측정 어플리케이션을 개발해서 고객에게 납품하기로 했다고 하자. 성능 측정을 위해 필요한 공통 기능은 실행 시간을 측정하는 것이다. 그래서 시간 데이터를 표현하기 위해 다음과 같이 밀리초(1/1000초) 단위로 시간을 보관할 수 있는 클래스를 작성하였다.

package ch03.util;

public class ProceduralStopWatch {

    public long startTime; // 밀리초(1/1000초) 단위
    public long stopTime; // 1/1000초 단위

    public long getElapsedTime() {
        return stopTime - startTime;
    }
}

ProceduralStopWatch 클래스는 시작 시간과 종료 시간을 저장할 수 있는 두 개의 public 필드를 제공하고 있다. 성능 측정 모듈에서 종료 시간과 시작 시간 사이의 차이를 구하는 코드가 많기 때문에, 흘러간 시간을 쉽게 구할 수 있도록 getElapsedTime() 메서드를 추가로 구현하였다.

이제 ProceduralStopWatch 클래스를 사용하는 코드는 다음과 같은 방식으로 작성될 것이다.

ProceduralStopWatch stopWatch = new ProceduralStopWatch();
stopWatch.startTime = System.currentTimeMillis(); // 시작 시간 설정

// 측정 대상 기능 실행

stopWatch.stopTime = System.currentTimeMillis(); // 종료 시간 설정
long elapsedTime = stopWatch.getElapsedTime(); // 시간 차이 구함

수 개월을 문제 없이 개발해 나갔다. 그런데, 늘 그렇듯이 고객으로부터 요구 사항을 변경하자는 연락이 왔다. 필요에 따라 밀리초(1/1000초) 보다 더 세밀한 단위로 측정할 필요가 있기 때문에 밀리초뿐만 아니라 더 세밀한 단위로도 측정할 수 있게 해 달라는 것이었다.

아주 간단한 방법은 다음과 같이 나노초 단위로 측정이 필요한 코드에서만 startTime 필드와 stopTime 필드에 나노초 단위로 시간을 저장하는 것이다.

ProceduralStopWatch stopWatch = new ProceduralStopWatch();
stopWatch.startTime = System.nanoTime(); // 시작 시간 나노초 단위 설정

// 측정 대상 기능 실행

stopWatch.stopTime = System.nanoTime(); // 종료 시간 나노초 단위 설정
long elapsedTime = stopWatch.getElapsedTime(); // 시간 차이 나노초 단위로 구함

하지만, startTime 필드에 어떤 경우에는 밀리초 단위의 값을 보관하고 어떤 경우에는 나노초 단위의 값을 보관하게 되면, 개발 과정뿐만 아니라 유지보수 과정에서 문제가 발생할 것 같다. 이미 개발자 중 몇 명은 기준 없이 밀리초와 나노초를 섞어서 사용하기 시작했다.

그래서, 나노초 단위를 저장할 수 있도록 ProceduralStopWatch 클래스에 startNanoTime, stopNanoTime의 두 필드와 getElapsedNanoTime() 메서드를 추가하기로 결심했다.

package ch03.util;

public class ProceduralStopWatch {

    public long startTime;
    public long stopTime;
   
    public long startNanoTime;
    public long stopNanoTime;

    public long getElapsedTime() {
        return stopTime - startTime;
    }
   
    public long getElapsedNanoTime() {
        return stopNanoTime - startNanoTime;
    }
}

이제 나노초 단위를 이용해서 시간을 측정해야 하는 코드를 다음과 같이 변경함으로써, 시간 단위가 나노초 임을 분명히 할 수 있게 되었다.

ProceduralStopWatch stopWatch = new ProceduralStopWatch();
stopWatch.startNanoTime = System.nanoTime(); // 시작 시간 나노초 단위 설정

// 측정 대상 기능 실행

stopWatch.stopNanoTime = System.nanoTime(); // 종료 시간 나노초 단위 설정
long elapsedTime = stopWatch.getElapsedNanoTime(); // 시간 차이 나노초 단위로 구함

일단, 급한 불은 껐다. 그런데, 여전히 불안함이 떠나질 않는다. 만약 고객이 초 단위로 값을 구해 달라는 요구가 추가되면 어떻게 해야 하나? 또는 같은 시간을 초 단위, 밀리초 단위, 나노초 단위로 표현해 달라고 하면 어떻게 해야 하나? 아마 코드는 점점 복잡해지고 요구사항이 추가되거나 변경될 때마다 함께 수정되는 코드도 많아질 것이다.

자, 그러면 왜 이런 일이 벌어졌을까? 가장 큰 이유는 데이터를 중심으로 개발했기 때문이다. (데이터를 중심으로 개발하는 것이 전형적인 절차 지향 방식 개발이다.)

[그림3.18] 데이터를 중심으로 개발되는 절차 지향 방식


ProceduralStopWatch 객체를 사용하는 코드는 객체의 필드에 직접 값을 할당하고 값을 가져올 수 있다. 편의상 getElapsedTime() 메서드를 만들었지만, 마음대로 필드의 값을 조작할 수 있다. ProceduralStopWatch 객체의 내부인 startTime 필드와 stopTime 필드를 마음대로 접근할 수 있기 때문에 많은 코드들이 직접 ProceduralStopWatch 객체의 필드 데이터를 조작할 수 있으며, 이로 인해 ProceduralStopWatch의 내부 코드를 변경하게 되면 많은 코드의 변화를 유발하게 된다.

[그림3.19] 절차 지향에서는 데이터의 변화가 많은 코드의 변화를 유발시킴

요구 사항의 변화가 없다면 또는 추가되지 않는다면, 데이터를 중심으로 개발하는 게 문제가 되지 않는다. 하지만, 거의 모든 프로젝트에서 요구사항은 추가되거나 변경되기 마련이며, 데이터를 중심으로 개발할 경우 데이터의 변화는 많은 코드에 수정을 발생시키는 요인이 된다.

그렇다면 어떻게 해야 ProceduralStopWatch 클래스를 사용하는 코드에 영향을 주지 않으면서 (또는 영향을 최소화하면서) ProceduralStopWatch 클래스의 내부 데이터 구조를 변경할 수 있을까? 정답은 바로 ProceduralStopWatch 클래스의 기능을 캡슐화하는 것이다.

객체 지향 방식의 구현

스톱워치 예를 객체 지향 방식으로 재구성해 보겠다. 객체 지향의 핵심 중의 핵심은 캡슐화에 있다. 캡슐화는 자세한 내부 구현을 외부에 드러내지 않고 숨기는 것이다. 캡슐화를 하게 되면 내부에 데이터를 어떻게 저장하는 지, 그 데이터를 어떻게 처리하는 지, 또는 특정 기능을 어떻게 제공하는 지에 대한 내용은 드러내지 않는다. 단지, 객체가 어떤 기능을 제공하는 지만 공유하게 된다.

예를 들어, 스톱워치 예의 경우 시간 데이터를 어떻게 구하는 지 또는 어떤 타입으로 저장하는 지 등의 구현은 외부로 드러내지 않고 다음의 기능만 외부에 제공하게 된다.
  • 스톱워치를 시작한다
  • 스톱워치를 중지한다.
  •  중지와 시작 사이의 시간 차이를 구한다.
스톱워치 기능을 제공하는 클래스를 캡슐화해서 다시 구현해 보자. 아래 코드는 객체 지향 방식으로 새롭게 구현한 스톱워치 클래스이다.

package ch03.util;

public class StopWatch {

    private long startTime;
    private long stopTime;

    public void start() {
        startTime = System.currentTimeMillis();
    }

    public void stop() {
        stopTime = System.currentTimeMillis();
    }

    public Time getElapsedTime() {
        return new Time(stopTime - startTime);
    }
}

StopWatch 클래스는 시작 시간과 종료 시간을 보관하기 위해 startTime 필드와 stopTime 필드를 사용하는데, 이 두 필드는 private 이다. 따라서, StopWatch 클래스를 제외한 다른 코드에서는 이 두 필드에 직접 접근할 수 없다.

start() 메서드는 startTime 필드에 현재 시간 값을 밀리초 단위로 저장한다. stop() 메서드도 유사하게 stopTime 필드에 밀리초 단위로 현재 시간 값을 저장한다. 즉, 스톱워치의 시작과 중지를 처리하려면 이 두 메서드를 호출해야 한다.

getElapsedTime()은 long이 아닌 Time이라는 클래스 타입을 리턴한다. Time은 시간을 표현하기 위해 만든 클래스로서 아래와 같다.

package ch03.util;

public class Time {

    private long t;
   
    public Time(long t) {
        this.t = t;
    }

    public long getMilliTime() {
        return t;
    }
}

StopWatch 클래스를 사용해서 시간 차를 구하는 코드는 다음과 같을 것이다.

StopWatch stopWatch = new StopWatch();
stopWatch.start(); // startTime 필드에 값을 할당하는 게 아닌, 기능 실행
// 코드
stopWatch.stop(); // stopTime 필드에 값을 할당하는 게 아닌, 기능 실행
Time time = stopWatch.getElapsedTime(); // long 타입이 아닌 시간을 표현하는 타입
// time.getMilliTime() 사용

앞서 절차 지향 방식에서는 필드에 직접 접근해서 필드 값을 변경했던 것에 반해 객체 지향 방식에서는 상세한 구현이 start() 메서드와 stop() 메서드로 캡슐화 되어 있다. StopWatch 클래스를 사용하는 코드는 start() 메서드가 내부적으로 어떻게 시작 시간을 저장하는 지에 대해서는 알 필요 없이, start() 메서드가 스톱워치의 시작 기능을 제공한다는 것만 알면 된다.

스톱워치를 중지한 뒤 흘러간 시간을 구할 때 사용된 리턴 타입은 long이 아닌 Time 클래스이다. long이 아닌 Time 클래스를 사용한 이유는 long은 시간을 표현하는 데 적합하지 않기 때문이다. 그래서 시간을 표현하기 위해 Time 클래스를 추가로 만들었고, Time 클래스로부터 필요한 값을 구하도록 했다.

이제 고객으로부터 새로운 요구사항을 받을 차례가 됐다. 절차 지향 방식을 설명할 때와 동일하게 나노초 단위로도 시간 차이 값을 구할 수 있어야 된다고 한다. 절차 지향 방식에서 ProceduralStopWatch 클래스를 사용하는 수 많은 코드를 변경해야 했던 기억이 새록 새록 떠오를 것이다. 하지만, 걱정하지 마라. 우리는 이미 스톱워치와 시간을 캡슐화하는 데 성공했고, StopWatch 클래스와 Time 클래스를 사용하는 코드에 어떤 영향도 주지 않고 이 두 클래스를 변경할 수 있다.

먼저, StopWatch 클래스가 내부적으로 나노초를 저장하도록 변경해보자. 다음과 같이 밀리초를 가져오는 코드를 나노초를 가져오는 코드로 변경했다.

package ch03.util;

public class StopWatch {

    private long startTime;
    private long stopTime;

    public void start() {
        startTime = System.nanoTime();
    }

    public void stop() {
        stopTime = System.nanoTime();
    }

    public Time getElapsedTime() {
        return new Time(stopTime - startTime);
    }
}

Time 클래스가 생성자에서 입력받는 값이 밀리초에서 나노초로 변경되었으므로 Time 클래스를 다음과 같이 변경하였다. t 필드가 저장하는 값의 단위가 나노초이므로 getMilliTime() 메서드의 구현이 일부 변경되었고, getNanoTime()메서드가 새로 추가되었다.

package ch03.util;

public class Time {

    private long t;
   
    public Time(long t) {
        this.t = t;
    }

    public long getMilliTime() {
        return t / 1000000L;
    }

    public long getNanoTime() {
        return t;
    }
}

나노초 단위를 수용할 수 있도록 StopWatch 클래스와 Time 클래스를 변경하였다. 이제 이두 클래스를 사용하던 코드가 나노초를 사용하도록 변경할 차례이다. 바꿔보자.

StopWatch stopWatch = new StopWatch();
stopWatch.start();
// 코드
stopWatch.stop();
Time time = stopWatch.getElapsedTime();
// time.getNanoTime() 사용

어떤가? 나노초 단위로 시간 차이를 구하기 위해서 변경한 코드는 Time 클래스의 getMilliTime() 메서드 대신 getNanoTime() 메서드를 사용하도록 변경한 것뿐이다. 나머지 코드는 하나도 변경되지 않았다.

고객으로부터 요구사항이 또 들어왔다. 초 단위로도 시간 차이를 표현하고 싶단다. 이건 아주 쉽다. 왜냐면 우리는 Time 클래스를 이용해서 시간을 표현하고 있기 때문이다. 다음과 같이 Time 클래스에 메서드 하나만 더 추가해주면 끝이다.

package ch03.util;

public class Time {
    ...
    public double getSecondTime() {
        return (double)t / (double)1000000000;
    }
}

자세한 내부 구현을 숨기고 외부에는 기능만을 제공하도록 StopWatch 클래스를 캡슐화 하였다. 그리고, 스톱워치의 요구 사항이 변경되었지만 StopWatch 클래스를 잘 캡슐화 한 덕분에 StopWatch 클래스를 사용하는 코드에 거의 영향을 주지 않고 StopWatch 클래스를 변경할 수 있었다.

이게 바로 캡슐화의 힘이다. 내부 구현(특히 데이터)을 외부에 노출하지 않고 기능을 잘 캡슐화하게 되면, 해당 기능을 변경해야 하는 상황이 발생할 경우 특정 클래스로만 변화가 수렴되는 특징을 갖게 된다. 변화가 여러 클래스로 확산되지 않기 때문에 그 만큼 캡슐화 된 클래스의 수정은 용이하게 된다.

[그림3.20] 캡슐화는 요구사항 변화에 따른 코드 수정 범위를 최소화 해 준다.

실제로 다수의 소프트웨어는 개발 과정뿐만 아니라 개발이 완료된 이후에도 요구사항의 추가, 버그 수정 등으로 코드를 수정하게 된다. 만약 절차 지향으로 개발했다면 이런 수정 과정에서 변경되는 코드의 범위는 점점 커지게 되고 시간이 흐를수록 변화의 폭은 더욱 증폭된다. 따라서 절차 지향 방식에서 무엇인가를 변경하는 것은 개발자 입장에서 많은 위험과 어려움이 따르는 작업이 된다.

반면에 객체 지향 방식으로 개발했다면 변화의 범위는 소수의 클래스로 한정되는 경향이 있으며, 따라서 새로운 요구 사항을 (절차 지향에 비해) 쉽고 빠르게 수용할 수 있게 된다. 우리는 이미 StopWatch 예제에서 이런 특징을 확인할 수 있었다.

Posted by 최범균 madvirus

댓글을 달아 주세요

  1. 권남 2011.07.15 11:45 신고  댓글주소  수정/삭제  댓글쓰기

    멋진 설명 잘 읽었습니다. 책이 기대되네요.

  2. 2011.07.15 22:54  댓글주소  수정/삭제  댓글쓰기

    비밀댓글입니다

  3. richpapa 2011.07.18 11:01 신고  댓글주소  수정/삭제  댓글쓰기

    기대됩니다. 언제 나오나요?

  4. 백명석 2011.08.04 16:41 신고  댓글주소  수정/삭제  댓글쓰기

    근데 Time 클래스에 요구사항이 늘어감에 따라 메소드가 추가되는 것이 보기 않 좋네.
    방법이 있지 않나 ?
    하나 또 오르는 방법이 있는데... 어찌 생각하나 ?

    • 최범균 madvirus 2011.08.05 08:50 신고  댓글주소  수정/삭제

      메서드가 증가가 많지 않을 것 같긴 하지만, 메서드가 계속 증가한다면 다음과 같은 메서드를 추가하면 어떨까 합니다.

      long elapsedTime(TimeUnit unit)

      이 경우에는 StopWatch 클래스에서 바로 위 클래스를 넣어도 될 듯 합니다. Time 클래스에 넣어야 한다면

      long value(TimeUnit unit) 정도의 이름이 좋을 것 같구요.

  5. 백명석 2011.08.05 09:14 신고  댓글주소  수정/삭제  댓글쓰기

    아니면 Visitor Pattern으로 원하는 timeUnit 별로 elapse Time을 계산하는 객체를 전달하여 계산하도록 하면 어떨까 ?
    long getElapsedTime(ElapsedTimeCalculateVisitor visitor) 이런 식으로.
    구조체는 새로운 타입 추가시 많은 변경을 유발하지만, 새로운 기능 추가시에는 해당 기능만 추가하면 되고,
    클래스는 polymorphism을 활용하여 변경 없이 새로운 타입 추가할 수 있지만, 기존 클래스에 새로운 기능 추가시에는 해당 클래스의 모든 클라이언트와 서브클래스까지 변경되어야 하는 재앙이 발생한다.
    그런데 이 같은 클래스의 문제는 Visitor로 해소할 수 있지 않을가 싶다 ^^

    • 최범균 madvirus 2011.08.05 09:40 신고  댓글주소  수정/삭제

      너무 많이 가시는 거 같아요. 저도 Visitor와 같은 double dispatch 방식을 생각해보긴 했는데, Visitor가 결국 StopWatch의 내장(시간 저장하기 위한 타입)을 알게 되는 상황이 발생해서 고민이 좀 되더라구요.

      그래서 Time을 좀 더 추상화해서 Time 객체가 크기 비교를 할 수 있는 기능을 추가하고, Time이 스스로 자기를 표현할 수 있도록 구현해주는 것도 생각해 볼 만 할 것 같아요.

      그런데요,,, 이 논의 거기 팀원들하고 마저 하심 안 되요? 제가 어제 술독에 빠졌다가 나와서 정신이 없어요.

  6. 백명석 2011.08.05 10:09 신고  댓글주소  수정/삭제  댓글쓰기

    ㅎㅎㅎ 이런 논의가 관심있는 많은 분들과 공유될 수 있었으면 좋겠다. 몸 추스려 ^^

  7. Kunny 2012.04.04 15:44 신고  댓글주소  수정/삭제  댓글쓰기

    디자인 패턴 책을 읽던중 패턴의 원칙중 하나는 변화하는것을 캡슐화한다고 적혀있었는데 도대체 뭐가 캡슐화인지 감이 안오던 중에 ..이 글을 보고 어렴풋이 이해하고 갑니다.

    사실 4일째 한번씩 보는중.

  8. 넘고 2012.08.21 15:52 신고  댓글주소  수정/삭제  댓글쓰기

    맨 처음 말씀하신대로 캡슐화에 대해 데이터를 숨기는 정도로만 알고 있었는데...
    정말 좋은글 감사합니다!

  9. 황윤성 2016.03.11 23:02 신고  댓글주소  수정/삭제  댓글쓰기

    읽고 나니까 제가 캡슐화에 대해서 잘 모르고 있었다고 깨달았어요

    공부 중인 학생인데 감사합니다!

  10. 맹구 2018.05.30 15:43 신고  댓글주소  수정/삭제  댓글쓰기

    과거에 캡슐화를 쓰지 않은 저를 용서할수 없게되었습니다...

    좋은 글 올려주셔서 감사합니다.

특정 노드에 갑자기 장애가 발생했다. 이럴 때 가장 먼저 개발자들이 선택하는 장애 대처 방법은? 아마도 관련 프로세스를 재시작하는 방법이 가장 많이 사용될 것이다. 웹서버를 재시작하거나 로그 수신기를 재시작하는 등 뭔가 문제가 발생한 영역의 프로세스나 쓰레드를 재시작함으로써 서비스 다운타임을 줄이는 것이 장애 발생 시 첫 번째로 수행하는 작업이다. 일단, 재시작해서 서비스가 살아나도록 만든 뒤, 그 다음에 원인 분석을 하게 된다.

Akka도 이와 비슷한 방법으로 액터의 장애에 대응할 수 있는 방법을 제공하고 있다. 액터는 자신을 관리하는 Supervisor를 가질 수 있으며, Supervisor는 액터가 다운될 경우 재시작함으로써 다운타임을 최소화하도록 해 준다. 이 방식은 얼랭(erlang)으로부터 빌려온 방식으로서 Akka는 Supervisor를 통해 무정지 서비스를 구현할 수 있도록 하고 있다.

액터의 두 가지 라이프 사이클: permanent, temporary

액터는 다음의 두 가지 라이프 사이클을 가진다.
  • permanent: 메시지 처리 과정에서 예외가 발생해도 액터가 살아 있음.
  • temporary: 메시지 처리 과정에서 예외가 발생하면 액터가 죽음.
액터를 permanent로 설정할 지 temporary로 설정할 지의 여부는 다음과 같이 설정할 수 있다.

import akka.config.Supervision;

public class WorkerActor extends UntypedActor {
    public WorkerActor() {
        getContext().setLifeCycle(Supervision.temporary());
        ...
    }


Akka의 액터 예외 대응 방식: Let it Crash

다중 쓰레드를 이용해서 병행 처리 코드를 작성할 경우, 병행 처리 코드에서 예외가 발생했을 때 이를 알 수 있는 방법은 예외 추적 메시지를 확인하는 방법 뿐이다. (또는 try-catch로 모든 예외를 잡아서 알림해 주는 기능을 넣는 방법 뿐이다.) 예외가 발생해서 병행 처리 쓰레드가 종료된 경우 이를 복구하는 방법은 재시작해주는 것 외에 특별한 방법이 없다.

Akka는 액터가 더 이상 정상적으로 동작할 수 없는 상태가 되어 메시지 처리 과정 중 예외를 발생시키면, 해당 액터를 재시작하는 방법으로 장애에 대응한다. 복구할 수 없는 예외 상황 발생시 액터가 뭔가 하지 않고 그냥 죽도록 놔두고 안정된 상태로 초기화하고 재시작하기 때문에, 이 방식을 "Let it Crash"라고 부른다.

수버바이저를 이용한 액터 관리

Akka는 수퍼바이저를 이용해서 액터를 관리한다. 수퍼바이저는 다른 액터를 모니터링하는 액터로서, 수퍼바이저 액터에서 다른 액터를 연결(link)함으로써 수퍼바이저가 다른 액터를 관리하게 된다.

수퍼바이저는 연결된 액터가 죽었을 때 다음의 두 가지 방식을 이용해서 연결된 액터를 재시작한다. 참고로 permanent 모드의 액터만 재시작되며, temporary 액터는 재시작되지 않는다.
  • One-For-One
  • All-For-One
One-For-One은 수퍼바이저와 연결된 액터가 죽으면, 죽은 액터만 재시작하고 나머지 연결된 액터는 그대로 유지한다. (아래 그림 참고)


[발췌: http://doc.akka.io/fault-tolerance-java]

반면 All-For-One은 수퍼바이저와 연결된 액터 중 하나가 죽으면, 연결된 모든 액터를 재시작한다. (아래 그림 참고) 이는 수퍼바이저에 의해 관리되는 액터 중 하나로도 비정상적으로 동작하면 나머지 액터들도 영향을 받아서 비정상적으로 동작하게 될 때에 사용된다.


[발췌: http://doc.akka.io/fault-tolerance-java]

수퍼바이저(Supervisor) 액터 만들기

수퍼바이저 액터는 일반 액터와 동일한 액터로서, 다음의 두 가지 방법을 이용해서 만들 수 있다.
  • link() 메서드를 이용
  • Supervisor 클래스를 이용해서 생성

link()를 이용한 액터 연결 및 관리

액터는 다른 액터를 연결함으로써 수퍼바이저 액터가 될 수 있으며, 연결할 때에는 link() 메서드를 사용한다. link()를 이용해서 액터를 관리할 경우 다음과 같은 방법으로 개발을 진행하면 된다.
  1. 수퍼바이저 액터로 동작할 클래스의 생성자에 FaultHandler를 지정한다. FaultHandler는 관리되는 액터가 죽었을 때, 그 액터만 재시작할 지 아니면 관리되는 모든 액터를 재시작할 지의 여부를 지정한다.
  2. 수퍼바이저 액터를 생성한 뒤, 관리할 액터를 link()로 연결한다.

1번, 수퍼바이저 액터를 직접 구현할 경우 다음과 같이 수퍼바이저 액터 생성자에서 재시작 전략을 지정해 주어야 한다.

import akka.actor.UntypedActor;
import akka.config.Supervision.OneForOneStrategy;

public class MasterActor extends UntypedActor {

    public MasterActor() {
        getContext().setFaultHandler(
                new OneForOneStrategy(
                        new Class[] { RuntimeException.class }, 3, 1000));
    }

    @Override
    public void onReceive(Object message) throws Exception {
        System.out.println("Master가 받은 메시지: " + message);
    }

}

위 코드에서 MasterActor는 관리하는 액터가 죽으면 해당 액터만 재시작하도록 설정하였다. OneForOneStrategy 객체를 생성할 때 첫 번째 파라미터는 액터를 재시작할 예외 타입을 지정한다. 위 코드는 모니터링 대상 액터의 onReceive() 메서드에서 RuntimeException이 발생하면 액터를 재시작한다는 것을 의미한다. 뒤의 두 숫자에 대해서는 뒤에서 다시 설명하겠다.

관리되는 액터가 죽을 때 관리되는 다른 액터들도 함께 재시작하고 싶은 경우에는 AllForOneStrategy 클래스를 사용하면 된다. 생성자에 전달되는 파라미터 목록은 OneForOneStrategy 클래스와 동일하다.

2번, 수퍼바이저 액터를 알맞게 구현했다면 그 다음으로 할 작업은 link() 메서드를 이용해서 수퍼바이저에 관리할 액터를 연결해 주는 것이다. 아래 코드는 예를 보여주고 있다.

ActorRef master = Actors.actorOf(MasterActor.class);
master.start();

ActorRef worker1 = Actors.actorOf(WorkerActor.class);
worker1.start();

master.link(worker1);

테스트를 위해 WorkerActor가 "die"라는 메시지를 받으면 RuntimeException을 발생시키도록 구현해 보았다.

@SuppressWarnings("unchecked")
public class WorkerActor extends UntypedActor {
    private static int num = 1;
   
    private int id;
    public WorkerActor() {
        id = num++;
        System.out.println("액터 생성됨: " + id);
    }
   
    @Override
    public void onReceive(Object message) throws Exception {
        if (message.equals("die")) {
            throw new RuntimeException("고의적 DIE");
        }
        System.out.println("Worker " + id + ": " + message);
    }
   
    @Override
    public void preRestart(Throwable cause) {
        System.out.println("Worker " + id + ": 재시작 전처리");
    }
   
    @Override
    public void postRestart(Throwable cause) {
        System.out.println("Worker " + id + ": 재시작 후처리");
    }
   
}

WorkerActor는 preRestart() 메서드와 postRestart() 메서드를 구현하고 있는데, 이 두 메서드는 각각 액터가 재시작하기 전/후에 호출된다. WorkerActor가 생성될 때 마다 1씩 증가된 id 값을 할당하는데 id 값을 새로 부여한 이유는 액터가 재시작할 때 액터 객체를 새로 생성하는 지의 여부를 확인하기 위해서다.

ActorRef master = Actors.actorOf(MasterActor.class);
master.start();

ActorRef worker1 = Actors.actorOf(WorkerActor.class);
worker1.start();
ActorRef worker2 = Actors.actorOf(WorkerActor.class);
worker2.start();

master.link(worker1); // master에 worker1 액터 연결
master.link(worker2); // master에 worker2 액터 연결

worker1.sendOneWay("메시지1-1");
worker2.sendOneWay("메시지2-1");
worker1.sendOneWay("메시지1-2");
worker2.sendOneWay("메시지2-2");

worker1.sendOneWay("die"); // worker1 액터 죽임!
worker1.sendOneWay("메시지1-3"); // worker1 액터에 메시지 전달
worker2.sendOneWay("메시지2-3");

위 코드는 중간에 worker1에 "die" 메시지를 보냄으로써 worker1을 죽인다. worker1 액터는 "die" 메시지를 받으면 RuntimeException을 발생시키는데, MasterWorker는 RuntimeException이 발생할 경우 해당 액터를 재시작하라고 설정하고 있다. 따라서, worker1 액터는 "die" 메시지를 받는 순간 RuntimeException을 발생시키며 죽지만 곧이어 재시작하게 되고, 따라서 죽은 이후에 받은 "메시지1-3" 메시지를 재시작한 액터가 처리하게 된다.

실제 위 코드의 실행 결과는 다음과 같다. (Akka가 출력하는 로그 메시지 중 중요한 것만 남기고 나머지는 생략하였다.)

액터 생성됨: 1
액터 생성됨: 2
16:43:40.843 [main] DEBUG akka.actor.Actor$ - Linking actor [Actor[tuto3.WorkerActor:46f67fb0-506a-11e0-a0e5-001d92ad4c1a]] to actor [Actor[tuto3.MasterActor:46f1c4c0-506a-11e0-a0e5-001d92ad4c1a]]
16:43:40.843 [main] DEBUG akka.actor.Actor$ - Linking actor [Actor[tuto3.WorkerActor:46f67fb1-506a-11e0-a0e5-001d92ad4c1a]] to actor [Actor[tuto3.MasterActor:46f1c4c0-506a-11e0-a0e5-001d92ad4c1a]]
Worker 1: 메시지1-1
Worker 1: 메시지1-2
16:43:40.875 [akka:event-driven:dispatcher:global-1] ERROR akka.actor.Actor$ - Exception when invoking
    actor [Actor[tuto3.WorkerActor:46f67fb0-506a-11e0-a0e5-001d92ad4c1a]]
    with message [die]
16:43:40.875 [akka:event-driven:dispatcher:global-1] ERROR akka.actor.Actor$ - Problem
java.lang.RuntimeException: 고의적 DIE
    at tuto3.WorkerActor.onReceive(WorkerActor.java:18) ~[classes/:na]
    ...
Worker 2: 메시지2-1
Worker 2: 메시지2-2
Worker 2: 메시지2-3
16:43:40.968 [akka:event-driven:dispatcher:global-3] INFO  akka.actor.Actor$ - Restarting actor [tuto3.WorkerActor] configured as PERMANENT.
16:43:40.968 [akka:event-driven:dispatcher:global-3] DEBUG akka.actor.Actor$ - Invoking 'preRestart' for failed actor instance [tuto3.WorkerActor].
Worker 1: 재시작 전처리
액터 생성됨: 3
16:43:40.968 [akka:event-driven:dispatcher:global-3] DEBUG akka.actor.Actor$ - Invoking 'postRestart' for new actor instance [tuto3.WorkerActor].
Worker 3: 재시작 후처리
16:43:40.968 [akka:event-driven:dispatcher:global-3] DEBUG akka.actor.Actor$ - Restart: true for [tuto3.WorkerActor].
16:43:40.968 [akka:event-driven:dispatcher:global-3] DEBUG a.d.Dispatchers$globalExecutorBasedEventDrivenDispatcher$ - Resuming 46f67fb0-506a-11e0-a0e5-001d92ad4c1a
16:43:40.984 [akka:event-driven:dispatcher:global-4] DEBUG akka.dispatch.MonitorableThread - Created thread akka:event-driven:dispatcher:global-4
Worker 3: 메시지1-3

위 실행 결과를 보면 다음의 사실을 확인할 수 있다.
  • 재시작 전처리는 1번 Worker가 수행한다.
  • 전처리 후, worker1에 해당하는 새로운 액터 객체를 생성한다. (액터 생성된: 3)
  • 재시작 후처리는 3번 Worker가 수행한다.
  • 이후 worker1은 3번 Worker와 연결되며, "메시지1-3" 메시지는 3번 Worker가 수행하게 된다.
즉, 액터가 죽으면 그 액터 객체를 재사용하는 것이 아니라 새로운 액터 객체를 생성하는 방법으로 재시작하는 것을 알 수 있다.

Supervisor 클래스를 이용한 수퍼바이저 액터 생성

수퍼바이저 액터에서 직접 관리할 액터를 생성하는 경우가 아니면 수퍼바이저 액터를 별도로 구현하기 보다는 Akka가 제공하는 Supervisor 클래스를 이용하는 것이 편리하다.


import akka.actor.Supervisor;
import akka.actor.SupervisorFactory;
import akka.config.Supervision;
import akka.config.Supervision.OneForOneStrategy;
import akka.config.Supervision.Supervise;
import akka.config.Supervision.SupervisorConfig;

...

ActorRef worker1 = Actors.actorOf(WorkerActor.class);
ActorRef worker2 = Actors.actorOf(WorkerActor.class);

Supervise[] supervises = new Supervise[2];
supervises[0] = new Supervise(worker1, Supervision.permanent());
supervises[1] = new Supervise(worker2, Supervision.permanent());

OneForOneStrategy strategy = new OneForOneStrategy(
        new Class[] {RuntimeException.class}, 3, 3000);
SupervisorConfig config = new SupervisorConfig(strategy, supervises);

Supervisor supervisor = new SupervisorFactory(config).newInstance();
// supervisor 생성 시점에서 내부적으로 생성한 SupervisorActor와 worker1가 worker2가 시작됨

worker1.sendOneWay("메시지");

SupervisorFactory를 통해서 Supervisor를 생성하면, 내부적으로 SupervisorActor 타입의 액터를 생성하고, 그 액터에 SupervisorConfig에 지정된 모든 액터를 연결(link)하고, 각 액터를 시작(start) 한다.

내부적으로 생성한 SupervisorActor에 접근하고 싶다면, 다음과 같이 supervisor() 메서드를 사용하면 된다.

Supervisor supervisor = ...;
ActorRef supervisorActor = supervisor.supervisor();


재시작 횟수 제한

OneForOneStrategy나 AllForOneStrategy를 생성할 때 두 번째/세 번째 파라미터는 각각 제한된 시간 내의 최대 재시작 시도 회수와 제한을 시간 의미한다.  예를 들어, 아래 코드는 1초 이내에 최대 3번의 재시작 시도를 시도한다는 것을 의미한다. (1초 안에 재시작을 3번까지 허용한다는 의미가 아니다.)

new OneForOneStrategy(new Class[] { RuntimeException.class }, 3, 1000);

1초 안에 액터 재시작을 3번 실패하면 (예를 들어, postRestart() 메서드에서 런타임 예외가 발생해서 실패), 해당 액터에 대해 재시작 시도를 하지 않으며 더 이상 액터를 사용할 수 없게 된다.

액터 생성/연결 한번에 하기

ActorRef는 액터를 생성하고 관리하기 위한 메서드를 제공하고 있으며, 이들 메서드는 다음과 같다.
  • ActorRef spawn(Class clazz): 액터를 생성하고 시작한다.
  • ActorRef spawnLink(Class clazz): 액터를 생성하고 시작하고, 연결한다.
  • ActorRef spawnRemote(Class clazz, String host, int port, long timeout): 리모트 액터를 생성하고 시작한다.
  • void startLink(ActorRef actor): 액터를 시작하고 연결한다.
위 메서드를 이용하면 다음과 같이 코드를 조금 더 간결하게 작성할 수 있다.

ActorRef master = Actors.actorOf(MasterActor.class);
master.start();
ActorRef worker1 = master.spawnLink(WorkerActor.class); // worker1 액터 시작/연결 됨

ActorRef worker2 = Actors.actorOf(WorkerActor.class);
master.startLink(worker2); // worker2 시작/연결 됨


참고자료


Posted by 최범균 madvirus

댓글을 달아 주세요

Akka가 관심을 끄는 이유는 사실 액터 모델 자체보다는 리모트 노드에 위치한 액터를 마치 로컬에 위치한 액터처럼 사용할 수 있다는 것이었다. Scala 언어가 자체적으로 액터를 제공하고 있지만, Akka의 액터는 이 액터 모델을 리모트까지 확장했기 때문에, Akka를 사용하면 한 노드에서의 병행 처리 뿐만 아니라 다수 노드에서의 병행 처리까지 쉽게 구현할 수 있다.

리모트 액터를 사용하기 위한 과정

리모트 액터를 사용하려면 다음의 과정을 거치면 된다.
  1. 리모트 서버를 만든다. 리모트 서버는 리모트로 제공될 액터를 관리하며, 클라이언트는 리모트 서버에 연결해서 리모트로 제공되는 액터를 사용하게 된다.
  2. 리모트 서버에 액터 등록하기 (클라이언트에서 액터 등록하기, 서버에서 액터 등록하기)

[주의]
클라이언트와 서버는 모두 액터에서 사용되는 클래스를 갖고 있어야 한다. 이후 버전에서는 클라이언트와 서버간의 코드 제공 기능이 포함될 거라고 한다.


단계1, 리모드 액터를 실행할 리모트 서버 만들기

액터를 외부 노드에 제공하고 싶다면, 먼저 클라이언트와의 연결을 처리할 서버를 생성해 주어야 한다. 서버는 다음의 코드를 이용해서 리모트 액터를 실행할 서버를 생성할 수 있다.

Actors.remote().start("0.0.0.0", 2552); // 모든 호스트에 대해 2552 포트로 들어오는 요청 처리


Actors.remote().start("localhost", 2553); // 로컬 호스트의 2553 포트로 들어오는 요청 처리


Actors.remote().start(); // 설정 파일에 있는 기본 값 사용 (설정 파일 없을 시 기본값은 "localhost", 2552)


start() 메서드에서 호스트 값으로 "localhost"를 지정하면, 로컬호스트로 들어오는 요청에 대해서만 처리할 수 있기 때문에, 실 환경에서는 의미가 없다. 실 환경에서는 "0.0.0.0"이나 "192.168.0.1"과 같이 전체 허용 또는 특정 호스트를 지정해 주는 것이 좋다.

단계2, 리모트 서버에 액터 생성하기

다음의 두 가지 방법을 이용해서 리모트 서버에서 액터를 실행할 수 있다.3
  • 클라이언트에서 생성/관리: 리모트 노드에 있는 액터를 클라이언트에서 관리해야 할 때 사용 (액터 모니터링, 액터 수퍼바이징 등)
  • 서버에서 생성/관리: 클라이언트에 액터 서비스만 제공하고 서버에서 액터에 대한 관리를 할 때 주로 사용한다.

클라이언트에서 원격지 서버에 액터 생성하기

다음의 코드를 사용하면 클라이언트에서 리모트 서버에 액터를 생성하고 관리할 수 있다.

ActorRef actor1 = Actors.remote().actorOf(MyActor.class, "172.20.1.11", 2552);
actor1.start();
actor1.sendOneWay("hello");
actor1.stop();

Actors.remote().actorOf() 메서드는 리모트 서버에 MyActor 타입의 액터를 생성한다. 클라이언트에서 액터를 생성한 경우 로컬 액터를 사용하듯이 start() 메서드를 이용해서 액터를 시작하고 stop() 메서드를 이용해서 액터를 종료할 수 있다.

클라이언트에서 리모트 서버에 액터를 생성할 때 주의할 점은 호스트와 포트가 "localhost"와 2552 이면, 리모트 액터가 아닌 로컬 액터로 생성해서 실행된다는 점이다.

서버에서 액터 생성해서 등록하기

서버에서 액터를 생성해서 클라이언트에 제공할 수도 있다. 서버에서 액터를 등록할 때에는 다음과 같이 register() 메서드를 사용하면 된다.

Actors.remote().start("0.0.0.0", 2552);
// MyActor를 리모트 액터로 등록, 식별값은 "hello-service"
Actors.remote().register("hello-service", Actors.actorOf(MyActor.class));

register() 메서드를 사용하면 액터는 자동으로 시작된다.

클라이언트는 액터의 식별값을 이용해서 리모트 액터에 대한 레퍼런스를 구할 수 있으며, 이 레퍼런스를 이용해서 리모트 액터에 메시지를 전달할 수 있다. actorFor() 메서드를 사용하면 리모트 노드에서 생성한 액터에 접근할 수 있다.

// 192.168.1.11:2553 포트로 실행중인 리모트 서버에 등록된 "hello-service" 액터 접근
ActorRef actor = Actors.remote().actorFor("hello-service", "192.168.1.11", 2553);
actor.sendOneWay("테스트!!!"); // 리모트 액터에 메시지 전달


리모트 액터에서 클라이언트에 응답하기

리모트 액터에서 클라이언트에 응답하는 방법은 앞서 'Akka 첫 번째, Akka를 이용한 Concurrent 프로그래밍 시작하기' 에서 살펴봤던 것과 동일하다.

로컬 액터에서 리모트 액터로 메시지를 전송하면, 리모트 액터는 getContext().getSender()를 이용해서 로컬 액터에 메시지를 전달할 수 있다. (즉, 리모트 액터 입장에서는 로컬 액터가 리모트 액터가 되는 것이다.) 리모트 액터에서 로컬 액터에 메시지를 전송할 때에도 결국 네트워크를 통해서 보내기 때문에, 클라이언트도 리모트 서버를 실행해야 리모트 액터에서 로컬 액터에 메시지를 전송할 수 있게 된다.

---- 클라이언트 코드

// 리모트 액터에서 로컬 액터에 메시지 보낼 때 사용할 서버 실행
Actors.remote().start("192.168.4.4", 2552);

ActorRef localActor = Actors.actorOf(LocalActor.class);
localActor.start();

ActorRef actor = Actors.remote().actorFor("hello-service", "192.168.4.3", 2552);
actor.sendOneWay("테스트!!!", localActor); // 로컬 액터를 sender로 지정

---- 리모트 액터 코드
public class MyActor extends UntypedActor {
   
    @Override
    public void onReceive(Object msg) throws Exception {
        if (getContext().getSender().isDefined()) {
            ActorRef sender = getContext().getSender().get(); // 클라이언트의 LocalActor가 sender
            sender.sendOneWay(msg); // 192.168.4.4:2552 로 메시지 전송
        }
    }
}

클라이언트 코드에서 리모트 서버를 실행하지 않으면, 리모트 액터가 로컬 액터에 메시지를 전달할 수 없게 된다. 즉, 서로 다른 노드에 있는 액터들 간에 메시지를 주고 받기 위해서는 각 노드마다 리모트 서버를 실행시켜 주어야 한다.

비신뢰 모드(UntrustedMode)로 리모트 서버 실행하기

리모트 서버를 비신뢰 모드로 실행하게 되면, 클라이언트에서 액터를 생성할 수 없게 된다. 리모트 서버를 비신뢰 모드로 실행하려면 설정 파일에 다음과 같이 untrusted-mode 값을 on으로 설정해 주면 된다.

akka {
    remote {
        server {
            untrusted-mode = on  # 기본 값은 off
        }
    }
}

비신뢰 모드로 실행하면 클라이언트에서 리모트 액터에 대해 다음의 메서드에 대한 호출이 제한된다.
  • start(), stop(), link(), unlink(), spawnLink() 등

리모트 서버와 클라이언트 종료 처리

아래의 클라이언트 코드를 실행하면 JVM이 종료되지 않고 실행된 채로 남아 있는다. 이유는 Akka가 내부적으로 리모트 서버와의 연결 처리를 위해 사용하는 쓰레드가 죽지 않기 때문이다.

public class Client {

    public static void main(String[] args) {
        ActorRef actor = Actors.remote().actorFor("hello-service", "172.20.4.64", 2553);
        actor.sendOneWay("테스트!!!");
        // JVM 종료되지 않음
    }
}

shutdown() 메서드를 이용해서 리모트 서버와의 연결을 종료시키고 관련된 모든 쓰레드를 함께 종료시켜주므로, 클라이언트 코드에서 리모트 액터에 대한 사용이 끝나면 shutdown() 메서드를 호출해서 JVM을 종료처리할 수 있다.

ActorRef actor = Actors.remote().actorFor("hello-service", "172.20.4.64", 2553);
// actor 사용
Actors.remote().shutdown(); // 프로그램 종료시 반드시 실행해 주어야 함

리모트 서버에서도 마찬가지로, 어플리케이션을 종료처리할 때 shutdown() 메서드를 호출해 주어야 관련된 쓰레드가 모두 정리되어 JVM이 종료하게 된다.

Actors.remote().start("0.0.0.0", 2553);
...
Actors.remote().shutdown(); // 프로그램 종료시 반드시 실행해 주어야 함


이벤트 처리

클라이언트와 서버는 액터를 이용해서 클라이언트의 연결/해제 등의 이벤트를 수시할 수 있다. 리모트 기능과 관련된 이벤트를 처리하고 싶다면 다음과 같이 이벤트를 수신할 액터를 Actors.remote().addListener() 메서드를 이용해서 이벤트 리스너로 등록해주면 된다.

ActoreRef listener = Actors.actorOf(ListenerActor.class);
listener.start();

Actors.remote().addListener(listener);

리스너로 사용되는 액터에는 리모트 기능과 관련된 이벤트 객체가 메시지로 전달되며, 액터는 이벤트 타입에 따라서 알맞은 작업을 수행하면 된다.

public class ListenerActor extends UntypedActor {

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof RemoteServerStarted) {
            ...
        }
    }

}

주요 이벤트 클래스는 다음과 같다.
  • 서버측 이벤트
    • RemoteServerStarted
    • RemoteServerShutdown
    • RemoteServerClientConnected
    • RemoteServerClientDisconnected
    • RemoteServerClientClosed
    • RemoteServerWriteFailed
  • 클라이언트측 이벤트
    • RemoteClientConnected
    • RemoteClientDisconnected
    • RemoteClientStarted
    • RemoteClientShutdown
    • RemoteClientError
    • RemoteClientWriteFailed

참고자료

Posted by 최범균 madvirus

댓글을 달아 주세요

개인적으로 관심을 가지고 지켜보던 Akka 프로젝트가 1.0 버전이 되었다. 평소에 병행 처리와 분산 처리에 관심이 많았는데, Akka는 이를 보다 쉽게 구현할 수 있도록 도와주는 프로젝트이다. 본 글에서는 Akka가 무엇인지 간단하게 설명하고 실제 Akka를 이용해서 액터를 생성하고 실행하는 방법을 살펴볼 것이다.

Akka란?

Akka는 병행(concurrent) 및 분산 처리를 위한 오픈 소스 프로젝트로서 액터(Actor) 모델을 이용하고 있다. 필자가 액터 모델 자체에 대한 이해가 완전하지 않지만, 액터 모델을 간단하게 설명하면 다음의 특징을 갖는다.

  • 액터들은 상태를 공유하지 않는다.
  • 액터들 간의 통신은 메시지 전달을 통해서 이루어진다. (이벤트 기반 모델)
  • 액터간의 통신은 비동기로 이루어진다.
  • 각 액터는 전달받은 메시지를 큐에 보관하며, 메시지를 순차적으로 처리한다.
  • 액터는 일종의 경량 프로세서다.
위와 같은 특징은 병행 처리 코드를 보다 쉽게 구현할 수 있도록 도와준다. 실제로 다중 쓰레드 프로그래밍을 해 본 개발자 중에서는 올바르지 못한 동기화 처리로 쓰레드 블럭킹 되는 등의 문제로 고생한 경험이 한 두 번씩은 존재할 것이다. 액터는 애초에 데이터를 서로 공유하지 않는 것을 원칙으로 하기 때문에, 데드락이나 락에 대한 고민을 줄여주고 병행 처리 그 자체에 집중할 수 있도록 도와준다.

Akka는 액터 모델을 통해서 병행 처리를 쉽게 할 수 있도록 도와줄 뿐만 아니라, 리모트 노드에 존재하는 액터를 마치 로컬에 존재하는 액터처럼 사용할 수 있도록 해 주고 있다. 개발자는 통신 프로토콜에 대해 고민할 필요 없이 리모트 액터를 사용할 수 있기 때문에 분산 처리 코드를 손쉽게 작성할 수 있게 된다.

Akka는 스카라(Scala)와 자바(Java)의 두 언어에 대한 API를 제공하고 있는데, 필자가 스카라 언어 자체에 대해서는 아직 잘 모르고 국내에서도 스카라 언어에 대한 관심이 적은 관계로 자바 API를 기준으로 Akka 사용법을 살펴볼 것이다.

Akka 코어 사용

Akka를 사용하려면 http://akka.io 사이트에서 필요한 파일을 다운로드 받으면 되는데, Maven을 사용하고 있다면 pom.xml 파일에 다음과 같이 리포지토리 설정과 의존 설정을 추가해주면 Akka가 제공하는 액터 기능을 사용할 수 있다.

<repositories>
    <repository>
        <id>Akka</id>
        <name>Akka Maven2 Repository</name>
        <url>http://akka.io/repository/</url>
    </repository>

    <repository>
        <id>Multiverse</id>
        <name>Multiverse Maven2 Repository</name>
        <url>http://multiverse.googlecode.com/svn/maven-repository/releases/</url>
    </repository>

    <repository>
        <id>GuiceyFruit</id>
        <name>GuiceyFruit Maven2 Repository</name>
        <url>http://guiceyfruit.googlecode.com/svn/repo/releases/</url>
    </repository>

    <repository>
        <id>JBoss</id>
        <name>JBoss Maven2 Repository</name>
        <url>http://repository.jboss.org/nexus/content/groups/public/</url>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>se.scalablesolutions.akka</groupId>
        <artifactId>akka-actor</artifactId>
        <version>1.0</version>
    </dependency>
</dependencies>

[주의]
Akka는 Scala로 만들어졌기 때문에, 이클립스에서 Akka 모듈을 사용할 때 코드 어시스트가 제대로 동작하려면 Scala-IDE 등을 이용해서 개발환경을 구축해 주어야 한다. 그렇지 않으면, 코드 어시스트가 안 되서 짜증나는 시간을 보내게 될 것이다.

액터 클래스 및 사용

액터를 생성하고 사용하려면, 먼저 Akka가 제공하는 기반 클래스를 이용해서 액터 역할을 수행할 클래스를 구현해 주어야 한다. Akka는 akka.actor.UntypedActor 클래스를 제공하고 있으며, 이 클래스를 상속받아서 액터를 구현할 수 있다.

import akka.actor.UntypedActor;

public class PrintActor extends UntypedActor {

    @Override
    public void onReceive(Object message) throws Exception {
        Thread.sleep(500); // 테스트를 위해 0.5초 sleep
        System.out.println(message);
    }

}

UntypedActor를 상속받은 클래스는 onReceive() 메서드를 구현해 주어야 하는데, onReceive() 메서드는 액터에 전달된 메시지를 처리하게 된다.

액터 클래스를 만들었다면, 액터를 생성한 뒤에 액터에 메시지를 전달할 수 있다. 액터를 생성할 때에는 akka.actor.Actors 클래스의 static 메서드인 actorOf() 메서드를 사용한다. Actors.actorOf() 메서드는 액터를 구현한 클래스의 Class를 전달받으며, 액터를 생성한다.

ActorRef actor = Actors.actorOf(PrintActor.class);
actor.start();
// actor를 이용해서 액터에 메시지를 전달

ActorRef의 start() 메서드는 액터를 시작하며, 액터가 시작된 이후부터 액터에 메시지를 전달할 수 있게 된다.

액터에 메시지 전달하기

Actors.actorOf()를 이용해서 액터를 생성했다면, 이후 ActorRef가 제공하는 메서드를 이용해서 액터에 메시지를 전달할 수 있다. 다음의 세 가지 방법으로 액터에 메시지를 전달할 수 있다.
  • Fire-And-Forget: 메시지를 전달하고 메시지에 대한 응답을 기다리지 않는다. 병행 및 확장에 적합한 메시지 전달 방식이다.
  • Send-And-Receive-Eventually: 메시지를 전달하고 응답을 받는다. 응답을 받을 때 까지 블록킹된다.
  • Send-And-Receive-Future: 메시지를 전달하고 응답을 받기 위한 Future를 리턴한다.
sendOneWay() 메서드를 이용한 Fire-And-Forget 방식 메시지 전달

ActorRef.sendOneWay() 메서드는 메시지를 액터에 전달할 때 사용된다. sendOneWay()라는 이름에서 알 수 있듯이 이 메서드는 액터로부터 어떤 값도 받지 않으며, 액터로부터 응답을 기다리지 않고 곧 바로 리턴한다.

ActorRef actor = Actors.actorOf(PrintActor.class);
actor.start();
actor.sendOneWay("받아라"); // actor에 "받아라" 메시지를 전달하고 바로 리턴.
actor.sendOneWay("받아라2");
actor.sendOneWay("받아라3");
System.out.println("비동기로 실행");

메시지를 받은 액터는 내부적으로 사용하는 큐에 메시지를 보관한 뒤, 차례대로 액터의 onReceive(Object message) 메서드에 메시지를 전달한다. PrintActor의 onReceive() 메서드는 0.5초후에 전달받은 메시지를 출력하고 sendOneWay()메서드는 응답 대기 없이 바로 리턴하므로, 위 코드가 실행되면 콘솔에는 다음과 같은 순서로 문자열이 출력된다.

비동기로 실행
받아라
받아라2
받아라3

sendOneWay() 메서드는 다음의 두 가지를 제공된다.
  • sendOneWay(Object message)
  • sendOneWay(Object message, ActorRef sender) : 메시지를 전송하면서 메시지를 보낸 액터로 sender를 지정한다.

sendRequestReply() 메서드를 이용한 Send-And-Receive-Eventually 방식 메시지 전달

ActorRef.sendRequestReply() 메서드는 액터에 메시지를 전달하고, 그 메시지에 대한 응답이 올 때 까지 대기하고 싶을 때 사용된다. 액터 구현 클래스는 getContext().replyUnsafe() 메서드를 이용해서 메시지에 대해 응답할 수 있는데, ActorRef.sendRequestReply() 메서드는 이 응답을 리턴하게 된다. 예를 들어, 다음과 같이 메시지에 대해 응답하는 액터가 있다고 하자.

public class PingActor extends UntypedActor {

    @Override
    public void onReceive(Object message) throws Exception {
        getContext().replyUnsafe("응답: "+ message); // 메시지 sender에 응답
    }

}

이 경우 다음과 같이 sendRequestReply() 메서드를 이용함으로써 액터에 전달한 메시지에 대한 응답이 도착할 때 까지 대기할 수 있다.

ActorRef actor = Actors.actorOf(PingActor.class);
actor.start();
Object res = actor.sendRequestReply("헬로우"); // 액터로부터 응답이 도착할 때 까지 대기

sendRequestReply() 메서드는 일정 시간 동안 액터로부터 응답이 없을 경우 akka.actor.ActorTimeoutException 예외를 발생시킨다. 별도 설정을 하지 않은 경우 기본 타입 시간은 5초이며, sendRequestReply() 메서드를 호출할 때 타임아웃을 지정할 수도 있다.

Object res = actor.sendRequestReply("헬로우", 1000, null); // 1초간 응답 대기

sendRequestReply() 메서드는 다음의 세 가지가 존재한다.
  • sendRequestReply(Object message)
  • sendRequestReply(Object message, ActorRef sender): 메시지를 보낸 액터로 sender를 지정한다.
  • sendRequestReply(Object message, long timeout, ActorRef sender)

sendRequestReplyFuture() 메서드를 이용한 Send-And-Receive-Future 방식 메시지 전달

sendRequestReplyFuture() 메서드는 메시지를 전달한 뒤 응답을 받기 위한 Future를 리턴한다. Future는 자바가 제공하는 Future가 아닌 Akka가 제공하는 akka.dispatch.Future 타입이다. Future는 주로 다음과 같은 형식으로 주로 사용된다.

Future future = actor.sendRequestReplyFuture("하이");
future.await(); // 응답을 대기. 대기 시간을 초과하면 예외 발생
if (future.isCompleted()) { // 완료되었다면
    Option resultOption = future.result(); // 응답 구함
    if (resultOption.isDefined()) { // 응답 데이터가 있다면,
        Object result = resultOption.get(); // 응답 데이터 구함
        System.out.println(result);
    }
}

sendRequestReplyFuture()가 리턴한 Future의 await() 메서드는 시간이 초과될 때 까지 대기한다. 시간이 초과되기 전에 응답이 도착하면 다음으로 넘어가고, 시간이 초과되면 ActorTimeoutException 예외를 발생시킨다.

sendRequestReplyFuture() 메서드는 다음의 세 가지가 존재한다.
  • sendRequestReplyFuture(Object message)
  • sendRequestReplyFuture(Object message, ActorRef sender) : 메시지를 보낸 액터로 sender를 지정한다.
  • sendRequestReplyFuture(Object message, long timeout, ActorRef sender)
두 개 이상의 액터에 메시지를 전달한 후 액터로부터의 응답이 모두 도착할 때 까지 대기해야 한다면, Futures.awaitAll(Future[] futures) 메서드를 사용하면 된다.

// actor들에 메시지 전달(작업 전달)
for (ActorRef actor : actors) {
    futureList.add(actor.sendRequestReplyFuture(someWork);
}
Future[] futures =  futureList.toArray();
Future.awaitAll(futures); // 모든 액터로부터 응답이 (작업 결과가) 올 때 까지 대기.
// futures로부터 응답 구해서 처리


액터에서 메시지 받아 처리하기

ActorRef의 send*() 메서드를 통해서 전달된 메시지는 UntypedActor 클랫를 상속받은 액터 구현 클래스의 onReceive(Object message)에 차례대로 전달된다. (TypedActor를 사용하면 인터페이스를 이용해서 메시지를 전달받을 메서드를 정의할 수도 있는데, 이에 대한 내용은 다음에 살펴볼 것이다.)

onReceive() 메서드는 다음과 같이 메시지의 타입을 확인한 뒤 메시지 타입에 맞는 동작을 수행하도록 구현하는 것이 보통이다.

public class ActorImpl extends UntypedActor {
    public void onReceive(Object message) throws Exception {
        if (message instanceof String) {
            // 메시지 처리
        }
        ...
    }
}


메시지에 응답하기

replyUnsafe()/replySafe()를 이용한 응답

액터 구현 클래스는 getContext()를 이용해서 해당 액터에 대한 ActorRef를 구할 수 있는데, ActorRef의 replyUnsafe() 또는 replySafe() 메서드를 이용해서 메시지에 대한 응답을 전달할 수 있다. replyUnsafe() 메서드는 응답 실패시 예외를 발생시키는 반면에 replySafe() 메서드는 응답에 실패할 경우 false를 리턴한다.

public class PingActor extends UntypedActor {

    public void onReceive(Object message) throws Exception {
        if (message.equals("ping")) {
            if (! getContext.replySafe("pong")) {
                // 실패에 대한 처리
            }
        }
    }
}

액터에 메시지를 전달할 때 sendRequestReply() 메서드나 sendRequestReplyFuture() 메서드를 사용한 경우, replyUnsafe()와 replySafe()를 이용해서 응답한 데이터를 리턴 값으로 받게 된다.
 
메시지를 전달한 액터에 메시지로 응답하기

액터가 다른 액터에게 메시지를 전달하기도 한다. 이때 sendRequestReply*() 메서드에 대한 응답이 아니라 메시지를 전달한 액터에 메시지를 전달하는 방법으로 응답할 수도 있을 것이다. 이렇게 메시지를 전달한 액터에 응답으로 메시지를 전송하고 싶다면, getContext().getSender() 메서드를 이용해서 메시지를 보낸 액터에 대한 ActorRef를 구한 뒤, 그 ActorRef의 send*() 메서드를 이용해서 응답을 전달하면 된다.

public class PongActor extends UntypedActor {

    @Override
    public void onReceive(Object message) throws Exception {
        if (message.equals("ping")) {
            if (getContext().getSender().isDefined()) {
                ActorRef sender = getContext().getSender().get();
                sender.sendOneWay("pong", getContext());
            } else {
                getContext().replyUnsafe("pong");
            }
        }
    }
}


액터의 라이프사이클

액터의 라이프 사이클은 다음과 같다.
  • NEW: 액터가 만들어졌을 때. 메시지를 수신하지 못한다.
  • STARTED: start()가 호출되었을 때. 메시지를 수신할 수 있다.
  • SHUTDOWN: exit()나 stop()이 호출되었을 때. 어떤 것도 하지 못한다.
ActorRef는 start(), stop() 메서드를 제공하고 있으며, 이들 메서드를 이용해서 액터를 시작하고, 중지할 수 있다. 아래 코드는 전형적인 액터의 사용방법을 보여주고 있다.

actor.start(); // 액터를 시작
// 필요한 만큼 액터에 메시지 전달
actor.sendOneWay(msg);
...
actor.stop(); // 액터 종료

start() 메서드는 액터와 메시지 큐를 시작하고, stop() 메서드는 액터의 디스패처와 메시지 큐를 포함한 액터를 종료시킨다.

모든 액터를 종료시키고 싶다면, 다음과 같은 코드를 사용하면 된다.

Actors.registry().shtudownAll();


UntypedActor 클래스의 라이프 사이클 관련 콜백 메서드

UntypedActor 클래스는 라이프 사이클과 관련해서 시작/중지 이벤트를 처리할 수 있는 콜백 메서드를 제공하고 있다.
  • preStart(): 액터 시작 전에 호출된다.
  • postStop(): 액터 종료 후에 호출된다.
  • preRestart(Throwable reason): 액터 재시작 전에 호출된다. (무정지 액터 기능과 관련됨)
  • postRestart(Throwable reason): 액터 재시작 후에 호출된다. (무정지 액터 기능과 관련됨)

설정 파일 지정 및 기본 값

Akka는 다음의 세 가지 방법 중 한가지를 이용해서 설정 파일을 찾는다.
  • akka.config 시스템 프로퍼티로 지정한 파일 (java -Dakka.config=... )
  • 클래스패스에 위치한 akka.config 파일
  • AKKA_HOME 환경변수 존재 시, '$AKKA_HOME/config 디렉터리의 설정 파일 사용. (또는 akka.home 시스템 프로퍼티를 AKKA_HOME 환경 변수 대신 사용)
각 설정 정보 및 기본 값은 http://doc.akka.io/configuration 참고하기 바란다.

참고자료

Posted by 최범균 madvirus

댓글을 달아 주세요

  1. 고광현 2016.10.25 17:41 신고  댓글주소  수정/삭제  댓글쓰기

    메시지에 응답하기
    replyUnsafe()/replySafe()를 이용한 응답 예제에서 실패에 대한처리 부분에 질문이 있는데요
    if (! getContext.replySafe("pong")) {

    실패시 처리인데 아직 해당 actor가 작업을 마치지 않았늗네 결과를 알수가 있나요?

    • 최범균 madvirus 2016.10.27 08:13 신고  댓글주소  수정/삭제

      제가 akka 내부를 잘 모르지만, reply라는 게 메시지를 보내는 용도니 메시지 보내는데 실패했다를 의미합니다.
      메시지를 수신한 것과, 수신한 메시지를 처리하는 건 구별된다고 알고 있습니다.