주요글: 도커 시작하기
반응형

예전에 신림프로그래머 페이스북 그룹에서 진행한 "코틀린 인 액션" 책 스터디 정리한 자료


kia-ch02.pdf

kia-ch03.pdf

kia-ch04.pdf

kia-ch05.pdf

kia-ch06.pdf

kia-ch07.pdf

kia-ch08.pdf



반응형

에러 처리

시퀀스는 데이터를 발생하는 과정에서 에러를 발생할 수 있다. 리액터는 에러를 처리하는 여러 방법을 제공하는데 이 글에서는 레퍼런스 문서에서 언급하는 에러 처리 방법을 차례대로 살펴볼 것이다.


참고로 에러 신호는 종료 신호이다. 따라서 에러 신호가 발생하면 시퀀스는 종료되고 더 이상 데이터를 발생하지 않는다.


에러 신호 처리

에러 신호가 발생하면 Subscriber의 onError 메서드가 호출된다. 이 메서드를 구현한 Subscriber를 이용해서 구독을 하면 에러 신호를 알맞게 처리할 수 있다. 또한 에러 처리를 위한 Consumer를 파라미터로 갖는 subscribe() 메서드를 사용해서 익셉션을 처리할 수 있다. 다음 코드는 에러 처리를 위한 Consumer를 파라미터로 갖는 subscribe() 메서드의 사용 예를 보여준다.


Flux.range(1, 10)

        .map(x -> {

            if (x == 5) throw new RuntimeException("exception"); // 에러 발생

            else return x;

        })

        .subscribe(

                i -> System.out.println(i), // next 신호 처리

                ex -> System.err.println(ex.getMessage()), // error 신호 처리

                () -> System.out.println("complete") // complete 신호 처리

        );


위 코드는 1부터 10개의 값을 발생하는데 값이 5이면 익셉션을 발생하는 시퀀스를 생성한다. subscribe() 메서드는 3개의 인자를 갖는데 차례대로 next, error, complete 신호를 처리한다. 실행 결과는 다음과 같다.


1

2

3

4

exception


에러 신호는 종료 신호이므로 익셉션 발생 이후에 더 이상 next 신호가 발생하지 않는 것을 확인할 수 있다.


에러 신호를 처리하기 위한 subscribe() 메서드는 다음과 같다.

  • subscribe(Consumer<? super T> consumer,
                    Consumer<? super Throwable> errorConsumer)
  • subscribe(Consumer<? super T> consumer,
                    Consumer<? super Throwable> errorConsumer,
                    Runnable completeConsumer)
  • subscribe(Consumer<? super T> consumer,
                    Consumer<? super Throwable> errorConsumer,
                    Runnable completeConsumer,
                    Consumer<? super Subscription> subscriptionConsumer)
  • subscribe(Subscriber<? super T> subscriber)


에러 발생하면 기본 값 사용하기: onErrorReturn

에러가 발생할 때 에러 대신에 특정 값을 발생하고 싶다면 onErrorReturn() 메서드를 사용한다. 이 메서드의 사용 예는 다음과 같다.


Flux<Integer> seq = Flux.range(1, 10)

        .map(x -> {

            if (x == 5) throw new RuntimeException("exception");

            else return x;

        })

        .onErrorReturn(-1);


seq.subscribe(System.out::println);


위 코드를 실행한 결과는 다음과 같다.


1

2

3

4

-1


실행 결과에서 알 수 있듯이 에러 대신에 값을 발생한 뒤에 시퀀스는 종료된다.


발생한 익셉션이 특정 조건을 충족하는 경우에만 에러 대신에 특정 값을 발생하고 싶다면 Predicate을 인자로 받는 onErrorReturn() 메서드를 사용한다. 익셉션이 특정 타입인 경우에만 에러 대신 특정 값을 발생하고 싶다면 Class 타입을 인자로 받는 onErrorReturn() 메서드를 사용한다.

  • Flux<T> onErrorReturn(Predicate<? super Throwable> predicate, T fallbackValue)
  • <E extends Throwable> Flux<T> onErrorReturn(Class<E> type, T fallbackValue)


에러 발생하면 다른 신호(시퀀스)나 다른 에러로 대체하기: onErrorResume

onErrorResume 메서드를 사용하면 에러가 발생하면 다른 시퀀스나 에러로 대체할 수 있다. onErrorResume 메서드는 다음 타입의 함수를 파라미터로 갖는다.

  • Function<? super Throwable, ? extends Publisher<? extends T>> :
    Throwable을 입력으로 받고 Publisher를 리턴하는 함수
이 함수는 시퀀스에서 발생한 에러를 입력으로 받아 결과로 Publisher를 리턴한다. 즉 에러가 발생하면 이를 다른 데이터 신호로 대체한다. 다음 코드를 보자.

Random random = new Random();
Flux<Integer> seq = Flux.range(1, 10)
        .map(x -> {
            int rand = random.nextInt(8);
            if (rand == 0) throw new IllegalArgumentException("illarg");
            if (rand == 1) throw new IllegalStateException("illstate");
            if (rand == 2) throw new RuntimeException("exception");
            return x;
        })
        .onErrorResume(error -> {
            if (error instanceof IllegalArgumentException) {
                return Flux.just(21, 22);
            }
            if (error instanceof IllegalStateException) {
                return Flux.just(31, 32);
            }
            return Flux.error(error);
        });

seq.subscribe(System.out::println);


이 코드에서 onErrorResume() 메서드에 전달한 함수를 보자. 이 함수는 발생한 에러가 IllegalArgumentException이면 Flux.just(21, 22)로 21, 22 값을 생성하는 시퀀스를 리턴하고, 발생한 에러가 IllegalStateException이면 31, 32 값을 생성하는 시퀀스를 리턴한다. 두 조건에 해당하지 않는 경우 Flux.error() 메서드를 이용해서 익셉션을 다시 재발생시킨다.


map() 함수에서 임의로 익셉션을 발생시키도록 했으므로 실행할 때마다 결과가 달라진다. 다음은 여러 번 실행한 결과 중 하나이다. 이 실행 결과는 3번째 데이터를 map에서 처리하는 과정에서 IllegalArgumentException이 발생했고 onErrorResume()을 통해 에러 대신에 21, 22를 생성하는 Flux로 대체된 것을 보여준다.


1

2

21

22


onErrorReturn과 마찬가지로 onErrorResume도 Predicate이나 Class 타입을 파라미터로 갖는 메서드를 이용해서 처리할 익셉션을 제한할 수 있다.


에러를 다른 에러로 변환하기: onErrorMap

에러를 다른 에러로 변환할 때에는 onErrorMap을 사용한다. 다음은 간단한 사용 예이다.


Flux<Integer> seq = intSeq.onErrorMap(error -> new MyException(...));


onErrorMap도 Predicate이나 Class 타입을 파라미터로 갖는 메서드를 이용해서 변환할 익셉션을 제한할 수 있다.


재시도하기: retry

retry()를 사용하면 에러가 발생했을 구독을 재시도할 수 있다. 다음은 예제 코드이다.


Flux.range(1, 5)

        .map(input -> {

            if (input < 4) return "num " + input;

            throw new RuntimeException("boom");

        })

        .retry(1) // 에러 신호 발생시 1회 재시도

        .subscribe(System.out::println, System.err::println);


위 코드를 실행한 결과는 다음과 같다.


num 1

num 2

num 3

num 1

num 2

num 3

java.lang.RuntimeException: boom


재시도를 하면 원본 시퀀스를 다시 구독한다. 이런 이유로 위 결과는 에러가 처음 발생했을 때 다시 1부터 신호가 발생하고 있다. 두 번째 에러가 발생했을 때에는 재시도를 하지 않으므로 에러 메시지가 출력되는 것을 알 수 있다.


재시도하기: retryWhen

단순 재시도가 아닌 조금 더 복잡한 상황에 따라 재시도를 하고 싶다면 retryWhen을 사용한다. retryWhen 메서드는 사용법이 다소 복잡하다. 먼저 retryWhen 메서드의 파라미터를 보자. 파라미터 타입은 다음과 같다.

  • retryWhen(Function< Flux<Throwable>,  ? extends Publisher<?> > whenFactory)
whenFactory 파라미터는 Function 타입의 함수이다. 이 함수는 입력으로 Flux<Throwable>를 받고 결과로 Publisher를 리턴한다. 여기서 whenFactory의 함수의 입력인 Flux<Throwable>는 시퀀스가 발생하는 익셉션 신호에 해당한다. 재시도 횟수에 따라 익셉션이 여러 번 발생할 수 있는데 Flux<Throwable>이 발생하는 데이터는 바로 여러 번 발생할 수 있는 익셉션에 해당한다.

whenFactory 함수에 전달되는 Flux<Throwable>은 원본 시퀀스의 익셉션과 연관되어 있으므로 이를 컴페니언(companion) Flux라고 부른다.

whenFactory 함수는 재시도 조건에 맞게 변환한 컴페니언 Flux를 리턴한다. 이 변환한 컴페니언 Flux가 재시도 여부를 결정하는데 그 과정은 다음과 같다.
  1. 에러가 발생할 때마다 에러가 컴페니언 Flux로 전달된다.
  2. 컴페니언 Flux가 뭐든 발생하면 재시도가 일어난다.
  3. 컴페니언 Flux가 종료되면 재시도를 하지 않고 원본 시퀀스 역시 종료된다.
  4. 컴페니언 Flux가 에러를 발생하면 재시도를 하지 않고 컴페니언 Flux가 발생한 에러를 전파한다.
위 설명만으로는 감이 잘 안 올 테니 간단한 예를 살펴보자. 먼저 다음은 2번 재시도하는 예제 코드이다.


Flux<Integer> seq = Flux.just(1, 2, 3)

        .map(i -> {

            if (i < 3) return i;

            else throw new IllegalStateException("force");

        })

        .retryWhen(errorsFlux -> errorsFlux.take(2)); // 2개의 데이터 발생


seq.subscribe(

        System.out::println,

        err -> System.err.println("에러 발생: " + err),

        () -> System.out.println("compelte")

);


위 코드에서 retryWhen은 take(2)를 사용해서 2개의 데이터를 발생하는 컴페니언 Flux를 리턴한다. 이 컴페니언 Flux는 2개의 데이터를 발생하고 종료된다. 즉 2-3번 과정에 의해 2번 재시도를 하고 원본 시퀀스를 종료시킨다. 실제 실행 결과는 다음과 같다. 괄호 안의 파란 글씨는 재시도 여부를 표시한 것으로 실제 출력에 포함된 내용은 아니다.


1

2

1    (1번 재시도)

2

1    (2번 재시도)

2

complete


출력 결과에서 눈여겨 볼 점은 에러가 출력되지 않았다는 점이다. 즉 컴페니언 Flux가 complete 신호를 보내면 Subscriber에도 complete 신호가 전달되고 있다. 이 점은 retry()와 다르다. retry()는 최대 재시도 이후에도 에러가 발생하면 해당 에러를 Subscriber에 전달하는데 retryWhen은 컴페니언 Flux가 어떤 데이터를 발생하느냐에 따라 에러를 Subscriber에 전달할지 여부가 달라진다.


