스프링 데이터(Spring Data) R2DBC를 사용하면 R2DBC 기반의 리포지토리를 쉽게 구현할 수쉽게 사용할 수 있다. 이 글에서는 이 중에서 스프링 데이터 R2DBC가 제공하는 DatabaseClient를 이용한 R2DBC 연동 방법을 소개한다.
의존 설정
스프링 데이터 R2DBC를 사용하기 위한 의존 설정은 다음과 같다.
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-bom</artifactId>
<version>Arabba-SR2</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>dev.miku</groupId>
<artifactId>r2dbc-mysql</artifactId>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-pool</artifactId>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-spi</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-r2dbc</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
</dependencies>
r2dbc-bom의 Arabba-SR2 버전은 r2dbc 0.8.1 버전에 대응한다. 이 글을 쓰는 시점 기준으로 spring-data-r2dbc 최신 버전은 1.0.0이다.
DatabaseClient 생성
DatabaseClient는 ConnectionFactory를 이용해서 생성한다.
String url = "r2dbcs:mysql://user:user@localhost:3306/test";
ConnectionFactory connectionFactory = ConnectionFactories.get(url);
DatabaseClient client = DatabaseClient.create(connectionFactory);
스프링 빈으로 설정하고 싶다면 AbstractR2dbcConfiguration 클래스를 사용하면 된다.
@Configuration
public class ApplicationConfig extends AbstractR2dbcConfiguration {
@Override
@Bean
public ConnectionFactory connectionFactory() {
String url = "r2dbcs:mysql://user:user@localhost:3306/test";
ConnectionFactory connectionFactory = ConnectionFactories.get(url);
return connectionFactory;
}
}
이 설정을 사용하면 connectionFactory() 메서드로 정의한 ConnectionFactory 빈을 이용해서 DatabaseClient 객체를 생성하고 빈으로 등록한다. AbstractR2dbcConfiguration 클래스의 databaseClient() 메서드가 이 과정을 처리한다.
DatabaseClient로 쿼리 실행하기
DatabaseClient로 쿼리를 실행하는 방법에는 두 가지가 있다. 하나는 SQL을 사용하는 것이고 다른 하나는 매핑 객체를 사용하는 것이다. 이 글에서는 SQL을 사용하는 방법을 살펴보고 다음에 매핑 객체를 이용하는 방법을 살펴보도록 하자.
조회 쿼리
다음 코드는 DatabaseClient를 이용해서 조회 쿼리를 실행하는 예를 보여준다.
Flux<Map<String, Object>> selectPub =
client.execute("select id, name from member")
.fetch()
.all();
selectPub.subscribe(m -> {
logger.info("m.get(id) : {}", m.get("id"));
});
위 코드에서 각 메서드는 다음을 지정한다.
- execute : 실행할 SQL을 지정한다.
- fetch : 쿼리 실행 결과를 읽어온다는 것을 지정한다.
- all : 쿼리 실행 결과를 모두 읽어온다는 것을 지정한다.
'지정한다'고 설명했는데 실제 쿼리는 Flux를 구독하는 시점에 실행한다. 위 코드에서는 selectPub.subscribe() 코드를 실행하는 시점에 지정한 동작(execute로 쿼리 실행하고 fetch로 쿼리 결과를 가져오고 all로 모든 결과를 조회)을 수행한다.
쿼리 실행 결과는 Map에 담긴다. 한 개의 행이 한 개의 Map에 대응하며 칼럼 이름을 Map의 키로 사용한다. 칼럼 이름은 대소문자를 가리지 않는다. (실제 Map으로 spring-core 모듈에 포함된 LinkedCaseInsensitiveMap 구현을 사용한다.)
all() 대신 first()를 사용하면 전체 결과가 아닌 첫 번째 결과만 구한다. first() 메서드의 결과는 Mono이다.
Mono<Map<String, Object>> firstPub =
client.execute("select id, name from member where id = '124124'")
.fetch()
.first();
결과가 딱 한 개이거나 없다면 one()을 사용할 수도 있다. 단 결과가 두 개 이상이면 구독 시점에 익셉션이 발생한다.
Mono<Map<String, Object>> one =
client.execute("select id, name from member limit 1")
.fetch()
.one();
바인딩 파라미터
DatabaseClient는 이름과 인덱스의 두 가지 바인딩 파라미터를 지원한다. 다음은 이름 기반 바인딩 파라미터 사용 예이다. 쿼리에서 바인딩 파라미터 이름은 ":이름"의 형식을 갖는다. R2DBC의 Statement#bind() 메서드도 이름 기반 파라미터를 지원하지만 이때 바인딩 파라미터의 형식은 DBMS의 형식을 사용해야 하는 것과 다르다.
Flux<Map<String, Object>> all =
client.execute("select id, name from member where id like :id or name like :name")
.bind("id", "%jdbc%")
.bind("name", "%jdbc%")
.fetch()
.all();
bind() 메서드는 첫 번째 인자로 파라미터 이름을, 두 번째 인자로 값을 사용한다.
다음은 인덱스 기반 바인딩 파라미터 사용 예이다.
Flux<Map<String, Object>> all =
client.execute("select id, name from member where name like ? or id like ?")
.bind(0, "%r2dbc%")
.bind(1, "%r2dbc%")
.fetch()
.all();
인덱스는 0부터 시작한다.
수정 쿼리
다음은 수정 쿼리 지정 예이다.
Mono<Void> updPub = client.execute("update member set name = ? where id = ?")
.bind(0, "id2")
.bind(1, "bkchoi")
.then();
쿼리 실행 결과로 변경된 행 개수를 구하고 싶다면 fetch(), rowsUpdated() 메서드를 사용하면 된다.
Mono<Integer> updPub = client.execute("update member set name = :name where id = :id")
.bind("name", "id3")
.bind("id", "bkchoi")
.fetch()
.rowsUpdated();
여러 쿼리 실행
구독하지 않고 여러 쿼리를 실행하려면 flatMap() 등을 사용해서 DatabaseClient로 생성한 Mono나 Flux를 연결한다. 아래 코드는 select 쿼리 결과가 존재하면 update 쿼리를 실행하고 존재하지 않으면 insert 쿼리를 실행하는 스트림 생성 예이다.
Mono<Integer> saveOrUpdate = client.execute("select id, name from member where id = :id")
.bind("id", id)
.fetch()
.one()
.flatMap(m ->
client.execute("update member set name = :name where id = :id")
.bind("name", name)
.bind("id", id)
.fetch()
.rowsUpdated())
.switchIfEmpty(
client.execute("insert into member (id, name) values (:id, :name)")
.bind("name", name)
.bind("id", id)
.fetch()
.rowsUpdated()
);
saveOrUpdate.subscribe( ...생략 )
위 코드에서 주의할 점은 각 쿼리를 실행할 때마다 DB 커넥션을 새로 구한다는 점이다. 즉 select 쿼리를 실행할 때 DB 커넥션을 구하고 쿼리를 실행한 뒤에는 커넥션을 닫는다. 비슷하게 update나 insert 쿼리를 실행할 때에도 DB 커넥션을 구하고 종료한다.
트랜잭션
스프링 데이터 R2DBC는 리액티브를 위한 트랜잭션 관리자인 R2dbcTransactionManager를 제공한다. 이 클래스를 이용해서 트랜잭션을 제어할 수 있다. 다음은 R2dbcTransactionManager를 이용해서 트랜잭션 범위 안에서 쿼리를 실행하는 코드 작성 예이다.
R2dbcTransactionManager tm = new R2dbcTransactionManager(connectionFactory);
TransactionalOperator operator = TransactionalOperator.create(tm);
Mono<Integer> saveOrUpdate = client.execute("select id, name from member where id = :id")
.bind("id", id)
.fetch()
.one()
.flatMap(m ->
client.execute("update member set name = :name where id = :id")
.bind("name", name)
.bind("id", id)
.fetch()
.rowsUpdated())
.switchIfEmpty(
client.execute("insert into member (id, name) values (:id, :name)")
.bind("name", name)
.bind("id", id)
.fetch()
.rowsUpdated()
).as(operator::transactional);
다음은 R2dbcTransactionManager를 스프링 빈으로 등록한 예를 보여준다.
@Configuration
@EnableTransactionManagement
public class ApplicationConfig extends AbstractR2dbcConfiguration {
@Override
@Bean
public ConnectionFactory connectionFactory() {
String url = "r2dbcs:mysql://user:user@localhost:3306/test";
ConnectionFactory connectionFactory = ConnectionFactories.get(url);
return connectionFactory;
}
@Bean
ReactiveTransactionManager transactionManager(ConnectionFactory connectionFactory) {
return new R2dbcTransactionManager(connectionFactory);
}
}
@EnableTransactionManagement를 사용했는데 이 경우 @Transactional 애노테이션을 이용해서 트랜잭션을 처리할 수 있다. 예를 들어 다음 코드는 saveOrUpdate() 메서드에 @Transactional 애노테이션을 적용했는데 이 경우 saveOrUpdate() 메서드가 리턴한 Mono를 구독할 때 트랜잭션 범위에서 관련 쿼리를 실행한다.
@Service
public class MemberService {
private DatabaseClient client;
public MemberService(DatabaseClient client) {
this.client = client;
}
@Transactional
public Mono<Integer> saveOrUpdate(String id, String name) {
return client.execute("select id, name from member where id = :id")
.bind("id", id)
.fetch()
.one()
.flatMap(m ->
client.execute("update member set name = :name where id = :id")
.bind("name", name)
.bind("id", id)
.fetch()
.rowsUpdated())
.switchIfEmpty(
client.execute("insert into member (id, name) values (:id, :name)")
.bind("name", name)
.bind("id", id)
.fetch()
.rowsUpdated()
);
}
}