주요글: 도커 시작하기

R2DBC(Reactive Relational Database Connectivity)는 SQL 데이터베이스를 위한 리액티브 API이다. 리액티브 스트림즈를 기반으로 SQL을 실행하는데 필요한 커넥션, 쿼리 실행, 트랜잭션 처리 등에 대한 API를 정의하고 있다. JDBC API처럼 R2DBC는 API만 정의하고 있다. r2dbc-spi 모듈이 SPI(service-provider interface)로서 각 드라이버는 SPI에 정의된 인터페이스를 알맞게 구현한다. 이 글을 쓰는 시점에서 R2DBC 버전은 0.8.0이며 스펙, 드라이버 구현 등 관련 문서는 https://r2dbc.io/ 사이트에서 확인할 수 있다.

 

* 이 글에서는 리액티브 스트림즈에 대한 내용은 설명하지 않는다. Publisher나 Subscriber에 대한 내용은 https://javacan.tistory.com/entry/Reactor-Start-1-RS-Flux-Mono-Subscriber 글을 참고한다.

의존 설정

R2DBC를 사용하려면 구현을 제공하는 드라이버가 필요하다. 예제에서는 r2dbc-mysql 드라이버를 사용한다.

<dependency>
    <groupId>dev.miku</groupId>
    <artifactId>r2dbc-mysql</artifactId>
    <version>0.8.0.RELEASE</version>
</dependency>

r2dbc-mysql 0.8.0은 r2dbc-spi 0.8.0 버전을 지원하며 r2dbc-mysql은 스프링 리액터와 네티를 이용해서 구현되어 있다.

주요 구성 요소

r2dbc를 이용해서 SQL을 실행하기 위해 사용하는 주요 구성 요소는 다음과 같다. 모두 io.r2dbc.spi 패키지에 위치하며ConnectionFactories를 제외한 나머지는 인터페이스이다.

 

  • ConnectionFactory : Connection을 생성하는 팩토리
  • ConnectionFactories : ConnectionFactory를 검색해서 제공하는 유틸리티
  • Connection : 데이터베이스에 대한 커넥션
  • Statement : 실행할 SQL
  • Batch : 배치로 실행할 SQL
  • Result : 쿼리 실행 결과

ConnectionFactory 구하기

쿼리를 실행하려면 커넥션을 먼저 구해야 한다. R2DBC는 ConnectionFactory를 이용해서 커넥션을 구할 수 있다. 드라이버이가 제공하는 ConnectionFactory를 직접 생성하거나 ConnectionFactories를 이용해서 ConnectionFactory를 구할 수도 있다. 참고로 ConnectionFactories는 자바 서비스 프로바이더를 이용해서 ConnectionFactory를 찾는다.

 

다음 코드는 ConnectionFactories.get() 메서드를 이용해서 ConnectionFactory를 구하는 코드 예이다.

String url = "r2dbcs:mysql://user:pw@localhost:3306/test";
ConnectionFactory connectionFactory = ConnectionFactories.get(url);

JDBC URL과 비슷하게 R2DBC도 URL을 이용해서 연결할 DB 정보를 지정한다. 다음은 URL 구성 요소를 보여준다.

scheme:driver:protocol://authority/path?query
  • scheme : URL이 유효한 R2DBC URL임을 지정한다. 유효한 스킴은 r2dbc와 r2dbcs(SSL 용)이다.
  • driver : 드라이버를 지정한다.
  • protocol : 드라이버에 따라 프로토콜 정보를 지정한다(선택).
  • authority : 접속할 DB와 인증 정보를 포함한다.
  • path : 초기 스카마나 데이터베이스 이름을 지정한다(선택).
  • query : 추가 설정 옵션을 전달할 때 사용한다(선택).

각 드라이버마다 URL 값이 다르므로 드라이버 문서를 참고한다.

Connection을 구하고 쿼리 실행하기

다음 코드는 Connection을 구하고 쿼리를 실행해서 원하는 결과를 출력하는 예이다.

CountDownLatch latch = new CountDownLatch(1);

ConnectionFactory connectionFactory = ConnectionFactories.get(url);

