저작권 안내: 저작권자표시 Yes 상업적이용 No 컨텐츠변경 No

스프링5 입문

JSP 2.3

JPA 입문

DDD Start

인프런 객체 지향 입문 강의

리액터 윈도우


일정 개수로 묶어서 Flux 만들기: window(int), window(int, int)

Flux#window(int) 메서드를 사용하면 시퀀스가 발생시키는 데이터를 일정 개수로 묶을 수 있다. 다음은 예제 코드이다.


Flux<Flux<Integer>> windowSeq = 

        Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

             .window(4); // 4개 간격으로 4개씩 새로운 Flux로 묶음


windowSeq.subscribe(seq -> { // seq는 Flux<Integer>

    Mono<List<Integer>> monoList = seq.collectList();

    monoList.subscribe(list -> logger.info("window: {}", list));

});


위 코드에서 Flux#window(4)가 리턴하는 타입은 Flux<Flux<Integer>>이다. 즉 값이 Flux<Integer>인 Flux를 리턴한다. 이 시퀀스(Flux<Integer>)가 발생하는 값의 개수는 최대 4개이다. 위 코드의 실행 결과는 다음과 같다. 결과를 보면 4개씩 데이터를 묶어서 하나의 Flux로 만든 것을 알 수 있다.


01:19:52.388 [parallel-2] INFO batch.WindowTest - window: [5, 6, 7, 8]

01:19:52.388 [parallel-1] INFO batch.WindowTest - window: [1, 2, 3, 4]

01:19:52.391 [parallel-1] INFO batch.WindowTest - window: [9, 10]


Flux.window(int maxSize, int skip) 메서드를 사용하면 어느 간격으로 데이터를 묶을지 정할 수 있다. 두 번째 파라미터는 몇 개씩 건너서 데이터를 묶을 지 결정한다. 예를 들어 다음 코드를 보자.


Flux<Flux<Integer>> windowSeq =

        Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

                .window(4, 3); // 3개 간격마다 4개씩 새로운 Flux로 묶음


windowSeq.subscribe(seq -> { // seq는 Flux<Integer>

    Mono<List<Integer>> monoList = seq.collectList();

    monoList.subscribe(list -> logger.info("window: {}", list));

});


위 코드는 두 번째 인자로 3을 주었다. 이 경우 3개 데이터 간격으로 4개씩 데이터를 묶는다. 데이터를 묶는 간격이 데이터를 묶는 개수보다 작으므로 일부 데이터에 중복이 발생한다.


15:18:37.898 [main] INFO batch.WindowTest - window: [1, 2, 3, 4]

15:18:37.898 [main] INFO batch.WindowTest - window: [4, 5, 6, 7]

15:18:37.898 [main] INFO batch.WindowTest - window: [7, 8, 9, 10]

15:18:37.898 [main] INFO batch.WindowTest - window: [10]


다음과 같이 skip 파라미터 값으로 5를 주면 어떻게 될까? 데이터를 묶는 개수보다 간격이 더 크므로 일부 데이터에 누락이 발생할 것이다. 


Flux<Flux<Integer>> windowSeq2 =

        Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

                .window(4, 5); // 5개 간격마다 4개씩 새로운 Flux 묶음


// 첫 번째 Flux<Integer> : [1, 2, 3, 4]
// 두 번째 Flux<Integer> : [6, 7, 8, 9]



일정 시간 간격으로 묶어서 Flux 만들기: window(Duration), window(Duration, Duration)

Flux#window(Duration) 메서드를 사용하면 시퀀스가 발생시키는 데이터를 일정 시간마다 묶을 수 있다. 다음은 예제 코드이다.


Flux<Flux<Long>> windowSeq = Flux.interval(Duration.ofMillis(100))

      .window(Duration.ofMillis(500)); // 500밀리초 간격마다 500밀리초 동안 데이터 묶음


이 코드는 500밀리초(0.5초) 동안 발생한 데이터를 묶는다.


데이터를 묶기 시작하는 간격을 지정하고 싶다면 Flux#window(Duration, Duration) 메서드를 사용한다.


