주요글: 도커 시작하기
반응형

서로 다른 MariaDB 간에 데이터를 복제할 일이 자꾸 생겨서 MariaDB 바이너리 로그 기반 CDC(Change Data Capture)를 활용하기로 했다. 이미 mysql-binlog-connector-java라는 훌륭한 라이브러리가 존재하는데 이를 직접 사용하면 코드가 다소 복잡해져서 이 라이브러리를 기반으로 한 mariadb-cdc라는 라이브러리를 만들었다.

MariaDB 설정

MariaDB 바이너리 로그 활성화

mariadb-cdc는 바이너리 로그를 사용하므로 binlog_format을 row로 설정한다.

binlog_format = row  
binlog_row_image = full  

binlog_row_image 값이 FULL이면 변경 전/후 모든 칼럼 값을 기록한다.

CDC를 위한 사용자 생성

CDC를 위한 사용자를 생성한다. 이 사용자에 REPLICATION SLAVE, REPLICATION CLIENT, SELECT 권한을 부여한다.

CREATE USER cdc@'%' IDENTIFIED BY 'password'  
GRANT REPLICATION SLAVE, REPLICATION CLIENT, SELECT ON *.* TO cdc@'%'  

메이븐 리포지토리 설정

mariadb-cdc를 사용하려면 아래 리포지토리 설정과 의존 설정을 추가한다.

<repositories>
    <repository>
        <id>jitpack.io</id>
        <url>https://jitpack.io</url>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>com.github.madvirus</groupId>
        <artifactId>mariadb-cdc</artifactId>
        <version>0.11.0</version>
    </dependency>
</dependencies>

그레이들을 사용한다면 아래 설정을 추가한다.

repositories {
    mavenCentral()
    maven { url 'https://jitpack.io' }
}

dependencies {
    implementation 'com.github.madvirus:mariadb-cdc:0.11.0'
}

MariadbCdc 사용하기

기본 사용법

단계 1, MariadbCdcConfig 생성

MariadbCdcConfig config = new MariadbCdcConfig(
                "localhost", // MariaDB 호스트
                3306, // 포트
                "cdc", // CDC를 위한 사용자
                "password", // 암호
                "bin.pos"); // 바이너리 로그 위치 추적용 파일 경로

"bin.pos" 파일이 없으면 현재 바이너로 로그 위치부터 읽기 시작한다.
"bin.pos" 파일에 바이너로 로그 위치가 기록되어 있으면 해당 위치부터 읽기 시작한다.
MariadbCdc는 바이너로 로그를 읽으면 다음 위치를 "bin.pos" 파일에 기록한다.

단계 2, MariadbCdc 생성

JdbcColumnNamesGetter columnNamesGetter = new JdbcColumnNamesGetter(
            "localhost", // host
            3307, // port
            "cdc", // cdc user
            "password"); // password 

MariadbCdc cdc = new MariadbCdc(config, columnNamesGetter);

MariadbCdc는 테이블의 칼럼 이름을 구하기 위해 ColumnNamesGetter를 사용한다.

기본 제공하는 JdbcColumnNamesGetter는 다음 쿼리를 이용해서 칼럼 이름을 구한다:

select COLUMN_NAME, ORDINAL_POSITION, COLUMN_DEFAULT, IS_NULLABLE, DATA_TYPE 
from INFORMATION_SCHEMA.COLUMNS 
WHERE table_schema = '?' and TABLE_NAME = '?' 
order by ORDINAL_POSITION

단계 3, MariadbCdcListener 설정

