주요글: 도커 시작하기

스프링 데이터(Spring Data) R2DBC를 사용하면 R2DBC 기반의 리포지토리를 쉽게 구현할 수쉽게 사용할 수 있다. 이 글에서는 이 중에서 스프링 데이터 R2DBC가 제공하는 DatabaseClient를 이용한 R2DBC 연동 방법을 소개한다.

의존 설정

스프링 데이터 R2DBC를 사용하기 위한 의존 설정은 다음과 같다.

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.r2dbc</groupId>
            <artifactId>r2dbc-bom</artifactId>
            <version>Arabba-SR2</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>dev.miku</groupId>
        <artifactId>r2dbc-mysql</artifactId>
    </dependency>
    <dependency>
        <groupId>io.r2dbc</groupId>
        <artifactId>r2dbc-pool</artifactId>
    </dependency>
    <dependency>
        <groupId>io.r2dbc</groupId>
        <artifactId>r2dbc-spi</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-r2dbc</artifactId>
        <version>1.0.0.RELEASE</version>
    </dependency>
</dependencies>

r2dbc-bom의 Arabba-SR2 버전은 r2dbc 0.8.1 버전에 대응한다. 이 글을 쓰는 시점 기준으로 spring-data-r2dbc 최신 버전은 1.0.0이다.

DatabaseClient 생성

DatabaseClient는 ConnectionFactory를 이용해서 생성한다.

String url = "r2dbcs:mysql://user:user@localhost:3306/test";
ConnectionFactory connectionFactory = ConnectionFactories.get(url);

DatabaseClient client = DatabaseClient.create(connectionFactory);

스프링 빈으로 설정하고 싶다면 AbstractR2dbcConfiguration 클래스를 사용하면 된다.

@Configuration
public class ApplicationConfig extends AbstractR2dbcConfiguration {
    @Override
    @Bean
    public ConnectionFactory connectionFactory() {
        String url = "r2dbcs:mysql://user:user@localhost:3306/test";
        ConnectionFactory connectionFactory = ConnectionFactories.get(url);
        return connectionFactory;
    }
}

이 설정을 사용하면 connectionFactory() 메서드로 정의한 ConnectionFactory 빈을 이용해서 DatabaseClient 객체를 생성하고 빈으로 등록한다. AbstractR2dbcConfiguration 클래스의 databaseClient() 메서드가 이 과정을 처리한다.

DatabaseClient로 쿼리 실행하기

DatabaseClient로 쿼리를 실행하는 방법에는 두 가지가 있다. 하나는 SQL을 사용하는 것이고 다른 하나는 매핑 객체를 사용하는 것이다. 이 글에서는 SQL을 사용하는 방법을 살펴보고 다음에 매핑 객체를 이용하는 방법을 살펴보도록 하자.

조회 쿼리

다음 코드는 DatabaseClient를 이용해서 조회 쿼리를 실행하는 예를 보여준다.

Flux<Map<String, Object>> selectPub =
        client.execute("select id, name from member")
                .fetch()
                .all();

selectPub.subscribe(m -> {
    logger.info("m.get(id) : {}", m.get("id"));
});

위 코드에서 각 메서드는 다음을 지정한다.

  • execute : 실행할 SQL을 지정한다.
  • fetch : 쿼리 실행 결과를 읽어온다는 것을 지정한다.
  • all : 쿼리 실행 결과를 모두 읽어온다는 것을 지정한다.

'지정한다'고 설명했는데 실제 쿼리는 Flux를 구독하는 시점에 실행한다. 위 코드에서는 selectPub.subscribe() 코드를 실행하는 시점에 지정한 동작(execute로 쿼리 실행하고 fetch로 쿼리 결과를 가져오고 all로 모든 결과를 조회)을 수행한다.

 

쿼리 실행 결과는 Map에 담긴다. 한 개의 행이 한 개의 Map에 대응하며 칼럼 이름을 Map의 키로 사용한다. 칼럼 이름은 대소문자를 가리지 않는다. (실제 Map으로 spring-core 모듈에 포함된 LinkedCaseInsensitiveMap 구현을 사용한다.)

 

all() 대신 first()를 사용하면 전체 결과가 아닌 첫 번째 결과만 구한다. first() 메서드의 결과는 Mono이다.

Mono<Map<String, Object>> firstPub =
        client.execute("select id, name from member where id = '124124'")
                .fetch()
                .first();

 

결과가 딱 한 개이거나 없다면 one()을 사용할 수도 있다. 단 결과가 두 개 이상이면 구독 시점에 익셉션이 발생한다.

Mono<Map<String, Object>> one =
        client.execute("select id, name from member limit 1")
                .fetch()
                .one();

바인딩 파라미터

DatabaseClient는 이름과 인덱스의 두 가지 바인딩 파라미터를 지원한다. 다음은 이름 기반 바인딩 파라미터 사용 예이다. 쿼리에서 바인딩 파라미터 이름은 ":이름"의 형식을 갖는다. R2DBC의 Statement#bind() 메서드도 이름 기반 파라미터를 지원하지만 이때 바인딩 파라미터의 형식은 DBMS의 형식을 사용해야 하는 것과 다르다.

Flux<Map<String, Object>> all =
        client.execute("select id, name from member where id like :id or name like :name")
                .bind("id", "%jdbc%")
                .bind("name", "%jdbc%")
                .fetch()
                .all();

bind() 메서드는 첫 번째 인자로 파라미터 이름을, 두 번째 인자로 값을 사용한다.

 

다음은 인덱스 기반 바인딩 파라미터 사용 예이다.

Flux<Map<String, Object>> all =
        client.execute("select id, name from member where name like ? or id like ?")
                .bind(0, "%r2dbc%")
                .bind(1, "%r2dbc%")
                .fetch()
                .all();

인덱스는 0부터 시작한다.

수정 쿼리

다음은 수정 쿼리 지정 예이다.

Mono<Void> updPub = client.execute("update member set name = ? where id = ?")
        .bind(0, "id2")
        .bind(1, "bkchoi")
        .then();

쿼리 실행 결과로 변경된 행 개수를 구하고 싶다면 fetch(), rowsUpdated() 메서드를 사용하면 된다.

Mono<Integer> updPub = client.execute("update member set name = :name where id = :id")
        .bind("name", "id3")
        .bind("id", "bkchoi")
        .fetch()
        .rowsUpdated();

여러 쿼리 실행

구독하지 않고 여러 쿼리를 실행하려면 flatMap() 등을 사용해서 DatabaseClient로 생성한 Mono나 Flux를 연결한다. 아래 코드는 select 쿼리 결과가 존재하면 update 쿼리를 실행하고 존재하지 않으면 insert 쿼리를 실행하는 스트림 생성 예이다.

Mono<Integer> saveOrUpdate = client.execute("select id, name from member where id = :id")
        .bind("id", id)
        .fetch()
        .one()
        .flatMap(m ->
                client.execute("update member set name = :name where id = :id")
                        .bind("name", name)
                        .bind("id", id)
                        .fetch()
                        .rowsUpdated())
        .switchIfEmpty(
                client.execute("insert into member (id, name) values (:id, :name)")
                        .bind("name", name)
                        .bind("id", id)
                        .fetch()
                        .rowsUpdated()
        );

saveOrUpdate.subscribe( ...생략 )

 

위 코드에서 주의할 점은 각 쿼리를 실행할 때마다 DB 커넥션을 새로 구한다는 점이다. 즉 select 쿼리를 실행할 때 DB 커넥션을 구하고 쿼리를 실행한 뒤에는 커넥션을 닫는다. 비슷하게 update나 insert 쿼리를 실행할 때에도 DB 커넥션을 구하고 종료한다.

트랜잭션

스프링 데이터 R2DBC는 리액티브를 위한 트랜잭션 관리자인 R2dbcTransactionManager를 제공한다. 이 클래스를 이용해서 트랜잭션을 제어할 수 있다. 다음은 R2dbcTransactionManager를 이용해서 트랜잭션 범위 안에서 쿼리를 실행하는 코드 작성 예이다.

R2dbcTransactionManager tm = new R2dbcTransactionManager(connectionFactory);
TransactionalOperator operator = TransactionalOperator.create(tm);

Mono<Integer> saveOrUpdate = client.execute("select id, name from member where id = :id")
        .bind("id", id)
        .fetch()
        .one()
        .flatMap(m ->
                client.execute("update member set name = :name where id = :id")
                        .bind("name", name)
                        .bind("id", id)
                        .fetch()
                        .rowsUpdated())
        .switchIfEmpty(
                client.execute("insert into member (id, name) values (:id, :name)")
                        .bind("name", name)
                        .bind("id", id)
                        .fetch()
                        .rowsUpdated()
        ).as(operator::transactional);

다음은 R2dbcTransactionManager를 스프링 빈으로 등록한 예를 보여준다.

@Configuration
@EnableTransactionManagement
public class ApplicationConfig extends AbstractR2dbcConfiguration {
    @Override
    @Bean
    public ConnectionFactory connectionFactory() {
        String url = "r2dbcs:mysql://user:user@localhost:3306/test";
        ConnectionFactory connectionFactory = ConnectionFactories.get(url);
        return connectionFactory;
    }

    @Bean
    ReactiveTransactionManager transactionManager(ConnectionFactory connectionFactory) {
        return new R2dbcTransactionManager(connectionFactory);
    }
}

@EnableTransactionManagement를 사용했는데 이 경우 @Transactional 애노테이션을 이용해서 트랜잭션을 처리할 수 있다. 예를 들어 다음 코드는 saveOrUpdate() 메서드에 @Transactional 애노테이션을 적용했는데 이 경우 saveOrUpdate() 메서드가 리턴한 Mono를 구독할 때 트랜잭션 범위에서 관련 쿼리를 실행한다.

@Service
public class MemberService {
    private DatabaseClient client;

    public MemberService(DatabaseClient client) {
        this.client = client;
    }

    @Transactional
    public Mono<Integer> saveOrUpdate(String id, String name) {
        return client.execute("select id, name from member where id = :id")
                .bind("id", id)
                .fetch()
                .one()
                .flatMap(m ->
                        client.execute("update member set name = :name where id = :id")
                                .bind("name", name)
                                .bind("id", id)
                                .fetch()
                                .rowsUpdated())
                .switchIfEmpty(
                        client.execute("insert into member (id, name) values (:id, :name)")
                                .bind("name", name)
                                .bind("id", id)
                                .fetch()
                                .rowsUpdated()
                );
    }
}

 

R2DBC는 커넥션 풀 라이브러리를 제공한다. 이 라이브러릴 사용하면 어렵지 않게 R2DBC에 대한 커넥션 풀을 설정할 수 있다.

의존 설정

커넥션 풀을 설정하려면 r2dbc-pool 모듈을 추가한다.

<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-pool</artifactId>
    <version>0.8.0.RELEASE</version>
</dependency>

URL로 커넥션 풀 사용하기

커넥션 풀을 사용하는 가장 쉬운 방법은 URL에 pool을 사용하는 것이다.

String url = "r2dbcs:pool:mysql://root:1@localhost:3306/test";
ConnectionFactory connectionFactory = ConnectionFactories.get(url);

Mono<Integer> updatedMono = Mono.from(connectionFactory.create())
        .flatMap(conn -> {
            Mono<Void> txMono = Mono.from(conn.beginTransaction());
            ...
            return updMono.delayUntil(ret -> conn.commitTransaction())
                    .onErrorResume(err -> Mono.from(conn.rollbackTransaction())
                                              .then(Mono.error(err)))
                    .doFinally(signal -> Mono.from(conn.close()).subscribe());
        });

URL의 드라이버 위치에 "pool"을 사용했고 나머지는 동일하다. URL을 사용하면 초기 크기나 최대 크기 기본값으로 10을 사용한다.

ConnectionFactoryOptions로 커넥션 풀 사용하기

커넥션 풀의 초기 크기, 최대 크기, 검증 쿼리를 직접 제어하고 싶다면 ConnectionFactoryOptions를 이용해서 ConnectionFactory를 생성하면 된다. 아래 코드는 사용 예이다.

ConnectionFactory connectionFactory = ConnectionFactories.get(ConnectionFactoryOptions.builder()
        .option(ConnectionFactoryOptions.SSL, true)
        .option(ConnectionFactoryOptions.DRIVER, "pool")
        .option(ConnectionFactoryOptions.PROTOCOL, "mysql")
        .option(ConnectionFactoryOptions.HOST, "localhost")
        .option(ConnectionFactoryOptions.PORT, 3306)
        .option(ConnectionFactoryOptions.USER, "root")
        .option(ConnectionFactoryOptions.PASSWORD, "1")
        .option(ConnectionFactoryOptions.DATABASE, "test")
        .option(Option.<Integer>valueOf("initialSize"), 5)
        .option(Option.<Integer>valueOf("maxSize"), 20)
        .option(Option.<String>valueOf("validationQuery"), "select 1+1")
        .build());

DRIVER 옵션 값은 "pool"로 지정하고 PROTOCOL 옵션에 실제 드라이버를 지정한다.

ConnectionPool 클래스로 커넥션 풀 사용하기

커넥션 풀에 대해 보다 상세한 설정을 하고 싶다면 ConnectionPool 클래스를 이용해서 커넥션 풀을 생성한다. 다음은 사용 예를 보여준다.

String url = "r2dbcs:mysql://root:1@localhost:3306/test";
ConnectionFactory connectionFactory = ConnectionFactories.get(url);

ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory)
        .initialSize(5)
        .maxSize(20)
        .maxIdleTime(Duration.ofMinutes(10))
        .maxLifeTime(Duration.ofMinutes(60))
        .maxCreateConnectionTime(Duration.ofMillis(500))
        .maxAcquireTime(Duration.ofMillis(500))
        .validationDepth(ValidationDepth.LOCAL)
        .validationQuery("select 1+1")
        .name("POOL 01")
        .build();

ConnectionPool pool = new ConnectionPool(configuration);

Mono<Integer> updatedMono = Mono.from(pool.create())
        .flatMap(conn -> {
            Mono<Void> txMono = Mono.from(conn.beginTransaction());
            ...
            return updMono.delayUntil(ret -> conn.commitTransaction())
                .onErrorResume(err -> Mono.from(conn.rollbackTransaction())
                                          .then(Mono.error(err)))
                .doFinally(signal -> Mono.from(conn.close()).subscribe());
        });

ConnectionPoolConfiguration은 커넥션 풀 설정 정보를 담는다. ConnectionPoolConfiguration.builder() 메서드는 커넥션을 생성할 때 사용할 ConnectionFactory를 인자로 받으며 이후 initialSize(), maxSize() 등의 메서드를 이용해서 커넥션 풀을 설정한다.

 

설정한 ConnectionPoolConfiguration을 이용해서 ConnectionPool 객체를 생성한 뒤 ConnectionFactory 대신에 ConnectionPool을 이용해서 커넥션을 구하면 된다.

R2DBC(Reactive Relational Database Connectivity)는 SQL 데이터베이스를 위한 리액티브 API이다. 리액티브 스트림즈를 기반으로 SQL을 실행하는데 필요한 커넥션, 쿼리 실행, 트랜잭션 처리 등에 대한 API를 정의하고 있다. JDBC API처럼 R2DBC는 API만 정의하고 있다. r2dbc-spi 모듈이 SPI(service-provider interface)로서 각 드라이버는 SPI에 정의된 인터페이스를 알맞게 구현한다. 이 글을 쓰는 시점에서 R2DBC 버전은 0.8.0이며 스펙, 드라이버 구현 등 관련 문서는 https://r2dbc.io/ 사이트에서 확인할 수 있다.

 

* 이 글에서는 리액티브 스트림즈에 대한 내용은 설명하지 않는다. Publisher나 Subscriber에 대한 내용은 https://javacan.tistory.com/entry/Reactor-Start-1-RS-Flux-Mono-Subscriber 글을 참고한다.

의존 설정

R2DBC를 사용하려면 구현을 제공하는 드라이버가 필요하다. 예제에서는 r2dbc-mysql 드라이버를 사용한다.

<dependency>
    <groupId>dev.miku</groupId>
    <artifactId>r2dbc-mysql</artifactId>
    <version>0.8.0.RELEASE</version>
</dependency>

r2dbc-mysql 0.8.0은 r2dbc-spi 0.8.0 버전을 지원하며 r2dbc-mysql은 스프링 리액터와 네티를 이용해서 구현되어 있다.

주요 구성 요소

r2dbc를 이용해서 SQL을 실행하기 위해 사용하는 주요 구성 요소는 다음과 같다. 모두 io.r2dbc.spi 패키지에 위치하며ConnectionFactories를 제외한 나머지는 인터페이스이다.

 

  • ConnectionFactory : Connection을 생성하는 팩토리
  • ConnectionFactories : ConnectionFactory를 검색해서 제공하는 유틸리티
  • Connection : 데이터베이스에 대한 커넥션
  • Statement : 실행할 SQL
  • Batch : 배치로 실행할 SQL
  • Result : 쿼리 실행 결과

ConnectionFactory 구하기

쿼리를 실행하려면 커넥션을 먼저 구해야 한다. R2DBC는 ConnectionFactory를 이용해서 커넥션을 구할 수 있다. 드라이버이가 제공하는 ConnectionFactory를 직접 생성하거나 ConnectionFactories를 이용해서 ConnectionFactory를 구할 수도 있다. 참고로 ConnectionFactories는 자바 서비스 프로바이더를 이용해서 ConnectionFactory를 찾는다.

 

다음 코드는 ConnectionFactories.get() 메서드를 이용해서 ConnectionFactory를 구하는 코드 예이다.

String url = "r2dbcs:mysql://user:pw@localhost:3306/test";
ConnectionFactory connectionFactory = ConnectionFactories.get(url);

JDBC URL과 비슷하게 R2DBC도 URL을 이용해서 연결할 DB 정보를 지정한다. 다음은 URL 구성 요소를 보여준다.

scheme:driver:protocol://authority/path?query
  • scheme : URL이 유효한 R2DBC URL임을 지정한다. 유효한 스킴은 r2dbc와 r2dbcs(SSL 용)이다.
  • driver : 드라이버를 지정한다.
  • protocol : 드라이버에 따라 프로토콜 정보를 지정한다(선택).
  • authority : 접속할 DB와 인증 정보를 포함한다.
  • path : 초기 스카마나 데이터베이스 이름을 지정한다(선택).
  • query : 추가 설정 옵션을 전달할 때 사용한다(선택).

각 드라이버마다 URL 값이 다르므로 드라이버 문서를 참고한다.

Connection을 구하고 쿼리 실행하기

다음 코드는 Connection을 구하고 쿼리를 실행해서 원하는 결과를 출력하는 예이다.

CountDownLatch latch = new CountDownLatch(1);

