주요글: 도커 시작하기

스프링 데이터(Spring Data) R2DBC를 사용하면 R2DBC 기반의 리포지토리를 쉽게 구현할 수쉽게 사용할 수 있다. 이 글에서는 이 중에서 스프링 데이터 R2DBC가 제공하는 DatabaseClient를 이용한 R2DBC 연동 방법을 소개한다.

의존 설정

스프링 데이터 R2DBC를 사용하기 위한 의존 설정은 다음과 같다.

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.r2dbc</groupId>
            <artifactId>r2dbc-bom</artifactId>
            <version>Arabba-SR2</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>dev.miku</groupId>
        <artifactId>r2dbc-mysql</artifactId>
    </dependency>
    <dependency>
        <groupId>io.r2dbc</groupId>
        <artifactId>r2dbc-pool</artifactId>
    </dependency>
    <dependency>
        <groupId>io.r2dbc</groupId>
        <artifactId>r2dbc-spi</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-r2dbc</artifactId>
        <version>1.0.0.RELEASE</version>
    </dependency>
</dependencies>

r2dbc-bom의 Arabba-SR2 버전은 r2dbc 0.8.1 버전에 대응한다. 이 글을 쓰는 시점 기준으로 spring-data-r2dbc 최신 버전은 1.0.0이다.

DatabaseClient 생성

DatabaseClient는 ConnectionFactory를 이용해서 생성한다.

String url = "r2dbcs:mysql://user:user@localhost:3306/test";
ConnectionFactory connectionFactory = ConnectionFactories.get(url);

DatabaseClient client = DatabaseClient.create(connectionFactory);

스프링 빈으로 설정하고 싶다면 AbstractR2dbcConfiguration 클래스를 사용하면 된다.

@Configuration
public class ApplicationConfig extends AbstractR2dbcConfiguration {
    @Override
    @Bean
    public ConnectionFactory connectionFactory() {
        String url = "r2dbcs:mysql://user:user@localhost:3306/test";
        ConnectionFactory connectionFactory = ConnectionFactories.get(url);
        return connectionFactory;
    }
}

이 설정을 사용하면 connectionFactory() 메서드로 정의한 ConnectionFactory 빈을 이용해서 DatabaseClient 객체를 생성하고 빈으로 등록한다. AbstractR2dbcConfiguration 클래스의 databaseClient() 메서드가 이 과정을 처리한다.

DatabaseClient로 쿼리 실행하기

DatabaseClient로 쿼리를 실행하는 방법에는 두 가지가 있다. 하나는 SQL을 사용하는 것이고 다른 하나는 매핑 객체를 사용하는 것이다. 이 글에서는 SQL을 사용하는 방법을 살펴보고 다음에 매핑 객체를 이용하는 방법을 살펴보도록 하자.

조회 쿼리

다음 코드는 DatabaseClient를 이용해서 조회 쿼리를 실행하는 예를 보여준다.

Flux<Map<String, Object>> selectPub =
        client.execute("select id, name from member")
                .fetch()
                .all();

selectPub.subscribe(m -> {
    logger.info("m.get(id) : {}", m.get("id"));
});

위 코드에서 각 메서드는 다음을 지정한다.

  • execute : 실행할 SQL을 지정한다.
  • fetch : 쿼리 실행 결과를 읽어온다는 것을 지정한다.
  • all : 쿼리 실행 결과를 모두 읽어온다는 것을 지정한다.

'지정한다'고 설명했는데 실제 쿼리는 Flux를 구독하는 시점에 실행한다. 위 코드에서는 selectPub.subscribe() 코드를 실행하는 시점에 지정한 동작(execute로 쿼리 실행하고 fetch로 쿼리 결과를 가져오고 all로 모든 결과를 조회)을 수행한다.

 

쿼리 실행 결과는 Map에 담긴다. 한 개의 행이 한 개의 Map에 대응하며 칼럼 이름을 Map의 키로 사용한다. 칼럼 이름은 대소문자를 가리지 않는다. (실제 Map으로 spring-core 모듈에 포함된 LinkedCaseInsensitiveMap 구현을 사용한다.)

 

all() 대신 first()를 사용하면 전체 결과가 아닌 첫 번째 결과만 구한다. first() 메서드의 결과는 Mono이다.

Mono<Map<String, Object>> firstPub =
        client.execute("select id, name from member where id = '124124'")
                .fetch()
                .first();

 

결과가 딱 한 개이거나 없다면 one()을 사용할 수도 있다. 단 결과가 두 개 이상이면 구독 시점에 익셉션이 발생한다.

Mono<Map<String, Object>> one =
        client.execute("select id, name from member limit 1")
                .fetch()
                .one();

바인딩 파라미터

DatabaseClient는 이름과 인덱스의 두 가지 바인딩 파라미터를 지원한다. 다음은 이름 기반 바인딩 파라미터 사용 예이다. 쿼리에서 바인딩 파라미터 이름은 ":이름"의 형식을 갖는다. R2DBC의 Statement#bind() 메서드도 이름 기반 파라미터를 지원하지만 이때 바인딩 파라미터의 형식은 DBMS의 형식을 사용해야 하는 것과 다르다.

Flux<Map<String, Object>> all =
        client.execute("select id, name from member where id like :id or name like :name")
                .bind("id", "%jdbc%")
                .bind("name", "%jdbc%")
                .fetch()
                .all();

bind() 메서드는 첫 번째 인자로 파라미터 이름을, 두 번째 인자로 값을 사용한다.

 

다음은 인덱스 기반 바인딩 파라미터 사용 예이다.

Flux<Map<String, Object>> all =
        client.execute("select id, name from member where name like ? or id like ?")
                .bind(0, "%r2dbc%")
                .bind(1, "%r2dbc%")
                .fetch()
                .all();

인덱스는 0부터 시작한다.

수정 쿼리

다음은 수정 쿼리 지정 예이다.

Mono<Void> updPub = client.execute("update member set name = ? where id = ?")
        .bind(0, "id2")
        .bind(1, "bkchoi")
        .then();

쿼리 실행 결과로 변경된 행 개수를 구하고 싶다면 fetch(), rowsUpdated() 메서드를 사용하면 된다.

Mono<Integer> updPub = client.execute("update member set name = :name where id = :id")
        .bind("name", "id3")
        .bind("id", "bkchoi")
        .fetch()
        .rowsUpdated();

여러 쿼리 실행

구독하지 않고 여러 쿼리를 실행하려면 flatMap() 등을 사용해서 DatabaseClient로 생성한 Mono나 Flux를 연결한다. 아래 코드는 select 쿼리 결과가 존재하면 update 쿼리를 실행하고 존재하지 않으면 insert 쿼리를 실행하는 스트림 생성 예이다.

Mono<Integer> saveOrUpdate = client.execute("select id, name from member where id = :id")
        .bind("id", id)
        .fetch()
        .one()
        .flatMap(m ->
                client.execute("update member set name = :name where id = :id")
                        .bind("name", name)
                        .bind("id", id)
                        .fetch()
                        .rowsUpdated())
        .switchIfEmpty(
                client.execute("insert into member (id, name) values (:id, :name)")
                        .bind("name", name)
                        .bind("id", id)
                        .fetch()
                        .rowsUpdated()
        );

saveOrUpdate.subscribe( ...생략 )

 

위 코드에서 주의할 점은 각 쿼리를 실행할 때마다 DB 커넥션을 새로 구한다는 점이다. 즉 select 쿼리를 실행할 때 DB 커넥션을 구하고 쿼리를 실행한 뒤에는 커넥션을 닫는다. 비슷하게 update나 insert 쿼리를 실행할 때에도 DB 커넥션을 구하고 종료한다.

트랜잭션

스프링 데이터 R2DBC는 리액티브를 위한 트랜잭션 관리자인 R2dbcTransactionManager를 제공한다. 이 클래스를 이용해서 트랜잭션을 제어할 수 있다. 다음은 R2dbcTransactionManager를 이용해서 트랜잭션 범위 안에서 쿼리를 실행하는 코드 작성 예이다.

R2dbcTransactionManager tm = new R2dbcTransactionManager(connectionFactory);
TransactionalOperator operator = TransactionalOperator.create(tm);

Mono<Integer> saveOrUpdate = client.execute("select id, name from member where id = :id")
        .bind("id", id)
        .fetch()
        .one()
        .flatMap(m ->
                client.execute("update member set name = :name where id = :id")
                        .bind("name", name)
                        .bind("id", id)
                        .fetch()
                        .rowsUpdated())
        .switchIfEmpty(
                client.execute("insert into member (id, name) values (:id, :name)")
                        .bind("name", name)
                        .bind("id", id)
                        .fetch()
                        .rowsUpdated()
        ).as(operator::transactional);

다음은 R2dbcTransactionManager를 스프링 빈으로 등록한 예를 보여준다.

@Configuration
@EnableTransactionManagement
public class ApplicationConfig extends AbstractR2dbcConfiguration {
    @Override
    @Bean
    public ConnectionFactory connectionFactory() {
        String url = "r2dbcs:mysql://user:user@localhost:3306/test";
        ConnectionFactory connectionFactory = ConnectionFactories.get(url);
        return connectionFactory;
    }

    @Bean
    ReactiveTransactionManager transactionManager(ConnectionFactory connectionFactory) {
        return new R2dbcTransactionManager(connectionFactory);
    }
}

@EnableTransactionManagement를 사용했는데 이 경우 @Transactional 애노테이션을 이용해서 트랜잭션을 처리할 수 있다. 예를 들어 다음 코드는 saveOrUpdate() 메서드에 @Transactional 애노테이션을 적용했는데 이 경우 saveOrUpdate() 메서드가 리턴한 Mono를 구독할 때 트랜잭션 범위에서 관련 쿼리를 실행한다.

@Service
public class MemberService {
    private DatabaseClient client;

    public MemberService(DatabaseClient client) {
        this.client = client;
    }

