시퀀스 생성 1 :just(), generate()
[노트]
시퀀스를 직접 생성할 일이 많지는 않다. 보통은 라이브러리가 제공하는 기능을 사용하기 때문이다. 그럼에도 불구하고 시퀀스 생성 방법을 정리한 이유는 리액터 시퀀스를 생성하는 방법을 살펴보면 리액티브의 동작을 이해하는데 도움이 되기 때문이다.
Flux.just(), Mono.just()로 만들기
시퀀스를 생성하는 가장 쉬운 방법은 Flux.just()를 사용하는 것이다. just() 메서드는 시퀀스로 사용할 데이터가 이미 존재할 때 사용한다. 다음은 사용 예이다.
Flux<Integer> seq = Flux.just(1, 2, 3);
이 Flux는 1, 2, 3 데이터를 차례대로 발생하고 complete 신호를 발생한다. just() 메서드는 가변 인자로 0개 이상의 데이터를 전달할 수 있다. 아래와 같이 발생할 데이터를 주지 않으면 complete 신호만 발생한다.
Flux<Integer> seq = Flux.just();
Mono.just()도 동일하다. 차이라면 Mono는 1개 값만 생성하므로 데이터도 한 개만 받는다는 것이다.
Mono<Integer> seq = Mono.just(1);
Mono.just(null)과 같이 null을 값으로 주면 NullPointerException이 발생한다. 데이터를 발생하지 않는 Mono를 생성하고 싶다면 Mono.empty()를 사용한다.
값이 있을 수도 있고 없을 수도 있는 Mono를 생성할 때에는 justOrEmpty() 메서드를 사용하면 된다. 다음은 사용 예이다.
// null을 값으로 받으면 값이 없는 Mono
Mono<Integer> seq1 = Mono.justOrEmpty(null); // complete 신호
Mono<Integer> seq2 = Mono.justOrEmpty(1); // next(1) 신호- complete 신호
// Optional을 값으로 받음
Mono<Integer> seq3 = Mono.justOrEmpty(Optional.empty()); // complete 신호
Mono<Integer> seq4 = Mono.justOrEmpty(Optional.of(1)); // next(1) 신호 - complete 신호
Flux.range()로 정수 생성하기
Flux.range() 메서드를 사용하면 순차적으로 증가하는 Integer를 생성하는 Flux를 생성할 수 있다. 예를 들어 다음 코드는 11부터 시작해서 5개의 Integer를 생성하는 Flux 시퀀스를 생성한다. 즉 11부터 15까지의 Integer를 생성한다.
Flux<Integer> seq = Flux.range(11, 5);
Flux.generate() 메서드로 Flux 만들기
Flux.generate() 메서드를 사용하면 데이터를 함수를 이용해서 생성할 수 있다. Flux.generate() 함수는 동기 방식으로 한 번에 1개의 데이터를 생성할 때 사용한다. Flux.generate() 메서드 중 하나는 다음과 같다.
- Flux<T> generate(Consumer<SynchronousSink<T>> generator)
- Subscriber의 요청에 대해 인자로 전달받은 generator를 실행한다. generator를 실행할 때 인자로 SynchronousSink를 전달한다.
- generator는 전달받은 SynchronousSink를 사용해서 next, complete, error 신호를 발생한다. 한 번에 1개의 next() 신호만 발생할 수 있다.
Consumer<SynchronousSink<Integer>> randGen = new Consumer<>() {private int emitCount = 0;private Random rand = new Random();@Overridepublic void accept(SynchronousSink<Integer> sink) {emitCount++;int data = rand.nextInt(100) + 1; // 1~100 사이 임의 정수logger.info("Generator sink next " + data);sink.next(data); // 임의 정수 데이터 발생if (emitCount == 10) { // 10개 데이터를 발생했으면logger.info("Generator sink complete");sink.complete(); // 완료 신호 발생}}};Flux<Integer> seq = Flux.generate(randGen);seq.subscribe(new BaseSubscriber<>() {private int receiveCount = 0;@Overrideprotected void hookOnSubscribe(Subscription subscription) {logger.info("Subscriber#onSubscribe");logger.info("Subscriber request first 3 items");request(3);}@Overrideprotected void hookOnNext(Integer value) {logger.info("Subscriber#onNext: " + value);receiveCount++;if (receiveCount % 3 == 0) {logger.info("Subscriber request next 3 items");request(3);}}@Overrideprotected void hookOnComplete() {logger.info("Subscriber#onComplete");}});.
Subscriber#onSubscribeSubscriber request first 3 itemsGenerator sink next 17Subscriber#onNext: 17Generator sink next 83Subscriber#onNext: 83Generator sink next 53Subscriber#onNext: 53Subscriber request next 3 itemsGenerator sink next 12Subscriber#onNext: 12Generator sink next 38Subscriber#onNext: 38Generator sink next 90Subscriber#onNext: 90Subscriber request next 3 itemsGenerator sink next 23Subscriber#onNext: 23Generator sink next 70Subscriber#onNext: 70Generator sink next 76Subscriber#onNext: 76Subscriber request next 3 itemsGenerator sink next 52Subscriber#onNext: 52Generator sink completeSubscriber#onComplete
- Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)
- Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator,
Consumer<? super S> stateConsumer)
Flux<String> flux = Flux.generate(() -> { // Callable<S> stateSupplierreturn 0;},(state, sink) -> { // BiFunction<S, SynchronousSink<T>, S> generatorsink.next("3 x " + state + " = " + 3 * state);if (state == 10) {sink.complete();}return state + 1;});
Flux<String> flux = Flux.generate(
() -> 1,
(state, sink) -> {
sink.next("Q: 3 * " + state);
sink.next("A: " + (3 * state)); // 에러!
if (state == 10) {
sink.complete();
}
return state + 1;
});
데이터 자체를 비동기로 생성해야 하거나 번에 다수의 next 신호를 발생해야 할 경우 Flux.generate()로는 처리할 수 없다. 이 때에는 Flux.create() 메서드를 사용해야 하는데 이 메서드를 포함한 Flux를 생성하는 또 다른 방법은 다음 글에서 이어서 살펴본다.
관련 글