주요글: 도커 시작하기
반응형
CommonJ의 Work Manager API에 대해 살펴보고, 쓰레드풀 기반의 WorkManager를 구현해본다.

Work Manager API 소개

CommonJ의 Work Manager API는 BEA와 IBM이 함께 정의한 API로서 서블릿이나 EJB와 같은 환경에서 Concurrent 프로그래밍 API를 제공하기 위한 목적으로 만들어졌다. Work Manager API는 콘테이너의 관리 하에 어플리케이션이 다수의 작업을 동시에 수행할 수 있도록 해 주는 고수준의 모델을 제공한다. 즉, 서블릿 콘테이너나 EJB 컨테이너에 의해 관리되는 환경에서 Thread를 직접 사용하지 않고 쓰레드 프로그래밍을 할 수 있도록 도와준다.

Thread를 직접 사용하지 않고 쓰레드 프로그래밍을 한다는 점에서 Work Manager API는 자바 5에 새롭게 추가된 Executor 인터페이스와 Callable/Future와 비슷하다. 하지만, 자바 5의 Executor 및 Callable/Future가 작업을 실행하는 것에 초점이 맞춰진 모델이라면, Work Manager API는 작업 실행 뿐만 아니라 작업 수용, 완료, 거절 등 작업을 관리하는 기능이 추가된 모델이다. 따라서, 좀 더 정교한 작업 관리 모델을 원한다면 Work Manager API를 사용하는 것을 고려해 볼 수 있다.

Work Manager API의 주요 구성 요소

Work Manager API의 주요 인터페이스는 다음 그림과 같다.


WorkManager 인터페이스는 작업을 관리하는 데 사용되는 메서드를 제공하고 있다. WorkManager.schedule() 메서드는 schedule() 메서드는 수행될 작업 정보를 담고 있는 Work 객체를 전달받으며, 작업의 상태를 확인할 수 있는 WorkItem 객체를 리턴한다. WorkManager 인터페이스를 구현한 클래스는 알맞은 전략에 따라서 schedule() 메서드를 통해 전달받은 Work 객체를 실행하면 된다. 예를 들어, WorkManager는 schedule() 메서드를 통해서 전달받은 Work 객체를 큐에 넣은 뒤 순차적으로 Work 객체를 실행할 수 있을 것이다.

WorkManager의 waitForAll() 메서드는 첫 번째 파라미터로 전달받은 콜렉션에 포함된 WorkItem 목록과 관련된 모든 작업이 끝날 때 까지 대기한다. 만약 지정한 대기 시간안에 콜렉션으로 전달한 작업이 끝나면 waitForAll() 메서드는 true를 리턴하고, 그렇지 않을 경우 false를 리턴한다. waitForAny() 메서드의 경우는 콜렉션과 관련된 작업 중 한 개 이상의 작업이 종료될 때 까지 대기한다. 대기 시간 안에 한 개 이상의 작업이 종료되면 종료된 작업과 관련된 WorkItem을 담은 콜렉션을 리턴하며, 종료된 작업이 없을 경우 크기가 0인 콜렉션을 리턴한다.

waitForAll() 메서드와 waitForAny() 메서드의 두 번째 파라미터는 1/1000초 단위의 대기 시간을 의미하며, 아래의 두 값은 WorkManager에 정의된 상수값으로서 특별한 의미를 갖는다.

  • WorkManager.IMMEDIATE: 완료된 작업이 존재하는 지 검사를 수행하고 곧 바로 결과를 리턴한다.
  • WorkManager.INDEFINITE: 무한정 대기한다. waitForAll()의 경우는 모든 작업이 종료될 때 까지 대기하며, waitForAny()의 경우는 한 개 이상의 작업이 종료될 때 까지 대기한다.
Work 인터페이스는 작업을 표현한다. Work는 Runnable 인터페이스를 상속받고 있으며 작업을 실행하는 코드는 run() 메서드에 위치하게 된다. isDaemon() 메서드는 이 작업이 데몬인지의 여부를 리턴한다. isDaemon() 메서드가 true를 리턴할 경우 해당 작업은 데몬으로 실행되어야 한다는 것을 의미하며, 따라서 WorkManager는 알맞게 해당 작업을 실행해 주어야 한다. release() 메서드는 Work의 작업을 종료할 때 호출되는 메서드로서 작업을 수행하는 동안 사용한 자원 등을 이 메서드에서 반납하면 된다.