다음 코드는 컴페니언 Flux가 에러를 발생하는 예이다.


Flux<Integer> seq = Flux.just(1, 2, 3)

        .map(i -> {

            if (i < 3) return i;

            else throw new IllegalStateException("force");

        })

        .retryWhen(errorsFlux -> errorsFlux.zipWith(Flux.range(1, 3),

                (error, index) -> {

                    if (index < 3) return index;

                    else throw new RuntimeException("companion error"); // 

                })

        );


seq.subscribe(

        System.out::println,

        err -> System.err.println("에러 발생: " + err),

        () -> System.out.println("compelte")

);


위 코드에서 retryWhen이 생성하는 컴페니언 Flux는 에러가 세 번째 발생하면 RuntimeException을 발생한다. 즉 두 번째 에러까지는 데이터 신호를 발생하고 세 번째 에러에 에러 신호를 발생한다. 따라서 두 번 재시도를 한다. 실행 결과는 다음과 같다.


1

2

1

2

1

2

에러 발생: java.lang.RuntimeException: companion error


실행 결과를 보면 두 번 재시도 후에 에러 신호를 받은 것을 알 수 있다.


관련 글


  1. sangpire 2020.02.02 22:16 신고

    글 잘 읽었습니다.

    아주 사소한 오탈자가 보여 제보 드려요.
    바로 '에러 신호 처리' 절 두번째 줄 'Consumber' 가 혹시 'Consumer' 가 아닐까 싶은데 맞았으면 좋겠네요.

    다시한번 좋은 글 감사합니다.

반응형

시퀀스 변환

이 글에서는 시퀀스가 발생하는 데이터를 변환하는 몇 가지 방법을 살펴본다.


1-1 변환: map

첫 번째는 map()이다. map() 한 개의 데이터를 1-1 방식으로 변환해준다. 자바 스트림의 map()과 유사하다. 다음 코드는 map()의 예를 보여준다.


Flux.just("a", "bc", "def", "wxyz")

        .map(str -> str.length()) // 문자열을 Integer 값으로 1-1 변환

        .subscribe(len -> System.out.println(len));


1-n 변환: flatMap

flatMap은 1개의 데이터로부터 시퀀스를 생성할 때 사용한다. 즉 1-n 방식의 변환을 처리한다. 다음은 간단한 flatMap() 사용 예이다.


Flux<Integer> seq = Flux.just(1, 2, 3)

             .flatMap(i -> Flux.range(1, i)) // Integer를 Flux<Integer>로 1-N 변환


seq.subscribe(System.out::println);


flatMap()에 전달한 함수를 보면 Integer 값을 받아서 다시 1부터 해당 값 개수만큼의 숫자를 생성하는 Flux를 생성한다. 위 코드를 보면 다음과 같은 변환이 발생한다.

  • 1 -> Flux.range(1, 1) : [1] 생성
  • 2 -> Flux.range(1, 2) : [1, 2] 생성
  • 3 -> Flux.range(1, 3) : [1, 2, 3] 생성
flatMap에 전달한 함수가 생성하는 각 Flux는 하나의 시퀀스처럼 연결된다. 그래서 flatMap()의 결과로 생성되는 Flux의 타입이 Flux<Flux<Integer>>가 아니라 Flux<Integer>이다.

실제 실행 결과는 다음과 같다.

1
1
2
1
2
3


걸러내기: filter

filter()를 이용해서 시퀀스가 생성한 데이터를 걸러낼 수 있다. filter()에 전달한 함수의 결과가 true인 데이터만 전달하고 false인 데이터는 발생하지 않는다. 다음은 1부터 10 사이의 숫자 중에서 2로 나눠 나머지가 0인 (즉 짝수인) 숫자만 걸러내는 예를 보여준다.


Flux.range(1, 10)

        .filter(num -> num % 2 == 0)

        .subscribe(x -> System.out.print(x + " -> "));


실행 결과는 다음과 같다.


2 -> 4 -> 6 -> 8 -> 10 -> 


빈 시퀀스인 경우 기본 값 사용하기: defaultIfEmpty

시퀀스에 데이터가 없을 때 특정 값을 기본으로 사용하고 싶다면 defaultIfEmpty() 메서드를 사용하면 된다. Mono와 Flux 모두 defaultIfEmpty()를 제공한다. 아래 코드는 사용 예이다.


Flux<Item> popularItems = getPopularItems();

Flux<Item> recItems = popularItems.defaultIfEmpty(featureItem);


getPopularItems() 메서드가 인기 상품 목록을 제공한다고 하자. 이 코드는 인기 상품 목록을 제공하는 시퀀스인 popularItems가 데이터가 없는 빈 시퀀스면 featureItem을 값으로 제공하는 Flux로 변환한다. 즉 인기 상품이 있으면 그 상품 목록을 제공하고 그렇지 않으면 미리 지정한 featureItem을 값으로 제공하는 Flux를 생성한다.


위 코드를 조금 더 간결하게 표현하면 다음과 같다.


Flux<Item> recItems = getPopularItems().defaultIfEmpty(featureItem);


빈 시퀀스인 경우 다른 시퀀스 사용하기: switchIfEmpty

시퀀스에 값이 없을 때 다른 시퀀스를 사용하고 싶다면 switchIfEmpty() 메서드를 사용한다.


// public Flux<Item> getPopularItems() { ... }

// public Flux<Item> getFeatureItems() { ... }


Flux<Item> recItems = 

    getPopularItems().switchIfEmpty(getFeatureItems());



특정 값으로 시작하는 시퀀스로 변환: startWith

특정 값으로 시작하도록 시퀀스를 설정하고 싶다면 startWith(T ...) 메서드나 startWith(시퀀스) 메서드를 사용한다.


Flux<Integer> seq1 = Flux.just(1, 2, 3);


Flux<Integer> seq2 = seq1.startWith(-1, 0);

seq2.subscribe(System.out::println);


이 코드에서 seq1은 1, 2, 3을 생성하는 시퀀스인데 startWith(-1, 0)을 사용해서 -1, 0으로 시작하는 시퀀스로 변환했다. 따라서 seq2가 생성하는 데이터는 -1, 0, 1, 2, 3이 된다.


특정 값으로 끝나는 시퀀스로 변환: concatWithValues

시퀀스가 특정 값으로 끝나도록 변환하고 싶다면 concatWithValues(T ...) 메서드를 사용한다.


Flux<Integer> seq = someSeq.concatWithValues(100);

seq.subscribe(System.out::println);


위 코드는 someSeq가 어떤 값을 생성하는지에 상관없이 seq는 가장 마지막에 100을 생성한다.


시퀀스 순서대로 연결: cancatWith

concatWith() 메서드를 사용하면 여러 시퀀스를 순서대로 연결할 수 있다.


Flux<Integer> seq1 = Flux.just(1, 2, 3);

Flux<Integer> seq2 = Flux.just(4, 5, 6);

Flux<Integer> seq3 = Flux.just(7, 8, 9);


seq1.concatWith(seq2).concatWith(seq3).subscribe(System.out::println);


위 코드에서 seq1, seq2, seq3을 차례대로 연결하고 구독을 시작했다. 실행 결과는 1부터 9까지 정수를 출력한다.


concatWith로 연결한 시퀀스는 이전 시퀀스가 종료된 뒤에 구독을 시작한다. 위 예에서는 seq1이 종료된 뒤에 seq2 구독을 시작하고 seq2가 종료된 뒤에 seq3 구독을 시작한다.


시퀀스 발생 순서대로 섞기: mergeWith

시퀀스의 연결 순서가 아니라 시퀀스가 발생하는 데이터 순서대로 섞고 싶다면 mergeWith()를 사용한다. 다음은 예이다.


Flux<String> tick1 = Flux.interval(Duration.ofSeconds(1)).map(tick -> tick + "초틱");

Flux<String> tick2 = Flux.interval(Duration.ofMillis(700)).map(tick -> tick + "밀리초틱");

tick1.mergeWith(tick2).subscribe(System.out::println);


위 코드에서 tick1은 1초 간격으로 데이터를 발생하고 tick2는 700 밀리초 단위로 데이터를 발생한다. 이 두 시퀀스를 mergeWith()로 섞은 뒤 구독하면 두 시퀀스를 동시에 구독한다. 실행 결과는 다음과 같다.


0밀리초틱

0초틱

1밀리초틱

1초틱

2밀리초틱

3밀리초틱

2초틱

4밀리초틱

3초틱

5밀리초틱

...


다음은 실행 결과를 발생 시점 기준으로 그림으로 표시한 것이다. 1초 간격으로 데이터를 발생하는 seq1과 0.7초 간격으로 데이터를 발생하는 seq2를 mergeWith()로 섞을 때 데이터 발생 순서를 알 수 있다.





시퀀스 묶기: zipWith

zipWith()를 사용하면 두 시퀀스의 값을 묶은 값 쌍을 생성하는 시퀀스를 생성할 수 있다. 다음은 사용 예이다.


Flux<String> tick1 = Flux.interval(Duration.ofSeconds(1)).map(tick -> tick + "초틱");

Flux<String> tick2 = Flux.interval(Duration.ofMillis(700)).map(tick -> tick + "밀리초틱");

tick1.zipWith(tick2).subscribe(tup -> System.out.println(tup));


zipWith()는 개수를 맞춰서 두 시퀀스의 데이터를 묶는다. 따라서 위 코드는 아래 그림과 같이 쌍을 묶는다.


zipWith()는 리액터에 포함된 Tuple2 타입을 이용해서 두 값을 쌍으로 묶는다. 위 코드의 실행 결과는 아래와 같다.

[0초틱,0밀리초틱]
[1초틱,1밀리초틱]
[2초틱,2밀리초틱]
[3초틱,3밀리초틱]
[4초틱,4밀리초틱]

시퀀스 묶기: combineLatest


Flux.combineLatest() 메서드로 시퀀스를 묶을 수도 있다. 이 메서드 정적 메서드이다. 발생한 개수를 맞춰서 쌍을 만드는 zipWith()와 달리 combineLatest()는 가장 최근의 데이터를 쌍으로 만든다. 다음은 그 차이를 보여준다.



다음은 예제 코드이다.


Flux<String> tick1 = Flux.interval(Duration.ofSeconds(1)).map(tick -> tick + "초틱");

Flux<String> tick2 = Flux.interval(Duration.ofMillis(700)).map(tick -> tick + "밀리초틱");

Flux.combineLatest(tick1, tick2, (a, b) -> a + "\n" + b).subscribe(System.out::println);


지정한 개수/시간에 해당하는 데이터만 유지: take, takeLast

시퀀스에서 처음 n개의 데이터만 유지하고 싶다면 take(long) 메서드를 사용한다. 비슷하게 지정한 시간 동안 발생한 데이터만 유지하고 싶다면 take(Duration) 메서드를 사용한다. 마지막 n개의 데이터만 유지하고 싶다면 takeLast(long) 메서드를 사용한다.

