반응형
논블럭킹 IO에 대해서 살펴보고, Selector를 이용한 논블럭킹 IO의 사용법을 알아본다.
블럭킹 IO와 논블럭킹 IO
자바 1.5가 조만간 정식 버전이 출시될 것으로 기대되고 있기는 하지만, 필자가 앞서 썻던 자바 1.4의 새로운 입출력 API인 NIO와 관련된 글을 끝맺음하고자 NIO API의 마지막 주제인 'Selector'와 '논블럭킹 IO'에 대해서 살펴보고자 한다. 먼저 1.4 이전의 입출력 코드에 대해서 살펴보도록 하자. 1.4 이전의 입출력 프로그램에서의 기본적인 입출력 메소드는 모두 블럭킹되었다. 예를 들어, ServerSocket의 accept() 메소드의 경우 클라이언트로부터의 연결 요청이 들어올 때 까지 블럭킹된다.
뿐만 아니라 클라이언트가 전송한 데이터를 입력받을 때에도 다음과 같이 데이터를 읽어오는 부분에서 프로그램의 흐름이 블럭킹된다.
이렇게 블럭킹되는 코드 때문에 자바로 서버 프로그래밍을 하는 개발자들은 성능 문제를 겪어야만 했다. 예를 들어, 동시에 10명의 접속자를 처리하기 위해서는 기본적으로 다음과 같이 각각의 클라이언트마다 하나의 쓰레드를 생성해주어야 했다.
public class ClientAcceptor {
public void run() {
...
Socket socket = serverSocket.accept();
ClientProcessor processor = new ClientProcessor(socket);
processor.start(); // 기본구조는 클라이언트 하나당 쓰레드 하나
...
}
}
public class ClientProcessor extends Thread {
...
public void run() {
...
in = socket.getInputStreacm();
// in으로부터 데이터 읽어오기
...
}
}
그 동안 서버 프로그래밍의 기본 구조는 위 코드와 같이 클라이언트의 요청이 들어올 경우 그 클라이언트의 요청을 처리하는 하나의 쓰레드를 생성하는 것이었다. 하지만 이렇게 <클라이언트-쓰레드> 쌍을 사용하는 경우 클라이언트가 숫자에 비례해서 쓰레드의 숫자가 많아지게 되기 때문에 서버에 심각한 성능 문제를 유발하게 되며 이런 점을 극복하기 위해서 자바 개발자들은 폴링(polling)기법과 쓰레드 풀을 함께 사용해 왔었다. 폴링 기법을 간단하게 설명하자면 다음과 같은 형태의 코드로 정리된다. (아래 코드에서 SocketList는 Socket 목록을 저장하는 List라고 가정하자.)
public class ClientAcceptor {
public void run() {
...
Socket socket = serverSocket.accept();
socketList.addSocket(socket); // 소켓을 처리 목록에 추가
...
}
}
public class ClientProcessor {
public void run() {
while(true) {
Thread.sleep(100); // 0.1초간 대기
for (int i = 0 ; i < socketList.size() ; i++) {
// 클라이언트의 요청을 차례대로 처리
Socket socket = socketList.getSocket(i);
in = socket.getInputStream();
// 클라이언트와의 입출력 처리
in.read(..); // 블럭킹되므로 시간 대기 문제 발생
...
}
}
}
}
구현하는 방법에 따라서 차이는 있겠지만 폴링기법의 기본 구현 형태는 위 코드와 같다. 즉, 처리해야 할 클라이언트를 차례대로 처리하는 것이다. 폴링 기법과 쓰레드 풀 기법을 함께 사용하면 성능을 좀더 높일 수 있긴 하지만, 대기 시간을 낭비하게 되고, (채팅 서비스와 같이) 동시처리가 요구되는 경우 폴링기법으로는 한계가 있다.
자바에서 성능상의 문제가 발생함에도 불구하고 멀티 쓰레드를 사용하여 클라이언트 당 하나의 쓰레드를 생성해주거나 폴링 기법을 사용하여 클라이언트 요청을 처리하는 근본적인 이유는 InputStream의 read() 메소드가 블럭킹되기 때문이다. 멀티 쓰레드를 사용하는 경우에는 각각의 클라이언트를 별도로 쓰레드로 처리함으로써 read() 메소드의 블럭킹 때문에 발생하는 문제를 해소하고 있으며, 폴링 기법의 경우는 시스템 자원 소모를 최소화하기 위해서 클라이언트의 요청을 순차적으로 반복해서 처리하고 있다. 하지만, 멀티 쓰레드 기법은 클라이언트의 증가에 비례해서 자원을 많이 소모하게 되며, 폴링 기법은 여전히 블럭킹에 따른 문제점을 안고 있다.
이런 문제를 해결하기 위해서 자바 1.4부터는 입출력의 논블럭킹 기능이 추가되었다. (논블럭킹 IO의 경우는 이미 운영체제 차원에서 지원되고 있고 C/C++ 라이브러리들 역시 각 운영체제에 맞게 논블럭킹 IO를 사용할 수 있도록 도와주고 있는 걸 생각해보면 자바에서는 늦게 이 기능이 추가되었다고 생각된다.) 논블럭킹 IO의 핵심은 클라이언트의 연결을 기다리거나 채널로부터 데이터를 읽어올 때 다음과 같이 처리되는 것이다.
논블럭킹 모드 사용하기
논블럭킹 모드를 사용할 수 있는 채널은 다음과 같은 메소드를 제공한다.
configureBlocking() 메소드의 파라미터의 값을 false로 전달하면 손쉽게 채널을 논블럭킹 모드로 변환할 수 있다. 예를 들어, 소켓스트림으로부터 데이터를 주고 받는데 사용되는 채널인 SocketChannel을 논블럭킹 모드로 지정하고 싶다면 다음과 같은 코드를 사용하면 된다.
위 코드와 같이 configureBlocking(false) 메소드를 호출한 이후에 SocketChannel.read() 메소드는 앞에서 설명했듯이 블럭킹 되지 않고 곧바로 리턴된다.
SocketChannel을 비롯해서 논블럭킹 모드를 지원하는 채널은 다음과 같다.
위 코드의 문제점은 무한루프를 도는 데 모든 코드가 쉴새 없이 실행되기 때문에 CPU의 실행시간을 상당부분 소모하게 된다는 것이다. 예를 들어, 10초간 클라이언트로부터의 연결 요청이 없을 경우, serverChannel.accept() 메소드는 10초가 계속해서 null을 리턴하게 되며, 결과적으로 10초 동안 while() 루프는 (실제적으로 아무런 기능도 수행하지 않은채로) 반복해서 실행되며 그 만큼 CPU 시간을 낭비하게 되는 것이다. 앞에서 소켓으로부터 데이터를 읽어와 처리하는 부분의 경우에도 Thread.sleep() 메소드를 지정해주지 않으면 상당량의 CPU 시간을 낭비하게 된다.
무한 루프 형태의 반복문에서 논블럭킹 메소드를 수행하는 경우에는 이처럼 CPU 시간이 낭비되는 데, 자바 1.4는 이를 방지할 수 있는 기능을 제공하고 있다. 그것은 바로 Selector라는 것인데, 이 Selector를 사용하게 되면 연결요청이 들어왔거나 또는 데이터를 읽어올 수 있는 경우에만 코드를 수행하도록 지정할 수 있다.
Selector를 통해서 논블럭킹 채널 사용하기
Selector 클래스는 일종의 이벤트 리스너이다. 즉, 논블럭킹 모드를 지원하는 채널에 Selector를 등록해놓으면 논블럭킹 채널은 연결요청이 들어오거나 데이터가 도착한 경우에 그 사실을 Selector에 알리게 된다. 그럼, Selector는 어떤 기능을 사용할 수 있는 지를 리턴하게 되며, 그 리턴값을 통해서 연결요청을 처리할지 데이터 읽기를 처리할지 결정할 수 있게 된다.
논블럭킹 모드를 지원하는 채널들은 다음과 같이 Selector를 등록할 수 있는 메소드를 제공하고 있다.
sel 인자는 해당 채널에 등록할 Selector이고, ops는 Selector가 전달받은 이벤트의 종류를 명시한다. att 객체는 리턴디는 SelectionKey에서 사용할 속성을 나타낸다.
Selector를 채널에 등록하기 위해서는 먼저 Selector를 생성해야 하는데, Selector.open() 메소드를 사용하면 새로운 Selector를 생성할 수 있게 된다. (NIO API의 특징은 객체를 생성할 때 open() 메소드를 생성한다는 점이다.) open() 메소드의 사용방법은 다음과 같이 간단한다.
Selector를 생성한 후에는 Selector 객체를 알맞은 채널에 등록해주면 된다. Selector를 채널에 등록할 때에는 채널의 어떤 기능과 관련해서 등록할지를 정해야 하는데 이때에는 SelectionKey 클래스에 정의된 상수값을 사용하면 된다. 다음의 SelectionKey 클래스에 정의된 상수값의 목록이다.
위와 같이 논블럭킹 채널에 Selector를 등록한 이후에는 논블럭킹되는 메소드 대신에 Selector 클래스의 readyOps() 메소드를 사용하여 어떤 기능을 사용할 수 있는 지 검사할 수 있게 된다. 예를 들어, 위 코드에서처럼 ServerSocketChannel에 Selector를 등록했다면 다음과 같이 Selector를 사용하여 연결요청이 들어오는 때에 알맞은 처리를 할 수가 있다.
위 코드에서 핵심적인 기능을 제공하는 메소드는 Selector의 readyOps() 메소드이다. Selector의 readyOps() 메소드는 Selector가 등록되어 있는 채널로부터 관련된 이벤트가 발생할 때 까지 블럭킹되며, 관련된 이벤트가 발생할 경우 그와 간련된 키값을 리턴한다. 예를 들어, 위 코드에서는 SelectionKey.OP_ACCEPT 이벤트에 관심을 갖도록 Selector를 등록했는데 관련된 ServerSocketChannel에 클라이언트의 연결 요청이 들어올때까지 selector.readyOps() 메소드는 블럭킹되며, 클라이언트의 요청이 들어올 경우 그와 관련된 키 값인 SelectionKey.OP_ACCEPT를 리턴하게 된다.
Selector의 readyOps() 메소드는 등록된 채널들로부터 받은 모든 이벤트 목록을 리턴한다. 예를 들어, 다음과 같이 하나의 Selector를 여러 채널에 등록했다고 가정해보자.
위와 같이 하나의 Selector를 여러 채널에 등록한 경우 Selector의 readyOps() 메소드는 관련된 채널들 중의 하나라도 이벤트가 발생한 경우 그와 관련된 값을 리턴한다. 만약 동시에 클라이언트로부터 연결 요청이 들어오고 소켓으로부터 읽을 수 있는 데이터가 들어왔다면 selector.readyOps() 메소드는 다음과 같은 값을 리턴할 것이다.
Selector.readyOps() 메소드가 리턴한 값이 0보다 큰 경우 Selector의 selectedKeys() 메소드를 사용하여 관련된 이벤트 목록을 읽어올 수 있다. selectedKeys() 메소드는 Selector가 등록된 채널과 관련된 SelectionKey의 집합인 Set을 리턴한다. SelectionKey는 관련 채널을 리턴하는 channel() 메소드를 제공하고 있다. 따라서, Selector를 이용하는 경우 전체적인 프로그램 코드는 다음과 같은 구조를 갖게 된다.
// 1. Selector 생성 Selector selector = Selector.open();
// 2. Selector를 등록할 수 있는 채널 생성
ServerSocketChannel channel = ServerSocketChannel.open();
...
// 3. 채널에 Selector 등록
channel.register(selector, SelectionKey.OP_ACCEPT, null);
...
while(true) {
// 4. selector를 이용하여 채널의 이벤트 대기
int readyKey = selector.readyOps();
// 5. readyKey가 0 이상이면 이벤트가 발생한 것으로 처리
if (readyKey > 0) {
// 6. selector로부터 채널에서 발생한 이벤트와 관련된 SelectionKey Set 구함
Set selectionKeySet = selector.selectedKeys();
// 7. Set에서 각 SelectionKey를 차례대로 읽어와
Iterator iter = selectionKeySet.iterator();
while(iter.hasNext()) {
SelectionKey selectionKey = (SelectionKey)iter.next();
// 8. SelectionKey로부터 채널을 구함
ServerSocketChannel relatedChannel =
(ServerSocketChannel)selectionKey.channel();
// 9. 채널을 사용하여 알맞은 작업 수행
...
}
}
}
논블럭킹 IO를 이용한 웹 서버 구축
마지막으로 논블럭킹 IO를 이용하여 간단한 웹 서버를 구현해보도록 하겠다. 여기서 구현할 웹 서버는 다음과 같이 4개의 클래스로 구성되어 있다.
ClientAcceptor는 쓰레드로서 생성자에서 클라이언트와 연결된 SocketChannel을 저장할 ChannelQueue와 클라이언트의 연결을 대기할 포트 번호를 전달받는다. 생성자에서는 클라이언트의 연결을 대기할 ServerSocketChannel을 생성하고 지정한 지정한 포트에 바인딩한다. 또한 생성자에서는 ServerSocketChannel을 논블럭킹 모드로 전환하고 ServerSocketChannel에 Selector를 등록한다.
run() 메소드에서는 Selector.readyOps()를 사용하여 클라이언트의 연결이 들어올 경우 ServerSocketChannel의 accept() 메소드를 사용해서 클라이언트와 연결된 SocketChannel을 구한 후 큐에 저장한다.
ClientAcceptor가 클라이언트의 연결 요청을 받아서 관련된 SocketChannel을 큐에 저장하면, ClientProcessor는 그 큐로부터 SocketChannel을 읽어와 클라이언트의 요청을 처리한다. ClientProcessor 클래스의 소스 코드는 다음과 같다.
ClientProcessor의 소스 코드는 지금까지 설명했던 것들을 코드로 표현한 것에 불과하므로 자세한 설명은 덧붙이지 않겠다. 참고로 각 메소드를 간단하게 설명하자면 다음과 같다.
위 코드에서 문제가 발생할 수 있는데 그것은 바로 readSelector.select() 메소드는 클라이언트로부터 데이터가 들어올 때에 비로서 값을 리턴한다는 점이다. 좀더 구체적으로 설명하기 위해 다음과 같은 실행 순서를 생각해보자.
위에서 '>>>' 표시는 클라이언트로부터 연결 요청이 들어왔음을 의미하는데, 4의 과정에서 새로들어온 SocketChannel을 channelQueue에 등록한다. channelQueue에 등록된 SocketChannel은 ClientProcessor.processSocketChannelQueue() 메소드를 통해서 논블럭킹 모드로 지정되고 readSelector를 등록하게 된다. 여기서 실행 순서상 과정 4나 과정 8에서 channelQueue에 새로운 SocketChannel을 등록하더라도 이들 SocketChannel은 readSelector를 등록되지 않은 상태이기 때문에 이들 채널에 데이터가 들어오더라도 readSelector.select() 메소드는 계속해서 블럭킹된다.
이처럼 계속해서 논리적으로 계속해서 블럭킹될 수 밖에 없는데 왜 필자가 이렇게 구현했을까? 그것은 바로 Selector가 제공하는 wakeup()이라는 메소드를 사용하면 위와 같은 문제를 해결할 수 있기 때문이다. 실제로 ChannelSocketQueue의 addLast() 메소드는 다음과 같이 구현되어 있다.
위 코드에서 readSelector는 ClientProcessor의 readSelector와 동일한 객체를 가리키는데, Selector의 wakeup() 메소드를 호출하게 되면 블럭킹되어 있던 select() 메소드는 곧바로 리턴된다. 즉, 앞에서 살펴봤던 블럭킹 문제가 발생하지 않게 되는 것이다.
마지막으로 NIOWebServer 클래스는 앞에서 ClientAcceptor 쓰레드와 ClientProcessor 쓰레드를 구동한다. 소스 코드는 복잡하지 않으므로 첨부한 소스 코드를 참고하기 바란다. 실행은 다음과 같이 하면 된다. (소스 코드에 컴파일한 .class 파일도 포함되어 있으니 그대로 사용할 수 있을 것이다.)
위와 같이 실행한 후 웹브라우저를 띄워서 http://localhost/index.html 과 같은 알맞은 URL을 입력해서 실제로 웹 서버가 제대로 동작하는 지 확인해보기 바란다.
결론
본 글에서는 NIO API의 논블럭킹 IO에 대해서 살펴보았다. 앞에서 작성한 웹 서버의 경우 단 두 개의 쓰레드로(ClientAcceptor와 ClientProcessor) 모든 클라이언트의 요청을 처리하고 있다. 물론 상황에 따라서 ClientProcessor 쓰레드의 개수를 증가시킬 수 있겠지만 기본적으로 다수의 클라이언트를 하나의 쓰레드에서 성능저하현상없이 처리할 수 있게 해주는 원천이 바로 논블록킹 IO의 막강한 장점이라 할 수 있다.
관련링크:
블럭킹 IO와 논블럭킹 IO
자바 1.5가 조만간 정식 버전이 출시될 것으로 기대되고 있기는 하지만, 필자가 앞서 썻던 자바 1.4의 새로운 입출력 API인 NIO와 관련된 글을 끝맺음하고자 NIO API의 마지막 주제인 'Selector'와 '논블럭킹 IO'에 대해서 살펴보고자 한다. 먼저 1.4 이전의 입출력 코드에 대해서 살펴보도록 하자. 1.4 이전의 입출력 프로그램에서의 기본적인 입출력 메소드는 모두 블럭킹되었다. 예를 들어, ServerSocket의 accept() 메소드의 경우 클라이언트로부터의 연결 요청이 들어올 때 까지 블럭킹된다.
Socket socket = serverSocket.accept(); // 블럭킹됨!!
뿐만 아니라 클라이언트가 전송한 데이터를 입력받을 때에도 다음과 같이 데이터를 읽어오는 부분에서 프로그램의 흐름이 블럭킹된다.
in = socket.getInputStream();
...
len = in.read(buff); // 블럭킹됨.
...
len = in.read(buff); // 블럭킹됨.
이렇게 블럭킹되는 코드 때문에 자바로 서버 프로그래밍을 하는 개발자들은 성능 문제를 겪어야만 했다. 예를 들어, 동시에 10명의 접속자를 처리하기 위해서는 기본적으로 다음과 같이 각각의 클라이언트마다 하나의 쓰레드를 생성해주어야 했다.
public class ClientAcceptor {
public void run() {
...
Socket socket = serverSocket.accept();
ClientProcessor processor = new ClientProcessor(socket);
processor.start(); // 기본구조는 클라이언트 하나당 쓰레드 하나
...
}
}
public class ClientProcessor extends Thread {
...
public void run() {
...
in = socket.getInputStreacm();
// in으로부터 데이터 읽어오기
...
}
}
그 동안 서버 프로그래밍의 기본 구조는 위 코드와 같이 클라이언트의 요청이 들어올 경우 그 클라이언트의 요청을 처리하는 하나의 쓰레드를 생성하는 것이었다. 하지만 이렇게 <클라이언트-쓰레드> 쌍을 사용하는 경우 클라이언트가 숫자에 비례해서 쓰레드의 숫자가 많아지게 되기 때문에 서버에 심각한 성능 문제를 유발하게 되며 이런 점을 극복하기 위해서 자바 개발자들은 폴링(polling)기법과 쓰레드 풀을 함께 사용해 왔었다. 폴링 기법을 간단하게 설명하자면 다음과 같은 형태의 코드로 정리된다. (아래 코드에서 SocketList는 Socket 목록을 저장하는 List라고 가정하자.)
public class ClientAcceptor {
public void run() {
...
Socket socket = serverSocket.accept();
socketList.addSocket(socket); // 소켓을 처리 목록에 추가
...
}
}
public class ClientProcessor {
public void run() {
while(true) {
Thread.sleep(100); // 0.1초간 대기
for (int i = 0 ; i < socketList.size() ; i++) {
// 클라이언트의 요청을 차례대로 처리
Socket socket = socketList.getSocket(i);
in = socket.getInputStream();
// 클라이언트와의 입출력 처리
in.read(..); // 블럭킹되므로 시간 대기 문제 발생
...
}
}
}
}
구현하는 방법에 따라서 차이는 있겠지만 폴링기법의 기본 구현 형태는 위 코드와 같다. 즉, 처리해야 할 클라이언트를 차례대로 처리하는 것이다. 폴링 기법과 쓰레드 풀 기법을 함께 사용하면 성능을 좀더 높일 수 있긴 하지만, 대기 시간을 낭비하게 되고, (채팅 서비스와 같이) 동시처리가 요구되는 경우 폴링기법으로는 한계가 있다.
자바에서 성능상의 문제가 발생함에도 불구하고 멀티 쓰레드를 사용하여 클라이언트 당 하나의 쓰레드를 생성해주거나 폴링 기법을 사용하여 클라이언트 요청을 처리하는 근본적인 이유는 InputStream의 read() 메소드가 블럭킹되기 때문이다. 멀티 쓰레드를 사용하는 경우에는 각각의 클라이언트를 별도로 쓰레드로 처리함으로써 read() 메소드의 블럭킹 때문에 발생하는 문제를 해소하고 있으며, 폴링 기법의 경우는 시스템 자원 소모를 최소화하기 위해서 클라이언트의 요청을 순차적으로 반복해서 처리하고 있다. 하지만, 멀티 쓰레드 기법은 클라이언트의 증가에 비례해서 자원을 많이 소모하게 되며, 폴링 기법은 여전히 블럭킹에 따른 문제점을 안고 있다.
이런 문제를 해결하기 위해서 자바 1.4부터는 입출력의 논블럭킹 기능이 추가되었다. (논블럭킹 IO의 경우는 이미 운영체제 차원에서 지원되고 있고 C/C++ 라이브러리들 역시 각 운영체제에 맞게 논블럭킹 IO를 사용할 수 있도록 도와주고 있는 걸 생각해보면 자바에서는 늦게 이 기능이 추가되었다고 생각된다.) 논블럭킹 IO의 핵심은 클라이언트의 연결을 기다리거나 채널로부터 데이터를 읽어올 때 다음과 같이 처리되는 것이다.
- 논블럭킹 모드인 경우
- 클라이언트의 연결 요청이 없을 경우 ServerSocketChannel.accept() 메소드는 곧바로 null을 리턴한다.
- 채널로부터 읽어올 데이터가 없는 경우 SocketChannel.read() 메소드는 곧바로 리턴되며, 인자로 전달한 ByteBuffer에는 어떤 내용도 입력되지 않는다.
논블럭킹 모드 사용하기
논블럭킹 모드를 사용할 수 있는 채널은 다음과 같은 메소드를 제공한다.
public SelectableChannel configureBlocking(boolean block) throws IOException
configureBlocking() 메소드의 파라미터의 값을 false로 전달하면 손쉽게 채널을 논블럭킹 모드로 변환할 수 있다. 예를 들어, 소켓스트림으로부터 데이터를 주고 받는데 사용되는 채널인 SocketChannel을 논블럭킹 모드로 지정하고 싶다면 다음과 같은 코드를 사용하면 된다.
public class ClientAcceptor {
public void run() {
ServerSocketChannel ssc = null;
try {
ssc = ServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress(port);
ssc.socket().bind(address);
while(true) {
SocketChannel socketChannel = serverChannel.accept();
// 소켓채널 논블럭킹 모드 지정
socketChannel.configureBlocking(false);
socketList.addSocket(socketChannel);
...
...
}
} catch(IOException ex) {
...
} finally {
...
}
}
}
public class ClientProcessor {
public void run() {
ByteBuffer buffer = ....;
...
while(true) {
Thread.sleep(100); // 0.1초간 대기
for (int i = 0 ; i < socketList.size() ; i++) {
// 클라이언트의 요청을 차례대로 처리
SocketChannel socket = socketList.getSocket(i);
buffer.clear();
socket.read(buffer); // 블럭킹 되지 않음
if (buffer.position() > 0) {
... // 소켓에서 읽어온 데이터 처리
}
}
}
}
}
public void run() {
ServerSocketChannel ssc = null;
try {
ssc = ServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress(port);
ssc.socket().bind(address);
while(true) {
SocketChannel socketChannel = serverChannel.accept();
// 소켓채널 논블럭킹 모드 지정
socketChannel.configureBlocking(false);
socketList.addSocket(socketChannel);
...
...
}
} catch(IOException ex) {
...
} finally {
...
}
}
}
public class ClientProcessor {
public void run() {
ByteBuffer buffer = ....;
...
while(true) {
Thread.sleep(100); // 0.1초간 대기
for (int i = 0 ; i < socketList.size() ; i++) {
// 클라이언트의 요청을 차례대로 처리
SocketChannel socket = socketList.getSocket(i);
buffer.clear();
socket.read(buffer); // 블럭킹 되지 않음
if (buffer.position() > 0) {
... // 소켓에서 읽어온 데이터 처리
}
}
}
}
}
위 코드와 같이 configureBlocking(false) 메소드를 호출한 이후에 SocketChannel.read() 메소드는 앞에서 설명했듯이 블럭킹 되지 않고 곧바로 리턴된다.
SocketChannel을 비롯해서 논블럭킹 모드를 지원하는 채널은 다음과 같다.
- ServerSocketChannel
- SocketChannel
- DatagramChannel
- Pipe.SinkChannel
- Pipe.SourceChannel
public class ClientAcceptor {
public void run() {
ServerSocketChannel ssc = null;
try {
ssc = ServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress(port);
ssc.configureBlocking(false);
ssc.socket().bind(address);
while(true) {
// 연결 요청없을 경우 곧바로 리턴
SocketChannel socketChannel = serverChannel.accept();
if (socketChannel != null) {
// 소켓채널 논블럭킹 모드 지정
socketChannel.configureBlocking(false);
socketList.addSocket(socketChannel);
...
...
}
}
} catch(IOException ex) {
...
} finally {
...
}
}
}
public void run() {
ServerSocketChannel ssc = null;
try {
ssc = ServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress(port);
ssc.configureBlocking(false);
ssc.socket().bind(address);
while(true) {
// 연결 요청없을 경우 곧바로 리턴
SocketChannel socketChannel = serverChannel.accept();
if (socketChannel != null) {
// 소켓채널 논블럭킹 모드 지정
socketChannel.configureBlocking(false);
socketList.addSocket(socketChannel);
...
...
}
}
} catch(IOException ex) {
...
} finally {
...
}
}
}
위 코드의 문제점은 무한루프를 도는 데 모든 코드가 쉴새 없이 실행되기 때문에 CPU의 실행시간을 상당부분 소모하게 된다는 것이다. 예를 들어, 10초간 클라이언트로부터의 연결 요청이 없을 경우, serverChannel.accept() 메소드는 10초가 계속해서 null을 리턴하게 되며, 결과적으로 10초 동안 while() 루프는 (실제적으로 아무런 기능도 수행하지 않은채로) 반복해서 실행되며 그 만큼 CPU 시간을 낭비하게 되는 것이다. 앞에서 소켓으로부터 데이터를 읽어와 처리하는 부분의 경우에도 Thread.sleep() 메소드를 지정해주지 않으면 상당량의 CPU 시간을 낭비하게 된다.
무한 루프 형태의 반복문에서 논블럭킹 메소드를 수행하는 경우에는 이처럼 CPU 시간이 낭비되는 데, 자바 1.4는 이를 방지할 수 있는 기능을 제공하고 있다. 그것은 바로 Selector라는 것인데, 이 Selector를 사용하게 되면 연결요청이 들어왔거나 또는 데이터를 읽어올 수 있는 경우에만 코드를 수행하도록 지정할 수 있다.
Selector를 통해서 논블럭킹 채널 사용하기
Selector 클래스는 일종의 이벤트 리스너이다. 즉, 논블럭킹 모드를 지원하는 채널에 Selector를 등록해놓으면 논블럭킹 채널은 연결요청이 들어오거나 데이터가 도착한 경우에 그 사실을 Selector에 알리게 된다. 그럼, Selector는 어떤 기능을 사용할 수 있는 지를 리턴하게 되며, 그 리턴값을 통해서 연결요청을 처리할지 데이터 읽기를 처리할지 결정할 수 있게 된다.
논블럭킹 모드를 지원하는 채널들은 다음과 같이 Selector를 등록할 수 있는 메소드를 제공하고 있다.
public SelectionKey register(Selector sel, int ops, Object att)
throws ClosedChannelException
throws ClosedChannelException
sel 인자는 해당 채널에 등록할 Selector이고, ops는 Selector가 전달받은 이벤트의 종류를 명시한다. att 객체는 리턴디는 SelectionKey에서 사용할 속성을 나타낸다.
Selector를 채널에 등록하기 위해서는 먼저 Selector를 생성해야 하는데, Selector.open() 메소드를 사용하면 새로운 Selector를 생성할 수 있게 된다. (NIO API의 특징은 객체를 생성할 때 open() 메소드를 생성한다는 점이다.) open() 메소드의 사용방법은 다음과 같이 간단한다.
Selector selector = Selector.open();
Selector를 생성한 후에는 Selector 객체를 알맞은 채널에 등록해주면 된다. Selector를 채널에 등록할 때에는 채널의 어떤 기능과 관련해서 등록할지를 정해야 하는데 이때에는 SelectionKey 클래스에 정의된 상수값을 사용하면 된다. 다음의 SelectionKey 클래스에 정의된 상수값의 목록이다.
- SelectionKey.OP_READ - 채널로부터 데이터를 읽어올 수 있는 경우. 값은 1
- SelectionKey.OP_WRITE - 채널에 데이터를 쓸 수 있는 경우. 값은 4
- SelectionKey.OP_ACCEPT - 소켓 연결이 들어온 경우. 값은 16
- SelectionKey.OP_CONNECT - 연결 요청이 이뤄진 경우. 값은 8
ServerSocketChannel ssc = null;
..
try {
ssc = ServerSocketChannel.open();
ssc.blockingConfigure(true);
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT, null);
...
} catch(..) {
...
}
..
try {
ssc = ServerSocketChannel.open();
ssc.blockingConfigure(true);
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT, null);
...
} catch(..) {
...
}
위와 같이 논블럭킹 채널에 Selector를 등록한 이후에는 논블럭킹되는 메소드 대신에 Selector 클래스의 readyOps() 메소드를 사용하여 어떤 기능을 사용할 수 있는 지 검사할 수 있게 된다. 예를 들어, 위 코드에서처럼 ServerSocketChannel에 Selector를 등록했다면 다음과 같이 Selector를 사용하여 연결요청이 들어오는 때에 알맞은 처리를 할 수가 있다.
ssc = ServerSocketChannel.open();
ssc.blockingConfigure(true);
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT, null); // selector를 등록
while (true) {
// selector와 관련된 이벤트가 발생할때까지 블럭킹!
int numKeys = selector.readyOps();
if (numKeys > 0) {
if ((numKeys & SelectionKey.OP_ACCEPT) ==
SelectionKey.OP_ACCEPT) {
Set selectedKeySet = selector.selectedKeys();
Iterator iter = selectedKeySet.iterator();
while(iter.hasNext()) {
SelectionKey key = (SelectionKey)iter.next();
iter.remove();
SocketChannel incomingChannel = ssc.accept();
// 또는 다음 코드와 같이 (ServerSocketChannel)key.channel()
// SocketChannel incomingChannel =
// ((ServerSocketChannel)key.channel()).accept();
}
}
}
}
ssc.blockingConfigure(true);
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT, null); // selector를 등록
while (true) {
// selector와 관련된 이벤트가 발생할때까지 블럭킹!
int numKeys = selector.readyOps();
if (numKeys > 0) {
if ((numKeys & SelectionKey.OP_ACCEPT) ==
SelectionKey.OP_ACCEPT) {
Set selectedKeySet = selector.selectedKeys();
Iterator iter = selectedKeySet.iterator();
while(iter.hasNext()) {
SelectionKey key = (SelectionKey)iter.next();
iter.remove();
SocketChannel incomingChannel = ssc.accept();
// 또는 다음 코드와 같이 (ServerSocketChannel)key.channel()
// SocketChannel incomingChannel =
// ((ServerSocketChannel)key.channel()).accept();
}
}
}
}
위 코드에서 핵심적인 기능을 제공하는 메소드는 Selector의 readyOps() 메소드이다. Selector의 readyOps() 메소드는 Selector가 등록되어 있는 채널로부터 관련된 이벤트가 발생할 때 까지 블럭킹되며, 관련된 이벤트가 발생할 경우 그와 간련된 키값을 리턴한다. 예를 들어, 위 코드에서는 SelectionKey.OP_ACCEPT 이벤트에 관심을 갖도록 Selector를 등록했는데 관련된 ServerSocketChannel에 클라이언트의 연결 요청이 들어올때까지 selector.readyOps() 메소드는 블럭킹되며, 클라이언트의 요청이 들어올 경우 그와 관련된 키 값인 SelectionKey.OP_ACCEPT를 리턴하게 된다.
Selector의 readyOps() 메소드는 등록된 채널들로부터 받은 모든 이벤트 목록을 리턴한다. 예를 들어, 다음과 같이 하나의 Selector를 여러 채널에 등록했다고 가정해보자.
Selector selector = Selector.open();
ServerSocketChannel ssc = ...;
ssc.register(selector, SelectionKey.OP_ACCEPT, null);
...
SocketChannel sc = ssc.accept();
sc.register(selector, SelectionKey.OP_READ, null);
...
ServerSocketChannel ssc = ...;
ssc.register(selector, SelectionKey.OP_ACCEPT, null);
...
SocketChannel sc = ssc.accept();
sc.register(selector, SelectionKey.OP_READ, null);
...
위와 같이 하나의 Selector를 여러 채널에 등록한 경우 Selector의 readyOps() 메소드는 관련된 채널들 중의 하나라도 이벤트가 발생한 경우 그와 관련된 값을 리턴한다. 만약 동시에 클라이언트로부터 연결 요청이 들어오고 소켓으로부터 읽을 수 있는 데이터가 들어왔다면 selector.readyOps() 메소드는 다음과 같은 값을 리턴할 것이다.
// 연결 요청과 채널에서 읽어올 수 있는 데이터를 사용가능한 경우 아래 numKeys는
// SelectionKey.OP_READ | SelectionKey.OP_ACCEPT의 값을 갖는다.
int numKeys = selector.readyOps();
// SelectionKey.OP_READ | SelectionKey.OP_ACCEPT의 값을 갖는다.
int numKeys = selector.readyOps();
Selector.readyOps() 메소드가 리턴한 값이 0보다 큰 경우 Selector의 selectedKeys() 메소드를 사용하여 관련된 이벤트 목록을 읽어올 수 있다. selectedKeys() 메소드는 Selector가 등록된 채널과 관련된 SelectionKey의 집합인 Set을 리턴한다. SelectionKey는 관련 채널을 리턴하는 channel() 메소드를 제공하고 있다. 따라서, Selector를 이용하는 경우 전체적인 프로그램 코드는 다음과 같은 구조를 갖게 된다.
// 1. Selector 생성 Selector selector = Selector.open();
// 2. Selector를 등록할 수 있는 채널 생성
ServerSocketChannel channel = ServerSocketChannel.open();
...
// 3. 채널에 Selector 등록
channel.register(selector, SelectionKey.OP_ACCEPT, null);
...
while(true) {
// 4. selector를 이용하여 채널의 이벤트 대기
int readyKey = selector.readyOps();
// 5. readyKey가 0 이상이면 이벤트가 발생한 것으로 처리
if (readyKey > 0) {
// 6. selector로부터 채널에서 발생한 이벤트와 관련된 SelectionKey Set 구함
Set selectionKeySet = selector.selectedKeys();
// 7. Set에서 각 SelectionKey를 차례대로 읽어와
Iterator iter = selectionKeySet.iterator();
while(iter.hasNext()) {
SelectionKey selectionKey = (SelectionKey)iter.next();
// 8. SelectionKey로부터 채널을 구함
ServerSocketChannel relatedChannel =
(ServerSocketChannel)selectionKey.channel();
// 9. 채널을 사용하여 알맞은 작업 수행
...
}
}
}
논블럭킹 IO를 이용한 웹 서버 구축
마지막으로 논블럭킹 IO를 이용하여 간단한 웹 서버를 구현해보도록 하겠다. 여기서 구현할 웹 서버는 다음과 같이 4개의 클래스로 구성되어 있다.
- ClientAcceptor - 클라이언트의 연결을 대기하는 쓰레드. 클라이언트로부터 연결 요청이 들어올 경우 관련 SocketChannel을 SocketChannelQueue에 저장한다.
- ClientProcessor - 클라이언트의 요청을 처리하는 쓰레드. SocketChannelQueue로부터 처리할 SocketChannel을 읽어오며 각 SocketChannel로부터 데이터를 읽어와 알맞은 작업을 수행한다.
- SocketChannelQueue - ChannelSocket을 연결된 순서대로 임시적으로 저장하는 큐
- NIOWebServer - 구동 프로그램.
package madvirus.nioexam;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
/**
* 클라이언트의 연결을 대기한다.
* @author 최범균
*/
public class ClientAcceptor extends Thread {
private int port;
private SocketChannelQueue channelQueue;
private ServerSocketChannel ssc;
private Selector acceptSelector;
public ClientAcceptor(SocketChannelQueue channelQueue, int port)
throws IOException {
this.port = port;
this.channelQueue = channelQueue;
acceptSelector = Selector.open();
ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
// 지정한 포트에 서버소켓 바인딩
InetSocketAddress address = new InetSocketAddress(port);
ssc.socket().bind(address);
ssc.register(acceptSelector, SelectionKey.OP_ACCEPT);
}
public void run() {
try {
while(true) {
int numKeys = acceptSelector.select();
if (numKeys > 0) {
Iterator iter = acceptSelector.selectedKeys().iterator();
while(iter.hasNext()) {
SelectionKey key = (SelectionKey)iter.next();
iter.remove();
ServerSocketChannel readyChannel =
(ServerSocketChannel)key.channel();
SocketChannel incomingChannel = readyChannel.accept();
System.out.println("ClientAcceptor - 클라이언트 연결됨!");
channelQueue.addLast(incomingChannel);
}
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try { ssc.close(); } catch (IOException e1) { }
}
}
}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
/**
* 클라이언트의 연결을 대기한다.
* @author 최범균
*/
public class ClientAcceptor extends Thread {
private int port;
private SocketChannelQueue channelQueue;
private ServerSocketChannel ssc;
private Selector acceptSelector;
public ClientAcceptor(SocketChannelQueue channelQueue, int port)
throws IOException {
this.port = port;
this.channelQueue = channelQueue;
acceptSelector = Selector.open();
ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
// 지정한 포트에 서버소켓 바인딩
InetSocketAddress address = new InetSocketAddress(port);
ssc.socket().bind(address);
ssc.register(acceptSelector, SelectionKey.OP_ACCEPT);
}
public void run() {
try {
while(true) {
int numKeys = acceptSelector.select();
if (numKeys > 0) {
Iterator iter = acceptSelector.selectedKeys().iterator();
while(iter.hasNext()) {
SelectionKey key = (SelectionKey)iter.next();
iter.remove();
ServerSocketChannel readyChannel =
(ServerSocketChannel)key.channel();
SocketChannel incomingChannel = readyChannel.accept();
System.out.println("ClientAcceptor - 클라이언트 연결됨!");
channelQueue.addLast(incomingChannel);
}
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try { ssc.close(); } catch (IOException e1) { }
}
}
}
ClientAcceptor는 쓰레드로서 생성자에서 클라이언트와 연결된 SocketChannel을 저장할 ChannelQueue와 클라이언트의 연결을 대기할 포트 번호를 전달받는다. 생성자에서는 클라이언트의 연결을 대기할 ServerSocketChannel을 생성하고 지정한 지정한 포트에 바인딩한다. 또한 생성자에서는 ServerSocketChannel을 논블럭킹 모드로 전환하고 ServerSocketChannel에 Selector를 등록한다.
run() 메소드에서는 Selector.readyOps()를 사용하여 클라이언트의 연결이 들어올 경우 ServerSocketChannel의 accept() 메소드를 사용해서 클라이언트와 연결된 SocketChannel을 구한 후 큐에 저장한다.
ClientAcceptor가 클라이언트의 연결 요청을 받아서 관련된 SocketChannel을 큐에 저장하면, ClientProcessor는 그 큐로부터 SocketChannel을 읽어와 클라이언트의 요청을 처리한다. ClientProcessor 클래스의 소스 코드는 다음과 같다.
package madvirus.nioexam;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.util.Iterator;
import java.util.StringTokenizer;
/**
* 클라이언트의 요청을 처리한다.
* @author 최범균
*/
public class ClientProcessor extends Thread {
private SocketChannelQueue channelQueue;
private File rootDirectory;
private Selector readSelector;
private ByteBuffer readBuffer;
private Charset iso8859;
private CharsetDecoder iso8859decoder;
private Charset euckr;
private CharsetEncoder euckrEncoder;
private ByteBuffer headerBuffer;
public ClientProcessor(SocketChannelQueue channelQueue, File rootDirectory)
throws IOException {
this.channelQueue = channelQueue;
this.rootDirectory = rootDirectory;
readSelector = Selector.open();
this.channelQueue.setReadSelector(readSelector);
readBuffer = ByteBuffer.allocate(1024);
// 캐릭터셋 관련 객체 초기화
iso8859 = Charset.forName("iso-8859-1");
iso8859decoder = iso8859.newDecoder();
euckr = Charset.forName("euc-kr");
euckrEncoder = euckr.newEncoder();
initializeHeaderBuffer();
}
private void initializeHeaderBuffer()
throws CharacterCodingException {
CharBuffer chars = CharBuffer.allocate(88);
chars.put("HTTP/1.1 200 OK\n");
chars.put("Connection: close\n");
chars.put("Server: 자바 NIO 예제 서버\n");
chars.put("Content-Type: text/html\n");
chars.put("\n");
chars.flip();
headerBuffer = euckrEncoder.encode(chars);
}
public void run() {
while(true) {
try {
processSocketChannelQueue();
int numKeys = readSelector.select();
if (numKeys > 0) {
processRequest();
}
} catch (IOException e) {
//
}
}
}
private void processSocketChannelQueue() throws IOException {
SocketChannel socketChannel = null;
while ( (socketChannel = channelQueue.getFirst()) != null) {
socketChannel.configureBlocking(false);
socketChannel.register( readSelector, SelectionKey.OP_READ, new StringBuffer());
}
}
private void processRequest() {
Iterator iter = readSelector.selectedKeys().iterator();
while( iter.hasNext() ) {
SelectionKey key = (SelectionKey)iter.next();
iter.remove();
SocketChannel socketChannel = (SocketChannel)key.channel();
try {
socketChannel.read(readBuffer);
readBuffer.flip();
String result = iso8859decoder.decode(readBuffer).toString();
StringBuffer requestString = (StringBuffer)key.attachment();
requestString.append(result);
readBuffer.clear();
if(result.endsWith("\n\n") || result.endsWith("\r\n\r\n")) {
completeRequest(requestString.toString(), socketChannel);
}
} catch (IOException e) {
// 에러 발생
}
}
}
private void completeRequest(String requestData, SocketChannel socketChannel)
throws IOException {
StringTokenizer st = new StringTokenizer(requestData);
st.nextToken();
String requestURI = st.nextToken();
System.out.println(requestURI);
try {
File file = new File(rootDirectory, requestURI);
FileInputStream fis = new FileInputStream(file);
FileChannel fc = fis.getChannel();
int fileSize = (int)fc.size();
ByteBuffer fileBuffer = fc.map(FileChannel.MapMode.READ_ONLY, 0, fileSize);
headerBuffer.rewind();
socketChannel.write(headerBuffer);
socketChannel.write(fileBuffer);
} catch(FileNotFoundException fnfe) {
sendError("404", socketChannel);
} catch (IOException e) {
sendError("500", socketChannel);
}
socketChannel.close();
}
private void sendError(String errorCode, SocketChannel channel)
throws IOException {
System.out.println("ClientProcessor - 클라이언트에 에러 코드 전송:"+errorCode);
CharBuffer chars = CharBuffer.allocate(64);
chars.put("HTTP/1.0 ").put(errorCode).put(" OK\n");
chars.put("Connection: close\n");
chars.put("Server: 자바 NIO 예제 서버\n");
chars.put("\n");
chars.flip();
ByteBuffer buffer = euckrEncoder.encode(chars);
channel.write(buffer);
}
}
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.util.Iterator;
import java.util.StringTokenizer;
/**
* 클라이언트의 요청을 처리한다.
* @author 최범균
*/
public class ClientProcessor extends Thread {
private SocketChannelQueue channelQueue;
private File rootDirectory;
private Selector readSelector;
private ByteBuffer readBuffer;
private Charset iso8859;
private CharsetDecoder iso8859decoder;
private Charset euckr;
private CharsetEncoder euckrEncoder;
private ByteBuffer headerBuffer;
public ClientProcessor(SocketChannelQueue channelQueue, File rootDirectory)
throws IOException {
this.channelQueue = channelQueue;
this.rootDirectory = rootDirectory;
readSelector = Selector.open();
this.channelQueue.setReadSelector(readSelector);
readBuffer = ByteBuffer.allocate(1024);
// 캐릭터셋 관련 객체 초기화
iso8859 = Charset.forName("iso-8859-1");
iso8859decoder = iso8859.newDecoder();
euckr = Charset.forName("euc-kr");
euckrEncoder = euckr.newEncoder();
initializeHeaderBuffer();
}
private void initializeHeaderBuffer()
throws CharacterCodingException {
CharBuffer chars = CharBuffer.allocate(88);
chars.put("HTTP/1.1 200 OK\n");
chars.put("Connection: close\n");
chars.put("Server: 자바 NIO 예제 서버\n");
chars.put("Content-Type: text/html\n");
chars.put("\n");
chars.flip();
headerBuffer = euckrEncoder.encode(chars);
}
public void run() {
while(true) {
try {
processSocketChannelQueue();
int numKeys = readSelector.select();
if (numKeys > 0) {
processRequest();
}
} catch (IOException e) {
//
}
}
}
private void processSocketChannelQueue() throws IOException {
SocketChannel socketChannel = null;
while ( (socketChannel = channelQueue.getFirst()) != null) {
socketChannel.configureBlocking(false);
socketChannel.register( readSelector, SelectionKey.OP_READ, new StringBuffer());
}
}
private void processRequest() {
Iterator iter = readSelector.selectedKeys().iterator();
while( iter.hasNext() ) {
SelectionKey key = (SelectionKey)iter.next();
iter.remove();
SocketChannel socketChannel = (SocketChannel)key.channel();
try {
socketChannel.read(readBuffer);
readBuffer.flip();
String result = iso8859decoder.decode(readBuffer).toString();
StringBuffer requestString = (StringBuffer)key.attachment();
requestString.append(result);
readBuffer.clear();
if(result.endsWith("\n\n") || result.endsWith("\r\n\r\n")) {
completeRequest(requestString.toString(), socketChannel);
}
} catch (IOException e) {
// 에러 발생
}
}
}
private void completeRequest(String requestData, SocketChannel socketChannel)
throws IOException {
StringTokenizer st = new StringTokenizer(requestData);
st.nextToken();
String requestURI = st.nextToken();
System.out.println(requestURI);
try {
File file = new File(rootDirectory, requestURI);
FileInputStream fis = new FileInputStream(file);
FileChannel fc = fis.getChannel();
int fileSize = (int)fc.size();
ByteBuffer fileBuffer = fc.map(FileChannel.MapMode.READ_ONLY, 0, fileSize);
headerBuffer.rewind();
socketChannel.write(headerBuffer);
socketChannel.write(fileBuffer);
} catch(FileNotFoundException fnfe) {
sendError("404", socketChannel);
} catch (IOException e) {
sendError("500", socketChannel);
}
socketChannel.close();
}
private void sendError(String errorCode, SocketChannel channel)
throws IOException {
System.out.println("ClientProcessor - 클라이언트에 에러 코드 전송:"+errorCode);
CharBuffer chars = CharBuffer.allocate(64);
chars.put("HTTP/1.0 ").put(errorCode).put(" OK\n");
chars.put("Connection: close\n");
chars.put("Server: 자바 NIO 예제 서버\n");
chars.put("\n");
chars.flip();
ByteBuffer buffer = euckrEncoder.encode(chars);
channel.write(buffer);
}
}
ClientProcessor의 소스 코드는 지금까지 설명했던 것들을 코드로 표현한 것에 불과하므로 자세한 설명은 덧붙이지 않겠다. 참고로 각 메소드를 간단하게 설명하자면 다음과 같다.
- ClientProcessor() : 생성자. 데이터를 읽고 쓸때 사용할 인코더/디코더를 생성하고 Selector를 초기화.
- initializeHeaderBuffer() : 클라이언트에 응답을 보낼 때 사용될 헤더 정보를 초기화.
- processSocketChannelQueue() : ClientAcceptor가 클라이언트의 연결이 들어올 때 생성된 큐에 저장한 SocketChannel을 읽어와 논블럭킹 처리 및 Selector를 SocketChannel에 등록.
- run() : Selector를 사용하여 SocketChannel의 이벤트 대기.
- processRequest() : Selector로부터 사용가능한 SelectionKey 목록을 읽어와 알맞은 작업을 수행.
- completeRequest() : 클라이언트가 요청한 문서 데이터를 전송.
- sendError() : 에러가 발생한 경우 헤러 정보를 클라이언트에 전송.
public void run() {
while(true) {
try {
processSocketChannelQueue();
int numKeys = readSelector.select();
if (numKeys > 0) {
processRequest();
}
} catch (IOException e) {
//
}
}
}
private void processSocketChannelQueue() throws IOException {
SocketChannel socketChannel = null;
while ( (socketChannel = channelQueue.getFirst()) != null) {
socketChannel.configureBlocking(false);
socketChannel.register(
readSelector,
SelectionKey.OP_READ, new StringBuffer());
}
}
while(true) {
try {
processSocketChannelQueue();
int numKeys = readSelector.select();
if (numKeys > 0) {
processRequest();
}
} catch (IOException e) {
//
}
}
}
private void processSocketChannelQueue() throws IOException {
SocketChannel socketChannel = null;
while ( (socketChannel = channelQueue.getFirst()) != null) {
socketChannel.configureBlocking(false);
socketChannel.register(
readSelector,
SelectionKey.OP_READ, new StringBuffer());
}
}
위 코드에서 문제가 발생할 수 있는데 그것은 바로 readSelector.select() 메소드는 클라이언트로부터 데이터가 들어올 때에 비로서 값을 리턴한다는 점이다. 좀더 구체적으로 설명하기 위해 다음과 같은 실행 순서를 생각해보자.
ClientAcceptor의 run() ClientProcessor의 run()
-----------------------------------------------------
1 processSocketChannelQueue();
2 >>> acceptSelector.select() readSelector.select()
3 ... [블럭킹상태]
4 channelQueue.addLast(..) [블럭킹상태] --> 2에서 생성된 채널소켓은
5 [블럭킹상태] readSelector 등록불가
6 ... [블럭킹상태]
7 >>> acceptSelector.select() [블럭킹상태]
8 channelQueue.addLast(..) [블럭킹상태] --> 7에서 생성된 채널소켓
9 ... [블럭킹상태] readSelector 등록불가
-----------------------------------------------------
1 processSocketChannelQueue();
2 >>> acceptSelector.select() readSelector.select()
3 ... [블럭킹상태]
4 channelQueue.addLast(..) [블럭킹상태] --> 2에서 생성된 채널소켓은
5 [블럭킹상태] readSelector 등록불가
6 ... [블럭킹상태]
7 >>> acceptSelector.select() [블럭킹상태]
8 channelQueue.addLast(..) [블럭킹상태] --> 7에서 생성된 채널소켓
9 ... [블럭킹상태] readSelector 등록불가
위에서 '>>>' 표시는 클라이언트로부터 연결 요청이 들어왔음을 의미하는데, 4의 과정에서 새로들어온 SocketChannel을 channelQueue에 등록한다. channelQueue에 등록된 SocketChannel은 ClientProcessor.processSocketChannelQueue() 메소드를 통해서 논블럭킹 모드로 지정되고 readSelector를 등록하게 된다. 여기서 실행 순서상 과정 4나 과정 8에서 channelQueue에 새로운 SocketChannel을 등록하더라도 이들 SocketChannel은 readSelector를 등록되지 않은 상태이기 때문에 이들 채널에 데이터가 들어오더라도 readSelector.select() 메소드는 계속해서 블럭킹된다.
이처럼 계속해서 논리적으로 계속해서 블럭킹될 수 밖에 없는데 왜 필자가 이렇게 구현했을까? 그것은 바로 Selector가 제공하는 wakeup()이라는 메소드를 사용하면 위와 같은 문제를 해결할 수 있기 때문이다. 실제로 ChannelSocketQueue의 addLast() 메소드는 다음과 같이 구현되어 있다.
public void addLast(SocketChannel channel) {
list.addLast(channel);
readSelector.wakeup();
}
list.addLast(channel);
readSelector.wakeup();
}
위 코드에서 readSelector는 ClientProcessor의 readSelector와 동일한 객체를 가리키는데, Selector의 wakeup() 메소드를 호출하게 되면 블럭킹되어 있던 select() 메소드는 곧바로 리턴된다. 즉, 앞에서 살펴봤던 블럭킹 문제가 발생하지 않게 되는 것이다.
마지막으로 NIOWebServer 클래스는 앞에서 ClientAcceptor 쓰레드와 ClientProcessor 쓰레드를 구동한다. 소스 코드는 복잡하지 않으므로 첨부한 소스 코드를 참고하기 바란다. 실행은 다음과 같이 하면 된다. (소스 코드에 컴파일한 .class 파일도 포함되어 있으니 그대로 사용할 수 있을 것이다.)
c:\>java madvirus.nioexam.NIOWebServer Y:\docuemtRoot 80
위와 같이 실행한 후 웹브라우저를 띄워서 http://localhost/index.html 과 같은 알맞은 URL을 입력해서 실제로 웹 서버가 제대로 동작하는 지 확인해보기 바란다.
결론
본 글에서는 NIO API의 논블럭킹 IO에 대해서 살펴보았다. 앞에서 작성한 웹 서버의 경우 단 두 개의 쓰레드로(ClientAcceptor와 ClientProcessor) 모든 클라이언트의 요청을 처리하고 있다. 물론 상황에 따라서 ClientProcessor 쓰레드의 개수를 증가시킬 수 있겠지만 기본적으로 다수의 클라이언트를 하나의 쓰레드에서 성능저하현상없이 처리할 수 있게 해주는 원천이 바로 논블록킹 IO의 막강한 장점이라 할 수 있다.
관련링크: