저작권 안내: 저작권자표시 Yes 상업적이용 No 컨텐츠변경 No

스프링5 입문

JSP 2.3

JPA 입문

DDD Start

인프런 객체 지향 입문 강의

예전에 신림프로그래머 페이스북 그룹에서 진행한 "코틀린 인 액션" 책 스터디 정리한 자료


kia-ch02.pdf

kia-ch03.pdf

kia-ch04.pdf

kia-ch05.pdf

kia-ch06.pdf

kia-ch07.pdf

kia-ch08.pdf



Posted by 최범균 madvirus

댓글을 달아 주세요

에러 처리

시퀀스는 데이터를 발생하는 과정에서 에러를 발생할 수 있다. 리액터는 에러를 처리하는 여러 방법을 제공하는데 이 글에서는 레퍼런스 문서에서 언급하는 에러 처리 방법을 차례대로 살펴볼 것이다.


참고로 에러 신호는 종료 신호이다. 따라서 에러 신호가 발생하면 시퀀스는 종료되고 더 이상 데이터를 발생하지 않는다.


에러 신호 처리

에러 신호가 발생하면 Subscriber의 onError 메서드가 호출된다. 이 메서드를 구현한 Subscriber를 이용해서 구독을 하면 에러 신호를 알맞게 처리할 수 있다. 또한 에러 처리를 위한 Consumber를 파라미터로 갖는 subscribe() 메서드를 사용해서 익셉션을 처리할 수 있다. 다음 코드는 에러 처리를 위한 Consumer를 파라미터로 갖는 subscribe() 메서드의 사용 예를 보여준다.


Flux.range(1, 10)

        .map(x -> {

            if (x == 5) throw new RuntimeException("exception"); // 에러 발생

            else return x;

        })

        .subscribe(

                i -> System.out.println(i), // next 신호 처리

                ex -> System.err.println(ex.getMessage()), // error 신호 처리

                () -> System.out.println("complete") // complete 신호 처리

        );


위 코드는 1부터 10개의 값을 발생하는데 값이 5이면 익셉션을 발생하는 시퀀스를 생성한다. subscribe() 메서드는 3개의 인자를 갖는데 차례대로 next, error, complete 신호를 처리한다. 실행 결과는 다음과 같다.


1

2

3

4

exception


에러 신호는 종료 신호이므로 익셉션 발생 이후에 더 이상 next 신호가 발생하지 않는 것을 확인할 수 있다.


에러 신호를 처리하기 위한 subscribe() 메서드는 다음과 같다.

  • subscribe(Consumer<? super T> consumer,
                    Consumer<? super Throwable> errorConsumer)
  • subscribe(Consumer<? super T> consumer,
                    Consumer<? super Throwable> errorConsumer,
                    Runnable completeConsumer)
  • subscribe(Consumer<? super T> consumer,
                    Consumer<? super Throwable> errorConsumer,
                    Runnable completeConsumer,
                    Consumer<? super Subscription> subscriptionConsumer)
  • subscribe(Subscriber<? super T> subscriber)


에러 발생하면 기본 값 사용하기: onErrorReturn

에러가 발생할 때 에러 대신에 특정 값을 발생하고 싶다면 onErrorReturn() 메서드를 사용한다. 이 메서드의 사용 예는 다음과 같다.


Flux<Integer> seq = Flux.range(1, 10)

        .map(x -> {

            if (x == 5) throw new RuntimeException("exception");

            else return x;

        })

        .onErrorReturn(-1);


seq.subscribe(System.out::println);


위 코드를 실행한 결과는 다음과 같다.


1

2

3

4

-1


실행 결과에서 알 수 있듯이 에러 대신에 값을 발생한 뒤에 시퀀스는 종료된다.


발생한 익셉션이 특정 조건을 충족하는 경우에만 에러 대신에 특정 값을 발생하고 싶다면 Predicate을 인자로 받는 onErrorReturn() 메서드를 사용한다. 익셉션이 특정 타입인 경우에만 에러 대신 특정 값을 발생하고 싶다면 Class 타입을 인자로 받는 onErrorReturn() 메서드를 사용한다.

  • Flux<T> onErrorReturn(Predicate<? super Throwable> predicate, T fallbackValue)
  • <E extends Throwable> Flux<T> onErrorReturn(Class<E> type, T fallbackValue)