Flux<Integer> seq1 = someSeq.take(10); // 최초 10개 데이터만 유지
Flux<Integer> seq2 = someSeq.take(Duration.ofSeconds(10)); // 최초 10초 동안 데이터 유지
Flux<Integer> seq3 = someSeq.takeLast(5); // 마지막 5개 데이터만 유지

이 외에 takeWhile()과 takeUntil()도 있다. 이 두 메서드는 Predicate을 인자로 받는다. takeWhile()은 Predicate 구현이 true를 리턴하는 동안 데이터를 포함하고 takeUntil()은 처음 true를 리턴할 때까지 데이터를 포함한다.

지정한 개수/시간만큼 데이터 거르기: skip, skipLast

시퀀스에서 처음 n개의 데이터를 거르고 싶다면 skip(long) 메서드를 사용한다. 지정한 처음 시간 동안 발생한 데이터를 거르고 싶다면 skip(Duration) 메서드를 사용한다. 마지막 n개의 데이터를 거르고 싶다면 skipLast(long) 메서드를 사용한다.

take()와 비슷하게 skipWhile()과 skipUntil()을 지원한다.


관련 글


  1. 2020.10.20 17:44

    비밀댓글입니다

  2. aggom 2021.11.23 17:13

    좋은 꿀정보 정말 감사합니다!!

반응형

시퀀스 생성 2: Flux.create(), Flux.fromStream()

이전 글(스프링 리액터 시작하기 2 - 시퀀스 생성 just, generate)에서 살펴본 Flux.generate()는 Subscriber로부터 요청이 있을 때에 next 신호를 발생하는 Flux를 생성한다. 즉 pull 방식의 Flux를 생성한다. 이는 단순하지만 데이터 발생을 비동기나 push 방식으로 할 수 없다는 제약도 있다. Flux.create()를 사용하면 이런 제약 없이 비동기나 push 방식으로 데이터를 발생할 수 있다.


Flux.create()를 이용한 pull 방식 메시지 생성

먼저 Flux.create() 메서드를 이용해서 pull 방식으로 메시지를 생성하는 방법을 살펴보자. 다음은 예제 코드이다.


Flux<Integer> flux = Flux.create( (FluxSink<Integer> sink) -> {

    sink.onRequest(request -> { // request는 Subscriber가 요청한 데이터 개수

        for (int i = 1; i <= request; i++) {

            sink.next(i); // Flux.generate()의 경우와 달리 한 번에 한 개 이상의 next() 신호 발생 가능

        }

    });

});


위 코드에서 Flux.create() 메서드의 파라머티는 함수형 타입 Consumer<? super FluxSink<T>>이다. 이 Consumer는 FluxSink를 이용해서 Subscriber에 신호를 발생할 수 있다.


FluxSink#onRequest(LongConsumer) 메서드의 Consumer는 Subscriber가 데이터를 요청했을 때 불린다. 이때 LongConsumer는 Subscriber가 요청한 데이터 개수를 전달받는다. 위 코드에서는 클라이언트가 요청한 데이터 개수만큼 next 신호를 발생하고 있다.


Flux.generate()와의 차이점은 Flux.generate()의 경우 한 번에 한 개의 next 신호만 발생할 수 있었던 데 비해 Flux.create()는 한 번에 한 개 이상의 next() 신호를 발생할 수 있다는 점이다.


Flux.create()를 이용한 push 방식 메시지  생성

Flux.create()를 이용하면 Subscriber의 요청과 상관없이 비동기로 데이터를 발생할 수 있다. 다음 코드를 보자.


DataPump pump = new DataPump();


Flux<Integer> bridge = Flux.create((FluxSink<Integer> sink) -> {

    pump.setListener(new DataListener<Integer>() {

        @Override

        public void onData(List<Integer> chunk) {

            chunk.forEach(s -> {

                sink.next(s); // Subscriber의 요청에 상관없이 신호 발생

            });

        }

        @Override

        public void complete() {

            logger.info("complete");

            sink.complete();

        }

    });

});


이 코드에서 DataPump는 데이터를 어딘가에서 데이터가 오면 setListener()로 등록한 DataListener의 onData()를 실행한다고 가정하자. DataListener#onData() 메서드는 FluxSink#next()를 이용해서 데이터를 발생한다. DataListener#onData() 메서드는 Subscriber의 데이터 요청과 상관없이 호출된다. 즉 위 코드는 Subscriber의 요청과 상관없이 데이터를 push한다.


Flux.create()와 배압

Subscriber로부터 요청이 왔을 때(FluxSink#onRequest) 데이터를 전송하거나(pull 방식) Subscriber의 요청에 상관없이 데이터를 전송하거나(push 방식) 두 방식 모두 Subscriber가 요청한 개수보다 더 많은 데이터를 발생할 수 있다. 예를 들어 아래 코드를 보자.


Flux<Integer> flux = Flux.create( (FluxSink<Integer> sink) -> {

    sink.onRequest(request -> {

        for (int i = 1; i <= request + 3 ; i++) { // Subscriber가 요청한 것보다 3개 더 발생

            sink.next(i);

        }

    });

});


이 코드는 Subscriber가 요청한 개수보다 3개 데이터를 더 발생한다. 이 경우 어떻게 될까? 기본적으로 Flux.create()로 생성한 Flux는 초과로 발생한 데이터를 버퍼에 보관한다. 버퍼에 보관된 데이터는 다음에 Subscriber가 데이터를 요청할 때 전달된다.


요청보다 발생한 데이터가 많을 때 선택할 수 있는 처리 방식은 다음과 같다.

  • IGNORE : Subscriber의 요청 무시하고 발생(Subscriber의 큐가 다 차면 IllegalStateException 발생)
  • ERROR : 익셉션(IllegalStateException) 발생
  • DROP : Subscriber가 데이터를 받을 준비가 안 되어 있으면 데이터 발생 누락
  • LATEST : 마지막 신호만 Subscriber에 전달
  • BUFFER : 버퍼에 저장했다가 Subscriber 요청시 전달. 버퍼 제한이 없으므로 OutOfMemoryError 발생 가능

Flux.create()의 두 번째 인자로 처리 방식을 전달하면 된다.


Flux.create(sink -> { ... }, FluxSink.OverflowStrategy.IGNORE);


Flux.fromStream(), Flux.fromIterable()을 이용한 Flux 생성

Flux.stream()을 사용하면 자바 8의 Stram에서 Flux를 생성할 수 있다. 다음은 예이다.


Stream<String> straem = Files.lines(Paths.get(filePath));

Flux<String> seq = Flux.fromStream(straem);

seq.subscribe(System.out::println);


Flux.fromIterable()을 이용하면 Iterable을 이용해서 Flux를 생성할 수 있다. List나 Set과 같은 콜렉션에서 Flux를 생성하고 싶을 때 이 메서드를 사용하면 된다.


관련 글





  1. sangpire 2020.02.02 12:45 신고

    Flux.create 로 Publisher 생성시, FluxSink.OverflowStrategy.LATEST 옵션을 준다면, 이 전에 언급하신 '핫 시퀀스' 의 한 형태가 되는 걸까요?

    ps. 블로그도 책도 정말 잘 보고있습니다.
    항상 감사합니다.

    • 최범균 madvirus 2020.02.09 14:11 신고

      OverflowStragey는 Subscriber의 상태에 따라 어떻게 할지를 결정하는 것으로 이는 핫 시퀀스인지 콜드 시퀀스인지 여부와는 다릅니다.

      콜드와 핫을 구분하는 건 구독 시점에 발생하는 신호와 관련이 있죠. 콜드는 새로운 subscriber가 구독을 할 때마다 다시 (동일한) 신호를 발생하죠. 반면에 핫은 구독하는 시점 이후에 발생한 신호만 전달합니다.

반응형

시퀀스 생성 1 :just(), generate()

[노트]

시퀀스를 직접 생성할 일이 많지는 않다. 보통은 라이브러리가 제공하는 기능을 사용하기 때문이다. 그럼에도 불구하고 시퀀스 생성 방법을 정리한 이유는 리액터 시퀀스를 생성하는 방법을 살펴보면 리액티브의 동작을 이해하는데 도움이 되기 때문이다.

 

Flux.just(), Mono.just()로 만들기

시퀀스를 생성하는 가장 쉬운 방법은 Flux.just()를 사용하는 것이다. just() 메서드는 시퀀스로 사용할 데이터가 이미 존재할 때 사용한다. 다음은 사용 예이다.

 

Flux<Integer> seq = Flux.just(1, 2, 3);

 

이 Flux는 1, 2, 3 데이터를 차례대로 발생하고 complete 신호를 발생한다. just() 메서드는 가변 인자로 0개 이상의 데이터를 전달할 수 있다. 아래와 같이 발생할 데이터를 주지 않으면 complete 신호만 발생한다.

 

Flux<Integer> seq = Flux.just();

 

Mono.just()도 동일하다. 차이라면 Mono는 1개 값만 생성하므로 데이터도 한 개만 받는다는 것이다.

 

Mono<Integer> seq = Mono.just(1);

 

Mono.just(null)과 같이 null을 값으로 주면 NullPointerException이 발생한다. 데이터를 발생하지 않는 Mono를 생성하고 싶다면 Mono.empty()를 사용한다.

 

값이 있을 수도 있고 없을 수도 있는 Mono를 생성할 때에는 justOrEmpty() 메서드를 사용하면 된다. 다음은 사용 예이다.

 

// null을 값으로 받으면 값이 없는 Mono

Mono<Integer> seq1 = Mono.justOrEmpty(null); // complete 신호

Mono<Integer> seq2 = Mono.justOrEmpty(1); // next(1) 신호- complete 신호

 

// Optional을 값으로 받음

Mono<Integer> seq3 = Mono.justOrEmpty(Optional.empty()); // complete 신호

Mono<Integer> seq4 = Mono.justOrEmpty(Optional.of(1)); // next(1) 신호 - complete 신호

 

Flux.range()로 정수 생성하기

Flux.range() 메서드를 사용하면 순차적으로 증가하는 Integer를 생성하는 Flux를 생성할 수 있다. 예를 들어 다음 코드는 11부터 시작해서 5개의 Integer를 생성하는 Flux 시퀀스를 생성한다. 즉 11부터 15까지의 Integer를 생성한다.

 

Flux<Integer> seq = Flux.range(11, 5);

 

Flux.generate() 메서드로 Flux 만들기

Flux.generate() 메서드를 사용하면 데이터를 함수를 이용해서 생성할 수 있다. Flux.generate() 함수는 동기 방식으로 한 번에 1개의 데이터를 생성할 때 사용한다. Flux.generate() 메서드 중 하나는 다음과 같다.

  • Flux<T> generate(Consumer<SynchronousSink<T>> generator)
generator는 Subscriber로부터 요청이 왔을 때 신호를 생성한다. generate()가 생성한 Flux는 다음과 같은 방식으로 신호를 발생한다.
  • Subscriber의 요청에 대해 인자로 전달받은 generator를 실행한다. generator를 실행할 때 인자로 SynchronousSink를 전달한다.
  • generator는 전달받은 SynchronousSink를 사용해서 next, complete, error 신호를 발생한다. 한 번에 1개의 next() 신호만 발생할 수 있다.
예제 코드를 보자
 
Consumer<SynchronousSink<Integer>> randGen = new Consumer<>() {
    private int emitCount = 0;
    private Random rand = new Random();
 
    @Override
    public void accept(SynchronousSink<Integer> sink) {
        emitCount++;
        int data = rand.nextInt(100) + 1; // 1~100 사이 임의 정수
        logger.info("Generator sink next " + data);
        sink.next(data); // 임의 정수 데이터 발생
        if (emitCount == 10) { // 10개 데이터를 발생했으면
            logger.info("Generator sink complete");
            sink.complete(); // 완료 신호 발생
        }
    }
};
 
Flux<Integer> seq = Flux.generate(randGen);
 
seq.subscribe(new BaseSubscriber<>() {
    private int receiveCount = 0;
    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        logger.info("Subscriber#onSubscribe");
        logger.info("Subscriber request first 3 items");
        request(3);
    }
 
    @Override
    protected void hookOnNext(Integer value) {
        logger.info("Subscriber#onNext: " + value);
        receiveCount++;
        if (receiveCount % 3 == 0) {
            logger.info("Subscriber request next 3 items");
            request(3);
        }
    }
 
    @Override
    protected void hookOnComplete() {
        logger.info("Subscriber#onComplete");
    }
});.
 