Flux<Flux<Long>> windowSeq = Flux.interval(Duration.ofMillis(100))

        // 400밀리초 간격마다 500밀리초 동안 데이터 묶음

        .window(Duration.ofMillis(500), Duration.ofMillis(400))



특정 조건에 다다를 때가지 묶어서 Flux 만들기: windowUntil(Predicate)

특정 조건을 충족하는 데이터를 만날 때까지 묶어서 Flux로 만들고 싶다면 windowUntil()을 사용한다. 다음은 사용 예이다.


Flux.just(1,1,2,3,3,4,5)

        .windowUntil(x -> x % 2 == 0)

        .subscribe((Flux<Integer> seq) -> {

            seq.collectList().subscribe(lst -> logger.info("window: {}", lst));

        });


위 코드는 2로 나눠서 나머지가 0인(즉 짝수인) 값을 만날 때까지 묶는다. 실제 실행 결과를 보면 다음과 같다.


01:19:27.166 [main] INFO batch.WindowTest - window: [1, 1, 2]

01:19:27.169 [main] INFO batch.WindowTest - window: [3, 3, 4]

01:19:27.169 [main] INFO batch.WindowTest - window: [5]


다음과 같이 마지막 데이터가 조건에 해당하면 어떻게 될까?


Flux.just(1,1,2,3,3,4)

        .windowUntil(x -> x % 2 == 0)

        .subscribe(seq -> {

            seq.collectList().subscribe(lst -> logger.info("window: {}", lst));

        });


결과를 보면 다음과 같이 마지막에 빈 Flux가 하나 더 발생되는 것을 알 수 있다.


17:23:22.724 [main] INFO batch.WindowTest - window: [1, 1, 2]

17:23:22.727 [main] INFO batch.WindowTest - window: [3, 3, 4]

17:23:22.727 [main] INFO batch.WindowTest - window: []


특정 조건을 충족하는 동안 묶어서 Flux 만들기: windowWhile(Predicate)

Flux#windowWhile(Predicate)은 해당 조건을 충족하지 않는 데이터가 나올 때까지 묶어서 Flux를 만든다. 조건을 충족하지 않는 데이터로 시작하거나 연속해서 데이터가 조건을 충족하지 않으면 빈 윈도우를 생성한다.


Flux.just(1,1,2,4,3,3,4,6,8,9,10)

        .windowWhile(x -> x % 2 == 0) // 짝수인 동안

        .subscribe(seq -> {

            seq.collectList().subscribe(lst -> logger.info("window: {}", lst));

        });


이 코드의 결과는 다음과 같다.


01:07:00.239 [main] INFO batch.WindowTest - window: []

01:07:00.242 [main] INFO batch.WindowTest - window: []

01:07:00.242 [main] INFO batch.WindowTest - window: [2, 4]

01:07:00.242 [main] INFO batch.WindowTest - window: []

01:07:00.242 [main] INFO batch.WindowTest - window: [4, 6, 8]

01:07:00.242 [main] INFO batch.WindowTest - window: [10]


Flux 대신 List로 묶기: buffer류 메서드

window류 메서드가 Flux로 묶는다면 buffer류 메서드는 Collection으로 묶는다. 메서드 이름이 window에서 buffer로 바뀔뿐 시그너쳐는 동일하다. 다음은 buffer류 메서드의 사용 예이다.


Flux<List<Integer>> bufferSeq = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).buffer(4);

bufferSeq.subscribe(list -> logger.info("window: {}", list));



관련글




Posted by 최범균 madvirus

댓글을 달아 주세요

IO 스트림에서 출력 버퍼가 성능에 미치는 영향에 대해서 살펴본다.

IO 스트림의 출력 버퍼

IO 스트림은 파일시스템이나 다른 주변장치들과의 데이터 통신뿐만이 아니라 프로세스간 파이프 같은 데이터 통신 및 네트워크 상의 다른 컴퓨터의 프로세스 간의 데이터 통신을 표준화된 방법으로 처리할 수 있도록 규정한 방법이다. 이런 IO 스트림은 OS 레벨에서 지원하게 되며, 고급화된 표준 IO 데이터 통신 방법이다.