    @Transactional
    public Mono<Integer> saveOrUpdate(String id, String name) {
        return client.execute("select id, name from member where id = :id")
                .bind("id", id)
                .fetch()
                .one()
                .flatMap(m ->
                        client.execute("update member set name = :name where id = :id")
                                .bind("name", name)
                                .bind("id", id)
                                .fetch()
                                .rowsUpdated())
                .switchIfEmpty(
                        client.execute("insert into member (id, name) values (:id, :name)")
                                .bind("name", name)
                                .bind("id", id)
                                .fetch()
                                .rowsUpdated()
                );
    }
}

 

팀장 역할을 할 때 가장 힘든 시기 중 하나가 요즘과 같은 평가 기간이다. 흔히 성과와 역량을 평가하는데 현재 조직에서는 평가해야 할 역량 항목이 적지 않다. 

 

누구나 자신의 역량을 높게 평가하고 싶겠지만 모든 항목을 높게 받을 수는 없다. 전 역량 항목이 높다는 것은 마치 국영수 평균이 90점 이상인데 100미터를 12초에 뛰고 노래방 가수 수준에 그림도 잘 그리며 음식 또한 맛있게 하고 패션 감각이 뛰어난 데 거기에 성격 좋고 이타적인 것도 모잘라 매력적인 외모를 가진 것과 같다. 

 

역량 평가를 가능한 객관적으로 하기 위해 기대하는 수준을 기준으로 평가한다. 기대하는 수준은 직급/연차/연봉 등을 고려한 기대하는 수준을 의미한다. 즉 기대하는 수준은 일반적인 평가에서 중간 값인 B에 해당한다.

 

동일 직급/연차 대비 상대적으로 잘하는(또는 그렇게 느껴지는) 항목이 있다면 중간 값보다 한 칸 위인 A로 평가한다. 다시 말하지만 동일 직급/연차 대비 잘해야 A다. 과장한테 기대하는 바가 있는데 과장이 사원보다 잘한다 해서 해당 역량을 A로 평가할 수는 없다.

 

S는 정말 뛰어나야 줄 수 있다. 정말 뛰어나다는 건 회사에서 그 역량 하나 만큼은 최고라는 뜻이다. 그냥 좀 하네 정도가 아니다.


어떤 항목은 A로 평가하기도 C로 평가하기도 애매하다. '책임감' 같은 항목이 그렇다. 매사에 일을 대충하고 기대하는 만큼 하지 않으면 '책임감'을 C로 평가하겠지만 단순히 일을 열심히 했다고 '책임감'을 A로 평가할 수는 없다. '열정', '소통', '윤리'와 같은 항목도 비슷하다. 특별히 못하면 티가 나지만 이 역시 남보다 내가 특별히 더 잘한다고 말하기 힘든 항목이다. 내가 옆 동료보다 더 윤리적이라고 말할 수 있으려면 얼마나 윤리적이어야 하나?

 

이렇다 보니 결국 역량 평가 결과는 '기대하는 수준'에서 크게 벗어나지 않는다. A를 절반 이상 받고 나머지는 부족한 점이 없어야 A와 B 사이인 B+를 받을 수 있는데 절반 이상의 역량을 '기대하는 수준' 이상 받기란 쉬운 게 아니다. 주변 동료보다 몇몇 항목은 뛰어날 수 있지만 절반이 넘는 역량 항목에서 뛰어나기란 쉽지 않다.

 

근데 이게 문제다. 직군에 따라 항목별로 가중치라도 있어야 역량 평가에 차이가 날텐데 다수가 비슷한 점수를 받는다. 이럴거면 뭐하러 역량 평가를 하나. 평가를 하는 사람도 평가를 받는 사람도 만족할 수 없는 방식이다. 1년 농사를 망치는 기분마저 든다.

파이썬 머신러닝 완벽 가이드 책 스터디 자료

 

2장 : 사이킷런 소개

* 프로세스 기초 / 사이킷런 프레임워크 기초 / 교차 검증: KFold, Stratified

* 데이터 전처리: null 값 처리, 인코딩, 표준화, 정규화

 

파이선 ML - 2장 사이킷런.pdf
0.14MB

3장 : 평가

* 성능 지표: 정확도, 오차행렬(confusion matrix), 정밀도, 재현율, 분류 결정 임곗값

* ROC / ROC-AUC 

파이선 ML - 3장 평가.pdf
0.88MB

4장 : 분류

* 결정 트리(Decision Tree)

* 앙상블 학습 / 보팅, 배깅, 부스팅  / 하드 보팅, 소프트 보팅

* 랜덤 포레스트 / GBM / XGBoost / LightGBM

파이선 ML - 4장 분류.pdf
0.46MB

5장 : 회귀

* 선형 회귀, 다항 회귀, 규제 선형 모델(릿지, 라쏘, 엘라스틱넷)

* 로지스틱 회귀

* 회귀 트리

파이썬 ML - 5장 회귀.pdf
0.42MB

R2DBC는 커넥션 풀 라이브러리를 제공한다. 이 라이브러릴 사용하면 어렵지 않게 R2DBC에 대한 커넥션 풀을 설정할 수 있다.

의존 설정

커넥션 풀을 설정하려면 r2dbc-pool 모듈을 추가한다.

<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-pool</artifactId>
    <version>0.8.0.RELEASE</version>
</dependency>

URL로 커넥션 풀 사용하기

커넥션 풀을 사용하는 가장 쉬운 방법은 URL에 pool을 사용하는 것이다.

String url = "r2dbcs:pool:mysql://root:1@localhost:3306/test";
ConnectionFactory connectionFactory = ConnectionFactories.get(url);

Mono<Integer> updatedMono = Mono.from(connectionFactory.create())
        .flatMap(conn -> {
            Mono<Void> txMono = Mono.from(conn.beginTransaction());
            ...
            return updMono.delayUntil(ret -> conn.commitTransaction())
                    .onErrorResume(err -> Mono.from(conn.rollbackTransaction())
                                              .then(Mono.error(err)))
                    .doFinally(signal -> Mono.from(conn.close()).subscribe());
        });

URL의 드라이버 위치에 "pool"을 사용했고 나머지는 동일하다. URL을 사용하면 초기 크기나 최대 크기 기본값으로 10을 사용한다.

ConnectionFactoryOptions로 커넥션 풀 사용하기

커넥션 풀의 초기 크기, 최대 크기, 검증 쿼리를 직접 제어하고 싶다면 ConnectionFactoryOptions를 이용해서 ConnectionFactory를 생성하면 된다. 아래 코드는 사용 예이다.

ConnectionFactory connectionFactory = ConnectionFactories.get(ConnectionFactoryOptions.builder()
        .option(ConnectionFactoryOptions.SSL, true)
        .option(ConnectionFactoryOptions.DRIVER, "pool")
        .option(ConnectionFactoryOptions.PROTOCOL, "mysql")
        .option(ConnectionFactoryOptions.HOST, "localhost")
        .option(ConnectionFactoryOptions.PORT, 3306)
        .option(ConnectionFactoryOptions.USER, "root")
        .option(ConnectionFactoryOptions.PASSWORD, "1")
        .option(ConnectionFactoryOptions.DATABASE, "test")
        .option(Option.<Integer>valueOf("initialSize"), 5)
        .option(Option.<Integer>valueOf("maxSize"), 20)
        .option(Option.<String>valueOf("validationQuery"), "select 1+1")
        .build());

DRIVER 옵션 값은 "pool"로 지정하고 PROTOCOL 옵션에 실제 드라이버를 지정한다.

ConnectionPool 클래스로 커넥션 풀 사용하기

커넥션 풀에 대해 보다 상세한 설정을 하고 싶다면 ConnectionPool 클래스를 이용해서 커넥션 풀을 생성한다. 다음은 사용 예를 보여준다.

String url = "r2dbcs:mysql://root:1@localhost:3306/test";
ConnectionFactory connectionFactory = ConnectionFactories.get(url);

ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory)
        .initialSize(5)
        .maxSize(20)
        .maxIdleTime(Duration.ofMinutes(10))
        .maxLifeTime(Duration.ofMinutes(60))
        .maxCreateConnectionTime(Duration.ofMillis(500))
        .maxAcquireTime(Duration.ofMillis(500))
        .validationDepth(ValidationDepth.LOCAL)
        .validationQuery("select 1+1")
        .name("POOL 01")
        .build();

ConnectionPool pool = new ConnectionPool(configuration);

Mono<Integer> updatedMono = Mono.from(pool.create())
        .flatMap(conn -> {
            Mono<Void> txMono = Mono.from(conn.beginTransaction());
            ...
            return updMono.delayUntil(ret -> conn.commitTransaction())
                .onErrorResume(err -> Mono.from(conn.rollbackTransaction())
                                          .then(Mono.error(err)))
                .doFinally(signal -> Mono.from(conn.close()).subscribe());
        });

ConnectionPoolConfiguration은 커넥션 풀 설정 정보를 담는다. ConnectionPoolConfiguration.builder() 메서드는 커넥션을 생성할 때 사용할 ConnectionFactory를 인자로 받으며 이후 initialSize(), maxSize() 등의 메서드를 이용해서 커넥션 풀을 설정한다.

 

설정한 ConnectionPoolConfiguration을 이용해서 ConnectionPool 객체를 생성한 뒤 ConnectionFactory 대신에 ConnectionPool을 이용해서 커넥션을 구하면 된다.

R2DBC(Reactive Relational Database Connectivity)는 SQL 데이터베이스를 위한 리액티브 API이다. 리액티브 스트림즈를 기반으로 SQL을 실행하는데 필요한 커넥션, 쿼리 실행, 트랜잭션 처리 등에 대한 API를 정의하고 있다. JDBC API처럼 R2DBC는 API만 정의하고 있다. r2dbc-spi 모듈이 SPI(service-provider interface)로서 각 드라이버는 SPI에 정의된 인터페이스를 알맞게 구현한다. 이 글을 쓰는 시점에서 R2DBC 버전은 0.8.0이며 스펙, 드라이버 구현 등 관련 문서는 https://r2dbc.io/ 사이트에서 확인할 수 있다.

 