에러 발생하면 다른 신호(시퀀스)나 다른 에러로 대체하기: onErrorResume

onErrorResume 메서드를 사용하면 에러가 발생하면 다른 시퀀스나 에러로 대체할 수 있다. onErrorResume 메서드는 다음 타입의 함수를 파라미터로 갖는다.

  • Function<? super Throwable, ? extends Publisher<? extends T>> :
    Throwable을 입력으로 받고 Publisher를 리턴하는 함수
이 함수는 시퀀스에서 발생한 에러를 입력으로 받아 결과로 Publisher를 리턴한다. 즉 에러가 발생하면 이를 다른 데이터 신호로 대체한다. 다음 코드를 보자.

Random random = new Random();
Flux<Integer> seq = Flux.range(1, 10)
        .map(x -> {
            int rand = random.nextInt(8);
            if (rand == 0) throw new IllegalArgumentException("illarg");
            if (rand == 1) throw new IllegalStateException("illstate");
            if (rand == 2) throw new RuntimeException("exception");
            return x;
        })
        .onErrorResume(error -> {
            if (error instanceof IllegalArgumentException) {
                return Flux.just(21, 22);
            }
            if (error instanceof IllegalStateException) {
                return Flux.just(31, 32);
            }
            return Flux.error(error);
        });

seq.subscribe(System.out::println);


이 코드에서 onErrorResume() 메서드에 전달한 함수를 보자. 이 함수는 발생한 에러가 IllegalArgumentException이면 Flux.just(21, 22)로 21, 22 값을 생성하는 시퀀스를 리턴하고, 발생한 에러가 IllegalStateException이면 31, 32 값을 생성하는 시퀀스를 리턴한다. 두 조건에 해당하지 않는 경우 Flux.error() 메서드를 이용해서 익셉션을 다시 재발생시킨다.


map() 함수에서 임의로 익셉션을 발생시키도록 했으므로 실행할 때마다 결과가 달라진다. 다음은 여러 번 실행한 결과 중 하나이다. 이 실행 결과는 3번째 데이터를 map에서 처리하는 과정에서 IllegalArgumentException이 발생했고 onErrorResume()을 통해 에러 대신에 21, 22를 생성하는 Flux로 대체된 것을 보여준다.


1

2

21

22


onErrorReturn과 마찬가지로 onErrorResume도 Predicate이나 Class 타입을 파라미터로 갖는 메서드를 이용해서 처리할 익셉션을 제한할 수 있다.


에러를 다른 에러로 변환하기: onErrorMap

에러를 다른 에러로 변환할 때에는 onErrorMap을 사용한다. 다음은 간단한 사용 예이다.


Flux<Integer> seq = intSeq.onErrorMap(error -> new MyException(...));


onErrorMap도 Predicate이나 Class 타입을 파라미터로 갖는 메서드를 이용해서 변환할 익셉션을 제한할 수 있다.


재시도하기: retry

retry()를 사용하면 에러가 발생했을 구독을 재시도할 수 있다. 다음은 예제 코드이다.


Flux.range(1, 5)

        .map(input -> {

            if (input < 4) return "num " + input;

            throw new RuntimeException("boom");

        })

        .retry(1) // 에러 신호 발생시 1회 재시도

        .subscribe(System.out::println, System.err::println);


위 코드를 실행한 결과는 다음과 같다.


num 1

num 2

num 3

num 1

num 2

num 3

java.lang.RuntimeException: boom


재시도를 하면 원본 시퀀스를 다시 구독한다. 이런 이유로 위 결과는 에러가 처음 발생했을 때 다시 1부터 신호가 발생하고 있다. 두 번째 에러가 발생했을 때에는 재시도를 하지 않으므로 에러 메시지가 출력되는 것을 알 수 있다.


재시도하기: retryWhen

단순 재시도가 아닌 조금 더 복잡한 상황에 따라 재시도를 하고 싶다면 retryWhen을 사용한다. retryWhen 메서드는 사용법이 다소 복잡하다. 먼저 retryWhen 메서드의 파라미터를 보자. 파라미터 타입은 다음과 같다.

  • retryWhen(Function< Flux<Throwable>,  ? extends Publisher<?> > whenFactory)