Publisher<? extends Connection> connPub = connectionFactory.create();
connPub.subscribe(new BaseSubscriber<Connection>() {
    @Override
    protected void hookOnNext(Connection conn) {
        Statement stmt = conn.createStatement("select id, name from member where name = ?name");
        stmt.bind("name", "최범균");
        Publisher<? extends Result> resultPub = stmt.execute();
        resultPub.subscribe(new BaseSubscriber<Result>() {
            @Override
            protected void hookOnNext(Result result) {
                Publisher<Member> memberPub = result.map((row, meta) -> 
                    new Member(row.get(0, String.class), row.get(1, String.class))
                );
                memberPub.subscribe(new BaseSubscriber<Member>() {
                    @Override
                    protected void hookOnNext(Member member) {
                        logger.info("회원 데이터 : {}", member);
                    }
                });
            }

            @Override
            protected void hookFinally(SignalType type) {
                conn.close().subscribe(new BaseSubscriber<Void>() {
                    @Override
                    protected void hookFinally(SignalType type) {
                        latch.countDown();
                    }
                });
            }
        });
    }
});

latch.await(); // latch.countDown() 실행 전까지 블록킹

코드가 다소 복잡하다. R2DBC의 주요 API는 리액티브 스트림의 Publisher 타입을 리턴하는데 그 타입을 그대로 사용해서 다소 복잡해졌다. Publisher#subscribe() 메서드에 전달할 Subscriber 구현 객체는 BaseSubscriber를 이용해서 생성했다. BaseSubscriber는 스프링 리액터가 제공하는 Subscriber 구현 클래스로 필요한 기능만 구현하기 위해 이 클래스를 사용했다.

 

먼저 Connection을 구하고 Statement를 생성하는 코드만 보자.

ConnectionFactory connectionFactory = ConnectionFactories.get(url);

final Publisher<? extends Connection> connPub = connectionFactory.create();
connPub.subscribe(new BaseSubscriber<Connection>() {
    @Override
    protected void hookOnNext(Connection conn) {
        Statement stmt = conn.createStatement("select id, name from member where name = ?name");
        stmt.bind("name", "최범균");
        Publisher<? extends Result> resultPub = stmt.execute();
        resultPub.subscribe(...);
    }
});

ConnectionFactory#create() 메서드가 생성한 Publisher는 연결에 성공하면 next 신호로 Connection을 보낸다. Subscriber는 onNext() 메서드로 Connection을 받아 쿼리를 실행한다(BaseSubscriber를 사용한 경우 hookOnNext() 메서드 사용).

 

쿼리를 실행하기 위한 Statement는 Connection#createStatement() 메서드로 생성한다. createStatement() 메서드는 실행할 쿼리를 입력받는다. JDBC의 Statement와 PreparedStatement가 R2DBC에서는 Statement 하나로 처리한다.

 

Statement#bind() 메서드를 이용해서 바인딩 파라미터에 값을 전달한다. 다음은 Statement가 제공하는 바인딩 파라미터 관련 메서드이다.

  • Statement bind(int index, Object value)
  • Statement bind(String name, Object value)
  • Statement bindNull(int index, Class<?> type)
  • Statement bindNull(String name, Class<?> type)

한 가지 주의할 점은 쿼리에서 사용하는 바인딩 파라미터가 JDBC와 다르다는 점이다. JDBC의 PreparedStatement는 물음표(?)를 이용해서 바인딩 파라미터를 지정하고 1부터 시작하는 인덱스를 사용한다. 반면에 DBMS가 사용하는 바인딩 파라미터를 사용하며 이름이나 0부터 시작하는 인덱스를 사용할 수 있다. 예를 들어 MySQL은 위 코드에서 보는 것처럼 바인딩 파라미터로 "?이름" 형식을 사용한다.

 

Statement#execute() 메서드는 Publisher<Result> 타입을 리턴한다. Result는 쿼리 실행 결과를 제공한다. Result가 제공하는 다음의 두 메서드를 이용해서 쿼리 실행 결과를 구할 수 있다.

public interface Result {
    // 변경된 행 개수를 리러탄한다.
    Publisher<Integer> getRowsUpdated();
    // 조회 결과(Row)를 변환한다.
    <T> Publisher<T> map(BiFunction<Row, RowMetadata, ? extends T> mappingFunction);
}