* 이 글에서는 리액티브 스트림즈에 대한 내용은 설명하지 않는다. Publisher나 Subscriber에 대한 내용은 https://javacan.tistory.com/entry/Reactor-Start-1-RS-Flux-Mono-Subscriber 글을 참고한다.

의존 설정

R2DBC를 사용하려면 구현을 제공하는 드라이버가 필요하다. 예제에서는 r2dbc-mysql 드라이버를 사용한다.

<dependency>
    <groupId>dev.miku</groupId>
    <artifactId>r2dbc-mysql</artifactId>
    <version>0.8.0.RELEASE</version>
</dependency>

r2dbc-mysql 0.8.0은 r2dbc-spi 0.8.0 버전을 지원하며 r2dbc-mysql은 스프링 리액터와 네티를 이용해서 구현되어 있다.

주요 구성 요소

r2dbc를 이용해서 SQL을 실행하기 위해 사용하는 주요 구성 요소는 다음과 같다. 모두 io.r2dbc.spi 패키지에 위치하며ConnectionFactories를 제외한 나머지는 인터페이스이다.

 

  • ConnectionFactory : Connection을 생성하는 팩토리
  • ConnectionFactories : ConnectionFactory를 검색해서 제공하는 유틸리티
  • Connection : 데이터베이스에 대한 커넥션
  • Statement : 실행할 SQL
  • Batch : 배치로 실행할 SQL
  • Result : 쿼리 실행 결과

ConnectionFactory 구하기

쿼리를 실행하려면 커넥션을 먼저 구해야 한다. R2DBC는 ConnectionFactory를 이용해서 커넥션을 구할 수 있다. 드라이버이가 제공하는 ConnectionFactory를 직접 생성하거나 ConnectionFactories를 이용해서 ConnectionFactory를 구할 수도 있다. 참고로 ConnectionFactories는 자바 서비스 프로바이더를 이용해서 ConnectionFactory를 찾는다.

 

다음 코드는 ConnectionFactories.get() 메서드를 이용해서 ConnectionFactory를 구하는 코드 예이다.

String url = "r2dbcs:mysql://user:pw@localhost:3306/test";
ConnectionFactory connectionFactory = ConnectionFactories.get(url);

JDBC URL과 비슷하게 R2DBC도 URL을 이용해서 연결할 DB 정보를 지정한다. 다음은 URL 구성 요소를 보여준다.

scheme:driver:protocol://authority/path?query
  • scheme : URL이 유효한 R2DBC URL임을 지정한다. 유효한 스킴은 r2dbc와 r2dbcs(SSL 용)이다.
  • driver : 드라이버를 지정한다.
  • protocol : 드라이버에 따라 프로토콜 정보를 지정한다(선택).
  • authority : 접속할 DB와 인증 정보를 포함한다.
  • path : 초기 스카마나 데이터베이스 이름을 지정한다(선택).
  • query : 추가 설정 옵션을 전달할 때 사용한다(선택).

각 드라이버마다 URL 값이 다르므로 드라이버 문서를 참고한다.

Connection을 구하고 쿼리 실행하기

다음 코드는 Connection을 구하고 쿼리를 실행해서 원하는 결과를 출력하는 예이다.

CountDownLatch latch = new CountDownLatch(1);

ConnectionFactory connectionFactory = ConnectionFactories.get(url);

Publisher<? extends Connection> connPub = connectionFactory.create();
connPub.subscribe(new BaseSubscriber<Connection>() {
    @Override
    protected void hookOnNext(Connection conn) {
        Statement stmt = conn.createStatement("select id, name from member where name = ?name");
        stmt.bind("name", "최범균");
        Publisher<? extends Result> resultPub = stmt.execute();
        resultPub.subscribe(new BaseSubscriber<Result>() {
            @Override
            protected void hookOnNext(Result result) {
                Publisher<Member> memberPub = result.map((row, meta) -> 
                    new Member(row.get(0, String.class), row.get(1, String.class))
                );
                memberPub.subscribe(new BaseSubscriber<Member>() {
                    @Override
                    protected void hookOnNext(Member member) {
                        logger.info("회원 데이터 : {}", member);
                    }
                });
            }

            @Override
            protected void hookFinally(SignalType type) {
                conn.close().subscribe(new BaseSubscriber<Void>() {
                    @Override
                    protected void hookFinally(SignalType type) {
                        latch.countDown();
                    }
                });
            }
        });
    }
});

latch.await(); // latch.countDown() 실행 전까지 블록킹

코드가 다소 복잡하다. R2DBC의 주요 API는 리액티브 스트림의 Publisher 타입을 리턴하는데 그 타입을 그대로 사용해서 다소 복잡해졌다. Publisher#subscribe() 메서드에 전달할 Subscriber 구현 객체는 BaseSubscriber를 이용해서 생성했다. BaseSubscriber는 스프링 리액터가 제공하는 Subscriber 구현 클래스로 필요한 기능만 구현하기 위해 이 클래스를 사용했다.

 

먼저 Connection을 구하고 Statement를 생성하는 코드만 보자.

ConnectionFactory connectionFactory = ConnectionFactories.get(url);

final Publisher<? extends Connection> connPub = connectionFactory.create();
connPub.subscribe(new BaseSubscriber<Connection>() {
    @Override
    protected void hookOnNext(Connection conn) {
        Statement stmt = conn.createStatement("select id, name from member where name = ?name");
        stmt.bind("name", "최범균");
        Publisher<? extends Result> resultPub = stmt.execute();
        resultPub.subscribe(...);
    }
});

ConnectionFactory#create() 메서드가 생성한 Publisher는 연결에 성공하면 next 신호로 Connection을 보낸다. Subscriber는 onNext() 메서드로 Connection을 받아 쿼리를 실행한다(BaseSubscriber를 사용한 경우 hookOnNext() 메서드 사용).

 

쿼리를 실행하기 위한 Statement는 Connection#createStatement() 메서드로 생성한다. createStatement() 메서드는 실행할 쿼리를 입력받는다. JDBC의 Statement와 PreparedStatement가 R2DBC에서는 Statement 하나로 처리한다.

 

Statement#bind() 메서드를 이용해서 바인딩 파라미터에 값을 전달한다. 다음은 Statement가 제공하는 바인딩 파라미터 관련 메서드이다.

  • Statement bind(int index, Object value)
  • Statement bind(String name, Object value)
  • Statement bindNull(int index, Class<?> type)
  • Statement bindNull(String name, Class<?> type)

한 가지 주의할 점은 쿼리에서 사용하는 바인딩 파라미터가 JDBC와 다르다는 점이다. JDBC의 PreparedStatement는 물음표(?)를 이용해서 바인딩 파라미터를 지정하고 1부터 시작하는 인덱스를 사용한다. 반면에 DBMS가 사용하는 바인딩 파라미터를 사용하며 이름이나 0부터 시작하는 인덱스를 사용할 수 있다. 예를 들어 MySQL은 위 코드에서 보는 것처럼 바인딩 파라미터로 "?이름" 형식을 사용한다.

 

Statement#execute() 메서드는 Publisher<Result> 타입을 리턴한다. Result는 쿼리 실행 결과를 제공한다. Result가 제공하는 다음의 두 메서드를 이용해서 쿼리 실행 결과를 구할 수 있다.

public interface Result {
    // 변경된 행 개수를 리러탄한다.
    Publisher<Integer> getRowsUpdated();
    // 조회 결과(Row)를 변환한다.
    <T> Publisher<T> map(BiFunction<Row, RowMetadata, ? extends T> mappingFunction);
}

Row는 조회한 한 행에 대응한다. Row는 행의 데이터를 조회하기 위한 get() 메서드를 제공한다.

  • T get(int index, Class<T> type) : JDBC API와 달리 index는 0부터 시작한다. 
  • T get(String name, Class<T> type)

예제 코드는 SELECT 쿼리 실행 결과를 처리하므로 map() 메서드를 이용해서 조회환 결과를 Member 객체로 변환했다. 

Publisher<? extends Result> resultPub = stmt.execute();
resultPub.subscribe(new BaseSubscriber<Result>() {
    @Override
    protected void hookOnNext(Result result) {
        Publisher<Member> memberPub = result.map((row, meta) ->
                new Member(row.get(0, String.class), row.get(1, String.class))
        );
        memberPub.subscribe(new BaseSubscriber<Member>() {
            @Override
            protected void hookOnNext(Member member) {
                logger.info("회원 데이터 : {}", member);
            }
        });
    }

    @Override
    protected void hookFinally(SignalType type) {
        conn.close().subscribe(...);
    }
});

DB 작업이 끝나면 Connection#close() 메서드로 커넥션을 종료해야 한다. close() 메서드는 Publisher<Void>를 리턴하므로 실제 커넥션 종료는 close() 메서드가 리턴한 Publisher를 구독해야 실행된다.

 

리액터가 제공하는 BaseSubscriber는 complete 신호나 error 신호가 오면 hookFinally()를 실행하는 기능을 구현하고 있으므로 에러 여부에 상관없이 커넥션을 종료하기 위해 이 메서드를 이용했다.

Connection을 구하고 쿼리 실행하기 : 리액터 이용

Publisher를 이용하면 구독 처리 코드가 중첩되어 복잡해진다. 중첩 구조는 리액터를 사용해서 없앨 수 있다. 다음 코드는 리액터를 사용해서 구현현 예이다.

Mono<? extends Connection> connMono = Mono.from(connectionFactory.create());

Flux<Member> members = connMono.flatMapMany(conn -> {
        Flux<? extends Result> resultFlux = Flux.from(
                conn.createStatement("select id, name from member where name like ?name")
                        .bind("name", "%범%")
                        .execute()
        );

        Flux<Member> memberFlux = resultFlux.flatMap(result ->
                result.map((row, meta) ->
                        new Member(row.get("id", String.class), row.get("name", String.class))
                )
        );

        return memberFlux.doFinally(signal -> Mono.from(conn.close()).subscribe());
    }
);

List<Member> ret = members.collectList().block();

트랜잭션 처리

Connection은 트랜잭션 처리를 위해 다음 메서드를 제공한다.

  • Publisher<Void> beginTransaction()
  • Publisher<Void> commitTransaction()
  • Publisher<Void> rollbackTransaction()

다음은 트랜잭션 처리 예를 보여준다.

Mono<Integer> updatedMono = Mono.from(connectionFactory.create())
    .flatMap(conn -> {
        final Mono<Void> txMono = Mono.from(conn.beginTransaction());
        final Mono<Integer> updMono = txMono.then(
                Mono.from(conn.createStatement("insert into member values (?id, ?name)")
                        .bind("id", "bkchoi4")
                        .bind("name", "최범균2")
                        .execute()
                ).flatMap(
                        result -> Mono.from(result.getRowsUpdated())
                )
        );
        return updMono.delayUntil(ret -> conn.commitTransaction())
            .onErrorResume(err -> Mono.from(conn.rollbackTransaction()).then(Mono.error(err)))
            .doFinally(signal -> Mono.from(conn.close()).subscribe());
    });

트랜잭션 커밋/롤백 처리를 위해 리액터의 delayUntil(), onErrorResume() 메서드를 사용했다. 참고로 delayUntil() 메서드는 인자로 받은 Publisher가 종료된 뒤에 종료하며 onErrorResume() 메서드는 에러 발생시 인자로 받은 Publisher를 실행한다. 트랜잭션 롤백 처리 후에 Mono.error()를 이용해서 에러를 다시 발생시키도록 했다.

 

이전 글(스프링 웹플럭스(WebFlux) SSE(Server Sent Event) 구현 1)에서 작성한 SSE 예제는 현실적이지 않다. 서로 다른 브라우저에서 /stocks/123 으로 연결하면 서로 다른 값이 표시된다. 실제라면 둘 다 같은 값을 출력해야 한다. 그래서 이 글에서는 조금 더 현실적인 예를 만들어보려 한다. 여기서 만들 예는 사용자 ID별로 데이터를 전송하는 기능이다.

여기서 구현할 클래스는 세 개다.

  • UserChannel : 사용자 ID별로 메시지를 보내기 위한 채널이다. Flux를 이용해서 메시지를 전송한다.
  • UserChannels : 사용자 ID에 대한 UserChannel을 관리한다.
  • UserChannelApi : SSE를 사용자 ID 별로 메시지를 전송하는 기능을 제공한다.

UserChannel 클래스

UserChannel 클래스는 다음과 같다.

public class UserChannel {
    private Flux<String> flux;
    private FluxSink<String> sink;
    private UnicastProcessor<String> processor;

    public UserChannel() {
        processor = UnicastProcessor.create();
        this.sink = processor.sink();
        this.flux = processor.share();
    }

    public void send(String message) {
        sink.next(message);
    }

    public Flux<String> toFlux() {
        return flux;
    }
}

하나의 데이터를 여러 클라이언트에 전송하기 위해 UnicastProcessor와 Flux#share()를 사용해서 Flux를 생성했다. 이 코드는 데이터를 전송하기 위해 UnicastProcessor#sink()로 구한 FluxSink를 사용한다.

send() 메서드는 FluxSink#next() 메서드를 이용해서 데이터를 Flux에 데이터를 보낸다.

UserChannels 클래스

UserChannels 클래스는 UserChannel을 맵으로 관리한다.

public class UserChannels {
    private ConcurrentHashMap<Long, UserChannel> map = new ConcurrentHashMap<>();

    public UserChannel connect(Long userId) {
        return map.computeIfAbsent(userId, key -> new UserChannel());
    }

    public void send(Long userId, String message) {
        UserChannel userChannel = map.get(userId);
        if (userChannel != null) {
            userChannel.send(message);
        }
    }
}

UserChannelApi 클래스

UserChannelApi가 제공하는 API는 두 개다.

  • GET /channels/users/{userId}/messages : 특정 사용자의 메시지를 수신하기 위한 SSE 구현
  • POST /channels/users/{userId}/messages : 특정 사용자에 메시지를 보내기 위한 API

다음은 두 API의 구현이다.

@RestController
public class UserChannelApi {
    private UserChannels channels = new UserChannels();

    @GetMapping("/channels/users/{userId}/messages")
    public Flux<ServerSentEvent<String>> connect(
            @PathVariable("userId") Long userId) {
        return channels.connect(userId) // UserChannel 리턴
            .toFlux() // Flux 리턴
            .map(str -> ServerSentEvent.builder(str).build());
    }

    @PostMapping(path = "/channels/users/{userId}/messages", 
                 consumes = MediaType.TEXT_PLAIN_VALUE)
    public void send(@PathVariable("userId") Long userId, 
                     @RequestBody String message) {
        channels.send(userId, message);
    }
}

connect() 메서드는 클라이언트를 특정 사용자 채널에 연결하고 채널의 메시지를 ServerSentEvent로 변환해서 클라이언트에 전송한다.

send() 메서드는 특정 사용자 채널에 메시지를 전송한다. @PostMapping 애노테이션의 consumes 값이 text/plain이므로 HTTP 요청 몸체로 받은 값을 문자열로 처리한다.

실행

구현을 했으니 동작을 확인할 차례다. 서버를 구동하고 크롬과 같이 SSE를 지원하는 브라우저를 두 개 띄운다. 그리고 Talend API Tester나 Postman 등 send() API를 호출하기 위한 도구를 준비한다. 참고로 서버 포트는 18080으로 변경했다.

첫 번째 브라우저에서 http://localhost:18080/channels/users/1/messages 에 접속한다. 그리고 POST로 데이터를 두 개 전송한다. 그러면 첫 번째 브라우저에 POST로 전송한 두 문자열이 표시되는 것을 확인할 수 있다.

두 번째 브라우저에서 접속한 뒤에 다시 POST로 메시지를 몇 개 더 전송한다. 아래 그림과 같이 첫 번째 브라우저와 두 번째 브라우저가 동일 데이터를 수신한 것을 확인할 수 있다.

관련 링크

Server Sent Event, 줄여서 SSE는 웹 서버에서 웹 브라우저로 이벤트를 푸시하고 싶을 때 유용하게 사용할 수 있다. 스프링 웹플럭스를 사용하면 간단하게 SSE를 구현할 수 있다. 이 글에서는 간단한 예를 이용해서 스프링 웹플럭스로 SSE를 어떻게 구현하는지 살펴보자.

[참고] 스프링 리액터에 대한 내용은 스프링 리액터 Reactor 기초 글 목록 글을 참고한다.

프로젝트 생성

먼저 스프링 부트 프로젝트를 생성한다. 이 글에서는 메이븐 프로젝트를 예로 사용했으며 pom.xml 파일은 다음과 같다. 웹플럭스 사용을 위해 spring-boot-starter-webflux를 추가했다.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>sse-demo</groupId>
    <artifactId>sse-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>sse-demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>13</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

SSE 서버 구현

SSE 서버 구현은 간단하다. 요청 처리 메서드에서 ServerSentEvent를 제공하는 Flux를 리턴하기만 하면 된다. 다음은 구현 예이다.

@RestController
public class SimpleSSEApi {

    @GetMapping("/stocks/{code}")
    public Flux<ServerSentEvent<Stock>> stocks(@PathVariable("code") String code) {
        return Flux.interval(Duration.ofSeconds(1))
                .map(t -> Stock.builder()
                        .code(code)
                        .value(randomValue())
                        .build())
                .map(stock -> ServerSentEvent.builder(stock).build());
    }

    private int randomValue() {
        return ThreadLocalRandom.current().nextInt(1000) + 10000;
    }
}

이 코드에서 stocks()가 리턴하는 Flux는 1초 간격으로 ServerSentEvent를 제공한다. ServerSentEvent.builder() 메서드는 클라이언트에 전송할 이벤트 데이터를 받는다. 위 코드에서는 임의의 value 값을 갖는 Stock 객체를 이벤트 데이터로 사용했다.

자바 스크립트 구현

자바 스크립트는 EventSource를 이용해서 서버가 보내는 이벤트를 수신할 수 있다. 먼저 첫 화면 요청을 처리할 컨트롤러를 만든다. 이 컨트롤러는 '/' 요청이 오면 뷰로 index를 사용한다.

@Controller
public class IndexController {
    @GetMapping("/")
    public String index() {
        return "index";
    }
}

다음은 index 뷰로 사용할 index.html 파일이다. src/main/resxources/templates 폴더에 위치한 thymeleaf 파일이다.

<!DOCTYPE html>
<html lang="ko">
<head>
    <meta charset="utf-8">
    <title>단순 SSE</title>
</head>
<body>
    <div id="stockValue">
    </div>

    <script src="https://code.jquery.com/jquery-3.4.1.slim.min.js"></script>
    <script type="text/javascript">
        var source = null;
        function start() {
            source = new EventSource("/stocks/1234");
            console.log("create EventSource");
            source.onmessage = function(ev) {
                console.log("on message: ", ev.data);
                $("#stockValue").text(ev.data);
            };
            source.onerror = function(err) {
                console.log("on err: ", err);
                stop();
            };
        }
        function stop() {
            if (source != null) {
                source.close();
                console.log("close EventSource");
                source = null;
            }
        }

        $(document).ready(function(){
            start();
        });
        $(window).on("unload", function () {
            stop();
        });
    </script>
</body>

</html>

EventSource는 SSE를 위한 표준 API이다. EventSource 객체를 생성할 때 SSE를 받을 서버 경로를 입력한다. 위 코드에서는 /stocks/1234를 경로로 지정했다.