IO 스트림에 데이터를 출력할 때 자바 언어의 스트림에서 제공하는 출력 스트림에 데이터를 내보내면 내부적으로 OS 차원에서 제공하는 고수준 프로시져를 호출하고, 다시 OS 커널의 저수준 프로시져를 거치게 된다. 이 때 데이터는 하나의 덩어리로 보내지게 된다.

이런 이유로, 1바이트의 데이터를 1000번에 걸쳐 보내는 것과, 1000바이트의 데이터를 1번에 보내는 것은 엄청난 속도 차이를 불러오게 된다. 이것은 언어에 상관없이 OS 차원에서 지원하는 것이므로, 자바만의 특징이 아니라 IO 스트림의 특징이다.

데이터를 스트림을 통하여 출력할 때, 큰 덩어리로 묶어서 보낼 수 있다면, 작은 단위로 나누어 출력하는 것보다 성능을 향상시킬 수 있다. 만일 데이터의 특성상 묶을 수가 없고, 작은 단위로 반복해서 보낼 수 밖에 없는 경우에는 출력 버퍼를 사용하는 것이 좋다.

출력 버퍼는 스트림으로 출력하는 데이터를 메모리상에서 임시로 보관하고 있다가 일정량이상 모이면 한번에 출력하는 방법이다.

파일 시스템에 데이터를 출력하는 경우를 생각해보자. 짧은 문자열을 수만번에 걸쳐서 저장하는 경우에 출력 버퍼를 사용하는 경우와 사용하지않는 경우를 비교해 본다. str5는 짧은 문자열을 가지고 있는 String 객체이다. test1() 메소드는 오직 FileWriter 클래스만을 이용하여 텍스트를 파일에 저장하는 메소드이다. test3() 메소드는 FileWriter 클래스에 PrintWriter 클래스를 붙여 println() 메소드를 포함한 편리한 메소드를 사용할 수 있게 된 메소드이다.

   public void test1() throws IOException {
      FileWriter wr = new FileWriter("test1.txt");
      for (int i = 0; i <= count; i++) {
         for (int j=0; j < size; j++) 
            wr.write(str5,0,str5.length());
      }
      wr.close();
   }

   public void test3() throws IOException {
      PrintWriter wr = new PrintWriter(new BufferedWriter(new FileWriter("test3.txt")));
      for (int i = 0; i <= count; i++) {
         for (int j=0; j < size; j++) 
            wr.println(str1);
      }
      wr.close();
   }

test2() 메소드는 BufferedWriter 클래스를 이용하여 test1() 메소드에 출력버퍼를 추가한 경우이다. test4() 메소드는 test3() 메소드에 BufferedWriter 클래스를 추가하여 FileWriter 클래스를 BufferedWriter 클래스와 PrintWriter 클래스로 감싼 경우이다.

   public void test2() throws IOException {
      BufferedWriter wr = new BufferedWriter(new FileWriter("test2.txt"));
      for (int i = 0; i <= count; i++) {
         for (int j=0; j < size; j++) 
            wr.write(str5,0,str5.length());
      }
      wr.close();
   }

   public void test4() throws IOException {
      PrintWriter wr = new PrintWriter(new FileWriter("test4.txt"));
      for (int i = 0; i <= count; i++) {
         for (int j=0; j < size; j++) 
            wr.println(str1);
      }
      wr.close();
   }

네가지 메소드를 6만4천회 반복(64번씩 천번 반복)하였을 때의 소요 시간과 남은 자유 메모리의 양을 비교해보면 다음과 같다.

- x 축은 반복횟수이며 64회 반복당 1씩 증가하며 총 6만4천번을 의미한다.
- y 축은 소요시간은 밀리초, 남은 자유 메모리는 MB이며 처음 힙 메모리의 크기는 64MB이다.


소요시간을 살펴보면 출력버퍼를 사용하지 않은 경우가 대략 50%정도의 시간이 더 걸린 것을 알 수 있다. PrinterWriter 클래스로 인한 소요시간의 증가는 println() 메소드를 사용하였기 때문이다. println() 메소드는 시스템에 맞는 줄바꿈기호를 추가하기 위하여 시간을 더 소모한다.

