주요글: 도커 시작하기
반응형

시퀀스 생성 1 :just(), generate()

[노트]

시퀀스를 직접 생성할 일이 많지는 않다. 보통은 라이브러리가 제공하는 기능을 사용하기 때문이다. 그럼에도 불구하고 시퀀스 생성 방법을 정리한 이유는 리액터 시퀀스를 생성하는 방법을 살펴보면 리액티브의 동작을 이해하는데 도움이 되기 때문이다.

 

Flux.just(), Mono.just()로 만들기

시퀀스를 생성하는 가장 쉬운 방법은 Flux.just()를 사용하는 것이다. just() 메서드는 시퀀스로 사용할 데이터가 이미 존재할 때 사용한다. 다음은 사용 예이다.

 

Flux<Integer> seq = Flux.just(1, 2, 3);

 

이 Flux는 1, 2, 3 데이터를 차례대로 발생하고 complete 신호를 발생한다. just() 메서드는 가변 인자로 0개 이상의 데이터를 전달할 수 있다. 아래와 같이 발생할 데이터를 주지 않으면 complete 신호만 발생한다.

 

Flux<Integer> seq = Flux.just();

 

Mono.just()도 동일하다. 차이라면 Mono는 1개 값만 생성하므로 데이터도 한 개만 받는다는 것이다.

 

Mono<Integer> seq = Mono.just(1);

 

Mono.just(null)과 같이 null을 값으로 주면 NullPointerException이 발생한다. 데이터를 발생하지 않는 Mono를 생성하고 싶다면 Mono.empty()를 사용한다.

 

값이 있을 수도 있고 없을 수도 있는 Mono를 생성할 때에는 justOrEmpty() 메서드를 사용하면 된다. 다음은 사용 예이다.

 

// null을 값으로 받으면 값이 없는 Mono

Mono<Integer> seq1 = Mono.justOrEmpty(null); // complete 신호

Mono<Integer> seq2 = Mono.justOrEmpty(1); // next(1) 신호- complete 신호

 

// Optional을 값으로 받음

Mono<Integer> seq3 = Mono.justOrEmpty(Optional.empty()); // complete 신호

Mono<Integer> seq4 = Mono.justOrEmpty(Optional.of(1)); // next(1) 신호 - complete 신호

 

Flux.range()로 정수 생성하기

Flux.range() 메서드를 사용하면 순차적으로 증가하는 Integer를 생성하는 Flux를 생성할 수 있다. 예를 들어 다음 코드는 11부터 시작해서 5개의 Integer를 생성하는 Flux 시퀀스를 생성한다. 즉 11부터 15까지의 Integer를 생성한다.

 

Flux<Integer> seq = Flux.range(11, 5);

 

Flux.generate() 메서드로 Flux 만들기

Flux.generate() 메서드를 사용하면 데이터를 함수를 이용해서 생성할 수 있다. Flux.generate() 함수는 동기 방식으로 한 번에 1개의 데이터를 생성할 때 사용한다. Flux.generate() 메서드 중 하나는 다음과 같다.

  • Flux<T> generate(Consumer<SynchronousSink<T>> generator)
generator는 Subscriber로부터 요청이 왔을 때 신호를 생성한다. generate()가 생성한 Flux는 다음과 같은 방식으로 신호를 발생한다.
  • Subscriber의 요청에 대해 인자로 전달받은 generator를 실행한다. generator를 실행할 때 인자로 SynchronousSink를 전달한다.
  • generator는 전달받은 SynchronousSink를 사용해서 next, complete, error 신호를 발생한다. 한 번에 1개의 next() 신호만 발생할 수 있다.
예제 코드를 보자
 
Consumer<SynchronousSink<Integer>> randGen = new Consumer<>() {
    private int emitCount = 0;
    private Random rand = new Random();
 
    @Override
    public void accept(SynchronousSink<Integer> sink) {
        emitCount++;
        int data = rand.nextInt(100) + 1; // 1~100 사이 임의 정수
        logger.info("Generator sink next " + data);
        sink.next(data); // 임의 정수 데이터 발생
        if (emitCount == 10) { // 10개 데이터를 발생했으면
            logger.info("Generator sink complete");
            sink.complete(); // 완료 신호 발생
        }
    }
};
 
Flux<Integer> seq = Flux.generate(randGen);
 
seq.subscribe(new BaseSubscriber<>() {
    private int receiveCount = 0;
    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        logger.info("Subscriber#onSubscribe");
        logger.info("Subscriber request first 3 items");
        request(3);
    }
 
    @Override
    protected void hookOnNext(Integer value) {
        logger.info("Subscriber#onNext: " + value);
        receiveCount++;
        if (receiveCount % 3 == 0) {
            logger.info("Subscriber request next 3 items");
            request(3);
        }
    }
 
    @Override
    protected void hookOnComplete() {
        logger.info("Subscriber#onComplete");
    }
});.
 