EventSource#onmessage는 서버가 이벤트를 전송할 때 불린다. onmessage에 전달한 이벤트 처리 함수는 서버가 이벤트를 전송할 때마다 인자로 이벤트를 받는다. 서버가 전송한 데이터는 이벤트 객체의 data 속성에 담긴다. 예제에서는 서버가 전송한 데이터를 stockValue 영역에 표시하게 구현했다.

EventSource#onerror는 에러가 발생했을 때 불린다. 예제에서는 에러가 발생하면 EventSource#close()를 이용해서 연결을 끊도록 했다.

실행 결과

mvnw spring-boot:run 명령어로 서버를 실행한 뒤에 http://localhost:8080으로 접속해보자. 그러면 1초마다 stockValue 영역의 값이 바뀌는 것을 볼 수 있다.

서버의 로그 레벨을 debug로 변경하면 1초 간격으로 메시지를 생성하는 로그를 볼 수 있다.

2019-12-06 08:10:22.609 DEBUG 37036 --- [     parallel-5] o.s.http.codec.json.Jackson2JsonEncoder  : [0a3a9b2d] Encoding [ssedemo.Stock@7769d30b]
2019-12-06 08:10:23.609 DEBUG 37036 --- [     parallel-5] o.s.http.codec.json.Jackson2JsonEncoder  : [0a3a9b2d] Encoding [ssedemo.Stock@5efe1c65]
2019-12-06 08:10:24.609 DEBUG 37036 --- [     parallel-5] o.s.http.codec.json.Jackson2JsonEncoder  : [0a3a9b2d] Encoding [ssedemo.Stock@5eb98269]
2019-12-06 08:10:25.609 DEBUG 37036 --- [     parallel-5] o.s.http.codec.json.Jackson2JsonEncoder  : [0a3a9b2d] Encoding [ssedemo.Stock@67b3812f]
2019-12-06 08:10:26.609 DEBUG 37036 --- [     parallel-5] o.s.http.codec.json.Jackson2JsonEncoder  : [0a3a9b2d] Encoding [ssedemo.Stock@5e7be759]
2019-12-06 08:10:27.609 DEBUG 37036 --- [     parallel-5] o.s.http.codec.json.Jackson2JsonEncoder  : [0a3a9b2d] Encoding [ssedemo.Stock@70c8d5c7]
2019-12-06 08:10:28.609 DEBUG 37036 --- [     parallel-5] o.s.http.codec.json.Jackson2JsonEncoder  : [0a3a9b2d] Encoding [ssedemo.Stock@6270bb7d]

 

정리

이 글에서는 간단하게 스프링 웹 플럭스로 SSE를 구현하는 방법을 살펴봤는데 다음 글(스프링 웹플럭스(WebFlux) SSE(Server Sent Event) 구현 2)에서는 조금 더 현실적인 예제를 만들어보자.

참고자료

  1. ㅇㅇ 2020.03.18 23:33

    깃헙에 있나요 ?

출처: H2 DB 홈페이지

테스트 실행 시간을 단축하기 위해 인메모리 DB인 H2 DB를 종종 사용한다. JDBC URL에서 MODE 프로퍼티를 지정하면 MySQL, 오라클 용 쿼리를 사용할 수 있어 편리한 점도 있다. 예를 들어 다음과 같이 MODE 값으로 MySQL을 주면 H2 DB를 사용하면서 MySQL의 limit을 사용할 수 있다.

jdbc:h2:mem:progress;DATABASE_TO_UPPER=false;MODE=MySQL;DB_CLOSE_DELAY=-1

 

MODE를 사용해도 특정 DB에 특화된 기능을 사용할 수 없을 때가 발생한다. 예를 들면 MySQL의 inet_aton() 함수나 inet_ntoa() 함수는 H2 DB에서는 사용할 수 없다. 통합 테스트 코드를 실행할 때 H2 DB에서 지원하지 않는 함수를 쿼리에서 사용하면 에러가 발생하기 때문에 이런 함수를 대체할 수 있는 수단을 찾아야 한다.

다행히 H2는 사용자 정의 함수 기능을 지원해서 이 문제를 어렵지 않게 해결할 수 있다. 사용자 정의 함수 기능을 사용하려면 다음 두 가지 작업만 하면 된다.

  • 사용자 정의 함수에서 호출할 정적 메서드 구현
  • create alias를 이용한 커스텀 함수 생성

먼저 커스텀 함수에서 호출할 정적 메서드를 구현한다.

package util;

public class H2CustomFunc {
    public static Long inet_aton(String value) {
        if (value == null) return null;
        else return ...생략;
    }

    public static String inet_ntoa(Long value) {
        if (value == null) return null;
        else return ...생략;
    }
}

다음은 인메모리 H2에 테이블을 생성할 때 다음 쿼리를 실행한다.

CREATE ALIAS IF NOT EXISTS inet_ntoa FOR "util.inet_ntoa"
CREATE ALIAS IF NOT EXISTS inet_aton FOR "util.inet_aton"

이제 H2에 연동할 때 inet_ntoa() 함수나 inet_aton() 함수를 사용할 수 있다.

return jdbcTemplate.query("select " +
                "uid, content_div, use_yn, " +
                "view_start_dt, view_end_dt, " +
                "settings, memo, " +
                "reg_id, reg_dt, inet_ntoa(reg_ip) as reg_ip, " +
                "upd_id, upd_dt, inet_ntoa(upd_ip) as upd_ip " +
                "from progress.featured_contents_mgt " +
                "order by uid desc limit ?, ?",
        JdbcFeaturedContentsRepository::mapRow,
        start, limit);

사용자 정의 함수에 대한 내용은 자세한 내용은 아래 문서를 참고한다.

 

예전에 '아이들이 열중하는 수업에는 법칙이 있다'는 책을 읽다가 너무 와 닿는 글귀가 있어 사진으로 남겨둔 적이 있다. 아래가 그 사진이다.

출처: 아이들이 열중하는 수업에는 법칙이 있다

책에서도 언급하고 있지만 이 글은 당연히 교사뿐만 아니라 개발자에게도 적용된다. 종종 경력이 쌓이고 프로젝트를 경험하면 실력이 는다고 말하는 이가 있지만 이는 어디까지나 신입일 때 얘기다. 이런 식으로는 실력 향상에 한계가 있다. 조금만 시간이 지나도 더 이상 실력이 늘지 않고 정체된다.

이 업계에서 정체는 곧 후퇴다. 기량 향상을 위해 노력하지 않는 것은 본인의 선택이지만 정체는 곧 후퇴라는 것은 잊지 말자.

도커 시작하기 전체 글 목록

  1. GetLight 2020.02.29 14:17 신고

    감사합니다.
    도커 공부 하는데 많은 도움을 받았습니다.

스웜과 오버레이 네트워크

docker network ls 명령어는 네트워크 목록을 보여준다. 스웜에 스택을 배포했다면 범위가 swarm인 오버레이 네트워크를 볼 수 있다. 오버레이 네트워크는 서로 다른 노드에 생성된 컨테이너 간 연결을 처리한다.

$ docker network ls
NETWORK ID          NAME                DRIVER              SCOPE
c6ef5b2ea943        bridge              bridge              local
da6c7575bf20        docker_gwbridge     bridge              local
b30ace236245        host                host                local
c8c1e9h5214o        ingress             overlay             swarm
vwes54i1ysyh        simple_default      overlay             swarm

오버레이 네트워크 중 ingress 네트워크는 호스트(노드)에서 서비스로의 포워딩을 담당한다. 외부에 포트를 공개한 서비스를 스웜에 배포하면 ingress 네트워크에 서비스를 참여시키고 다음의 두 IP를 할당한다.

  • 서비스에 대한 가상 IP
  • 서비스의 각 컨테이너에 대한 IP

각 노드는 외부에 개시된 서비스 포트로 요청이 오면 ingress 네트워크의 가상 IP로 전달한다. 가상 IP는 ingress 네트워크에서 서비스에 접근할 때 사용할 IP이다. 가상 IP에 전달된 요청은 다시 컨테이너의 IP로 전달된다. ingress 네트워크에 참여한 서비스의 각 컨테이너는 ingress 네트워크 내에서 고유 IP를 갖는다.

docker network inspect ingress 명령어 실행하면 ingress 네트워크 내에 생성된 서비스의 VIP와 각 컨테이너에 할당된 IP를 확인할 수 있다.

스웜에 스택을 배포하면 스택을 위한 오버레이 네트워크도 생긴다. 다음 컴포즈 파일을 보자.

version: "3.6"
services:
  mysql:
    image: mysql:5.7
    environment:
      - MYSQL_ROOT_PASSWORD=rootpw
    deploy:
      replicas: 1
  adminer:
    image: adminer
    ports:
      - "8080:8080"
    deploy:
      replicas: 1

이 파일에는 네트워크 설정이 없다. 이 경우 스웜은 스택을 위한 네트워크를 생성한다. 이때 네트워크 이름은 "스택명_default"가 된다. 예를 들어 위 설정을 이용해서 이름이 dbadmin인 스택을 배포하면 dbadmin_default인 네트워크를 생성한다.

스택을 위한 오버레이 네트워크는 스택에 속한 서비스가 서로 통신할 때 사용된다. 위 설정의 경우 mysql 서비스와 adminer 서비스가 서로 통신할 때 스택을 위한 오버레이 네트워크를 사용하게 된다.

오버레이 네트워크와 서비스

default 이름 대신에 직접 네트워크 이름을 지정할 수 있다. 컴포즈 파일에 networks 키를 이용해서 사용할 네트워크를 지정하면 된다. 다음은 설정 예이다.

version: "3.6"
services:
  mysql:
    image: mysql:5.7
    environment:
      - MYSQL_ROOT_PASSWORD=rootpw
    networks:
      internal:
        aliases:
          - db
    deploy:
      replicas: 1
  adminer:
    image: adminer
    ports:
      - "8080:8080"
    networks:
      - internal
    deploy:
      replicas: 1

networks:
  internal:

가장 하단에 networks 키는 네트워크 이름 목록을 값으로 갖는다. 위 설정에서는 이름이 internal인 네트워크만 설정했다. 여기서 internal은 오버레이 네트워크다.