randGen의 accept() 메서드는 1~100 사이의 임의 정수를 생성한 뒤 인자로 SynchronousSink의 next() 메서드를 이용해서 next 신호를 발생한다. emitCount가 10이면(즉 데이터를 10개 발생했다면) SynchronousSink#complete() 메서드를 이용해서 complete 신호를 발생한다.
 
randGen은 신호 발생 기능을 제공할 뿐이며 실제 시퀀스는 Flux.generate()를 이용해서 생성했다.
 
seq.subscribe() 메서드에 전달한 Subscriber는 구독 시점에 3개의 데이터를 요청하고(hookOnSubscribe() 메서드의 request(3) 코드), 데이터를 3개 수신할 때마다 다시 3개의 데이터를 요청한다(hookOnNext() 메서드의 request(3) 코드).
 
콘솔에 관련 문장을 출력해서 실행 흐름을 알 수 있도록 했다. 위 코드를 실제로 실행해보면 다음 내용이 콘솔에 출력된다. 원래 출력에는 빈 줄이 없는데 쉬운 구분을 위해 빈 줄을 넣었고 Subscriber의 출력은 파란색으로 표시했다.
 
Subscriber#onSubscribe
Subscriber request first 3 items
 
Generator sink next 17
Subscriber#onNext: 17
Generator sink next 83
Subscriber#onNext: 83
Generator sink next 53
Subscriber#onNext: 53
Subscriber request next 3 items
 
Generator sink next 12
Subscriber#onNext: 12
Generator sink next 38
Subscriber#onNext: 38
Generator sink next 90
Subscriber#onNext: 90
Subscriber request next 3 items
 
Generator sink next 23
Subscriber#onNext: 23
Generator sink next 70
Subscriber#onNext: 70
Generator sink next 76
Subscriber#onNext: 76
Subscriber request next 3 items
 
Generator sink next 52
Subscriber#onNext: 52
Generator sink complete
Subscriber#onComplete
 
Flux가 제공하는 다른 generate() 메서드로는 다음이 있다.
  • Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)
  • Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator,
                                 Consumer<? super S> stateConsumer)
stateSupplier는 값을 생성할 때 사용할 최초 상태이다. BiFunction 타입의 generator는 인자로 상태와 SynchronousSink를 입력받아 결과로 다음 상태를 리턴하는 함수이다. 앞서 예제와 마찬가지로 SynchronousSink를 사용해서 신호를 생성한다. 두 번째 generate() 메서드의 stateConsumer는 상태를 정리할 때 사용한다. generator가 complete 신호나 error 신호를 발생하면 상태 정리를 위해 stateConsumer를 실행한다.
 
다음은 상태를 사용하는 Flux.generate()를 이용해서 임의 숫자 10개를 발생시키는 Flux를 생성하는 코드 예이다.
 
Flux<String> flux = Flux.generate(
        () -> { // Callable<S> stateSupplier
            return 0;
        },
        (state, sink) -> { // BiFunction<S, SynchronousSink<T>, S> generator
            sink.next("3 x " + state + " = " + 3 * state);
            if (state == 10) {
                sink.complete();
            }
            return state + 1;
        });
 
이 코드에서 flux는 "3 x 0 = 0" 부터 "3 x 10 = 30" 까지의 데이터를 담은 next 신호를 차례대로 발생하고 state 값이 10이 되면 compelete 신호를 발생한다.
 
Flux.just()나 Flux.generate()는 데이터 생성 과정 자체가 동기 방식이다. Subscriber로부터 데이터 요청이 오면 그 시점에 SynchronousSink를 이용해서 데이터를 생성한다. 반면에 별도 쓰레드를 이용해서 비동기로 데이터를 생성해야 하는 경우에는 SynchronousSink를 사용할 수 없다. 게다가 SynchronousSink는 한 번에 하나의 next 신호만 발생할 수 있다. 예를 들어 아래 코드는 에러를 발생한다.

 

Flux<String> flux = Flux.generate(

        () -> 1,

        (state, sink) -> {

            sink.next("Q: 3 * " + state);

            sink.next("A: " + (3 * state)); // 에러!

            if (state == 10) {

                sink.complete();

            }

            return state + 1;

        });

 

데이터 자체를 비동기로 생성해야 하거나 번에 다수의 next 신호를 발생해야 할 경우 Flux.generate()로는 처리할 수 없다. 이 때에는 Flux.create() 메서드를 사용해야 하는데 이 메서드를 포함한 Flux를 생성하는 또 다른 방법은 다음 글에서 이어서 살펴본다.

 

관련 글

 

  1. bysu 2021.09.08 16:19

    Flux<String> flux = Flux.generate(
    () -> { // Callable<S> stateSupplier
    return 0;
    },
    (state, sink) -> { // BiFunction<S, SynchronousSink<T>, S> generator
    sink.next("3 x " + state + " = " + 3 * state);
    if (state == 10) {
    sink.complete();
    }
    return state + 1;
    });

    이 코드에서 flux는 "3 x 1 = 3" 부터 "3 x 10 = 10" 까지의 데이터를 담은 next 신호를 차례대로 발생하고 state 값이 10이 되면 compelete 신호를 발생한다.

    3x1 부터 ->3x0 부터로 정정 되어야 하지 않을까 싶습니다.

반응형

리액티브 스트림즈, Flux, Mono

스프링은 웹 요청 처리, HTTP 클라이언트, NoSQL 연동 등 많은 영역에서 리액티프 프로그래밍을 지원하고 있다. 스프링 리액터는 스프링에서 리액티브 프로그래밍을 위한 핵심 모듈이다. 리액터를 잘 사용하려면 많은 것들을 알아야 하지만 가장 기본이 되는 두 타입인 Flux와 Mono에 대해 알아야 한다. 이 글에서는 리액티브 프로그래밍의 핵심 개념인 스프림에 대해 살펴보고 Flux와 Mono의 기본적인 사용법을 살펴본다.


왜 리액티브인가?

서버 관점에서 리액티브를 사용하는 이유 중 하나는 비동기/논블록을 이용해서 더 적은 자원으로 더 많은 트래픽을 처리하기 위함이다. 관련 내용은 "왜 리액티브인가 요약" 글을 참고한다.


리액티브 스트림