ConnectionFactory connectionFactory = ConnectionFactories.get(url);

Publisher<? extends Connection> connPub = connectionFactory.create();
connPub.subscribe(new BaseSubscriber<Connection>() {
    @Override
    protected void hookOnNext(Connection conn) {
        Statement stmt = conn.createStatement("select id, name from member where name = ?name");
        stmt.bind("name", "최범균");
        Publisher<? extends Result> resultPub = stmt.execute();
        resultPub.subscribe(new BaseSubscriber<Result>() {
            @Override
            protected void hookOnNext(Result result) {
                Publisher<Member> memberPub = result.map((row, meta) -> 
                    new Member(row.get(0, String.class), row.get(1, String.class))
                );
                memberPub.subscribe(new BaseSubscriber<Member>() {
                    @Override
                    protected void hookOnNext(Member member) {
                        logger.info("회원 데이터 : {}", member);
                    }
                });
            }

            @Override
            protected void hookFinally(SignalType type) {
                conn.close().subscribe(new BaseSubscriber<Void>() {
                    @Override
                    protected void hookFinally(SignalType type) {
                        latch.countDown();
                    }
                });
            }
        });
    }
});

latch.await(); // latch.countDown() 실행 전까지 블록킹

코드가 다소 복잡하다. R2DBC의 주요 API는 리액티브 스트림의 Publisher 타입을 리턴하는데 그 타입을 그대로 사용해서 다소 복잡해졌다. Publisher#subscribe() 메서드에 전달할 Subscriber 구현 객체는 BaseSubscriber를 이용해서 생성했다. BaseSubscriber는 스프링 리액터가 제공하는 Subscriber 구현 클래스로 필요한 기능만 구현하기 위해 이 클래스를 사용했다.

 

먼저 Connection을 구하고 Statement를 생성하는 코드만 보자.

ConnectionFactory connectionFactory = ConnectionFactories.get(url);

final Publisher<? extends Connection> connPub = connectionFactory.create();
connPub.subscribe(new BaseSubscriber<Connection>() {
    @Override
    protected void hookOnNext(Connection conn) {
        Statement stmt = conn.createStatement("select id, name from member where name = ?name");
        stmt.bind("name", "최범균");
        Publisher<? extends Result> resultPub = stmt.execute();
        resultPub.subscribe(...);
    }
});

ConnectionFactory#create() 메서드가 생성한 Publisher는 연결에 성공하면 next 신호로 Connection을 보낸다. Subscriber는 onNext() 메서드로 Connection을 받아 쿼리를 실행한다(BaseSubscriber를 사용한 경우 hookOnNext() 메서드 사용).

 

쿼리를 실행하기 위한 Statement는 Connection#createStatement() 메서드로 생성한다. createStatement() 메서드는 실행할 쿼리를 입력받는다. JDBC의 Statement와 PreparedStatement가 R2DBC에서는 Statement 하나로 처리한다.

 

Statement#bind() 메서드를 이용해서 바인딩 파라미터에 값을 전달한다. 다음은 Statement가 제공하는 바인딩 파라미터 관련 메서드이다.

  • Statement bind(int index, Object value)
  • Statement bind(String name, Object value)
  • Statement bindNull(int index, Class<?> type)
  • Statement bindNull(String name, Class<?> type)

한 가지 주의할 점은 쿼리에서 사용하는 바인딩 파라미터가 JDBC와 다르다는 점이다. JDBC의 PreparedStatement는 물음표(?)를 이용해서 바인딩 파라미터를 지정하고 1부터 시작하는 인덱스를 사용한다. 반면에 DBMS가 사용하는 바인딩 파라미터를 사용하며 이름이나 0부터 시작하는 인덱스를 사용할 수 있다. 예를 들어 MySQL은 위 코드에서 보는 것처럼 바인딩 파라미터로 "?이름" 형식을 사용한다.

 

Statement#execute() 메서드는 Publisher<Result> 타입을 리턴한다. Result는 쿼리 실행 결과를 제공한다. Result가 제공하는 다음의 두 메서드를 이용해서 쿼리 실행 결과를 구할 수 있다.

public interface Result {
    // 변경된 행 개수를 리러탄한다.
    Publisher<Integer> getRowsUpdated();
    // 조회 결과(Row)를 변환한다.
    <T> Publisher<T> map(BiFunction<Row, RowMetadata, ? extends T> mappingFunction);
}

Row는 조회한 한 행에 대응한다. Row는 행의 데이터를 조회하기 위한 get() 메서드를 제공한다.

  • T get(int index, Class<T> type) : JDBC API와 달리 index는 0부터 시작한다. 
  • T get(String name, Class<T> type)

예제 코드는 SELECT 쿼리 실행 결과를 처리하므로 map() 메서드를 이용해서 조회환 결과를 Member 객체로 변환했다. 

Publisher<? extends Result> resultPub = stmt.execute();
resultPub.subscribe(new BaseSubscriber<Result>() {
    @Override
    protected void hookOnNext(Result result) {
        Publisher<Member> memberPub = result.map((row, meta) ->
                new Member(row.get(0, String.class), row.get(1, String.class))
        );
        memberPub.subscribe(new BaseSubscriber<Member>() {
            @Override
            protected void hookOnNext(Member member) {
                logger.info("회원 데이터 : {}", member);
            }
        });
    }

    @Override
    protected void hookFinally(SignalType type) {
        conn.close().subscribe(...);
    }
});

DB 작업이 끝나면 Connection#close() 메서드로 커넥션을 종료해야 한다. close() 메서드는 Publisher<Void>를 리턴하므로 실제 커넥션 종료는 close() 메서드가 리턴한 Publisher를 구독해야 실행된다.

 

리액터가 제공하는 BaseSubscriber는 complete 신호나 error 신호가 오면 hookFinally()를 실행하는 기능을 구현하고 있으므로 에러 여부에 상관없이 커넥션을 종료하기 위해 이 메서드를 이용했다.

Connection을 구하고 쿼리 실행하기 : 리액터 이용

Publisher를 이용하면 구독 처리 코드가 중첩되어 복잡해진다. 중첩 구조는 리액터를 사용해서 없앨 수 있다. 다음 코드는 리액터를 사용해서 구현현 예이다.

Mono<? extends Connection> connMono = Mono.from(connectionFactory.create());

Flux<Member> members = connMono.flatMapMany(conn -> {
        Flux<? extends Result> resultFlux = Flux.from(
                conn.createStatement("select id, name from member where name like ?name")
                        .bind("name", "%범%")
                        .execute()
        );

        Flux<Member> memberFlux = resultFlux.flatMap(result ->
                result.map((row, meta) ->
                        new Member(row.get("id", String.class), row.get("name", String.class))
                )
        );

        return memberFlux.doFinally(signal -> Mono.from(conn.close()).subscribe());
    }
);

List<Member> ret = members.collectList().block();

트랜잭션 처리

Connection은 트랜잭션 처리를 위해 다음 메서드를 제공한다.

  • Publisher<Void> beginTransaction()
  • Publisher<Void> commitTransaction()
  • Publisher<Void> rollbackTransaction()

다음은 트랜잭션 처리 예를 보여준다.

Mono<Integer> updatedMono = Mono.from(connectionFactory.create())
    .flatMap(conn -> {
        final Mono<Void> txMono = Mono.from(conn.beginTransaction());
        final Mono<Integer> updMono = txMono.then(
                Mono.from(conn.createStatement("insert into member values (?id, ?name)")
                        .bind("id", "bkchoi4")
                        .bind("name", "최범균2")
                        .execute()
                ).flatMap(
                        result -> Mono.from(result.getRowsUpdated())
                )
        );
        return updMono.delayUntil(ret -> conn.commitTransaction())
            .onErrorResume(err -> Mono.from(conn.rollbackTransaction()).then(Mono.error(err)))
            .doFinally(signal -> Mono.from(conn.close()).subscribe());
    });

트랜잭션 커밋/롤백 처리를 위해 리액터의 delayUntil(), onErrorResume() 메서드를 사용했다. 참고로 delayUntil() 메서드는 인자로 받은 Publisher가 종료된 뒤에 종료하며 onErrorResume() 메서드는 에러 발생시 인자로 받은 Publisher를 실행한다. 트랜잭션 롤백 처리 후에 Mono.error()를 이용해서 에러를 다시 발생시키도록 했다.

 

다소 동접이 발생하는 간단한 TCP 서버를 구현할 기술을 찾다가 리액터 네티(Reactor Netty)를 알게 되었다. 리액터 네티를 이용하면 네티를 기반으로 한 네트워크 프로그램을 리액터 API로 만들 수 있다. 리액터 네티를 사용하면 네티를 직접 사용하는 것보다 간결한 코드로 비동기 네트워크 프로그램을 만들 수 있는 이점이 있다.


다음은 리액터 네티(Reactor Netty)의 주요 특징이다.

  • 네티 기반
  • 리액터 API 사용
  • 논블로킹 TCP, UDP, HTTP 클라이언트/서버

이 글에서는 리액터 네티를 이용해서 간단한 소켓 서버를 만들어 보겠다.

TcpServer를 이용한 소켓 서버 만들기

리액터 네티는 TcpServer 클래스를 제공한다. 이 클래스를 이용해서 비교적 간단하게 비동기 소켓 서버를 구현할 수 있다. 이 글에서는 간단한 에코 서버를 만들어 본다. 만들 기능은 다음과 같다.

  • 클라이언트가 한 줄을 입력하면 "echo: 입력한 줄\r\n"으로 응답한다.
  • 클라이언트가 exit를 입력하면 클라이언트와 연결을 끊는다.
  • 클라이언트가 SHUTDOWN을 입력하면 서버를 종료한다.
  • 10초 이내에 클라이언트로부터 입력이 없으면 연결을 종료한다.
리액터 네티를 사용하기 위한 메이븐 설정은 다음과 같다.

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-bom</artifactId>
                <version>Californium-SR3</version> <!-- 리액터 네티 0.8.3에 대응 -->
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>io.projectreactor.netty</groupId>
            <artifactId>reactor-netty</artifactId>
        </dependency>

        <dependency>
            <groupId>io.projectreactor.addons</groupId>
            <artifactId>reactor-logback</artifactId>
        </dependency>
    </dependencies>

다음 코드는 리액터 네티로 만든 에코 서버의 전체 코드이다.

package demo;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LineBasedFrameDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

import java.util.concurrent.CountDownLatch;

public class EchoServer {
    private static Logger log = LoggerFactory.getLogger(EchoServer.class);

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(1);
        DisposableServer server = TcpServer.create()
                .port(9999) // 서버가 사용할 포트
                .doOnConnection(conn -> { // 클라이언트 연결시 호출
                    // conn: reactor.netty.Connection
                    conn.addHandler(new LineBasedFrameDecoder(1024));
                    conn.addHandler(new ChannelHandlerAdapter() {
                        @Override
                        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
                            log.info("client added");
                        }

                        @Override
                        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
                            log.info("client removed");
                        }

                        @Override
                        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                                       throws Exception {
                            log.warn("exception {}", cause.toString());
                            ctx.close();
                        }
                    });
                    conn.onReadIdle(10_000, () -> {
                        log.warn("client read timeout");
                        conn.dispose();
                    });
                })
                .handle((in, out) -> // 연결된 커넥션에 대한 IN/OUT 처리
                        // reactor.netty (NettyInbound, NettyOutbound)
                        in.receive() // 데이터 읽기 선언, ByteBufFlux 리턴
                          .asString()  // 문자열로 변환 선언, Flux<String> 리턴
                          .flatMap(msg -> {
                                      log.debug("doOnNext: [{}]", msg);
                                      if (msg.equals("exit")) {
                                          return out.withConnection(conn -> conn.dispose());
                                      } else if (msg.equals("SHUTDOWN")) {
                                          latch.countDown();
                                          return out;
                                      } else {
                                          return out.sendString(Mono.just("echo: " + msg + "\r\n"));
                                      }
                                  }
                          )
                )
                .bind() // Mono<DisposableServer> 리턴
                .block();

        try {
            latch.await();
        } catch (InterruptedException e) {
        }
        log.info("dispose server");
        server.disposeNow(); // 서버 종료
    }
}


먼저 전체 코드 구조를 살펴보자.

DisposableServer server = TcpServer.create()
        .port(9999) // 포트 지정
        .doOnConnection(conn -> { // 클라이언트 연결시 호출 코드
            ...
        })
        .handle((in, out) -> // 데이터 입출력 처리 코드
            ...
        )
        .bind() // 서버 실행에 사용할 Mono<DisposableServer>
        .block(); // 서버 실행 및 DisposableServer 리턴

...(서버 사용)

// 서버 중지
server.disposeNow();

전체 코드 구조는 다음과 같다.
  • TcpServer.create()로 TcpServer 준비
  • port()로 사용할 포트 포트
  • doOnConnection() 메서드로 클라이언트 연결시 실행할 함수 설정
    • 이 함수에서 커넥션에 ChannelHandler를 등록하는 것과 같은 작업 수행
  • handle() 메서드로 클라이언트와 데이터를 주고 받는 함수 설정
  • bind() 메서드로 서버 연결에 사용할 Mono<DisposableServer> 생성
  • bind()가 리턴한 Mono의 block()을 호출해서 서버 실행하고 DisposableServer 리턴
서버가 정상적으로 구동되면 block() 메서드는 구동중인 DisposableServer를 리턴한다. DisposableServer의 disposeNow() 메서드는 서버를 중지할 때 사용한다. 이 외에도 서버 중지에 사용되는 몇 가지 dispose로 시작하는 메서드를 제공한다.

doOnConnection()으로 커넥션 초기화

doOnConnection() 메서드의 파라미터는 다음 함수형 타입이다.
  • Consumer<? super Connection>
reactor.netty.Connection 타입은 인터페이스로 네티의 ChannelHandler 등록과 몇 가지 이벤트 연동 기능을 제공한다. 예제 코드의 doOnConnection 설정 부분을 다시 보자.

.doOnConnection(conn -> { // 클라이언트 연결시 호출
    conn.addHandler(new LineBasedFrameDecoder(1024));
    conn.addHandler(new ChannelHandlerAdapter() {
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            log.info("client added");
        }
        ...
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
        throws Exception {
            log.warn("exception {}", cause.toString());
            ctx.close();
        }
    });
    conn.onReadIdle(10_000, () -> {
        log.warn("client read timeout");
        conn.dispose();
    });

})

Connection#addHandler()는 네티의 ChannelHandler를 등록한다. 이 외에 addHandlerFirst(), addHandlerLast() 메서드를 제공한다. 이 메서드를 이용해서 필요한 네티 코덱을 등록하면 된다. 예제 코드에서는 한 줄씩 데이터를 읽어오는 LineBasedFrameDecoder를 등록했고 클라이언트 연결 이벤트에 따라 로그를 출력하기 위해 임의 ChannelHandlerAdapter 객체를 등록했다.

Connection#onReadIdle() 메서드는 첫 번째 인자로 지정한 시간(밀리초) 동안 데이터 읽기가 없으면 두 번째 인자로 전달받은 코드를 실행한다. 위 코드는 10초 동안 데이터 읽기가 없으면 연결을 종료한다. 비슷하게 onWriteIdle() 메서드는 지정한 시간 동안 쓰기가 없으면 코드를 실행한다.

handle() 메서드로 데이터 입출력 처리

데이터 송수신과 관련된 코드는 handle() 메서드로 지정한다. handle() 메서드가 전달 받는 함수형 타입은 다음과 같다.

BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>>

이 함수는 NettyInbound와 NettyOutbound를 인자로 갖고 Publisher<Void>나 그 하위 타입을 리턴한다. 예제 코드의 handle() 메서드를 다시 보자.

.handle((in, out) -> // 연결된 커넥션에 대한 IN/OUT 처리
        // (NettyInbound, NettyOutbound)
        in.receive() // 데이터 읽기 선언, ByteBufFlux 리턴
          .asString()  // 문자열로 변환 선언, Flux<String> 리턴
          .flatMap(msg -> {
                      log.debug("doOnNext: [{}]", msg);
                      if (msg.equals("exit")) {
                          return out.withConnection(conn -> conn.dispose());
                      } else if (msg.equals("SHUTDOWN")) {
                          latch.countDown();
                          return out;
                      } else {
                          return out.sendString(Mono.just("echo: " + msg + "\r\n"));
                      }
                  }
          )
)


위 코드를 요약하면 다음과 같다.

  • NettyInbound#receive()는 데이터 수신을 위한 ByteBufFlux를 리턴
  • ByteBufFlux#asString()은 데이터를 문자열로 수신 처리
  • flatMap을 이용해서 수신한 메시지 처리

flatMap은 수신한 데이터를 이용해서 알맞은 처리를 한다. 클라이언트에 데이터를 전송할 때에는  NettyOutbound를 이용한다. NettyOutbound#sendString() 메서드를 이용하면 문자열 데이터를 클라이언트에 전송한다. NettyOutbound#sendString()의 파라미터는 Publisher<? extends String> 타입이기 때문에 위 코드에 Mono.just()를 이용했다.


Connection이 필요하면 NettyOutbound#withConnection() 메서드를 사용한다. 위 코드에서는 클라이언트가 "exit"를 전송하면 연결을 끊기 위해 이 메서드를 사용했다.


ByteBufFlux#asString() 메서드는 기본 캐릭터셋을 사용한다. 다른 캐릭터셋을 사용하고 싶다면 asString(Charset) 메서드를 사용한다. 비슷하게 NettyOutbound#sendString() 메서드도 기본 캐릭터셋을 사용하므로 다른 캐릭터셋을 사용하려면 NettyOutbound#sendString(Publisher, Charset) 메서드를 사용한다.


예제 실행

EchoServer를 실행해보자. 로그백을 사용했다면 아래와 비슷한 메시지가 출력되면서 서버가 구동된다.


08:49:10.522 [reactor-tcp-nio-1] DEBUG reactor.netty.tcp.TcpServer - [id: 0x1fb82e53, L:/127.0.0.1:9999] Bound new server


telnet을 이용해서 에코가 제대로 동작하는지 확인해본다. 클라이언트가 전송한 데이터를 굵은 글씨로 표시했고 서버가 응답한 데이터를 파란색으로 표시했다.


$ telnet localhost 9999

Trying 127.0.0.1...

Connected to localhost.

Escape character is '^]'.

124124

echo: 124124

wefwef

echo: wefwef

exit

Connection closed by foreign host.

$


위 과정에서 서버에 출력되는 로그는 다음과 같다(리액터 네티가 출력하는 로그는 생략했다.)


08:50:40.187 [reactor-tcp-nio-5] INFO  demo.EchoServer - client added

08:50:37.374 [reactor-tcp-nio-4] INFO  demo.EchoServer - doOnNext: [124124]

