주요글: 도커 시작하기
반응형

다소 동접이 발생하는 간단한 TCP 서버를 구현할 기술을 찾다가 리액터 네티(Reactor Netty)를 알게 되었다. 리액터 네티를 이용하면 네티를 기반으로 한 네트워크 프로그램을 리액터 API로 만들 수 있다. 리액터 네티를 사용하면 네티를 직접 사용하는 것보다 간결한 코드로 비동기 네트워크 프로그램을 만들 수 있는 이점이 있다.


다음은 리액터 네티(Reactor Netty)의 주요 특징이다.

  • 네티 기반
  • 리액터 API 사용
  • 논블로킹 TCP, UDP, HTTP 클라이언트/서버

이 글에서는 리액터 네티를 이용해서 간단한 소켓 서버를 만들어 보겠다.

TcpServer를 이용한 소켓 서버 만들기

리액터 네티는 TcpServer 클래스를 제공한다. 이 클래스를 이용해서 비교적 간단하게 비동기 소켓 서버를 구현할 수 있다. 이 글에서는 간단한 에코 서버를 만들어 본다. 만들 기능은 다음과 같다.

  • 클라이언트가 한 줄을 입력하면 "echo: 입력한 줄\r\n"으로 응답한다.
  • 클라이언트가 exit를 입력하면 클라이언트와 연결을 끊는다.
  • 클라이언트가 SHUTDOWN을 입력하면 서버를 종료한다.
  • 10초 이내에 클라이언트로부터 입력이 없으면 연결을 종료한다.
리액터 네티를 사용하기 위한 메이븐 설정은 다음과 같다.

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-bom</artifactId>
                <version>Californium-SR3</version> <!-- 리액터 네티 0.8.3에 대응 -->
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>io.projectreactor.netty</groupId>
            <artifactId>reactor-netty</artifactId>
        </dependency>

        <dependency>
            <groupId>io.projectreactor.addons</groupId>
            <artifactId>reactor-logback</artifactId>
        </dependency>
    </dependencies>

다음 코드는 리액터 네티로 만든 에코 서버의 전체 코드이다.

package demo;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LineBasedFrameDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

import java.util.concurrent.CountDownLatch;

public class EchoServer {
    private static Logger log = LoggerFactory.getLogger(EchoServer.class);

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(1);
        DisposableServer server = TcpServer.create()
                .port(9999) // 서버가 사용할 포트
                .doOnConnection(conn -> { // 클라이언트 연결시 호출
                    // conn: reactor.netty.Connection
                    conn.addHandler(new LineBasedFrameDecoder(1024));
                    conn.addHandler(new ChannelHandlerAdapter() {
                        @Override
                        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
                            log.info("client added");
                        }

                        @Override
                        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
                            log.info("client removed");
                        }

                        @Override
                        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                                       throws Exception {
                            log.warn("exception {}", cause.toString());
                            ctx.close();
                        }
                    });
                    conn.onReadIdle(10_000, () -> {
                        log.warn("client read timeout");
                        conn.dispose();
                    });
                })
                .handle((in, out) -> // 연결된 커넥션에 대한 IN/OUT 처리
                        // reactor.netty (NettyInbound, NettyOutbound)
                        in.receive() // 데이터 읽기 선언, ByteBufFlux 리턴
                          .asString()  // 문자열로 변환 선언, Flux<String> 리턴
                          .flatMap(msg -> {
                                      log.debug("doOnNext: [{}]", msg);
                                      if (msg.equals("exit")) {
                                          return out.withConnection(conn -> conn.dispose());
                                      } else if (msg.equals("SHUTDOWN")) {
                                          latch.countDown();
                                          return out;
                                      } else {
                                          return out.sendString(Mono.just("echo: " + msg + "\r\n"));
                                      }
                                  }
                          )
                )
                .bind() // Mono<DisposableServer> 리턴
                .block();

        try {
            latch.await();
        } catch (InterruptedException e) {
        }
        log.info("dispose server");
        server.disposeNow(); // 서버 종료
    }
}