whenFactory 파라미터는 Function 타입의 함수이다. 이 함수는 입력으로 Flux<Throwable>를 받고 결과로 Publisher를 리턴한다. 여기서 whenFactory의 함수의 입력인 Flux<Throwable>는 시퀀스가 발생하는 익셉션 신호에 해당한다. 재시도 횟수에 따라 익셉션이 여러 번 발생할 수 있는데 Flux<Throwable>이 발생하는 데이터는 바로 여러 번 발생할 수 있는 익셉션에 해당한다.

whenFactory 함수에 전달되는 Flux<Throwable>은 원본 시퀀스의 익셉션과 연관되어 있으므로 이를 컴페니언(companion) Flux라고 부른다.

whenFactory 함수는 재시도 조건에 맞게 변환한 컴페니언 Flux를 리턴한다. 이 변환한 컴페니언 Flux가 재시도 여부를 결정하는데 그 과정은 다음과 같다.
  1. 에러가 발생할 때마다 에러가 컴페니언 Flux로 전달된다.
  2. 컴페니언 Flux가 뭐든 발생하면 재시도가 일어난다.
  3. 컴페니언 Flux가 종료되면 재시도를 하지 않고 원본 시퀀스 역시 종료된다.
  4. 컴페니언 Flux가 에러를 발생하면 재시도를 하지 않고 컴페니언 Flux가 발생한 에러를 전파한다.
위 설명만으로는 감이 잘 안 올 테니 간단한 예를 살펴보자. 먼저 다음은 2번 재시도하는 예제 코드이다.


Flux<Integer> seq = Flux.just(1, 2, 3)

        .map(i -> {

            if (i < 3) return i;

            else throw new IllegalStateException("force");

        })

        .retryWhen(errorsFlux -> errorsFlux.take(2)); // 2개의 데이터 발생


seq.subscribe(

        System.out::println,

        err -> System.err.println("에러 발생: " + err),

        () -> System.out.println("compelte")

);


위 코드에서 retryWhen은 take(2)를 사용해서 2개의 데이터를 발생하는 컴페니언 Flux를 리턴한다. 이 컴페니언 Flux는 2개의 데이터를 발생하고 종료된다. 즉 2-3번 과정에 의해 2번 재시도를 하고 원본 시퀀스를 종료시킨다. 실제 실행 결과는 다음과 같다. 괄호 안의 파란 글씨는 재시도 여부를 표시한 것으로 실제 출력에 포함된 내용은 아니다.


1

2

1    (1번 재시도)

2

1    (2번 재시도)

2

complete


출력 결과에서 눈여겨 볼 점은 에러가 출력되지 않았다는 점이다. 즉 컴페니언 Flux가 complete 신호를 보내면 Subscriber에도 complete 신호가 전달되고 있다. 이 점은 retry()와 다르다. retry()는 최대 재시도 이후에도 에러가 발생하면 해당 에러를 Subscriber에 전달하는데 retryWhen은 컴페니언 Flux가 어떤 데이터를 발생하느냐에 따라 에러를 Subscriber에 전달할지 여부가 달라진다.


다음 코드는 컴페니언 Flux가 에러를 발생하는 예이다.


Flux<Integer> seq = Flux.just(1, 2, 3)

        .map(i -> {

            if (i < 3) return i;

            else throw new IllegalStateException("force");

        })

        .retryWhen(errorsFlux -> errorsFlux.zipWith(Flux.range(1, 3),

                (error, index) -> {

                    if (index < 3) return index;

                    else throw new RuntimeException("companion error"); // 

                })

        );


seq.subscribe(

        System.out::println,

        err -> System.err.println("에러 발생: " + err),

        () -> System.out.println("compelte")

);


위 코드에서 retryWhen이 생성하는 컴페니언 Flux는 에러가 세 번째 발생하면 RuntimeException을 발생한다. 즉 두 번째 에러까지는 데이터 신호를 발생하고 세 번째 에러에 에러 신호를 발생한다. 따라서 두 번 재시도를 한다. 실행 결과는 다음과 같다.