08:50:44.506 [reactor-tcp-nio-4] INFO  demo.EchoServer - doOnNext: [wefwef]

08:50:46.218 [reactor-tcp-nio-4] INFO  demo.EchoServer - doOnNext: [exit]

08:50:46.221 [reactor-tcp-nio-4] INFO  demo.EchoServer - client removed


Connection#onReadIdle()을 이용해서 읽기 타임아웃을 10초로 설정했는데 실제로 서버 접속 후 10초 동안 데이터를 전송하지 않으면 연결이 끊기는 것을 확인할 수 있다.


08:56:23.358 [reactor-tcp-nio-2] WARN  demo.EchoServer - client read timeout

08:56:23.360 [reactor-tcp-nio-2] INFO  demo.EchoServer - client removed


마지막으로 SHUTDOWN 명령어를 전송해 보자. 그러면 서버가 종료되는 것도 확인할 수 있을 것이다.


08:57:46.372 [reactor-tcp-nio-3] INFO  demo.EchoServer - doOnNext: [SHUTDOWN]

08:57:46.373 [main] INFO  demo.EchoServer - dispose server


스프링 리액터 로깅과 체크포인트


로깅

리액터의 동작을 보다 자세히 보고 싶다면 다음과 같이 log() 메서드를 사용한다. 아래 코드를 보자.


Flux.just(1, 2, 4, 5, 6)

        .log()

        .map(x -> x * 2)

        .subscribe(x -> logger.info("next: {}", x));


로깅 프레임워크로 SLF4j를 사용할 경우 실행한 결과는 다음과 같다.

08:38:29.990 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
08:38:30.010 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
08:38:30.013 [main] INFO reactor.Flux.Array.1 - | request(unbounded)
08:38:30.014 [main] INFO reactor.Flux.Array.1 - | onNext(1)
08:38:30.014 [main] INFO logging.LoggingTest - next: 2
08:38:30.014 [main] INFO reactor.Flux.Array.1 - | onNext(2)
08:38:30.014 [main] INFO logging.LoggingTest - next: 4
08:38:30.014 [main] INFO reactor.Flux.Array.1 - | onNext(4)
08:38:30.014 [main] INFO logging.LoggingTest - next: 8
08:38:30.014 [main] INFO reactor.Flux.Array.1 - | onNext(5)
08:38:30.014 [main] INFO logging.LoggingTest - next: 10
08:38:30.014 [main] INFO reactor.Flux.Array.1 - | onNext(6)
08:38:30.014 [main] INFO logging.LoggingTest - next: 12
08:38:30.015 [main] INFO reactor.Flux.Array.1 - | onComplete()

"reactor.Flux.Array.1"이라는 로거가 출력한 로그 메시지는 Flux.just()가 생성한 시퀀스의 동작을 로그로 남긴 것이다. 로그를 보면 시퀀스가 request() 신호를 받은 시점, next 신호(onNext(2) 등)나 complete 신호(onComplete())를 발생한 시점을 확인할 수 있다.


로그 레벨은 INFO인데 로그 레벨을 변경하고 싶다면 다음과 같이 log() 메서드를 사용하면 된다.


Flux.just(1, 2, 4, 5, 6)

        .log(null, Level.FINE) // java.util.logging.Level 타입

        .subscribe(x -> logger.info("next: {}", x));


두 번째 인자로 자바 로깅의 Level.FINE을 주었다. SLF4j를 사용할 경우 리액터는 자바의 FINE 레벨을 SLF4j의 DEBUG 레벨로 기록한다. 따라서 위 코드를 실행하면 다음과 같이 DEBUG 레벨로 로그를 남기는 것을 확인할 수 있다.


08:50:30.098 [main] DEBUG reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)

08:50:30.101 [main] DEBUG reactor.Flux.Array.1 - | request(unbounded)

08:50:30.102 [main] DEBUG reactor.Flux.Array.1 - | onNext(1)

08:50:30.102 [main] INFO logging.LoggingTest - next: 1

08:50:30.102 [main] DEBUG reactor.Flux.Array.1 - | onNext(2)

08:50:30.102 [main] INFO logging.LoggingTest - next: 2


다음과 같이 특정 로거를 이용하도록 지정할 수도 있다. 


Flux.just(1, 2, 4, 5, 6)

        .log("MYLOG") // 또는 log("MYLOG", Level.INFO)

        .subscribe(x -> logger.info("next: {}", x));


위 코드를 실행하면 다음과 같이 지정한 로거를 이용해서 로그를 남긴다.


08:51:55.180 [main] INFO MYLOG - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)

08:51:55.184 [main] INFO MYLOG - | request(unbounded)

08:51:55.184 [main] INFO MYLOG - | onNext(1)

08:51:55.184 [main] INFO logging.LoggingTest - next: 1

08:51:55.184 [main] INFO MYLOG - | onNext(2)

08:51:55.184 [main] INFO logging.LoggingTest - next: 2

08:51:55.184 [main] INFO MYLOG - | onNext(4)


체크포인트

시퀀스가 신호를 발생하는 과정에서 익셉션이 발생하면 어떻게 될까? 시퀀스가 여러 단게를 거쳐 변환한다면 어떤 시점에 익셉션이 발생했는지 단번에 찾기 힘들 수도 있다. 이럴 때 도움이 되는 것이 체크포인트이다. 다음은 체크포인트 사용 예이다.


Flux.just(1, 2, 4, -1, 5, 6)

        .map(x -> x + 1)

        .checkpoint("MAP1")

        .map(x -> 10 / x) // 원본 데이터가 -1인 경우 x는 0이 되어 익셉션이 발생

        .checkpoint("MAP2")

        .subscribe(

                x -> System.out.println("next: " + x),

                err -> err.printStackTrace());


이 코드는 데이터에 1을 더하고 다시 10을 데이터로 나누는 변환을 수행한다. 원본 데이터에 -1이 있으므로 중간에 0으로 나누게 되어 익셉션이 발생하게 된다. checkpoint()를 사용하면 어떤 단계에서 익셉션이 발생했는지 쉽게 확인할 수 있다. 아래 코드는 익셉션이 발생했을 때 출력한 익셉션 트레이스 메시지인데 이 메시지를 보면 checkpoint()로 지정한 description이 익셉션 트레이스 마지막에 출력되는 것을 알 수 있다. 이를 통해 어느 과정에서 익셉션이 발생했는지 쉽게 찾을 수 있다.

java.lang.ArithmeticException: / by zero
    at logging.CheckpointTest.lambda$checkpoint$1(CheckpointTest.java:15)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:107)
    ...생략
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly site of producer [reactor.core.publisher.FluxMapFuseable] is identified by light checkpoint [MAP2]."description" : "MAP2"



관련글


리액터 윈도우


일정 개수로 묶어서 Flux 만들기: window(int), window(int, int)

Flux#window(int) 메서드를 사용하면 시퀀스가 발생시키는 데이터를 일정 개수로 묶을 수 있다. 다음은 예제 코드이다.


Flux<Flux<Integer>> windowSeq = 

        Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

             .window(4); // 4개 간격으로 4개씩 새로운 Flux로 묶음


windowSeq.subscribe(seq -> { // seq는 Flux<Integer>

    Mono<List<Integer>> monoList = seq.collectList();

    monoList.subscribe(list -> logger.info("window: {}", list));

});


위 코드에서 Flux#window(4)가 리턴하는 타입은 Flux<Flux<Integer>>이다. 즉 값이 Flux<Integer>인 Flux를 리턴한다. 이 시퀀스(Flux<Integer>)가 발생하는 값의 개수는 최대 4개이다. 위 코드의 실행 결과는 다음과 같다. 결과를 보면 4개씩 데이터를 묶어서 하나의 Flux로 만든 것을 알 수 있다.


01:19:52.388 [parallel-2] INFO batch.WindowTest - window: [5, 6, 7, 8]

01:19:52.388 [parallel-1] INFO batch.WindowTest - window: [1, 2, 3, 4]

01:19:52.391 [parallel-1] INFO batch.WindowTest - window: [9, 10]


Flux.window(int maxSize, int skip) 메서드를 사용하면 어느 간격으로 데이터를 묶을지 정할 수 있다. 두 번째 파라미터는 몇 개씩 건너서 데이터를 묶을 지 결정한다. 예를 들어 다음 코드를 보자.


Flux<Flux<Integer>> windowSeq =

        Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

                .window(4, 3); // 3개 간격마다 4개씩 새로운 Flux로 묶음


windowSeq.subscribe(seq -> { // seq는 Flux<Integer>

    Mono<List<Integer>> monoList = seq.collectList();

    monoList.subscribe(list -> logger.info("window: {}", list));

});


위 코드는 두 번째 인자로 3을 주었다. 이 경우 3개 데이터 간격으로 4개씩 데이터를 묶는다. 데이터를 묶는 간격이 데이터를 묶는 개수보다 작으므로 일부 데이터에 중복이 발생한다.


15:18:37.898 [main] INFO batch.WindowTest - window: [1, 2, 3, 4]

15:18:37.898 [main] INFO batch.WindowTest - window: [4, 5, 6, 7]

15:18:37.898 [main] INFO batch.WindowTest - window: [7, 8, 9, 10]

15:18:37.898 [main] INFO batch.WindowTest - window: [10]


다음과 같이 skip 파라미터 값으로 5를 주면 어떻게 될까? 데이터를 묶는 개수보다 간격이 더 크므로 일부 데이터에 누락이 발생할 것이다. 


Flux<Flux<Integer>> windowSeq2 =

        Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

                .window(4, 5); // 5개 간격마다 4개씩 새로운 Flux 묶음


// 첫 번째 Flux<Integer> : [1, 2, 3, 4]
// 두 번째 Flux<Integer> : [6, 7, 8, 9]



일정 시간 간격으로 묶어서 Flux 만들기: window(Duration), window(Duration, Duration)

Flux#window(Duration) 메서드를 사용하면 시퀀스가 발생시키는 데이터를 일정 시간마다 묶을 수 있다. 다음은 예제 코드이다.


Flux<Flux<Long>> windowSeq = Flux.interval(Duration.ofMillis(100))

      .window(Duration.ofMillis(500)); // 500밀리초 간격마다 500밀리초 동안 데이터 묶음


이 코드는 500밀리초(0.5초) 동안 발생한 데이터를 묶는다.


데이터를 묶기 시작하는 간격을 지정하고 싶다면 Flux#window(Duration, Duration) 메서드를 사용한다.


Flux<Flux<Long>> windowSeq = Flux.interval(Duration.ofMillis(100))

        // 400밀리초 간격마다 500밀리초 동안 데이터 묶음

        .window(Duration.ofMillis(500), Duration.ofMillis(400))



특정 조건에 다다를 때가지 묶어서 Flux 만들기: windowUntil(Predicate)

특정 조건을 충족하는 데이터를 만날 때까지 묶어서 Flux로 만들고 싶다면 windowUntil()을 사용한다. 다음은 사용 예이다.


Flux.just(1,1,2,3,3,4,5)

        .windowUntil(x -> x % 2 == 0)

        .subscribe((Flux<Integer> seq) -> {

            seq.collectList().subscribe(lst -> logger.info("window: {}", lst));

        });


위 코드는 2로 나눠서 나머지가 0인(즉 짝수인) 값을 만날 때까지 묶는다. 실제 실행 결과를 보면 다음과 같다.


01:19:27.166 [main] INFO batch.WindowTest - window: [1, 1, 2]

01:19:27.169 [main] INFO batch.WindowTest - window: [3, 3, 4]

01:19:27.169 [main] INFO batch.WindowTest - window: [5]


다음과 같이 마지막 데이터가 조건에 해당하면 어떻게 될까?


Flux.just(1,1,2,3,3,4)

        .windowUntil(x -> x % 2 == 0)

        .subscribe(seq -> {

            seq.collectList().subscribe(lst -> logger.info("window: {}", lst));

        });


결과를 보면 다음과 같이 마지막에 빈 Flux가 하나 더 발생되는 것을 알 수 있다.


17:23:22.724 [main] INFO batch.WindowTest - window: [1, 1, 2]

17:23:22.727 [main] INFO batch.WindowTest - window: [3, 3, 4]

17:23:22.727 [main] INFO batch.WindowTest - window: []


특정 조건을 충족하는 동안 묶어서 Flux 만들기: windowWhile(Predicate)

Flux#windowWhile(Predicate)은 해당 조건을 충족하지 않는 데이터가 나올 때까지 묶어서 Flux를 만든다. 조건을 충족하지 않는 데이터로 시작하거나 연속해서 데이터가 조건을 충족하지 않으면 빈 윈도우를 생성한다.


Flux.just(1,1,2,4,3,3,4,6,8,9,10)

        .windowWhile(x -> x % 2 == 0) // 짝수인 동안

        .subscribe(seq -> {

            seq.collectList().subscribe(lst -> logger.info("window: {}", lst));

        });


이 코드의 결과는 다음과 같다.


01:07:00.239 [main] INFO batch.WindowTest - window: []

01:07:00.242 [main] INFO batch.WindowTest - window: []

01:07:00.242 [main] INFO batch.WindowTest - window: [2, 4]

01:07:00.242 [main] INFO batch.WindowTest - window: []

01:07:00.242 [main] INFO batch.WindowTest - window: [4, 6, 8]

01:07:00.242 [main] INFO batch.WindowTest - window: [10]


Flux 대신 List로 묶기: buffer류 메서드

window류 메서드가 Flux로 묶는다면 buffer류 메서드는 Collection으로 묶는다. 메서드 이름이 window에서 buffer로 바뀔뿐 시그너쳐는 동일하다. 다음은 buffer류 메서드의 사용 예이다.


Flux<List<Integer>> bufferSeq = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).buffer(4);

bufferSeq.subscribe(list -> logger.info("window: {}", list));



관련글




리액터 모으기(aggregation) 연산


List 콜렉션으로 모으기: collectList()

Flux는 데이터를 콜렉션으로 모을 수 있는 기능을 제공한다. 이 중에서 List로 모아주는 collectList()는 다음과 같이 사용한다.


Mono<List<Integer>> mono = someFlux.collectList();

mono.subscribe(lst -> System.out.println(lst));


collectList()의 리턴 타입은 Mono<List<T>>이므로 Mono를 구독해서 값을 사용하면 된다.


Map 콜렉션으로 모으기: collectMap()

다음의 Flux#collectMap()을 이용해서 Map으로 모을 수도 있다.


  • Mono<Map<K, T>> collectMap(Function<? super T, ? extends K> keyExtractor)
  • Mono<Map<K, V>> collectMap(Function<? super T, ? extends K> keyExtractor,
                                              Function<? super T, ? extends V> valueExtractor)
  • Mono<Map<K, V>> collectMap(Function<? super T, ? extends K> keyExtractor,
                                              Function<? super T, ? extends V> valueExtractor,
                                              Supplier<Map<K, V>> mapSupplier)

각 인자는 다음과 같다.

  • keyExtractor : 데이터에서 맵의 키를 제공하는 함수
  • valueExtractor : 데이터에서 맵의 값을 제공하는 함수
  • mapSupplier : 사용할 Map 객체를 제공(mapSupplier가 없는 메서드는 기본으로 HashMap 사용)

다음 코드는 각 메서드의 사용 예이다.


// keyExtractor만 지정. 값은 그대로 사용.


Mono<Map<Integer, Tuple2<Integer, String>>> numTupMapMono =

        Flux.just(Tuples.of(1, "일"), Tuples.of(2, "이"), Tuples.of(3, "삼"), Tuples.of(4, "사"))

                .collectMap(x -> x.getT1()); // keyExtractor



// String을 리턴하는 valueExtractor 사용.


Mono<Map<Integer, String>> numLabelMapMono =

        Flux.just(Tuples.of(1, "일"), Tuples.of(2, "이"), Tuples.of(3, "삼"), Tuples.of(4, "사"))

                .collectMap(x -> x.getT1(), // keyExtractor

                        x -> x.getT2()); // valueExtractor



// Map으로 TreeMap 사용


Mono<Map<Integer, String>> numLabelTreeMapMono =

        Flux.just(Tuples.of(1, "일"), Tuples.of(2, "이"), Tuples.of(3, "삼"), Tuples.of(4, "사"))

                .collectMap(x -> x.getT1(), // keyExtractor

                        x -> x.getT2(), // valueExtractor

                        () -> new TreeMap<>()); // mapSupplier


collectMap은 중복된 키가 존재하면 마지막 데이터와 관련된 값이 사용된다. 예를 들어 아래 코드는 Flux가 생성하는 데이터는 4개지만 키로 사용하는 값이 중복되므로 실제 Map에는 2와 4 두 개의 데이터만 저장된다.


Flux.just(1, 2, 3, 4)
     .collectMap(x -> x % 2)
     .subscribe(map -> System.out.println(map)); // {0=4, 1=3}

Map의 값을 콜렉션으로 모으기: collectMultiMap()

collectMultiMap()을 사용하면 같은 키를 가진 데이터를 List로 갖는 Map을 생성할 수 있다. 다음은 예제 코드이다.


Mono<Map<Integer, Collection<Integer>>> oddEvenList =

        Flux.just(1, 2, 3, 4).collectMultimap(x -> x % 2);

oddEvenList.subscribe(map -> System.out.println(map)); // {0=[2, 4], 1=[1, 3]}


collectMultiMap() 메서드는 collectMap() 메서드와 동일한 파라미터를 갖는다.


개수 새기: count()

Flux#count() 메서드를 사용하면 개수를 제공하는 Mono를 리턴한다.


Mono<Long> countMono = Flux.just(1, 2, 3, 4).count();


누적 하기: reduce()

reduce()는 각 값에 연산을 누적해서 결과를 생성한다. Flux의 데이터를 이용해서 단일 값을 생성하는 범용 기능이라고 보면 된다. 첫 번째 살펴볼 reduce() 메서드 다음과 같다. 이 메서드는 Flux가 발생하는 데이터와 동일 타입으로 누적할 때 사용한다.

  • Mono<T> reduce(BiFunction<T, T, T> aggregator)

aggregator는 인자가 두 개인 함수이다. 이 함수의 첫 번째 인자는 지금까지 누적된 값을 받으며, 두 번째 인자는 누적할 데이터를 받는다. aggregator는 두 인자를 이용해서 새로운 누적 값을 리턴한다. 새 누적 값은 다음 데이터를 aggregator 함수로 누적할 때 첫 번째 인자로 사용된다.


예를 들어 간단한 곱셈 기능을 reduce()를 이용해서 다음과 같이 구현할 수 있다.


Mono<Integer> mulMono = Flux.just(1, 2, 3, 4).reduce((acc, ele) -> acc * ele);

