주요글: 도커 시작하기
반응형

시퀀스 생성 2: Flux.create(), Flux.fromStream()

이전 글(스프링 리액터 시작하기 2 - 시퀀스 생성 just, generate)에서 살펴본 Flux.generate()는 Subscriber로부터 요청이 있을 때에 next 신호를 발생하는 Flux를 생성한다. 즉 pull 방식의 Flux를 생성한다. 이는 단순하지만 데이터 발생을 비동기나 push 방식으로 할 수 없다는 제약도 있다. Flux.create()를 사용하면 이런 제약 없이 비동기나 push 방식으로 데이터를 발생할 수 있다.


Flux.create()를 이용한 pull 방식 메시지 생성

먼저 Flux.create() 메서드를 이용해서 pull 방식으로 메시지를 생성하는 방법을 살펴보자. 다음은 예제 코드이다.


Flux<Integer> flux = Flux.create( (FluxSink<Integer> sink) -> {

    sink.onRequest(request -> { // request는 Subscriber가 요청한 데이터 개수

        for (int i = 1; i <= request; i++) {

            sink.next(i); // Flux.generate()의 경우와 달리 한 번에 한 개 이상의 next() 신호 발생 가능

        }

    });

});


위 코드에서 Flux.create() 메서드의 파라머티는 함수형 타입 Consumer<? super FluxSink<T>>이다. 이 Consumer는 FluxSink를 이용해서 Subscriber에 신호를 발생할 수 있다.


FluxSink#onRequest(LongConsumer) 메서드의 Consumer는 Subscriber가 데이터를 요청했을 때 불린다. 이때 LongConsumer는 Subscriber가 요청한 데이터 개수를 전달받는다. 위 코드에서는 클라이언트가 요청한 데이터 개수만큼 next 신호를 발생하고 있다.


Flux.generate()와의 차이점은 Flux.generate()의 경우 한 번에 한 개의 next 신호만 발생할 수 있었던 데 비해 Flux.create()는 한 번에 한 개 이상의 next() 신호를 발생할 수 있다는 점이다.


Flux.create()를 이용한 push 방식 메시지  생성

Flux.create()를 이용하면 Subscriber의 요청과 상관없이 비동기로 데이터를 발생할 수 있다. 다음 코드를 보자.


DataPump pump = new DataPump();


Flux<Integer> bridge = Flux.create((FluxSink<Integer> sink) -> {

    pump.setListener(new DataListener<Integer>() {

        @Override

        public void onData(List<Integer> chunk) {

            chunk.forEach(s -> {

                sink.next(s); // Subscriber의 요청에 상관없이 신호 발생

            });

        }

        @Override

        public void complete() {

            logger.info("complete");

            sink.complete();

        }

    });

});


이 코드에서 DataPump는 데이터를 어딘가에서 데이터가 오면 setListener()로 등록한 DataListener의 onData()를 실행한다고 가정하자. DataListener#onData() 메서드는 FluxSink#next()를 이용해서 데이터를 발생한다. DataListener#onData() 메서드는 Subscriber의 데이터 요청과 상관없이 호출된다. 즉 위 코드는 Subscriber의 요청과 상관없이 데이터를 push한다.


Flux.create()와 배압

Subscriber로부터 요청이 왔을 때(FluxSink#onRequest) 데이터를 전송하거나(pull 방식) Subscriber의 요청에 상관없이 데이터를 전송하거나(push 방식) 두 방식 모두 Subscriber가 요청한 개수보다 더 많은 데이터를 발생할 수 있다. 예를 들어 아래 코드를 보자.


Flux<Integer> flux = Flux.create( (FluxSink<Integer> sink) -> {

    sink.onRequest(request -> {

        for (int i = 1; i <= request + 3 ; i++) { // Subscriber가 요청한 것보다 3개 더 발생

            sink.next(i);

        }

    });

});


이 코드는 Subscriber가 요청한 개수보다 3개 데이터를 더 발생한다. 이 경우 어떻게 될까? 기본적으로 Flux.create()로 생성한 Flux는 초과로 발생한 데이터를 버퍼에 보관한다. 버퍼에 보관된 데이터는 다음에 Subscriber가 데이터를 요청할 때 전달된다.


요청보다 발생한 데이터가 많을 때 선택할 수 있는 처리 방식은 다음과 같다.

  • IGNORE : Subscriber의 요청 무시하고 발생(Subscriber의 큐가 다 차면 IllegalStateException 발생)
  • ERROR : 익셉션(IllegalStateException) 발생
  • DROP : Subscriber가 데이터를 받을 준비가 안 되어 있으면 데이터 발생 누락
  • LATEST : 마지막 신호만 Subscriber에 전달
  • BUFFER : 버퍼에 저장했다가 Subscriber 요청시 전달. 버퍼 제한이 없으므로 OutOfMemoryError 발생 가능

Flux.create()의 두 번째 인자로 처리 방식을 전달하면 된다.


Flux.create(sink -> { ... }, FluxSink.OverflowStrategy.IGNORE);


Flux.fromStream(), Flux.fromIterable()을 이용한 Flux 생성

Flux.stream()을 사용하면 자바 8의 Stram에서 Flux를 생성할 수 있다. 다음은 예이다.


Stream<String> straem = Files.lines(Paths.get(filePath));

Flux<String> seq = Flux.fromStream(straem);

seq.subscribe(System.out::println);


Flux.fromIterable()을 이용하면 Iterable을 이용해서 Flux를 생성할 수 있다. List나 Set과 같은 콜렉션에서 Flux를 생성하고 싶을 때 이 메서드를 사용하면 된다.


관련 글





+ Recent posts