저작권 안내: 저작권자표시 Yes 상업적이용 No 컨텐츠변경 No

스프링5 입문

JSP 2.3

JPA 입문

DDD Start

인프런 객체 지향 입문 강의

시퀀스 생성 1 :just(), generate()

[노트]

시퀀스를 직접 생성할 일이 많지는 않다. 보통은 라이브러리가 제공하는 기능을 사용하기 때문이다. 그럼에도 불구하고 시퀀스 생성 방법을 정리한 이유는 리액터 시퀀스를 생성하는 방법을 살펴보면 리액티브의 동작을 이해하는데 도움이 되기 때문이다.


Flux.just(), Mono.just()로 만들기

시퀀스를 생성하는 가장 쉬운 방법은 Flux.just()를 사용하는 것이다. just() 메서드는 시퀀스로 사용할 데이터가 이미 존재할 때 사용한다. 다음은 사용 예이다.


Flux<Integer> seq = Flux.just(1, 2, 3);


이 Flux는 1, 2, 3 데이터를 차례대로 발생하고 complete 신호를 발생한다. just() 메서드는 가변 인자로 0개 이상의 데이터를 전달할 수 있다. 아래와 같이 발생할 데이터를 주지 않으면 complete 신호만 발생한다.


Flux<Integer> seq = Flux.just();


Mono.just()도 동일하다. 차이라면 Mono는 1개 값만 생성하므로 데이터도 한 개만 받는다는 것이다.


Mono<Integer> seq = Mono.just(1);


Mono.just(null)과 같이 null을 값으로 주면 NullPointerException이 발생한다. 데이터를 발생하지 않는 Mono를 생성하고 싶다면 Mono.empty()를 사용한다.


값이 있을 수도 있고 없을 수도 있는 Mono를 생성할 때에는 justOrEmpty() 메서드를 사용하면 된다. 다음은 사용 예이다.


// null을 값으로 받으면 값이 없는 Mono

Mono<Integer> seq1 = Mono.justOrEmpty(null); // complete 신호

Mono<Integer> seq2 = Mono.justOrEmpty(1); // next(1) 신호- complete 신호


// Optional을 값으로 받음

Mono<Integer> seq3 = Mono.justOrEmpty(Optional.empty()); // complete 신호

Mono<Integer> seq4 = Mono.justOrEmpty(Optional.of(1)); // next(1) 신호 - complete 신호


Flux.range()로 정수 생성하기

Flux.range() 메서드를 사용하면 순차적으로 증가하는 Integer를 생성하는 Flux를 생성할 수 있다. 예를 들어 다음 코드는 11부터 시작해서 5개의 Integer를 생성하는 Flux 시퀀스를 생성한다. 즉 11부터 15까지의 Integer를 생성한다.


Flux<Integer> seq = Flux.range(11, 5);


Flux.generate() 메서드로 Flux 만들기

Flux.generate() 메서드를 사용하면 데이터를 함수를 이용해서 생성할 수 있다. Flux.generate() 함수는 동기 방식으로 한 번에 1개의 데이터를 생성할 때 사용한다. Flux.generate() 메서드 중 하나는 다음과 같다.

  • Flux<T> generate(Consumer<SynchronousSink<T>> generator)
generator는 Subscriber로부터 요청이 왔을 때 신호를 생성한다. generate()가 생성한 Flux는 다음과 같은 방식으로 신호를 발생한다.
  • Subscriber의 요청에 대해 인자로 전달받은 generator를 실행한다. generator를 실행할 때 인자로 SynchronousSink를 전달한다.
  • generator는 전달받은 SynchronousSink를 사용해서 next, complete, error 신호를 발생한다. 한 번에 1개의 next() 신호만 발생할 수 있다.
예제 코드를 보자