1

2

1

2

1

2

에러 발생: java.lang.RuntimeException: companion error


실행 결과를 보면 두 번 재시도 후에 에러 신호를 받은 것을 알 수 있다.


관련 글


Posted by 최범균 madvirus

댓글을 달아 주세요

시퀀스 변환

이 글에서는 시퀀스가 발생하는 데이터를 변환하는 몇 가지 방법을 살펴본다.


1-1 변환: map

첫 번째는 map()이다. map() 한 개의 데이터를 1-1 방식으로 변환해준다. 자바 스트림의 map()과 유사하다. 다음 코드는 map()의 예를 보여준다.


Flux.just("a", "bc", "def", "wxyz")

        .map(str -> str.length()) // 문자열을 Integer 값으로 1-1 변환

        .subscribe(len -> System.out.println(len));


1-n 변환: flatMap

flatMap은 1개의 데이터로부터 시퀀스를 생성할 때 사용한다. 즉 1-n 방식의 변환을 처리한다. 다음은 간단한 flatMap() 사용 예이다.


Flux<Integer> seq = Flux.just(1, 2, 3)

             .flatMap(i -> Flux.range(1, i)) // Integer를 Flux<Integer>로 1-N 변환


seq.subscribe(System.out::println);


flatMap()에 전달한 함수를 보면 Integer 값을 받아서 다시 1부터 해당 값 개수만큼의 숫자를 생성하는 Flux를 생성한다. 위 코드를 보면 다음과 같은 변환이 발생한다.

  • 1 -> Flux.range(1, 1) : [1] 생성
  • 2 -> Flux.range(1, 2) : [1, 2] 생성
  • 3 -> Flux.range(1, 3) : [1, 2, 3] 생성
flatMap에 전달한 함수가 생성하는 각 Flux는 하나의 시퀀스처럼 연결된다. 그래서 flatMap()의 결과로 생성되는 Flux의 타입이 Flux<Flux<Integer>>가 아니라 Flux<Integer>이다.

실제 실행 결과는 다음과 같다.

1
1
2
1
2
3


걸러내기: filter

filter()를 이용해서 시퀀스가 생성한 데이터를 걸러낼 수 있다. filter()에 전달한 함수의 결과가 true인 데이터만 전달하고 false인 데이터는 발생하지 않는다. 다음은 1부터 10 사이의 숫자 중에서 2로 나눠 나머지가 0인 (즉 짝수인) 숫자만 걸러내는 예를 보여준다.


Flux.range(1, 10)

        .filter(num -> num % 2 == 0)

        .subscribe(x -> System.out.print(x + " -> "));


실행 결과는 다음과 같다.


2 -> 4 -> 6 -> 8 -> 10 -> 


빈 시퀀스인 경우 기본 값 사용하기: defaultIfEmpty

시퀀스에 데이터가 없을 때 특정 값을 기본으로 사용하고 싶다면 defaultIfEmpty() 메서드를 사용하면 된다. Mono와 Flux 모두 defaultIfEmpty()를 제공한다. 아래 코드는 사용 예이다.


Flux<Item> popularItems = getPopularItems();

Flux<Item> recItems = popularItems.defaultIfEmpty(featureItem);


getPopularItems() 메서드가 인기 상품 목록을 제공한다고 하자. 이 코드는 인기 상품 목록을 제공하는 시퀀스인 popularItems가 데이터가 없는 빈 시퀀스면 featureItem을 값으로 제공하는 Flux로 변환한다. 즉 인기 상품이 있으면 그 상품 목록을 제공하고 그렇지 않으면 미리 지정한 featureItem을 값으로 제공하는 Flux를 생성한다.


위 코드를 조금 더 간결하게 표현하면 다음과 같다.


Flux<Item> recItems = getPopularItems().defaultIfEmpty(featureItem);


빈 시퀀스인 경우 다른 시퀀스 사용하기: switchIfEmpty

시퀀스에 값이 없을 때 다른 시퀀스를 사용하고 싶다면 switchIfEmpty() 메서드를 사용한다.


// public Flux<Item> getPopularItems() { ... }

// public Flux<Item> getFeatureItems() { ... }