randGen의 accept() 메서드는 1~100 사이의 임의 정수를 생성한 뒤 인자로 SynchronousSink의 next() 메서드를 이용해서 next 신호를 발생한다. emitCount가 10이면(즉 데이터를 10개 발생했다면) SynchronousSink#complete() 메서드를 이용해서 complete 신호를 발생한다.
 
randGen은 신호 발생 기능을 제공할 뿐이며 실제 시퀀스는 Flux.generate()를 이용해서 생성했다.
 
seq.subscribe() 메서드에 전달한 Subscriber는 구독 시점에 3개의 데이터를 요청하고(hookOnSubscribe() 메서드의 request(3) 코드), 데이터를 3개 수신할 때마다 다시 3개의 데이터를 요청한다(hookOnNext() 메서드의 request(3) 코드).
 
콘솔에 관련 문장을 출력해서 실행 흐름을 알 수 있도록 했다. 위 코드를 실제로 실행해보면 다음 내용이 콘솔에 출력된다. 원래 출력에는 빈 줄이 없는데 쉬운 구분을 위해 빈 줄을 넣었고 Subscriber의 출력은 파란색으로 표시했다.
 
Subscriber#onSubscribe
Subscriber request first 3 items
 
Generator sink next 17
Subscriber#onNext: 17
Generator sink next 83
Subscriber#onNext: 83
Generator sink next 53
Subscriber#onNext: 53
Subscriber request next 3 items
 
Generator sink next 12
Subscriber#onNext: 12
Generator sink next 38
Subscriber#onNext: 38
Generator sink next 90
Subscriber#onNext: 90
Subscriber request next 3 items
 
Generator sink next 23
Subscriber#onNext: 23
Generator sink next 70
Subscriber#onNext: 70
Generator sink next 76
Subscriber#onNext: 76
Subscriber request next 3 items
 
Generator sink next 52
Subscriber#onNext: 52
Generator sink complete
Subscriber#onComplete
 
Flux가 제공하는 다른 generate() 메서드로는 다음이 있다.
  • Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)
  • Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator,
                                 Consumer<? super S> stateConsumer)
stateSupplier는 값을 생성할 때 사용할 최초 상태이다. BiFunction 타입의 generator는 인자로 상태와 SynchronousSink를 입력받아 결과로 다음 상태를 리턴하는 함수이다. 앞서 예제와 마찬가지로 SynchronousSink를 사용해서 신호를 생성한다. 두 번째 generate() 메서드의 stateConsumer는 상태를 정리할 때 사용한다. generator가 complete 신호나 error 신호를 발생하면 상태 정리를 위해 stateConsumer를 실행한다.
 
다음은 상태를 사용하는 Flux.generate()를 이용해서 임의 숫자 10개를 발생시키는 Flux를 생성하는 코드 예이다.
 
Flux<String> flux = Flux.generate(
        () -> { // Callable<S> stateSupplier
            return 0;
        },
        (state, sink) -> { // BiFunction<S, SynchronousSink<T>, S> generator
            sink.next("3 x " + state + " = " + 3 * state);
            if (state == 10) {
                sink.complete();
            }
            return state + 1;
        });
 
이 코드에서 flux는 "3 x 0 = 0" 부터 "3 x 10 = 30" 까지의 데이터를 담은 next 신호를 차례대로 발생하고 state 값이 10이 되면 compelete 신호를 발생한다.
 
Flux.just()나 Flux.generate()는 데이터 생성 과정 자체가 동기 방식이다. Subscriber로부터 데이터 요청이 오면 그 시점에 SynchronousSink를 이용해서 데이터를 생성한다. 반면에 별도 쓰레드를 이용해서 비동기로 데이터를 생성해야 하는 경우에는 SynchronousSink를 사용할 수 없다. 게다가 SynchronousSink는 한 번에 하나의 next 신호만 발생할 수 있다. 예를 들어 아래 코드는 에러를 발생한다.

 

Flux<String> flux = Flux.generate(

        () -> 1,

        (state, sink) -> {

            sink.next("Q: 3 * " + state);

            sink.next("A: " + (3 * state)); // 에러!

            if (state == 10) {

                sink.complete();

            }

            return state + 1;

        });

 

데이터 자체를 비동기로 생성해야 하거나 번에 다수의 next 신호를 발생해야 할 경우 Flux.generate()로는 처리할 수 없다. 이 때에는 Flux.create() 메서드를 사용해야 하는데 이 메서드를 포함한 Flux를 생성하는 또 다른 방법은 다음 글에서 이어서 살펴본다.

 

관련 글

 

+ Recent posts