Consumer<SynchronousSink<Integer>> randGen = new Consumer<>() {
    private int emitCount = 0;
    private Random rand = new Random();

    @Override
    public void accept(SynchronousSink<Integer> sink) {
        emitCount++;
        int data = rand.nextInt(100) + 1; // 1~100 사이 임의 정수
        logger.info("Generator sink next " + data);
        sink.next(data); // 임의 정수 데이터 발생
        if (emitCount == 10) { // 10개 데이터를 발생했으면
            logger.info("Generator sink complete");
            sink.complete(); // 완료 신호 발생
        }
    }
};

Flux<Integer> seq = Flux.generate(randGen);

seq.subscribe(new BaseSubscriber<>() {
    private int receiveCount = 0;
    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        logger.info("Subscriber#onSubscribe");
        logger.info("Subscriber request first 3 items");
        request(3);
    }

    @Override
    protected void hookOnNext(Integer value) {
        logger.info("Subscriber#onNext: " + value);
        receiveCount++;
        if (receiveCount % 3 == 0) {
            logger.info("Subscriber request next 3 items");
            request(3);
        }
    }

    @Override
    protected void hookOnComplete() {
        logger.info("Subscriber#onComplete");
    }
});.

randGen의 accept() 메서드는 1~100 사이의 임의 정수를 생성한 뒤 인자로 SynchronousSink의 next() 메서드를 이용해서 next 신호를 발생한다. emitCount가 10이면(즉 데이터를 10개 발생했다면) SynchronousSink#complete() 메서드를 이용해서 complete 신호를 발생한다.

randGen은 신호 발생 기능을 제공할 뿐이며 실제 시퀀스는 Flux.generate()를 이용해서 생성했다.

seq.subscribe() 메서드에 전달한 Subscriber는 구독 시점에 3개의 데이터를 요청하고(hookOnSubscribe() 메서드의 request(3) 코드), 데이터를 3개 수신할 때마다 다시 3개의 데이터를 요청한다(hookOnNext() 메서드의 request(3) 코드).

콘솔에 관련 문장을 출력해서 실행 흐름을 알 수 있도록 했다. 위 코드를 실제로 실행해보면 다음 내용이 콘솔에 출력된다. 원래 출력에는 빈 줄이 없는데 쉬운 구분을 위해 빈 줄을 넣었고 Subscriber의 출력은 파란색으로 표시했다.

Subscriber#onSubscribe
Subscriber request first 3 items

Generator sink next 17
Subscriber#onNext: 17
Generator sink next 83
Subscriber#onNext: 83
Generator sink next 53
Subscriber#onNext: 53
Subscriber request next 3 items

Generator sink next 12
Subscriber#onNext: 12
Generator sink next 38
Subscriber#onNext: 38
Generator sink next 90
Subscriber#onNext: 90
Subscriber request next 3 items

Generator sink next 23
Subscriber#onNext: 23
Generator sink next 70
Subscriber#onNext: 70
Generator sink next 76
Subscriber#onNext: 76
Subscriber request next 3 items

Generator sink next 52
Subscriber#onNext: 52
Generator sink complete
Subscriber#onComplete

Flux가 제공하는 다른 generate() 메서드로는 다음이 있다.
  • Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)
  • Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator,
                                 Consumer<? super S> stateConsumer)
stateSupplier는 값을 생성할 때 사용할 최초 상태이다. BiFunction 타입의 generator는 인자로 상태와 SynchronousSink를 입력받아 결과로 다음 상태를 리턴하는 함수이다. 앞서 예제와 마찬가지로 SynchronousSink를 사용해서 신호를 생성한다. 두 번째 generate() 메서드의 stateConsumer는 상태를 정리할 때 사용한다. generator가 complete 신호나 error 신호를 발생하면 상태 정리를 위해 stateConsumer를 실행한다.

다음은 상태를 사용하는 Flux.generate()를 이용해서 임의 숫자 10개를 발생시키는 Flux를 생성하는 코드 예이다.

Flux<String> flux = Flux.generate(
        () -> { // Callable<S> stateSupplier
            return 0;
        },
        (state, sink) -> { // BiFunction<S, SynchronousSink<T>, S> generator
            sink.next("3 x " + state + " = " + 3 * state);
            if (state == 10) {
                sink.complete();
            }
            return state + 1;
        });