Flux<Item> recItems = 

    getPopularItems().switchIfEmpty(getFeatureItems());



특정 값으로 시작하는 시퀀스로 변환: startWith

특정 값으로 시작하도록 시퀀스를 설정하고 싶다면 startWith(T ...) 메서드나 startWith(시퀀스) 메서드를 사용한다.


Flux<Integer> seq1 = Flux.just(1, 2, 3);


Flux<Integer> seq2 = seq1.startWith(-1, 0);

seq2.subscribe(System.out::println);


이 코드에서 seq1은 1, 2, 3을 생성하는 시퀀스인데 startWith(-1, 0)을 사용해서 -1, 0으로 시작하는 시퀀스로 변환했다. 따라서 seq2가 생성하는 데이터는 -1, 0, 1, 2, 3이 된다.


특정 값으로 끝나는 시퀀스로 변환: concatWithValues

시퀀스가 특정 값으로 끝나도록 변환하고 싶다면 concatWithValues(T ...) 메서드를 사용한다.


Flux<Integer> seq = someSeq.concatWithValues(100);

seq.subscribe(System.out::println);


위 코드는 someSeq가 어떤 값을 생성하는지에 상관없이 seq는 가장 마지막에 100을 생성한다.


시퀀스 순서대로 연결: cancatWith

concatWith() 메서드를 사용하면 여러 시퀀스를 순서대로 연결할 수 있다.


Flux<Integer> seq1 = Flux.just(1, 2, 3);

Flux<Integer> seq2 = Flux.just(4, 5, 6);

Flux<Integer> seq3 = Flux.just(7, 8, 9);


seq1.concatWith(seq2).concatWith(seq3).subscribe(System.out::println);


위 코드에서 seq1, seq2, seq3을 차례대로 연결하고 구독을 시작했다. 실행 결과는 1부터 9까지 정수를 출력한다.


concatWith로 연결한 시퀀스는 이전 시퀀스가 종료된 뒤에 구독을 시작한다. 위 예에서는 seq1이 종료된 뒤에 seq2 구독을 시작하고 seq2가 종료된 뒤에 seq3 구독을 시작한다.


시퀀스 발생 순서대로 섞기: mergeWith

시퀀스의 연결 순서가 아니라 시퀀스가 발생하는 데이터 순서대로 섞고 싶다면 mergeWith()를 사용한다. 다음은 예이다.


Flux<String> tick1 = Flux.interval(Duration.ofSeconds(1)).map(tick -> tick + "초틱");

Flux<String> tick2 = Flux.interval(Duration.ofMillis(700)).map(tick -> tick + "밀리초틱");

tick1.mergeWith(tick2).subscribe(System.out::println);


위 코드에서 tick1은 1초 간격으로 데이터를 발생하고 tick2는 700 밀리초 단위로 데이터를 발생한다. 이 두 시퀀스를 mergeWith()로 섞은 뒤 구독하면 두 시퀀스를 동시에 구독한다. 실행 결과는 다음과 같다.


0밀리초틱

0초틱

1밀리초틱

1초틱

2밀리초틱

3밀리초틱

2초틱

4밀리초틱

3초틱

5밀리초틱

...


다음은 실행 결과를 발생 시점 기준으로 그림으로 표시한 것이다. 1초 간격으로 데이터를 발생하는 seq1과 0.7초 간격으로 데이터를 발생하는 seq2를 mergeWith()로 섞을 때 데이터 발생 순서를 알 수 있다.





시퀀스 묶기: zipWith

zipWith()를 사용하면 두 시퀀스의 값을 묶은 값 쌍을 생성하는 시퀀스를 생성할 수 있다. 다음은 사용 예이다.


Flux<String> tick1 = Flux.interval(Duration.ofSeconds(1)).map(tick -> tick + "초틱");

Flux<String> tick2 = Flux.interval(Duration.ofMillis(700)).map(tick -> tick + "밀리초틱");

tick1.zipWith(tick2).subscribe(tup -> System.out.println(tup));


zipWith()는 개수를 맞춰서 두 시퀀스의 데이터를 묶는다. 따라서 위 코드는 아래 그림과 같이 쌍을 묶는다.


zipWith()는 리액터에 포함된 Tuple2 타입을 이용해서 두 값을 쌍으로 묶는다. 위 코드의 실행 결과는 아래와 같다.

[0초틱,0밀리초틱]
[1초틱,1밀리초틱]
[2초틱,2밀리초틱]
[3초틱,3밀리초틱]
[4초틱,4밀리초틱]

시퀀스 묶기: combineLatest


Flux.combineLatest() 메서드로 시퀀스를 묶을 수도 있다. 이 메서드 정적 메서드이다. 발생한 개수를 맞춰서 쌍을 만드는 zipWith()와 달리 combineLatest()는 가장 최근의 데이터를 쌍으로 만든다. 다음은 그 차이를 보여준다.



다음은 예제 코드이다.


Flux<String> tick1 = Flux.interval(Duration.ofSeconds(1)).map(tick -> tick + "초틱");

Flux<String> tick2 = Flux.interval(Duration.ofMillis(700)).map(tick -> tick + "밀리초틱");

Flux.combineLatest(tick1, tick2, (a, b) -> a + "\n" + b).subscribe(System.out::println);


지정한 개수/시간에 해당하는 데이터만 유지: take, takeLast

시퀀스에서 처음 n개의 데이터만 유지하고 싶다면 take(long) 메서드를 사용한다. 비슷하게 지정한 시간 동안 발생한 데이터만 유지하고 싶다면 take(Duration) 메서드를 사용한다. 마지막 n개의 데이터만 유지하고 싶다면 takeLast(long) 메서드를 사용한다.

Flux<Integer> seq1 = someSeq.take(10); // 최초 10개 데이터만 유지
Flux<Integer> seq2 = someSeq.take(Duration.ofSeconds(10)); // 최초 10초 동안 데이터 유지
Flux<Integer> seq3 = someSeq.takeLast(5); // 마지막 5개 데이터만 유지

이 외에 takeWhile()과 takeUntil()도 있다. 이 두 메서드는 Predicate을 인자로 받는다. takeWhile()은 Predicate 구현이 true를 리턴하는 동안 데이터를 포함하고 takeUntil()은 처음 true를 리턴할 때까지 데이터를 포함한다.

지정한 개수/시간만큼 데이터 거르기: skip, skipLast

시퀀스에서 처음 n개의 데이터를 거르고 싶다면 skip(long) 메서드를 사용한다. 지정한 처음 시간 동안 발생한 데이터를 거르고 싶다면 skip(Duration) 메서드를 사용한다. 마지막 n개의 데이터를 거르고 싶다면 skipLast(long) 메서드를 사용한다.

take()와 비슷하게 skipWhile()과 skipUntil()을 지원한다.


관련 글


Posted by 최범균 madvirus

댓글을 달아 주세요

시퀀스 생성 2: Flux.create(), Flux.fromStream()

이전 글(스프링 리액터 시작하기 2 - 시퀀스 생성 just, generate)에서 살펴본 Flux.generate()는 Subscriber로부터 요청이 있을 때에 next 신호를 발생하는 Flux를 생성한다. 즉 pull 방식의 Flux를 생성한다. 이는 단순하지만 데이터 발생을 비동기나 push 방식으로 할 수 없다는 제약도 있다. Flux.create()를 사용하면 이런 제약 없이 비동기나 push 방식으로 데이터를 발생할 수 있다.


Flux.create()를 이용한 pull 방식 메시지 생성

먼저 Flux.create() 메서드를 이용해서 pull 방식으로 메시지를 생성하는 방법을 살펴보자. 다음은 예제 코드이다.


Flux<Integer> flux = Flux.create( (FluxSink<Integer> sink) -> {

    sink.onRequest(request -> { // request는 Subscriber가 요청한 데이터 개수

        for (int i = 1; i <= request; i++) {

            sink.next(i); // Flux.generate()의 경우와 달리 한 번에 한 개 이상의 next() 신호 발생 가능

        }

    });

});


위 코드에서 Flux.create() 메서드의 파라머티는 함수형 타입 Consumer<? super FluxSink<T>>이다. 이 Consumer는 FluxSink를 이용해서 Subscriber에 신호를 발생할 수 있다.