리액티브 스트림즈(http://www.reactive-streams.org/)는 비동기 스트림 처리를 위한 표준이다. 스프링 리액터는 이를 구현한 라이브러리이며 자바9의 Flow API도 리액티브 스트림 API를 따르고 있다. 


스트림은 시간이 지남에 따라 생성되는 일련의 데이터/이벤트(event)/신호(signal)이다. 맥락에 따라 데이터, 이벤트, 신호라는 용어를 사요한다. 이 글에서도 필요에 따라 이들 용어를 혼용해서 사용할 것이다. 리액티브 스트림즈 스펙에서는 용어로 신호를 사용한다. 리액티브 스트림즈는 다음 세 신호를 발생할 수 있다.

  • onNext* (onComplete | onError)?

스트림은 0개 이상의 next 신호를 발생할 수 있다. next 신호는 데이터를 담는다. complete 신호는 스트림이 끝났음을 의미하며 error 신호는 에러가 발생했음을 의미한다. complete와 error는 둘 중 하나만 발생할 수 있으며, 이 두 신호는 발생하지 않을 수도 있다. 


스트림의 예로 1분 간격 현재 기온 스트림을 들 수 있다. 이 데이터는 개념적으로 complete나 error 없이 next 신호만 1분 간격으로 발생한다. 파일 스트림은 파일을 읽는 동안 데이터를 담은 next 신호를 발생하고 파일을 다 읽으면 compelete 신호를 발생한다. 파일을 읽는 도중 에러가 발생하면 error 신호를 발생한다.


리액티브 스트림즈는 Publisher를 이용해서 스트림을 정의하며 Subscriber를 이용해서 발생한 신호를 처리한다. Subscriber가 Publisher로부터 신호를 받는 것을 구독이라고 한다. 다음 코드는 스프링 리액터가 제공하는 Publisher의 한 종류인 Flux에 대해 구독하는 코드 예를 보여준다.


Flux<Integer> seq = Flux.just(1, 2, 3); // Integer 값을 발생하는 Flux 생성


seq.subscribe(value -> System.out.println("데이터 : " + value)); // 구독


리액터는 스트림이라는 용어 대신 시퀀스라는 용어를 주로 사용한다. 위 코드에서 변수 이름이 seq인 이유는 시퀀스를 의미하기 위함이다. 첫 줄은 1, 2, 3 값을 차례대로 발생하는 Flux를 생성한다.


실제 값 발생은 구독(subscription) 시점에 이뤄진다. 위 코드는 Flux#subscribe(Consumer) 메서드를 이용해서 구독한다. 이 경우 Flux가 발생한 신호를 Consumer가 받아서 처리한다. 위 코드는 수신한 데이터를 콘솔에 출력하므로 위 코드를 실행하면 다음과 같은 결과가 출력된다.


데이터 : 1

데이터 : 2

데이터 : 3


물론 이렇게 단순한 작업을 하기 위해 리액티브 프로그래밍을 사용하는 것은 아니다. 스케줄링, 다양한 조합 기능을 이용해서 이전보다 더 간결하면서도 자원을 효율적으로 사용하는 코드를 작성할 수 있다.


[노트]

리액티브 스트림즈는 스트림이라는 표현을 사용하지만 이는 자바 8의 스트림과 혼동할 수 있다. 이런 이유로 "리액티브 스트림즈" 자체를 표현할 때가 아니면 스트림 대신 시퀀스라는 용어를 사용하겠다.


리액터 사용 위한 메이븐 설정

리액터는 reactor-core, reactor-netty, reactor-extra, reactor-adapter 등 다양한 모듈로 구성되어 있다. 각 모듈의 버전을 맞추기 위해 리액터는 메이븐 BOM(Bill Of Materials)을 제공한다. 이 글에서는 reactor-core 의존만 사용하긴 하지만 BOM을 포함한 의존 설정을 사용해보자. 메이븐 의존 설정은 다음과 같다.


<dependencyManagement>

    <dependencies>

        <dependency>

            <groupId>io.projectreactor</groupId>

            <artifactId>reactor-bom</artifactId>

            <version>Bismuth-SR9</version>

            <type>pom</type>

            <scope>import</scope>

        </dependency>

    </dependencies>

</dependencyManagement>


<dependencies>

    <dependency>

        <groupId>io.projectreactor</groupId>

        <artifactId>reactor-core</artifactId>

    </dependency>


    <dependency>

        <groupId>org.slf4j</groupId>

        <artifactId>slf4j-api</artifactId>

        <version>1.7.12</version>

    </dependency>

    <dependency>

        <groupId>ch.qos.logback</groupId>

        <artifactId>logback-classic</artifactId>

        <version>1.2.3</version>

    </dependency>

</dependencies>


reactor-bom의 Bismuth 버전은 스프링 리액터 3.1 버전을 정의한다. Bismuth-SR9 버전은 reactor-core 3.1.7.RELEASE를 기준으로 한다.


스프링 리액터의 Publisher: Flux와 Mono

스프링 리액터는 Flux와 Mono의 두 가지 Publisher를 제공하고 있다. 이 두 타입은 발생할 수 있는 데이터 개수에 차이가 있다. Flux는 0개 이상의 데이터를 발생할 수 있고 Mono는 0 또는 1개의 데이터를 발생할 수 있다.


앞서 Publisher는 next, complete, error 신호를 발생할 수 있다고 했다. Flux는 0개 이상의 데이터를 발생하므로 0개 이상의 next 신호를 발생할 수 있고 complete나 error 신호를 발생하거나 발생하지 않을 수 있다. 예를 들어 다음 코드를 보자.


Flux.just(1, 2, 3);


이 코드에서 seq 시퀀스는 1, 2, 3을 값으로 갖는 세 개의 next 신호를 발생하고 마지막에 complete 신호를 발생해서 시퀀스를 끝낸다. 즉 시간 순으로 표시하면 다음과 같이 시퀀스가 발생한다('--->'는 시간축, '|'는 complete 신호 의미).


--1-2-3-|-->


아래 코드와 같이 아무 값도 발생하지 않는 시퀀스는 complete 신호만 발생한다.


Flux.just(); // --|-->


Mono도 유사하다. 차이라면 최대 발생할 수 있는 값이 1개라는 점이다.


Mono.just(1); // --1-|-->

Mono.empty(); // --|-->


just() 메서드는 이미 존재하는 값을 사용해서 Flux/Mono를 생성할 때 사용된다. just() 외에 create(), generate()를 이용해서 생성할 수 있는데 이에 대한 내용은 나중에 정리해보겠다.


[노트]

Flux와 Mono를 직접 생성하기보다는 다른 라이브러리가 제공하는 Flux와 Mono를 사용할 때가 많다. 예를 들어 스프링 5 버전에 추가된 WebClient 클래스를 사용할 때에는 WebClient가 생성하는 Mono를 이용해서 데이터를 처리한다.


구독과 신호 발생

시퀀스는 바로 신호를 발생하지 않는다. 구독을 하는 시점에 신호를 발생하기 시작한다. 코드로 확인해보자. 먼저 다음 코드를 보자.


Flux.just(1, 2, 3)

     .doOnNext(i -> System.out.println("doOnNext: " + i))

     .subscribe(i -> System.out.println("Received: " + i));


위 코드에서 doOnNext() 메서드는 Flux가 Subscriber에 next 신호를 발생할 때 불린다. 실행 결과는 다음과 같다.


doOnNext: 1

Received: 1

doOnNext: 2

Received: 2

doOnNext: 3

Received: 3


이제 코드를 다음과 같이 바꾸고 다시 실행해보자.


Flux<Integer> seq = Flux.just(1, 2, 3)

        .doOnNext(i -> System.out.println("doOnNext: " + i));


System.out.println("시퀀스 생성");

seq.subscribe(i -> System.out.println("Received: " + i));


실행 결과는 다음과 같다.


시퀀스 생성

doOnNext: 1

Received: 1

doOnNext: 2

Received: 2

doOnNext: 3

Received: 3


이 결과를 보면 시퀀스를 생성한 시점에는 doOnNext에 전달한 함수가 실행되지 않는 것을 알 수 있다. doOnNext에 전달한 함수는 next 신호를 발생할 때 호출되기 때문이다. subscribe()를 실행해서 구독을 한 이후에 doOnNext에 전달한 코드가 실행되는데 이는 subscribe() 시점에 신호를 발생하기 시작한다는 것을 보여준다.


콜드 시퀀스 vs 핫 시퀀스

시퀀스는 구독 시점부터 데이터를 새로 생성하는 콜드(cold) 시퀀스와 구독자 수에 상관없이 데이터를 생성하는 핫(hot) 시퀀스로 나뉜다.


앞 예제 Flux.just()로 생성한 시퀀스가 콜드 시퀀스이다. 콜드 시퀀스는 구독을 하지 않으면 데이터를 생성하지 않는다. 구독을 하면 그 시점에 데이터를 새롭게 발생한다. 다음 코드는 이런 특징을 보여준다.


Flux<Integer> seq = Flux.just(1, 2, 3);

seq.subscribe(v -> System.out.println("구독1: " + v)); // 구독

seq.subscribe(v -> System.out.println("구독2: " + v)); // 구독


이 코드는 seq 시퀀스에 대해 구독을 두 번한다. 코드 결과는 다음과 같은데 이 결과를 보면 seq 시퀀스는 각 구독마다 데이터를 새롭게 생성하는 것을 알 수 있다.


구독1: 1

구독1: 2

구독1: 3

구독2: 1

구독2: 2

구독2: 3


콜드 시퀀스의 예로 API 호출을 들 수 있다. API 호출 시퀀스는 구독을 할 때마다 매번 새로운 요청을 서버에 전송하고 결과를 받는다.


핫 시퀀스는 구독 여부에 상관없이 데이터가 생성된다. 구독을 하면 구독한 시점 이후에 발생하는 데이터부터 신호를 받는다. 핫 시퀀스 예로 센서 데이터를 들 수 있다. 센서 데이터를 제공하는 시퀀스를 구독하면 그 시점 이후에 센서가 발생한 데이터부터 받게 된다.


Subscriber와 Subscription

앞 코드 예에서 다음의 subscribe() 메서드를 사용해서 구독을 했다.


// Flux나 Mono

subscribe(Consumer<? super T> consumer)


Consumer를 파라미터로 갖는 subscribe() 메서드는 리액터가 편의를 위해 제공하는 메서드로서 이 메서드는 내부적으로 Subscriber를 인자로 받는 subscribe() 메서드를 실행한다.


Subscriber는 리액티브 스트림즈에 포함된 인터페이스로 다음과 같이 정의되어 있다.


package org.reactivestreams;


public interface Subscriber<T> {

    void onSubscribe(Subscription s);

    void onNext(T t);

    void onError(Throwable t);

    void onComplete();

}


각 메서드는 다음과 같다.

  • onSubscribe(Subscription s): 구독을 하면 Publisher와 연동된 Subscription을 받는다. 전달받은 Subscription을 이용해서 Publisher에 데이터를 요청한다.
  • onNext(T t): Publisher가 next 신호를 보내면 호출된다.
  • onError(Throwable t): Publisher가 error 신호를 보내면 호출된다.
  • onComplete(): Publisher가 complete 신호를 보내면 호출된다.

각 메서드가 어떻게 동작하는지 다음 예제 코드로 알아보자.


Flux<Integer> seq = Flux.just(1, 2, 3);


seq.subscribe(new Subscriber<>() {

    private Subscription subscription;

    @Override

    public void onSubscribe(Subscription s) {

        System.out.println("Subscriber.onSubscribe");

        this.subscription = s;

        this.subscription.request(1); // Publisher에 데이터 요청

    }


    @Override

    public void onNext(Integer i) {

        System.out.println("Subscriber.onNext: " + i);

        this.subscription.request(1); // Publisher에 데이터 요청

    }


    @Override

    public void onError(Throwable t) {

        System.out.println("Subscriber.onError: " + t.getMessage());

    }


    @Override

    public void onComplete() {

        System.out.println("Subscriber.onComplete");

    }

});


subscribe() 메서드에 전달한 임의 Subscriber 객체의 onSubscribe() 메서드는 인자로 전달받은 Subscription 객체를 필드에 저장한다. Subscription은 구독 라이프사이클을 관리한다. 예를 들어 Subscription#request() 메서드는 Publisher에 데이터 요청 신호를 보낸다. 위 코드는 request(1)을 실행했는데 이는 1개의 데이터를 요청한다는 것을 의미한다. 즉 onSubscribe() 메서드는 파라미터로 전달받은 Subscription을 이용해서 Publisher에 1개의 데이터를 요청한다.

Publisher가 데이터 신호(next 신호)를 보내면 Subscriber#onNext() 메서드가 불린다. 위 예제에서는 전달받은 데이터를 출력하고 Subscription#request()를 이용해서 다음 데이터 1개를 요청한다. 즉 이 코드는 최초 구독 시점에 데이터 1개를 요청하고(onSubscribe) 이후 한 개의 데이터를 받으면 다시 한 개의 데이터를 요청한다(onNext).

실제 위 코드를 실행하면 다음 내용이 콘솔에 출력된다.

Subscriber.onSubscribe
Subscriber.onNext: 1
Subscriber.onNext: 2
Subscriber.onNext: 3
Subscriber.onComplete

이번엔 onNext() 메서드에서 다음처럼 subscription.request(1) 코드를 주석처리하고 다시 실행해보자.

Flux<Integer> seq = Flux.just(1, 2, 3);
seq.subscribe(new Subscriber<>() {
    ...생략

    @Override
    public void onNext(Integer i) {
        System.out.println("Subscriber.onNext: " + i);
        // this.subscription.request(1);
    }

    ...생략
});

결과는 다음과 같다.

Subscriber.onSubscribe
Subscriber.onNext: 1

onSubscribe()에서만 1개의 데이터를 요청하고 onNext()에서는 어떤 요청도 하지 않아 Publisher가 1개 데이터만 발생한 것을 알 수 있다.

request(Long.MAX_VALUE)를 사용하면 개수 제한없는 데이터 요청 신호를 Publisher에 보낸다. Publisher는 이 신호를 받으면 끝까지 데이터를 발생시킨다. 이를 다음과 같이 변경해보자.

Flux<Integer> seq = Flux.just(1, 2, 3);
seq.subscribe(new Subscriber<>() {
    private Subscription subscription;
    @Override
    public void onSubscribe(Subscription s) {
        System.out.println("Subscriber.onSubscribe");
        this.subscription = s;
        this.subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(Integer i) {
        System.out.println("Subscriber.onNext: " + i);
    }

    ...생략
});

위 코드를 실행하면 Publisher가 모든 데이터를 발생한 것을 확인할 수 있다.

푸시 모델 vs 풀 모델

Subscription#request()는 Subscriber가 데이터를 처리할 수 있을 때 Publisher에게 데이터를 요청하는 풀(pull) 모델이다. 하지만 request(Long.MAX_VALUE)로 요청하면 Publisher는 개수 제한 없이 Subscriber에 데이터를 전송한다. 이는 완전한 푸시(push) 모델이다. 또 request(100000)을 사용하면 십 만 개의 데이터를 요청하고, Publisher는 발생한 데이터가 십 만 개가 될 때까지 신호를 보낸다. 데이터 요청은 풀 모델로 이루어졌지만 10만 개의 데이터를 전송하는 동안은 실질적으로 푸시 모델과 같다.

subscribe() 메서드

다음은 리액터가 제공하는 subscribe() 메서드이다.
  • subscribe()
  • subscribe(Consumer<? super T> consumer)
  • subscribe(Consumer<? super T> consumer,
                 Consumer<? super Throwable> errorConsumer)
  • subscribe(Consumer<? super T> consumer,
                 Consumer<? super Throwable> errorConsumer,
                 Runnable completeConsumer)
  • subscribe(Consumer<? super T> consumer,
                 Consumer<? super Throwable> errorConsumer,
                 Runnable completeConsumer,
                 Consumer<? super Subscription> subscriptionConsumer)
  • subscribe(Subscriber<? super T> actual)
  • subscribe(CoreSubscriber<? super T> actual)

메서드의 각 파라미터는 다음과 같다.

  • consumer: next 신호 처리
  • errorConsumer: error 신호 처리
  • completeConsumer: complete 신호 처리
  • subscriptionConsumer: Subscriber의 onSubscribe 메서드에 대응
  • actual: Subscriber나 CoreSubscriver 타입

관련글


  1. kyungsik-oh 2018.09.27 14:49

    핫 시퀀스는 구독 여부에 상관없이 데이터가 생성된다. 구독을 하면 구독한 시점 이후에 발생하는 데이터부터 신호를 받는다. 콜드 스트림의 예로 센서 데이터를 들 수 있다. 센서 데이터를 제공하는 시퀀스스에 구독을 하면 그 시점 이후에 센서가 발생한 데이터부터 받게 된다.

    -->

    여기 구문에 오타가 있는 것 같습니다 : )
    콜드스트림의 예로 센서 데이터를 들 수 있다 --> 핫 시퀀스(핫스트림)의 예로 센서 데이터를 들 수 있다.
    시퀀스스에 구독을 하면 --> 시퀀스에 구독을 하면

  2. kkakka 2020.06.13 11:45

    안녕하세요. 항상 블로그 잘보고 있습니다.

    https://projectreactor.io/docs/core/release/reference/#reactor.hotCold
    위 문서에서보면

    hot publisher는 구독자 수에 상관없이 처음 생성될때부터 데이터를 생성하여 계속 데이터를 방출하고,
    cold는 반대로 구독시점에 데이터를 새로 생성하는 건 블로그와 문서의 내용이 같은데
    just 연산자가 hot publisher의 연산자중 하나로 예시가 나와있어서 블로그와 내용이 조금 다른것 같아서요.
    저도 이제 막 공부하기 시작해서 제가 틀린걸 수도 있지만 확인부탁드립니다~

GIT
반응형

윈도우에서 git 체크아웃 받는 과정에서 아래와 같이 파일 이름이 길어서 파일을 생성할 수 없다는 오류가 발생할 때가 있다.


error: unable to create file ...파일경로 (Filename too long)


이는 윈도우 API의 파일 경로 길이가 260자 제한을 갖기 때문이다. 이 제한을 없애려면 다음 명령어를 사용해서 git의 core.longpaths 설정을 true로 지정하면 된다.


git config --system core.longpaths true



  1. Hojong 2021.06.24 10:37 신고

    감사합니다

반응형

스프링 부트 2에서 JUnit 5를 사용하는 방법을 정리한다. 먼저 pom.xml 파일을 다음과 같이 수정한다.


  • spring-boot-starter-test 의존 설정에서 junit:junit을 제외 처리
  • junit-jupiter-api 의존 추가
  • maven-surefire-plugin 플러그인 JUnit 5 기준 설정. 주의할 점은 maven-sufire-plugin의 버전을 2.19.1로 설정해야 한다는 점이다. 스프링 부트 2.0.2는 maven-surefire-plugin의 2.21.0 버전을 기본으로 사용하는데 이 버전은 JUnit 5를 제대로 처리하지 못한다.

다음은 설정 예이다.


<?xml version="1.0" encoding="UTF-8"?>

<project ...>

    <modelVersion>4.0.0</modelVersion>


    <groupId>madvirus</groupId>

    <artifactId>boot2-junit5</artifactId>

    <version>0.0.1-SNAPSHOT</version>

    <packaging>jar</packaging>


    <name>boot2-junit5</name>


    <parent>

        <groupId>org.springframework.boot</groupId>

        <artifactId>spring-boot-starter-parent</artifactId>

        <version>2.0.2.RELEASE</version>

        <relativePath/> <!-- lookup parent from repository -->

    </parent>


    <properties>

        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

        <java.version>1.9</java.version>

        <junit-jupiter.version>5.1.1</junit-jupiter.version>

        <junit-platform.version>1.1.1</junit-platform.version>

    </properties>


    <dependencies>

        <dependency>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-starter-web</artifactId>

        </dependency>


        <dependency>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-devtools</artifactId>

            <scope>runtime</scope>

        </dependency>

        <dependency>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-starter-test</artifactId>

            <scope>test</scope>

            <exclusions>

                <exclusion>

                    <groupId>junit</groupId>

                    <artifactId>junit</artifactId>

                </exclusion>

            </exclusions>

        </dependency>


        <dependency>

            <groupId>org.junit.jupiter</groupId>

            <artifactId>junit-jupiter-api</artifactId>

        </dependency>

    </dependencies>


    <build>

        <plugins>

            <plugin>

                <groupId>org.springframework.boot</groupId>

                <artifactId>spring-boot-maven-plugin</artifactId>

            </plugin>


            <plugin>

                <groupId>org.apache.maven.plugins</groupId>

                <artifactId>maven-surefire-plugin</artifactId>

                <version>2.19.1</version>

                <dependencies>

                    <dependency>

                        <groupId>org.junit.platform</groupId>

                        <artifactId>junit-platform-surefire-provider</artifactId>

                        <version>${junit-platform.version}</version>

                    </dependency>

                    <dependency>

                        <groupId>org.junit.jupiter</groupId>

                        <artifactId>junit-jupiter-engine</artifactId>

                        <version>${junit-jupiter.version}</version>

                    </dependency>

                </dependencies>

            </plugin>

        </plugins>

    </build>


</project>


JUnit5를 이용해서 스프링 부트 테스트를 실행하는 예제 코드는 다음과 같다.


package boot2junit5;


import org.assertj.core.api.Assertions;

import org.junit.jupiter.api.Test;

import org.junit.jupiter.api.extension.ExtendWith;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.test.context.SpringBootTest;

import org.springframework.test.context.junit.jupiter.SpringExtension;


@ExtendWith(SpringExtension.class)

@SpringBootTest

public class ApplicationTest {


    @Autowired

    private HelloService helloService;


    @Test

    void hello() {

        Assertions.assertThat(helloService.hello("안녕")).isEqualTo("안녕");

    }


}


@ExtendWith 애노테이션은 JUnit5에서 확장 기능을 실행할 때 사용한다. SpringExtension은 JUnit5를 위한 스프링 확장 기능으로 스프링 연동 테스트를 실행할 수 있게 한다.


관련 링크


반응형
OKKY 세미나에서 발표한 자료.


반응형

스프링 부트 2.0에서 엑셀 다운로드 기능을 구현하는 방법을 정리했다.


pom.xml 설정


https://start.spring.io/ 사이트에서 스프링 부트 2.0.x 버전을 선택해서 프로젝트를 생성한다. Dependencies로는 Web과 Thymeleaf를 선택한다. 생성한 프로젝트 pom.xml 파일에 엑셀 생성을 위해 poi 의존을 추가한다.


<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"

         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 

              http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>


    <groupId>madvirus</groupId>

    <artifactId>excel-download</artifactId>

    <version>0.0.1-SNAPSHOT</version>

    <packaging>jar</packaging>


    <name>excel-download</name>

    <description>Demo project for Spring Boot</description>


    <parent>

        <groupId>org.springframework.boot</groupId>

        <artifactId>spring-boot-starter-parent</artifactId>

        <version>2.0.1.RELEASE</version>

        <relativePath/> <!-- lookup parent from repository -->

    </parent>


    <properties>

        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

        <java.version>1.8</java.version>

    </properties>


    <dependencies>

        <dependency>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-starter-web</artifactId>

        </dependency>

        <dependency>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-starter-thymeleaf</artifactId>

        </dependency>


        <dependency>

            <groupId>org.apache.poi</groupId>

            <artifactId>poi</artifactId>

            <version>3.17</version>

        </dependency>


        <dependency>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-devtools</artifactId>

            <scope>runtime</scope>

        </dependency>

        <dependency>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-starter-test</artifactId>

            <scope>test</scope>

        </dependency>

    </dependencies>


    <build>

        <plugins>

            <plugin>

                <groupId>org.springframework.boot</groupId>

                <artifactId>spring-boot-maven-plugin</artifactId>

            </plugin>

        </plugins>

    </build>



</project>


엑셀 다운로드 위한 application.properties 파일 설정


확장자나 파라미터를 이용해서 엑셀 다운로드를 처리할 수 있도록 application.propertie 파일에 다음 설정을 추가한다.


spring.mvc.contentnegotiation.favor-parameter=true

spring.mvc.contentnegotiation.favor-path-extension=true

spring.mvc.contentnegotiation.media-types.xls=application/vnd.ms-excel


스프링 부트는 기본적으로 ContentNegotiationViewResolver를 사용하는데 각 프로퍼티는 다음을 설정한다.

  • favor-parameter: 이 값이 true면 ContentNegotiationViewResolver가 format 파라미터로 지정한 미디어 타입을 사용하도록 설정
  • favor-path-extension: 이 값이 true면 ContentNegotiationViewResolver가 확장자로 지정한 미디어 타입을 사용하도록 설정
  • media-types.타입: 타입에 해당하는 컨텐츠 타입을 지정

예를 들어 위 설정을 사용하면 다음 요청을 엑셀 타입(application/vnd.ms-excel) 요청으로 인지하고, 엑셀 미디어 타입에 해당하는 응답을 처리할 수 있는 뷰를 사용해서 응답을 생성한다.

  • stat.xls (확장자가 xls)
  • stat?format=xls (format 파라미터가 xls)

예제 컨트롤러


다음 코드는 일반 뷰와 엑셀 다운로드를 처리하는 컨트롤러 코드이다.


@Controller

public class StatController {

    private void populateModel(Model model) {

        List<StatRow> rows = Arrays.asList(

                new StatRow("고객1", 1000, 1500),

                new StatRow("고객2", 2000, 2500),

                new StatRow("고객3", 3000, 3500)

        );

        model.addAttribute("rows", rows);

    }


    @GetMapping("/stat")

    public String get(Model model) {

        populateModel(model);

        return "stat";

    }


    @GetMapping("/stat.xls")

    public String getExcelByExt(Model model) {

        populateModel(model);

        return "statXls";

    }


    @GetMapping(path = "/stat", params = "format=xls")

    public String getExcelByParam(Model model) {

        populateModel(model);

        return "statXls";

    }

}


get() 메서드는 일반 뷰를 사용해서 응답을 생성한다. getExcelByExt() 메서드는 확장자가 xls인 요청 경로를 처리하므로 "statXls"에 대응하는 뷰 중에서 엑셀 타입을 응답으로 생성할 수 있는 뷰를 선택한다. 비슷하게 getExcelByParam() 역시 format 파라미터가 xls인 요청을 처리하므로 엑셀 타입을 생성할 수 있는 뷰를 선택한다.


엑셀 생성을 위한 뷰 클래스


엑셀 다운로드를 위한 뷰 클래스는 다음과 같이 구현한다. 빈 객체 이름으로 "statxls"를 사용했는데 이 이름은 앞서 컨트롤러에서 리턴한 뷰 이름과 같다.


package exceldownload;


import org.apache.poi.ss.usermodel.*;

import org.springframework.stereotype.Component;

import org.springframework.web.servlet.view.document.AbstractXlsView;


import javax.servlet.http.HttpServletRequest;

import javax.servlet.http.HttpServletResponse;

import java.util.List;

import java.util.Map;


@Component("statXls")

public class StatXlsView extends AbstractXlsView {

    @Override

    protected void buildExcelDocument(

            Map<String, Object> model, Workbook workbook,

            HttpServletRequest request, HttpServletResponse response) throws Exception {

        response.setHeader("Content-Disposition", "attachment; filename=\"stat.xls\"");


        List<StatRow> stats = (List<StatRow>) model.get("rows");


        CellStyle numberCellStyle = workbook.createCellStyle();

        DataFormat numberDataFormat = workbook.createDataFormat();

        numberCellStyle.setDataFormat(numberDataFormat.getFormat("#,##0"));


        Sheet sheet = workbook.createSheet("mobilestat");

        for (int i = 0 ; i < stats.size() ; i++) {

            StatRow stat = stats.get(i);

            Row row = sheet.createRow(i);


            Cell cell0 = row.createCell(0);

            cell0.setCellValue(stat.getName());


            Cell cell1 = row.createCell(1);

            cell1.setCellType(CellType.NUMERIC);

            cell1.setCellValue(stat.getValue1());

            cell1.setCellStyle(numberCellStyle);


            Cell cell2 = row.createCell(2);

            cell2.setCellType(CellType.NUMERIC);

            cell2.setCellValue(stat.getValue2());

            cell2.setCellStyle(numberCellStyle);

        }

    }

}



타임리프 뷰 구현


타임리프트를 이용한 뷰 구현 파일인 stat.html은 다음과 같아 간단하게 구현했다. 엑셀 다운로드를 위한 링크를 추가했다.


<!DOCTYPE HTML>

<html xmlns:th="http://www.thymeleaf.org">

<head>

    <meta charset="utf-8" />

    <meta http-equiv="Content-Type" content="text/html; charset=UTF-8" />

    <title>통계</title>

</head>

<body>


<a href="stat.xls">엑셀다운, 확장자(stat.xls)</a> |

<a href="stat?format=xls">엑셀다운, 파라미터(stat?format=xls)</a>

<table border="1">

    <thead>

    <tr>

        <th>이름</th>

        <th>값1</th>

        <th>값2</th>

    </tr>

    </thead>

    <tbody>

    <tr th:each="row : ${rows}">

        <td th:text="${row.name}"></td>

        <td th:text="${#numbers.formatInteger(row.value1, 1, 'COMMA')}"></td>

        <td th:text="${#numbers.formatInteger(row.value2, 1, 'COMMA')}"></td>

    </tr>

    </tbody>

</table>


</body>

</html>


예제 실행


완전한 예제 프로젝트는 https://github.com/madvirus/excel-download 리포지토리에서 구할 수 있다. 명령 프롬프트에서 "mvnw spring-boot:run" 명령어를 부트 어플리케이션을 실행한 뒤에 http://localhost:8080/stat 주소에 연결해보자. 다음 결과를 볼 수 있다.



엑셀 다운로드 링크를 클릭해보자. 두 링크 중 아무거나 클릭하면 엑셀 파일을 다운로드 한다.



실제 다운로드한 파일을 열어보자. 아래와 같이 엑셀 파일이 올바르게 생성된 것을 확인할 수 있다.



반응형

스프링 부트(spring boot)를 사용한다면 타임리프(thymeleaf)의 식 객체(expression object)를 쉽게 확장할 수 있다. 먼저 식 객체를 생성해주는 타임리프 IExpressionObjectDialect를 구현한다. 이 클래스를 스프링 빈으로 등록해야 한다. 아래 예는 @Component를 붙여 컴포넌트 스캔 대상으로 설정했다.


import java.util.Collections;

import java.util.Set;


import org.springframework.stereotype.Component;

import org.thymeleaf.context.IExpressionContext;

import org.thymeleaf.dialect.AbstractDialect;

import org.thymeleaf.dialect.IExpressionObjectDialect;

import org.thymeleaf.expression.IExpressionObjectFactory;


@Component

public class MyFormatDialect extends AbstractDialect implements IExpressionObjectDialect {


    protected ScgFormatDialect() {

        super("myFormat");

    }


    @Override

    public IExpressionObjectFactory getExpressionObjectFactory() {

        return new IExpressionObjectFactory() {


            @Override

            public Set<String> getAllExpressionObjectNames() {

                return Collections.singleton("scgFormat");

            }


            @Override

            public Object buildObject(IExpressionContext context, String expressionObjectName) {

                return new MyFormat();

            }


            @Override

            public boolean isCacheable(String expressionObjectName) {

                return true;

            }

        };

    }


}


생성자에서는 식 객체의 이름을 "myFormat"으로 지정한다.

getExpressionObjectFactory() 메서드는 IExpressionObjectFactory 객체를 리턴한다. 이 객체의 buildObject() 메서드가 생성하는 객체가 식 객체가 된다. 이 객체는 타임리프 식에서 사용할 메서드를 제공한다. 다음은 식 객체로 사용할 클래스의 구현 예이다.


public class MyFormat {


    public String date(String date) {

        if (!StringUtils.hasText(date))

            return null;

        if (date.length() == 8) {

            return date.substring(0, 4) + "-" + date.substring(4, 6) + "-" + date.substring(6, 8);

        } else {

            return date;

        }

    }


    public String contractNum(String contractNum) {

        if (!StringUtils.hasText(contractNum))

            return null;

        if (contractNum.length() > 5) {

            return contractNum.substring(0, 5) + "-" + contractNum.substring(5);

        } else {

            return contractNum;

        }

    }


    public String phone(String phone) {

        if (!StringUtils.hasText(phone))

            return null;

        if (phone.length() == 11) {

            return phone.substring(0, 3) + "-" + phone.substring(3, 7) + "-" + phone.substring(7);

        } else if (phone.length() == 10) {

            return phone.substring(0, 3) + "-" + phone.substring(3, 6) + "-" + phone.substring(6);

        } else {

            return phone;

        }

    }

}


이제 커스텀 식 객체를 타임리프 식에서 사용하면 된다.


<td th:text="${#myFormat.phone(item.handphone)}"></td>

<td th:text="${#myFormat.contractNum(item.useContractNum)}"></td>




반응형

Why Reactive? 책 요약(책은 여기에서 다운로드 가능)


1장. 소개


시스템은 다음과 같아야 함

  • 사용자에게 응답해야 함
  • 실패를 다루고 사용불능 상태에서도 동작해야 함
  • 다양한 부하 상황에서 견뎌야 함
  • 다양한 네트워크 상황에서 메시지를 보내고, 받고, 전달할 수 있어야 함
이를 위해 다음 필요
  • 하드웨어 자원 활용 제어 -> 응답성
  • 구성 요소를 분리 -> 독립적인 확장
  • 시스템 간 비동기 통신 ->  복원력, 탄력성(resilience)
리액티브 시스템의 이점이 여기에 있음. 

왜 지금인가?

다음 몇 가지 움직임이 리액티브 프로그래밍이 뜨는데 일조함
  • IoT와 모바일 : 서버가 동시에 수 백만의 연결된 장치를 처리해야 함 -> 병행과 분산 어플리케이션에 대한 요구가 급격히 증가
  • 클라우드, 컨테이너 : 경량의 가상화와 컨테이너 기술로 세밀한 범위로 더 빠르게 배포 가능 -> 더 편하게 할 수 있는 도구
과거부터 존재한 개념이 요즘 뜨는 이유
  1. 더 나은 자원 활용과 확정성에 대한 필요성 증가. 이를 위한 도구가 사용 가능해짐
  2. 모든 구현체가 상호운영할 수 있는 표준으로 특정 구현체에 얽매이는 위험 감소

2장. 어플리케이션 수준에서의 리액티브


리액티브와 비동기
  • 리액티브 프로그래밍에 있어 핵심 요소 중 하나는 태스크를 비동기로 실행할 수 있는 것
  • 리액티브 라이브러리, 구현의 공통 테마는 어떤 종류의 쓰레드 풀에 기반한 이벤트 루프나 공유 디스패처 인프라를 사용하는 것. 저렴한 구성 요소 간에 비싼 자원(thread)을 공유함으로써 단일 어플리케이션을 멀티 코어로 확장할 수 있음
  • 이런 멀티플렉싱 기술은 단일 장비로 백 만 엔티티를 다룰 수 있게 함
    • 쓰레드를 직접 사용하면, 사용자 당 액터를 만들 수 없고, 너무 무거워서 빨리 할 수 없고, 또한 쓰레드를 직접 다루는 게 간단하지 않으며, 프로그램이 다른 쓰레드 간 데이터 동기화 코드로 빠르게 지배당함
  • 비동기에서 주의할 점은 블로킹임. 파일, 네트워크 IO와 같은 블로킹 연산은 CPU 자원을 사용하지 않는데, 쓰레드가 블로킹되면서 다른 액터(또는 작업)을 실행하지 못하게 됨
    • 블로킹을 별도 쓰레드 풀로 분리해서 실행 -> 메인 이벤트 루프나 디스패처를 블록하면 안 됨
USL(Universal Scalability Law)
  • 통신 비용, 데이터 동기화 비용까지 고려
  • 적정 지점 이상으로 시스템 자원을 사용하게 밀어 붙이면, 더 이상 속도가 빨리지 않을 뿐 아니라, 전체 처리량에 부정적인 영향을 줌(백그라우드에서 일어나는 모든 종류의 조정 때문에 발생, 예 네트워크나 메모리 포화로 이런 조정 발생 가능)
  • 다양한 자원에 대한 경쟁이 발생할 수 있고, 과사용 문제는 CPU부터 네트워크까지 적용
역압(backpressure)
  • 최적 사용 상태 유지를 위한 역압 사용
    • 동기 API는 블록 연산으로 절로 역압이 된다
    • 비동기 API는 최대 성능을 낼 수 있는데, 이는 느린 하류 스트림이나 다른 어플리케이션을 압도할 수 있음
    • 리액티브 스트림 스펙은 스트림 라이브러리 사용자에게 투명하게 역압을 적용할 수 있도록 요구
    • 비동기 + 역압 -> 시스템의 한계까지 push할 수 있으나, 한계를 넘지는 않게 할 수 있음
  • 스트리밍 API와 제한된 메모리
    • 스트리밍의 두 면 : 소비 API, 생성 API
    • 스트리밍 API와 메모리에 제한된 처리 : 필요 이상 데이터를 메모리에 적재하지 않게 해 줌 -> 이는 용량 설계에 유용한 속성
    • 하류가 느릴 때 버퍼를 두어 하류에서 데이터를 처리할 수 있게 함
    • 무한정 버퍼에 담을 수 없으므로 제한된 크기의 버퍼 사용
    • 버퍼에 적재된 메시지의 비율로 지표 측정 가능(클라이언트가 얼마나 느린지, 서비스 레벨, 노드 활용 등)
    • 큐 버퍼를 모니터링해서 이벤트 발생 가능(예, 다 차면 클라이언트를 끊거나 오래된 메시지 삭제)

3장, 시스템 수준에서의 리액티브


분산 시스템과 메시징
  • 요청-응답 API의 문제 : 개발자가 할 수 있는 것을 제한
    • 예, HTTP는 이벤트 스트림을 받으려면 일부 hack이 필요, HTTP로 Job을 제출하고 제출 완료를 polling해야 함
  • 메시지 기반 시스템(akka 등)을 사용하면 어렵지 않게 할 수 있음
    • 예, Job 제출 세미지를 fire-and-forget으로 보내고 진행 상태를 메시지로 받으면 됨
    • 메시지를 받을 수 있는 addressable 엔티티만 있으면 됨
    • 요점: 매우 작음. 다른 시스템과 리액티브하게 상호작용하기 위한 비용 작음 (연결된 커넥션 X, 라운드 트림 X, 폴링 X)
    • HTTP로 메시징 같은 API를 설계 하려면 많은 고민(블로킹, 웹소켓, 폴링, SSE 선택) 필요하고 확장성도 떨어짐
부하에 살아남고 요금 아끼기
  • 최근 추세 : 컨테이너, 클러스터 오케스트레이션 -> 유연함(elastic)이 중요해짐
    • 스케일 up/out 만큼 scale down도 중요
    • 부하에 맞게 확장/축소해서 적정 비용 유지
    • 사전(proactively) 확정 : 언제 확장할지 결정할 수 있는 기술
    • 리액티브 서비스는 서비스 확장을 위한 성공 요인
탄력성(resilience)이 없으면 다른 것은 아무것도 아님
  • 리액티브의 여러 특징(성능, 확장성, 메시징 패턴 등) 중 가장 중요한 것은 탄력성
  • 시스템에 문제가 있을 때 완전히 불능 상태가 되면 제 아무리 빠르고 확장 가능한 시스템도 의미 없음
  • 공통의 비싼 자원에 접근하면, 그 자원이 단일 실패 지점이 될 수 있음
  • 리액티브 시스템은 "own your own data" 패턴을 따른다. 시스템이 주변에 대한 상태를 내부에 보관한다면 외부 정지 상황에서 살아남을 수 있음
4장, 리액티브 시스템의 빌딩 블록

  • 모든 것이 비동기인 경우는 흔치 않으므로, 점진적으로 리액티브 아키텍처로 넘어가기 위한 생산적인 방법 필요(예, Strangler Pattern: 점진적으로 블로킹/동기 앱을 비동기 앱으로 교체하는 패턴)
  • 리액티브로 넘어가기 전에 리액티브에 대해 고민하고 원칙을 이해하는 것이 중요 (단순히 기술만 추종하면 안 됨. 애자일에 대한 이해없이 애자일을 도입하고선 뭔가 잘 안 될 때 애자일을 탓하는 것과 동일한 상황이 발생할 수 있음)


반응형

"A Journey into Reactive Streams" 글 요약(원문은 여기 참고, 번역글은 여기 참고)


스트림

  • 스트림은 시간이 지남에 따라 생성되는 일련의 요소들
  • 시작과 끝이 없을 수 있음
스트림과 배열의 차이
  • 스트림 처리 시스템에서 모든 스트림 요소를 접근할 수는 없음
  • 요소가 다른 속도로 발생할 뿐만 아니라 모든 요소를 처리할 거라 보장할 수 없음
  • 스트림이 아직 존재하지 않을 수도 있고, 끝 개념이 없음

리액티브 스트림


리액티브 스트림 목표

  • 비즈니스 개발자가 스트림 처리를 위한 저수준 작업에 신경쓰지 않도록 추상화 수준을 높이는 것
스트림 처리 영역의 도전 과제를 해결하기 위해
  1. 무한 버퍼 없이, 소비 비율에 따라 리액티브(push)와 인터랙티브(pull) 모델을 자동 전환
  2. 라이브러리, 시스템, 네트워크, 프로세스 간 상호운영
비동기성
  • 컴퓨팅 자원을 병렬 사용하기 위해 필요 (동기 블로킹 호출 -> 자원 활용의 악!)
  • 비동기 경계 개념이 스펙 핵심에 위치
    • 비동기 경계 -> 시스템 컴포넌트 결합 제거, 시간에 대한 결합 제거
역압(back pressure)
  • 스트림 발생과 구독자 간의 발생/처리 속도 차 발생
    • 스트림 참영자가 흐름 제어에 참여해서 꾸준하게 운영 상태를 유지하고 매끄럽게 저하시킬 수 있게 함으로써 탄력성을 제공한다.
  • 데이터 양방향 흐름 제어
    • 구독자가 퍼블리셔에 demand 신호
    • 퍼블리셔는 안전하게 요청한 개수의 요소 제공
    • demand는 비동기로 요청 -> 자원 낭비 제거
    • demand는 그 다음 상류까지 전파: 전체 흐름에서 역압은 과부하에서 응답할 수 있는 기회를 줌
  • 리액티브 시스템은 사실상 push 기반
    • 구독이 느리고 퍼블리셔가 빠르면 pull처럼 동작
    • demand(1) = pull
  • 주요 가치 : 모든 호환 라이브러리 간 탄력성


반응형

What is Reactive Programming? 글 요약 (원문은 여기 참고)


원문 제목은 리액티브 프로그래밍이나 실제 내용은 리액티브 시스템에 대한 내용을 담고 있음.


개발자가 직면한 것

  • 1) H/W 발전, 2) 인터넷(엄청난 트래픽!)
  • SW의 중요성, 기대 수준, 규모

