주요글: 도커 시작하기

kafka 아는 척하기1 영상: (카프카 기본 구조, 토픽/파티션, 성능, 리플리카 등, youtu.be/0Ssx7jJJADI)

kafka 아는 척하기 1

 

kafka 아는 척하기 2 영상: 프로듀서 편 (youtu.be/geMtm17ofPY)

 

 

MariaDB에서 grant usage ... with max_statement_time 명령어를 사용하면 사용자별로 최대 쿼리 실행 시간을 지정할 수 있다. 다음은 명령어 실행 예를 보여준다. 이때 시간은 초 단위다(MySQL도 동일한 유사한 기능이 있는데 시간 단위는 밀리초이다).

GRANT USAGE ON *.* TO batchuser@'%' WITH MAX_STATEMENT_TIME 60

사용자별로 지정한 최대 쿼리 실행 시간은 글로벌 시간에 우선한다.

CQRS(Command Query Responsibility Segregation) 아는 척하기1 영상: youtu.be/xf0kXMTFJm8

CQRS 아는 척하기 2 영상: youtu.be/H1IF3BUeFb8

 

요즘 MariaDB 바이너리 로그를 이용한 간단한 라이브러리를 만들고 있는데(mariadb-cdc 라이브러리) 다양한 버전의 MariaDB와 몇 가지 다른 설정으로 테스트할 필요가 있었다. 로컬에 설치한 MariaDB로는 다양한 테스트를 수행하는데 불편함이 있어 방법을 찾다가 Testcontainers가 있다는 것을 알게 되었다.

 

Testcontainers는 JUnit에서 테스트하는 동안 도커 컨테이너를 임시로 생성하고 제거해 주는 라이브러리다. Testcontainers를 사용하면 MariaDB, MongoDB, MySQL 등 다양한 DB에 대한 통합 테스트 환경을 쉽게 구성할 수 있다.

 

유튜브로 보기: youtu.be/eZbLAD2yUfE

의존 설정(JUnit5 기준)

JUnit에서 Testcontainers를 사용하려면 다음 의존 설정을 추가한다.

  • org.testcontainers:testcontainers:1.14.3
  • org.testcontainers:mariadb:1.14.3
  • org.testcontainers:junit-jupiter:1.14.3

그레이들 설정 예

dependencies {
    implementation("org.mariadb.jdbc:mariadb-java-client:2.7.0")
    implementation("ch.qos.logback:logback-classic:1.2.3")
    testImplementation(platform('org.junit:junit-bom:5.7.0'))
    testImplementation('org.junit.jupiter:junit-jupiter')
    testImplementation("org.testcontainers:testcontainers:1.14.3")
    testImplementation("org.testcontainers:mariadb:1.14.3")
    testImplementation("org.testcontainers:junit-jupiter:1.14.3")
}

test {
    useJUnitPlatform()
}

메이븐 설정 예

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.junit</groupId>
            <artifactId>junit-bom</artifactId>
            <version>5.7.0</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>org.mariadb.jdbc</groupId>
        <artifactId>mariadb-java-client</artifactId>
        <version>2.7.0</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.2.3</version>
    </dependency>
    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>testcontainers</artifactId>
        <version>1.14.3</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>mariadb</artifactId>
        <version>1.14.3</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>junit-jupiter</artifactId>
        <version>1.14.3</version>
        <scope>test</scope>
    </dependency>
</dependencies>

JUnit에서 MariaDB 컨테이너 구동하기

의존을 추가했다면 간단하게 MariaDB 컨테이너를 구동하고 연동할 수 있다. 다음 세 가지만 하면 된다.

  • @Testcontainers 애노테이션 테스트 클래스에 적용
  • MariaDBContainer 타입 필드를 선언하고 객체 생성
  • MariaDBContainer 필드에 @Container 애노테이션 적용

아래 코드는 작성 예이다.

import org.testcontainers.containers.MariaDBContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
public class MariadbTest {
    Logger logger = LoggerFactory.getLogger(getClass());

    @Container
    MariaDBContainer mariaDB = new MariaDBContainer();

    @Test
    void connect() {
        logger.info("host: {}", mariaDB.getHost());
        logger.info("port: {}", mariaDB.getMappedPort(3306));
        logger.info("username: {}", mariaDB.getUsername());
        logger.info("password: {}", mariaDB.getPassword());
        logger.info("jdbc url: {}", mariaDB.getJdbcUrl());
        try (Connection conn = DriverManager.getConnection(
                    mariaDB.getJdbcUrl(), 
                    mariaDB.getUsername(), 
                    mariaDB.getPassword())
        ) {
            logger.info("got connection");
            // 코드
        } catch (SQLException ex) {
            ex.printStackTrace();
        } catch (InterruptedException e) {
        }
    }
}

위 테스트 코드를 실행하면 테스트 메서드를 실행하기 전에 MariaDB 이미지를 이용해서 컨테이너를 시작한다. 컨테이너가 시작되면 MariaDBContainer가 제공하는 다양한 메서드를 이용해서 DB 연결에 필요한 정보를 구할 수 있다. 실제 위 테스트 코드를 실행하면 다음 로그 메시지가 출력된다(예제에서 사용하는 로그 포맷은 깃헙 코드를 참고한다).