WorkItem 인터페이스는 WorkManager.schedule() 메서드가 리턴하는 객체로서, WorkManager에 의해 관리되는 Work의 상태를 확인할 때 사용된다. getResult() 메서드는 연관된 Work 객체를 리턴한다. getStatus() 메서드는 Work의 작업 진행 상태 값을 리턴하며, 진행 상태와 관련된 상수값은 WorkEvent 클래스에 정의되어 있다. 아래 그림은 WorkEvent 클래스에 정의된 상수를 보여주고 있다.


Work Manager API를 사용하는 전형적인 코드 형태는 다음과 같다.

WorkManager workManager = ...; // 사용할 WorkManager를 구함

Work myWork1 = new SomeWork();
Work myWork2 = new SomeWork();

// WorkManager에 작업 실행 요청
WorkItem workItem1 = workManager.schedule(myWork1);
WorkItem workItem2 = workManager.schedule(myWork2);

List<WorkItem> workItems = new ArrayList();
workItems.add(workItem1);
workItems.add(workItem2);

// 작업이 종료될 때 까지 대기
boolean allCompleted = workManager.waitForAll(workItems, WorkManager.INDEFINITE);

if (allCompleted) {
    for (WorkItem workItem : workItems) {
        SomeWork work = (SomeWork)workItem.getResult();
        // 알맞은 후속 처리
    }
} else {
    ...
}

웹스피어나 웹로직과 같은 WAS 서버는 WorkManger 객체를 JNDI로부터 구할 수 있도록 하고 있기 때문에, 이 WAS 서버를 사용할 경우 개발자는 Work 구현 클래스만 알맞게 작성해 주면 WorkManager를 통해서 작업을 실행할 수 있게 된다. 아래 코드는 JNDI로부터 WorkManager를 구하는 코드의 예를 보여주고 있다.

InitialContext ctx = new InitialContext();
String jndiName = "java:comp/env/wm/default";
workManager = (WorkManager)ctx.lookup(jndiName);

WorkListener

앞서 WorkEvent 클래스 다이어그램을 보면 작업은 'ACCEPTED', 'REJECTED', 'STARTED', 'COMPLETED'의 네 가지 상태를 가질 수 있음을 알 수 있다. WorkManager는 작업의 상태가 변경될 때, 관련 이벤트를 이벤트 리스너에 전달하는 방법을 제공하기 위해 다음과 같은 schedule() 메서드를 제공하고 있다.

WorkItem schedule(Work work, WorkListener wl)

위 schedule() 메서드는 첫 번째 파라미터로 전달받은 Work 객체의 상태가 변경될 때 마다 WorkListener에 통지해 주는 기능을 제공한다. 예를 들어, 전달받은 작업의 상태가 ACCEPTED에서 STARTED로 변경되거나, STARTED에서 COMPLETED로 변경될 경우 WorkListener의 알맞은 메서드를 호출해줌으로써 WorkListener가 상태 변화를 통지 받을 수 있도록 하고 있다. 아래 그림은 WorkListener 인터페이스가 어떻게 정의되어 있는 지를 보여주고 있다.


Work Manager API 구현 클래스 직접 만들기

WAS가 제공하는 WorkManager 객체를 이용해서 Work를 실행할 수 있지만, Work Manager API를 직접 구현할 수도 있다. 본 글에서는 자바 5의 Concurrent API를 이용해서 작업을 동시에 실행해주는 WorkManager 구현 클래스를 작성해 보겠다.

Work Manager API를 직접 구현한다는 것은 아래의 인터페이스를 구현하다는 것과 같다.

  • WorkManager 인터페이스
  • WorkItem 인터페이스
  • WorkEvent 인터페이스
WorkManager 인터페이스 구현

본 글에서는 Java Concurrent API가 제공하는 쓰레드 풀 API를 이용해서 WorkManager 인터페이스를 구현할 것이다. 아래 코드는 실제 구현 코드이다.

package net.madvirus.javacan.commonj.wm;

...

/**
 * 쓰레드 풀을 사용해서 작업을 실행한다.
 * 
 * waitForAll() 메서드와 waitForAny() 메서드는 Jonas Boner가 작성한 코드를 참고하여 작성하였다.
 * 
 * @author madvirus
 */
public class ThreadPoolWorkManager implements WorkManager {
    public static final int DEFAULT_POOL_SIZE = 10;

    private ExecutorService executorService;

    public ThreadPoolWorkManager() {
        this(DEFAULT_POOL_SIZE);
    }

