주요글: 도커 시작하기
반응형

에러 처리

시퀀스는 데이터를 발생하는 과정에서 에러를 발생할 수 있다. 리액터는 에러를 처리하는 여러 방법을 제공하는데 이 글에서는 레퍼런스 문서에서 언급하는 에러 처리 방법을 차례대로 살펴볼 것이다.


참고로 에러 신호는 종료 신호이다. 따라서 에러 신호가 발생하면 시퀀스는 종료되고 더 이상 데이터를 발생하지 않는다.


에러 신호 처리

에러 신호가 발생하면 Subscriber의 onError 메서드가 호출된다. 이 메서드를 구현한 Subscriber를 이용해서 구독을 하면 에러 신호를 알맞게 처리할 수 있다. 또한 에러 처리를 위한 Consumer를 파라미터로 갖는 subscribe() 메서드를 사용해서 익셉션을 처리할 수 있다. 다음 코드는 에러 처리를 위한 Consumer를 파라미터로 갖는 subscribe() 메서드의 사용 예를 보여준다.


Flux.range(1, 10)

        .map(x -> {

            if (x == 5) throw new RuntimeException("exception"); // 에러 발생

            else return x;

        })

        .subscribe(

                i -> System.out.println(i), // next 신호 처리

                ex -> System.err.println(ex.getMessage()), // error 신호 처리

                () -> System.out.println("complete") // complete 신호 처리

        );


위 코드는 1부터 10개의 값을 발생하는데 값이 5이면 익셉션을 발생하는 시퀀스를 생성한다. subscribe() 메서드는 3개의 인자를 갖는데 차례대로 next, error, complete 신호를 처리한다. 실행 결과는 다음과 같다.


1

2

3

4

exception


에러 신호는 종료 신호이므로 익셉션 발생 이후에 더 이상 next 신호가 발생하지 않는 것을 확인할 수 있다.


에러 신호를 처리하기 위한 subscribe() 메서드는 다음과 같다.

  • subscribe(Consumer<? super T> consumer,
                    Consumer<? super Throwable> errorConsumer)
  • subscribe(Consumer<? super T> consumer,
                    Consumer<? super Throwable> errorConsumer,
                    Runnable completeConsumer)
  • subscribe(Consumer<? super T> consumer,
                    Consumer<? super Throwable> errorConsumer,
                    Runnable completeConsumer,
                    Consumer<? super Subscription> subscriptionConsumer)
  • subscribe(Subscriber<? super T> subscriber)


에러 발생하면 기본 값 사용하기: onErrorReturn

에러가 발생할 때 에러 대신에 특정 값을 발생하고 싶다면 onErrorReturn() 메서드를 사용한다. 이 메서드의 사용 예는 다음과 같다.


Flux<Integer> seq = Flux.range(1, 10)

        .map(x -> {

            if (x == 5) throw new RuntimeException("exception");

            else return x;

        })

        .onErrorReturn(-1);


seq.subscribe(System.out::println);


위 코드를 실행한 결과는 다음과 같다.


1

2

3

4

-1


실행 결과에서 알 수 있듯이 에러 대신에 값을 발생한 뒤에 시퀀스는 종료된다.


발생한 익셉션이 특정 조건을 충족하는 경우에만 에러 대신에 특정 값을 발생하고 싶다면 Predicate을 인자로 받는 onErrorReturn() 메서드를 사용한다. 익셉션이 특정 타입인 경우에만 에러 대신 특정 값을 발생하고 싶다면 Class 타입을 인자로 받는 onErrorReturn() 메서드를 사용한다.

  • Flux<T> onErrorReturn(Predicate<? super Throwable> predicate, T fallbackValue)
  • <E extends Throwable> Flux<T> onErrorReturn(Class<E> type, T fallbackValue)


에러 발생하면 다른 신호(시퀀스)나 다른 에러로 대체하기: onErrorResume

onErrorResume 메서드를 사용하면 에러가 발생하면 다른 시퀀스나 에러로 대체할 수 있다. onErrorResume 메서드는 다음 타입의 함수를 파라미터로 갖는다.

  • Function<? super Throwable, ? extends Publisher<? extends T>> :
    Throwable을 입력으로 받고 Publisher를 리턴하는 함수