mulMono.subscribe(sum -> System.out.println("sum : " + sum);


acc는 이전까지 누적된 값인데, 두 번째 데이터를 누적할 때 첫 번째 데이터를 누적된 값(acc)으로 사용한다. 위 코드는 다음과 같은 계산을 거쳐 최종 값으로 24를 출력한다.


acc1 = 1 // 첫 번째 값을 누적 값의 초기 값으로 사용

acc2 = aggregator(acc1, 2) // 1 * 2

acc3 = aggregator(acc2, 3) // 2 * 3

acc4 = aggregator(acc3, 4) // 6 * 4


누적 값의 초기 값을 지정하고 싶거나 데이터와 다른 타입으로 누적하고 싶다면 다음 reduce() 메서드를 사용한다.

  • Mono<A> reduce(A initial, BiFunction<A, ? super T, A> accumulator)
  • Mono<A> reduceWith(Supplier<A> initial, BiFunction<A, ? super T, A> accumulator)

reduce()의 initial은 초기 값이고, reduceWith()의 initial은 초기값을 제공하는 Supplier이다. 다음은 초기 값을 사용하는 reduce() 메서드의 사용예이다.


Mono<String> strMono = Flux.just(1, 2, 3, 4)

                                        .reduce("", (str, ele) -> str + "-" + ele.toString());

strMono.subscribe(System.out::println); // -1-2-3-4 출력


누적하면서 값 생성하기: scan()

데이터를 누적하면 중간 누적 결과를 데이터로 생성하고 싶다면 scan() 메서드를 사용한다. 최종 누적된 값 한 개만 발생하는 reduce()와 달리 scan()은 중간 결과를 포함한 여러 값을 생성하므로, scan()의 리턴 타입은 Flux이다. 다음은 같은 타입으로 누적한 결과를 발생하는 scan() 메서드이다.

  • Flux<T> scan(BiFunction<T, T, T> accumulator)

리턴 타입이 Flux인 것을 제외하면 reduce()와 동일하다. 


다음은 예제 코드이다.


Flux<Integer> seq = Flux.just(1, 2, 3, 4).scan((acc, x) -> acc * x);

seq.subscribe(System.out::println);


다음은 위 코드의 출력 결과이다. 중간 결과가 출력되는 것을 알 수 있다.


1

2

6

24


reduce()와 동일하게 누적 초기값을 갖는 메서드를 제공한다.

  • Flux<A> scan(A initial, BiFunction<A, ? super T, A> accumulator)
  • Flux<A> scanWith(Supplier<A> initial, BiFunction<A, ? super T, A> accumulator)

다음은 초기 값을 지정하는 사용 예이다.


Flux<Integer> seq = Flux.just(2, 3, 4).scan(1, (acc, x) -> acc * x);

seq.subscribe(System.out::println);


실행 결과는 다음과 같다.


1

2

6

24


결과를 보면 초기 값(1)도 시퀀스의 값으로 발생한 것을 알 수 있다.


데이터 조건 검사

모든/일부 데이터가 특정 조건을 충족하는지 검사할 때는 all()이나 any()를 사용한다.


Mono<Boolean> all = Flux.just(1, 2, 3, 4).all(x -> x > 2);

all.subscribe(b -> System.out.println("all: " + b)); // false


Mono<Boolean> any = Flux.just(1, 2, 3, 4).any(x -> x > 2);

any.subscribe(b -> System.out.println("any: " + b)); // true


데이터가 존재하는지 또는 특정 데이터를 포함하는지 검사할 때는 hasElements()나 hasElement()를 사용한다.


Mono<Boolean> hasElements = Flux.just(1, 2, 3, 4).hasElements();

hasElements.subscribe(b -> System.out.println("hasElements: " + b)); // true


Mono<Boolean> hasElement = Flux.just(1, 2, 3, 4).hasElement(3);

hasElement.subscribe(b -> System.out.println("hasElement: " + b)); // true


관련 글





병렬(Parallel) 처리

시퀀스는 순차적으로 next 신호를 발생하고 Subscriber는 순차적으로 신호를 처리한다. 리액터는 시퀀스가 발생하는 next 신호를 병렬로 처리할 수 있는 방법을 제공한다. 이 글에서는 Flux의 parallel()을 사용하는 방법과 zip()을 이용한 방법에 대해 살펴본다.


parallel()과 runOn()으로 Flux 병렬 처리하기

Flux#parallel()과 runOn()을 사용하면 Flux가 생성하는 next 신호를 병렬로 처리할 수 있다. 다음 예를 보자.


Flux.range(1, 20)

        .parallel(2) // 작업을 레일로 나누기만 함

        .runOn(Schedulers.newParallel("PAR", 2))  // 각 레일을 병렬로 실행

        .map(x -> {

            int sleepTime = nextSleepTime(x % 2 == 0 ? 50 : 100, x % 2 == 0 ? 150 : 300);

            logger.info("map1 {}, sleepTime {}", x, sleepTime);

            sleep(sleepTime);

            return String.format("%02d", x);

        })

        .subscribe(i -> logger.info("next {}", i) );


// nextSleepTime은 인자로 받은 두 정수 값 범위에 해당하는 임의의 값을 생성한다고 가정


Flux#parallel(int parallelism) 메서드는 Flux가 생성하는 next 신호를 parallelism 개수만큼 라운드 로빈 방식으로 신호를 나눈다. 분리한 신호는 일종의 신호를 전달할 레일(rail)을 구성한다. 위 코드는 2를 값으로 주었으므로 2개의 레일을 생성한다. 라운드 로빈 방식을 사용해서 각 레일에 값을 전달하므로 위 코드는 [1, 3, 5, .., 19]를 제공하는 레일과 [2, 4, 6, ..., 20]를 제공하는 레일을 생성한다.


parallel()로 여러 레일을 만든다고 해서 병렬로 신호를 처리하는 것은 아니다. parallel()은 병렬로 신호를 처리할 수 있는 ParallelFlux를 리턴하는데, ParallelFlux의 runOn() 메서드에 다중 쓰레드를 사용하는 스케줄러를 전달해야 병렬로 신호를 처리할 수 있다. 위 코드는 2개 쓰레드를 사용하는 parallel 스케줄러를 전달했으므로 동시에 2개 레일로부터 오는 신호를 처리하게 된다.


병렬로 처리되는 것을 확인하기 위해 map() 메서드는 값이 짝수인 경우 50~150 밀리초, 홀수인 경우 100~300 밀리초 동안 슬립하고 문자열로 변환한 값을 리턴하도록 구현했다. parallel()은 라운드 로빈 방식으로 레일을 나누므로 짝수 레일과 홀수 레일이 생성되므로 슬립 타임 구간이 작은 짝수 레일이 더 빨리 끝나게 된다.


실제 결과를 확인해보자.


13:45:14.272 [PAR-1] INFO parallel.ParallelTest - map1 1, sleepTime 117

13:45:14.272 [PAR-2] INFO parallel.ParallelTest - map1 2, sleepTime 96

13:45:14.378 [PAR-2] INFO parallel.ParallelTest - next 02

13:45:14.378 [PAR-2] INFO parallel.ParallelTest - map1 4, sleepTime 98

13:45:14.399 [PAR-1] INFO parallel.ParallelTest - next 01

13:45:14.399 [PAR-1] INFO parallel.ParallelTest - map1 3, sleepTime 268

13:45:14.477 [PAR-2] INFO parallel.ParallelTest - next 04

13:45:14.477 [PAR-2] INFO parallel.ParallelTest - map1 6, sleepTime 93

13:45:14.570 [PAR-2] INFO parallel.ParallelTest - next 06

...생략

13:45:14.868 [PAR-2] INFO parallel.ParallelTest - map1 16, sleepTime 50

13:45:14.905 [PAR-1] INFO parallel.ParallelTest - next 05

13:45:14.905 [PAR-1] INFO parallel.ParallelTest - map1 7, sleepTime 201

13:45:14.918 [PAR-2] INFO parallel.ParallelTest - next 16

13:45:14.918 [PAR-2] INFO parallel.ParallelTest - map1 18, sleepTime 122

13:45:15.040 [PAR-2] INFO parallel.ParallelTest - next 18

13:45:15.040 [PAR-2] INFO parallel.ParallelTest - map1 20, sleepTime 62

13:45:15.102 [PAR-2] INFO parallel.ParallelTest - next 20

13:45:15.106 [PAR-1] INFO parallel.ParallelTest - next 07

13:45:15.106 [PAR-1] INFO parallel.ParallelTest - map1 9, sleepTime 202

13:45:15.308 [PAR-1] INFO parallel.ParallelTest - next 09

13:45:15.308 [PAR-1] INFO parallel.ParallelTest - map1 11, sleepTime 131

13:45:15.439 [PAR-1] INFO parallel.ParallelTest - next 11

13:45:15.439 [PAR-1] INFO parallel.ParallelTest - map1 13, sleepTime 289

13:45:15.728 [PAR-1] INFO parallel.ParallelTest - next 13

13:45:15.728 [PAR-1] INFO parallel.ParallelTest - map1 15, sleepTime 288

13:45:16.017 [PAR-1] INFO parallel.ParallelTest - next 15

13:45:16.017 [PAR-1] INFO parallel.ParallelTest - map1 17, sleepTime 156

13:45:16.173 [PAR-1] INFO parallel.ParallelTest - next 17

13:45:16.173 [PAR-1] INFO parallel.ParallelTest - map1 19, sleepTime 247

13:45:16.420 [PAR-1] INFO parallel.ParallelTest - next 19


실행 결과를 보면 PAR-1 쓰레드는 홀수를 PAR-2는 짝수를 처리하는 것을 알 수 있다. 즉 쓰레드마다 한 레일을 처리하고 있다. 짝수인 경우 슬립 타임을 더 작은 범위로 주었으므로 짝수 레일을 처리한 PAR-2가 먼저 레일을 처리하고 있다.

아래와 같이 레일은 4개로 나누었는데 쓰레드가 2개인 병렬 스케줄러를 사용하면 어떻게 될까?

Flux.range(1, 20)
        .parallel(4)
        .runOn(Schedulers.newParallel("PAR", 2))
        .map(x -> {
            ...
        })
        .subscribe(i -> logger.info("next {}", i) );


이 경우 스케줄러는 2개의 레일을 먼저 처리한다. 한 레일에 남아 있는 데이터가 없으면 데이터가 남아 있는 다른 레일을 처리한다.


레일당 크기

ParallelFlux#runOn() 메서드는 기본적으로 한 레일 당 Queues.SMALL_BUFFER_SIZE 만큼의 데이터를 저장한다. (이 값은 reactor.bufferSize.small 시스템 프로퍼티 값을 사용하는데 이 값을 지정하지 않으면 256을 사용하고 이 값이 16보다 작으면 16을 사용한다.)


레일에 미리 채울 데이터 개수를 변경하려면 다음과 같이 runOn() 메서드의 두 번째 인자로 값을 주면 된다. 다음 코드는 레일에 미리 채울 값(prefetch)으로 2를 사용한 예이다.


Flux.range(1, 20)

        .parallel(4)

        .runOn(Schedulers.newParallel("PAR", 2), 2) // 레일에 미리 채울 값으로 2 사용

        .subscribe(x -> logger.info("next {}", x));


위 코드의 경우 최초에 각 레일에 다음과 같이 데이터가 채워진다.

레일0: 1, 5
레일1: 2, 6
레일2: 3, 7
레일3: 4, 8


스케줄러는 2개의 쓰레드를 사용하는데 두 쓰레드를 PAR-1, PAR-2라고 하자. 이 두 쓰레드가 처음에 각각 레일0과 레일1을 선택했다고 하자.


레일0: 1, 5 (PAR-1)
레일1: 2, 6 (PAR-2)
레일2: 3, 7
레일3: 4, 8

두 쓰레드가 레일의 데이터를 처리하면 상태는 다음과 같이 바뀐다.


레일0: (PAR-1)
레일1: (PAR-2)
레일2: 3, 7
레일3: 4, 8

이 상태에서 PAR-2가 레일1이 비어있는지 여부를 검사한다면 레일이 비워져 있으므로 다음 레일을 선택한다. 이때 레일3을 선택했다고 하자. 그리고 PAR-1이 레일0이 비어있는지 여부를 검사하기 전에 레일0과 레일1이 채워졌다고 하자. 그럼 상태는 다음과 같이 바뀐다.


레일0: 9 (PAR-1)
레일1: 10
레일2: 3, 7 
레일3: 4, 8 (PAR-2)

그러면 PAR-2는 4를 처리하고 PAR-1은 9를 처리한다. PAR-1이 9를 처리하는 동안에 레일0에 데이터가 채워지지 않았다면 다음 레일을 선택하는데 이때 레일1을 선택할 수 있다.

레일0: 11,
레일1: 10 (PAR-1)
레일2: 3, 7 
레일3: 8 (PAR-2)

이렇게 병렬 스케줄러의 쓰레드 개수가 레일 개수보다 작으면 그때 그때 레일의 데이터 개수에 따라 스케줄러가 선택하는 레일이 달라지게 된다.

Mono.zip()으로 병렬 처리하기

각 Mono의 구독 처리 쓰레드를 병렬 스케줄러로 실행하고 Mono.zip() 메서드를 이용해서 Mono를 묶으면 각 Mono를 병렬로 처리할 수 있다. 다음은 예제 코드이다.


Mono m1 = Mono.just(1).map(x -> {

    logger.info("1 sleep");

    sleep(1500);

    return x;

}).subscribeOn(Schedulers.parallel());


Mono m2 = Mono.just(2).map(x -> {

    logger.info("2 sleep");

    sleep(3000);

    return x;

}).subscribeOn(Schedulers.parallel());


Mono m3 = Mono.just(3).map(x -> {

    logger.info("3 sleep");

    sleep(2000);

    return x;

}).subscribeOn(Schedulers.parallel());


logger.info("Mono.zip(m1, m2, m3)");


Mono.zip(m1, m2, m3)

        .subscribe(tup -> logger.info("next: {}", tup);


위 코드에서 m1, m2, m3는 각각 1.5초, 3초, 2초간 슬립한다. 각각은 subscribeOn()을 이용해서 Parallel 스케줄러를 이용해서 구독 요청을 처리하도록 했다. 그리고 Mono.zip()으로 m1, m2, m3를 묶었다.


실제 실행 결과를 보면 m1, m2, m3가 슬립을 동시에 시작하고 약 3초 뒤에 세 Mono의 값을 묶은 Tuple3의 값을 출력하는 것을 알 수 있다. 이를 통해 m1, m2, m3를 동시에 실행했음을 확인할 수 있다.


16:12:34.424 [main] INFO parallel.ParallelTest - Mono.zip(m1, m2, m3)

16:12:34.447 [parallel-1] INFO parallel.ParallelTest - 1 sleep

16:12:34.447 [parallel-3] INFO parallel.ParallelTest - 3 sleep

16:12:34.447 [parallel-2] INFO parallel.ParallelTest - 2 sleep

16:12:37.469 [parallel-2] INFO parallel.ParallelTest - next: [1,2,3]



관련 글


리액터 쓰레드 스케줄링

리액터는 비동기 실행을 강제하지 않는다. 예를 들어 아래 코드를 보자.


Flux.range(1, 3)

        .map(i -> {

            logger.info("map {} to {}", i, i + 2);

            return i + 2;

        })

        .flatMap(i -> {

            logger.info("flatMap {} to Flux.range({}, {})", i, 1, i);

            return Flux.range(1, i);

        })

        .subscribe(i -> logger.info("next " + i));


위 코드에서 logger는 쓰레드 이름을 남기도록 설정한 로거라고 하자. 위 코드를 main 메서드에서 실행하면 다음과 같은 결과를 출력한다.


17:44:57.180 [main] INFO schedule.ScheduleTest - map 1 to 3

17:44:57.183 [main] INFO schedule.ScheduleTest - flatMap 3 to Flux.range(1, 3)

17:44:57.202 [main] INFO schedule.ScheduleTest - next 1

17:44:57.202 [main] INFO schedule.ScheduleTest - next 2

17:44:57.202 [main] INFO schedule.ScheduleTest - next 3

17:44:57.202 [main] INFO schedule.ScheduleTest - map 2 to 4

17:44:57.202 [main] INFO schedule.ScheduleTest - flatMap 4 to Flux.range(1, 4)

17:44:57.202 [main] INFO schedule.ScheduleTest - next 1

17:44:57.202 [main] INFO schedule.ScheduleTest - next 2

17:44:57.202 [main] INFO schedule.ScheduleTest - next 3

17:44:57.202 [main] INFO schedule.ScheduleTest - next 4

17:44:57.202 [main] INFO schedule.ScheduleTest - map 3 to 5

17:44:57.202 [main] INFO schedule.ScheduleTest - flatMap 5 to Flux.range(1, 5)

17:44:57.203 [main] INFO schedule.ScheduleTest - next 1

17:44:57.203 [main] INFO schedule.ScheduleTest - next 2

17:44:57.203 [main] INFO schedule.ScheduleTest - next 3

17:44:57.203 [main] INFO schedule.ScheduleTest - next 4

17:44:57.203 [main] INFO schedule.ScheduleTest - next 5


실행 결과를 보면 map(), flatMap(), subscribe()에 전달한 코드가 모두 main 쓰레드에서 실행된 것을 알 수 있다. 즉 map 연산, flatMap 연산뿐만 아니라 subscribe를 이용한 구독까지 모두 main 쓰레드가 실행한다.


스케줄러를 사용하면 구독이나 신호 처리를 별도 쓰레드로 실행할 수 있다.


publishOn을 이용한 신호 처리 쓰레드 스케줄링

publishOn() 메서드를 이용하면 next, complete, error신호를 별도 쓰레드로 처리할 수 있다. map(), flatMap() 등의 변환도 publishOn()이 지정한 쓰레드를 이용해서 처리한다. 다음 코드를 보자.


CountDownLatch latch = new CountDownLatch(1);

Flux.range(1, 6)

        .map(i -> {

            logger.info("map 1: {} + 10", i);

            return i + 10;

        })

        .publishOn(Schedulers.newElastic("PUB"), 2)

        .map(i -> { // publishOn에서 지정한 PUB 스케줄러가 실행

            logger.info("map 2: {} + 10", i);

            return i + 10;

        })

        .subscribe(new BaseSubscriber<Integer>() {

            @Override

            protected void hookOnSubscribe(Subscription subscription) {

                logger.info("hookOnSubscribe");

                requestUnbounded();

            }


            @Override

            protected void hookOnNext(Integer value) {

                logger.info("hookOnNext: " + value); // publishOn에서 지정한 스케줄러가 실행

            }


            @Override

            protected void hookOnComplete() {

                logger.info("hookOnComplete"); // publishOn에서 지정한 스케줄러가 실행

                latch.countDown();

            }

        });

latch.await();


publishOn()은 두 개의 인자를 받는다. 이 코드에서 첫 번째 인자인 Schedulers.newElastic("PUB")은 비동기로 신호를 처리할 스케줄러이다. 다양한 스케줄러가 존재하는데 이에 대해서는 뒤에서 다시 살펴본다. 일단 지금은 스케줄러가 별도 쓰레드를 이용해서 신호를 처리한다고 생각하면 된다.


두 번째 인자인 2는 스케줄러가 신호를 처리하기 전에 미리 가져올 (prefetch) 데이터 개수이다. 이는 스케줄러가 생성하는 비동기 경계 시점에 보관할 수 있는 데이터의 개수로 일종의 버퍼 크기가 된다.


위 코드를 실제로 실행하면 어떤 일이 벌어지는지 보자. 다음은 결과이다.


13:01:03.026 [main] INFO schedule.ScheduleTest - hookOnSubscribe

13:01:03.029 [main] INFO schedule.ScheduleTest - map 1: 1 + 10

13:01:03.030 [main] INFO schedule.ScheduleTest - map 1: 2 + 10


13:01:03.031 [PUB-2] INFO schedule.ScheduleTest - map 2: 11 + 10

13:01:03.037 [PUB-2] INFO schedule.ScheduleTest - hookOnNext: 21

13:01:03.037 [PUB-2] INFO schedule.ScheduleTest - map 2: 12 + 10

13:01:03.037 [PUB-2] INFO schedule.ScheduleTest - hookOnNext: 22

13:01:03.037 [PUB-2] INFO schedule.ScheduleTest - map 1: 3 + 10

13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - map 1: 4 + 10

13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - map 2: 13 + 10

13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - hookOnNext: 23

13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - map 2: 14 + 10

13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - hookOnNext: 24

13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - map 1: 5 + 10

13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - map 1: 6 + 10

13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - map 2: 15 + 10

13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - hookOnNext: 25

13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - map 2: 16 + 10

13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - hookOnNext: 26

13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - hookOnComplete


최초에 2개를 미리 가져올 때를 제외하면 나머지는 모두 publishOn()으로 전달한 스케줄러의 쓰레드(쓰레드 이름이 "PUB"로 시작)가 처리하는 것을 알 수 있다.


publishOn()에 지정한 스케줄러는 다음 publishOn()을 설정할 때까지 적용된다. 예를 들어 다음과 같이 이름이 PUB1과 PUB2인 두 개의 스케줄러를 설정했다고 하자.


Flux.range(1, 6)

        .publishOn(Schedulers.newElastic("PUB1"), 2)

        .map(i -> {

            logger.info("map 1: {} + 10", i);

            return i + 10;

        })

        .publishOn(Schedulers.newElastic("PUB2"))

        .map(i -> {

            logger.info("map 2: {} + 10", i);

            return i + 10;

        })

        .subscribe(new BaseSubscriber<Integer>() {

            @Override

            protected void hookOnSubscribe(Subscription subscription) {

                logger.info("hookOnSubscribe");

                requestUnbounded();

            }


            @Override

            protected void hookOnNext(Integer value) {

                logger.info("hookOnNext: " + value);

            }


            @Override

            protected void hookOnComplete() {

                logger.info("hookOnComplete");

                latch.countDown();

            }

        });


이 코드를 실행한 결과는 다음과 같다.


13:38:14.957 [main] INFO schedule.ScheduleTest - hookOnSubscribe

13:38:14.960 [PUB1-4] INFO schedule.ScheduleTest - map 1: 1 + 10

13:38:14.963 [PUB1-4] INFO schedule.ScheduleTest - map 1: 2 + 10

13:38:14.963 [PUB2-3] INFO schedule.ScheduleTest - map 2: 11 + 10

13:38:14.963 [PUB1-4] INFO schedule.ScheduleTest - map 1: 3 + 10

13:38:14.964 [PUB1-4] INFO schedule.ScheduleTest - map 1: 4 + 10

13:38:14.964 [PUB1-4] INFO schedule.ScheduleTest - map 1: 5 + 10

13:38:14.964 [PUB1-4] INFO schedule.ScheduleTest - map 1: 6 + 10

13:38:14.969 [PUB2-3] INFO schedule.ScheduleTest - hookOnNext: 21

13:38:14.979 [PUB2-3] INFO schedule.ScheduleTest - map 2: 12 + 10

13:38:14.979 [PUB2-3] INFO schedule.ScheduleTest - hookOnNext: 22

...생략

13:38:15.021 [PUB2-3] INFO schedule.ScheduleTest - map 2: 16 + 10

13:38:15.021 [PUB2-3] INFO schedule.ScheduleTest - hookOnNext: 26

13:38:15.031 [PUB2-3] INFO schedule.ScheduleTest - hookOnComplete


결과를 보면 첫 번째 publishOn()과 두 번째 publishOn() 사이의 map() 처리는 PUB1 스케줄러가 실행하고 두 번째 publishOn() 이후의 map(), 신호 처리는 PUB2 스케줄러가 실행한 것을 알 수 있다.


subscribeOn을 이용한 구독 처리 쓰레드 스케줄링

subscribeOn()을 사용하면 Subscriber가 시퀀스에 대한 request 신호를 별도 스케줄러로 처리한다. 즉 시퀀스(Flux나 Mono)를 실행할 스케줄러를 지정한다. 다음은 subscribeOn()의 사용예이다.


CountDownLatch latch = new CountDownLatch(1);

Flux.range(1, 6)

        .log() // 보다 상세한 로그 출력 위함

        .subscribeOn(Schedulers.newElastic("SUB"))

        .map(i -> {

            logger.info("map: {} + 10", i);

            return i + 10;

        })

        .subscribe(new BaseSubscriber<Integer>() {

            @Override

            protected void hookOnSubscribe(Subscription subscription) {

                logger.info("hookOnSubscribe"); // main thread

                request(1);

            }


            @Override

            protected void hookOnNext(Integer value) {

                logger.info("hookOnNext: " + value); // SUB 쓰레드

                request(1);

            }


            @Override

            protected void hookOnComplete() {

                logger.info("hookOnComplete"); // SUB 쓰레드

                latch.countDown();

            }

        });


latch.await();


subscribeOn()으로 지정한 스케줄러는 시퀀스의 request 요청 처리뿐만 아니라 첫 번째 publishOn() 이전까지의 신호 처리를 실행한다. 따라서 위 코드를 실행하면 Flux.range()가 생성한 시퀀스의 신호 발생뿐만 아니라 map() 실행, Subscriber의 next, complete 신호 처리를 "SUB" 스케줄러가 실행한다. 참고로 시퀀스의 request 요청과 관련된 로그를 보기 위해 log() 메서드를 사용했다.


다음은 실행 결과이다.


14:56:24.996 [main] INFO schedule.ScheduleTest - hookOnSubscribe

14:56:25.005 [SUB-2] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)

14:56:25.010 [SUB-2] INFO reactor.Flux.Range.1 - | request(1)

14:56:25.010 [SUB-2] INFO reactor.Flux.Range.1 - | onNext(1)

14:56:25.011 [SUB-2] INFO schedule.ScheduleTest - map: 1 + 10

14:56:25.016 [SUB-2] INFO schedule.ScheduleTest - hookOnNext: 11

14:56:25.016 [SUB-2] INFO reactor.Flux.Range.1 - | request(1)

14:56:25.016 [SUB-2] INFO reactor.Flux.Range.1 - | onNext(2)

14:56:25.016 [SUB-2] INFO schedule.ScheduleTest - map: 2 + 10

14:56:25.017 [SUB-2] INFO schedule.ScheduleTest - hookOnNext: 12

...(생략)

14:56:25.017 [SUB-2] INFO reactor.Flux.Range.1 - | request(1)

14:56:25.017 [SUB-2] INFO reactor.Flux.Range.1 - | onNext(6)

14:56:25.017 [SUB-2] INFO schedule.ScheduleTest - map: 6 + 10

14:56:25.017 [SUB-2] INFO schedule.ScheduleTest - hookOnNext: 16

14:56:25.017 [SUB-2] INFO reactor.Flux.Range.1 - | request(1)

14:56:25.018 [SUB-2] INFO reactor.Flux.Range.1 - | onComplete()

14:56:25.018 [SUB-2] INFO schedule.ScheduleTest - hookOnComplete


실행 결과에서 Flux.Range 타입은 Flux.range() 메서드가 생성한 시퀀스 객체의 타입이다. 위 결과에서 Flux.Range.1의 reques(1), onNext(), onComplete() 로그는 Subscriber의 request 신호를 처리하는 로그이다. 이 로그를 보면 SUB 스케줄러가 해당 기능을 실행하고 있음을 알 수 있다. 또한 map()과 Subscriber의 신호 처리 메서드(hookOnNext, hookOnComplete)도 SUB 스케줄러가 실행하고 있다.


subscribeOn() + publishOn() 조합

앞서 말했듯이 subscribeOn으로 지정한 스케줄러는 첫 번째 publishOn이 올때까지 적용된다. 다음 코드를 통해 이를 확인할 수 있다.


CountDownLatch latch = new CountDownLatch(1);

Flux.range(1, 6)

        .log()

        .subscribeOn(Schedulers.newElastic("SUB"))

        .map(i -> {

            logger.info("map1: " + i + " --> " + (i + 20));

            return i + 20;

        })

        .map(i -> {

            logger.info("mapBySub: " + i + " --> " + (i + 100));

            return i + 100;

        })

        .publishOn(Schedulers.newElastic("PUB1"), 2)

        .map(i -> {

            logger.info("mapByPub1: " + i + " --> " + (i + 1000));

            return i + 1000;

        })

        .publishOn(Schedulers.newElastic("PUB2"), 2)

        .subscribe(new BaseSubscriber<Integer>() {

            @Override

            protected void hookOnSubscribe(Subscription subscription) {

                logger.info("hookOnSubscribe");

                request(1);

            }


            @Override

            protected void hookOnNext(Integer value) {

                logger.info("hookOnNext: " + value);

                request(1);

            }


            @Override

            protected void hookOnComplete() {

                logger.info("hookOnComplete");

                latch.countDown();

            }

        });


latch.await();


이 코드는 구독을 위한 "SUB" 스케줄러와 신호 처리를 위한 "PUB1", "PUB2" 스케줄러를 설정하고 있다. 


다음은 실행 결과이다.


15:10:05.660 [main] INFO schedule.ScheduleTest - hookOnSubscribe

15:10:05.681 [SUB-6] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)