먼저 전체 코드 구조를 살펴보자.

DisposableServer server = TcpServer.create()
        .port(9999) // 포트 지정
        .doOnConnection(conn -> { // 클라이언트 연결시 호출 코드
            ...
        })
        .handle((in, out) -> // 데이터 입출력 처리 코드
            ...
        )
        .bind() // 서버 실행에 사용할 Mono<DisposableServer>
        .block(); // 서버 실행 및 DisposableServer 리턴

...(서버 사용)

// 서버 중지
server.disposeNow();

전체 코드 구조는 다음과 같다.
  • TcpServer.create()로 TcpServer 준비
  • port()로 사용할 포트 포트
  • doOnConnection() 메서드로 클라이언트 연결시 실행할 함수 설정
    • 이 함수에서 커넥션에 ChannelHandler를 등록하는 것과 같은 작업 수행
  • handle() 메서드로 클라이언트와 데이터를 주고 받는 함수 설정
  • bind() 메서드로 서버 연결에 사용할 Mono<DisposableServer> 생성
  • bind()가 리턴한 Mono의 block()을 호출해서 서버 실행하고 DisposableServer 리턴
서버가 정상적으로 구동되면 block() 메서드는 구동중인 DisposableServer를 리턴한다. DisposableServer의 disposeNow() 메서드는 서버를 중지할 때 사용한다. 이 외에도 서버 중지에 사용되는 몇 가지 dispose로 시작하는 메서드를 제공한다.

doOnConnection()으로 커넥션 초기화

doOnConnection() 메서드의 파라미터는 다음 함수형 타입이다.
  • Consumer<? super Connection>
reactor.netty.Connection 타입은 인터페이스로 네티의 ChannelHandler 등록과 몇 가지 이벤트 연동 기능을 제공한다. 예제 코드의 doOnConnection 설정 부분을 다시 보자.

.doOnConnection(conn -> { // 클라이언트 연결시 호출
    conn.addHandler(new LineBasedFrameDecoder(1024));
    conn.addHandler(new ChannelHandlerAdapter() {
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            log.info("client added");
        }
        ...
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
        throws Exception {
            log.warn("exception {}", cause.toString());
            ctx.close();
        }
    });
    conn.onReadIdle(10_000, () -> {
        log.warn("client read timeout");
        conn.dispose();
    });

})

Connection#addHandler()는 네티의 ChannelHandler를 등록한다. 이 외에 addHandlerFirst(), addHandlerLast() 메서드를 제공한다. 이 메서드를 이용해서 필요한 네티 코덱을 등록하면 된다. 예제 코드에서는 한 줄씩 데이터를 읽어오는 LineBasedFrameDecoder를 등록했고 클라이언트 연결 이벤트에 따라 로그를 출력하기 위해 임의 ChannelHandlerAdapter 객체를 등록했다.

Connection#onReadIdle() 메서드는 첫 번째 인자로 지정한 시간(밀리초) 동안 데이터 읽기가 없으면 두 번째 인자로 전달받은 코드를 실행한다. 위 코드는 10초 동안 데이터 읽기가 없으면 연결을 종료한다. 비슷하게 onWriteIdle() 메서드는 지정한 시간 동안 쓰기가 없으면 코드를 실행한다.

handle() 메서드로 데이터 입출력 처리

데이터 송수신과 관련된 코드는 handle() 메서드로 지정한다. handle() 메서드가 전달 받는 함수형 타입은 다음과 같다.

BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>>

이 함수는 NettyInbound와 NettyOutbound를 인자로 갖고 Publisher<Void>나 그 하위 타입을 리턴한다. 예제 코드의 handle() 메서드를 다시 보자.