문제 상황(grey sky)을 대비하지 않으면, 매우 높은 대가를 치를 수 있다.


리액티브 시스템으로 이런 문제를 해결


리액티브 시스템 4가지 원칙


[출처: https://www.reactivemanifesto.org/]

  • 응답성(Responsive) : 사용자에게 일관된 긍정적인 경험을 제공하기 위해 상황에 상관없이 모든 사용자에게 빠르게 응답
  • 유연성(Elastic) : 다양한 부하 상황에서 응답
  • 탄력성(Resilient) : 실패 상황에서 응답
  • 메시지 구동(Message Driven) : 시간, 공간에 대한 커플링 제거(비동기 경계)

탄력성

  • 오늘날 어플리케이션은 다양한 상황에서 응답성을 유지하기 위해 회복 탄력성을 가져야 한다.
  • 성능, 내구성, 보안 등 모든 측면이 탄력성 필요
메시지 구동에 기반한 탄력성

메시지 구동 아키텍처로 얻을 수 있는 것: 견고한 에러 처리와 내고장성
  • 격리 : 시스템이 자가 회복하기 위해 필요. 격리된 컴포넌트의 실패는 전체 시스템의 응답성에 영향을 주지 않음. 실패한 컴포넌트 또한 회복할 기회를 가짐
  • 위치 투명성 : 같은 VM의 프로세스처럼 다른 노드의 다른 프로세스와 상호작용할 수 있다.
  • 전용 에러 채널 : 호출한 곳에 에러를 던지는 것 외에 다른 어딘가로 에러 신호를 리다이렉트하는 것을 허용
유연성 또는 확장성
  • 다양항 부하 상황에서 응답할 수 있도록 시스템을 쉽게 확대/축소할 수 있어야 함
  • 패러다임 먼저
    • 병행 처리에서 쓰레드 기반은 한계를 가짐
      • 공유 가변 상태, 요청 당 쓰레드, 변이 상태 동시 접근 : 성능, 확장 임계점에 빠르게 도달
      • 쓰레드 안정성을 위한 복잡함,어려움 증가

메시지 구동 2가지 : 이벤트 방식과 액터 기반


이벤트 방식

  • 0개 이상 옵저버가 모니터링하는 이벤트에 기반
  • 호출자가 옵저버 응답을 기다리지 않음
  • 이벤트는 특정 수신자(줏소)를 직접 지정하지 않음
액터 기반
  • 메시지 전달 아키텍처 확장
  • 메시지를 전달할 수취인 지정
  • 메시지를 쓰레드 경계 간에 또는 다른 서버 액터에 전달 가능

액터 장점

  • 네트워크 경계를 넘어 연산 확장 쉬움
  • 액터에 메시지를 직접 보내서 콜백지옥 없어짐(액터 간 메시지 흐름만 생각하면 됨)
  • 컴포넌트 간 결합도 낮춤


반응형

리액티브 관련 자주 나오는 용어 정리


동기, 비동기


 용어

동기(synchronous)

비동기(asynchronous)

 설명

한 프로세스(쓰레드)가 작업을 순차 실행

다른 프로세스(쓰레드)로 작업을 실행

 비고

작업 완료 여부를 호출하는 곳에서 처리

작업 완료 여부를 호출된 곳에서 처리 

동시에 실행할 수도 있고 지금이 아닌 미래 시점에 실행할 수도 있음


블록, 논블록


 용어

블록(block)

논블록(non-block)

 설명

작업 실행이 끝날 때가지 쓰레드가 다른 작업을 하지 못하고 대기

작업 완료를 기다리지 않고 진행

 비고

호출된 곳에서 작업 완료 후 리턴

호출된 곳에서 바로 리턴


병행, 병렬


 용어

병행(concurrency)

병렬(parallelism)

설명

주어진 시점에 두 개 이상의 작업 진행

주어진 시점에 두 개 이상의 작업이 동시 발생

  • 병렬 처리를 위한 서로 다른 처리 장치(CPU) 필요 (병행처리는 필수 아님)
  • 병렬은 병행을 내포

비고

모듈화, 응답성, 유지보수성 중요

(프로그램 속성)


주요 관심

  • 언제 실행 시작
  • 정보 교환 방식
  • 공유 자원 관리

효율이 주요 관심

(머신 속성)


주요 관심

더 빨리 계산하기 위해

- 큰 문제를 작은 문제로 나누는 방법 고민

- 병렬 HW 사용 최적화

 




+ Recent posts