15:10:05.687 [SUB-6] INFO reactor.Flux.Range.1 - | request(2)

15:10:05.688 [SUB-6] INFO reactor.Flux.Range.1 - | onNext(1)

15:10:05.718 [SUB-6] INFO schedule.ScheduleTest - map1: 1 --> 21

15:10:05.719 [SUB-6] INFO schedule.ScheduleTest - mapBySub: 21 --> 121

15:10:05.720 [SUB-6] INFO reactor.Flux.Range.1 - | onNext(2)

15:10:05.720 [SUB-6] INFO schedule.ScheduleTest - map1: 2 --> 22

15:10:05.720 [SUB-6] INFO schedule.ScheduleTest - mapBySub: 22 --> 122

15:10:05.721 [PUB1-5] INFO schedule.ScheduleTest - mapByPub1: 121 --> 1121

15:10:05.722 [PUB1-5] INFO schedule.ScheduleTest - mapByPub1: 122 --> 1122

15:10:05.734 [SUB-6] INFO reactor.Flux.Range.1 - | request(2)

15:10:05.735 [SUB-6] INFO reactor.Flux.Range.1 - | onNext(3)

15:10:05.735 [SUB-6] INFO schedule.ScheduleTest - map1: 3 --> 23

15:10:05.735 [SUB-6] INFO schedule.ScheduleTest - mapBySub: 23 --> 123

15:10:05.735 [SUB-6] INFO reactor.Flux.Range.1 - | onNext(4)

15:10:05.735 [SUB-6] INFO schedule.ScheduleTest - map1: 4 --> 24

15:10:05.735 [SUB-6] INFO schedule.ScheduleTest - mapBySub: 24 --> 124

15:10:05.736 [PUB2-4] INFO schedule.ScheduleTest - hookOnNext: 1121

15:10:05.736 [PUB2-4] INFO schedule.ScheduleTest - hookOnNext: 1122

15:10:05.736 [PUB1-5] INFO schedule.ScheduleTest - mapByPub1: 123 --> 1123

15:10:05.736 [PUB1-5] INFO schedule.ScheduleTest - mapByPub1: 124 --> 1124

15:10:05.736 [PUB2-4] INFO schedule.ScheduleTest - hookOnNext: 1123

15:10:05.736 [PUB2-4] INFO schedule.ScheduleTest - hookOnNext: 1124

15:10:05.736 [SUB-6] INFO reactor.Flux.Range.1 - | request(2)

15:10:05.736 [SUB-6] INFO reactor.Flux.Range.1 - | onNext(5)

15:10:05.736 [SUB-6] INFO schedule.ScheduleTest - map1: 5 --> 25

15:10:05.736 [SUB-6] INFO schedule.ScheduleTest - mapBySub: 25 --> 125

15:10:05.737 [SUB-6] INFO reactor.Flux.Range.1 - | onNext(6)

15:10:05.737 [PUB1-5] INFO schedule.ScheduleTest - mapByPub1: 125 --> 1125

15:10:05.737 [SUB-6] INFO schedule.ScheduleTest - map1: 6 --> 26

15:10:05.737 [SUB-6] INFO schedule.ScheduleTest - mapBySub: 26 --> 126

15:10:05.737 [PUB2-4] INFO schedule.ScheduleTest - hookOnNext: 1125

15:10:05.737 [PUB1-5] INFO schedule.ScheduleTest - mapByPub1: 126 --> 1126

15:10:05.737 [PUB2-4] INFO schedule.ScheduleTest - hookOnNext: 1126

15:10:05.737 [SUB-6] INFO reactor.Flux.Range.1 - | onComplete()

15:10:05.738 [PUB2-4] INFO schedule.ScheduleTest - hookOnComplete


실행 결과를 보면 첫 번째 publishOn()으로 PUB1 스케줄러를 지정하기 전까지는 SUB 스케줄러가 request 요청과 map1, mapBySub 변환을 처리하는 것을 확인할 수 있다.


[노트]

subscribeOn()이 publishOn() 뒤에 위치하면 실질적으로 prefetch할 때를 제외하면 적용되지 않는다. subscribeOn()은 원본 시퀀스의 신호 발생을 처리할 스케줄러를 지정하므로 시퀀스 생성 바로 뒤에 subscribeOn()을 지정하도록 하자. 또한 두 개 이상 subscribeOn()을 지정해도 첫 번째 subscribeOn()만 적용된다.


스케줄러 종류

스프링 리액터는 다음 스케줄러를 기본 제공한다.


  • Schedulers.immediate() : 현재 쓰레드에서 실행한다.
  • Schedulers.single() : 쓰레드가 한 개인 쓰레드 풀을 이용해서 실행한다. 즉 한 쓰레드를 공유한다.
  • Schedulers.elastic() : 쓰레드 풀을 이용해서 실행한다. 블로킹 IO를 리액터로 처리할 때 적합하다. 쓰레드가 필요하면 새로 생성하고 일정 시간(기본 60초) 이상 유휴 상태인 쓰레드는 제거한다. 데몬 쓰레드를 생성한다.
  • Schedulers.parallel() : 고정 크기 쓰레드 풀을 이용해서 실행한다. 병렬 작업에 적합하다.

single(), elastic(), parallel()은 매번 새로운 쓰레드 풀을 만들지 않고 동일한 쓰레드 풀을 리턴한다. 예를 들어 아래 코드에서 두 publishOn()은 같은 쓰레드 풀을 공유한다.


someFlux.publishOn(Schedulers.parallel())

            .map(...)

            .publishOn(Schedulers.parallel())

            .subscribe(...);


single(), elastic(), parallel()이 생성하는 쓰레드는 데몬 쓰레드로서 main 쓰레드가 종료되면 함께 종료된다.


같은 종류의 쓰레드 풀인데 새로 생성하고 싶다면 다음 메서드를 사용하면 된다.

  • newSingle(String name)
  • newSingle(String name, boolean daemon)
  • newElastic(String name)
  • newElastic(String name, int ttlSeconds)
  • newElastic(String name, int ttlSeconds, boolean daemon)
  • newParallel(String name)
  • newParallel(String name, int parallelism)
  • newParallel(String name, int parallelism, boolean daemon)

각 파라미터는 다음과 같다.

  • name : 쓰레드 이름으로 사용할 접두사이다.
  • daemon : 데몬 쓰레드 여부를 지정한다. 지정하지 않으면 false이다. 데몬 쓰레드가 아닌 경우 JVM 종료시에 생성한 스케줄러의 dispose()를 호출해서 풀에 있는 쓰레드를 종료해야 한다.
  • ttlSeconds : elastic 쓰레드 풀의 쓰레드 유휴 시간을 지정한다. 지정하지 않으면 60(초)이다.
  • parallelism : 작업 쓰레드 개수를 지정한다. 지정하지 않으면 Runtime.getRuntime().availableProcessors()이 리턴한 값을 사용한다.

newXXX() 로 생성하는 쓰레드 풀은 기본으로 데몬 쓰레드가 아니기 때문에 어플리케이션 종료시에는 다음과 같이 dispose() 메서드를 호출해서 쓰레드를 종료시켜 주어야 한다. 그렇지 않으면 어플리케이션이 종료되지 않는 문제가 발생할 수 있다.


// 비데몬 스케줄러 초기화

Scheduler scheduler = Schedulers.newElastic("SUB", 60, false);


// 비데몬 스케줄러 사용

someFlux.publishOn(scheduler)

            .map(...)

            .subscribe(...)


// 어플리케이션 종료시에 스케줄러 종료 처리

scheduler.dispose();


병렬 처리와 관련된 내용은 다음에 더 자세히 살펴본다.


일정 주기로 tick 발생: Flux.interval

Flux.interval()을 사용하면 일정 주기로 신호를 발생할 수 있다. 발생 순서에 따라 발생한 정수 값을 1씩 증가시킨다. 다음은 간단한 사용 예이다.


Flux.interval(Duration.ofSeconds(1)) // Flux<Long>

        .subscribe(tick -> System.out.println("Tick " + tick));


Thread.sleep(5000);


위 코드를 실행한 결과는 다음과 같다.

Tick 0
Tick 1
Tick 2
Tick 3
Tick 4


1초 간격으로 신호가 발생하는 것을 알 수 있다.


interval()은 Schedulers.parallel()를 사용해서 신호를 주기적으로 발생한다. 다른 스케줄러를 사용하고 싶다면 internval(Duration, Scheduler) 메서드를 사용하면 된다.


