주요글: 도커 시작하기
반응형
파이프 스트림을 사용하여 쓰레드간에 데이터를 주고 받고 쓰레드를 제어 하는 것에 대하여 살펴본다.

파이프 스트림의 기본 특징

리눅스나 기타 시스템에서 프로세스 사이에 데이터를 주고 받을 때 사용되는 방법 중의 하나가 파이프를 이용하는 것이다. 파이프를 이용하여 다른 프로세스에 데이터를 전달하거나 전달받을 수도 있고, 또한 파이프를 통해서 한 프로세스가 다른 프로세스를 제어할 수도 있다. 이와 비슷하게 자바도 파이프 스트림을 사용하여 쓰레드 사이에 데이터를 주고 받거나 쓰레드의 흐름을 제어할 수 있다. 이 글에서는 이에 대한 내용을 살펴볼 것이며, 만약 파이프 스트림에 대해서 잘 알지 못한다면 기초 서적을 읽어보기 바란다.

파이프 스트림은 다음 그림과 같이 입력 스트림과 출력 스트림이 서로 연결된 스트림이다.


두 스트림이 서로 연결되어 있다는 것은 PipedOutputStream을 통해서 출력한 데이터를 PipedInputStream을 통해서 읽어온다는 것을 의미한다. 즉, PipedInputStream의 read() 메소드는 PipedOutputStream의 write() 메소드를 사용하여 출력한 데이터를 읽어오게 되며, 만약 PipedOutputStream이 데이터를 출력하지 않는다면 PipedInputStream의 read() 메소드는 읽어올 수 있는 데이터가 없으므로 블럭킹된다. 이 글에서는 이처럼 입력 스트림이 읽어올 데이터가 없을 때 블럭킹되는 특징과 파이프 스트림이 서로 연결되어 있다는 두 특징을 사용할 것이다.

멀티 쓰레드와 파이프 스트림

파이프 스트림은 여러개의 쓰레드가 서로 데이터를 주고 받아야 하는 경우에 유용하게 사용될 수 있다. 예를 들어, 공장의 컨베이어 벨트 시스템을 시뮬레이션하는 프로그램을 개발해야 하는데, 이 콘베이어 벨트 시스템은 '가열'-'제조'-'포장'의 세 라인이 유기적으로 동작하는 시스템이라고 가정해보자. 이때, '가열' 라인을 통해서 처리된 원료는 '제조'라인으로 전달되고, '제조' 라인을 통해서 생성된 제품은 '포장' 라인에 전달되어 포장처리 된다고 하자.

이 경우 '제조' 라인은 '가열' 라인을 통해서 가열된 재료를 전달받기 전까지는 어떤 작업도 할 수 없으며, 마찬가지로 '포장' 라인은 '제조' 라인을 통해서 만들어진 제품이 전달되기 전까지는 어떤 제품도 포장할 수 없다. 그리고 이 세 라인은 유기적으로 연결되어 있지만 서로 독립적으로 동작한다. 단지, 자원을 이전 단계에서 전달받기만 하면 되는 것이다.

이러한 시스템을 개발할 경우 각각의 라인은 별도의 쓰레드를 통해서 구현될 것이다. 다음 그림을 살펴보자.


위 그림과 같이 세 개의 쓰레드가 실행되면서 서로 데이터를 주고 받아야만 한다. 단, 데이터를 주고 받을 때 '가열쓰레드'에서 데이터 처리가 완료되지 않으면 '제조쓰레드'는 블럭킹 상태여야 하며, 비슷하게 '제조쓰레드'에서 데이터를 처리한 후에 '포장쓰레드'가 필요한 작업을 수행해야 한다.

이처럼 쓰레드가 상호간에 데이터를 주고 받으면서 동시에 데이터를 생성되는 순서에 따라서 쓰레드의 실행을 제어해야 하는 경우 파이프 스트림을 사용하면 된다. 즉, '가열쓰레드'와 '제조쓰레드'는 서로 파이프를 사용하여 '가열쓰레드'가 생상한 데이터를 '제조쓰레드'가 받고, 마찬가지로 '제조쓰레드'와 '포장쓰레드'도 또다른 파이프를 사용하여 데이터를 주고 받도록 하는 것이다. 다음 리스트1은 공장 라인의 흐름을 구현한 예제 클래스이다. 비록 리스트1이 간단하긴 하지만 다중 쓰레드 환경에서 파이프 스트림이 어떻게 사용되는 지 보여주는 예제이므로 구현 방법을 익혀두기 바란다.