    public ThreadPoolWorkManager(int poolSize) {
        executorService = Executors.newFixedThreadPool(poolSize);
    }

    public WorkItem schedule(Work work) throws IllegalArgumentException {
        return schedule(work, null);
    }

    public WorkItem schedule(final Work work, final WorkListener listener)
            throws IllegalArgumentException {
        if (work.isDaemon()) {
            throw new IllegalArgumentException("damon work not supported");
        }

        final DefaultWorkItem workItem = new DefaultWorkItem(work, listener);
        executorService.execute(new Runnable() {
            public void run() {
                try {
                    workItem.setStatus(WorkEvent.WORK_STARTED, null);
                    work.run();
                    workItem.setStatus(WorkEvent.WORK_COMPLETED, null);
                } catch (Throwable e) {
                    workItem.setStatus(WorkEvent.WORK_REJECTED,
                            new WorkException(e));
                }
            }
        });
        return workItem;
    }
    ...

schedule() 메서드는 파라미터로 전달받은 work 객체로부터 DefaultWorkItem 객체를 생성한다. 그런 뒤, ExecutorService를 이용해서 작업을 실행한다. ExecutorService에 전달되는 Runnable 타입의 임의 객체는 DefaultWorkItem의 setStatus() 메서드를 사용해서 작업의 상태를 변경하고, Work.run() 메서드를 호출함으로써 작업을 실행하게 된다.

WorkManager의 waitForAll() 및 waitForAny() 메서드는 다음과 같이 구현하였다. 참고로, 이 코드는 Jonas Boner가 작성한 코드를 그대로 사용하였다.

    @SuppressWarnings("unchecked")
    public boolean waitForAll(Collection workItems, long timeout)
            throws InterruptedException, IllegalArgumentException {
        long start = System.currentTimeMillis();
        do {
            synchronized (this) {
                boolean isAllCompleted = true;
                for (Iterator it = workItems.iterator(); it.hasNext()
                        && isAllCompleted;) {
                    int status = ((WorkItem) it.next()).getStatus();
                    isAllCompleted = status == WorkEvent.WORK_COMPLETED
                            || status == WorkEvent.WORK_REJECTED;
                }
                if (isAllCompleted) {
                    return true;
                }
                if (timeout == IMMEDIATE) {
                    return false;
                }
                if (timeout == INDEFINITE) {
                    continue;
                }
            }
        } while ((System.currentTimeMillis() - start) < timeout);
        return false;
    }

    @SuppressWarnings("unchecked")
    public Collection waitForAny(Collection workItems, long timeout)
            throws InterruptedException, IllegalArgumentException {
        long start = System.currentTimeMillis();
        do {
            synchronized (this) {
                Collection<WorkItem> completed = new ArrayList<WorkItem>();
                for (Iterator it = workItems.iterator(); it.hasNext();) {
                    WorkItem workItem = (WorkItem) it.next();
                    if (workItem.getStatus() == WorkEvent.WORK_COMPLETED
                            || workItem.getStatus() == WorkEvent.WORK_REJECTED) {
                        completed.add(workItem);
                    }
                }
                if (!completed.isEmpty()) {
                    return completed;
                }
            }
            if (timeout == IMMEDIATE) {
                return Collections.EMPTY_LIST;
            }
            if (timeout == INDEFINITE) {
                continue;
            }
        } while ((System.currentTimeMillis() - start) < timeout);
        return Collections.EMPTY_LIST;
    }
}

WorkItem과 WorkEvent 구현

ThreadPoolWorkManager 클래스는 내부적으로 DefaultWorkItem 클래스를 사용하였는데, DefaultWorkItem 클래스는 다음과 같이 구현되었다. 참고로, DefaultWorkItem 클래스와 곧 이어 보여줄 DefaultWorkEvent 클래스 역시 ThreadPoolWorkManager의 waitForAll()/waitForAny() 메서드와 마찬가지로 Jonas Boner가 작성한 코드를 그대로 사용하였다.

public class DefaultWorkItem implements WorkItem {

    protected int status;
    protected Work work;
    protected WorkListener workListener;

    public DefaultWorkItem(Work work, WorkListener workListener) {
        this.work = work;
        this.status = WorkEvent.WORK_ACCEPTED;
        this.workListener = workListener;
    }

    public Work getResult() {
        return work;
    }