이 함수는 시퀀스에서 발생한 에러를 입력으로 받아 결과로 Publisher를 리턴한다. 즉 에러가 발생하면 이를 다른 데이터 신호로 대체한다. 다음 코드를 보자.

Random random = new Random();
Flux<Integer> seq = Flux.range(1, 10)
        .map(x -> {
            int rand = random.nextInt(8);
            if (rand == 0) throw new IllegalArgumentException("illarg");
            if (rand == 1) throw new IllegalStateException("illstate");
            if (rand == 2) throw new RuntimeException("exception");
            return x;
        })
        .onErrorResume(error -> {
            if (error instanceof IllegalArgumentException) {
                return Flux.just(21, 22);
            }
            if (error instanceof IllegalStateException) {
                return Flux.just(31, 32);
            }
            return Flux.error(error);
        });

seq.subscribe(System.out::println);


이 코드에서 onErrorResume() 메서드에 전달한 함수를 보자. 이 함수는 발생한 에러가 IllegalArgumentException이면 Flux.just(21, 22)로 21, 22 값을 생성하는 시퀀스를 리턴하고, 발생한 에러가 IllegalStateException이면 31, 32 값을 생성하는 시퀀스를 리턴한다. 두 조건에 해당하지 않는 경우 Flux.error() 메서드를 이용해서 익셉션을 다시 재발생시킨다.


map() 함수에서 임의로 익셉션을 발생시키도록 했으므로 실행할 때마다 결과가 달라진다. 다음은 여러 번 실행한 결과 중 하나이다. 이 실행 결과는 3번째 데이터를 map에서 처리하는 과정에서 IllegalArgumentException이 발생했고 onErrorResume()을 통해 에러 대신에 21, 22를 생성하는 Flux로 대체된 것을 보여준다.


1

2

21

22


onErrorReturn과 마찬가지로 onErrorResume도 Predicate이나 Class 타입을 파라미터로 갖는 메서드를 이용해서 처리할 익셉션을 제한할 수 있다.


에러를 다른 에러로 변환하기: onErrorMap

에러를 다른 에러로 변환할 때에는 onErrorMap을 사용한다. 다음은 간단한 사용 예이다.


Flux<Integer> seq = intSeq.onErrorMap(error -> new MyException(...));


onErrorMap도 Predicate이나 Class 타입을 파라미터로 갖는 메서드를 이용해서 변환할 익셉션을 제한할 수 있다.


재시도하기: retry

retry()를 사용하면 에러가 발생했을 구독을 재시도할 수 있다. 다음은 예제 코드이다.


Flux.range(1, 5)

        .map(input -> {

            if (input < 4) return "num " + input;

            throw new RuntimeException("boom");

        })

        .retry(1) // 에러 신호 발생시 1회 재시도

        .subscribe(System.out::println, System.err::println);


위 코드를 실행한 결과는 다음과 같다.


num 1

num 2

num 3

num 1

num 2

num 3

java.lang.RuntimeException: boom


재시도를 하면 원본 시퀀스를 다시 구독한다. 이런 이유로 위 결과는 에러가 처음 발생했을 때 다시 1부터 신호가 발생하고 있다. 두 번째 에러가 발생했을 때에는 재시도를 하지 않으므로 에러 메시지가 출력되는 것을 알 수 있다.


재시도하기: retryWhen

단순 재시도가 아닌 조금 더 복잡한 상황에 따라 재시도를 하고 싶다면 retryWhen을 사용한다. retryWhen 메서드는 사용법이 다소 복잡하다. 먼저 retryWhen 메서드의 파라미터를 보자. 파라미터 타입은 다음과 같다.

  • retryWhen(Function< Flux<Throwable>,  ? extends Publisher<?> > whenFactory)