리스트1 ConveyorBelt.java
001: package conveyor.simulator;
002: 
003: import java.io.*;
004: 
005: /**
006:  * 콘베이어 벨트 시뮬레이션 프로그램
007:  * 가열라인, 제조라인, 포장라인과 관련된 쓰레드를
008:  * 실행시킨다.
009:  */
010: public class ConveyorBelt {
011:     public static void main(String[] args) {
012:         // 가열 라인과 제조 라인 사이에 데이터를 주고 받을 때
013:         // 사용되는 파이프 스트림
014:         PipedOutputStream toProducingLine = null;
015:         PipedInputStream fromHeatingLine = null;
016:         
017:         // 제조 라인과 포장 라인 사이에 데이터를 주고 받을 때
018:         // 사용되는 파이프 스트림
019:         PipedOutputStream toPackingLine = null;
020:         PipedInputStream fromProducingLine = null;
021:         
022:         try {
023:             toProducingLine = new PipedOutputStream();
024:             fromHeatingLine = new PipedInputStream(toProducingLine);
025:             
026:             toPackingLine = new PipedOutputStream();
027:             fromProducingLine = new PipedInputStream(toPackingLine);
028:             
029:             HeatingLine heating = new HeatingLine(toProducingLine);
030:             ProducingLine producing = new ProducingLine(
031:                                       fromHeatingLine, toPackingLine);
032:             PackingLine packing = new PackingLine(fromProducingLine);
033:             
034:             packing.start();
035:             producing.start();
036:             heating.start();
037:             
038:             try {
039:                 packing.join(); // 포장 라인이 종료될 때 까지 기다린다.
040:             } catch(InterruptedException ex) { }
041:         } catch(IOException ex) {
042:             System.out.println("스트림 생성시 이상 발생:"+ex.getMessage());
043:         }
044:     }
045: }
046: // 가열 라인을 구현한 쓰레드
047: class HeatingLine extends Thread {
048:     
049:     private PipedOutputStream toProducingLine;
050:     
051:     HeatingLine(PipedOutputStream toProducingLine) {
052:         this.toProducingLine = toProducingLine;
053:     }
054:     
055:     public void run() {
056:         // 자원으로 사용할 데이터
057:         int[] sourceData = { 0x01, 0x02, 0x03, 0x04, 0x05, 
058:                               0x06, 0x07, 0x08, 0x09, 0x0a};
059:         
060:         try {
061:             for (int i = 0 ; i < sourceData.length ; i++) {
062:                 // 제조 라인으로 원료 데이터 전송
063:                 toProducingLine.write(sourceData[i]);
064:             }
065:             // 원료 없음을 나타내는 값인 '0'을 전송
066:             toProducingLine.write(0x00);
067:         } catch(IOException ex) {
068:             System.out.println("가열 라인에서 이상 발생:"+ex.getMessage());
069:         }
070:     }
071: }
072: // 생산 라인을 구현한 쓰레드
073: class ProducingLine extends Thread {
074:     
075:     private PipedInputStream fromHeatingLine;
076:     private PipedOutputStream toPackingLine;
077:                   
078:     ProducingLine(PipedInputStream fromHeatingLine, 
079:                   PipedOutputStream toPackingLine) {
080:         this.fromHeatingLine = fromHeatingLine;
081:         this.toPackingLine = toPackingLine;
082:     }
083:     
084:     public void run() {
085:         try {
086:             int dataFromHeating;
087:             int producingData;
088:             
089:             while(true) {
090:                 // 가열 라인에서 데이터 받음
091:                 dataFromHeating = fromHeatingLine.read();
092:                 
093:                 // 가열 라인에서 받은 데이터 가공처리
094:                 producingData = dataFromHeating * 2;
095:                 
096:                 // 가공처리된 데이터 포장 라인에 전송
097:                 toPackingLine.write(producingData);
098:                 
099:                 if (dataFromHeating == 0x00) {
100:                     break;
101:                 }
102:             }
103:         } catch(IOException ex) {
104:             System.out.println("제조 라인에서 이상 발생:"+ex.getMessage());
105:         }
106:     }
107: }
108: // 포장 라인을 구현한 쓰레드
109: class PackingLine extends Thread {
110:     
111:     private PipedInputStream fromProducingLine;
112:     
113:     PackingLine(PipedInputStream fromProducingLine) {
114:         this.fromProducingLine = fromProducingLine;
115:     }
116:     
117:     public void run() {
118:         int index = 1;
119:         try {
120:             int dataFromProducing;
121:             int finalData;
122:             
123:             while(true) {
124:                 // 제조 라인에서 데이터 받음
125:                 dataFromProducing = fromProducingLine.read();
126: 
127:                 if (dataFromProducing == 0x00) {
128:                     break;
129:                 }
130:                 
131:                 // 제조 라인에서 받은 데이터 포장처리
132:                 finalData = dataFromProducing + 100;
133:                 
134:                 System.out.println((index++)+"번째 제품 = "+finalData);
135:             }
136:         } catch(IOException ex) {
137:             System.out.println("포장 라인에서 이상 발생:"+ex.getMessage());
138:         }
139:     }
140: }

