주요글: 도커 시작하기
반응형

리액티브 스트림즈, Flux, Mono

스프링은 웹 요청 처리, HTTP 클라이언트, NoSQL 연동 등 많은 영역에서 리액티프 프로그래밍을 지원하고 있다. 스프링 리액터는 스프링에서 리액티브 프로그래밍을 위한 핵심 모듈이다. 리액터를 잘 사용하려면 많은 것들을 알아야 하지만 가장 기본이 되는 두 타입인 Flux와 Mono에 대해 알아야 한다. 이 글에서는 리액티브 프로그래밍의 핵심 개념인 스프림에 대해 살펴보고 Flux와 Mono의 기본적인 사용법을 살펴본다.


왜 리액티브인가?

서버 관점에서 리액티브를 사용하는 이유 중 하나는 비동기/논블록을 이용해서 더 적은 자원으로 더 많은 트래픽을 처리하기 위함이다. 관련 내용은 "왜 리액티브인가 요약" 글을 참고한다.


리액티브 스트림

리액티브 스트림즈(http://www.reactive-streams.org/)는 비동기 스트림 처리를 위한 표준이다. 스프링 리액터는 이를 구현한 라이브러리이며 자바9의 Flow API도 리액티브 스트림 API를 따르고 있다. 


스트림은 시간이 지남에 따라 생성되는 일련의 데이터/이벤트(event)/신호(signal)이다. 맥락에 따라 데이터, 이벤트, 신호라는 용어를 사요한다. 이 글에서도 필요에 따라 이들 용어를 혼용해서 사용할 것이다. 리액티브 스트림즈 스펙에서는 용어로 신호를 사용한다. 리액티브 스트림즈는 다음 세 신호를 발생할 수 있다.

  • onNext* (onComplete | onError)?

스트림은 0개 이상의 next 신호를 발생할 수 있다. next 신호는 데이터를 담는다. complete 신호는 스트림이 끝났음을 의미하며 error 신호는 에러가 발생했음을 의미한다. complete와 error는 둘 중 하나만 발생할 수 있으며, 이 두 신호는 발생하지 않을 수도 있다. 


스트림의 예로 1분 간격 현재 기온 스트림을 들 수 있다. 이 데이터는 개념적으로 complete나 error 없이 next 신호만 1분 간격으로 발생한다. 파일 스트림은 파일을 읽는 동안 데이터를 담은 next 신호를 발생하고 파일을 다 읽으면 compelete 신호를 발생한다. 파일을 읽는 도중 에러가 발생하면 error 신호를 발생한다.


리액티브 스트림즈는 Publisher를 이용해서 스트림을 정의하며 Subscriber를 이용해서 발생한 신호를 처리한다. Subscriber가 Publisher로부터 신호를 받는 것을 구독이라고 한다. 다음 코드는 스프링 리액터가 제공하는 Publisher의 한 종류인 Flux에 대해 구독하는 코드 예를 보여준다.


Flux<Integer> seq = Flux.just(1, 2, 3); // Integer 값을 발생하는 Flux 생성


seq.subscribe(value -> System.out.println("데이터 : " + value)); // 구독


리액터는 스트림이라는 용어 대신 시퀀스라는 용어를 주로 사용한다. 위 코드에서 변수 이름이 seq인 이유는 시퀀스를 의미하기 위함이다. 첫 줄은 1, 2, 3 값을 차례대로 발생하는 Flux를 생성한다.


실제 값 발생은 구독(subscription) 시점에 이뤄진다. 위 코드는 Flux#subscribe(Consumer) 메서드를 이용해서 구독한다. 이 경우 Flux가 발생한 신호를 Consumer가 받아서 처리한다. 위 코드는 수신한 데이터를 콘솔에 출력하므로 위 코드를 실행하면 다음과 같은 결과가 출력된다.


데이터 : 1

데이터 : 2

데이터 : 3


물론 이렇게 단순한 작업을 하기 위해 리액티브 프로그래밍을 사용하는 것은 아니다. 스케줄링, 다양한 조합 기능을 이용해서 이전보다 더 간결하면서도 자원을 효율적으로 사용하는 코드를 작성할 수 있다.


[노트]

리액티브 스트림즈는 스트림이라는 표현을 사용하지만 이는 자바 8의 스트림과 혼동할 수 있다. 이런 이유로 "리액티브 스트림즈" 자체를 표현할 때가 아니면 스트림 대신 시퀀스라는 용어를 사용하겠다.


리액터 사용 위한 메이븐 설정

리액터는 reactor-core, reactor-netty, reactor-extra, reactor-adapter 등 다양한 모듈로 구성되어 있다. 각 모듈의 버전을 맞추기 위해 리액터는 메이븐 BOM(Bill Of Materials)을 제공한다. 이 글에서는 reactor-core 의존만 사용하긴 하지만 BOM을 포함한 의존 설정을 사용해보자. 메이븐 의존 설정은 다음과 같다.


<dependencyManagement>

    <dependencies>

        <dependency>

            <groupId>io.projectreactor</groupId>

            <artifactId>reactor-bom</artifactId>

            <version>Bismuth-SR9</version>

            <type>pom</type>

            <scope>import</scope>

        </dependency>

    </dependencies>

</dependencyManagement>


<dependencies>

    <dependency>

        <groupId>io.projectreactor</groupId>

        <artifactId>reactor-core</artifactId>

    </dependency>


    <dependency>

        <groupId>org.slf4j</groupId>

        <artifactId>slf4j-api</artifactId>

        <version>1.7.12</version>

    </dependency>

    <dependency>

        <groupId>ch.qos.logback</groupId>

        <artifactId>logback-classic</artifactId>

        <version>1.2.3</version>

    </dependency>

</dependencies>


reactor-bom의 Bismuth 버전은 스프링 리액터 3.1 버전을 정의한다. Bismuth-SR9 버전은 reactor-core 3.1.7.RELEASE를 기준으로 한다.


스프링 리액터의 Publisher: Flux와 Mono

스프링 리액터는 Flux와 Mono의 두 가지 Publisher를 제공하고 있다. 이 두 타입은 발생할 수 있는 데이터 개수에 차이가 있다. Flux는 0개 이상의 데이터를 발생할 수 있고 Mono는 0 또는 1개의 데이터를 발생할 수 있다.


앞서 Publisher는 next, complete, error 신호를 발생할 수 있다고 했다. Flux는 0개 이상의 데이터를 발생하므로 0개 이상의 next 신호를 발생할 수 있고 complete나 error 신호를 발생하거나 발생하지 않을 수 있다. 예를 들어 다음 코드를 보자.


Flux.just(1, 2, 3);


이 코드에서 seq 시퀀스는 1, 2, 3을 값으로 갖는 세 개의 next 신호를 발생하고 마지막에 complete 신호를 발생해서 시퀀스를 끝낸다. 즉 시간 순으로 표시하면 다음과 같이 시퀀스가 발생한다('--->'는 시간축, '|'는 complete 신호 의미).


--1-2-3-|-->


아래 코드와 같이 아무 값도 발생하지 않는 시퀀스는 complete 신호만 발생한다.


Flux.just(); // --|-->


Mono도 유사하다. 차이라면 최대 발생할 수 있는 값이 1개라는 점이다.


Mono.just(1); // --1-|-->

Mono.empty(); // --|-->


just() 메서드는 이미 존재하는 값을 사용해서 Flux/Mono를 생성할 때 사용된다. just() 외에 create(), generate()를 이용해서 생성할 수 있는데 이에 대한 내용은 나중에 정리해보겠다.


[노트]

Flux와 Mono를 직접 생성하기보다는 다른 라이브러리가 제공하는 Flux와 Mono를 사용할 때가 많다. 예를 들어 스프링 5 버전에 추가된 WebClient 클래스를 사용할 때에는 WebClient가 생성하는 Mono를 이용해서 데이터를 처리한다.


구독과 신호 발생

시퀀스는 바로 신호를 발생하지 않는다. 구독을 하는 시점에 신호를 발생하기 시작한다. 코드로 확인해보자. 먼저 다음 코드를 보자.


Flux.just(1, 2, 3)

     .doOnNext(i -> System.out.println("doOnNext: " + i))

     .subscribe(i -> System.out.println("Received: " + i));


위 코드에서 doOnNext() 메서드는 Flux가 Subscriber에 next 신호를 발생할 때 불린다. 실행 결과는 다음과 같다.


doOnNext: 1

Received: 1

doOnNext: 2

Received: 2

doOnNext: 3

Received: 3


이제 코드를 다음과 같이 바꾸고 다시 실행해보자.


Flux<Integer> seq = Flux.just(1, 2, 3)

        .doOnNext(i -> System.out.println("doOnNext: " + i));


System.out.println("시퀀스 생성");

seq.subscribe(i -> System.out.println("Received: " + i));


실행 결과는 다음과 같다.


시퀀스 생성

doOnNext: 1

Received: 1

doOnNext: 2

Received: 2

doOnNext: 3

Received: 3


이 결과를 보면 시퀀스를 생성한 시점에는 doOnNext에 전달한 함수가 실행되지 않는 것을 알 수 있다. doOnNext에 전달한 함수는 next 신호를 발생할 때 호출되기 때문이다. subscribe()를 실행해서 구독을 한 이후에 doOnNext에 전달한 코드가 실행되는데 이는 subscribe() 시점에 신호를 발생하기 시작한다는 것을 보여준다.


콜드 시퀀스 vs 핫 시퀀스

시퀀스는 구독 시점부터 데이터를 새로 생성하는 콜드(cold) 시퀀스와 구독자 수에 상관없이 데이터를 생성하는 핫(hot) 시퀀스로 나뉜다.


앞 예제 Flux.just()로 생성한 시퀀스가 콜드 시퀀스이다. 콜드 시퀀스는 구독을 하지 않으면 데이터를 생성하지 않는다. 구독을 하면 그 시점에 데이터를 새롭게 발생한다. 다음 코드는 이런 특징을 보여준다.


Flux<Integer> seq = Flux.just(1, 2, 3);

seq.subscribe(v -> System.out.println("구독1: " + v)); // 구독

seq.subscribe(v -> System.out.println("구독2: " + v)); // 구독


이 코드는 seq 시퀀스에 대해 구독을 두 번한다. 코드 결과는 다음과 같은데 이 결과를 보면 seq 시퀀스는 각 구독마다 데이터를 새롭게 생성하는 것을 알 수 있다.


구독1: 1

구독1: 2

구독1: 3

구독2: 1

구독2: 2

구독2: 3


콜드 시퀀스의 예로 API 호출을 들 수 있다. API 호출 시퀀스는 구독을 할 때마다 매번 새로운 요청을 서버에 전송하고 결과를 받는다.


핫 시퀀스는 구독 여부에 상관없이 데이터가 생성된다. 구독을 하면 구독한 시점 이후에 발생하는 데이터부터 신호를 받는다. 핫 시퀀스 예로 센서 데이터를 들 수 있다. 센서 데이터를 제공하는 시퀀스를 구독하면 그 시점 이후에 센서가 발생한 데이터부터 받게 된다.


Subscriber와 Subscription

앞 코드 예에서 다음의 subscribe() 메서드를 사용해서 구독을 했다.


// Flux나 Mono

subscribe(Consumer<? super T> consumer)


Consumer를 파라미터로 갖는 subscribe() 메서드는 리액터가 편의를 위해 제공하는 메서드로서 이 메서드는 내부적으로 Subscriber를 인자로 받는 subscribe() 메서드를 실행한다.


Subscriber는 리액티브 스트림즈에 포함된 인터페이스로 다음과 같이 정의되어 있다.


package org.reactivestreams;


public interface Subscriber<T> {

    void onSubscribe(Subscription s);

    void onNext(T t);

    void onError(Throwable t);

    void onComplete();

}


각 메서드는 다음과 같다.

  • onSubscribe(Subscription s): 구독을 하면 Publisher와 연동된 Subscription을 받는다. 전달받은 Subscription을 이용해서 Publisher에 데이터를 요청한다.
  • onNext(T t): Publisher가 next 신호를 보내면 호출된다.
  • onError(Throwable t): Publisher가 error 신호를 보내면 호출된다.
  • onComplete(): Publisher가 complete 신호를 보내면 호출된다.

각 메서드가 어떻게 동작하는지 다음 예제 코드로 알아보자.


Flux<Integer> seq = Flux.just(1, 2, 3);


seq.subscribe(new Subscriber<>() {

    private Subscription subscription;

    @Override

    public void onSubscribe(Subscription s) {

        System.out.println("Subscriber.onSubscribe");

        this.subscription = s;

        this.subscription.request(1); // Publisher에 데이터 요청

    }


    @Override

    public void onNext(Integer i) {

        System.out.println("Subscriber.onNext: " + i);

        this.subscription.request(1); // Publisher에 데이터 요청

    }


    @Override

    public void onError(Throwable t) {

        System.out.println("Subscriber.onError: " + t.getMessage());

    }


    @Override

    public void onComplete() {

        System.out.println("Subscriber.onComplete");

    }

});


subscribe() 메서드에 전달한 임의 Subscriber 객체의 onSubscribe() 메서드는 인자로 전달받은 Subscription 객체를 필드에 저장한다. Subscription은 구독 라이프사이클을 관리한다. 예를 들어 Subscription#request() 메서드는 Publisher에 데이터 요청 신호를 보낸다. 위 코드는 request(1)을 실행했는데 이는 1개의 데이터를 요청한다는 것을 의미한다. 즉 onSubscribe() 메서드는 파라미터로 전달받은 Subscription을 이용해서 Publisher에 1개의 데이터를 요청한다.

Publisher가 데이터 신호(next 신호)를 보내면 Subscriber#onNext() 메서드가 불린다. 위 예제에서는 전달받은 데이터를 출력하고 Subscription#request()를 이용해서 다음 데이터 1개를 요청한다. 즉 이 코드는 최초 구독 시점에 데이터 1개를 요청하고(onSubscribe) 이후 한 개의 데이터를 받으면 다시 한 개의 데이터를 요청한다(onNext).

실제 위 코드를 실행하면 다음 내용이 콘솔에 출력된다.

Subscriber.onSubscribe
Subscriber.onNext: 1
Subscriber.onNext: 2
Subscriber.onNext: 3
Subscriber.onComplete

이번엔 onNext() 메서드에서 다음처럼 subscription.request(1) 코드를 주석처리하고 다시 실행해보자.

Flux<Integer> seq = Flux.just(1, 2, 3);
seq.subscribe(new Subscriber<>() {
    ...생략

    @Override
    public void onNext(Integer i) {
        System.out.println("Subscriber.onNext: " + i);
        // this.subscription.request(1);
    }

    ...생략
});

결과는 다음과 같다.

Subscriber.onSubscribe
Subscriber.onNext: 1

onSubscribe()에서만 1개의 데이터를 요청하고 onNext()에서는 어떤 요청도 하지 않아 Publisher가 1개 데이터만 발생한 것을 알 수 있다.

request(Long.MAX_VALUE)를 사용하면 개수 제한없는 데이터 요청 신호를 Publisher에 보낸다. Publisher는 이 신호를 받으면 끝까지 데이터를 발생시킨다. 이를 다음과 같이 변경해보자.

Flux<Integer> seq = Flux.just(1, 2, 3);
seq.subscribe(new Subscriber<>() {
    private Subscription subscription;
    @Override
    public void onSubscribe(Subscription s) {
        System.out.println("Subscriber.onSubscribe");
        this.subscription = s;
        this.subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(Integer i) {
        System.out.println("Subscriber.onNext: " + i);
    }

    ...생략
});

위 코드를 실행하면 Publisher가 모든 데이터를 발생한 것을 확인할 수 있다.

푸시 모델 vs 풀 모델

Subscription#request()는 Subscriber가 데이터를 처리할 수 있을 때 Publisher에게 데이터를 요청하는 풀(pull) 모델이다. 하지만 request(Long.MAX_VALUE)로 요청하면 Publisher는 개수 제한 없이 Subscriber에 데이터를 전송한다. 이는 완전한 푸시(push) 모델이다. 또 request(100000)을 사용하면 십 만 개의 데이터를 요청하고, Publisher는 발생한 데이터가 십 만 개가 될 때까지 신호를 보낸다. 데이터 요청은 풀 모델로 이루어졌지만 10만 개의 데이터를 전송하는 동안은 실질적으로 푸시 모델과 같다.

subscribe() 메서드

다음은 리액터가 제공하는 subscribe() 메서드이다.
  • subscribe()
  • subscribe(Consumer<? super T> consumer)
  • subscribe(Consumer<? super T> consumer,
                 Consumer<? super Throwable> errorConsumer)
  • subscribe(Consumer<? super T> consumer,
                 Consumer<? super Throwable> errorConsumer,
                 Runnable completeConsumer)
  • subscribe(Consumer<? super T> consumer,
                 Consumer<? super Throwable> errorConsumer,
                 Runnable completeConsumer,
                 Consumer<? super Subscription> subscriptionConsumer)
  • subscribe(Subscriber<? super T> actual)
  • subscribe(CoreSubscriber<? super T> actual)

메서드의 각 파라미터는 다음과 같다.

  • consumer: next 신호 처리
  • errorConsumer: error 신호 처리
  • completeConsumer: complete 신호 처리
  • subscriptionConsumer: Subscriber의 onSubscribe 메서드에 대응
  • actual: Subscriber나 CoreSubscriver 타입

관련글


+ Recent posts