이 코드에서 flux는 "3 x 1 = 3" 부터 "3 x 10 = 10" 까지의 데이터를 담은 next 신호를 차례대로 발생하고 state 값이 10이 되면 compelete 신호를 발생한다.

Flux.just()나 Flux.generate()는 데이터 생성 과정 자체가 동기 방식이다. Subscriber로부터 데이터 요청이 오면 그 시점에 SynchronousSink를 이용해서 데이터를 생성한다. 반면에 별도 쓰레드를 이용해서 비동기로 데이터를 생성해야 하는 경우에는 SynchronousSink를 사용할 수 없다. 게다가 SynchronousSink는 한 번에 하나의 next 신호만 발생할 수 있다. 예를 들어 아래 코드는 에러를 발생한다.


Flux<String> flux = Flux.generate(

        () -> 1,

        (state, sink) -> {

            sink.next("Q: 3 * " + state);

            sink.next("A: " + (3 * state)); // 에러!

            if (state == 10) {

                sink.complete();

            }

            return state + 1;

        });


데이터 자체를 비동기로 생성해야 하거나 번에 다수의 next 신호를 발생해야 할 경우 Flux.generate()로는 처리할 수 없다. 이 때에는 Flux.create() 메서드를 사용해야 하는데 이 메서드를 포함한 Flux를 생성하는 또 다른 방법은 다음 글에서 이어서 살펴본다.


관련 글


Posted by 최범균 madvirus

댓글을 달아 주세요

리액티브 스트림즈, Flux, Mono

스프링은 웹 요청 처리, HTTP 클라이언트, NoSQL 연동 등 많은 영역에서 리액티프 프로그래밍을 지원하고 있다. 스프링 리액터는 스프링에서 리액티브 프로그래밍을 위한 핵심 모듈이다. 리액터를 잘 사용하려면 많은 것들을 알아야 하지만 가장 기본이 되는 두 타입인 Flux와 Mono에 대해 알아야 한다. 이 글에서는 리액티브 프로그래밍의 핵심 개념인 스프림에 대해 살펴보고 Flux와 Mono의 기본적인 사용법을 살펴본다.


왜 리액티브인가?

서버 관점에서 리액티브를 사용하는 이유 중 하나는 비동기/논블록을 이용해서 더 적은 자원으로 더 많은 트래픽을 처리하기 위함이다. 관련 내용은 "왜 리액티브인가 요약" 글을 참고한다.


리액티브 스트림