관련글

에러 처리

시퀀스는 데이터를 발생하는 과정에서 에러를 발생할 수 있다. 리액터는 에러를 처리하는 여러 방법을 제공하는데 이 글에서는 레퍼런스 문서에서 언급하는 에러 처리 방법을 차례대로 살펴볼 것이다.


참고로 에러 신호는 종료 신호이다. 따라서 에러 신호가 발생하면 시퀀스는 종료되고 더 이상 데이터를 발생하지 않는다.


에러 신호 처리

에러 신호가 발생하면 Subscriber의 onError 메서드가 호출된다. 이 메서드를 구현한 Subscriber를 이용해서 구독을 하면 에러 신호를 알맞게 처리할 수 있다. 또한 에러 처리를 위한 Consumer를 파라미터로 갖는 subscribe() 메서드를 사용해서 익셉션을 처리할 수 있다. 다음 코드는 에러 처리를 위한 Consumer를 파라미터로 갖는 subscribe() 메서드의 사용 예를 보여준다.


Flux.range(1, 10)

        .map(x -> {

            if (x == 5) throw new RuntimeException("exception"); // 에러 발생

            else return x;

        })

        .subscribe(

                i -> System.out.println(i), // next 신호 처리

                ex -> System.err.println(ex.getMessage()), // error 신호 처리

                () -> System.out.println("complete") // complete 신호 처리

        );


위 코드는 1부터 10개의 값을 발생하는데 값이 5이면 익셉션을 발생하는 시퀀스를 생성한다. subscribe() 메서드는 3개의 인자를 갖는데 차례대로 next, error, complete 신호를 처리한다. 실행 결과는 다음과 같다.


1

2

3

4

exception


에러 신호는 종료 신호이므로 익셉션 발생 이후에 더 이상 next 신호가 발생하지 않는 것을 확인할 수 있다.


에러 신호를 처리하기 위한 subscribe() 메서드는 다음과 같다.

  • subscribe(Consumer<? super T> consumer,
                    Consumer<? super Throwable> errorConsumer)
  • subscribe(Consumer<? super T> consumer,
                    Consumer<? super Throwable> errorConsumer,
                    Runnable completeConsumer)
  • subscribe(Consumer<? super T> consumer,
                    Consumer<? super Throwable> errorConsumer,
                    Runnable completeConsumer,
                    Consumer<? super Subscription> subscriptionConsumer)
  • subscribe(Subscriber<? super T> subscriber)


에러 발생하면 기본 값 사용하기: onErrorReturn

에러가 발생할 때 에러 대신에 특정 값을 발생하고 싶다면 onErrorReturn() 메서드를 사용한다. 이 메서드의 사용 예는 다음과 같다.


Flux<Integer> seq = Flux.range(1, 10)

        .map(x -> {

            if (x == 5) throw new RuntimeException("exception");

            else return x;

        })

        .onErrorReturn(-1);


seq.subscribe(System.out::println);


위 코드를 실행한 결과는 다음과 같다.


1

2

3

4

-1


실행 결과에서 알 수 있듯이 에러 대신에 값을 발생한 뒤에 시퀀스는 종료된다.


발생한 익셉션이 특정 조건을 충족하는 경우에만 에러 대신에 특정 값을 발생하고 싶다면 Predicate을 인자로 받는 onErrorReturn() 메서드를 사용한다. 익셉션이 특정 타입인 경우에만 에러 대신 특정 값을 발생하고 싶다면 Class 타입을 인자로 받는 onErrorReturn() 메서드를 사용한다.

  • Flux<T> onErrorReturn(Predicate<? super Throwable> predicate, T fallbackValue)
  • <E extends Throwable> Flux<T> onErrorReturn(Class<E> type, T fallbackValue)


에러 발생하면 다른 신호(시퀀스)나 다른 에러로 대체하기: onErrorResume

onErrorResume 메서드를 사용하면 에러가 발생하면 다른 시퀀스나 에러로 대체할 수 있다. onErrorResume 메서드는 다음 타입의 함수를 파라미터로 갖는다.

  • Function<? super Throwable, ? extends Publisher<? extends T>> :
    Throwable을 입력으로 받고 Publisher를 리턴하는 함수
이 함수는 시퀀스에서 발생한 에러를 입력으로 받아 결과로 Publisher를 리턴한다. 즉 에러가 발생하면 이를 다른 데이터 신호로 대체한다. 다음 코드를 보자.

Random random = new Random();
Flux<Integer> seq = Flux.range(1, 10)
        .map(x -> {
            int rand = random.nextInt(8);
            if (rand == 0) throw new IllegalArgumentException("illarg");
            if (rand == 1) throw new IllegalStateException("illstate");
            if (rand == 2) throw new RuntimeException("exception");
            return x;
        })
        .onErrorResume(error -> {
            if (error instanceof IllegalArgumentException) {
                return Flux.just(21, 22);
            }
            if (error instanceof IllegalStateException) {
                return Flux.just(31, 32);
            }
            return Flux.error(error);
        });

seq.subscribe(System.out::println);


이 코드에서 onErrorResume() 메서드에 전달한 함수를 보자. 이 함수는 발생한 에러가 IllegalArgumentException이면 Flux.just(21, 22)로 21, 22 값을 생성하는 시퀀스를 리턴하고, 발생한 에러가 IllegalStateException이면 31, 32 값을 생성하는 시퀀스를 리턴한다. 두 조건에 해당하지 않는 경우 Flux.error() 메서드를 이용해서 익셉션을 다시 재발생시킨다.


map() 함수에서 임의로 익셉션을 발생시키도록 했으므로 실행할 때마다 결과가 달라진다. 다음은 여러 번 실행한 결과 중 하나이다. 이 실행 결과는 3번째 데이터를 map에서 처리하는 과정에서 IllegalArgumentException이 발생했고 onErrorResume()을 통해 에러 대신에 21, 22를 생성하는 Flux로 대체된 것을 보여준다.


1

2

21

22


onErrorReturn과 마찬가지로 onErrorResume도 Predicate이나 Class 타입을 파라미터로 갖는 메서드를 이용해서 처리할 익셉션을 제한할 수 있다.


에러를 다른 에러로 변환하기: onErrorMap

에러를 다른 에러로 변환할 때에는 onErrorMap을 사용한다. 다음은 간단한 사용 예이다.


Flux<Integer> seq = intSeq.onErrorMap(error -> new MyException(...));


onErrorMap도 Predicate이나 Class 타입을 파라미터로 갖는 메서드를 이용해서 변환할 익셉션을 제한할 수 있다.


재시도하기: retry

retry()를 사용하면 에러가 발생했을 구독을 재시도할 수 있다. 다음은 예제 코드이다.


Flux.range(1, 5)

        .map(input -> {

            if (input < 4) return "num " + input;

            throw new RuntimeException("boom");

        })

        .retry(1) // 에러 신호 발생시 1회 재시도

        .subscribe(System.out::println, System.err::println);


위 코드를 실행한 결과는 다음과 같다.


num 1

num 2

num 3

num 1

num 2

num 3

java.lang.RuntimeException: boom


재시도를 하면 원본 시퀀스를 다시 구독한다. 이런 이유로 위 결과는 에러가 처음 발생했을 때 다시 1부터 신호가 발생하고 있다. 두 번째 에러가 발생했을 때에는 재시도를 하지 않으므로 에러 메시지가 출력되는 것을 알 수 있다.


재시도하기: retryWhen

단순 재시도가 아닌 조금 더 복잡한 상황에 따라 재시도를 하고 싶다면 retryWhen을 사용한다. retryWhen 메서드는 사용법이 다소 복잡하다. 먼저 retryWhen 메서드의 파라미터를 보자. 파라미터 타입은 다음과 같다.

  • retryWhen(Function< Flux<Throwable>,  ? extends Publisher<?> > whenFactory)
whenFactory 파라미터는 Function 타입의 함수이다. 이 함수는 입력으로 Flux<Throwable>를 받고 결과로 Publisher를 리턴한다. 여기서 whenFactory의 함수의 입력인 Flux<Throwable>는 시퀀스가 발생하는 익셉션 신호에 해당한다. 재시도 횟수에 따라 익셉션이 여러 번 발생할 수 있는데 Flux<Throwable>이 발생하는 데이터는 바로 여러 번 발생할 수 있는 익셉션에 해당한다.

whenFactory 함수에 전달되는 Flux<Throwable>은 원본 시퀀스의 익셉션과 연관되어 있으므로 이를 컴페니언(companion) Flux라고 부른다.

whenFactory 함수는 재시도 조건에 맞게 변환한 컴페니언 Flux를 리턴한다. 이 변환한 컴페니언 Flux가 재시도 여부를 결정하는데 그 과정은 다음과 같다.
  1. 에러가 발생할 때마다 에러가 컴페니언 Flux로 전달된다.
  2. 컴페니언 Flux가 뭐든 발생하면 재시도가 일어난다.
  3. 컴페니언 Flux가 종료되면 재시도를 하지 않고 원본 시퀀스 역시 종료된다.
  4. 컴페니언 Flux가 에러를 발생하면 재시도를 하지 않고 컴페니언 Flux가 발생한 에러를 전파한다.
위 설명만으로는 감이 잘 안 올 테니 간단한 예를 살펴보자. 먼저 다음은 2번 재시도하는 예제 코드이다.


Flux<Integer> seq = Flux.just(1, 2, 3)

        .map(i -> {

            if (i < 3) return i;

            else throw new IllegalStateException("force");

        })

        .retryWhen(errorsFlux -> errorsFlux.take(2)); // 2개의 데이터 발생


seq.subscribe(

        System.out::println,

        err -> System.err.println("에러 발생: " + err),

        () -> System.out.println("compelte")

);


위 코드에서 retryWhen은 take(2)를 사용해서 2개의 데이터를 발생하는 컴페니언 Flux를 리턴한다. 이 컴페니언 Flux는 2개의 데이터를 발생하고 종료된다. 즉 2-3번 과정에 의해 2번 재시도를 하고 원본 시퀀스를 종료시킨다. 실제 실행 결과는 다음과 같다. 괄호 안의 파란 글씨는 재시도 여부를 표시한 것으로 실제 출력에 포함된 내용은 아니다.


1

2

1    (1번 재시도)

2

1    (2번 재시도)

2

complete


출력 결과에서 눈여겨 볼 점은 에러가 출력되지 않았다는 점이다. 즉 컴페니언 Flux가 complete 신호를 보내면 Subscriber에도 complete 신호가 전달되고 있다. 이 점은 retry()와 다르다. retry()는 최대 재시도 이후에도 에러가 발생하면 해당 에러를 Subscriber에 전달하는데 retryWhen은 컴페니언 Flux가 어떤 데이터를 발생하느냐에 따라 에러를 Subscriber에 전달할지 여부가 달라진다.


다음 코드는 컴페니언 Flux가 에러를 발생하는 예이다.


Flux<Integer> seq = Flux.just(1, 2, 3)

        .map(i -> {

            if (i < 3) return i;

            else throw new IllegalStateException("force");

        })

        .retryWhen(errorsFlux -> errorsFlux.zipWith(Flux.range(1, 3),

                (error, index) -> {

                    if (index < 3) return index;

                    else throw new RuntimeException("companion error"); // 

                })

        );


seq.subscribe(

        System.out::println,

        err -> System.err.println("에러 발생: " + err),

        () -> System.out.println("compelte")

);


위 코드에서 retryWhen이 생성하는 컴페니언 Flux는 에러가 세 번째 발생하면 RuntimeException을 발생한다. 즉 두 번째 에러까지는 데이터 신호를 발생하고 세 번째 에러에 에러 신호를 발생한다. 따라서 두 번 재시도를 한다. 실행 결과는 다음과 같다.


1

2

1

2

1

2

에러 발생: java.lang.RuntimeException: companion error


실행 결과를 보면 두 번 재시도 후에 에러 신호를 받은 것을 알 수 있다.


관련 글


  1. sangpire 2020.02.02 22:16 신고

    글 잘 읽었습니다.

    아주 사소한 오탈자가 보여 제보 드려요.
    바로 '에러 신호 처리' 절 두번째 줄 'Consumber' 가 혹시 'Consumer' 가 아닐까 싶은데 맞았으면 좋겠네요.

    다시한번 좋은 글 감사합니다.

시퀀스 변환

이 글에서는 시퀀스가 발생하는 데이터를 변환하는 몇 가지 방법을 살펴본다.


1-1 변환: map

첫 번째는 map()이다. map() 한 개의 데이터를 1-1 방식으로 변환해준다. 자바 스트림의 map()과 유사하다. 다음 코드는 map()의 예를 보여준다.


Flux.just("a", "bc", "def", "wxyz")

        .map(str -> str.length()) // 문자열을 Integer 값으로 1-1 변환

        .subscribe(len -> System.out.println(len));


1-n 변환: flatMap

flatMap은 1개의 데이터로부터 시퀀스를 생성할 때 사용한다. 즉 1-n 방식의 변환을 처리한다. 다음은 간단한 flatMap() 사용 예이다.


Flux<Integer> seq = Flux.just(1, 2, 3)

             .flatMap(i -> Flux.range(1, i)) // Integer를 Flux<Integer>로 1-N 변환


seq.subscribe(System.out::println);


flatMap()에 전달한 함수를 보면 Integer 값을 받아서 다시 1부터 해당 값 개수만큼의 숫자를 생성하는 Flux를 생성한다. 위 코드를 보면 다음과 같은 변환이 발생한다.

  • 1 -> Flux.range(1, 1) : [1] 생성
  • 2 -> Flux.range(1, 2) : [1, 2] 생성
  • 3 -> Flux.range(1, 3) : [1, 2, 3] 생성
flatMap에 전달한 함수가 생성하는 각 Flux는 하나의 시퀀스처럼 연결된다. 그래서 flatMap()의 결과로 생성되는 Flux의 타입이 Flux<Flux<Integer>>가 아니라 Flux<Integer>이다.

실제 실행 결과는 다음과 같다.

1
1
2
1
2
3


걸러내기: filter

filter()를 이용해서 시퀀스가 생성한 데이터를 걸러낼 수 있다. filter()에 전달한 함수의 결과가 true인 데이터만 전달하고 false인 데이터는 발생하지 않는다. 다음은 1부터 10 사이의 숫자 중에서 2로 나눠 나머지가 0인 (즉 짝수인) 숫자만 걸러내는 예를 보여준다.


Flux.range(1, 10)

        .filter(num -> num % 2 == 0)

        .subscribe(x -> System.out.print(x + " -> "));


실행 결과는 다음과 같다.


2 -> 4 -> 6 -> 8 -> 10 -> 


빈 시퀀스인 경우 기본 값 사용하기: defaultIfEmpty

시퀀스에 데이터가 없을 때 특정 값을 기본으로 사용하고 싶다면 defaultIfEmpty() 메서드를 사용하면 된다. Mono와 Flux 모두 defaultIfEmpty()를 제공한다. 아래 코드는 사용 예이다.


Flux<Item> popularItems = getPopularItems();

Flux<Item> recItems = popularItems.defaultIfEmpty(featureItem);


getPopularItems() 메서드가 인기 상품 목록을 제공한다고 하자. 이 코드는 인기 상품 목록을 제공하는 시퀀스인 popularItems가 데이터가 없는 빈 시퀀스면 featureItem을 값으로 제공하는 Flux로 변환한다. 즉 인기 상품이 있으면 그 상품 목록을 제공하고 그렇지 않으면 미리 지정한 featureItem을 값으로 제공하는 Flux를 생성한다.


위 코드를 조금 더 간결하게 표현하면 다음과 같다.


Flux<Item> recItems = getPopularItems().defaultIfEmpty(featureItem);


빈 시퀀스인 경우 다른 시퀀스 사용하기: switchIfEmpty

시퀀스에 값이 없을 때 다른 시퀀스를 사용하고 싶다면 switchIfEmpty() 메서드를 사용한다.


// public Flux<Item> getPopularItems() { ... }

// public Flux<Item> getFeatureItems() { ... }


Flux<Item> recItems = 

    getPopularItems().switchIfEmpty(getFeatureItems());



특정 값으로 시작하는 시퀀스로 변환: startWith

특정 값으로 시작하도록 시퀀스를 설정하고 싶다면 startWith(T ...) 메서드나 startWith(시퀀스) 메서드를 사용한다.


Flux<Integer> seq1 = Flux.just(1, 2, 3);


Flux<Integer> seq2 = seq1.startWith(-1, 0);

seq2.subscribe(System.out::println);


이 코드에서 seq1은 1, 2, 3을 생성하는 시퀀스인데 startWith(-1, 0)을 사용해서 -1, 0으로 시작하는 시퀀스로 변환했다. 따라서 seq2가 생성하는 데이터는 -1, 0, 1, 2, 3이 된다.


특정 값으로 끝나는 시퀀스로 변환: concatWithValues

시퀀스가 특정 값으로 끝나도록 변환하고 싶다면 concatWithValues(T ...) 메서드를 사용한다.


Flux<Integer> seq = someSeq.concatWithValues(100);

seq.subscribe(System.out::println);


위 코드는 someSeq가 어떤 값을 생성하는지에 상관없이 seq는 가장 마지막에 100을 생성한다.


시퀀스 순서대로 연결: cancatWith

concatWith() 메서드를 사용하면 여러 시퀀스를 순서대로 연결할 수 있다.


Flux<Integer> seq1 = Flux.just(1, 2, 3);

Flux<Integer> seq2 = Flux.just(4, 5, 6);

Flux<Integer> seq3 = Flux.just(7, 8, 9);


seq1.concatWith(seq2).concatWith(seq3).subscribe(System.out::println);


위 코드에서 seq1, seq2, seq3을 차례대로 연결하고 구독을 시작했다. 실행 결과는 1부터 9까지 정수를 출력한다.


concatWith로 연결한 시퀀스는 이전 시퀀스가 종료된 뒤에 구독을 시작한다. 위 예에서는 seq1이 종료된 뒤에 seq2 구독을 시작하고 seq2가 종료된 뒤에 seq3 구독을 시작한다.


시퀀스 발생 순서대로 섞기: mergeWith

시퀀스의 연결 순서가 아니라 시퀀스가 발생하는 데이터 순서대로 섞고 싶다면 mergeWith()를 사용한다. 다음은 예이다.


Flux<String> tick1 = Flux.interval(Duration.ofSeconds(1)).map(tick -> tick + "초틱");

Flux<String> tick2 = Flux.interval(Duration.ofMillis(700)).map(tick -> tick + "밀리초틱");

tick1.mergeWith(tick2).subscribe(System.out::println);