.handle((in, out) -> // 연결된 커넥션에 대한 IN/OUT 처리
        // (NettyInbound, NettyOutbound)
        in.receive() // 데이터 읽기 선언, ByteBufFlux 리턴
          .asString()  // 문자열로 변환 선언, Flux<String> 리턴
          .flatMap(msg -> {
                      log.debug("doOnNext: [{}]", msg);
                      if (msg.equals("exit")) {
                          return out.withConnection(conn -> conn.dispose());
                      } else if (msg.equals("SHUTDOWN")) {
                          latch.countDown();
                          return out;
                      } else {
                          return out.sendString(Mono.just("echo: " + msg + "\r\n"));
                      }
                  }
          )
)


위 코드를 요약하면 다음과 같다.

  • NettyInbound#receive()는 데이터 수신을 위한 ByteBufFlux를 리턴
  • ByteBufFlux#asString()은 데이터를 문자열로 수신 처리
  • flatMap을 이용해서 수신한 메시지 처리

flatMap은 수신한 데이터를 이용해서 알맞은 처리를 한다. 클라이언트에 데이터를 전송할 때에는  NettyOutbound를 이용한다. NettyOutbound#sendString() 메서드를 이용하면 문자열 데이터를 클라이언트에 전송한다. NettyOutbound#sendString()의 파라미터는 Publisher<? extends String> 타입이기 때문에 위 코드에 Mono.just()를 이용했다.


Connection이 필요하면 NettyOutbound#withConnection() 메서드를 사용한다. 위 코드에서는 클라이언트가 "exit"를 전송하면 연결을 끊기 위해 이 메서드를 사용했다.


ByteBufFlux#asString() 메서드는 기본 캐릭터셋을 사용한다. 다른 캐릭터셋을 사용하고 싶다면 asString(Charset) 메서드를 사용한다. 비슷하게 NettyOutbound#sendString() 메서드도 기본 캐릭터셋을 사용하므로 다른 캐릭터셋을 사용하려면 NettyOutbound#sendString(Publisher, Charset) 메서드를 사용한다.


예제 실행

EchoServer를 실행해보자. 로그백을 사용했다면 아래와 비슷한 메시지가 출력되면서 서버가 구동된다.


08:49:10.522 [reactor-tcp-nio-1] DEBUG reactor.netty.tcp.TcpServer - [id: 0x1fb82e53, L:/127.0.0.1:9999] Bound new server


telnet을 이용해서 에코가 제대로 동작하는지 확인해본다. 클라이언트가 전송한 데이터를 굵은 글씨로 표시했고 서버가 응답한 데이터를 파란색으로 표시했다.


$ telnet localhost 9999

Trying 127.0.0.1...

Connected to localhost.

Escape character is '^]'.

124124

echo: 124124

wefwef

echo: wefwef

exit

Connection closed by foreign host.

$


위 과정에서 서버에 출력되는 로그는 다음과 같다(리액터 네티가 출력하는 로그는 생략했다.)


08:50:40.187 [reactor-tcp-nio-5] INFO  demo.EchoServer - client added

08:50:37.374 [reactor-tcp-nio-4] INFO  demo.EchoServer - doOnNext: [124124]

08:50:44.506 [reactor-tcp-nio-4] INFO  demo.EchoServer - doOnNext: [wefwef]

08:50:46.218 [reactor-tcp-nio-4] INFO  demo.EchoServer - doOnNext: [exit]

08:50:46.221 [reactor-tcp-nio-4] INFO  demo.EchoServer - client removed


Connection#onReadIdle()을 이용해서 읽기 타임아웃을 10초로 설정했는데 실제로 서버 접속 후 10초 동안 데이터를 전송하지 않으면 연결이 끊기는 것을 확인할 수 있다.


08:56:23.358 [reactor-tcp-nio-2] WARN  demo.EchoServer - client read timeout

08:56:23.360 [reactor-tcp-nio-2] INFO  demo.EchoServer - client removed


마지막으로 SHUTDOWN 명령어를 전송해 보자. 그러면 서버가 종료되는 것도 확인할 수 있을 것이다.


08:57:46.372 [reactor-tcp-nio-3] INFO  demo.EchoServer - doOnNext: [SHUTDOWN]

08:57:46.373 [main] INFO  demo.EchoServer - dispose server


+ Recent posts