ConveyBelt의 소스 코드가 간단한 예제치곤 좀 긴 편이므로 차근히 분석해보자. 먼저, 리스트1은 네 개의 클래스로 구성되어 있으며, 각각의 클래스는 다음과 같은 역할을 한다.

  • ConveyorBelt - 프로그램 실행을 위한 클래스. 파이프 스트림을 초기화하고 각각의 라인을 실행시킨다.
  • HeatingLine - 가열 라인을 처리하는 클래스. ProducingLine에 바이트 데이터를 전달해준다.
  • ProducingLine - 제조 라인을 처리하는 클래스. HeatingLine으로부터 데이터를 입력받아 변형한 후 PackingLine에 전달해준다.
  • PackingLine - 포장 라인을 처리하는 클래스. ProducingLine으로부터 데이터를 입력받아 최종 결과물을 출력한다.
HeatingLine, ProducingLine, PackingLine 클래스를 쓰레드이며, ConveyorBelt 클래스가 이 세 쓰레드를 실행한다. ConveyorBelt 클래스는 라인23-27에서 이 세 쓰레드가 데이터를 주고 받을 때 사용하는 파이프를 초기화한 후, 각각의 쓰레드를 생성할 때 파라미터로 전달해준다.(라인29-32)

이 결과 세 쓰레드는 다음 그림과 같이 파이프를 통해서 데이터를 주고 받을 수 있게 된다.


여기서 중요한 점은 HeatingLine에서 toProducingLine을 통해서 데이터를 출력하기 전에는(라인63) ProducingLine이 블럭킹되어 있다는(라인91) 점이다. 마찬가지로 ProducingLine이 toPackingLine을 통해서 데이터를 출력하기 전에는(라인97) PackingLine은 블럭킹되어 있게(라인125) 된다.

이처럼 두 개의 쓰레드가 파이프를 통해서 데이터를 주고 받게 될 경우 쓰레드의 흐름을 제어하는 효과를 내게 된다. 즉, 두 개의 쓰레드가 단방향 통신을 하게 되는 것인데, 파이프 스트림이 다중 쓰레드 환경에서 유용하게 사용될 수 있음을 알 수 있다.

이제 마지막으로 ConveyorBelt 클래스를 실행해보자. ConveyorBelt 클래스를 실행하면 다음과 같은 결과가 출력된다.

    C:\>java conveyor.simulator.ConveyorBelt
    1번째 제품 = 102
    2번째 제품 = 104
    3번째 제품 = 106
    4번째 제품 = 108
    5번째 제품 = 110
    6번째 제품 = 112
    7번째 제품 = 114
    8번째 제품 = 116
    9번째 제품 = 118
    10번째 제품 = 120

위 결과를 통해서 파이프를 통해서 HeatingLine에 있는 데이터가 PackingLine까지 전달되면서 알맞게 처리된 것을 확인할 수 있을 것이다.

결론

이번 글에서는 기본 입출력 스트림중의 하나인 파이프 스트림을 사용하여 쓰레드 간에 데이터를 주고 받고 쓰레드의 흐름을 제어하는 방법에 대해서 살펴보았다. 사실 이러한 형태의 쓰레드간 데이터 교환 및 쓰레드 제어는 어플리케이션을 구현할 때 많이 사용되는 방법중의 하나이며, 파이프 대신에 큐를 사용하기도 한다.

+ Recent posts