남은 자유 메모리의 양을 보면 출력버퍼가 메모리 소모를 극단적으로 줄이는 것을 알 수 있다. 출력버퍼를 사용하지 않으므로 빈번한 출력은 메모리의 소모를 증가시킨다. 이것은 나중에 garbage collection 등에 의한 시스템 지연효과도 불러올 수 있다.

PrintWriter 클래스의 autoFlush

출력버퍼를 얼마나 자주 flush 해야 할까? 출력버퍼에 임시로 보관되어 스트림으로 출력될 때까지 대기중인 데이터를 스트림으로 내보내는 것을 flush 라고 한다. BufferedWriter 클래스는 버퍼가 가득 차거나 스트림이 정상적으로 닫힐 때 자동으로 flush() 메소드를 호출한다. BufferedWriter 클래스의 flush() 메소드가 호출되기 전까지는 버퍼에 임시로 보관된 데이터는 스트림으로 실제로 출력되지 않고 버퍼에 대기하고 있게 된다.

PrintWriter 클래스는 생성자 중에 autoFlush 옵션이 있는 것이 있다. 이 옵션이 true 값으로 설정되면 print() 또는 write() 메소드의 경우엔 상관없지만, println() 메소드가 호출되면 자동으로 flush() 메소드를 호출한다. 그러나, 실제로 이 옵션을 사용하면 지나치게 빈번하게 버퍼를 비우는 경향이 발생한다.

다음은 BufferedWriter 클래스를 사용할 때와 사용하지 않을 때, PrintWriter 클래스의 autoFlush 옵션에 관한 성능 테스트이다. test5() 메소드와 test6() 메소드는 각각 test3() 메소드와 test4() 메소드에 autoFlush 옵션을 추가한 메소드이다.

   public void test5() throws IOException {
      PrintWriter wr = new PrintWriter(new BufferedWriter(new FileWriter("test5.txt")),true);
      for (int i = 0; i <= count; i++) {
         for (int j=0; j < size; j++) 
            wr.println(str1);
      }
      wr.close();
   }

   public void test6() throws IOException {
      PrintWriter wr = new PrintWriter(new FileWriter("test6.txt"),true);
      for (int i = 0; i <= count; i++) {
         for (int j=0; j < size; j++) 
            wr.println(str1);
      }
      wr.close();
   }

각 메소드를 6만4천회 반복하여 성능 테스트를 한 결과이다.


PrintWriter 클래스의 autoFlush 옵션을 true 값으로 설정한 경우 지나치게 빈번한 flush() 메소드의 호출로 출력 버퍼의 효과가 거의 없고 3-4배에 달하는 소요시간을 필요로 하게 된다. 남은 자유 메모리는 autoFlush 옵션보다는 BufferedWriter 클래스를 사용하는지 여부에 더 관련된다.

따라서, 특별히 이유가 있지 않다면 flush() 메소드나 autoFlush 옵션으로 출력 버퍼를 지나치게 빈번히 비우는 것은 자원을 낭비하게 된다. 따라서 BufferedWriter 클래스가 자동으로 출력 버퍼를 비우도록 하는 것이 좋다.

결론

IO 스트림에서 출력 버퍼를 사용하는 것이 어느 정도 성능 향상과 관련이 있는지를 알아보았다. 또한, autoFlush 옵션에 따른 빈번한 flush() 호출이 성능에 미치는 효과를 알아보았다. 다음 편에는 출력 버퍼의 크기에 따른 성능 비교를 통하여 적절한 버퍼의 크기를 알아본다.



본 글의 저작권은 이동훈에 있으며 저작권자의 허락없이 온라인/오프라인으로 본 글을 유보/복사하는 것을 금합니다.
Posted by 최범균 madvirus

댓글을 달아 주세요

  1. 개발자 2017.07.08 13:33 신고  댓글주소  수정/삭제  댓글쓰기

    감사합니다
    filewriter 만 쓰고 개발을 했더니 메모리가 점점 잡아먹는 현상ㅇ있었는데
    이 블로그의 내용대로 한다면 그런증상은 없어지겠네요

    다시한번 정보 공유에 감사드립니다.