cdc.setMariadbCdcListener(new MariadbCdcListener() {
    @Override
    public void started(BinlogPosition binlogPosition) {
        // CDC가 시작되면 호출됨
    }

    @Override
    public void startFailed(Exception e) {
        // CDC 시작에 실패하면 호출됨
    }

    @Override
    public void onDataChanged(List<RowChangedData> list) {
        // 변경된 데이터 목록(행 목록)을 받음
        list.forEach(data -> { // 변경된 각 행에 대해
            String database = data.getDatabase(); // 변경된 행의 DB 이름
            String table = data.getTable(); // 변경된 행의 테이블 이름
            DataRow dataRow = data.getDataRow(); // 변경된 데이터 행(변경 후 이미지)
            if (data.getType() == ChangeType.INSERT) { // 추가된 데이터면
                Long id = dataRow.getLong("id"); // id 칼럼 값을 Long으로 구한다.
                // ...
            } else if (data.getType() == ChangeType.UPDATE) { // 변경된 데이터면
                String name = dataRow.getString("name"); // 변경된 name 칼럼 값을 구한다.
                DataRow dataRowBeforeUpdate = data.getDataRowBeforeUpdate(); // 변경전 데이터 행(변경 전 이미지)
                String nameBeforeUpdate = dataRowBeforeUpdate.getString("name"); // 변경전 name 칼럼 값을 구한다.
                // ...
            } else if (data.getType() == ChangeType.DELETE) {
                String email = dataRow.getString("email"); // 삭제된 행의 email 칼럼 값을 구한다.
                // ...
            }
        });
    }

    @Override
    public void onXid(Long xid) {
        // 트랜잭션 커밋 로그
    }

    @Override
    public void stopped() {
        // CDC를 중지하면 호출됨
    }
});

단계 4, MariadbCdc 시작/중지

cdc.start(); // 별도 쓰레드로 바이너리 로그 조회 시작

...

cdc.stop(); // 조회 중지

MariadbCdc가 시작되면 setMariadbCdcListener()로 설정한 listener에 읽은 바이너리 로그 데이터를 전달한다.

 

MariadbCdc#start()는 별도 쓰레드로 바이너리 로그 조회를 시작한다.

특정 테이블만 포함하기/제외하기

MariadbCdcListener#onDataChanged()는 기본적으로 모든 변경에 대해 불린다. 만약 특정 테이블만 포함하거나 제외하고 싶다면 필터를 사용하면 된다.

// test.user 테이블 변경에 대해서만 onDataChanged() 호출
config.setIncludeFilters("test.user");
// test.member 테이블 변경은 onDataChanged() 호출 제외
config.setIncludeFilters("test.member");

수정한 칼럼만 포함하기

binlog_row_image가 full 이면 UPDATE/DELETE 시에 모든 칼럼 값을 포함한다. 변경된 칼럼 값만 포함하고 싶다면 binlog_row_image를 minimal로 설정한다.

binlog_format = row
binlog_row_image = minimal

MINIMAL은 수정 전 이미지에는 PK 칼럼만 기록하고(PK 칼럼이 없으면 전체 칼럼) 수정 후 이미지에는 변경된 칼럼만 기록한다.

binlog_row_image 가 minimal일 때 아래 쿼리를 실행하면

update member set name = 'newname' where id = 10

RowChangedData#getDataRowBeforeUpdate()는 PK 칼럼만 포함한 DataRow를 리턴하고
RowChangedData#getDataRow()는 변경된 칼럼만 포함한 DataRow를 리턴한다.

@Override
public void onDataChanged(List<RowChangedData> list) {
    // handle changed data
    list.forEach(data -> { // each
        String database = data.getDatabase(); // test
        String table = data.getTable(); // member
        DataRow afterDataRow = data.getDataRow(); // after image
        if (data.getType() == ChangeType.UPDATE) {
            DataRow beforeDataRow = data.getDataRowBeforeUpdate(); // before image
            Long id = beforeDataRow.getLong("id"); // before image includes only pk fields
            String name = afterDataRow.getString("name"); // after image includes only updated fields
            // ...
        }
    });
}

MariaDB 10.5: binlog_row_metadata = full

MariaDB 10.5는 binlog_row_metadata 변수를 지원한다.
binlog_row_metadata 값이 FULL이면 칼럼 이름을 포함한 모든 메타데이터가 바이너리 로그에 기록된다.
그래서 binlog_row_metadata가 FULL이면 ColumnNamesGetter가 필요 없다.

참고자료

+ Recent posts