위 코드에서 tick1은 1초 간격으로 데이터를 발생하고 tick2는 700 밀리초 단위로 데이터를 발생한다. 이 두 시퀀스를 mergeWith()로 섞은 뒤 구독하면 두 시퀀스를 동시에 구독한다. 실행 결과는 다음과 같다.


0밀리초틱

0초틱

1밀리초틱

1초틱

2밀리초틱

3밀리초틱

2초틱

4밀리초틱

3초틱

5밀리초틱

...


다음은 실행 결과를 발생 시점 기준으로 그림으로 표시한 것이다. 1초 간격으로 데이터를 발생하는 seq1과 0.7초 간격으로 데이터를 발생하는 seq2를 mergeWith()로 섞을 때 데이터 발생 순서를 알 수 있다.





시퀀스 묶기: zipWith

zipWith()를 사용하면 두 시퀀스의 값을 묶은 값 쌍을 생성하는 시퀀스를 생성할 수 있다. 다음은 사용 예이다.


Flux<String> tick1 = Flux.interval(Duration.ofSeconds(1)).map(tick -> tick + "초틱");

Flux<String> tick2 = Flux.interval(Duration.ofMillis(700)).map(tick -> tick + "밀리초틱");

tick1.zipWith(tick2).subscribe(tup -> System.out.println(tup));


zipWith()는 개수를 맞춰서 두 시퀀스의 데이터를 묶는다. 따라서 위 코드는 아래 그림과 같이 쌍을 묶는다.


zipWith()는 리액터에 포함된 Tuple2 타입을 이용해서 두 값을 쌍으로 묶는다. 위 코드의 실행 결과는 아래와 같다.

[0초틱,0밀리초틱]
[1초틱,1밀리초틱]
[2초틱,2밀리초틱]
[3초틱,3밀리초틱]
[4초틱,4밀리초틱]

시퀀스 묶기: combineLatest


Flux.combineLatest() 메서드로 시퀀스를 묶을 수도 있다. 이 메서드 정적 메서드이다. 발생한 개수를 맞춰서 쌍을 만드는 zipWith()와 달리 combineLatest()는 가장 최근의 데이터를 쌍으로 만든다. 다음은 그 차이를 보여준다.



다음은 예제 코드이다.


Flux<String> tick1 = Flux.interval(Duration.ofSeconds(1)).map(tick -> tick + "초틱");

Flux<String> tick2 = Flux.interval(Duration.ofMillis(700)).map(tick -> tick + "밀리초틱");

Flux.combineLatest(tick1, tick2, (a, b) -> a + "\n" + b).subscribe(System.out::println);


지정한 개수/시간에 해당하는 데이터만 유지: take, takeLast

시퀀스에서 처음 n개의 데이터만 유지하고 싶다면 take(long) 메서드를 사용한다. 비슷하게 지정한 시간 동안 발생한 데이터만 유지하고 싶다면 take(Duration) 메서드를 사용한다. 마지막 n개의 데이터만 유지하고 싶다면 takeLast(long) 메서드를 사용한다.

Flux<Integer> seq1 = someSeq.take(10); // 최초 10개 데이터만 유지
Flux<Integer> seq2 = someSeq.take(Duration.ofSeconds(10)); // 최초 10초 동안 데이터 유지
Flux<Integer> seq3 = someSeq.takeLast(5); // 마지막 5개 데이터만 유지

이 외에 takeWhile()과 takeUntil()도 있다. 이 두 메서드는 Predicate을 인자로 받는다. takeWhile()은 Predicate 구현이 true를 리턴하는 동안 데이터를 포함하고 takeUntil()은 처음 true를 리턴할 때까지 데이터를 포함한다.

지정한 개수/시간만큼 데이터 거르기: skip, skipLast

시퀀스에서 처음 n개의 데이터를 거르고 싶다면 skip(long) 메서드를 사용한다. 지정한 처음 시간 동안 발생한 데이터를 거르고 싶다면 skip(Duration) 메서드를 사용한다. 마지막 n개의 데이터를 거르고 싶다면 skipLast(long) 메서드를 사용한다.

take()와 비슷하게 skipWhile()과 skipUntil()을 지원한다.


관련 글


시퀀스 생성 2: Flux.create(), Flux.fromStream()

이전 글(스프링 리액터 시작하기 2 - 시퀀스 생성 just, generate)에서 살펴본 Flux.generate()는 Subscriber로부터 요청이 있을 때에 next 신호를 발생하는 Flux를 생성한다. 즉 pull 방식의 Flux를 생성한다. 이는 단순하지만 데이터 발생을 비동기나 push 방식으로 할 수 없다는 제약도 있다. Flux.create()를 사용하면 이런 제약 없이 비동기나 push 방식으로 데이터를 발생할 수 있다.


Flux.create()를 이용한 pull 방식 메시지 생성

먼저 Flux.create() 메서드를 이용해서 pull 방식으로 메시지를 생성하는 방법을 살펴보자. 다음은 예제 코드이다.


Flux<Integer> flux = Flux.create( (FluxSink<Integer> sink) -> {

    sink.onRequest(request -> { // request는 Subscriber가 요청한 데이터 개수

        for (int i = 1; i <= request; i++) {

            sink.next(i); // Flux.generate()의 경우와 달리 한 번에 한 개 이상의 next() 신호 발생 가능

        }

    });

});


위 코드에서 Flux.create() 메서드의 파라머티는 함수형 타입 Consumer<? super FluxSink<T>>이다. 이 Consumer는 FluxSink를 이용해서 Subscriber에 신호를 발생할 수 있다.


FluxSink#onRequest(LongConsumer) 메서드의 Consumer는 Subscriber가 데이터를 요청했을 때 불린다. 이때 LongConsumer는 Subscriber가 요청한 데이터 개수를 전달받는다. 위 코드에서는 클라이언트가 요청한 데이터 개수만큼 next 신호를 발생하고 있다.


Flux.generate()와의 차이점은 Flux.generate()의 경우 한 번에 한 개의 next 신호만 발생할 수 있었던 데 비해 Flux.create()는 한 번에 한 개 이상의 next() 신호를 발생할 수 있다는 점이다.


Flux.create()를 이용한 push 방식 메시지  생성

Flux.create()를 이용하면 Subscriber의 요청과 상관없이 비동기로 데이터를 발생할 수 있다. 다음 코드를 보자.


DataPump pump = new DataPump();


Flux<Integer> bridge = Flux.create((FluxSink<Integer> sink) -> {

    pump.setListener(new DataListener<Integer>() {

        @Override

        public void onData(List<Integer> chunk) {

            chunk.forEach(s -> {

                sink.next(s); // Subscriber의 요청에 상관없이 신호 발생

            });

        }

        @Override

        public void complete() {

            logger.info("complete");

            sink.complete();

        }

    });

});


이 코드에서 DataPump는 데이터를 어딘가에서 데이터가 오면 setListener()로 등록한 DataListener의 onData()를 실행한다고 가정하자. DataListener#onData() 메서드는 FluxSink#next()를 이용해서 데이터를 발생한다. DataListener#onData() 메서드는 Subscriber의 데이터 요청과 상관없이 호출된다. 즉 위 코드는 Subscriber의 요청과 상관없이 데이터를 push한다.


Flux.create()와 배압

Subscriber로부터 요청이 왔을 때(FluxSink#onRequest) 데이터를 전송하거나(pull 방식) Subscriber의 요청에 상관없이 데이터를 전송하거나(push 방식) 두 방식 모두 Subscriber가 요청한 개수보다 더 많은 데이터를 발생할 수 있다. 예를 들어 아래 코드를 보자.


Flux<Integer> flux = Flux.create( (FluxSink<Integer> sink) -> {

    sink.onRequest(request -> {

        for (int i = 1; i <= request + 3 ; i++) { // Subscriber가 요청한 것보다 3개 더 발생

            sink.next(i);

        }

    });

});


이 코드는 Subscriber가 요청한 개수보다 3개 데이터를 더 발생한다. 이 경우 어떻게 될까? 기본적으로 Flux.create()로 생성한 Flux는 초과로 발생한 데이터를 버퍼에 보관한다. 버퍼에 보관된 데이터는 다음에 Subscriber가 데이터를 요청할 때 전달된다.


요청보다 발생한 데이터가 많을 때 선택할 수 있는 처리 방식은 다음과 같다.

  • IGNORE : Subscriber의 요청 무시하고 발생(Subscriber의 큐가 다 차면 IllegalStateException 발생)
  • ERROR : 익셉션(IllegalStateException) 발생
  • DROP : Subscriber가 데이터를 받을 준비가 안 되어 있으면 데이터 발생 누락
  • LATEST : 마지막 신호만 Subscriber에 전달
  • BUFFER : 버퍼에 저장했다가 Subscriber 요청시 전달. 버퍼 제한이 없으므로 OutOfMemoryError 발생 가능

Flux.create()의 두 번째 인자로 처리 방식을 전달하면 된다.


Flux.create(sink -> { ... }, FluxSink.OverflowStrategy.IGNORE);


Flux.fromStream(), Flux.fromIterable()을 이용한 Flux 생성

Flux.stream()을 사용하면 자바 8의 Stram에서 Flux를 생성할 수 있다. 다음은 예이다.


Stream<String> straem = Files.lines(Paths.get(filePath));

Flux<String> seq = Flux.fromStream(straem);

seq.subscribe(System.out::println);


Flux.fromIterable()을 이용하면 Iterable을 이용해서 Flux를 생성할 수 있다. List나 Set과 같은 콜렉션에서 Flux를 생성하고 싶을 때 이 메서드를 사용하면 된다.


관련 글





  1. sangpire 2020.02.02 12:45 신고

    Flux.create 로 Publisher 생성시, FluxSink.OverflowStrategy.LATEST 옵션을 준다면, 이 전에 언급하신 '핫 시퀀스' 의 한 형태가 되는 걸까요?

    ps. 블로그도 책도 정말 잘 보고있습니다.
    항상 감사합니다.

    • 최범균 madvirus 2020.02.09 14:11 신고

      OverflowStragey는 Subscriber의 상태에 따라 어떻게 할지를 결정하는 것으로 이는 핫 시퀀스인지 콜드 시퀀스인지 여부와는 다릅니다.

      콜드와 핫을 구분하는 건 구독 시점에 발생하는 신호와 관련이 있죠. 콜드는 새로운 subscriber가 구독을 할 때마다 다시 (동일한) 신호를 발생하죠. 반면에 핫은 구독하는 시점 이후에 발생한 신호만 전달합니다.

시퀀스 생성 1 :just(), generate()

[노트]

시퀀스를 직접 생성할 일이 많지는 않다. 보통은 라이브러리가 제공하는 기능을 사용하기 때문이다. 그럼에도 불구하고 시퀀스 생성 방법을 정리한 이유는 리액터 시퀀스를 생성하는 방법을 살펴보면 리액티브의 동작을 이해하는데 도움이 되기 때문이다.


Flux.just(), Mono.just()로 만들기

시퀀스를 생성하는 가장 쉬운 방법은 Flux.just()를 사용하는 것이다. just() 메서드는 시퀀스로 사용할 데이터가 이미 존재할 때 사용한다. 다음은 사용 예이다.


Flux<Integer> seq = Flux.just(1, 2, 3);


이 Flux는 1, 2, 3 데이터를 차례대로 발생하고 complete 신호를 발생한다. just() 메서드는 가변 인자로 0개 이상의 데이터를 전달할 수 있다. 아래와 같이 발생할 데이터를 주지 않으면 complete 신호만 발생한다.


Flux<Integer> seq = Flux.just();


Mono.just()도 동일하다. 차이라면 Mono는 1개 값만 생성하므로 데이터도 한 개만 받는다는 것이다.


Mono<Integer> seq = Mono.just(1);


Mono.just(null)과 같이 null을 값으로 주면 NullPointerException이 발생한다. 데이터를 발생하지 않는 Mono를 생성하고 싶다면 Mono.empty()를 사용한다.


값이 있을 수도 있고 없을 수도 있는 Mono를 생성할 때에는 justOrEmpty() 메서드를 사용하면 된다. 다음은 사용 예이다.


// null을 값으로 받으면 값이 없는 Mono

Mono<Integer> seq1 = Mono.justOrEmpty(null); // complete 신호

Mono<Integer> seq2 = Mono.justOrEmpty(1); // next(1) 신호- complete 신호


// Optional을 값으로 받음

Mono<Integer> seq3 = Mono.justOrEmpty(Optional.empty()); // complete 신호

Mono<Integer> seq4 = Mono.justOrEmpty(Optional.of(1)); // next(1) 신호 - complete 신호


Flux.range()로 정수 생성하기

Flux.range() 메서드를 사용하면 순차적으로 증가하는 Integer를 생성하는 Flux를 생성할 수 있다. 예를 들어 다음 코드는 11부터 시작해서 5개의 Integer를 생성하는 Flux 시퀀스를 생성한다. 즉 11부터 15까지의 Integer를 생성한다.


Flux<Integer> seq = Flux.range(11, 5);


Flux.generate() 메서드로 Flux 만들기

Flux.generate() 메서드를 사용하면 데이터를 함수를 이용해서 생성할 수 있다. Flux.generate() 함수는 동기 방식으로 한 번에 1개의 데이터를 생성할 때 사용한다. Flux.generate() 메서드 중 하나는 다음과 같다.

  • Flux<T> generate(Consumer<SynchronousSink<T>> generator)
generator는 Subscriber로부터 요청이 왔을 때 신호를 생성한다. generate()가 생성한 Flux는 다음과 같은 방식으로 신호를 발생한다.
  • Subscriber의 요청에 대해 인자로 전달받은 generator를 실행한다. generator를 실행할 때 인자로 SynchronousSink를 전달한다.
  • generator는 전달받은 SynchronousSink를 사용해서 next, complete, error 신호를 발생한다. 한 번에 1개의 next() 신호만 발생할 수 있다.
예제 코드를 보자

Consumer<SynchronousSink<Integer>> randGen = new Consumer<>() {
    private int emitCount = 0;
    private Random rand = new Random();

    @Override
    public void accept(SynchronousSink<Integer> sink) {
        emitCount++;
        int data = rand.nextInt(100) + 1; // 1~100 사이 임의 정수
        logger.info("Generator sink next " + data);
        sink.next(data); // 임의 정수 데이터 발생
        if (emitCount == 10) { // 10개 데이터를 발생했으면
            logger.info("Generator sink complete");
            sink.complete(); // 완료 신호 발생
        }
    }
};

Flux<Integer> seq = Flux.generate(randGen);

seq.subscribe(new BaseSubscriber<>() {
    private int receiveCount = 0;
    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        logger.info("Subscriber#onSubscribe");
        logger.info("Subscriber request first 3 items");
        request(3);
    }

    @Override
    protected void hookOnNext(Integer value) {
        logger.info("Subscriber#onNext: " + value);
        receiveCount++;
        if (receiveCount % 3 == 0) {
            logger.info("Subscriber request next 3 items");
            request(3);
        }
    }

    @Override
    protected void hookOnComplete() {
        logger.info("Subscriber#onComplete");
    }
});.

randGen의 accept() 메서드는 1~100 사이의 임의 정수를 생성한 뒤 인자로 SynchronousSink의 next() 메서드를 이용해서 next 신호를 발생한다. emitCount가 10이면(즉 데이터를 10개 발생했다면) SynchronousSink#complete() 메서드를 이용해서 complete 신호를 발생한다.

randGen은 신호 발생 기능을 제공할 뿐이며 실제 시퀀스는 Flux.generate()를 이용해서 생성했다.

seq.subscribe() 메서드에 전달한 Subscriber는 구독 시점에 3개의 데이터를 요청하고(hookOnSubscribe() 메서드의 request(3) 코드), 데이터를 3개 수신할 때마다 다시 3개의 데이터를 요청한다(hookOnNext() 메서드의 request(3) 코드).

콘솔에 관련 문장을 출력해서 실행 흐름을 알 수 있도록 했다. 위 코드를 실제로 실행해보면 다음 내용이 콘솔에 출력된다. 원래 출력에는 빈 줄이 없는데 쉬운 구분을 위해 빈 줄을 넣었고 Subscriber의 출력은 파란색으로 표시했다.

Subscriber#onSubscribe
Subscriber request first 3 items

Generator sink next 17
Subscriber#onNext: 17
Generator sink next 83
Subscriber#onNext: 83
Generator sink next 53
Subscriber#onNext: 53
Subscriber request next 3 items

Generator sink next 12
Subscriber#onNext: 12
Generator sink next 38
Subscriber#onNext: 38
Generator sink next 90
Subscriber#onNext: 90
Subscriber request next 3 items

Generator sink next 23
Subscriber#onNext: 23
Generator sink next 70
Subscriber#onNext: 70
Generator sink next 76
Subscriber#onNext: 76
Subscriber request next 3 items

Generator sink next 52
Subscriber#onNext: 52
Generator sink complete
Subscriber#onComplete

Flux가 제공하는 다른 generate() 메서드로는 다음이 있다.
  • Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)
  • Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator,
                                 Consumer<? super S> stateConsumer)
stateSupplier는 값을 생성할 때 사용할 최초 상태이다. BiFunction 타입의 generator는 인자로 상태와 SynchronousSink를 입력받아 결과로 다음 상태를 리턴하는 함수이다. 앞서 예제와 마찬가지로 SynchronousSink를 사용해서 신호를 생성한다. 두 번째 generate() 메서드의 stateConsumer는 상태를 정리할 때 사용한다. generator가 complete 신호나 error 신호를 발생하면 상태 정리를 위해 stateConsumer를 실행한다.

다음은 상태를 사용하는 Flux.generate()를 이용해서 임의 숫자 10개를 발생시키는 Flux를 생성하는 코드 예이다.

Flux<String> flux = Flux.generate(
        () -> { // Callable<S> stateSupplier
            return 0;
        },
        (state, sink) -> { // BiFunction<S, SynchronousSink<T>, S> generator
            sink.next("3 x " + state + " = " + 3 * state);
            if (state == 10) {
                sink.complete();
            }
            return state + 1;
        });

이 코드에서 flux는 "3 x 1 = 3" 부터 "3 x 10 = 10" 까지의 데이터를 담은 next 신호를 차례대로 발생하고 state 값이 10이 되면 compelete 신호를 발생한다.

Flux.just()나 Flux.generate()는 데이터 생성 과정 자체가 동기 방식이다. Subscriber로부터 데이터 요청이 오면 그 시점에 SynchronousSink를 이용해서 데이터를 생성한다. 반면에 별도 쓰레드를 이용해서 비동기로 데이터를 생성해야 하는 경우에는 SynchronousSink를 사용할 수 없다. 게다가 SynchronousSink는 한 번에 하나의 next 신호만 발생할 수 있다. 예를 들어 아래 코드는 에러를 발생한다.