서비스 설정에 networks 키 값으로 internal을 추가하면 해당 서비스는 internal 네트워크에 묶인다. 위 설정은 mysql 서비스와 adminer 서비스를 둘 다 internal 네트워크에 연결했다.

aliases는 네트워크 내에서 서비스를 참조할 때 사용할 별칭을 추가한다. mysql 설정은 db를 별칭으로 추가했다. 같은 네트워크를 사용하는 서비스는 서비스 이름과 별칭을 사용해서 다른 서비스에 연결할 수 있다. 위 설정에서 adminer 서비스의 컨테이너는 'mysql'이나 'db'를 이용해서 mysql 서비스에 연결할 수 있다. 실제 서비스 이름인 '스택명_mysql'로도 접근할 수 있다. 물론 다른 네트워크에서는 이 이름들로 접근할 수 없다.

외부 오버레이 네트워크 사용

각 스택의 서비스를 같은 네트워크에 참여시키고 싶다면 오버레이 네트워크를 스웜 수준에서 생성하고 스택에서 이 네트워크를 참조하면 된다. 오버레이 네트워크를 생성할 때는 -d (--driver) 옵션 값으로 overlay를 지정하면 된다.

$ docker network create -d overlay service-net

컴포즈 파일에서는 external 옵션을 true로 지정해서 해당 네트워크가 스택 외부 자원임을 명시한다.

version: "3.6"
services:
  mysql:
    image: mysql:5.7
    environment:
      - MYSQL_ROOT_PASSWORD=rootpw
    networks:
      service-net:
        aliases:
          - db
    deploy:
      replicas: 1
  adminer:
    image: adminer
    ports:
      - "8080:8080"
    networks:
      service-net:
        aliases:
          - web
    deploy:
      replicas: 1

networks:
  service-net:
    external: true

위 설정을 이용해서 스택을 생성하면 스택의 서비스는 service-net 네트워크에 참여한다. service-net에 참여하는 서비스는 서비스 이름과 alias를 이용해서 각 서비스에 연결할 수 있다.

 

 

 

 

컴포즈 파일과 스택

컴포즈 파일을 이용하면 서비스를 보다 쉽게 만들고 업데이트할 수 있다. 다음 컴포즈 파일을 보자.

version: "3.6"
services:
  web:
    image: madvirus/simplenode:0.1
    ports:
      - "5000:5000"
    deploy:
      mode: replicated
      replicas: 2
      update_config:
        parallelism: 1
        order: start-first
        delay: 10s
        failure_action: rollback
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:5000/health"]
      interval: 10s
      timeout: 5s
      retries: 3
      start_period: 10s

컴포즈 파일은 YAML을 사용해서 작성한다. 최상위 services는 서비스 목록을 정의한다. services의 바로 아래는 정의할 서비스 이름이 온다. 위 설정의 경우 이름이 web인 서비스를 정의한다. 실제 생성되는 서비스의 이름은 web이 아니라 스택 이름과 조합한 이름을 사용한다. 스택은 뒤에서 설명한다.

서비스 이름 아래 표시한 속성은 도커 서비스를 생성할 때 사용한 옵션과 이름이 유사하다. 서비스를 생성할 때 사용한 옵션과 유사한 이름을 사용한다. 각 속성의 값은 서비스를 설명할 때 사용한 옵션과 동일한 값을 갖는다.

컴포즈 파일에 대한 내용은 https://docs.docker.com/compose/compose-file/ 문서를 참고한다.

컴포즈 파일을 작성했다면 이제 스택을 배포할 차례다. 위 파일이 stack01.yml이라고 할 경우 다음 명령어를 이용해서 스택을 배포할 수 있다. docker stack deploy 명령어는 컴포즈 파일을 이용해서 새로운 스택을 배포한다. -c(또는 --compose-file) 옵션은 사용할 컴포즈 파일을 지정한다. 명령어에서 마지막의 simple은 배포할 스택의 이름이다.

$ docker stack deploy -c stack01.yml simple
Creating network simple_default
Creating service simple_web

스택을 배포하면 simple_web 서비스를 만든다. simple_web에서 simple은 스택 이름이고 web은 컴포즈 파일에서 지정한 서비스 이름이다. 스택을 배포하면 docker stack ls 명령어로 배포한 스택을 확인할 수 있다.

$ docker stack ls
NAME                SERVICES            ORCHESTRATOR
simple              1                   Swarm

실제로 docker service ps simple_web 명령어를 실행하면 simple_web 서비스에 두 개의 컨테이너가 실행 중이다.

$ docker service ps simple_web
ID                  NAME                IMAGE                     NODE                DESIRED STATE       CURRENT STATE            ERROR               PORTS
x8cq0l5i6jwg        simple_web.1        madvirus/simplenode:0.1   docker-node1        Running             Running 13 minutes ago
hu97kwyc8oon        simple_web.2        madvirus/simplenode:0.1   docker-node2        Running             Running 13 minutes ago

실행 중인 서비스의 이미지 버전이나 리플리케이션 개수를 변경하고 싶다면 컴포즈 파일을 알맞게 만들어 다시 스택을 배포하면 된다. 컴포즈 파일을 다시 만들자.

version: "3.6"
services:
  web:
    image: madvirus/simplenode:0.2
    ports:
      - "5000:5000"
    deploy:
      mode: replicated
      replicas: 4
      update_config:
        parallelism: 1
        order: start-first
        delay: 10s
        failure_action: rollback
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:5000/health"]
      interval: 10s
      timeout: 5s
      retries: 3
      start_period: 10s

이미지 태그를 0.1에서 0.2로 바꾸고 리플리카 개수를 4로 바꿨다. 새로 바꾼 컴포즈 파일을 이용해서 스택을 다시 배포해 보자.

$ docker stack deploy -c stack02.yml simple
Updating service simple_web (id: oj33pustknvntz1ud4t9g5egs)

simple_web 서비스를 업데이트한다는 메시지가 출력된다. 배포 후에 서비스 목록을 보면 컨테이너가 4개로 증가하고 이미지가 0.2로 바뀐 것을 볼 수 있다.

환경 변수를 이용한 설정 변경

환경 변수를 사용하면 컴포즈 파일을 수정하지 않고 값을 변경할 수 있다. 보통 컴포즈 파일을 사용해서 스택을 배포할 때는 새로 적용할 버전만 바뀔 때가 많은데 환경 변수를 사용하면 배포할 버전을 컴포즈 파일에 하드 코딩하지 않고 환경 변수로 전달할 수 있다.

version: "3.6"
services:
  web:
    image: madvirus/simplenode:${VER:-latest}
    ports:
      - "5000:5000"
    deploy:
      mode: replicated
      replicas: ${REPLICAS:-3}
      update_config:
        parallelism: 1
        order: start-first
        delay: 10s
        failure_action: rollback
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:5000/health"]
      interval: 10s
      timeout: 5s
      retries: 3
      start_period: 10s

${VER}와 ${REPLICAS}는 각각 같은 이름의 환경 변수의 값을 사용한다. 해당 환경 변수가 존재하지 않으면 각각 latest와 3을 값으로 사용한다. 이제 도커 스택을 배포할 때 환경 변수를 이용해서 컴포즈 파일 변경없이 원하는 값을 쉽게 지정할 수 있다.

$ VER=0.2 REPLICAS=2 docker stack deploy -c stack.yml simple
Creating network simple_default
Creating service simple_web

컨테이너 환경 변수 설정: environment

컴포즈 파일에서 컨테이너에 전달할 환경 변수를 전달할 수도 있다. environment 속성에 컨테이너에 전달할 환경 변수 이름과 값 목록을 설정하면 된다. 다음은 설정 예이다.

version: "3.6"
services:
  web:
    image: madvirus/simplenode:0.3
    ports:
      - "5000:5000"
    environment:
      - RUNTIME_ENV=${ENV:-dev}
    deploy:
      mode: replicated
      replicas: 2

스택과 서비스 묶음

한 스택에는 한 개 이상의 서비스, 네트워크, 볼륨을 정의할 수 있다. 이 중 지금은 두 개 이상의 서비스를 에 대한 내용만 살펴보자. 다음은 두 서비스를 정의한 컴포즈 파일 예이다.

version: "3.6"
services:
  mysql:
    image: mysql:5.7
    environment:
      - MYSQL_ROOT_PASSWORD=rootpw
    deploy:
      replicas: 1
  adminer:
    image: adminer
    ports:
      - "8080:8080"
    deploy:
      replicas: 1

이름이 mysql이고 adminer인 두 서비스를 정의하고 있다. 앞서 봤듯 실제로는 스택 이름을 앞에 붙인 서비스 이름을 사용한다. 위 컴포즈 파일을 이용해서 dbadmin이란 이름의 스택을 생성하자.

$ docker stack deploy -c stack03.yml dbadmin
Creating network dbadmin_default
Creating service dbadmin_adminer
Creating service dbadmin_mysql

스택을 생성하면 dbadmin_adminer와 dbadmin_mysql의 두 서비스가 만들어진다. 그리고 dbadmin_default 네트워크도 생성한 것을 알 수 있다. 스택을 생성하면 이름이 '스택_default'인 네트워크를 기본으로 생성한다. 이 네트워크는 오버레이 네트워크로 스택에 속한 서비스를 위한 네트워크이다. 스택에 속한 서비스는 서비스 이름을 사용해서 다른 서비스의 컨테이너와 통신할 수 있다.

dbadmin 스택의 경우 dbadmin_adminer 서비스에 속한 컨테이너에서 dbadmin_mysql라는 이름을 사용해서 해당 서비스의 컨테이너에 접근할 수 있다. 웹 브라우저에서 http://호스트:8080 주소를 입력하면 dbadmin_adminer 서비스가 생성한 컨테이너에 접속한다. 여기서 서버 이름에 dbadmin_mysql을 입력해서 dbadmin 스택에 속한 mysql DB에 연결할 수 있다.