23:13:30.742 [Test worker] INFO  docker[mariadb:10.3.6] - Creating container for image: mariadb:10.3.6
23:13:31.060 [Test worker] INFO  docker[mariadb:10.3.6] - Starting container with ID: 79fb2e93168247a2d9643256aba2f8fad39445be855d96e7fecb92d8fba9593e
23:13:31.656 [Test worker] INFO  docker[mariadb:10.3.6] - Container mariadb:10.3.6 is starting: 79fb2e93168247a2d9643256aba2f8fad39445be855d96e7fecb92d8fba9593e
23:13:31.692 [Test worker] INFO  docker[mariadb:10.3.6] - Waiting for database connection to become available at jdbc:mariadb://localhost:32769/test using query 'SELECT 1'
23:13:44.089 [Test worker] INFO  docker[mariadb:10.3.6] - Container is started (JDBC URL: jdbc:mariadb://localhost:32769/test)
23:13:44.090 [Test worker] INFO  docker[mariadb:10.3.6] - Container mariadb:10.3.6 started in PT17.215S
23:13:44.102 [Test worker] INFO  mariadb.MariadbTest - host: localhost
23:13:44.102 [Test worker] INFO  mariadb.MariadbTest - port: 32769
23:13:44.102 [Test worker] INFO  mariadb.MariadbTest - username: test
23:13:44.102 [Test worker] INFO  mariadb.MariadbTest - password: test
23:13:44.102 [Test worker] INFO  mariadb.MariadbTest - jdbc url: jdbc:mariadb://localhost:32769/test
23:13:44.120 [Test worker] INFO  mariadb.MariadbTest - got connection

출력한 결과를 보면 Testcontainers 1.14.3 기준으로 MariaDB 버전이 10.3.6이고 DB 사용자와 암호가 test인 것을 알 수 있다. JDBC URL을 보면 사용할 DB 이름도 test이다. 포트 번호는 32769인데 이는 컨테이너의 3306 포트에 매핑된 로컬 포트가 32769인 것을 의미한다. 실제 컨테이너가 구동될 때마다 이 포트는 바뀐다.

 

테스트 코드를 실행하는 중간에 docker ps 명령어를 실행하면 mariadb:10.3.6 이미지를 사용한 컨테이너가 생성된 것을 확인할 수 있다.

C:\work\any\testcontainer-sample>docker ps
CONTAINER ID        IMAGE                               COMMAND                  CREATED             STATUS              PORTS                     NAMES
c45c731d82ae        mariadb:10.3.6                      "docker-entrypoint.s…"   2 seconds ago       Up 1 second         0.0.0.0:32771->3306/tcp   modest_shannon
92d9b2fd807d        testcontainersofficial/ryuk:0.3.0   "/app"                   4 seconds ago       Up 2 seconds        0.0.0.0:32770->8080/tcp   testcontainers-ryuk-c2d44a10-9754-4133-830e-53c6147a1b22

@Container 애노테이션이 붙은 필드가 static이 아니면 각 테스트 메서드를 실행할 때마다 도커 컨테이너를 시작하고 종료한다. 테스트 클래스에 있는 테스트 메서드가 동일한 도커 컨테이너를 사용하게 하고 싶다면 @Container 필드를 static으로 선언하면 된다.

커스텀 설정

MariaDB 버전을 쉽게 바꿀 수도 있다. 생성자에 사용할 이미지만 입력하면 된다. 이 외에 DB 사용자와 암호를 포함해 몇 가지 설정을 쉽게 변경할 수 있다. 다음은 변경 예이다.

@Testcontainers
public class MariadbInitTest {
    
    @Container
    JdbcDatabaseContainer mariaDB = new MariaDBContainer("mariadb:10.5") // 이미지
            .withConfigurationOverride("conf.d.105") // DB 서버 추가 설정
            .withUsername("myuser") // DB 사용자
            .withPassword("mypassword") // 암호
            .withDatabaseName("mydb") // 사용할 데이터베이스
            .withInitScript("init.sql"); // 초기 실행 쿼리

withConfigurationOverride()가 아닌 다른 메서드는 (MariaDBContainer 클래스의 상위 클래스인) JdbcDatabaseContainer 클래스에 정의되어 있다. 그래서 필드 타입을 JdbcDatabaseContainer 클래스로 선언했다.

 

withInitScript()는 클래스패스에서 실행할 쿼리를 찾는다. 예제 코드는 src/test/resources 폴더에 위치한 init.sql 파일을 실행한다.

 

withConfigurationOverride() 메서드는 MariaDB 설정 파일을 변경할 때 사용한다. 이 메서드는 클래스패스에 위치한 폴더명을 인자로 받는다. MariaDBContainer는 이 폴더의 파일을 컨테이너의 "/etc/mysql/conf.d" 폴더에 복사한다(withConfigurationOverride() 메서드를 설정하지 않으면 Testcontainers MariaDB 모듈이 제공하는 my.cnf 파일을 사용한다).

참고 자료

서로 다른 MariaDB 간에 데이터를 복제할 일이 자꾸 생겨서 MariaDB 바이너리 로그 기반 CDC(Change Data Capture)를 활용하기로 했다. 이미 mysql-binlog-connector-java라는 훌륭한 라이브러리가 존재하는데 이를 직접 사용하면 코드가 다소 복잡해져서 이 라이브러리를 기반으로 한 mariadb-cdc라는 라이브러리를 만들었다.

MariaDB 설정

MariaDB 바이너리 로그 활성화

mariadb-cdc는 바이너리 로그를 사용하므로 binlog_format을 row로 설정한다.

binlog_format = row  
binlog_row_image = full  

binlog_row_image 값이 FULL이면 변경 전/후 모든 칼럼 값을 기록한다.

CDC를 위한 사용자 생성

CDC를 위한 사용자를 생성한다. 이 사용자에 REPLICATION SLAVE, REPLICATION CLIENT, SELECT 권한을 부여한다.

CREATE USER cdc@'%' IDENTIFIED BY 'password'  
GRANT REPLICATION SLAVE, REPLICATION CLIENT, SELECT ON *.* TO cdc@'%'  

메이븐 리포지토리 설정

mariadb-cdc를 사용하려면 아래 리포지토리 설정과 의존 설정을 추가한다.

<repositories>
    <repository>
        <id>jitpack.io</id>
        <url>https://jitpack.io</url>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>com.github.madvirus</groupId>
        <artifactId>mariadb-cdc</artifactId>
        <version>0.11.0</version>
    </dependency>
</dependencies>

그레이들을 사용한다면 아래 설정을 추가한다.

repositories {
    mavenCentral()
    maven { url 'https://jitpack.io' }
}

dependencies {
    implementation 'com.github.madvirus:mariadb-cdc:0.11.0'
}

MariadbCdc 사용하기

기본 사용법

단계 1, MariadbCdcConfig 생성

MariadbCdcConfig config = new MariadbCdcConfig(
                "localhost", // MariaDB 호스트
                3306, // 포트
                "cdc", // CDC를 위한 사용자
                "password", // 암호
                "bin.pos"); // 바이너리 로그 위치 추적용 파일 경로

"bin.pos" 파일이 없으면 현재 바이너로 로그 위치부터 읽기 시작한다.
"bin.pos" 파일에 바이너로 로그 위치가 기록되어 있으면 해당 위치부터 읽기 시작한다.
MariadbCdc는 바이너로 로그를 읽으면 다음 위치를 "bin.pos" 파일에 기록한다.

단계 2, MariadbCdc 생성

JdbcColumnNamesGetter columnNamesGetter = new JdbcColumnNamesGetter(
            "localhost", // host
            3307, // port
            "cdc", // cdc user
            "password"); // password 

MariadbCdc cdc = new MariadbCdc(config, columnNamesGetter);

MariadbCdc는 테이블의 칼럼 이름을 구하기 위해 ColumnNamesGetter를 사용한다.

기본 제공하는 JdbcColumnNamesGetter는 다음 쿼리를 이용해서 칼럼 이름을 구한다:

select COLUMN_NAME, ORDINAL_POSITION, COLUMN_DEFAULT, IS_NULLABLE, DATA_TYPE 
from INFORMATION_SCHEMA.COLUMNS 
WHERE table_schema = '?' and TABLE_NAME = '?' 
order by ORDINAL_POSITION

단계 3, MariadbCdcListener 설정

cdc.setMariadbCdcListener(new MariadbCdcListener() {
    @Override
    public void started(BinlogPosition binlogPosition) {
        // CDC가 시작되면 호출됨
    }

    @Override
    public void startFailed(Exception e) {
        // CDC 시작에 실패하면 호출됨
    }

    @Override
    public void onDataChanged(List<RowChangedData> list) {
        // 변경된 데이터 목록(행 목록)을 받음
        list.forEach(data -> { // 변경된 각 행에 대해
            String database = data.getDatabase(); // 변경된 행의 DB 이름
            String table = data.getTable(); // 변경된 행의 테이블 이름
            DataRow dataRow = data.getDataRow(); // 변경된 데이터 행(변경 후 이미지)
            if (data.getType() == ChangeType.INSERT) { // 추가된 데이터면
                Long id = dataRow.getLong("id"); // id 칼럼 값을 Long으로 구한다.
                // ...
            } else if (data.getType() == ChangeType.UPDATE) { // 변경된 데이터면
                String name = dataRow.getString("name"); // 변경된 name 칼럼 값을 구한다.
                DataRow dataRowBeforeUpdate = data.getDataRowBeforeUpdate(); // 변경전 데이터 행(변경 전 이미지)
                String nameBeforeUpdate = dataRowBeforeUpdate.getString("name"); // 변경전 name 칼럼 값을 구한다.
                // ...
            } else if (data.getType() == ChangeType.DELETE) {
                String email = dataRow.getString("email"); // 삭제된 행의 email 칼럼 값을 구한다.
                // ...
            }
        });
    }

    @Override
    public void onXid(Long xid) {
        // 트랜잭션 커밋 로그
    }

    @Override
    public void stopped() {
        // CDC를 중지하면 호출됨
    }
});

단계 4, MariadbCdc 시작/중지

cdc.start(); // 별도 쓰레드로 바이너리 로그 조회 시작

...

cdc.stop(); // 조회 중지

MariadbCdc가 시작되면 setMariadbCdcListener()로 설정한 listener에 읽은 바이너리 로그 데이터를 전달한다.

 

MariadbCdc#start()는 별도 쓰레드로 바이너리 로그 조회를 시작한다.

특정 테이블만 포함하기/제외하기

MariadbCdcListener#onDataChanged()는 기본적으로 모든 변경에 대해 불린다. 만약 특정 테이블만 포함하거나 제외하고 싶다면 필터를 사용하면 된다.

// test.user 테이블 변경에 대해서만 onDataChanged() 호출
config.setIncludeFilters("test.user");
// test.member 테이블 변경은 onDataChanged() 호출 제외
config.setIncludeFilters("test.member");

수정한 칼럼만 포함하기

binlog_row_image가 full 이면 UPDATE/DELETE 시에 모든 칼럼 값을 포함한다. 변경된 칼럼 값만 포함하고 싶다면 binlog_row_image를 minimal로 설정한다.

binlog_format = row
binlog_row_image = minimal

MINIMAL은 수정 전 이미지에는 PK 칼럼만 기록하고(PK 칼럼이 없으면 전체 칼럼) 수정 후 이미지에는 변경된 칼럼만 기록한다.

binlog_row_image 가 minimal일 때 아래 쿼리를 실행하면

update member set name = 'newname' where id = 10

RowChangedData#getDataRowBeforeUpdate()는 PK 칼럼만 포함한 DataRow를 리턴하고
RowChangedData#getDataRow()는 변경된 칼럼만 포함한 DataRow를 리턴한다.

@Override
public void onDataChanged(List<RowChangedData> list) {
    // handle changed data
    list.forEach(data -> { // each
        String database = data.getDatabase(); // test
        String table = data.getTable(); // member
        DataRow afterDataRow = data.getDataRow(); // after image
        if (data.getType() == ChangeType.UPDATE) {
            DataRow beforeDataRow = data.getDataRowBeforeUpdate(); // before image
            Long id = beforeDataRow.getLong("id"); // before image includes only pk fields
            String name = afterDataRow.getString("name"); // after image includes only updated fields
            // ...
        }
    });
}

MariaDB 10.5: binlog_row_metadata = full

MariaDB 10.5는 binlog_row_metadata 변수를 지원한다.
binlog_row_metadata 값이 FULL이면 칼럼 이름을 포함한 모든 메타데이터가 바이너리 로그에 기록된다.
그래서 binlog_row_metadata가 FULL이면 ColumnNamesGetter가 필요 없다.

참고자료

DB를 사용하는 기능에 대한 통합 테스트를 진행하려면 테스트할 상황에 맞게 데이터를 구성해야 한다. 예를 들어 주문의 상태를 변경하는 기능을 통합 테스트한다고 하자. 이 경우 주문이 DB에 존재해야 상태가 올바르게 바뀌는지 확인할 수 있다. 즉 테스트 대상 기능을 실행하기 전에 DB에 테스트할 상황에 맞는 데이터를 INSERT 해야 한다. 이를 위해 다음과 같은 보조 클래스를 만들어 사용하면 테스트 코드 작성이 조금 편리해진다.

public class OrderGivenHelper {
    private JdbcTemplate jdbcTemplate;
    
    public OrderGivenHelper(DataSource dataSource) {
    	this.jdbcTemplate = new JdbcTemplate(dataSource);
    }
    
    public void givenOrder(Order order) {
        List<String> cols = new ArrayList<>();
        List<Object> values = new ArrayList<>();
        if (order.getId() != null) {
            cols.add("order_id");
            values.add(order.getId());
        }
        if (order.getTotalAmount() != null) {
            cols.add("total_amount");
            values.add(order.getTotalAmount());
        }
        if (order.getStatus() != null) {
            cols.add("status");
            values.add(order.getStatus());
        }

        ...생략
        String insCols = cols.stream().collect(Collectors.joining(",", "(", ")"));
        String bindParams = cols.stream().map(x -> "?").collect(Collectors.joining(",", "(", ")"));
        jdbcTemplate.update("insert into PURCHASE_ORDER " + insCols + " values " + bindParams,
                values.toArray());
    }
    
    public void clearOrder() {
    	jdbcTemplate.update("delete from PURCHASE_ORDER");
    }
}

위 헬퍼를 사용하면 giveOrder() 메서드에 전달할 Order 객체를 생성할 때 INSERT 쿼리를 실행할 때 NOT NULL인 필수 값만 지정하면 된다.

 

통합 테스트 코드는 위 코드를 사용해서 상황에 맞는 테스트를 실행한다.

@SpringBootTest
public class OrderTest {
    @Autowired
    private OrderCancelService cancelService;
    @Autowired
    private OrderRepository repo;
    private OrderGivenHelper givenHelper;
    private DataSource dataSource = ...생략;

    @BeforeEach
    public void setUp() {
        givenHelper = new OrderGivenHelper(dataSource);
    }
    
    public void cancelOrder() {
        givenHelper.clearOrder();
        givenHelper.givenOrder(Order.builder().id(10L).status(PAID).build());
        
        cancelService.cancelOrder(10L);
        
        Order order = repo.findById(10L);
        assertThat(order.getStatus()).isEqualTo(CANCEL);
    }
}

 

  1. 2020.09.12 22:06

    비밀댓글입니다

[3부]에 이어서

 

4월 16일 목요일. 미디어는 온통 415 총선 얘기다. 충격적인 결과 때문인지 다양한 기사가 쏟아져 나온다. 온라인 개학도 본격적으로 시작했다. 학생들이 온라인 수업에 참여해서 그런지 오전 트래픽이 줄었다. 별일 없이 하루가 지나가고 있다.

오후 4시 40분. 평온함을 깨는 전화가 온다. 발신자를 보니 감이 온다. M사에 문제가 생겼나 보다. 아니나 다를까, M사 서비스에 장애가 났으니 내일 가서 지원하라는 지시가 내려왔다. 학교 선생님과 학생이 온라인 수업 보조 도구로 M사 서비스를 사용하는데 트래픽이 몰리면서 M사 서비스가 다운된 것이다. 온라인 개학이 시작되면서 EBS를 비롯해 여러 서비스가 먹통 됐는데 M사 서비스도 그중 하나였던 것이다.

일시 정지

4월 17일 금요일. 바로 M사로 출근했다. A클라우드의 관리형 DB 성능이 저하되면서 장애를 일으켰다고 한다. 성능 문제를 일으킨 건 게시글 내용과 관련이 있다. 내용 칼럼은 MySQL MEDIUM TEXT 타입인데 어떤 이유인지 내용 칼럼을 조회하면 급격히 응답 속도가 느려진다. 기능 특성상 첫 화면부터 게시글 목록에 내용을 보여주기 때문에 게시글 내용 성능 저하가 바로 서비스 먹통으로 연결된 것이다.

M사 인프라 조직은 이대로는 서비스를 할 수 없다고 판단하고 클라우드 DB를 빼기로 결정했다. 대신 비슷한 성능의 VM을 다수 생성해서 직접 마스터/슬레이브 DB를 구성한다고 한다. DB 변경 일정은 오후 7시부터 시작해서 4월 20일 새벽까지로 예정되어 있다.

 

이번 작업의 핵심은 마스터/슬레이브를 구성해서 처리 용량을 높이는 것이므로 이를 위한 코드 작업을 먼저 진행했다. 스프링을 사용하고 있지만 @Transactional을 사용하기 어려운 구조다. 그래서 필터를 선택했다. URL 경로에 따라 마스터나 슬레이브를 선택하도록 필터를 구현했다. 세 개 정도 경로를 넣고 확인해보니 잘 동작한다. 다음 작업으로 넘어가야겠다.

 
이어서 내용 칼럼만 조회하는 코드를 분리하기 시작했다. 내용만 캐시를 적용할 수 있는 구조로 만들기 위함이다. 앞서 게시글 조회 API를 개선해 놓지 않았으면 할 수 없는 작업이었다.

 

오후 7시가 되었다. 앱에는 작업 공지가 올라왔다. J 개발자와 난 하던 작업을 멈추고 J 개발자는 모든 서버를 내렸다. DB 사용이 없어지자 인프라 조직에서 클라우드 DB 데이터 덤프를 시작한다. 우리는 하던 작업을 이어서 진행했다. 내용 조회 코드 분리를 마치니 9시가 넘어간다. 나머지는 내일 진행해야겠다.

맑은 하늘

4월 18일 토요일. 판교에 도착하니 9시다. 스타벅스에 들려 커피를 마시고 있는데 J 개발자가 들어온다. J 개발자가 커피를 들고 자리에 오더니 불안한 표정으로 아직 DB 구성 작업이 끝나지 않았다고 말한다. 당연히 끝나 있을 거라 생각했는데 실패했다는 것이다. 이유가 궁금했다.

 

J 개발자와 사무실로 들어갔다. 얼마 지나지 않아 인프라 조직장이 왔다. 얘기를 들어 보니 관리형 DB에서 덤프 받는 속도가 매우 느렸다고 한다. 시간당 수십 메가 수준이었다고 한다. 어쩌면 게시글 내용 조회 속도가 느렸던 것도 관리형 DB의 IO 성능 문제가 아니었을까 하는 의심이 들기 시작한다.

 

DB 문제를 해결하는 동안 J 개발자와 난 게시글 내용을 캐시에 담는 코드를 구현했다. 게시글 쓰기, 수정, 조회 기능에 레디스 연동 코드를 추가했다. 얼추 잘 된다. 이어서 추천 게시글 목록에도 캐시를 적용했다. 추천 게시글 목록은 양도 크지 않고 변경도 없어 카페인 캐시를 사용했다. 불필요한 카운트 쿼리 때문에 간헐적으로 응답이 느린 코드도 손을 봤다. 게시글 조회 트래픽이 전체 서버 쓰레드를 점유하는 것을 막기 위해 동시 처리 개수를 제한하는 코드도 추가했다.

 

어느덧 5시. 저녁을 먹는 동안에도 인프라 조직은 DB 관련 통화를 하느라 바쁘다. DB 덤프 속도가 오후부터 빨라져서 곧 DB 구성을 할 수 있을 것 같다고 한다. 다행이다.

 

저녁을 먹고 옥상에 올라갔다. 3월엔 어둑어둑했는데 지금은 어둡지 않다. 하늘이 참 맑다. 토요일이 그렇게 지나갔다.

 

4월 19일 일요일. DB 구성이 잘 되었다고 한다. M사 직원들은 오전부터 운영 환경 테스트를 진행하고 있다. 난 이제 할 일이 없다. 부하 테스트를 진행해서 검증하고 싶지만 지금은 진행할 수 있는 사람이 없다. 불안하다. 그저 내일 트래픽을 잘 견뎠으면 하는 마음뿐이다.

태풍

4월 20일 월요일. 잠이 잘 안 와 일찍 깼다. 판교에 도착하니 7시 30분이다. 과연 잘 될까?

8시 15분. 서버가 출렁인다. 스카우터를 보니 레디스 연동에서 시간이 오래 걸린다. 젠장! 이것 때문에 게시글 목록 응답 시간이 느리다. 응답이 느려지자 동시 처리 개수 제한에 걸려 503 응답을 주기 시작했다. 아침부터 긴박하다.

 

잠시 후 J 개발자가 출근해서 급하게 레디스 연동을 제거하고 다시 배포했다. 레디스 설정을 쳐다볼 때가 아니다. 일단 살고 봐야 한다. 게시글 목록 조회는 돌아오기 시작했지만 여전히 느린 응답이 있다. 로그인하자마자 불리는 API에 풀스캔을 타는 쿼리가 있다. 뭐지? 쿼리를 보니까 조건이 빠져있다. 왜?

 

한참 만에 원인을 찾았다. 리플리케이션이 밀린 것이다. 그 망할 모듈 때문에 문제가 심해졌다. 리플리케이션이 밀리면 데이터가 안 나오면 그만인데 이 모듈은 where 절에서 조건이 빠진다. 그러면서 풀스캔이 발생한 것이다. 아 씨땡이다. 하지만 이 족보 없는 모듈을 붙이고 간 사람을 욕하고 있을 시간이 없다. 트래픽 태풍을 어떻게든 방어해야 한다. 풀스캔이 발생하는 기능을 찾아서 방어 코드를 넣었다. where 절에 조건이 빠지는 상황이면 아무 값이나 설정해서 조건이 빠지지 않도록 했다. 배포하니 서버가 나아진다.

 

여전히 느린 API가 있다. J 개발자에게 물어보니 아직 오픈하지 않은 기능이란다. 앱과 서버에 선반영 했는데 화면에는 노출되는 곳이 없단다. 이 API는 서버에서 다시 P 서비스를 호출하는데 이 P 서비스가 응답을 늦게 주는 것이다. P 서비스는 타 팀 서비스라 손을 댈 수 없다. 고민하다 일단 P 서비스에 대한 연동을 끊기로 했다. 앱에서 API를 호출하면 빈 응답을 주도록 코드를 바꾸고 배포했다. 드디어 느린 응답이 사라졌다!

 

벌써 오후다. 이번에는 업로드가 말썽이다. 사용자가 몰린 것이다. 그래도 전반적인 서비스는 안정적이다. 업로드만 조치하면 된다. 업로드 서버를 2대에서 4대로 늘려서 급한 불을 껐다.

 

안정

4월 21일 화요일. 8시 40분, 9시, 10시, 11시.... 오후 5시. 평온하다. 간헐적인 오류도 있긴 했지만 비교적 하루가 무사히 지나갔다. 고생한 J 개발자에게 수고했다고 슬랙 메시지를 보냈다. 모든 게 평화롭다. 긴장이 풀리면서 피로가 밀려온다.

 

다시 에러

4월 22일 수요일. 오전 10시. 갑자기 응답 시간이 느려진다. 9시도 잘 넘겼는데 10시에 왜? 원인을 모르겠다. J 개발자는 점진적으로 서버를 재시작했다. 문제가 사라졌다. 뭐지?

4월 23일 목요일. 오전 10시. 또 응답 시간이 느려진다. 원인을 모르겠다. 급하니까 일단 재시작이다. 잠시 후 신기하게 멀쩡해졌다. 어제와 증상이 동일하다. 증상만 보면 해당 시점에 마스터 DB로 연결을 구하지 못해 에러가 발생했다. J 개발자와 통화를 해서 몇 가지 조치를 취했다. 슬레이브를 사용하는 URL을 더 늘렸다. 설마 이 정도까지 했는데 마스터 연결이 안 될까?

 

4월 24일 금요일. 오전 10시가 다가온다. 젠장! 또 같은 증상이다. 평균 응답 시간, TPS를 계산했을 때 이 정도로 무너지면 안 된다. 게다가 피크 시간은 8시 30분에서 9시 사이다. 9시를 잘 넘겨 놓고 10시에 왜 이 모양이냐? 원인을 모르겠다. 이번에도 급하게 재시작이다.

10시 15분. 메시지가 하나 왔다. 차마 남들에겐 언급할 수 없는 이유로 10시부터 약 10~15분 정도 마스터 DB로의 연결이 잘 안 되었다고 한다. 아... J 개발자와 난 상상조차 하지 못했다.

 

이젠 괜찮겠지 했는데 오후에 메신저가 시끄럽다. 파일 업로드가 안 된다. CDN이 업로드 트래픽을 견디기 힘들어 M사 서비스에 대한 업로드를 차단했단다. 정말 다양한 일이 벌어지는구나! 속도를 늦추는 것도 아니고 업로드 일시 차단이라니. M사는 재발을 막기 위해 CDN 관련 작업을 4월 25일에 진행했다.

 

4월 27일 월요일 오후 6시. 오늘 하루가 고요하다. 이제 끝인가 보다. 마침 J 개발자가 다른 일로 서울 사무실에 왔다. 저녁 시간도 다 됐고 겸사겸사 술 한잔 기울이며 서로를 다독인다.

  1. MSX2+ 2020.04.29 16:08

    흥미진진하게 잘 읽었습니다! 남들에게 언급할 수 없는 이유가 너무 궁금하네요.

  2. 애송이 2020.05.13 18:08

    재밌게 잘 읽었습니다.

  3. msa 2020.06.03 23:48

    차마 남들에겐 언급할 수 없는 이유때문에 현기증 납니다...

  4. 2020.06.16 22:49

    비밀댓글입니다

  5. 루키로키 2020.06.23 15:36 신고

    너무 재밌게 잘 읽었습니다 ! 긴박함이 생생하게 느껴져서 더 몰입하면서 읽은 것 같네요.

  6. 2020.07.11 10:40

    비밀댓글입니다

[2부]에 이어

 

앱에는 아직 새로 만든 API가 반영되지 않고 있다. 웹도 극히 일부만 적용했다. 서버와 클라이언트 모두 성능 개선과 기능 개발을 병행하고 있다. 숨이 꼴딱 꼴딱 넘어가기 직전인데 기능 개발을 함께 하고 있다. 이러고 있을 처지가 아닌데 여유를 부리고 있다. 난 외부 인력이어서 여기에 대해 뭐라 말할 처지가 못 된다.

3월 20일. 결국 장애가 났다. 데이터가 늘어나면서 쿼리 시간이 조금씩 증가하더니 어느 순간에 급격히 무너졌다. 원인은 이번에도 풀스캔. 데이터가 적을 때는 느리지 않아 발견하지 못하고 놓친 쿼리가 문제를 일으켰다. 슬슬 느려지는 추이를 보이고 있었는데 기능 개발하느라 놓친 것이다. 3월 25일까지 비슷한 이유로 몇 차례 장애가 더 발생했다. 그때마다 M사는 정신없는 상황이 반복되었고 J 개발자는 하던 일을 멈추고 장애 처리에 시간을 보내야 했다.

 

드디어 배포

3월 26일. 드디어 수정한 API를 사용한 안드로이드 앱을 배포했다. 안을 제안한지 20일 만이다. 여전히 기존 API를 사용하는 곳이 많지만 전반적인 응답 시간은 확연히 줄었다. 다행이다.  게시글 목록 조회/상세 API를 앱의 일부 화면에 적용했을 뿐인데도 효과가 있다. 기세를 몰아서 아직 개선 못한 부분을 진행하면 좋을 것 같다.

 

J 개발자에게 연락이 왔다. 이제 기능 개선을 안 하고 기능 개발에 집중한단다. 아직 해야 할게 많이 남았는데 성능 개선을 멈추는 것이다. 성능 개선을 더 하길 원했지만 결정권자가 기능 개발을 지시했다고 한다. 결국 J 개발자는 더 이상 성능 개선을 못하고 기능 개발에 집중하기 시작했다. 안타깝다. 당하고 또 당했는데 이렇게 결정하다니. 아직 더 많이 개선해야 하는데 이 수준에 멈추다니. 걱정이다.

 

고요

4월 초. 앱을 배포하고 1주가 지났다. 아직까지 서버 상태는 고요하다. 현재 수준의 트래픽은 무난히 처리하고 있다.

 

뉴스는 온라인 개학 얘기로 시끄럽다. 코로나 여파로 개학을 미뤄왔는데 더 이상 미룰 수가 없다고 판단을 한 것 같다. 학생이 모이지 않게 온라인 수업을 검토하고 있다는 언론 보도도 나오기 시작했다.

 

M사는 추가 기능 개발에 한참이다. 나도 지원하느라 제때 처리하지 못한 일에 집중하고 있다. 서버 성능은 더 이상 관심 대상이 아니다. 그렇게 아무도 신경 쓰고 있지 않을 때 온라인 개학이 눈 앞으로 다가왔다.

 

[4부]에서 계속 ......

[1부]에 이어

 

스카우터에 찍힌 쿼리와 코드를 계속해서 탐색했다. 몇 가지 개선해야 할 것이 눈에 보이기 시작했다. 코드가 변경이 쉬운 구조는 아니지만 J 개발자에게 코드의 구조와 동작 방식을 들으면서 변경 방법을 조금씩 찾기 시작했다.

 

이 와중에

J 개발자와 코드를 이리저리 보고 있는데 다급한 표정으로 누군가 다가온다. 푸시 발송에 문제가 있단다. 개선안 찾던 걸 멈추고 푸시 문제로 넘어갔다. 스카우터를 보니 푸시 서버 관련 XLog가 제일 위에 붙어 있다.

일단 푸시를 내렸다. 코드를 봤더니 푸시 목록을 DB에서 조회하고 푸시를 발송하고 발송 상태로 변경하는 작업이 한 트랜잭션으로 묶여 있다. 이를 주기적으로 실행해서 푸시를 발송하고 있는데 발송 대상이 늘어나면서 실행 주기 안에 푸시 발송을 끝내지 못했고 이로 인해 이슈가 발생하기 시작했다. 아직 푸시 처리를 하고 있는데 다시 푸시 처리를 요청하면서 락이 발생했고 그러면서 전체 푸시 발송이 먹통이 되었다.

당장 구조를 변경할 수 없어 중복 실행을 방지하는 쪽으로 방향을 잡았다. 푸시 발송 상태 필드를 추가하고 아직 발송 중이면 실행 요청을 무시하는 코드를 넣었다. 수정한 코드를 배포하고 푸시를 재개했다. 다행히 문제가 재발하지 않는다.

급한 불을 끄고 J 개발자와 옥상에 올라갔다. J 개발자는 담배 한 모금과 긴 한 숨을 내 쉬며 마음을 가라 앉힌다. 판교 하늘이 어두워지기 시작했다.

 

다시 안 도출

저녁을 먹고 자리로 돌아와 다시 안을 도출하기 시작했다. 한 번에 모두 바꿀 수 없으니 조회 성능을 높이는데 초점을 맞췄다. 크게 4개 정도 안을 도출했다.

  • 자주 불리는 기능에 대해 조회 전용 모델 개발. 현재는 하나의 모델을 그것도 복잡하게 연관을 맺은 모델을 명령과 조회에 함께 사용하고 있어 조회 시 불필요하게 많은 쿼리를 실행하고 이로 인해 응답 시간이 길어지고 있다.
  • 집계용 테이블 추가. 게시물의 댓글 개수를 댓글 테이블에 count 쿼리를 날려서 구하고 있다. 좋아요 수도 동일한 방식으로 구하고 있다. 이 또한 응답 시간을 증가시키고 있다.
  • 조회 기능 일부에 캐시 적용. 조회 전용 모델을 만들어내는데 성공하면 일부 기능에 캐시를 적용할 수 있게 된다. 아직은 바로 할 수 없다.
  • 채팅 용도 서버 분리. 지금은 API 서버가 모든 요청을 처리하고 있다. 채팅 트래픽이 많지는 않지만 응답 시간이 증가해 톰캣 쓰레드를 잡고 있으면 다른 기능도 영향을 받게 된다. 당장 DB 분리는 못 하더라도 채팅 전용 서버라도 따로 분리해 영향을 줄이고 싶다.

안을 정리하고 나니 벌써 금요일 저녁 10시다. 집에 가야겠다.

 

구현 시작

3월 9일. 주말을 쉬고 돌아온 월요일. M사에 해당 서비스를 책임지는 서버 개발자는 J 혼자다. M사의 다른 조직에도 서버 개발자가 있긴 한데 적극적으로 돕지는 않고 있다. 이대로 가만있으면 도출한 안을 실행하지 못할 것 같다. 일부라도 실행하려면 M사에 가서 밀어 붙여야 한다. 점심을 먹고 M사로 이동했다.

 

M사에 도착했다. 여전히 다들 정신이 없다. 한가지 다행이라면 J 개발자가 채팅 용도 서버는 분리했다는 것이다. 하나를 했으니 다른 하나를 진행하면 된다. 오늘의 목표는 게시글 조회 전용 모델을 만드는 것이다. 이걸 만들지 못하면 다음 단계로 넘어갈 수 없다.

 

기존 게시글 모델을 복사해서 새 모델을 만들었다. 조회에 불필요한 사용자 엔티티, 첨부 파일 엔티티 등 대부분 연관을 삭제했다. 첨부파일의 썸네일 이미지 경로는 코드에서 조합했다. 새 모델을 위한 조회 API는 별도 클래스로 직접 구현했다. 나중에 일부라도 캐시를 적용하고 싶어서다.

 

현재 게시글 목록 조회 API는 페이지 단위로 조회하고 있다. 여기에 like 검색도 섞여 있다. 페이지 개수를 구하기 위해 count 쿼리도 날린다. 이대로는 성능 개선이 어렵다. 마침 등록 시간에 인덱스가 걸려 있는 걸 확인했다. 다행이다. 이걸 쓰면 되겠다. 조회 전용 게시글 모델 목록 API는 등록 시간 기준 from - to 조건과 조회 개수 제한 limit을 사용해서 목록을 조회하게 구현했다. 이를 이용하면 일정 시간 내에 목록을 제공할 수 있을 것 같다.

 

새 게시글 목록 API를 다 만들고 개발에서 기능을 확인했다. 끝나고 나니 저녁 10시가 되었다. 월요일부터 힘들다. 못하겠다. 퇴근이다.

 

서버 다운

3월 10일 화요일. 오전부터 응답 시간이 폭증한다. 일부 서버가 풀GC를 미친듯이 하고 있다. 특정 게시글을 조회할 때 발생한 것으로 보인다. 거미줄처럼 엮어 있는 연관 관계 때문에 조회 결과에 연관된 엔티티 개수가 많은 게시글이 포함되어 응답 시간이 느려진 것이다. 0.01초 걸리는 쿼리를 1000번 돌리면 10초가 걸리는 것과 같은 효과다. 응답이 빨리 안 오자 새로고침을 계속하고 그러면서 메모리 사용이 증가했고 이것이 풀GC를 유발했으며 그러다 어느 순간 죽은 거다.

 

당장 살아야 하니 급한대로 톰캣이 사용하는 메모리를 늘려서 재시작하자고 했다. 조회 전용 모델을 빨리 운영에 반영해야 한다. 안 그러면 또 같은 일이 벌어질 것이다.

 

작은 실랑이

3월 11일. 수요일. 앱 개발자/앱 기획자와 만났다. API 변경에 대해 얘기하기 위함이다. 서버를 M사에서 진행했다면 앱은 I사에서 진행했다. I사는 내가 다니는 회사다. 그렇다. 같은 회사 직원을 만나 회의를 진행했다.

앱 쪽과 먼저 협의하지 않고 서버 API 변경을 진행하는 것이 서운했는지 반응이 좋지 않다. 당장 시작해도 늦었는데 이런 반응이라니. 순간 목소리가 높아졌다. 몇 차례 언성을 높였다가 정신을 차리고 다시 논의를 했다. 사실 앱 개발자도 처음부터 서버 API가 많이 이상해서 바꿔달라고 요구했지만 그 프리랜서 개발자가 고집을 피우며 바꾸지 않았다고 한다. 아...... 마음을 가라 앉히고 다시 API 변경에 대해 논의했다. 안을 정리했고 회의 말미에는 언성을 높인 것도 사과했다.

 

J 개발자에게 회의 내용을 공유하고 조회 전용 모델을 더 추가해 달라고 말했다.

일주일

3월 12일. 목요일이다. 처음 연락을 받은 뒤로 일주일이 지났다. 그 사이에 J 개발자가 인덱스 몇 개를 더 추가했다. 그 덕에 당장은 버티고 있다. 하지만 이걸로는 안 된다. 트래픽이 조금만 증가해도 무너질 것이다. 마음이 급하다. 오후에 다시 M사로 이동했다.

 

오늘은 게시글 관련 집계 정보를 별도 테이블로 구현하는 것이 목표다. 댓글 수, 좋아요 수를 담을 테이블을 구성하고 개발을 시작했다. 댓글 생성 기능에 이벤트 발생 코드를 추가했다. 이 이벤트를 수신할 핸들러를 만들고 핸들러에서 댓글 수 증가 기능을 호출하게 구현했다. 핸들러는 비동기로 실행했다. 카운트 증가 감소는 JdbcTrmplate을 이용해서 쿼리로 구현했다. JPA를 써야 할 이유가 없다. 게시글 조회수 증가도 변경했다. 기존에는 게시글 엔티티의 readCount 값을 1 증가하는 방식이었는데 이를 댓글 개수와 동일하게 이벤트+비동기 핸들러+쿼리로 바꿨다.

새로 만든 게시글 목록/상세 API에 카운트 테이블 연동을 추가했다. 목표를 달성했다. 끝내고 나니 10시가 넘었다. 퇴근이다.

 

초조

3월 13일. 금요일. J 개발자와 통화를 했다. 게시글 관련 집계 정보를 별도 테이블이 아니라 게시글 테이블에 칼럼을 추가한 방식으로 구현을 바꿨단다. 음... 분리한 이유가 있는데. 지금은 그 이유를 설명하고 있을 여유가 없다. 일단 하나를 했으니 다음을 진행해야 한다.

 

이후로 몇 일이 지났다. J 개발자의 노력으로 몇 개 기능에 대해 조회 전용 모델이 완성됐다. 하지만 웹과 앱에는 아직 반영되지 않고 있다. 초조하다. 빨리 적용해야 하는데...

[3부]에서 계속 ......

2020년 4월 21일 화요일 오전 8시 20분. 어제 이 시간에는 이미 서버 응답 시간이 증가하기 시작했다. 간헐적으로 됐다 안 됐다를 반복하다 8시 40분 정도부터 서비스를 사용하기 힘든 수준으로 서버 상태가 나빠졌다. 온라인 개학이 시작한 4월 17일부터 여러 온라인 서비스가 먹통 증상을 보이고 있다지만 오늘도 서비스에 문제가 발생하면 안 된다. 고객 이탈이 심해질 것이다.

 

8시 40분, 9시, 10시, 11시 .... 오후 5시. 평온하다. 간헐적인 오류도 있긴 했지만 비교적 하루가 무사히 지나갔다. 고생한 J 개발자에게 수고했다고 슬랙 메시지를 보냈다. 긴장이 풀리면서 피로가 밀려온다.

시작

2020년 3월 5일 오후. 관계사인 M사의 서비스에 성능 문제가 있으니 같이 보라는 지시가 내려왔다. JPA를 이용해서 서버를 개발했는데 내가 JPA 경험이 있다는 이유로 나한테까지 연락이 왔다. 대충 들어보니 사용자가 증가하면서 서비스가 먹통이 되었다고 한다.

 

M사 서버 개발자 J에게 연락해서 VPN, 스카우터, DB 정보, 소스 리포지토리 주소를 받았다. 스카우터로 확인해보니 XLog의 많은 점이 폭포처럼 찍혀 있다. 나이아가라 폭포 같다. XLog에서 느린 쿼리를 몇 개 찾아 실행 계획을 확인했다. 풀스캔이다. 이런 쿼리 몇 개를 찾아 인덱스 추가를 요청했다.

 

다음날 점심을 먹고 있는데 전화가 왔다. M사에 직접 가서 지원하라는 지시였다. 식사를 마치고 바로 M사로 이동했다. 사무실에 도착해서 현 상태를 들었다. 클라우드 DB의 CPU와 IO 성능을 높였는데 일시적인 효과는 있었지만 여전히 응답 시간이 느린 상태라고 한다. 상황이 나빠서인지 서버 개발자 근처에 이사람 저사람 우루루 몰려와 각자 떠들고 있다. 개중에는 윗사람에 잘 보이려고 온 인간도 보인다. 별 도움 안 되는 소리만 하면서 시간을 뺏는다.

 

한바탕 소란이 지나고 다들 돌아갔다. 중간 중간 서비스 상태를 물어보는 사람만 있을 뿐 집중할 수 있는 환경이 되었다. 서버 개발자 J와 해결안을 찾기 시작했다. 응답 시간이 느린 요청부터 확인하기 시작했다. 두 가지 정도가 눈에 들어왔다. 첫 번째는 느린 쿼리다. 인덱스 추가로 일부 해결할 수 있을 것 같다. 어떤 기능은 like 검색을 하는데 페이징 처리를 위해 count 쿼리도 날린다. 이건 다른 해결안을 찾아야 할 것 같다. 두 번째는 불필요한 쿼리를 너무 많이 실행하는 기능이다. JPA를 잘못 사용한 것으로 보인다.

코드

성능 개선안을 도출하기 위해 코드를 보다 자세히 보기 시작했다. 에휴..... 한숨이 절로 나온다. 코드가 기능 추가/변경, 성능 개선 등은 내 알 바 아니라고 말하고 있다. JPA를 사용했지만 JPA가 제공하는 매퍼 기능을 제외하면 맵을 사용하는 것과 별 차이가 없을 정도다. 족보를 알 수 없는 자신만의 모듈을 만들어 사용하고 있어 성능 향상을 위한 운신의 폭도 좁다.

왜 이렇게 했을까 사정을 들어보니 오픈전까지 같이 개발한 프리랜서가 구현 기술을 선택하는 등 일종의 표준을 잡았다고 한다. 그런 개발자에게 중요 결정을 맡겼으니 지금 M사의 고생은 어찌보면 자업자득이다. 단지 나도 같이 빨려서 고생을 하고 있으니 그게 싫을 뿐이다.

코드에서 발견한 근본적인 문제는 엔티티 연관을 아주 마음껏 사용했다는 거다. 일대일, 일대다, 다대일 가릴 것 없이 엔티티 간 연관을 거미줄처럼 설정했다. 이로 인해 목록 조회시 정말 많은 쿼리를 불필요하게 실행하고 있다.

API도 문제다. 스프링을 사용했지만 전형적인 구조가 아니다. 스프링 MVC를 확장해서 JPA 리포지토리를 바로 API에 연결하도록 구현했는데 조회한 엔티티를 그대로 API 응답으로 보낸다. 이 자체도 문제지만 거미줄처럼 연관된 모든 데이터를 함께 전송하고 있는 것도 문제다. 수정은 조회한 엔티티 데이터에서 일부만 변경해서 그대로 다시 보내는 방식을 사용했다. 연관된 데이터도 모두 포함해서 말이다.

엔티티 처리 로직도 흩어져 있다. 생성 전, 생성 후에 불리는 메서드가 있고 이 메서드에 생성 관련 로직이 흩어져 있다. 조회도 비슷하다.

최근에 본 코드 중에 가장 나쁘다.

그래도

 

코드가 이 모양이어도 문제를 해결해야 한다. 서비스는 해야 하니까. 해결을 시도해 보자.

 

[2부]에서 계속...

 

  1. 호돌맨 2020.04.23 09:29 신고

    ㅎㅎ재미있게 읽었습니다. 빨리 2부 올려주세요 현기증납니다~~~~

  2. ㅎㅎㅎ 2020.04.23 14:22

    너무 재미있고 흥미진진합니다 ㅋㅋ 다음 내용이 너무 궁금하네요

스프링 데이터(Spring Data) R2DBC를 사용하면 R2DBC 기반의 리포지토리를 쉽게 구현할 수쉽게 사용할 수 있다. 이 글에서는 이 중에서 스프링 데이터 R2DBC가 제공하는 DatabaseClient를 이용한 R2DBC 연동 방법을 소개한다.

의존 설정

스프링 데이터 R2DBC를 사용하기 위한 의존 설정은 다음과 같다.

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.r2dbc</groupId>
            <artifactId>r2dbc-bom</artifactId>
            <version>Arabba-SR2</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>dev.miku</groupId>
        <artifactId>r2dbc-mysql</artifactId>
    </dependency>
    <dependency>
        <groupId>io.r2dbc</groupId>
        <artifactId>r2dbc-pool</artifactId>
    </dependency>
    <dependency>
        <groupId>io.r2dbc</groupId>
        <artifactId>r2dbc-spi</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-r2dbc</artifactId>
        <version>1.0.0.RELEASE</version>
    </dependency>
</dependencies>

r2dbc-bom의 Arabba-SR2 버전은 r2dbc 0.8.1 버전에 대응한다. 이 글을 쓰는 시점 기준으로 spring-data-r2dbc 최신 버전은 1.0.0이다.

DatabaseClient 생성

DatabaseClient는 ConnectionFactory를 이용해서 생성한다.

String url = "r2dbcs:mysql://user:user@localhost:3306/test";
ConnectionFactory connectionFactory = ConnectionFactories.get(url);

DatabaseClient client = DatabaseClient.create(connectionFactory);

스프링 빈으로 설정하고 싶다면 AbstractR2dbcConfiguration 클래스를 사용하면 된다.

@Configuration
public class ApplicationConfig extends AbstractR2dbcConfiguration {
    @Override
    @Bean
    public ConnectionFactory connectionFactory() {
        String url = "r2dbcs:mysql://user:user@localhost:3306/test";
        ConnectionFactory connectionFactory = ConnectionFactories.get(url);
        return connectionFactory;
    }
}

이 설정을 사용하면 connectionFactory() 메서드로 정의한 ConnectionFactory 빈을 이용해서 DatabaseClient 객체를 생성하고 빈으로 등록한다. AbstractR2dbcConfiguration 클래스의 databaseClient() 메서드가 이 과정을 처리한다.

DatabaseClient로 쿼리 실행하기

DatabaseClient로 쿼리를 실행하는 방법에는 두 가지가 있다. 하나는 SQL을 사용하는 것이고 다른 하나는 매핑 객체를 사용하는 것이다. 이 글에서는 SQL을 사용하는 방법을 살펴보고 다음에 매핑 객체를 이용하는 방법을 살펴보도록 하자.

조회 쿼리

다음 코드는 DatabaseClient를 이용해서 조회 쿼리를 실행하는 예를 보여준다.

Flux<Map<String, Object>> selectPub =
        client.execute("select id, name from member")
                .fetch()
                .all();

selectPub.subscribe(m -> {
    logger.info("m.get(id) : {}", m.get("id"));
});

위 코드에서 각 메서드는 다음을 지정한다.

  • execute : 실행할 SQL을 지정한다.
  • fetch : 쿼리 실행 결과를 읽어온다는 것을 지정한다.
  • all : 쿼리 실행 결과를 모두 읽어온다는 것을 지정한다.

'지정한다'고 설명했는데 실제 쿼리는 Flux를 구독하는 시점에 실행한다. 위 코드에서는 selectPub.subscribe() 코드를 실행하는 시점에 지정한 동작(execute로 쿼리 실행하고 fetch로 쿼리 결과를 가져오고 all로 모든 결과를 조회)을 수행한다.

 

쿼리 실행 결과는 Map에 담긴다. 한 개의 행이 한 개의 Map에 대응하며 칼럼 이름을 Map의 키로 사용한다. 칼럼 이름은 대소문자를 가리지 않는다. (실제 Map으로 spring-core 모듈에 포함된 LinkedCaseInsensitiveMap 구현을 사용한다.)

 

all() 대신 first()를 사용하면 전체 결과가 아닌 첫 번째 결과만 구한다. first() 메서드의 결과는 Mono이다.

Mono<Map<String, Object>> firstPub =
        client.execute("select id, name from member where id = '124124'")
                .fetch()
                .first();

 

결과가 딱 한 개이거나 없다면 one()을 사용할 수도 있다. 단 결과가 두 개 이상이면 구독 시점에 익셉션이 발생한다.

Mono<Map<String, Object>> one =
        client.execute("select id, name from member limit 1")
                .fetch()
                .one();

바인딩 파라미터

DatabaseClient는 이름과 인덱스의 두 가지 바인딩 파라미터를 지원한다. 다음은 이름 기반 바인딩 파라미터 사용 예이다. 쿼리에서 바인딩 파라미터 이름은 ":이름"의 형식을 갖는다. R2DBC의 Statement#bind() 메서드도 이름 기반 파라미터를 지원하지만 이때 바인딩 파라미터의 형식은 DBMS의 형식을 사용해야 하는 것과 다르다.

Flux<Map<String, Object>> all =
        client.execute("select id, name from member where id like :id or name like :name")
                .bind("id", "%jdbc%")
                .bind("name", "%jdbc%")
                .fetch()
                .all();

bind() 메서드는 첫 번째 인자로 파라미터 이름을, 두 번째 인자로 값을 사용한다.

 

다음은 인덱스 기반 바인딩 파라미터 사용 예이다.

Flux<Map<String, Object>> all =
        client.execute("select id, name from member where name like ? or id like ?")
                .bind(0, "%r2dbc%")
                .bind(1, "%r2dbc%")
                .fetch()
                .all();

인덱스는 0부터 시작한다.

수정 쿼리

다음은 수정 쿼리 지정 예이다.

Mono<Void> updPub = client.execute("update member set name = ? where id = ?")
        .bind(0, "id2")
        .bind(1, "bkchoi")
        .then();

쿼리 실행 결과로 변경된 행 개수를 구하고 싶다면 fetch(), rowsUpdated() 메서드를 사용하면 된다.

Mono<Integer> updPub = client.execute("update member set name = :name where id = :id")
        .bind("name", "id3")
        .bind("id", "bkchoi")
        .fetch()
        .rowsUpdated();

여러 쿼리 실행

구독하지 않고 여러 쿼리를 실행하려면 flatMap() 등을 사용해서 DatabaseClient로 생성한 Mono나 Flux를 연결한다. 아래 코드는 select 쿼리 결과가 존재하면 update 쿼리를 실행하고 존재하지 않으면 insert 쿼리를 실행하는 스트림 생성 예이다.

Mono<Integer> saveOrUpdate = client.execute("select id, name from member where id = :id")
        .bind("id", id)
        .fetch()
        .one()
        .flatMap(m ->
                client.execute("update member set name = :name where id = :id")
                        .bind("name", name)
                        .bind("id", id)
                        .fetch()
                        .rowsUpdated())
        .switchIfEmpty(
                client.execute("insert into member (id, name) values (:id, :name)")
                        .bind("name", name)
                        .bind("id", id)
                        .fetch()
                        .rowsUpdated()
        );

saveOrUpdate.subscribe( ...생략 )

 

위 코드에서 주의할 점은 각 쿼리를 실행할 때마다 DB 커넥션을 새로 구한다는 점이다. 즉 select 쿼리를 실행할 때 DB 커넥션을 구하고 쿼리를 실행한 뒤에는 커넥션을 닫는다. 비슷하게 update나 insert 쿼리를 실행할 때에도 DB 커넥션을 구하고 종료한다.

트랜잭션

스프링 데이터 R2DBC는 리액티브를 위한 트랜잭션 관리자인 R2dbcTransactionManager를 제공한다. 이 클래스를 이용해서 트랜잭션을 제어할 수 있다. 다음은 R2dbcTransactionManager를 이용해서 트랜잭션 범위 안에서 쿼리를 실행하는 코드 작성 예이다.

R2dbcTransactionManager tm = new R2dbcTransactionManager(connectionFactory);
TransactionalOperator operator = TransactionalOperator.create(tm);

Mono<Integer> saveOrUpdate = client.execute("select id, name from member where id = :id")
        .bind("id", id)
        .fetch()
        .one()
        .flatMap(m ->
                client.execute("update member set name = :name where id = :id")
                        .bind("name", name)
                        .bind("id", id)
                        .fetch()
                        .rowsUpdated())
        .switchIfEmpty(
                client.execute("insert into member (id, name) values (:id, :name)")
                        .bind("name", name)
                        .bind("id", id)
                        .fetch()
                        .rowsUpdated()
        ).as(operator::transactional);

다음은 R2dbcTransactionManager를 스프링 빈으로 등록한 예를 보여준다.

@Configuration
@EnableTransactionManagement
public class ApplicationConfig extends AbstractR2dbcConfiguration {
    @Override
    @Bean
    public ConnectionFactory connectionFactory() {
        String url = "r2dbcs:mysql://user:user@localhost:3306/test";
        ConnectionFactory connectionFactory = ConnectionFactories.get(url);
        return connectionFactory;
    }

    @Bean
    ReactiveTransactionManager transactionManager(ConnectionFactory connectionFactory) {
        return new R2dbcTransactionManager(connectionFactory);
    }
}

@EnableTransactionManagement를 사용했는데 이 경우 @Transactional 애노테이션을 이용해서 트랜잭션을 처리할 수 있다. 예를 들어 다음 코드는 saveOrUpdate() 메서드에 @Transactional 애노테이션을 적용했는데 이 경우 saveOrUpdate() 메서드가 리턴한 Mono를 구독할 때 트랜잭션 범위에서 관련 쿼리를 실행한다.

@Service
public class MemberService {
    private DatabaseClient client;

    public MemberService(DatabaseClient client) {
        this.client = client;
    }

    @Transactional
    public Mono<Integer> saveOrUpdate(String id, String name) {
        return client.execute("select id, name from member where id = :id")
                .bind("id", id)
                .fetch()
                .one()
                .flatMap(m ->
                        client.execute("update member set name = :name where id = :id")
                                .bind("name", name)
                                .bind("id", id)
                                .fetch()
                                .rowsUpdated())
                .switchIfEmpty(
                        client.execute("insert into member (id, name) values (:id, :name)")
                                .bind("name", name)
                                .bind("id", id)
                                .fetch()
                                .rowsUpdated()
                );
    }
}

 

팀장 역할을 할 때 가장 힘든 시기 중 하나가 요즘과 같은 평가 기간이다. 흔히 성과와 역량을 평가하는데 현재 조직에서는 평가해야 할 역량 항목이 적지 않다. 

 

누구나 자신의 역량을 높게 평가하고 싶겠지만 모든 항목을 높게 받을 수는 없다. 전 역량 항목이 높다는 것은 마치 국영수 평균이 90점 이상인데 100미터를 12초에 뛰고 노래방 가수 수준에 그림도 잘 그리며 음식 또한 맛있게 하고 패션 감각이 뛰어난 데 거기에 성격 좋고 이타적인 것도 모잘라 매력적인 외모를 가진 것과 같다. 

 

역량 평가를 가능한 객관적으로 하기 위해 기대하는 수준을 기준으로 평가한다. 기대하는 수준은 직급/연차/연봉 등을 고려한 기대하는 수준을 의미한다. 즉 기대하는 수준은 일반적인 평가에서 중간 값인 B에 해당한다.

 

동일 직급/연차 대비 상대적으로 잘하는(또는 그렇게 느껴지는) 항목이 있다면 중간 값보다 한 칸 위인 A로 평가한다. 다시 말하지만 동일 직급/연차 대비 잘해야 A다. 과장한테 기대하는 바가 있는데 과장이 사원보다 잘한다 해서 해당 역량을 A로 평가할 수는 없다.

 

S는 정말 뛰어나야 줄 수 있다. 정말 뛰어나다는 건 회사에서 그 역량 하나 만큼은 최고라는 뜻이다. 그냥 좀 하네 정도가 아니다.


어떤 항목은 A로 평가하기도 C로 평가하기도 애매하다. '책임감' 같은 항목이 그렇다. 매사에 일을 대충하고 기대하는 만큼 하지 않으면 '책임감'을 C로 평가하겠지만 단순히 일을 열심히 했다고 '책임감'을 A로 평가할 수는 없다. '열정', '소통', '윤리'와 같은 항목도 비슷하다. 특별히 못하면 티가 나지만 이 역시 남보다 내가 특별히 더 잘한다고 말하기 힘든 항목이다. 내가 옆 동료보다 더 윤리적이라고 말할 수 있으려면 얼마나 윤리적이어야 하나?

 

이렇다 보니 결국 역량 평가 결과는 '기대하는 수준'에서 크게 벗어나지 않는다. A를 절반 이상 받고 나머지는 부족한 점이 없어야 A와 B 사이인 B+를 받을 수 있는데 절반 이상의 역량을 '기대하는 수준' 이상 받기란 쉬운 게 아니다. 주변 동료보다 몇몇 항목은 뛰어날 수 있지만 절반이 넘는 역량 항목에서 뛰어나기란 쉽지 않다.

 

근데 이게 문제다. 직군에 따라 항목별로 가중치라도 있어야 역량 평가에 차이가 날텐데 다수가 비슷한 점수를 받는다. 이럴거면 뭐하러 역량 평가를 하나. 평가를 하는 사람도 평가를 받는 사람도 만족할 수 없는 방식이다. 1년 농사를 망치는 기분마저 든다.

파이썬 머신러닝 완벽 가이드 책 스터디 자료

 

2장 : 사이킷런 소개

* 프로세스 기초 / 사이킷런 프레임워크 기초 / 교차 검증: KFold, Stratified

* 데이터 전처리: null 값 처리, 인코딩, 표준화, 정규화

 

파이선 ML - 2장 사이킷런.pdf
0.14MB

3장 : 평가

* 성능 지표: 정확도, 오차행렬(confusion matrix), 정밀도, 재현율, 분류 결정 임곗값

* ROC / ROC-AUC 

파이선 ML - 3장 평가.pdf
0.88MB

4장 : 분류

* 결정 트리(Decision Tree)

* 앙상블 학습 / 보팅, 배깅, 부스팅  / 하드 보팅, 소프트 보팅

* 랜덤 포레스트 / GBM / XGBoost / LightGBM

파이선 ML - 4장 분류.pdf
0.46MB

5장 : 회귀

* 선형 회귀, 다항 회귀, 규제 선형 모델(릿지, 라쏘, 엘라스틱넷)

* 로지스틱 회귀

* 회귀 트리

파이썬 ML - 5장 회귀.pdf
0.42MB

R2DBC는 커넥션 풀 라이브러리를 제공한다. 이 라이브러릴 사용하면 어렵지 않게 R2DBC에 대한 커넥션 풀을 설정할 수 있다.

의존 설정

커넥션 풀을 설정하려면 r2dbc-pool 모듈을 추가한다.

<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-pool</artifactId>
    <version>0.8.0.RELEASE</version>
</dependency>

URL로 커넥션 풀 사용하기

커넥션 풀을 사용하는 가장 쉬운 방법은 URL에 pool을 사용하는 것이다.

String url = "r2dbcs:pool:mysql://root:1@localhost:3306/test";
ConnectionFactory connectionFactory = ConnectionFactories.get(url);

Mono<Integer> updatedMono = Mono.from(connectionFactory.create())
        .flatMap(conn -> {
            Mono<Void> txMono = Mono.from(conn.beginTransaction());
            ...
            return updMono.delayUntil(ret -> conn.commitTransaction())
                    .onErrorResume(err -> Mono.from(conn.rollbackTransaction())
                                              .then(Mono.error(err)))
                    .doFinally(signal -> Mono.from(conn.close()).subscribe());
        });

URL의 드라이버 위치에 "pool"을 사용했고 나머지는 동일하다. URL을 사용하면 초기 크기나 최대 크기 기본값으로 10을 사용한다.

ConnectionFactoryOptions로 커넥션 풀 사용하기

커넥션 풀의 초기 크기, 최대 크기, 검증 쿼리를 직접 제어하고 싶다면 ConnectionFactoryOptions를 이용해서 ConnectionFactory를 생성하면 된다. 아래 코드는 사용 예이다.

ConnectionFactory connectionFactory = ConnectionFactories.get(ConnectionFactoryOptions.builder()
        .option(ConnectionFactoryOptions.SSL, true)
        .option(ConnectionFactoryOptions.DRIVER, "pool")
        .option(ConnectionFactoryOptions.PROTOCOL, "mysql")
        .option(ConnectionFactoryOptions.HOST, "localhost")
        .option(ConnectionFactoryOptions.PORT, 3306)
        .option(ConnectionFactoryOptions.USER, "root")
        .option(ConnectionFactoryOptions.PASSWORD, "1")
        .option(ConnectionFactoryOptions.DATABASE, "test")
        .option(Option.<Integer>valueOf("initialSize"), 5)
        .option(Option.<Integer>valueOf("maxSize"), 20)
        .option(Option.<String>valueOf("validationQuery"), "select 1+1")
        .build());

DRIVER 옵션 값은 "pool"로 지정하고 PROTOCOL 옵션에 실제 드라이버를 지정한다.

ConnectionPool 클래스로 커넥션 풀 사용하기

커넥션 풀에 대해 보다 상세한 설정을 하고 싶다면 ConnectionPool 클래스를 이용해서 커넥션 풀을 생성한다. 다음은 사용 예를 보여준다.

String url = "r2dbcs:mysql://root:1@localhost:3306/test";
ConnectionFactory connectionFactory = ConnectionFactories.get(url);

ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory)
        .initialSize(5)
        .maxSize(20)
        .maxIdleTime(Duration.ofMinutes(10))
        .maxLifeTime(Duration.ofMinutes(60))
        .maxCreateConnectionTime(Duration.ofMillis(500))
        .maxAcquireTime(Duration.ofMillis(500))
        .validationDepth(ValidationDepth.LOCAL)
        .validationQuery("select 1+1")
        .name("POOL 01")
        .build();

ConnectionPool pool = new ConnectionPool(configuration);

Mono<Integer> updatedMono = Mono.from(pool.create())
        .flatMap(conn -> {
            Mono<Void> txMono = Mono.from(conn.beginTransaction());
            ...
            return updMono.delayUntil(ret -> conn.commitTransaction())
                .onErrorResume(err -> Mono.from(conn.rollbackTransaction())
                                          .then(Mono.error(err)))
                .doFinally(signal -> Mono.from(conn.close()).subscribe());
        });

ConnectionPoolConfiguration은 커넥션 풀 설정 정보를 담는다. ConnectionPoolConfiguration.builder() 메서드는 커넥션을 생성할 때 사용할 ConnectionFactory를 인자로 받으며 이후 initialSize(), maxSize() 등의 메서드를 이용해서 커넥션 풀을 설정한다.

 

설정한 ConnectionPoolConfiguration을 이용해서 ConnectionPool 객체를 생성한 뒤 ConnectionFactory 대신에 ConnectionPool을 이용해서 커넥션을 구하면 된다.

R2DBC(Reactive Relational Database Connectivity)는 SQL 데이터베이스를 위한 리액티브 API이다. 리액티브 스트림즈를 기반으로 SQL을 실행하는데 필요한 커넥션, 쿼리 실행, 트랜잭션 처리 등에 대한 API를 정의하고 있다. JDBC API처럼 R2DBC는 API만 정의하고 있다. r2dbc-spi 모듈이 SPI(service-provider interface)로서 각 드라이버는 SPI에 정의된 인터페이스를 알맞게 구현한다. 이 글을 쓰는 시점에서 R2DBC 버전은 0.8.0이며 스펙, 드라이버 구현 등 관련 문서는 https://r2dbc.io/ 사이트에서 확인할 수 있다.

 

* 이 글에서는 리액티브 스트림즈에 대한 내용은 설명하지 않는다. Publisher나 Subscriber에 대한 내용은 https://javacan.tistory.com/entry/Reactor-Start-1-RS-Flux-Mono-Subscriber 글을 참고한다.

의존 설정

R2DBC를 사용하려면 구현을 제공하는 드라이버가 필요하다. 예제에서는 r2dbc-mysql 드라이버를 사용한다.

<dependency>
    <groupId>dev.miku</groupId>
    <artifactId>r2dbc-mysql</artifactId>
    <version>0.8.0.RELEASE</version>
</dependency>

r2dbc-mysql 0.8.0은 r2dbc-spi 0.8.0 버전을 지원하며 r2dbc-mysql은 스프링 리액터와 네티를 이용해서 구현되어 있다.

주요 구성 요소

r2dbc를 이용해서 SQL을 실행하기 위해 사용하는 주요 구성 요소는 다음과 같다. 모두 io.r2dbc.spi 패키지에 위치하며ConnectionFactories를 제외한 나머지는 인터페이스이다.

 

  • ConnectionFactory : Connection을 생성하는 팩토리
  • ConnectionFactories : ConnectionFactory를 검색해서 제공하는 유틸리티
  • Connection : 데이터베이스에 대한 커넥션
  • Statement : 실행할 SQL
  • Batch : 배치로 실행할 SQL
  • Result : 쿼리 실행 결과

ConnectionFactory 구하기

쿼리를 실행하려면 커넥션을 먼저 구해야 한다. R2DBC는 ConnectionFactory를 이용해서 커넥션을 구할 수 있다. 드라이버이가 제공하는 ConnectionFactory를 직접 생성하거나 ConnectionFactories를 이용해서 ConnectionFactory를 구할 수도 있다. 참고로 ConnectionFactories는 자바 서비스 프로바이더를 이용해서 ConnectionFactory를 찾는다.

 

다음 코드는 ConnectionFactories.get() 메서드를 이용해서 ConnectionFactory를 구하는 코드 예이다.

String url = "r2dbcs:mysql://user:pw@localhost:3306/test";
ConnectionFactory connectionFactory = ConnectionFactories.get(url);

JDBC URL과 비슷하게 R2DBC도 URL을 이용해서 연결할 DB 정보를 지정한다. 다음은 URL 구성 요소를 보여준다.

scheme:driver:protocol://authority/path?query
  • scheme : URL이 유효한 R2DBC URL임을 지정한다. 유효한 스킴은 r2dbc와 r2dbcs(SSL 용)이다.
  • driver : 드라이버를 지정한다.
  • protocol : 드라이버에 따라 프로토콜 정보를 지정한다(선택).
  • authority : 접속할 DB와 인증 정보를 포함한다.
  • path : 초기 스카마나 데이터베이스 이름을 지정한다(선택).
  • query : 추가 설정 옵션을 전달할 때 사용한다(선택).

각 드라이버마다 URL 값이 다르므로 드라이버 문서를 참고한다.

Connection을 구하고 쿼리 실행하기

다음 코드는 Connection을 구하고 쿼리를 실행해서 원하는 결과를 출력하는 예이다.

CountDownLatch latch = new CountDownLatch(1);

ConnectionFactory connectionFactory = ConnectionFactories.get(url);

Publisher<? extends Connection> connPub = connectionFactory.create();
connPub.subscribe(new BaseSubscriber<Connection>() {
    @Override
    protected void hookOnNext(Connection conn) {
        Statement stmt = conn.createStatement("select id, name from member where name = ?name");
        stmt.bind("name", "최범균");
        Publisher<? extends Result> resultPub = stmt.execute();
        resultPub.subscribe(new BaseSubscriber<Result>() {
            @Override
            protected void hookOnNext(Result result) {
                Publisher<Member> memberPub = result.map((row, meta) -> 
                    new Member(row.get(0, String.class), row.get(1, String.class))
                );
                memberPub.subscribe(new BaseSubscriber<Member>() {
                    @Override
                    protected void hookOnNext(Member member) {
                        logger.info("회원 데이터 : {}", member);
                    }
                });
            }

            @Override
            protected void hookFinally(SignalType type) {
                conn.close().subscribe(new BaseSubscriber<Void>() {
                    @Override
                    protected void hookFinally(SignalType type) {
                        latch.countDown();
                    }
                });
            }
        });
    }
});

latch.await(); // latch.countDown() 실행 전까지 블록킹

코드가 다소 복잡하다. R2DBC의 주요 API는 리액티브 스트림의 Publisher 타입을 리턴하는데 그 타입을 그대로 사용해서 다소 복잡해졌다. Publisher#subscribe() 메서드에 전달할 Subscriber 구현 객체는 BaseSubscriber를 이용해서 생성했다. BaseSubscriber는 스프링 리액터가 제공하는 Subscriber 구현 클래스로 필요한 기능만 구현하기 위해 이 클래스를 사용했다.

 

먼저 Connection을 구하고 Statement를 생성하는 코드만 보자.

ConnectionFactory connectionFactory = ConnectionFactories.get(url);

final Publisher<? extends Connection> connPub = connectionFactory.create();
connPub.subscribe(new BaseSubscriber<Connection>() {
    @Override
    protected void hookOnNext(Connection conn) {
        Statement stmt = conn.createStatement("select id, name from member where name = ?name");
        stmt.bind("name", "최범균");
        Publisher<? extends Result> resultPub = stmt.execute();
        resultPub.subscribe(...);
    }
});

ConnectionFactory#create() 메서드가 생성한 Publisher는 연결에 성공하면 next 신호로 Connection을 보낸다. Subscriber는 onNext() 메서드로 Connection을 받아 쿼리를 실행한다(BaseSubscriber를 사용한 경우 hookOnNext() 메서드 사용).

 

쿼리를 실행하기 위한 Statement는 Connection#createStatement() 메서드로 생성한다. createStatement() 메서드는 실행할 쿼리를 입력받는다. JDBC의 Statement와 PreparedStatement가 R2DBC에서는 Statement 하나로 처리한다.

 

Statement#bind() 메서드를 이용해서 바인딩 파라미터에 값을 전달한다. 다음은 Statement가 제공하는 바인딩 파라미터 관련 메서드이다.

  • Statement bind(int index, Object value)
  • Statement bind(String name, Object value)
  • Statement bindNull(int index, Class<?> type)
  • Statement bindNull(String name, Class<?> type)

한 가지 주의할 점은 쿼리에서 사용하는 바인딩 파라미터가 JDBC와 다르다는 점이다. JDBC의 PreparedStatement는 물음표(?)를 이용해서 바인딩 파라미터를 지정하고 1부터 시작하는 인덱스를 사용한다. 반면에 DBMS가 사용하는 바인딩 파라미터를 사용하며 이름이나 0부터 시작하는 인덱스를 사용할 수 있다. 예를 들어 MySQL은 위 코드에서 보는 것처럼 바인딩 파라미터로 "?이름" 형식을 사용한다.

 

Statement#execute() 메서드는 Publisher<Result> 타입을 리턴한다. Result는 쿼리 실행 결과를 제공한다. Result가 제공하는 다음의 두 메서드를 이용해서 쿼리 실행 결과를 구할 수 있다.

public interface Result {
    // 변경된 행 개수를 리러탄한다.
    Publisher<Integer> getRowsUpdated();
    // 조회 결과(Row)를 변환한다.
    <T> Publisher<T> map(BiFunction<Row, RowMetadata, ? extends T> mappingFunction);
}

Row는 조회한 한 행에 대응한다. Row는 행의 데이터를 조회하기 위한 get() 메서드를 제공한다.

  • T get(int index, Class<T> type) : JDBC API와 달리 index는 0부터 시작한다. 
  • T get(String name, Class<T> type)

예제 코드는 SELECT 쿼리 실행 결과를 처리하므로 map() 메서드를 이용해서 조회환 결과를 Member 객체로 변환했다. 

Publisher<? extends Result> resultPub = stmt.execute();
resultPub.subscribe(new BaseSubscriber<Result>() {
    @Override
    protected void hookOnNext(Result result) {
        Publisher<Member> memberPub = result.map((row, meta) ->
                new Member(row.get(0, String.class), row.get(1, String.class))
        );
        memberPub.subscribe(new BaseSubscriber<Member>() {
            @Override
            protected void hookOnNext(Member member) {
                logger.info("회원 데이터 : {}", member);
            }
        });
    }

    @Override
    protected void hookFinally(SignalType type) {
        conn.close().subscribe(...);
    }
});

DB 작업이 끝나면 Connection#close() 메서드로 커넥션을 종료해야 한다. close() 메서드는 Publisher<Void>를 리턴하므로 실제 커넥션 종료는 close() 메서드가 리턴한 Publisher를 구독해야 실행된다.

 

리액터가 제공하는 BaseSubscriber는 complete 신호나 error 신호가 오면 hookFinally()를 실행하는 기능을 구현하고 있으므로 에러 여부에 상관없이 커넥션을 종료하기 위해 이 메서드를 이용했다.

Connection을 구하고 쿼리 실행하기 : 리액터 이용

Publisher를 이용하면 구독 처리 코드가 중첩되어 복잡해진다. 중첩 구조는 리액터를 사용해서 없앨 수 있다. 다음 코드는 리액터를 사용해서 구현현 예이다.

Mono<? extends Connection> connMono = Mono.from(connectionFactory.create());

Flux<Member> members = connMono.flatMapMany(conn -> {
        Flux<? extends Result> resultFlux = Flux.from(
                conn.createStatement("select id, name from member where name like ?name")
                        .bind("name", "%범%")
                        .execute()
        );

        Flux<Member> memberFlux = resultFlux.flatMap(result ->
                result.map((row, meta) ->
                        new Member(row.get("id", String.class), row.get("name", String.class))
                )
        );

        return memberFlux.doFinally(signal -> Mono.from(conn.close()).subscribe());
    }
);

List<Member> ret = members.collectList().block();

트랜잭션 처리

Connection은 트랜잭션 처리를 위해 다음 메서드를 제공한다.

  • Publisher<Void> beginTransaction()
  • Publisher<Void> commitTransaction()
  • Publisher<Void> rollbackTransaction()

다음은 트랜잭션 처리 예를 보여준다.

Mono<Integer> updatedMono = Mono.from(connectionFactory.create())
    .flatMap(conn -> {
        final Mono<Void> txMono = Mono.from(conn.beginTransaction());
        final Mono<Integer> updMono = txMono.then(
                Mono.from(conn.createStatement("insert into member values (?id, ?name)")
                        .bind("id", "bkchoi4")
                        .bind("name", "최범균2")
                        .execute()
                ).flatMap(
                        result -> Mono.from(result.getRowsUpdated())
                )
        );
        return updMono.delayUntil(ret -> conn.commitTransaction())
            .onErrorResume(err -> Mono.from(conn.rollbackTransaction()).then(Mono.error(err)))
            .doFinally(signal -> Mono.from(conn.close()).subscribe());
    });

트랜잭션 커밋/롤백 처리를 위해 리액터의 delayUntil(), onErrorResume() 메서드를 사용했다. 참고로 delayUntil() 메서드는 인자로 받은 Publisher가 종료된 뒤에 종료하며 onErrorResume() 메서드는 에러 발생시 인자로 받은 Publisher를 실행한다. 트랜잭션 롤백 처리 후에 Mono.error()를 이용해서 에러를 다시 발생시키도록 했다.

 

+ Recent posts