whenFactory 파라미터는 Function 타입의 함수이다. 이 함수는 입력으로 Flux<Throwable>를 받고 결과로 Publisher를 리턴한다. 여기서 whenFactory의 함수의 입력인 Flux<Throwable>는 시퀀스가 발생하는 익셉션 신호에 해당한다. 재시도 횟수에 따라 익셉션이 여러 번 발생할 수 있는데 Flux<Throwable>이 발생하는 데이터는 바로 여러 번 발생할 수 있는 익셉션에 해당한다.

whenFactory 함수에 전달되는 Flux<Throwable>은 원본 시퀀스의 익셉션과 연관되어 있으므로 이를 컴페니언(companion) Flux라고 부른다.

whenFactory 함수는 재시도 조건에 맞게 변환한 컴페니언 Flux를 리턴한다. 이 변환한 컴페니언 Flux가 재시도 여부를 결정하는데 그 과정은 다음과 같다.
  1. 에러가 발생할 때마다 에러가 컴페니언 Flux로 전달된다.
  2. 컴페니언 Flux가 뭐든 발생하면 재시도가 일어난다.
  3. 컴페니언 Flux가 종료되면 재시도를 하지 않고 원본 시퀀스 역시 종료된다.
  4. 컴페니언 Flux가 에러를 발생하면 재시도를 하지 않고 컴페니언 Flux가 발생한 에러를 전파한다.
위 설명만으로는 감이 잘 안 올 테니 간단한 예를 살펴보자. 먼저 다음은 2번 재시도하는 예제 코드이다.


Flux<Integer> seq = Flux.just(1, 2, 3)

        .map(i -> {

            if (i < 3) return i;

            else throw new IllegalStateException("force");

        })

        .retryWhen(errorsFlux -> errorsFlux.take(2)); // 2개의 데이터 발생


seq.subscribe(

        System.out::println,

        err -> System.err.println("에러 발생: " + err),

        () -> System.out.println("compelte")

);


위 코드에서 retryWhen은 take(2)를 사용해서 2개의 데이터를 발생하는 컴페니언 Flux를 리턴한다. 이 컴페니언 Flux는 2개의 데이터를 발생하고 종료된다. 즉 2-3번 과정에 의해 2번 재시도를 하고 원본 시퀀스를 종료시킨다. 실제 실행 결과는 다음과 같다. 괄호 안의 파란 글씨는 재시도 여부를 표시한 것으로 실제 출력에 포함된 내용은 아니다.


1

2

1    (1번 재시도)

2

1    (2번 재시도)

2

complete


출력 결과에서 눈여겨 볼 점은 에러가 출력되지 않았다는 점이다. 즉 컴페니언 Flux가 complete 신호를 보내면 Subscriber에도 complete 신호가 전달되고 있다. 이 점은 retry()와 다르다. retry()는 최대 재시도 이후에도 에러가 발생하면 해당 에러를 Subscriber에 전달하는데 retryWhen은 컴페니언 Flux가 어떤 데이터를 발생하느냐에 따라 에러를 Subscriber에 전달할지 여부가 달라진다.


다음 코드는 컴페니언 Flux가 에러를 발생하는 예이다.


Flux<Integer> seq = Flux.just(1, 2, 3)

        .map(i -> {

            if (i < 3) return i;

            else throw new IllegalStateException("force");

        })

        .retryWhen(errorsFlux -> errorsFlux.zipWith(Flux.range(1, 3),

                (error, index) -> {

                    if (index < 3) return index;

                    else throw new RuntimeException("companion error"); // 

                })

        );


seq.subscribe(

        System.out::println,

        err -> System.err.println("에러 발생: " + err),

        () -> System.out.println("compelte")

);


위 코드에서 retryWhen이 생성하는 컴페니언 Flux는 에러가 세 번째 발생하면 RuntimeException을 발생한다. 즉 두 번째 에러까지는 데이터 신호를 발생하고 세 번째 에러에 에러 신호를 발생한다. 따라서 두 번 재시도를 한다. 실행 결과는 다음과 같다.


1

2

1

2

1

2

에러 발생: java.lang.RuntimeException: companion error


실행 결과를 보면 두 번 재시도 후에 에러 신호를 받은 것을 알 수 있다.


관련 글


+ Recent posts