같은 스택에 속한 컨테이너는 서비스 이름으로 접근

도커 스택과 네트워크에 대한 내용은 다음 글에서 간단히 살펴본다.

스택 삭제

docker stack rm 명령어를 사용하면 스택을 삭제한다.

$ docker stack rm dbadmin
Removing service dbadmin_adminer
Removing service dbadmin_mysql
Removing network dbadmin_default

관련 글

서비스 생성과 리플리케이션

도커 스웜 클러스터를 만들었다면 서비스를 생성할 수 있다. 서비스를 만드는 것은 어렵지 않다. 매니저 노드에서 docker create service 명령어를 실행하면 된다. 서비스를 생성할 때 --replicas 옵션을 사용해서 생성할 컨테이너 개수를 지정한다.

docker service create --name simple \
--publish published=8000,target=5000 \
--replicas 2 \
madvirus/simplenode:0.1

서비스를 생성한 뒤 "docker service ps 서비스명" 명령어를 실행하면 각 컨테이너가 어느 도커 노드에서 실행 중인지 확인할 수 있다.

vagrant@docker-node1:~$ docker service ps simple
ID                  NAME                IMAGE                     NODE                DESIRED STATE       CURRENT STATE           ERROR               PORTS
lecwvwjcs7lm        simple.1            madvirus/simplenode:0.1   docker-node2        Running             Running 5 minutes ago
wap8ua3proil        simple.2            madvirus/simplenode:0.1   docker-node3        Running             Running 6 minutes ago

결과에서 NAME은 컨테이너의 완전한 이름이 아닌 서비스 내에서 구분하기 위한 이름이다. 실제 컨테이너 이름은 simple.2.wap8ua3proil1aqircsy7tifq와 같이 NAME 뒤에 ID를 결합한 문자열을 사용한다.

docker servie scale 명령어를 사용하면 운영 중에 리플리케이션 개수를 변경할 수 있다.

$ docker service scale simple=4
simple scaled to 4
overall progress: 4 out of 4 tasks
1/4: running   [==================================================>]
2/4: running   [==================================================>]
3/4: running   [==================================================>]
4/4: running   [==================================================>]
verify: Service converged

$ docker service ps simple
ID                  NAME                IMAGE                     NODE                DESIRED STATE       CURRENT STATE                ERROR               PORTS
lecwvwjcs7lm        simple.1            madvirus/simplenode:0.1   docker-node2        Running             Running 18 minutes ago
wap8ua3proil        simple.2            madvirus/simplenode:0.1   docker-node3        Running             Running 19 minutes ago
v90ftwuywm0d        simple.3            madvirus/simplenode:0.1   docker-node1        Running             Running about a minute ago
674upghwilm2        simple.4            madvirus/simplenode:0.1   docker-node1        Running             Running about a minute ago

docker service ps 명령어를 실행하면 simple 서비스의 컨테이너 개수가 4개가 된 것을 확인할 수 있다. 개수를 늘리는 것 뿐만 아니라 줄이는 것도 가능하다.

$ docker service scale simple=1
simple scaled to 1
overall progress: 1 out of 1 tasks
1/1: running   [==================================================>]
verify: Service converged

$ docker service ps simple
ID                  NAME                IMAGE                     NODE                DESIRED STATE       CURRENT STATE            ERROR               PORTS
lecwvwjcs7lm        simple.1            madvirus/simplenode:0.1   docker-node2        Running             Running 20 minutes ago

리플리케이션 개수를 줄일 때는 가장 최근에 생성한 컨테이너를 먼저 중지한다.

서비스 제거

서비스 제거는 간단하다. "docker service rm 서비스명" 명령어를 사용하면 된다. 서비스를 제거하면 해당 컨테이너도 함께 삭제된다.

simplenode:0.2 버전

롤링 업그레이드와 헬스 체크를 테스트하기 위해 madvirus/simplenode:0.2 버전 이미지를 새로 추가했다. 이 이미지는 다음의 app.js를 실행한다.

const http = require('http');
const url = require('url');

const server = http.createServer().listen(5000);
let slow = false;

server.on('request', (req, res) => {
    console.log('request arrived.');
    const query = url.parse(req.url, true).query;
    if (query.stop === 'true') {
        res.write("Process down: " + process.env.HOSTNAME);
        res.end();
        setTimeout((arg) => { process.exit(1); }, 1000, "shutdown");
    } else if (query.slow === 'true') {
        slow = true;
        res.write("Health check slowdown");
        res.end();
    } else if (req.url === '/health') {
        if (slow) {
            setTimeout((arg) => { res.write("SLOW"); res.end(); }, 5000, "slow");
        } else {
            res.write("OK");
            res.end();
        }
    } else {
        res.write("Hello V0.2: " + process.env.HOSTNAME);
        res.end();
    }
});

server.on('connection', (socket) => {
    console.log("Connected");
});

다음은 이 app.js의 주요 내용이다.

  • 헬스체크를 위한 /health 경로를 처리한다.
    • slow 값이 true면 /health 응답을 5초 뒤에 한다. 이것으로 헬스체크 실패를 흉내낸다.
    • slow 기본 값은 false이다.
    • slow 파라미터가 'true'면 slow를 true로 바꾼다.
  • stop 파라미터가 'true'면 1초 뒤에 프로세스를 중지한다.

롤링 업그레이드

madvirus/simplenode:0.1 버전을 이용한 서비스를 madvirus/simplenode:0.2 버전으로 업그레이드해보자. docker service update 명령어를 사용하면 된다. 다음은 실행 예이다.

$ docker service update \
--update-parallelism 1 \
--update-delay 10s \
--update-order start-first \
--image madvirus/simplenode:0.2 \
simple

업데이트 관련 주요 옵션은 옵션은 다음과 같다.

  • --update-parallelism : 동시에 업데이트할 컨테이너 개수를 지정한다.
  • --update-delay : 업데이트 간 간격을 지정한다.
  • --update-order : start-first면 새 컨테이너를 먼저 생성한 뒤에 기존 컨테이너를 삭제한다. stop-first면 기존 컨테이너를 먼저 삭제하고 그 다음에 새 컨테이너를 생성한다.
  • --update-failure-action : 업데이트에 실패할 경우 이 값이 pause면 업데이트를 멈추고, continue면 업데이트를 계속하고, rollback이면 업데이트를 롤백한다.
  • --update-max-failure-ratio : 실패 비율이 지정한 값 이상이면 업데이트 실패로 간주한다.

서비스 업데이트 후에 docker service ps 명령어로 서비스를 조회하면 다음과 같이 simplenode:0.1 버전은 중지되고 simplenode:0.2 버전이 실행 중인 것을 확인할 수 있다.

vagrant@docker-node1:/vagrant/sample/simplenode$ docker service ps simple
ID                  NAME                IMAGE                     NODE                DESIRED STATE       CURRENT STATE             ERROR               PORTS
sqyy4m1846qu        simple.1            madvirus/simplenode:0.2   docker-node3        Running             Running 35 seconds ago
78c9glgwxr01         \_ simple.1        madvirus/simplenode:0.1   docker-node2        Shutdown            Shutdown 31 seconds ago
1zynj40j8a5v        simple.2            madvirus/simplenode:0.2   docker-node2        Running             Running 19 seconds ago
z0pz24xqpmna         \_ simple.2        madvirus/simplenode:0.1   docker-node1        Shutdown            Shutdown 15 seconds ago

상태 유지와 헬스 체크

simple 서비스를 생성할 때 리플리카의 개수를 2로 지정했다. 도커 스웜은 simple 서비스의 컨테이너 개수를 2개로 유지하기 위해 노력한다. 예를 들어 컨테이너 한 개가 종료되면 새로운 컨테이너를 구동해서 정상 동작하는 컨테이너 개수를 맞춘다.

실제로 컨테이너 한 개를 종료해보자. http://192.168.1.101:8000/?stop=true을 요청하면 컨테이너 중 한 개가 종료된다. 컨테이너가 종료되면 도커 스웜은 새로운 컨테이너를 바로 생성해서 컨테이너 개수를 2로 맞춘다.

헬스 체크 옵션을 주면 컨테이너가 정상 상태가 아닐 때 컨테이너를 중지하고 새로운 컨테이너를 생성할 수 있다. 헬스 체크 관련 옵션을 사용하면 생성한 컨테이너가 정상 상태인지 주기적으로 확인한다. 도커 스웜은 컨테이너의 상태를 주기적으로 확인해서 정상이 아니면 컨테이너를 제거하고 리플리케이션 개수에 맞게 새로 생성한다. 헬스 체크와 관련된 옵션은 --health로 시작하며 다음은 주요 옵션의 사용 예를 보여준다.

docker service create \
--name simple \
--publish published=8000,target=5000 \
--health-cmd 'curl -f http://localhost:5000/health' \
--health-timeout=3s \
--health-retries=3 \
--health-interval=10s \
--health-start-period=10s \
--replicas=2 \
madvirus/simplenode:0.2

주요 옵션은 다음과 같다.

  • --health-cmd : 정상 상태인지 확인할 때 사용할 명령어를 입력한다. 이 명령어는 컨테이너 내에서 실행된다.
  • --health-timeout : 명령어의 실행 시간 제한을 지정한다. 이 시간 내에 명령어 실행이 끝나면 정상이 아닌 것으로 판단한다.
  • --health-retries : 지정한 횟수만큼 연속해서 실패하면 정상이 아닌 것으로 간주한다.
  • --health-interval : 헬스 체크를 실행할 주기를 지정한다.
  • --health-start-period : 컨테이너 시작 후 지정한 시간 동안은 헬스 체크에 실패해도 실패로 간주하지 않는다.

헬스 체크가 잘 되는지 확인해보자. 두 개의 컨테이너가 떠 있다고 가정하고 http://192.168.1.101:8000/?slow=true 요청을 보내자. 앞서 app.js 코드를 보면 slow 파라미터 값이 true이면 /health 요청의 응답 시간을 5초로 바꾼다. 헬스 체크 제한 시간은 3초이므로 이 URL을 실행하면 컨테이너의 헬스체크에 실패한다.