Flux<String> flux = Flux.generate(

        () -> 1,

        (state, sink) -> {

            sink.next("Q: 3 * " + state);

            sink.next("A: " + (3 * state)); // 에러!

            if (state == 10) {

                sink.complete();

            }

            return state + 1;

        });


데이터 자체를 비동기로 생성해야 하거나 번에 다수의 next 신호를 발생해야 할 경우 Flux.generate()로는 처리할 수 없다. 이 때에는 Flux.create() 메서드를 사용해야 하는데 이 메서드를 포함한 Flux를 생성하는 또 다른 방법은 다음 글에서 이어서 살펴본다.


관련 글


리액티브 스트림즈, Flux, Mono

스프링은 웹 요청 처리, HTTP 클라이언트, NoSQL 연동 등 많은 영역에서 리액티프 프로그래밍을 지원하고 있다. 스프링 리액터는 스프링에서 리액티브 프로그래밍을 위한 핵심 모듈이다. 리액터를 잘 사용하려면 많은 것들을 알아야 하지만 가장 기본이 되는 두 타입인 Flux와 Mono에 대해 알아야 한다. 이 글에서는 리액티브 프로그래밍의 핵심 개념인 스프림에 대해 살펴보고 Flux와 Mono의 기본적인 사용법을 살펴본다.


왜 리액티브인가?

서버 관점에서 리액티브를 사용하는 이유 중 하나는 비동기/논블록을 이용해서 더 적은 자원으로 더 많은 트래픽을 처리하기 위함이다. 관련 내용은 "왜 리액티브인가 요약" 글을 참고한다.


리액티브 스트림

리액티브 스트림즈(http://www.reactive-streams.org/)는 비동기 스트림 처리를 위한 표준이다. 스프링 리액터는 이를 구현한 라이브러리이며 자바9의 Flow API도 리액티브 스트림 API를 따르고 있다. 


스트림은 시간이 지남에 따라 생성되는 일련의 데이터/이벤트(event)/신호(signal)이다. 맥락에 따라 데이터, 이벤트, 신호라는 용어를 사요한다. 이 글에서도 필요에 따라 이들 용어를 혼용해서 사용할 것이다. 리액티브 스트림즈 스펙에서는 용어로 신호를 사용한다. 리액티브 스트림즈는 다음 세 신호를 발생할 수 있다.

  • onNext* (onComplete | onError)?

스트림은 0개 이상의 next 신호를 발생할 수 있다. next 신호는 데이터를 담는다. complete 신호는 스트림이 끝났음을 의미하며 error 신호는 에러가 발생했음을 의미한다. complete와 error는 둘 중 하나만 발생할 수 있으며, 이 두 신호는 발생하지 않을 수도 있다. 


스트림의 예로 1분 간격 현재 기온 스트림을 들 수 있다. 이 데이터는 개념적으로 complete나 error 없이 next 신호만 1분 간격으로 발생한다. 파일 스트림은 파일을 읽는 동안 데이터를 담은 next 신호를 발생하고 파일을 다 읽으면 compelete 신호를 발생한다. 파일을 읽는 도중 에러가 발생하면 error 신호를 발생한다.


리액티브 스트림즈는 Publisher를 이용해서 스트림을 정의하며 Subscriber를 이용해서 발생한 신호를 처리한다. Subscriber가 Publisher로부터 신호를 받는 것을 구독이라고 한다. 다음 코드는 스프링 리액터가 제공하는 Publisher의 한 종류인 Flux에 대해 구독하는 코드 예를 보여준다.


Flux<Integer> seq = Flux.just(1, 2, 3); // Integer 값을 발생하는 Flux 생성


seq.subscribe(value -> System.out.println("데이터 : " + value)); // 구독


리액터는 스트림이라는 용어 대신 시퀀스라는 용어를 주로 사용한다. 위 코드에서 변수 이름이 seq인 이유는 시퀀스를 의미하기 위함이다. 첫 줄은 1, 2, 3 값을 차례대로 발생하는 Flux를 생성한다.


실제 값 발생은 구독(subscription) 시점에 이뤄진다. 위 코드는 Flux#subscribe(Consumer) 메서드를 이용해서 구독한다. 이 경우 Flux가 발생한 신호를 Consumer가 받아서 처리한다. 위 코드는 수신한 데이터를 콘솔에 출력하므로 위 코드를 실행하면 다음과 같은 결과가 출력된다.


데이터 : 1

데이터 : 2

데이터 : 3


물론 이렇게 단순한 작업을 하기 위해 리액티브 프로그래밍을 사용하는 것은 아니다. 스케줄링, 다양한 조합 기능을 이용해서 이전보다 더 간결하면서도 자원을 효율적으로 사용하는 코드를 작성할 수 있다.


[노트]

리액티브 스트림즈는 스트림이라는 표현을 사용하지만 이는 자바 8의 스트림과 혼동할 수 있다. 이런 이유로 "리액티브 스트림즈" 자체를 표현할 때가 아니면 스트림 대신 시퀀스라는 용어를 사용하겠다.


리액터 사용 위한 메이븐 설정

리액터는 reactor-core, reactor-netty, reactor-extra, reactor-adapter 등 다양한 모듈로 구성되어 있다. 각 모듈의 버전을 맞추기 위해 리액터는 메이븐 BOM(Bill Of Materials)을 제공한다. 이 글에서는 reactor-core 의존만 사용하긴 하지만 BOM을 포함한 의존 설정을 사용해보자. 메이븐 의존 설정은 다음과 같다.


<dependencyManagement>

    <dependencies>

        <dependency>

            <groupId>io.projectreactor</groupId>

            <artifactId>reactor-bom</artifactId>

            <version>Bismuth-SR9</version>

            <type>pom</type>

            <scope>import</scope>

        </dependency>

    </dependencies>

</dependencyManagement>


<dependencies>

    <dependency>

        <groupId>io.projectreactor</groupId>

        <artifactId>reactor-core</artifactId>

    </dependency>


    <dependency>

        <groupId>org.slf4j</groupId>

        <artifactId>slf4j-api</artifactId>

        <version>1.7.12</version>

    </dependency>

    <dependency>

        <groupId>ch.qos.logback</groupId>

        <artifactId>logback-classic</artifactId>

        <version>1.2.3</version>

    </dependency>

</dependencies>


reactor-bom의 Bismuth 버전은 스프링 리액터 3.1 버전을 정의한다. Bismuth-SR9 버전은 reactor-core 3.1.7.RELEASE를 기준으로 한다.


스프링 리액터의 Publisher: Flux와 Mono

스프링 리액터는 Flux와 Mono의 두 가지 Publisher를 제공하고 있다. 이 두 타입은 발생할 수 있는 데이터 개수에 차이가 있다. Flux는 0개 이상의 데이터를 발생할 수 있고 Mono는 0 또는 1개의 데이터를 발생할 수 있다.


앞서 Publisher는 next, complete, error 신호를 발생할 수 있다고 했다. Flux는 0개 이상의 데이터를 발생하므로 0개 이상의 next 신호를 발생할 수 있고 complete나 error 신호를 발생하거나 발생하지 않을 수 있다. 예를 들어 다음 코드를 보자.


Flux.just(1, 2, 3);


이 코드에서 seq 시퀀스는 1, 2, 3을 값으로 갖는 세 개의 next 신호를 발생하고 마지막에 complete 신호를 발생해서 시퀀스를 끝낸다. 즉 시간 순으로 표시하면 다음과 같이 시퀀스가 발생한다('--->'는 시간축, '|'는 complete 신호 의미).


--1-2-3-|-->


아래 코드와 같이 아무 값도 발생하지 않는 시퀀스는 complete 신호만 발생한다.


Flux.just(); // --|-->


Mono도 유사하다. 차이라면 최대 발생할 수 있는 값이 1개라는 점이다.


Mono.just(1); // --1-|-->

Mono.empty(); // --|-->


just() 메서드는 이미 존재하는 값을 사용해서 Flux/Mono를 생성할 때 사용된다. just() 외에 create(), generate()를 이용해서 생성할 수 있는데 이에 대한 내용은 나중에 정리해보겠다.


[노트]

Flux와 Mono를 직접 생성하기보다는 다른 라이브러리가 제공하는 Flux와 Mono를 사용할 때가 많다. 예를 들어 스프링 5 버전에 추가된 WebClient 클래스를 사용할 때에는 WebClient가 생성하는 Mono를 이용해서 데이터를 처리한다.


구독과 신호 발생

시퀀스는 바로 신호를 발생하지 않는다. 구독을 하는 시점에 신호를 발생하기 시작한다. 코드로 확인해보자. 먼저 다음 코드를 보자.


Flux.just(1, 2, 3)

     .doOnNext(i -> System.out.println("doOnNext: " + i))

     .subscribe(i -> System.out.println("Received: " + i));


위 코드에서 doOnNext() 메서드는 Flux가 Subscriber에 next 신호를 발생할 때 불린다. 실행 결과는 다음과 같다.


doOnNext: 1

Received: 1

doOnNext: 2

Received: 2

doOnNext: 3

Received: 3


이제 코드를 다음과 같이 바꾸고 다시 실행해보자.


Flux<Integer> seq = Flux.just(1, 2, 3)

        .doOnNext(i -> System.out.println("doOnNext: " + i));


System.out.println("시퀀스 생성");

seq.subscribe(i -> System.out.println("Received: " + i));


실행 결과는 다음과 같다.


시퀀스 생성

doOnNext: 1

Received: 1

doOnNext: 2

Received: 2

doOnNext: 3

Received: 3


이 결과를 보면 시퀀스를 생성한 시점에는 doOnNext에 전달한 함수가 실행되지 않는 것을 알 수 있다. doOnNext에 전달한 함수는 next 신호를 발생할 때 호출되기 때문이다. subscribe()를 실행해서 구독을 한 이후에 doOnNext에 전달한 코드가 실행되는데 이는 subscribe() 시점에 신호를 발생하기 시작한다는 것을 보여준다.


콜드 시퀀스 vs 핫 시퀀스

시퀀스는 구독 시점부터 데이터를 새로 생성하는 콜드(cold) 시퀀스와 구독자 수에 상관없이 데이터를 생성하는 핫(hot) 시퀀스로 나뉜다.


앞 예제 Flux.just()로 생성한 시퀀스가 콜드 시퀀스이다. 콜드 시퀀스는 구독을 하지 않으면 데이터를 생성하지 않는다. 구독을 하면 그 시점에 데이터를 새롭게 발생한다. 다음 코드는 이런 특징을 보여준다.


Flux<Integer> seq = Flux.just(1, 2, 3);

seq.subscribe(v -> System.out.println("구독1: " + v)); // 구독

seq.subscribe(v -> System.out.println("구독2: " + v)); // 구독


이 코드는 seq 시퀀스에 대해 구독을 두 번한다. 코드 결과는 다음과 같은데 이 결과를 보면 seq 시퀀스는 각 구독마다 데이터를 새롭게 생성하는 것을 알 수 있다.


구독1: 1

구독1: 2

구독1: 3

구독2: 1

구독2: 2

구독2: 3


콜드 시퀀스의 예로 API 호출을 들 수 있다. API 호출 시퀀스는 구독을 할 때마다 매번 새로운 요청을 서버에 전송하고 결과를 받는다.


핫 시퀀스는 구독 여부에 상관없이 데이터가 생성된다. 구독을 하면 구독한 시점 이후에 발생하는 데이터부터 신호를 받는다. 핫 시퀀스 예로 센서 데이터를 들 수 있다. 센서 데이터를 제공하는 시퀀스를 구독하면 그 시점 이후에 센서가 발생한 데이터부터 받게 된다.


Subscriber와 Subscription

앞 코드 예에서 다음의 subscribe() 메서드를 사용해서 구독을 했다.


// Flux나 Mono

subscribe(Consumer<? super T> consumer)


Consumer를 파라미터로 갖는 subscribe() 메서드는 리액터가 편의를 위해 제공하는 메서드로서 이 메서드는 내부적으로 Subscriber를 인자로 받는 subscribe() 메서드를 실행한다.


Subscriber는 리액티브 스트림즈에 포함된 인터페이스로 다음과 같이 정의되어 있다.


package org.reactivestreams;


public interface Subscriber<T> {

    void onSubscribe(Subscription s);

    void onNext(T t);

    void onError(Throwable t);

    void onComplete();

}


각 메서드는 다음과 같다.

  • onSubscribe(Subscription s): 구독을 하면 Publisher와 연동된 Subscription을 받는다. 전달받은 Subscription을 이용해서 Publisher에 데이터를 요청한다.
  • onNext(T t): Publisher가 next 신호를 보내면 호출된다.
  • onError(Throwable t): Publisher가 error 신호를 보내면 호출된다.
  • onComplete(): Publisher가 complete 신호를 보내면 호출된다.

각 메서드가 어떻게 동작하는지 다음 예제 코드로 알아보자.


Flux<Integer> seq = Flux.just(1, 2, 3);


seq.subscribe(new Subscriber<>() {

    private Subscription subscription;

    @Override

    public void onSubscribe(Subscription s) {

        System.out.println("Subscriber.onSubscribe");

        this.subscription = s;

        this.subscription.request(1); // Publisher에 데이터 요청

    }


    @Override

    public void onNext(Integer i) {

        System.out.println("Subscriber.onNext: " + i);

        this.subscription.request(1); // Publisher에 데이터 요청

    }


    @Override

    public void onError(Throwable t) {

        System.out.println("Subscriber.onError: " + t.getMessage());

    }


    @Override

    public void onComplete() {

        System.out.println("Subscriber.onComplete");

    }

});


subscribe() 메서드에 전달한 임의 Subscriber 객체의 onSubscribe() 메서드는 인자로 전달받은 Subscription 객체를 필드에 저장한다. Subscription은 구독 라이프사이클을 관리한다. 예를 들어 Subscription#request() 메서드는 Publisher에 데이터 요청 신호를 보낸다. 위 코드는 request(1)을 실행했는데 이는 1개의 데이터를 요청한다는 것을 의미한다. 즉 onSubscribe() 메서드는 파라미터로 전달받은 Subscription을 이용해서 Publisher에 1개의 데이터를 요청한다.

Publisher가 데이터 신호(next 신호)를 보내면 Subscriber#onNext() 메서드가 불린다. 위 예제에서는 전달받은 데이터를 출력하고 Subscription#request()를 이용해서 다음 데이터 1개를 요청한다. 즉 이 코드는 최초 구독 시점에 데이터 1개를 요청하고(onSubscribe) 이후 한 개의 데이터를 받으면 다시 한 개의 데이터를 요청한다(onNext).

실제 위 코드를 실행하면 다음 내용이 콘솔에 출력된다.

Subscriber.onSubscribe
Subscriber.onNext: 1
Subscriber.onNext: 2
Subscriber.onNext: 3
Subscriber.onComplete

이번엔 onNext() 메서드에서 다음처럼 subscription.request(1) 코드를 주석처리하고 다시 실행해보자.

Flux<Integer> seq = Flux.just(1, 2, 3);
seq.subscribe(new Subscriber<>() {
    ...생략

    @Override
    public void onNext(Integer i) {
        System.out.println("Subscriber.onNext: " + i);
        // this.subscription.request(1);
    }

    ...생략
});

결과는 다음과 같다.

Subscriber.onSubscribe
Subscriber.onNext: 1

onSubscribe()에서만 1개의 데이터를 요청하고 onNext()에서는 어떤 요청도 하지 않아 Publisher가 1개 데이터만 발생한 것을 알 수 있다.

request(Long.MAX_VALUE)를 사용하면 개수 제한없는 데이터 요청 신호를 Publisher에 보낸다. Publisher는 이 신호를 받으면 끝까지 데이터를 발생시킨다. 이를 다음과 같이 변경해보자.

Flux<Integer> seq = Flux.just(1, 2, 3);
seq.subscribe(new Subscriber<>() {
    private Subscription subscription;
    @Override
    public void onSubscribe(Subscription s) {
        System.out.println("Subscriber.onSubscribe");
        this.subscription = s;
        this.subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(Integer i) {
        System.out.println("Subscriber.onNext: " + i);
    }

    ...생략
});

위 코드를 실행하면 Publisher가 모든 데이터를 발생한 것을 확인할 수 있다.

푸시 모델 vs 풀 모델

Subscription#request()는 Subscriber가 데이터를 처리할 수 있을 때 Publisher에게 데이터를 요청하는 풀(pull) 모델이다. 하지만 request(Long.MAX_VALUE)로 요청하면 Publisher는 개수 제한 없이 Subscriber에 데이터를 전송한다. 이는 완전한 푸시(push) 모델이다. 또 request(100000)을 사용하면 십 만 개의 데이터를 요청하고, Publisher는 발생한 데이터가 십 만 개가 될 때까지 신호를 보낸다. 데이터 요청은 풀 모델로 이루어졌지만 10만 개의 데이터를 전송하는 동안은 실질적으로 푸시 모델과 같다.

subscribe() 메서드

다음은 리액터가 제공하는 subscribe() 메서드이다.
  • subscribe()
  • subscribe(Consumer<? super T> consumer)
  • subscribe(Consumer<? super T> consumer,
                 Consumer<? super Throwable> errorConsumer)
  • subscribe(Consumer<? super T> consumer,
                 Consumer<? super Throwable> errorConsumer,
                 Runnable completeConsumer)
  • subscribe(Consumer<? super T> consumer,
                 Consumer<? super Throwable> errorConsumer,
                 Runnable completeConsumer,
                 Consumer<? super Subscription> subscriptionConsumer)
  • subscribe(Subscriber<? super T> actual)
  • subscribe(CoreSubscriber<? super T> actual)

메서드의 각 파라미터는 다음과 같다.

  • consumer: next 신호 처리
  • errorConsumer: error 신호 처리
  • completeConsumer: complete 신호 처리
  • subscriptionConsumer: Subscriber의 onSubscribe 메서드에 대응
  • actual: Subscriber나 CoreSubscriver 타입

관련글


  1. kyungsik-oh 2018.09.27 14:49

    핫 시퀀스는 구독 여부에 상관없이 데이터가 생성된다. 구독을 하면 구독한 시점 이후에 발생하는 데이터부터 신호를 받는다. 콜드 스트림의 예로 센서 데이터를 들 수 있다. 센서 데이터를 제공하는 시퀀스스에 구독을 하면 그 시점 이후에 센서가 발생한 데이터부터 받게 된다.

    -->

    여기 구문에 오타가 있는 것 같습니다 : )
    콜드스트림의 예로 센서 데이터를 들 수 있다 --> 핫 시퀀스(핫스트림)의 예로 센서 데이터를 들 수 있다.
    시퀀스스에 구독을 하면 --> 시퀀스에 구독을 하면

+ Recent posts