리액티브 스트림즈(http://www.reactive-streams.org/)는 비동기 스트림 처리를 위한 표준이다. 스프링 리액터는 이를 구현한 라이브러리이며 자바9의 Flow API도 리액티브 스트림 API를 따르고 있다. 


스트림은 시간이 지남에 따라 생성되는 일련의 데이터/이벤트(event)/신호(signal)이다. 맥락에 따라 데이터, 이벤트, 신호라는 용어를 사요한다. 이 글에서도 필요에 따라 이들 용어를 혼용해서 사용할 것이다. 리액티브 스트림즈 스펙에서는 용어로 신호를 사용한다. 리액티브 스트림즈는 다음 세 신호를 발생할 수 있다.

  • onNext* (onComplete | onError)?

스트림은 0개 이상의 next 신호를 발생할 수 있다. next 신호는 데이터를 담는다. complete 신호는 스트림이 끝났음을 의미하며 error 신호는 에러가 발생했음을 의미한다. complete와 error는 둘 중 하나만 발생할 수 있으며, 이 두 신호는 발생하지 않을 수도 있다. 


스트림의 예로 1분 간격 현재 기온 스트림을 들 수 있다. 이 데이터는 개념적으로 complete나 error 없이 next 신호만 1분 간격으로 발생한다. 파일 스트림은 파일을 읽는 동안 데이터를 담은 next 신호를 발생하고 파일을 다 읽으면 compelete 신호를 발생한다. 파일을 읽는 도중 에러가 발생하면 error 신호를 발생한다.


리액티브 스트림즈는 Publisher를 이용해서 스트림을 정의하며 Subscriber를 이용해서 발생한 신호를 처리한다. Subscriber가 Publisher로부터 신호를 받는 것을 구독이라고 한다. 다음 코드는 스프링 리액터가 제공하는 Publisher의 한 종류인 Flux에 대해 구독하는 코드 예를 보여준다.


Flux<Integer> seq = Flux.just(1, 2, 3); // Integer 값을 발생하는 Flux 생성


seq.subscribe(value -> System.out.println("데이터 : " + value)); // 구독


리액터는 스트림이라는 용어 대신 시퀀스라는 용어를 주로 사용한다. 위 코드에서 변수 이름이 seq인 이유는 시퀀스를 의미하기 위함이다. 첫 줄은 1, 2, 3 값을 차례대로 발생하는 Flux를 생성한다.


실제 값 발생은 구독(subscription) 시점에 이뤄진다. 위 코드는 Flux#subscribe(Consumer) 메서드를 이용해서 구독한다. 이 경우 Flux가 발생한 신호를 Consumer가 받아서 처리한다. 위 코드는 수신한 데이터를 콘솔에 출력하므로 위 코드를 실행하면 다음과 같은 결과가 출력된다.


데이터 : 1

데이터 : 2

데이터 : 3


물론 이렇게 단순한 작업을 하기 위해 리액티브 프로그래밍을 사용하는 것은 아니다. 스케줄링, 다양한 조합 기능을 이용해서 이전보다 더 간결하면서도 자원을 효율적으로 사용하는 코드를 작성할 수 있다.


[노트]

리액티브 스트림즈는 스트림이라는 표현을 사용하지만 이는 자바 8의 스트림과 혼동할 수 있다. 이런 이유로 "리액티브 스트림즈" 자체를 표현할 때가 아니면 스트림 대신 시퀀스라는 용어를 사용하겠다.


리액터 사용 위한 메이븐 설정

리액터는 reactor-core, reactor-netty, reactor-extra, reactor-adapter 등 다양한 모듈로 구성되어 있다. 각 모듈의 버전을 맞추기 위해 리액터는 메이븐 BOM(Bill Of Materials)을 제공한다. 이 글에서는 reactor-core 의존만 사용하긴 하지만 BOM을 포함한 의존 설정을 사용해보자. 메이븐 의존 설정은 다음과 같다.


<dependencyManagement>

    <dependencies>

        <dependency>

            <groupId>io.projectreactor</groupId>

            <artifactId>reactor-bom</artifactId>

            <version>Bismuth-SR9</version>

            <type>pom</type>

            <scope>import</scope>

        </dependency>

    </dependencies>

</dependencyManagement>


<dependencies>

    <dependency>

        <groupId>io.projectreactor</groupId>

        <artifactId>reactor-core</artifactId>

    </dependency>


    <dependency>

        <groupId>org.slf4j</groupId>

        <artifactId>slf4j-api</artifactId>

        <version>1.7.12</version>

    </dependency>

    <dependency>

        <groupId>ch.qos.logback</groupId>

        <artifactId>logback-classic</artifactId>

        <version>1.2.3</version>

    </dependency>

</dependencies>


reactor-bom의 Bismuth 버전은 스프링 리액터 3.1 버전을 정의한다. Bismuth-SR9 버전은 reactor-core 3.1.7.RELEASE를 기준으로 한다.


스프링 리액터의 Publisher: Flux와 Mono

스프링 리액터는 Flux와 Mono의 두 가지 Publisher를 제공하고 있다. 이 두 타입은 발생할 수 있는 데이터 개수에 차이가 있다. Flux는 0개 이상의 데이터를 발생할 수 있고 Mono는 0 또는 1개의 데이터를 발생할 수 있다.


앞서 Publisher는 next, complete, error 신호를 발생할 수 있다고 했다. Flux는 0개 이상의 데이터를 발생하므로 0개 이상의 next 신호를 발생할 수 있고 complete나 error 신호를 발생하거나 발생하지 않을 수 있다. 예를 들어 다음 코드를 보자.


Flux.just(1, 2, 3);


이 코드에서 seq 시퀀스는 1, 2, 3을 값으로 갖는 세 개의 next 신호를 발생하고 마지막에 complete 신호를 발생해서 시퀀스를 끝낸다. 즉 시간 순으로 표시하면 다음과 같이 시퀀스가 발생한다('--->'는 시간축, '|'는 complete 신호 의미).


--1-2-3-|-->


아래 코드와 같이 아무 값도 발생하지 않는 시퀀스는 complete 신호만 발생한다.


Flux.just(); // --|-->


Mono도 유사하다. 차이라면 최대 발생할 수 있는 값이 1개라는 점이다.


Mono.just(1); // --1-|-->

Mono.empty(); // --|-->


just() 메서드는 이미 존재하는 값을 사용해서 Flux/Mono를 생성할 때 사용된다. just() 외에 create(), generate()를 이용해서 생성할 수 있는데 이에 대한 내용은 나중에 정리해보겠다.


[노트]

Flux와 Mono를 직접 생성하기보다는 다른 라이브러리가 제공하는 Flux와 Mono를 사용할 때가 많다. 예를 들어 스프링 5 버전에 추가된 WebClient 클래스를 사용할 때에는 WebClient가 생성하는 Mono를 이용해서 데이터를 처리한다.


구독과 신호 발생

시퀀스는 바로 신호를 발생하지 않는다. 구독을 하는 시점에 신호를 발생하기 시작한다. 코드로 확인해보자. 먼저 다음 코드를 보자.


Flux.just(1, 2, 3)

     .doOnNext(i -> System.out.println("doOnNext: " + i))

     .subscribe(i -> System.out.println("Received: " + i));


위 코드에서 doOnNext() 메서드는 Flux가 Subscriber에 next 신호를 발생할 때 불린다. 실행 결과는 다음과 같다.


doOnNext: 1

Received: 1

doOnNext: 2

Received: 2

doOnNext: 3

Received: 3


이제 코드를 다음과 같이 바꾸고 다시 실행해보자.


Flux<Integer> seq = Flux.just(1, 2, 3)

        .doOnNext(i -> System.out.println("doOnNext: " + i));


System.out.println("시퀀스 생성");

seq.subscribe(i -> System.out.println("Received: " + i));


실행 결과는 다음과 같다.


시퀀스 생성

doOnNext: 1

Received: 1

doOnNext: 2

Received: 2

doOnNext: 3

Received: 3


이 결과를 보면 시퀀스를 생성한 시점에는 doOnNext에 전달한 함수가 실행되지 않는 것을 알 수 있다. doOnNext에 전달한 함수는 next 신호를 발생할 때 호출되기 때문이다. subscribe()를 실행해서 구독을 한 이후에 doOnNext에 전달한 코드가 실행되는데 이는 subscribe() 시점에 신호를 발생하기 시작한다는 것을 보여준다.


콜드 시퀀스 vs 핫 시퀀스

시퀀스는 구독 시점부터 데이터를 새로 생성하는 콜드(cold) 시퀀스와 구독자 수에 상관없이 데이터를 생성하는 핫(hot) 시퀀스로 나뉜다.


앞 예제 Flux.just()로 생성한 시퀀스가 콜드 시퀀스이다. 콜드 시퀀스는 구독을 하지 않으면 데이터를 생성하지 않는다. 구독을 하면 그 시점에 데이터를 새롭게 발생한다. 다음 코드는 이런 특징을 보여준다.


Flux<Integer> seq = Flux.just(1, 2, 3);

seq.subscribe(v -> System.out.println("구독1: " + v)); // 구독

seq.subscribe(v -> System.out.println("구독2: " + v)); // 구독


이 코드는 seq 시퀀스에 대해 구독을 두 번한다. 코드 결과는 다음과 같은데 이 결과를 보면 seq 시퀀스는 각 구독마다 데이터를 새롭게 생성하는 것을 알 수 있다.


구독1: 1

구독1: 2

구독1: 3

구독2: 1

구독2: 2

구독2: 3


콜드 시퀀스의 예로 API 호출을 들 수 있다. API 호출 시퀀스는 구독을 할 때마다 매번 새로운 요청을 서버에 전송하고 결과를 받는다.


핫 시퀀스는 구독 여부에 상관없이 데이터가 생성된다. 구독을 하면 구독한 시점 이후에 발생하는 데이터부터 신호를 받는다. 핫 시퀀스 예로 센서 데이터를 들 수 있다. 센서 데이터를 제공하는 시퀀스를 구독하면 그 시점 이후에 센서가 발생한 데이터부터 받게 된다.


Subscriber와 Subscription

앞 코드 예에서 다음의 subscribe() 메서드를 사용해서 구독을 했다.


// Flux나 Mono

subscribe(Consumer<? super T> consumer)


Consumer를 파라미터로 갖는 subscribe() 메서드는 리액터가 편의를 위해 제공하는 메서드로서 이 메서드는 내부적으로 Subscriber를 인자로 받는 subscribe() 메서드를 실행한다.


Subscriber는 리액티브 스트림즈에 포함된 인터페이스로 다음과 같이 정의되어 있다.


package org.reactivestreams;


public interface Subscriber<T> {

    void onSubscribe(Subscription s);

    void onNext(T t);

    void onError(Throwable t);

    void onComplete();

}


각 메서드는 다음과 같다.

  • onSubscribe(Subscription s): 구독을 하면 Publisher와 연동된 Subscription을 받는다. 전달받은 Subscription을 이용해서 Publisher에 데이터를 요청한다.
  • onNext(T t): Publisher가 next 신호를 보내면 호출된다.
  • onError(Throwable t): Publisher가 error 신호를 보내면 호출된다.
  • onComplete(): Publisher가 complete 신호를 보내면 호출된다.

각 메서드가 어떻게 동작하는지 다음 예제 코드로 알아보자.


Flux<Integer> seq = Flux.just(1, 2, 3);


seq.subscribe(new Subscriber<>() {

    private Subscription subscription;

    @Override

    public void onSubscribe(Subscription s) {

        System.out.println("Subscriber.onSubscribe");

        this.subscription = s;

        this.subscription.request(1); // Publisher에 데이터 요청

    }


    @Override

    public void onNext(Integer i) {

        System.out.println("Subscriber.onNext: " + i);

        this.subscription.request(1); // Publisher에 데이터 요청

    }


    @Override

    public void onError(Throwable t) {

        System.out.println("Subscriber.onError: " + t.getMessage());

    }


    @Override

    public void onComplete() {

        System.out.println("Subscriber.onComplete");

    }

});


subscribe() 메서드에 전달한 임의 Subscriber 객체의 onSubscribe() 메서드는 인자로 전달받은 Subscription 객체를 필드에 저장한다. Subscription은 구독 라이프사이클을 관리한다. 예를 들어 Subscription#request() 메서드는 Publisher에 데이터 요청 신호를 보낸다. 위 코드는 request(1)을 실행했는데 이는 1개의 데이터를 요청한다는 것을 의미한다. 즉 onSubscribe() 메서드는 파라미터로 전달받은 Subscription을 이용해서 Publisher에 1개의 데이터를 요청한다.

Publisher가 데이터 신호(next 신호)를 보내면 Subscriber#onNext() 메서드가 불린다. 위 예제에서는 전달받은 데이터를 출력하고 Subscription#request()를 이용해서 다음 데이터 1개를 요청한다. 즉 이 코드는 최초 구독 시점에 데이터 1개를 요청하고(onSubscribe) 이후 한 개의 데이터를 받으면 다시 한 개의 데이터를 요청한다(onNext).

실제 위 코드를 실행하면 다음 내용이 콘솔에 출력된다.

Subscriber.onSubscribe
Subscriber.onNext: 1
Subscriber.onNext: 2
Subscriber.onNext: 3
Subscriber.onComplete

이번엔 onNext() 메서드에서 다음처럼 subscription.request(1) 코드를 주석처리하고 다시 실행해보자.

Flux<Integer> seq = Flux.just(1, 2, 3);
seq.subscribe(new Subscriber<>() {
    ...생략

    @Override
    public void onNext(Integer i) {
        System.out.println("Subscriber.onNext: " + i);
        // this.subscription.request(1);
    }

    ...생략
});

결과는 다음과 같다.

Subscriber.onSubscribe
Subscriber.onNext: 1

onSubscribe()에서만 1개의 데이터를 요청하고 onNext()에서는 어떤 요청도 하지 않아 Publisher가 1개 데이터만 발생한 것을 알 수 있다.

request(Long.MAX_VALUE)를 사용하면 개수 제한없는 데이터 요청 신호를 Publisher에 보낸다. Publisher는 이 신호를 받으면 끝까지 데이터를 발생시킨다. 이를 다음과 같이 변경해보자.

Flux<Integer> seq = Flux.just(1, 2, 3);
seq.subscribe(new Subscriber<>() {
    private Subscription subscription;
    @Override
    public void onSubscribe(Subscription s) {
        System.out.println("Subscriber.onSubscribe");
        this.subscription = s;
        this.subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(Integer i) {
        System.out.println("Subscriber.onNext: " + i);
    }

    ...생략
});

위 코드를 실행하면 Publisher가 모든 데이터를 발생한 것을 확인할 수 있다.

푸시 모델 vs 풀 모델

Subscription#request()는 Subscriber가 데이터를 처리할 수 있을 때 Publisher에게 데이터를 요청하는 풀(pull) 모델이다. 하지만 request(Long.MAX_VALUE)로 요청하면 Publisher는 개수 제한 없이 Subscriber에 데이터를 전송한다. 이는 완전한 푸시(push) 모델이다. 또 request(100000)을 사용하면 십 만 개의 데이터를 요청하고, Publisher는 발생한 데이터가 십 만 개가 될 때까지 신호를 보낸다. 데이터 요청은 풀 모델로 이루어졌지만 10만 개의 데이터를 전송하는 동안은 실질적으로 푸시 모델과 같다.

subscribe() 메서드

다음은 리액터가 제공하는 subscribe() 메서드이다.
  • subscribe()
  • subscribe(Consumer<? super T> consumer)
  • subscribe(Consumer<? super T> consumer,
                 Consumer<? super Throwable> errorConsumer)
  • subscribe(Consumer<? super T> consumer,
                 Consumer<? super Throwable> errorConsumer,
                 Runnable completeConsumer)
  • subscribe(Consumer<? super T> consumer,
                 Consumer<? super Throwable> errorConsumer,
                 Runnable completeConsumer,
                 Consumer<? super Subscription> subscriptionConsumer)
  • subscribe(Subscriber<? super T> actual)
  • subscribe(CoreSubscriber<? super T> actual)

메서드의 각 파라미터는 다음과 같다.

  • consumer: next 신호 처리
  • errorConsumer: error 신호 처리
  • completeConsumer: complete 신호 처리
  • subscriptionConsumer: Subscriber의 onSubscribe 메서드에 대응
  • actual: Subscriber나 CoreSubscriver 타입

관련글


Posted by 최범균 madvirus

댓글을 달아 주세요

  1. kyungsik-oh 2018.09.27 14:49 신고  댓글주소  수정/삭제  댓글쓰기

    핫 시퀀스는 구독 여부에 상관없이 데이터가 생성된다. 구독을 하면 구독한 시점 이후에 발생하는 데이터부터 신호를 받는다. 콜드 스트림의 예로 센서 데이터를 들 수 있다. 센서 데이터를 제공하는 시퀀스스에 구독을 하면 그 시점 이후에 센서가 발생한 데이터부터 받게 된다.

    -->

    여기 구문에 오타가 있는 것 같습니다 : )
    콜드스트림의 예로 센서 데이터를 들 수 있다 --> 핫 시퀀스(핫스트림)의 예로 센서 데이터를 들 수 있다.
    시퀀스스에 구독을 하면 --> 시퀀스에 구독을 하면