    public synchronized void setStatus(final int status,
            final WorkException exception) {
        this.status = status;
        if (workListener != null) {
            switch (status) {
            case WorkEvent.WORK_ACCEPTED:
                workListener.workAccepted(new DefaultWorkEvent(
                        WorkEvent.WORK_ACCEPTED, this, exception));
                break;
            case WorkEvent.WORK_REJECTED:
                workListener.workRejected(new DefaultWorkEvent(
                        WorkEvent.WORK_REJECTED, this, exception));
                break;
            case WorkEvent.WORK_STARTED:
                workListener.workStarted(new DefaultWorkEvent(
                        WorkEvent.WORK_STARTED, this, exception));
                break;
            case WorkEvent.WORK_COMPLETED:
                workListener.workCompleted(new DefaultWorkEvent(
                        WorkEvent.WORK_COMPLETED, this, exception));
                break;
            }
        }
    }

    public synchronized int getStatus() {
        return status;
    }

    @SuppressWarnings("unchecked")
    public int compareTo(Object compareTo) {
        Work work = ((WorkItem) compareTo).getResult();
        if (work instanceof Comparable) {
            Comparable comparableWork1 = (Comparable) work;
            if (work instanceof Comparable) {
                Comparable comparableWork2 = (Comparable) work;
                return comparableWork1.compareTo(comparableWork2);
            }
        }
        return 0;
    }

    public String toString() {
        String statusLabel = null;
        switch (this.status) {
        case WorkEvent.WORK_ACCEPTED:
            statusLabel = "WORK_ACCEPTED";
            break;
        case WorkEvent.WORK_COMPLETED:
            statusLabel = "WORK_COMPLETED";
            break;
        case WorkEvent.WORK_REJECTED:
            statusLabel = "WORK_REJECTED";
            break;
        case WorkEvent.WORK_STARTED:
            statusLabel = "WORK_STARTED";
            break;
        default:
            throw new IllegalStateException("illegal (unknown) status "
                    + statusLabel);
        }
        return work.toString() + ":" + statusLabel;
    }

}

DefaultWorkItem 클래스는 setStatus() 메서드를 제공하고 있으며, 이 메서드를 사용해서 작업의 상태를 변경할 수 있도록 하고 있다. setStatus() 메서드는 작업의 상태가 변경되면, 관련 WorkListener에 변경 내역을 통지한다. 이때 전달되는 WorkEvent의 구현 클래스인 DefaultWorkEvent 클래스는 다음과 같다.

public class DefaultWorkEvent implements WorkEvent {

    private int type;
    private WorkItem workItem;
    private WorkException exception;

    public DefaultWorkEvent(int type, WorkItem item, WorkException exception) {
        this.type = type;
        this.workItem = item;
        this.exception = exception;
    }

    public int getType() {
        return type;
    }

    public WorkItem getWorkItem() {
        return workItem;
    }

    public WorkException getException() {
        return exception;
    }
}

ThreadPoolWorkManager를 이용하여 작업 관리하기

이제 남은 작업 앞서 구현한 WorkManager를 사용해서 작업을 실행하는 것이다. Work 인터페이스를 구현해서 작업을 실행하는 클래스를 만들고, 해당 클래스의 객체를 생성한 뒤 WorkManager의 schedule() 메서드에 전달만 하면 된다. 아래 코드는 사용 예이다.

WorkManager workManager = new ThreadPoolWorkManager(5);
WorkItem workItem1 = workManager.schedule(someWork1);
WorkItem workItem2 = workManager.schedule(someWork2);

List<WorkItem> workItems = new ArrayList<WorkItem>();
workItems.add(workItem1);
workItems.add(workItem2);

boolean allCompleted = workManager.waitForAll(workItems, WorkManager.INDEFINITE);
if (allCompleted) {
    for (WorkItem workItem : workItems) {
        SomeWork someWork = ((SomeWork)workItem.getResult());
        ...
    }
}

Work Manager API가 자바 5부터 새롭게 추가된 Concurrent API와 비슷한 기능을 (즉, 여러 작업을 동시에 실행하는 기능을) 제공하고 있지만, Work Manager API는 작업의 라이프 사이클을 추가로 제공하고 있어 작업에 대한 정교한 처리가 가능하다는 장점을 갖고 있다. 또한, 작업의 상태가 변경될 경우 WorkListener를 통해서 변경 관련 이벤트를 전달받을 수 있기 때문에, WorkManager가 작업을 거부하거나 또는 작업이 실패할 경우 작업을 손쉽게 재시도 할 수 있다.

관련링크:

+ Recent posts