헬스 체크 간격이 10초이고 3회 시도하므로 잠시 뒤에 simple service ps simple 명령어로 컨테이너 목록을 보자. 한 컨테이너가 실패 상태에 있고(Failed) 이를 대체할 새로운 컨테이너를 준비한 것을 확인할 수 있다.

$ docker service ps simple
ID                  NAME                IMAGE                     NODE                DESIRED STATE       CURRENT STATE                   ERROR                              PORTS
qzxmt802h7b9        simple.1            madvirus/simplenode:0.2   docker-node2        Running             Running 2 minutes ago
drmlj2bh7ltw        simple.2            madvirus/simplenode:0.2   docker-node3        Ready               Ready less than a second ago
cdumn8jafb56         \_ simple.2        madvirus/simplenode:0.2   docker-node3        Shutdown            Failed less than a second ago   "task: non-zero exit (143): do…"

서비스와 호스트 연결

서비스를 생성할 때 --publish (또는 -p) 옵션을 사용하면 호스트의 포트에 서비스의 포트를 연결해서 외부에 서비스를 개시할 수 있다.

docker service create \
--name simple \
--publish published=8000,target=5000 \
--replicas=2 \
madvirus/simplenode:0.2

스웜에서 외부에 서비스를 개시하면 스웜에 참여하는 모든 노드가 서비스에 대한 요청을 처리한다. 위 코드는 호스트의 8000 포트를 이용해서 서비스를 개시했는데 이 경우 스웜에 참여한 모든 노드는 8000 포트로 오는 요청을 서비스의 5000 포트로 전달한다. 호스트로 오는 요청을 실제 컨테이너로 전달하는 것은 ingress 네트워크를 통해 처리되는데 이에 대한 내용은 뒤에서 다시 살펴본다.

모드 종류

도커 서비스는 리플리케이션과 글로벌의 두 모드가 있다. 서비스를 생성할 때 --mode 옵션을 이용해서 모드를 지정할 수 있다. 모드에는 다음 두 값이 올 수 있다.

  • replicated : 기본 값으로 지정한 개수만큼 컨테이너를 생성한다.
  • global : 각 노드마다 한 개의 컨테이너를 생성한다.
https://docs.docker.com/engine/reference/commandline/service_create/ 문서에서 더 많은 서비스 관련 옵션을 확인할 수 있다.

관련 글

 

 

도커 스웜 모드를 사용하면 클러스트 환경에서 서비스를 관리할 수 있다. 여기서 서비스는 일종의 네트워크에서 찾고 사용할 수 있는 기능으로 컨테이너로 구성되어 있다. 서비스를 사용하면 실제 운영 중에 필요한 컨테이너 헬스체크, 고가용성을 위한 이중화, 롤링 업그레이드 등을 손쉽게 할 수 있다.

이 글에서는 도커 스웜 클러스터를 구축하고 아주 간단한 서비스를 클러스터에서 실행해보겠다.

서비스 테스트를 위한 간단한 이미지

여기서 사용할 이미지는 호스트 이름을 리턴하는 간단한 노드 웹이다. 코드는 다음과 같다.

const http = require('http');

const server = http.createServer().listen(5000);

server.on('request', (req, res) => {
    console.log('request arrived.');
    res.write("Hello: " + process.env.HOSTNAME);
    res.end();
});

server.on('connection', (socket) => {
    console.log("Connected");
});

이 코드를 실행하는 이미지를 도커 허브에 madvirus/simplenode:0.1로 등록해 놨다. 다음은 이미지를 생성할 때 사용한 Dockerfile이다.

FROM node:8.16-alpine
RUN apk add --no-cache tini curl

WORKDIR /app

COPY app.js .

ENTRYPOINT ["/sbin/tini", "--"]
CMD ["node", "app.js"]

도커 스웜 클러스터 생성

도커 스웜 클러스터를 생성하기 위해 다음의 IP를 가진 세 장비를 준비했다. 괄호 안에 표시한 문자열은 호스트 이름이다.

  • 매니저 : 192.168.1.101(docker-node1)
  • 워커: 192.168.1.102(docker-node2), 192.168.1.103(docker-node3)

101 서버에는 스웜 클러스터의 매니저를 설치하고 나머지 102, 103 서버에는 워커를 설치한다.

먼저 101 서버에서 docker swarm init 명령어로 도커 스웜을 초기화하자. 이때 --advertise-addr 옵션으로 스웜 클러스터에 참여할 IP를 지정한다.

$ docker swarm init --advertise-addr=192.168.1.101
Swarm initialized: current node (t8uoehrwcnzdf4q0poh6thlt4) is now a manager.

To add a worker to this swarm, run the following command:

    docker swarm join --token SWMTKN-1-4pldr32x98ytmo60lso4jkhj2mg2zrdqxo33b2n8f50oz01pwp-8y4jflppx6eelxs7cn6fgfmd2 192.168.1.101:2377

To add a manager to this swarm, run 'docker swarm join-token manager' and follow the instructions.

docker swram init 명령어는 docker swarm join 명령어를 출력한다. 이 명령을 사용해서 스웜에 워커를 추가할 수 있다. docker swarm join-token worker 명령어를 사용해서 참여 명령어를 조회할 수 있다.

102 서버와 103 서버에서 docker swram join 명령어를 실행하자.

$ docker swarm join --token SWMTKN-1-4pldr32x98ytmo60lso4jkhj2mg2zrdqxo33b2n8f50oz01pwp-8y4jflppx6eelxs7cn6fgfmd2 192.168.1.101:2377
This node joined a swarm as a worker.

스웜 매니저 장비(101 서버)에서 docker node ls 명령어를 실행해보자. 노드 목록을 볼 수 있다. 노드는 스웜에 참여한 도커 인스턴스로 보통 한 호스트에 한 노드를 설치한다.

vagrant@docker-node1:~$ docker node ls
ID                            HOSTNAME            STATUS              AVAILABILITY        MANAGER STATUS      ENGINE VERSION
t8uoehrwcnzdf4q0poh6thlt4 *   docker-node1        Ready               Active              Leader              19.03.2
rmsb5mnofkv8czjgjzpcce1gy     docker-node2        Ready               Active                                  19.03.2
01bul21qtizrxnawg56zngs4v     docker-node3        Ready               Active                                  19.03.2

docker-node1 호스트는 MANAGER STATUS 값이 Leader이다. 도커 스웜은 한 개 이상의 매니저를 가질 수 있는데 이 중에서 리더가 클러스터를 관리한다.

서비스 테스트하기

스웜 클러스터를 만들었으니 이제 클러스터에서 서비스를 실행해보자. docker service create 명령어를 사용해서 서비스를 생성한다.

$ docker service create \
--name simple \
--publish published=8000,target=5000 \
--replicas 2 \
madvirus/simplenode:0.1

ojslsjah1obmlpdyargxp6fg9
overall progress: 2 out of 2 tasks
1/2: running   [==================================================>]
2/2: running   [==================================================>]
verify: Service converged

위 명령어는 madvirus/simplenode:0.1 이미지를 이용해서 이름이 simple인 서비스를 생성한다. --replicas 옵션은 생성할 컨테이너의 개수를 지정한다. 여기서는 2를 주었으므로 클러스터 내에 두 개의 컨테이너를 생성한다. --publish 옵션을 사용해서 외부에서 8000번 포트로 연결하면 컨테이너의 5000번 포트로 연결한다.

서비스를 만들었으니 http://192.168.1.101:8000, http://192.168.1.102:8000, http://192.168.1.103:8000에 각각 연결해보자. 아래 그림처럼 호스트 이름을 응답으로 출력할 것이다(simplenode:0.1이 실행하는 app.js는 호스트 이름을 출력하는 간단한 웹 어플리케이션이다).

스웜 ingress 네트워크

docker service ls 명령어를 실행하면 실행 중인 서비스 목록을 보여준다.

$ docker service ls
ID                  NAME                MODE                REPLICAS            IMAGE                     PORTS
ojslsjah1obm        simple              replicated          2/2                 madvirus/simplenode:0.1   *:8000->5000/tcp

REPLICAS 값이 2/2인데 이는 현재 2개가 실행중이고(앞의 2) 설정한 복제수는 2(뒤의 2)개라는 것을 보여준다. 포트 연결은 서비스 단위에서 처리된다. 서비스 수준에서 8000 포트로 오는 요청을 컨테이너의 5000 포트로 연결한다.

8000 포트를 사용하면 어떤 노드에 요청하더라도 서비스의 컨테이너에 연결된다. 예를 들어 simple 서비스의 컨테이너가 docker-node1과 docker-node2에 각각 하나씩 떠 있고 docker-node3에 요청을 보내도 도커 스웜 네트워크가 알맞게 컨테이너에 전달한다. 이를 처리하는 것이 ingress 네트워크이다.

docker network ls 명령어를 실행해보자. 이름이 ingress인 네트워크가 생성된 것을 볼 수 있다.

$ docker network ls
NETWORK ID          NAME                DRIVER              SCOPE
d1a366c95b3a        bridge              bridge              local
da6c7575bf20        docker_gwbridge     bridge              local
b30ace236245        host                host                local
c8c1e9h5214o        ingress             overlay             swarm
918b235ab093        none                null                local

ingress 네트워크는 오버레이(overlay) 드라이버를 사용하고 스웜(swarm) 범위를 갖는다. 오버레이 네트워크는 서로 다른 도커 호스트에서 실행되는 컨테이너 간 통신을 위한 논리적인 네트워크 그룹으로서 ingress 네트워크는 특수한 오버레이 네트워크이다. ingress 네트워크는 스웜 노드로 오는 모든 요청을 알맞게 서비스로 라우팅한다.

오버레이 네트워크와 서비스에 대한 내용은 뒤에서 다시 살펴본다.

관련 글

+ Recent posts