Row는 조회한 한 행에 대응한다. Row는 행의 데이터를 조회하기 위한 get() 메서드를 제공한다.

  • T get(int index, Class<T> type) : JDBC API와 달리 index는 0부터 시작한다. 
  • T get(String name, Class<T> type)

예제 코드는 SELECT 쿼리 실행 결과를 처리하므로 map() 메서드를 이용해서 조회환 결과를 Member 객체로 변환했다. 

Publisher<? extends Result> resultPub = stmt.execute();
resultPub.subscribe(new BaseSubscriber<Result>() {
    @Override
    protected void hookOnNext(Result result) {
        Publisher<Member> memberPub = result.map((row, meta) ->
                new Member(row.get(0, String.class), row.get(1, String.class))
        );
        memberPub.subscribe(new BaseSubscriber<Member>() {
            @Override
            protected void hookOnNext(Member member) {
                logger.info("회원 데이터 : {}", member);
            }
        });
    }

    @Override
    protected void hookFinally(SignalType type) {
        conn.close().subscribe(...);
    }
});

DB 작업이 끝나면 Connection#close() 메서드로 커넥션을 종료해야 한다. close() 메서드는 Publisher<Void>를 리턴하므로 실제 커넥션 종료는 close() 메서드가 리턴한 Publisher를 구독해야 실행된다.

 

리액터가 제공하는 BaseSubscriber는 complete 신호나 error 신호가 오면 hookFinally()를 실행하는 기능을 구현하고 있으므로 에러 여부에 상관없이 커넥션을 종료하기 위해 이 메서드를 이용했다.

Connection을 구하고 쿼리 실행하기 : 리액터 이용

Publisher를 이용하면 구독 처리 코드가 중첩되어 복잡해진다. 중첩 구조는 리액터를 사용해서 없앨 수 있다. 다음 코드는 리액터를 사용해서 구현현 예이다.

Mono<? extends Connection> connMono = Mono.from(connectionFactory.create());

Flux<Member> members = connMono.flatMapMany(conn -> {
        Flux<? extends Result> resultFlux = Flux.from(
                conn.createStatement("select id, name from member where name like ?name")
                        .bind("name", "%범%")
                        .execute()
        );

        Flux<Member> memberFlux = resultFlux.flatMap(result ->
                result.map((row, meta) ->
                        new Member(row.get("id", String.class), row.get("name", String.class))
                )
        );

        return memberFlux.doFinally(signal -> Mono.from(conn.close()).subscribe());
    }
);

List<Member> ret = members.collectList().block();

트랜잭션 처리

Connection은 트랜잭션 처리를 위해 다음 메서드를 제공한다.

  • Publisher<Void> beginTransaction()
  • Publisher<Void> commitTransaction()
  • Publisher<Void> rollbackTransaction()

다음은 트랜잭션 처리 예를 보여준다.

Mono<Integer> updatedMono = Mono.from(connectionFactory.create())
    .flatMap(conn -> {
        final Mono<Void> txMono = Mono.from(conn.beginTransaction());
        final Mono<Integer> updMono = txMono.then(
                Mono.from(conn.createStatement("insert into member values (?id, ?name)")
                        .bind("id", "bkchoi4")
                        .bind("name", "최범균2")
                        .execute()
                ).flatMap(
                        result -> Mono.from(result.getRowsUpdated())
                )
        );
        return updMono.delayUntil(ret -> conn.commitTransaction())
            .onErrorResume(err -> Mono.from(conn.rollbackTransaction()).then(Mono.error(err)))
            .doFinally(signal -> Mono.from(conn.close()).subscribe());
    });

트랜잭션 커밋/롤백 처리를 위해 리액터의 delayUntil(), onErrorResume() 메서드를 사용했다. 참고로 delayUntil() 메서드는 인자로 받은 Publisher가 종료된 뒤에 종료하며 onErrorResume() 메서드는 에러 발생시 인자로 받은 Publisher를 실행한다. 트랜잭션 롤백 처리 후에 Mono.error()를 이용해서 에러를 다시 발생시키도록 했다.

 

+ Recent posts