FluxSink#onRequest(LongConsumer) 메서드의 Consumer는 Subscriber가 데이터를 요청했을 때 불린다. 이때 LongConsumer는 Subscriber가 요청한 데이터 개수를 전달받는다. 위 코드에서는 클라이언트가 요청한 데이터 개수만큼 next 신호를 발생하고 있다.


Flux.generate()와의 차이점은 Flux.generate()의 경우 한 번에 한 개의 next 신호만 발생할 수 있었던 데 비해 Flux.create()는 한 번에 한 개 이상의 next() 신호를 발생할 수 있다는 점이다.


Flux.create()를 이용한 push 방식 메시지  생성

Flux.create()를 이용하면 Subscriber의 요청과 상관없이 비동기로 데이터를 발생할 수 있다. 다음 코드를 보자.


DataPump pump = new DataPump();


Flux<Integer> bridge = Flux.create((FluxSink<Integer> sink) -> {

    pump.setListener(new DataListener<Integer>() {

        @Override

        public void onData(List<Integer> chunk) {

            chunk.forEach(s -> {

                sink.next(s); // Subscriber의 요청에 상관없이 신호 발생

            });

        }

        @Override

        public void complete() {

            logger.info("complete");

            sink.complete();

        }

    });

});


이 코드에서 DataPump는 데이터를 어딘가에서 데이터가 오면 setListener()로 등록한 DataListener의 onData()를 실행한다고 가정하자. DataListener#onData() 메서드는 FluxSink#next()를 이용해서 데이터를 발생한다. DataListener#onData() 메서드는 Subscriber의 데이터 요청과 상관없이 호출된다. 즉 위 코드는 Subscriber의 요청과 상관없이 데이터를 push한다.


Flux.create()와 배압

Subscriber로부터 요청이 왔을 때(FluxSink#onRequest) 데이터를 전송하거나(pull 방식) Subscriber의 요청에 상관없이 데이터를 전송하거나(push 방식) 두 방식 모두 Subscriber가 요청한 개수보다 더 많은 데이터를 발생할 수 있다. 예를 들어 아래 코드를 보자.


Flux<Integer> flux = Flux.create( (FluxSink<Integer> sink) -> {

    sink.onRequest(request -> {

        for (int i = 1; i <= request + 3 ; i++) { // Subscriber가 요청한 것보다 3개 더 발생

            sink.next(i);

        }

    });

});


이 코드는 Subscriber가 요청한 개수보다 3개 데이터를 더 발생한다. 이 경우 어떻게 될까? 기본적으로 Flux.create()로 생성한 Flux는 초과로 발생한 데이터를 버퍼에 보관한다. 버퍼에 보관된 데이터는 다음에 Subscriber가 데이터를 요청할 때 전달된다.


요청보다 발생한 데이터가 많을 때 선택할 수 있는 처리 방식은 다음과 같다.

  • IGNORE : Subscriber의 요청 무시하고 발생(Subscriber의 큐가 다 차면 IllegalStateException 발생)
  • ERROR : 익셉션(IllegalStateException) 발생
  • DROP : Subscriber가 데이터를 받을 준비가 안 되어 있으면 데이터 발생 누락
  • LATEST : 마지막 신호만 Subscriber에 전달
  • BUFFER : 버퍼에 저장했다가 Subscriber 요청시 전달. 버퍼 제한이 없으므로 OutOfMemoryError 발생 가능

Flux.create()의 두 번째 인자로 처리 방식을 전달하면 된다.


Flux.create(sink -> { ... }, FluxSink.OverflowStrategy.IGNORE);


Flux.fromStream(), Flux.fromIterable()을 이용한 Flux 생성

Flux.stream()을 사용하면 자바 8의 Stram에서 Flux를 생성할 수 있다. 다음은 예이다.


Stream<String> straem = Files.lines(Paths.get(filePath));

Flux<String> seq = Flux.fromStream(straem);

seq.subscribe(System.out::println);


Flux.fromIterable()을 이용하면 Iterable을 이용해서 Flux를 생성할 수 있다. List나 Set과 같은 콜렉션에서 Flux를 생성하고 싶을 때 이 메서드를 사용하면 된다.


관련 글





Posted by 최범균 madvirus

댓글을 달아 주세요