MockK는 코틀린을 위한 Mock 프레임워크이다. 자바에서 주로 사용하는 Mockito와 유사해서 약간만 노력하면 쉽게 적응할 수 있다. 이 글에서는 MockK의 간단한 사용법을 소개하며 더 다양한 사용법은 https://mockk.io/ 사이트에서 확인할 수 있다.

의존 설정

MockK를 사용하려면 먼저 다음 의존을 추가한다.

<dependency>
    <groupId>io.mockk</groupId>
    <artifactId>mockk</artifactId>
    <version>1.9.3</version>
    <scope>test</scope>
</dependency>

코틀린 1.2 버전을 사용하면 1.9.3.kotlin12 버전을 사용한다.

모의 객체 생성

io.mockk.mockk 함수를 이용해서 모의 객체를 생성한다. 다음은 생성 예이다.

// 1. mockk<타입>()
private val mockValidator1 = mockk<CreationValidator>()

// 2. 타입 추론
private val mockValidator2 : CreationValidator = mockk()

mockk 함수는 타입 파라미터를 이용해서 생성할 모의 객체의 타입을 전달받는다. 변수나 프로퍼티의 타입이 명시적으로 정의되어 있으면 타입 추론이 가능하므로 생략해도 된다.

Answer 정의

모의 객체를 생성했다면 모의 객체가 어떻게 동작할지 정의할 차례이다. 아주 간단하다. io.mockk.every 함수를 사용하면 된다. 다음은 예이다.

@Test
fun someMockTest() {
    every { mock.someMethod(1) } returns "OK" // "OK" 리턴
    every { mock.someMethod(2) } throws SomeException() // 익셉션 발생
    every { mock.call() } just Runs // Unit 함수 실행
    
    assertEquals("OK", mock.someMethod(1))
    assertThrows<SomeException> { mock.someMethod(2) }
}

임의의 인자와 일치

임의의 인자 값과 일치하도록 설정하려면 any()를 사용한다.

@Test
fun someMockTest() {
    every { mock.anyMethod(any(), 3) } returns "OK"
    
    assertEquals("OK", mock.anyMethod(10, 3))
}

Relaxed mock

MockK는 호출 대상에 대한 스텁 정의를 하지 않으면 오류를 발생한다. 

val mock = mockk<Some>()

mock.someMethod(1) // --> io.mockk.MockKException: no answer found for: Some(#1).someMethod(1)

이를 완화하는 방법은 Relaxed mock을 생성하는 것이다. mockk()의 relaxed 파라미터 값을 true로 전달하면 Relaxed mock을 생성할 수 있다.

val mock = mockk<Some>(relaxed = true)
mock.someMethod(1) // --> 0 리턴

리턴 타입이 Unit인 함수는 relaxUnitFun 파라미터 값을 true로 전달한다.

호출 여부 검증

io.mockk.verify 함수를 사용해서 호출 여부를 검증할 수 있다.

val mock = mockk<Some>(relaxed = true)

mock.someMethod(1)
mock.anyMethod(1, 3)

verify { mock.someMethod(1) }
verify { mock.anyMethod(any(), 3) }

인자 캡처

인자를 캡처하고 싶을 땐 slot()과 capture()를 사용한다. 다음은 사용 예를 보여준다.

val mock = mockk<Some>()

val argSlot = slot<Int>()
every { mock.someMethod(capture(argSlot)) } returns 3

mock.someMethod(5)

val realArg = argSlot.captured
assertEquals(5, realArg)

이 글에서는 기본적인 MockK의 사용법을 소개했다. MockK는 더 다양한 기능을 제공하므로 https://mockk.io 사이트를 구경해보자. 도움이 되는 기능을 찾을 수 있을 것이다.

졸업 전만 해도 굉장한 개발자가 되고 싶었다. 뛰어난 설계 능력과 코딩 속도를 자랑하는 그런 실력자 말이다. 이런 막연한 목표는 오래가지 않아 사라졌다. 3-4년 정도 경력을 쌓는 동안 '적당히 잘하는 개발자'로 원하는 수준이 바뀌었다. 언제인지도 모르게 '굉장한' 개발자가 되기 어렵다는 걸 깨닫고 나름 노력하면 될 수 있는 '적당히 잘하는'으로 목표를 낮춘 것이다. 회사 생활을 하면서 뭔가 대단한 걸 만들 재주가 없다는 것을 알게 되었고 남이 만든 거라도 잘 쓰면 다행이란 생각을 하시 시작했다.

사회 초년기에 또 하나 깨달은 건 '기술'만으로는 일이 되지 않으며 기술은 일이 되게 하는 여러 요소 중 하나라는 사실이었다. 기술력이 없으면 안 되는 경우도 있겠지만 꽤 많은 프로젝트가 기술 난이도가 아닌 다른 이유로 실패하는 것을 경험했다. 기술에 대한 욕심이 줄고 다가올 일을 수행하는데 필요한 역량에 초점을 맞추기 시작한 것도 이 시기이다.

다다르고 싶은 수준이 내려가고 기술 외에 다른 것도 있다는 걸 알게 되면서 접하는 책의 주제도 다양해졌다. '피플 웨어', '테크니컬 리더(BTL)', '프로젝트 생존 전략', '스크럼'과 같이 구현 기술은 아니지만 개발과 연관된 책을 읽기 시작했다. '썩은사과'나 '인간력'과 같은 사람에 대한 책도 읽기 시작했다. 이런 책은 개발에 대한 시야를 넓히는데 도움이 되었다.

적당히 잘하기 위해 생산성을 높여야 했고 이를 위해 테스트 코드처럼 효율을 높이는 수단을 찾아 학습했다. 남들이 좋다고 하는 지식도 일부 학습했다. 당장 이해할 수 없는 주제가 많았지만 여러 번 책을 읽고 실제로 적용해 보면서 체득하려고 노력했다. 이런 지식은 개발하는 사고의 틀을 제공해 주었고 생산성을 높여주는 밑거름이 되었다.

많은 뛰어난 개발자가 좋다고 알려준 것도 다 못하고 있고, 배틀을 해서 이길 만큼 개발 지식이 넓지도 깊지도 않으며, 개발 리더로서의 자질도 부족해 팀장 역할이 힘겨울 때가 많다. 애초에 높은 경지가 목표가 아니었기에 당연한 모습이다. 그래도 위안을 삼자면 적당히 잘하는 수준은 되었다는 것이다. 최고의 결과를 만들어내는 고수는 아니나 그래도 중간 이상의 결과는 만들 수 있는 개발자는 되었다.

꽤 긴 경력에 이 정도 밖에 도달하지 못했지만 그래도 이게 어딘가! 20대 초반에 상상한 그런 초고수는 아니지만 지금의 모습에 아쉬움은 없다. 부족한 게 많지만 조금 더 갈고닦아 지금보다 조금이라도 나아질 수 있다면 그걸로 족하다.

  1. 빡빡이발레리나 2019.06.25 11:32

    저도 개발을 오래 하면서 생각이 많이 바뀌었습니다.
    저 역시 모든것을 다 알아야 하고 막힘없이 해결하고 다른 사람들에게 기술적으로 인정을 벋는 그런 사람이 되려 했지만 그런점들이 프로젝트를 하면서 보여지든 보여지지 않든 프로젝트의 결과를 향해 가는 항해에 방해가 되는 점이라는 생각이 어느날 들더군요.

    글을 잘 안남기지만 생각이 비슷하고 글을 읽고 머리가 환기되기에 적어 봅니다.

    좋은글 감사합니다.


Centos 7 버전에 쿠버네티스(kubernetes)를 설치하는 과정을 정리한다. 보다 자세한 내용은 다음 문서를 참고한다.

 

0. Centos 7 준비

쿠버네티스 테스트 용도로 세 개의 가상 머신을 준비했다. 각 가상 머신에 Centos 7을 설치했고 IP와 호스트 이름을 다음과 같이 설정했다.

  • 172.16.1.100 k8s-master
  • 172.16.1.101 k8s-node1
  • 172.16.1.102 k8s-node2
각 서버의 /etc/hosts 파일에도 위 내용을 추가했다.

[Centos 7 호스트 이름 변경 명령어]

hostnamectl을 사용하면 Centos에서 호스트 이름을 변경할 수 있다.

# hostnamectl set-hostname 호스트이름


1. 도커 설치

전체 서버에 도커를 설치한다. https://docs.docker.com/install/linux/docker-ce/centos/ 문서를 참고해서 설치했다.


# yum install -y yum-utils device-mapper-persistent-data lvm2


# yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo


# yum install docker-ce


# systemctl start docker && systemctl enable docker


2. kubeadm 설치 준비

kubeadm을 설치하려면 몇 가지 준비를 해야 한다. 전체 서버에서 다음을 진행한다.


SELinux 설정을 permissive 모드로 변경


# setenforce 0


# sed -i 's/^SELINUX=enforcing$/SELINUX=permissive/' /etc/selinux/config


iptable 설정


# cat <<EOF >  /etc/sysctl.d/k8s.conf

net.bridge.bridge-nf-call-ip6tables = 1

net.bridge.bridge-nf-call-iptables = 1

EOF

$ sysctl --system


firewalld 비활성화


# systemctl stop firewalld

# systemctl disable firewalld


스왑 오프


스왑 끄기:

# swapoff -a


/etc/fstab 파일에 아래 코드 주석 처리:

#/dev/mapper/centos-swap swap                    swap    defaults        0 0


서버 재시작:

# reboot



3. 쿠버네티스 설치 준비

쿠버네티스 YUM 리포지토리 설정:

# cat <<EOF > /etc/yum.repos.d/kubernetes.repo

[kubernetes]

name=Kubernetes

baseurl=https://packages.cloud.google.com/yum/repos/kubernetes-el7-x86_64

enabled=1

gpgcheck=1

repo_gpgcheck=1

gpgkey=https://packages.cloud.google.com/yum/doc/yum-key.gpg https://packages.cloud.google.com/yum/doc/rpm-package-key.gpg

exclude=kube*

EOF


kubeadm 설치:

# yum install -y kubelet kubeadm kubectl --disableexcludes=kubernetes


# systemctl enable kubelet && systemctl start kubelet



[쿠버네티스 구성 요소]

쿠버네티스를 구성하는 컴포넌트에 대해 알고 싶다면 https://kubernetes.io/ko/docs/concepts/overview/components/ 문서를 읽어보자. 이 문서를 빠르게 훑어보고 다음 내용을 진행하면 설치 과정에서 용어나 메시지를 이해하는데 도움이 된다.


4. 마스터 컴포넌트 설치

kubeadm init 명령으로 마스터 노드 초기화


kubeadm init 명령어를 이용해서 마스터 노드를 초기화한다. --pod-network-cidr 옵션은 사용할 CNI(Container Network Interface)에 맞게 입력한다. 이 글에서는 CNI로 Flannel을 사용한다고 가정한다.


# kubeadm init --pod-network-cidr=10.244.0.0/16 --apiserver-advertise-address=172.16.1.100

...생략

[addons] Applied essential addon: CoreDNS

[addons] Applied essential addon: kube-proxy


Your Kubernetes master has initialized successfully!


To start using your cluster, you need to run the following as a regular user:


  mkdir -p $HOME/.kube

  sudo cp -i /etc/kubernetes/admin.conf $HOME/.kube/config

  sudo chown $(id -u):$(id -g) $HOME/.kube/config


You should now deploy a pod network to the cluster.

Run "kubectl apply -f [podnetwork].yaml" with one of the options listed at:

  https://kubernetes.io/docs/concepts/cluster-administration/addons/


You can now join any number of machines by running the following on each node

as root:


  kubeadm join 172.16.1.100:6443 --token yrc47a.55b25p2dhe14pzd1 --discovery-token-ca-cert-hash sha256:2a7a31510b9a0b0da1cf71c2c29627b40711cdd84be12944a713ce2af2d5d148



마스터 초기화에 성공하면 마지막에 'kubeadm join ....'으로 시작하는 명령어가 출력된다. 이 명령어를 이용해서 작업 노드를 설치하므로 잘 복사해 놓자.


환경 변수 설정

root 계정을 이용해서 kubectl을 실행할 경우 다음 환경 변수를 설정한다.


# export KUBECONFIG=/etc/kubernetes/admin.conf


CNI 설치

이 글에서는 Flannel을 설치한다. 설치 명령어는 다음과 같다.


# kubectl apply -f https://raw.githubusercontent.com/coreos/flannel/bc79dd1505b0c8681ece4de4c0d86c5cd2643275/Documentation/kube-flannel.yml


각 CNI별 설치 명령어는 Creating a single master cluster with kubeadm 문서를 참고한다.


마스터 실행 확인


마스터를 설치했다. 다음 명령어를 실행해서 결과를 확인한다.


# kubectl get pods --all-namespaces

NAMESPACE     NAME                                 READY   STATUS    RESTARTS   AGE

kube-system   coredns-86c58d9df4-78jbg             1/1     Running   0          9m3s

kube-system   coredns-86c58d9df4-q7mwf             1/1     Running   0          9m3s

kube-system   etcd-k8s-master                      1/1     Running   0          13m

kube-system   kube-apiserver-k8s-master            1/1     Running   0          13m

kube-system   kube-controller-manager-k8s-master   1/1     Running   0          13m

kube-system   kube-flannel-ds-amd64-zv8nc          1/1     Running   0          3m11s

kube-system   kube-proxy-xj7hg                     1/1     Running   0          14m

kube-system   kube-scheduler-k8s-master            1/1     Running   0          13m


5. 노드 컴포넌트 설치

kubeadm init 명령을 이용해서 설치할 때 콘솔에 출력된 메시지에 kubeadm join 명령어가 있었다. 이 명령어를 노드 컴포넌트로 사용할 서버에서 실행한다. 이 예에서는 k8s-node1 서버에서 아래 명령어를 실행했다.


# kubeadm join 172.16.1.100:6443 --token yrc47a.55b25p2dhe14pzd1 --discovery-token-ca-cert-hash sha256:2a7a31510b9a0b0da1cf71c2c29627b40711cdd84be12944a713ce2af2d5d148


첫 번째 슬레이브 노드 추가 후 마스터 노드에서 kubectl get nodes 명령을 실행해보자. master 역할을 하는 k8s-master 노드와 방금 추가한 k8s-node1이 노드 목록에 표시된다. 노드를 추가하자 마자 노드 목록을 조회하면 다음 처럼 아직 사용 준비가 안 된 NotReady 상태임을 알 수 있다.


[root@k8s-master ~]# kubectl get nodes

NAME         STATUS     ROLES    AGE   VERSION

k8s-master   Ready      master   18m   v1.13.2

k8s-node1    NotReady   <none>   30s   v1.13.2


k8s-node2 노드에서 두 번째 슬레이브 노드를 추가한 뒤에 다시 노드 목록을 살펴보자. 새로 추가한 노드가 목록에 보인다.


[root@k8s-master ~]# kubectl get nodes

NAME         STATUS     ROLES    AGE     VERSION

k8s-master   Ready      master   21m     v1.13.2

k8s-node1    Ready      <none>   3m28s   v1.13.2

k8s-node2    NotReady   <none>   15s     v1.13.2


kubectl cluster-info 명령어를 실행하면 클러스터 정보를 확인할 수 있다.


[root@k8s-master ~]# kubectl cluster-info

Kubernetes master is running at https://172.16.1.100:6443

KubeDNS is running at https://172.16.1.100:6443/api/v1/namespaces/kube-system/services/kube-dns:dns/proxy


To further debug and diagnose cluster problems, use 'kubectl cluster-info dump'.


6. 클러스터 테스트

마스터와 슬레이브를 설치했으니 간단한 예제를 실행해보자.


luksa/kubia 도커 컨테이너 이미지를 이용해서 팟 만들기:


요즘 학습하고 있는 '쿠버네티스 인 액셕' 책의 예제로 테스트했다. 다음 명령을 실행하자.


# kubectl run kubia --image=luksa/kubia --port 8080 --generator=run-pod/v1

replicationcontroller/kubia created


이 명령은 도커 이미지(luksa/kubia)를 이용해서 쿠버네티스 배포 단위인 파드(pod)를 클러스터에서 실행한다. 참고로 luksa/kubia 이미지는 호스트 이름을 응답하는 간단한 웹 서버다.


파드가 포함한 컨테이너에 연결할 수 있도록 서비스를 생성한다.


[root@k8s-master ~]# kubectl expose rc kubia --type=LoadBalancer --name kubia-http

service/kubia-http exposed


kubectl get services 명령어로 생성한 서비스 정보를 보자. 내가 테스트한 환경에서는 LoadBalancer 타입 서비스가 클러스터 IP로 10.101.195.144를 사용하고 있다.


[root@k8s-master ~]# kubectl get services

NAME         TYPE           CLUSTER-IP       EXTERNAL-IP   PORT(S)          AGE

kubernetes   ClusterIP      10.96.0.1        <none>        443/TCP          3h22m

kubia-http   LoadBalancer   10.101.195.144   <pending>     8080:31701/TCP   15s


쿠버네티스 클러스터를 설치한 서버에서 이 클러스터 IP를 이용해서 8080 포트로 연결해보자. 다음과 비슷한 결과가 나오면 쿠버네티스가 정상적으로 동작하고 있는 것이다.


# curl 10.101.195.144:8080

You've hit kubia-ckh8w




  1. 엽이 2019.04.04 16:24

    저도 쿠버네티스 인 액션 책을 통해 공부하려고 주문하였는데 아직 오지를 않아서.. 쿠버네티스 관련하여 인터넷 글을 참고하던차에 여기글을 보고 기본적인 셋팅 및 서비스 테스트까지 성공적으로 해 볼 수 있었습니다. 감사합니다.

  2. 2019.06.25 15:16

    비밀댓글입니다

  3. 동방폐인 2019.07.16 16:23

    사이트 아직 살아 있구나...광균아... 요즘은 어떻게 사는지...?

    나도 이제 K8s 시작한다... 이것도 제대로 쓰려면 골치 아픈 거구낭...ㅠ

최근에 사용하는 프로필이 dev, prod, local, test 4개가 존재하는 스프링 부트 어플리케이션을 개발하고 있다. 로컬에서 'mvn spring-boot:run' 명령어를 실행하면 local 프로필을 사용해서 부트 앱을 실행하고 싶었다. src/main/resources 폴더에 application-prod.properties, application-dev.properties, application-local.properties 파일이 함께 존재해서 src/main/resources의 application.properties 파일에 spring.profiles.active=local 설정을 줄 수 없었다.


'mvn spring-boot:run -Dspring-boot.run.profiles=local'와 같이 로컬에서 실행할 때 마다 프로필을 지정하려니까 귀찮았다. 그래서 프로필을 선택하지 않은 경우 기본으로 local 프로필을 활성화하는 설정을 추가했다.


먼저 EnvironmentPostProcessor 인터페이스를 구현한 클래스를 작성한다.


public class ProfileResolverEnvironmentPostProcessor implements EnvironmentPostProcessor {


    @Override

    public void postProcessEnvironment(ConfigurableEnvironment environment, 

                                                   SpringApplication application) {

        boolean isSomeProfileActive = 

                environment.acceptsProfiles(Profiles.of("prod", "dev", "test", "local"));


        if (!isSomeProfileActive) {

            environment.addActiveProfile("local");

            Resource path = new ClassPathResource("application-local.properties");

            if (path.exists()) {

                try {

                    environment.getPropertySources().addLast(

                            new PropertiesPropertySourceLoader().load("application-local", path).get(0));

                } catch (IOException e) {

                    throw new IllegalStateException(e);

                }

            }

        } else {

            log.info("Some of [prod, dev, test, local] is active: " + environment.getActiveProfiles());

        }

    }

}


이 코드는 ConfigurableEnvironment#acceptsProfiles() 메서드를 이용해서 "prod", "dev", "test", "local" 프로필 중 하나라도 활성화되어 있는지 검사한다. 활성화되어 있지 않으면 활성 프로필을 "local"을 추가하고, 사용할 프로퍼티 소스로 "application-local" 프로퍼티 파일을 추가한다.


다음 할 일은 META-INF/spring.factories 파일에 다음 설정을 추가하는 것이다.


org.springframework.boot.env.EnvironmentPostProcessor=\

demo.ProfileResolverEnvironmentPostProcessor


특정 프로필을 선택하지 않고 부트 어플리케이션을 실행하면 local 프로필이 활성화되는 것을 확인할 수 있다.

스프링 스케줄러를 이용해서 cron 설정을 런타임에 변경하는 방법을 살펴본다.


1. TaskScheduler 설정


먼저 TaskScheduler를 설정한다.


@Configuration

public class SchedulingConfiguration {


    @Bean

    public ThreadPoolTaskScheduler schedulerExecutor() {

        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();

        taskScheduler.setPoolSize(4);

        taskScheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());

        return taskScheduler;

    }


}


스프링 부트를 사용한다면 부트가 알아서 TaskScheduler를 만들어준다.


2. cron을 사용해서 작업을 스케줄링하는 코드 작성


다음은 cron을 이용해서 스케줄링하는 코드를 작성한다. 예제는 다음과 같다.


@Service

public class SchedulerService {

    private TaskScheduler scheduler;

    private String cron = "*/2 * * * * *";

    private ScheduledFuture<?> future;


    public SchedulerService(TaskScheduler scheduler) {

        this.scheduler = scheduler;

    }


    public void start() {

        ScheduledFuture<?> future = this.scheduler.schedule(() -> {

                    System.out.println("run at " + LocalDateTime.now());

                },

                new CronTrigger(cron));

        this.future = future;

    }


    public void changeCron(String cron) {

        if (future != null) future.cancel(true);

        this.future = null;

        this.cron = cron;

        this.start();

    }

}


scheduler.schedule()은 스케줄링을 취소할 수 있는 ScheduledFuture를 리턴한다. 이 ScheduledFuture를 이용해서 스케줄을 변경할 때 이전 스케줄을 취소하고 새 스케줄을 등록하면 된다. 위 코드에서 changeCron() 메서드는 앞서 생성한 스케줄이 있다면 future.cancel()을 이용해서 스케줄을 취소한다.


3. 스케줄 런타임 변경 확인


테스트 코드를 이용해서 실제 스케줄이 런타임에 바뀌는지 확인해보자.


@RunWith(SpringRunner.class)

@SpringBootTest

public class SchedulerServiceTest {

    @Autowired

    private SchedulerService schedulerService;


    @Test

    public void changeCron() throws InterruptedException {

        schedulerService.start();

        Thread.sleep(10000);

        schedulerService.changeCron("*/3 * * * * *");

        Thread.sleep(20000);

    }

}


SchedulerService의 최초 cron 설정은 "2/* * * * * *"이므로 매 2초마다 작업을 실행한다. 위 코드는 스케줄링을 시작한 뒤에 10초간 쉬고 그 다음에 매 3초마다 작업을 실행하도록 cron 설정을 변경한다. 그리고 20초 동안 쉰다. 실행 결과는 다음과 같다.


run at 2018-12-20T23:03:02.003

run at 2018-12-20T23:03:04.002

run at 2018-12-20T23:03:06.001

run at 2018-12-20T23:03:08.001

run at 2018-12-20T23:03:10.002

run at 2018-12-20T23:03:12.002

run at 2018-12-20T23:03:15.002

run at 2018-12-20T23:03:18.003

run at 2018-12-20T23:03:21.001

run at 2018-12-20T23:03:24.001

run at 2018-12-20T23:03:27.002

run at 2018-12-20T23:03:30.001


위 결과를 보면 2초 마다 실행하다가 changeCron()을 실행한 뒤부터는 3초 마다 실행하는 것을 확인할 수 있다.


예제 코드는 https://github.com/madvirus/spring-scheduler-cron-change 에서 확인할 수 있다.

클라우드 서버에 실수로 용량이 큰 이미지 파일을 올리면 과도한 트래픽 발생으로 높은 비용을 지불할 수도 있다. 이런 상황을 방지하는 방법 중 하나는 아파치 웹 서버 설정에서 응답 파일의 크기를 제한하는 것이다. 아파치 웹 서버에서는 RewirteCond에서 filesize() 식을 사용해서 특정 크기보다 큰 파일에 대한 접근을 거부할 수 있다. 다음은 <Directory> 설정은 1 MB(1048576 바이트) 큰 파일에 접근할 때 403 상태 코드를 응답하도록 설정한 예이다.


<Directory /var/www/html/images/>

  RewriteEngine On

  RewriteCond expr "filesize('%{REQUEST_FILENAME}') -gt 1048576"

  RewriteRule .* - [F]

</Directory>


참고로 filesize()를 이용한 설정은 아파치 2.4부터 지원한다.

다소 동접이 발생하는 간단한 TCP 서버를 구현할 기술을 찾다가 리액터 네티(Reactor Netty)를 알게 되었다. 리액터 네티를 이용하면 네티를 기반으로 한 네트워크 프로그램을 리액터 API로 만들 수 있다. 리액터 네티를 사용하면 네티를 직접 사용하는 것보다 간결한 코드로 비동기 네트워크 프로그램을 만들 수 있는 이점이 있다.


다음은 리액터 네티(Reactor Netty)의 주요 특징이다.

  • 네티 기반
  • 리액터 API 사용
  • 논블로킹 TCP, UDP, HTTP 클라이언트/서버

이 글에서는 리액터 네티를 이용해서 간단한 소켓 서버를 만들어 보겠다.

TcpServer를 이용한 소켓 서버 만들기

리액터 네티는 TcpServer 클래스를 제공한다. 이 클래스를 이용해서 비교적 간단하게 비동기 소켓 서버를 구현할 수 있다. 이 글에서는 간단한 에코 서버를 만들어 본다. 만들 기능은 다음과 같다.

  • 클라이언트가 한 줄을 입력하면 "echo: 입력한 줄\r\n"으로 응답한다.
  • 클라이언트가 exit를 입력하면 클라이언트와 연결을 끊는다.
  • 클라이언트가 SHUTDOWN을 입력하면 서버를 종료한다.
  • 10초 이내에 클라이언트로부터 입력이 없으면 연결을 종료한다.
리액터 네티를 사용하기 위한 메이븐 설정은 다음과 같다.

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-bom</artifactId>
                <version>Californium-SR3</version> <!-- 리액터 네티 0.8.3에 대응 -->
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>io.projectreactor.netty</groupId>
            <artifactId>reactor-netty</artifactId>
        </dependency>

        <dependency>
            <groupId>io.projectreactor.addons</groupId>
            <artifactId>reactor-logback</artifactId>
        </dependency>
    </dependencies>

다음 코드는 리액터 네티로 만든 에코 서버의 전체 코드이다.

package demo;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LineBasedFrameDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

import java.util.concurrent.CountDownLatch;

public class EchoServer {
    private static Logger log = LoggerFactory.getLogger(EchoServer.class);

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(1);
        DisposableServer server = TcpServer.create()
                .port(9999) // 서버가 사용할 포트
                .doOnConnection(conn -> { // 클라이언트 연결시 호출
                    // conn: reactor.netty.Connection
                    conn.addHandler(new LineBasedFrameDecoder(1024));
                    conn.addHandler(new ChannelHandlerAdapter() {
                        @Override
                        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
                            log.info("client added");
                        }

                        @Override
                        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
                            log.info("client removed");
                        }

                        @Override
                        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                                       throws Exception {
                            log.warn("exception {}", cause.toString());
                            ctx.close();
                        }
                    });
                    conn.onReadIdle(10_000, () -> {
                        log.warn("client read timeout");
                        conn.dispose();
                    });
                })
                .handle((in, out) -> // 연결된 커넥션에 대한 IN/OUT 처리
                        // reactor.netty (NettyInbound, NettyOutbound)
                        in.receive() // 데이터 읽기 선언, ByteBufFlux 리턴
                          .asString()  // 문자열로 변환 선언, Flux<String> 리턴
                          .flatMap(msg -> {
                                      log.debug("doOnNext: [{}]", msg);
                                      if (msg.equals("exit")) {
                                          return out.withConnection(conn -> conn.dispose());
                                      } else if (msg.equals("SHUTDOWN")) {
                                          latch.countDown();
                                          return out;
                                      } else {
                                          return out.sendString(Mono.just("echo: " + msg + "\r\n"));
                                      }
                                  }
                          )
                )
                .bind() // Mono<DisposableServer> 리턴
                .block();

        try {
            latch.await();
        } catch (InterruptedException e) {
        }
        log.info("dispose server");
        server.disposeNow(); // 서버 종료
    }
}


먼저 전체 코드 구조를 살펴보자.

DisposableServer server = TcpServer.create()
        .port(9999) // 포트 지정
        .doOnConnection(conn -> { // 클라이언트 연결시 호출 코드
            ...
        })
        .handle((in, out) -> // 데이터 입출력 처리 코드
            ...
        )
        .bind() // 서버 실행에 사용할 Mono<DisposableServer>
        .block(); // 서버 실행 및 DisposableServer 리턴

...(서버 사용)

// 서버 중지
server.disposeNow();

전체 코드 구조는 다음과 같다.
  • TcpServer.create()로 TcpServer 준비
  • port()로 사용할 포트 포트
  • doOnConnection() 메서드로 클라이언트 연결시 실행할 함수 설정
    • 이 함수에서 커넥션에 ChannelHandler를 등록하는 것과 같은 작업 수행
  • handle() 메서드로 클라이언트와 데이터를 주고 받는 함수 설정
  • bind() 메서드로 서버 연결에 사용할 Mono<DisposableServer> 생성
  • bind()가 리턴한 Mono의 block()을 호출해서 서버 실행하고 DisposableServer 리턴
서버가 정상적으로 구동되면 block() 메서드는 구동중인 DisposableServer를 리턴한다. DisposableServer의 disposeNow() 메서드는 서버를 중지할 때 사용한다. 이 외에도 서버 중지에 사용되는 몇 가지 dispose로 시작하는 메서드를 제공한다.

doOnConnection()으로 커넥션 초기화

doOnConnection() 메서드의 파라미터는 다음 함수형 타입이다.
  • Consumer<? super Connection>
reactor.netty.Connection 타입은 인터페이스로 네티의 ChannelHandler 등록과 몇 가지 이벤트 연동 기능을 제공한다. 예제 코드의 doOnConnection 설정 부분을 다시 보자.

.doOnConnection(conn -> { // 클라이언트 연결시 호출
    conn.addHandler(new LineBasedFrameDecoder(1024));
    conn.addHandler(new ChannelHandlerAdapter() {
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            log.info("client added");
        }
        ...
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
        throws Exception {
            log.warn("exception {}", cause.toString());
            ctx.close();
        }
    });
    conn.onReadIdle(10_000, () -> {
        log.warn("client read timeout");
        conn.dispose();
    });

})

Connection#addHandler()는 네티의 ChannelHandler를 등록한다. 이 외에 addHandlerFirst(), addHandlerLast() 메서드를 제공한다. 이 메서드를 이용해서 필요한 네티 코덱을 등록하면 된다. 예제 코드에서는 한 줄씩 데이터를 읽어오는 LineBasedFrameDecoder를 등록했고 클라이언트 연결 이벤트에 따라 로그를 출력하기 위해 임의 ChannelHandlerAdapter 객체를 등록했다.

Connection#onReadIdle() 메서드는 첫 번째 인자로 지정한 시간(밀리초) 동안 데이터 읽기가 없으면 두 번째 인자로 전달받은 코드를 실행한다. 위 코드는 10초 동안 데이터 읽기가 없으면 연결을 종료한다. 비슷하게 onWriteIdle() 메서드는 지정한 시간 동안 쓰기가 없으면 코드를 실행한다.

handle() 메서드로 데이터 입출력 처리

데이터 송수신과 관련된 코드는 handle() 메서드로 지정한다. handle() 메서드가 전달 받는 함수형 타입은 다음과 같다.

BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>>

이 함수는 NettyInbound와 NettyOutbound를 인자로 갖고 Publisher<Void>나 그 하위 타입을 리턴한다. 예제 코드의 handle() 메서드를 다시 보자.

.handle((in, out) -> // 연결된 커넥션에 대한 IN/OUT 처리
        // (NettyInbound, NettyOutbound)
        in.receive() // 데이터 읽기 선언, ByteBufFlux 리턴
          .asString()  // 문자열로 변환 선언, Flux<String> 리턴
          .flatMap(msg -> {
                      log.debug("doOnNext: [{}]", msg);
                      if (msg.equals("exit")) {
                          return out.withConnection(conn -> conn.dispose());
                      } else if (msg.equals("SHUTDOWN")) {
                          latch.countDown();
                          return out;
                      } else {
                          return out.sendString(Mono.just("echo: " + msg + "\r\n"));
                      }
                  }
          )
)


위 코드를 요약하면 다음과 같다.

  • NettyInbound#receive()는 데이터 수신을 위한 ByteBufFlux를 리턴
  • ByteBufFlux#asString()은 데이터를 문자열로 수신 처리
  • flatMap을 이용해서 수신한 메시지 처리

flatMap은 수신한 데이터를 이용해서 알맞은 처리를 한다. 클라이언트에 데이터를 전송할 때에는  NettyOutbound를 이용한다. NettyOutbound#sendString() 메서드를 이용하면 문자열 데이터를 클라이언트에 전송한다. NettyOutbound#sendString()의 파라미터는 Publisher<? extends String> 타입이기 때문에 위 코드에 Mono.just()를 이용했다.


Connection이 필요하면 NettyOutbound#withConnection() 메서드를 사용한다. 위 코드에서는 클라이언트가 "exit"를 전송하면 연결을 끊기 위해 이 메서드를 사용했다.


ByteBufFlux#asString() 메서드는 기본 캐릭터셋을 사용한다. 다른 캐릭터셋을 사용하고 싶다면 asString(Charset) 메서드를 사용한다. 비슷하게 NettyOutbound#sendString() 메서드도 기본 캐릭터셋을 사용하므로 다른 캐릭터셋을 사용하려면 NettyOutbound#sendString(Publisher, Charset) 메서드를 사용한다.


예제 실행

EchoServer를 실행해보자. 로그백을 사용했다면 아래와 비슷한 메시지가 출력되면서 서버가 구동된다.


08:49:10.522 [reactor-tcp-nio-1] DEBUG reactor.netty.tcp.TcpServer - [id: 0x1fb82e53, L:/127.0.0.1:9999] Bound new server


telnet을 이용해서 에코가 제대로 동작하는지 확인해본다. 클라이언트가 전송한 데이터를 굵은 글씨로 표시했고 서버가 응답한 데이터를 파란색으로 표시했다.


$ telnet localhost 9999

Trying 127.0.0.1...

Connected to localhost.

Escape character is '^]'.

124124

echo: 124124

wefwef

echo: wefwef

exit

Connection closed by foreign host.

$


위 과정에서 서버에 출력되는 로그는 다음과 같다(리액터 네티가 출력하는 로그는 생략했다.)


08:50:40.187 [reactor-tcp-nio-5] INFO  demo.EchoServer - client added

08:50:37.374 [reactor-tcp-nio-4] INFO  demo.EchoServer - doOnNext: [124124]

08:50:44.506 [reactor-tcp-nio-4] INFO  demo.EchoServer - doOnNext: [wefwef]

08:50:46.218 [reactor-tcp-nio-4] INFO  demo.EchoServer - doOnNext: [exit]

08:50:46.221 [reactor-tcp-nio-4] INFO  demo.EchoServer - client removed


Connection#onReadIdle()을 이용해서 읽기 타임아웃을 10초로 설정했는데 실제로 서버 접속 후 10초 동안 데이터를 전송하지 않으면 연결이 끊기는 것을 확인할 수 있다.


08:56:23.358 [reactor-tcp-nio-2] WARN  demo.EchoServer - client read timeout

08:56:23.360 [reactor-tcp-nio-2] INFO  demo.EchoServer - client removed


마지막으로 SHUTDOWN 명령어를 전송해 보자. 그러면 서버가 종료되는 것도 확인할 수 있을 것이다.


08:57:46.372 [reactor-tcp-nio-3] INFO  demo.EchoServer - doOnNext: [SHUTDOWN]

08:57:46.373 [main] INFO  demo.EchoServer - dispose server


특정 시간 동안 실행 횟수를 제한하기 위한 라이브러를 검색해서 아래 3가지 정도를 찾았다.

이 글에서는 각 라이브러리의 사용법을 간단하게 살펴본다.

Guava RateLimiter

Guava에 포함된 RateLimiter를 사용하면 초당 실행 횟수를 제한할 수 있다. 이 클래스를 사용하려면 다음 의존을 추가한다.

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>26.0-jre</version>
</dependency>

사용 방법은 다음과 같다.

private RateLimiter limiter = RateLimiter.create(4.0); // 초당 4개

public void someLimit() {
    if (limiter.tryAcquire()) { // 1개 사용 요청
        // 초과하지 않음
        ... 코드1 // 초당 4번까지 실행 가능

    } else {
        // 제한 초과

    }
}

RateLimiter.create() 메서드는 초당 몇 개를 허용할지를 인수로 받는다. 위 예의 경우 4.0을 값으로 주었는데 이는 초당 4개를 허용함을 뜻한다. 이 값을 0.2로 주면 초당 0.4개를 허용하므로 5초당 1개를 허용한다.


실행 횟수를 초과했는지 검사할 때 tryAcquire() 메서드를 사용한다. 이 메서드는 허용하는 횟수를 초과하지 않았으면 true를 리턴하고 초과했으면 false를 리턴한다. 따라서 위 코드는 someLimit() 메서드의 '코드1'을 초당 4번까지 실행한다. 만약 1초 이내에 5번을 실행하면 그 중 한 번은 tryAcquire() 메서드가 false를 리턴한다.


RateLimiter는 실행 가능 시점을 분배하는 방식을 사용한다. 예를 들어 다음과 같이 초당 실행 가능한 횟수를 5.0으로 지정하고 0.1초마다 tryAcquire() 메서드를 실행한다고 하자.


RateLimiter limiter = RateLimiter.create(5.0);


Timer timer = new Timer(true);


timer.scheduleAtFixedRate(new TimerTask() {

    @Override

    public void run() {

        if (limiter.tryAcquire()) {

            log.info("!! OK");

        } else {

            log.warn("XX");

        }

    }

}, 0, 100); // 0.1초마다 실행


Thread.sleep(1500);


위 코드를 실행한 결과는 다음과 같다. 이 결과를 보면 "!! OK"와 "XX"가 번갈아 출력된 것을 알 수 있다. 즉 초당 5.0으로 횟수를 제한했을 때 1초에 10번 tryAcquire()를 실행하면 연속해서 5번 실행 가능한 게 아니고 1초에 5번 실행 가능한 시점이 분산되는 것을 알 수 있다.

17:27:39.503 [Timer-0] INFO guava.RateLimiterTest - !! OK
17:27:39.596 [Timer-0] WARN guava.RateLimiterTest - XX
17:27:39.695 [Timer-0] INFO guava.RateLimiterTest - !! OK
17:27:39.797 [Timer-0] WARN guava.RateLimiterTest - XX
17:27:39.898 [Timer-0] INFO guava.RateLimiterTest - !! OK
17:27:39.998 [Timer-0] WARN guava.RateLimiterTest - XX
17:27:40.098 [Timer-0] INFO guava.RateLimiterTest - !! OK
17:27:40.197 [Timer-0] WARN guava.RateLimiterTest - XX
17:27:40.297 [Timer-0] INFO guava.RateLimiterTest - !! OK
17:27:40.396 [Timer-0] WARN guava.RateLimiterTest - XX
17:27:40.498 [Timer-0] INFO guava.RateLimiterTest - !! OK
17:27:40.594 [Timer-0] WARN guava.RateLimiterTest - XX
17:27:40.695 [Timer-0] INFO guava.RateLimiterTest - !! OK
17:27:40.795 [Timer-0] WARN guava.RateLimiterTest - XX
17:27:40.897 [Timer-0] INFO guava.RateLimiterTest - !! OK
17:27:40.997 [Timer-0] WARN guava.RateLimiterTest - XX


[노트]

RateLimiter의 내부 구현은 1초 동안 사용하지 않은 개수를 누적한다. 예를 들어 RateLimiter.create(10)으로 만든 RateLimiter를 1초 동안 사용하지 않았다면 5개가 누적되어 이후 1초 동안은 10개를 사용할 수 있다.


RateLimitJ

RateLimitJ는 Redis, Hazelcast를 이용해서 시간 당 실행 횟수를 제한할 수 있다. 메모리를 이용한 구현도 지원하므로 Redis나 Hazelcast가 없어도 사용할 수 있다. 이 글에서는 인메모리 구현의 사용법을 살펴본다. 인메모리 구현을 사용하려면 다음 의존을 추가한다. Redis를 이용한 구현을 사용하는 방법은 https://github.com/mokies/ratelimitj 사이트를 참고한다.


<dependency>

    <groupId>es.moki.ratelimitj</groupId>

    <artifactId>ratelimitj-inmemory</artifactId>

    <version>0.5.0</version>

</dependency>


RateLimitJ를 이용한 실행 횟수 제한 방법은 다음과 같다.


// 1분에 10개 제한

RequestLimitRule limitRule = RequestLimitRule.of(Duration.ofMinutes(1), 10);

RequestRateLimiter rateLimiter =

        new InMemorySlidingWindowRequestRateLimiter(limitRule);


if (rateLimiter.overLimitWhenIncremented("key")) {

    // 제한 초과


} else {

    // 초과하지 않음


}


RequestLimitRule은 제한 규칙을 적용한다. RequestLimitRule.of() 메서드를 이용해서 제한 규칙을 생성하는데 첫 번째 파라미터는 시간 범위이고 두 번째 파라미터는 제한 횟수이다. 위 코드는 1분 동안 10으로 제한하는 규칙을 생성한다.


이 규칙을 사용해서 InMemorySlidingWindowRequestRateLimiter 객체를 생성하면 사용할 준비가 끝난다.


RequestRateLimiter#overLimitWhenIncremented(key) 메서드는 특정 시간 동안 지정한 횟수를 초과했는지 검사한다. 초과했으면 true를 리턴하고 초과하지 않았으면 false를 리턴한다. 따라서 이 메서드가 false를 리턴할 때 기능을 실행하면 된다.


overLimitWhenIncremented() 메서드는 인수로 key를 받는다. 이 key 별로 규칙을 적용한다. 예를 들어 URI 마다 실행 횟수를 제한하고 싶다면 다음과 같이 key로 URI를 사용하면 된다.


// 각 URI마다 실행 횟수 제한

if (rateLimiter.overLimitWhenIncremented(request.getRequestURI())) {

    // 해당 URI에 대한 제한 초과


} else {

    // 해당 URI에 대한 접근 허용


}


Guava의 RateLimiter와 달리 RateLimitJ는 지정한 횟수에 다다를 때까지 실행을 허용하고 그 이후로는 시간이 지날 때까지 실행을 허용하지 않는다. 예를 들어 다음 코드를 보자.


RequestLimitRule limitRule = RequestLimitRule.of(Duration.ofSeconds(1), 5);

RequestRateLimiter rateLimiter =

        new InMemorySlidingWindowRequestRateLimiter(limitRule);


Timer timer = new Timer(true);


timer.scheduleAtFixedRate(new TimerTask() {

    @Override

    public void run() {

        if (rateLimiter.overLimitWhenIncremented("key")) {

            log.warn("XX"); // 제한 초과로 실행하지 않음

        } else {

            log.info("!! OK"); // 제한을 초과하지 않아 실행함

        }

    }

}, 0, 100);


Thread.sleep(1500);


이 코드는 1초 당 5개로 제한하는 RequestRateLimiter를 사용하고 0.1초마다 한 번씩 기능을 실행하고 있다. 이 코드를 실행하면 다음과 같이 처음 5개 요청을 실행하고 그 이후 1초가 지날 때까지 5개 요청은 제한 초과로 실행하지 않은 것을 알 수 있다.


17:46:38.061 [Timer-0] INFO ratelimitj.RateLimitJTest - !! OK

17:46:38.130 [Timer-0] INFO ratelimitj.RateLimitJTest - !! OK

17:46:38.230 [Timer-0] INFO ratelimitj.RateLimitJTest - !! OK

17:46:38.330 [Timer-0] INFO ratelimitj.RateLimitJTest - !! OK

17:46:38.430 [Timer-0] INFO ratelimitj.RateLimitJTest - !! OK

17:46:38.531 [Timer-0] WARN ratelimitj.RateLimitJTest - XX

17:46:38.632 [Timer-0] WARN ratelimitj.RateLimitJTest - XX

17:46:38.733 [Timer-0] WARN ratelimitj.RateLimitJTest - XX

17:46:38.833 [Timer-0] WARN ratelimitj.RateLimitJTest - XX

17:46:38.931 [Timer-0] WARN ratelimitj.RateLimitJTest - XX

17:46:39.033 [Timer-0] INFO ratelimitj.RateLimitJTest - !! OK

17:46:39.134 [Timer-0] INFO ratelimitj.RateLimitJTest - !! OK

17:46:39.235 [Timer-0] INFO ratelimitj.RateLimitJTest - !! OK

17:46:39.330 [Timer-0] INFO ratelimitj.RateLimitJTest - !! OK

17:46:39.432 [Timer-0] INFO ratelimitj.RateLimitJTest - !! OK


Bucket4j

Bucket4j는 hazelcast, infinispan을 이용한 구현 외에 인메모리 구현을 지원한다. 이 글에서는 인모메리 구현을 이용한 횟수 제한에 대해 살펴본다. 다른 구현에 대한 내용은 https://github.com/vladimir-bukhtoyarov/bucket4j 문서를 참고한다.


인메모리 구현을 사용하려면 다음 의존을 추가한다.


<dependency>

    <groupId>com.github.vladimir-bukhtoyarov</groupId>

    <artifactId>bucket4j-core</artifactId>

    <version>4.0.1</version>

</dependency>


사용법은 다음과 같다.


// 1초에 5개 사용 제한

Bandwidth limit = Bandwidth.simple(5, Duration.ofSeconds(1));

// 버킷 생성

Bucket bucket = Bucket4j.builder().addLimit(limit).build();


if (bucket.tryConsume(1)) { // 1개 사용 요청

    // 초과하지 않음


} else {

    // 제한 초과

    

}


Bandwith는 지정한 시간 동안 제한할 개수를 지정한다. 위 코드는 1초 동안 5개를 허용하는 Bandwith를 생성한다. 이 Bandwith를 이용해서 Bucket을 생성한다. Bucket#tryConsume() 메서드는 사용할 개수를 인수로 받으며, 사용 가능할 경우 true를 리턴하고 사용 가능하지 않으면 false를 리턴한다.



[노트]

Bucket4j는 다중 Bandwidth를 지원한다. 또한 사용 가능 개수를 시간이 흘러감에 따라 점진적으로(greedy) 채우는 방식과 시간 간격마다 채우는 방식을 지원한다. 이에 대한 내용은 Bucket4j 문서를 참고한다.




이 글은 DevOps Handbook 책을 읽고 몇 가지 핵심 실천법을 정리한 것이다. (원서 링크, 번역서 링크)


*주의: 요약 글에 오류/오역이 존재할 수 있고 더 중요한 내용을 누락했을 수도 있으니 애매한 부분은 반드시 원문을 참고하기 바란다.


데브옵스 시작하기

밸류스트림 선택

  • 데브옵스 전환을 시도할 밸류 스트림은 신중히 선택할 것: 성공해야 확대 기회 생김
  • 동조 잘하고 혁신적인 그룹과 시작하기: 보수적인 그룹은 처음부터 설득하지 말고 충분히 성공한 뒤에 해결
밸류 스트림 이해, 팀 구성, 계획
  • 밸류 스트림 맵 작성: 모든 구성원 식별, 빠른 가치 제공을 위협하는 영역 이해
  • 개선할 메트릭을 선택하고 목표와 일정 결정
  • 전용 전환 팀 구성
  • 목표를 합의하고 공유: 측정 가능한 목표, 6개월~2년 사이의 명확한 기한, 어렵지만 달성 가능한 목표, 조직과 고객에 가치 있음, 책임자가 목표에 동의, 목표를 조직 전체 공유
  • 개선 계획은 짧게: 2-3주 안에 측정 가능한 개선이나 이용 가능한 데이터를 만들어야 함, 빠른 개선으로 일상 업무에서 차이를 만들어 내고, 빠른 증명으로 프로젝트를 유지
  • 모두가 작업 상태를 알 수 있도록 최신 상태 공개
조직 구성
  • 시장 지향 팀 구성
  • 밸류 스트림에 관여하는 모두가 고객 목표와 조직 목표 공유
  • 제너럴리스트: 배움을 장려, 호기심/용기/솔직함을 가진 사람 채용
  • 프로젝트가 아닌 서비스와 제품에 투자
  • 콘웨이 법칙에 따라 팀 경계 설계
  • 팀을 작게 유지
운영을 개발 환경에 통합
  • 운영 역량을 개발 팀에 통합: 운영과 개발의 효율과 생산성을 높이고 시장 지향 결과를 더 잘 만들어 낼 수 있도록 함
  • 운영도 개발 활동에 참여: 운영 엔지니어를 서비스 팀에 포함시키거나 운영 담당자를 서비스 팀에 할당해서 제품 관련 작업을 운영 계획에 반영하고 제품 팀에 운영 지식 전파
  • 제품과 관련된 운영 작업을 공유 칸반 보드에 공개: 운영도 밸류 스트림의 일부

흐름(Flow) 개선

배포 파이프라인 기반

  • 필요할 때 개발, 테스트, 제품 환경을 생성할 수 있어야 함: 모든 환경을 만들 수 있는 빌드 장치, 환경 구성에 필요한 것을 체계화/자동화, 이를 통해 일관된 환경 생성 프로세스 구축, 수작업 감소
  • 단일 리포지토리: 환경도 버전 컨트롤로 관리, 빠르게 롤백할 수 있는 방법 제공
  • 반복가능한 환경 구축 시스템으로 인프라도 빠르게 재구축할 수 있게 함
  • 조기에 환경을 코드와 통합하고 배포를 연습해서 릴리즈와 관련된 위험을 줄임
빠르고 신뢰할 수 있는 자동화된 테스트
  • 자동화된 테스트 스위트 작성: 배포 파이프라인으로 커밋한 모든 코드를 자동으로 빌드하고 테스트
  • 자동화된 빌드/테스트 프로세스를 실행하는 전용 환경 구축
  • UAT, 보안 테스트 환경을 셀프 서비스로 생성 가능
  • 테스트 커버리지를 이용해서 테스트 작성 유도
  • 성능 테스트, 비기능 요구사항 테스트를 배포 파이프라인에 통합
  • 배포 파이프라인이 깨지면 작업을 멈추고 즉시 해결: 문제 해결에 조기에 발견할 수 있는 테스트 케이스를 추가
지속적 통합(CI)
  • 작은 배치로 개발
  • 트렁크에 자주 커밋, 일일 커밋
저위험 출시
  • 배포 프로세스 단순화, 자동화: 소요 시간이 긴 단계를 제거하기 위해 아키텍처 개선, 소요 시간과 이관 횟수를 줄이기 위한 노력
  • 모든 환경에 대해 동일한 방법으로 배포
  • 자동화된 배포 셀프 서비스로 개발자가 직접 배포: 자동화된 테스트, 자동화된 배포, 코드 리뷰 등 위험 감소 장치 필요
  • CI로 배포 가능 패키지 생성, 제품 환경 준비 조회, 특정 버전 패키지를 배포할 수 있는 버튼, 감사 기록, 스모크 테스트 실행, 배포 성공 여부를 빠르게 피드백을 제공하는 배포 자동화
  • 배포와 릴리즈 분리: 블루-그린 배포, 카나리아 릴리즈, 기능 토글, 다크 론치 등으로 릴리즈 위험 감소
  • CD(Delivery): 트렁크에서 작은 크기로 작업 또는 짧은 피처 브랜치, 트렁크는 항상 릴리즈 가능 상태로 유지, 업무 시간에 필요할 때 푸시 버튼으로 릴리즈 가능
  • CD(Deployment): Delivery + 정기적으로 빌드를 제품에 배포


피드백

문제 확인과 해결 위한 텔레메트리

  • 중앙 집중화된 텔레메트리 인프라
  • 어플리케이션 메트릭을 충분하게 생성
  • 텔레메트리를 사용해서 문제 해결에 과학적으로 접근
  • 어플리케이션 메트릭, 비즈니스 메트릭, 인프라 메트릭을 함께 표시
  • 유지보수, 백업, 배포 등 배포/운영 활동도 메트릭에 표시

예측을 더 잘하고 목표를 달성하기 위한 텔레메트리 분석

  • 평균, 표준 편차, 비정상 탐지 기법(데이터 스무딩, 콜모고로프-스마르노프 검정 등)을 사용해서 잠재적인 문제 발견
  • 장애를 예측할 수 있는 메트릭을 찾아 모니터링 시스템에 추가
안전한 배포를 위한 피드백
  • 기능이 정상임을 확인할 수 있는 충분한 텔레메트리
  • 배포와 변경 이벤트를 메트릭 그래프에 함께 표시: 배포 파이프라인에서 놓친 제품 에러를 텔레메트리 이용해서 발견 가능
  • 모두가 전체 밸류 스트림의 건강 상태를 책임지는 문화
  • 론치 가이드, 론치 요구사항: 모든 개발이 전체 조직의 누적된 경험을 활용
가설 검증 통합
  • 목표를 달성했는지 검증할 수 있는 실험을 실시
  • A/B 테스트를 프로세스에 통합
리뷰와 조율 프로세스
  • 결합도를 낮춰 소통과 조율 필요성을 감소: 위험 완화 위해 변경을 공지하고 충돌 발견, 고위험 영역의 변경은 기술적 조치
  • 변경 승인 프로세스를 리뷰로 대체: 짝 프로그래밍, 코드 리뷰, 작은 배치 크기로 원활한 리뷰
  • 긴 변경 승인 프로세스 제거

지속적 배움과 실험

일상 업무에서의 배움
  • 저스트 컬처: 배움 관점에서 실수와 에러 접근. 휴먼 에러는 주어진 도구의 피할 수 없는 설계 문제에서 기인함, 탓하지 않느 사후 분석 미팅
  • 사후 미팅 결과를 전사에 공유해 조직이 배울 수 있도록 함
  • 혁신을 위한 위험 감수 문화: 리더의 노력 필요
  • 회복성 엔지니어링으로 회복성 향상
로컬 발견을 조직 전체의 개선으로
  • 업무 프로세스에 챗룸을 활용해서 지식 전파를 빠르게 함
  • 소프트웨어 표준 프로세스를 자동화: 문서나 프로세스를 실행 가능한 형태로 변환해서 리포지토리에 추가
  • 비기능 요구사항을 체계화
  • 재사용가능한 운영 유저 스토리를 개발에 구축: 반복되는 IT 운영 작업을 개발 작업에 함께 표시
  • 조직 목표 달성 위한 기술 선택: 운영이 지원하는 기술 목록 지정
배움, 개선 위한 시간 확보
  • 기술 부채를 감소하기 위한 활동을 일정을 잡아 진행
  • 가르치고 배우는 문화: 내부 세미나, 코드 리뷰, 컨퍼런스, 내부 컨설팅/코칭

보안, 규제, 변경 관리

보안

  • 보안을 개발 이터레이션 시연에 통합: 인포섹을 초기부터 참여시킴
  • 보안도 결함 추적과 사후 작업에 통합
  • 공유 리포지토리, 공유 서비스에 보안 예방 수단 통합: 보안 관련 라이브러리나 서비스에 대한 교육 제공, 안전한 빌드 이미지나 쿡북 제작
  • 배포 파이프라인에 보안 테스트 통합
  • SW 공급 체인 보안 검토
  • 환경에 대한 보안 관련 모니터링 추가
  • 보안 관련 텔레메트리 추가
규제, 변경 관리
  • 보안/규제를 변경 승인 프로세스에 통합
  • 효과적인 변경 관리 정책 구축
    • 표준 변경: 저위험 변경으로 자동 승인, 사전 승인 가능
    • 일반 변경: 리뷰나 승인이 필요한 위험한 변경
    • 긴급 변경: 긴급한 고위험 변경으로 즉시 반영
  • 저위험 변경을 표준 변경으로 재분류
  • 일반 변경을 표준으로 바꾸는 노력 필요
  • 감사 조직을 위한 문서와 근거 자료 생성: 텔레메트리 활용


  1. 강남 2018.09.13 02:27

    잘보고 가욤 ^^

스프링 리액터 로깅과 체크포인트


로깅

리액터의 동작을 보다 자세히 보고 싶다면 다음과 같이 log() 메서드를 사용한다. 아래 코드를 보자.


Flux.just(1, 2, 4, 5, 6)

        .log()

        .map(x -> x * 2)

        .subscribe(x -> logger.info("next: {}", x));


로깅 프레임워크로 SLF4j를 사용할 경우 실행한 결과는 다음과 같다.

08:38:29.990 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
08:38:30.010 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
08:38:30.013 [main] INFO reactor.Flux.Array.1 - | request(unbounded)
08:38:30.014 [main] INFO reactor.Flux.Array.1 - | onNext(1)
08:38:30.014 [main] INFO logging.LoggingTest - next: 2
08:38:30.014 [main] INFO reactor.Flux.Array.1 - | onNext(2)
08:38:30.014 [main] INFO logging.LoggingTest - next: 4
08:38:30.014 [main] INFO reactor.Flux.Array.1 - | onNext(4)
08:38:30.014 [main] INFO logging.LoggingTest - next: 8
08:38:30.014 [main] INFO reactor.Flux.Array.1 - | onNext(5)
08:38:30.014 [main] INFO logging.LoggingTest - next: 10
08:38:30.014 [main] INFO reactor.Flux.Array.1 - | onNext(6)
08:38:30.014 [main] INFO logging.LoggingTest - next: 12
08:38:30.015 [main] INFO reactor.Flux.Array.1 - | onComplete()

"reactor.Flux.Array.1"이라는 로거가 출력한 로그 메시지는 Flux.just()가 생성한 시퀀스의 동작을 로그로 남긴 것이다. 로그를 보면 시퀀스가 request() 신호를 받은 시점, next 신호(onNext(2) 등)나 complete 신호(onComplete())를 발생한 시점을 확인할 수 있다.


로그 레벨은 INFO인데 로그 레벨을 변경하고 싶다면 다음과 같이 log() 메서드를 사용하면 된다.


Flux.just(1, 2, 4, 5, 6)

        .log(null, Level.FINE) // java.util.logging.Level 타입

        .subscribe(x -> logger.info("next: {}", x));


두 번째 인자로 자바 로깅의 Level.FINE을 주었다. SLF4j를 사용할 경우 리액터는 자바의 FINE 레벨을 SLF4j의 DEBUG 레벨로 기록한다. 따라서 위 코드를 실행하면 다음과 같이 DEBUG 레벨로 로그를 남기는 것을 확인할 수 있다.


08:50:30.098 [main] DEBUG reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)

08:50:30.101 [main] DEBUG reactor.Flux.Array.1 - | request(unbounded)

08:50:30.102 [main] DEBUG reactor.Flux.Array.1 - | onNext(1)

08:50:30.102 [main] INFO logging.LoggingTest - next: 1

08:50:30.102 [main] DEBUG reactor.Flux.Array.1 - | onNext(2)

08:50:30.102 [main] INFO logging.LoggingTest - next: 2


다음과 같이 특정 로거를 이용하도록 지정할 수도 있다. 


Flux.just(1, 2, 4, 5, 6)

        .log("MYLOG") // 또는 log("MYLOG", Level.INFO)

        .subscribe(x -> logger.info("next: {}", x));


위 코드를 실행하면 다음과 같이 지정한 로거를 이용해서 로그를 남긴다.


08:51:55.180 [main] INFO MYLOG - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)

08:51:55.184 [main] INFO MYLOG - | request(unbounded)

08:51:55.184 [main] INFO MYLOG - | onNext(1)

08:51:55.184 [main] INFO logging.LoggingTest - next: 1

08:51:55.184 [main] INFO MYLOG - | onNext(2)

08:51:55.184 [main] INFO logging.LoggingTest - next: 2

08:51:55.184 [main] INFO MYLOG - | onNext(4)


체크포인트

시퀀스가 신호를 발생하는 과정에서 익셉션이 발생하면 어떻게 될까? 시퀀스가 여러 단게를 거쳐 변환한다면 어떤 시점에 익셉션이 발생했는지 단번에 찾기 힘들 수도 있다. 이럴 때 도움이 되는 것이 체크포인트이다. 다음은 체크포인트 사용 예이다.


Flux.just(1, 2, 4, -1, 5, 6)

        .map(x -> x + 1)

        .checkpoint("MAP1")

        .map(x -> 10 / x) // 원본 데이터가 -1인 경우 x는 0이 되어 익셉션이 발생

        .checkpoint("MAP2")

        .subscribe(

                x -> System.out.println("next: " + x),

                err -> err.printStackTrace());


이 코드는 데이터에 1을 더하고 다시 10을 데이터로 나누는 변환을 수행한다. 원본 데이터에 -1이 있으므로 중간에 0으로 나누게 되어 익셉션이 발생하게 된다. checkpoint()를 사용하면 어떤 단계에서 익셉션이 발생했는지 쉽게 확인할 수 있다. 아래 코드는 익셉션이 발생했을 때 출력한 익셉션 트레이스 메시지인데 이 메시지를 보면 checkpoint()로 지정한 description이 익셉션 트레이스 마지막에 출력되는 것을 알 수 있다. 이를 통해 어느 과정에서 익셉션이 발생했는지 쉽게 찾을 수 있다.

java.lang.ArithmeticException: / by zero
    at logging.CheckpointTest.lambda$checkpoint$1(CheckpointTest.java:15)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:107)
    ...생략
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly site of producer [reactor.core.publisher.FluxMapFuseable] is identified by light checkpoint [MAP2]."description" : "MAP2"



관련글


리액터 윈도우


일정 개수로 묶어서 Flux 만들기: window(int), window(int, int)

Flux#window(int) 메서드를 사용하면 시퀀스가 발생시키는 데이터를 일정 개수로 묶을 수 있다. 다음은 예제 코드이다.


Flux<Flux<Integer>> windowSeq = 

        Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

             .window(4); // 4개 간격으로 4개씩 새로운 Flux로 묶음


windowSeq.subscribe(seq -> { // seq는 Flux<Integer>

    Mono<List<Integer>> monoList = seq.collectList();

    monoList.subscribe(list -> logger.info("window: {}", list));

});


위 코드에서 Flux#window(4)가 리턴하는 타입은 Flux<Flux<Integer>>이다. 즉 값이 Flux<Integer>인 Flux를 리턴한다. 이 시퀀스(Flux<Integer>)가 발생하는 값의 개수는 최대 4개이다. 위 코드의 실행 결과는 다음과 같다. 결과를 보면 4개씩 데이터를 묶어서 하나의 Flux로 만든 것을 알 수 있다.


01:19:52.388 [parallel-2] INFO batch.WindowTest - window: [5, 6, 7, 8]

01:19:52.388 [parallel-1] INFO batch.WindowTest - window: [1, 2, 3, 4]

01:19:52.391 [parallel-1] INFO batch.WindowTest - window: [9, 10]


Flux.window(int maxSize, int skip) 메서드를 사용하면 어느 간격으로 데이터를 묶을지 정할 수 있다. 두 번째 파라미터는 몇 개씩 건너서 데이터를 묶을 지 결정한다. 예를 들어 다음 코드를 보자.


Flux<Flux<Integer>> windowSeq =

        Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

                .window(4, 3); // 3개 간격마다 4개씩 새로운 Flux로 묶음


windowSeq.subscribe(seq -> { // seq는 Flux<Integer>

    Mono<List<Integer>> monoList = seq.collectList();

    monoList.subscribe(list -> logger.info("window: {}", list));

});


위 코드는 두 번째 인자로 3을 주었다. 이 경우 3개 데이터 간격으로 4개씩 데이터를 묶는다. 데이터를 묶는 간격이 데이터를 묶는 개수보다 작으므로 일부 데이터에 중복이 발생한다.


15:18:37.898 [main] INFO batch.WindowTest - window: [1, 2, 3, 4]

15:18:37.898 [main] INFO batch.WindowTest - window: [4, 5, 6, 7]

15:18:37.898 [main] INFO batch.WindowTest - window: [7, 8, 9, 10]

15:18:37.898 [main] INFO batch.WindowTest - window: [10]


다음과 같이 skip 파라미터 값으로 5를 주면 어떻게 될까? 데이터를 묶는 개수보다 간격이 더 크므로 일부 데이터에 누락이 발생할 것이다. 


Flux<Flux<Integer>> windowSeq2 =

        Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

                .window(4, 5); // 5개 간격마다 4개씩 새로운 Flux 묶음


// 첫 번째 Flux<Integer> : [1, 2, 3, 4]
// 두 번째 Flux<Integer> : [6, 7, 8, 9]



일정 시간 간격으로 묶어서 Flux 만들기: window(Duration), window(Duration, Duration)

Flux#window(Duration) 메서드를 사용하면 시퀀스가 발생시키는 데이터를 일정 시간마다 묶을 수 있다. 다음은 예제 코드이다.


Flux<Flux<Long>> windowSeq = Flux.interval(Duration.ofMillis(100))

      .window(Duration.ofMillis(500)); // 500밀리초 간격마다 500밀리초 동안 데이터 묶음


이 코드는 500밀리초(0.5초) 동안 발생한 데이터를 묶는다.


데이터를 묶기 시작하는 간격을 지정하고 싶다면 Flux#window(Duration, Duration) 메서드를 사용한다.


Flux<Flux<Long>> windowSeq = Flux.interval(Duration.ofMillis(100))

        // 400밀리초 간격마다 500밀리초 동안 데이터 묶음

        .window(Duration.ofMillis(500), Duration.ofMillis(400))



특정 조건에 다다를 때가지 묶어서 Flux 만들기: windowUntil(Predicate)

특정 조건을 충족하는 데이터를 만날 때까지 묶어서 Flux로 만들고 싶다면 windowUntil()을 사용한다. 다음은 사용 예이다.


Flux.just(1,1,2,3,3,4,5)

        .windowUntil(x -> x % 2 == 0)

        .subscribe((Flux<Integer> seq) -> {

            seq.collectList().subscribe(lst -> logger.info("window: {}", lst));

        });


위 코드는 2로 나눠서 나머지가 0인(즉 짝수인) 값을 만날 때까지 묶는다. 실제 실행 결과를 보면 다음과 같다.


01:19:27.166 [main] INFO batch.WindowTest - window: [1, 1, 2]

01:19:27.169 [main] INFO batch.WindowTest - window: [3, 3, 4]

01:19:27.169 [main] INFO batch.WindowTest - window: [5]


다음과 같이 마지막 데이터가 조건에 해당하면 어떻게 될까?


Flux.just(1,1,2,3,3,4)

        .windowUntil(x -> x % 2 == 0)

        .subscribe(seq -> {

            seq.collectList().subscribe(lst -> logger.info("window: {}", lst));

        });


결과를 보면 다음과 같이 마지막에 빈 Flux가 하나 더 발생되는 것을 알 수 있다.


17:23:22.724 [main] INFO batch.WindowTest - window: [1, 1, 2]

17:23:22.727 [main] INFO batch.WindowTest - window: [3, 3, 4]

17:23:22.727 [main] INFO batch.WindowTest - window: []


특정 조건을 충족하는 동안 묶어서 Flux 만들기: windowWhile(Predicate)

Flux#windowWhile(Predicate)은 해당 조건을 충족하지 않는 데이터가 나올 때까지 묶어서 Flux를 만든다. 조건을 충족하지 않는 데이터로 시작하거나 연속해서 데이터가 조건을 충족하지 않으면 빈 윈도우를 생성한다.


Flux.just(1,1,2,4,3,3,4,6,8,9,10)

        .windowWhile(x -> x % 2 == 0) // 짝수인 동안

        .subscribe(seq -> {

            seq.collectList().subscribe(lst -> logger.info("window: {}", lst));

        });


이 코드의 결과는 다음과 같다.


01:07:00.239 [main] INFO batch.WindowTest - window: []

01:07:00.242 [main] INFO batch.WindowTest - window: []

01:07:00.242 [main] INFO batch.WindowTest - window: [2, 4]

01:07:00.242 [main] INFO batch.WindowTest - window: []

01:07:00.242 [main] INFO batch.WindowTest - window: [4, 6, 8]

01:07:00.242 [main] INFO batch.WindowTest - window: [10]


Flux 대신 List로 묶기: buffer류 메서드

window류 메서드가 Flux로 묶는다면 buffer류 메서드는 Collection으로 묶는다. 메서드 이름이 window에서 buffer로 바뀔뿐 시그너쳐는 동일하다. 다음은 buffer류 메서드의 사용 예이다.


Flux<List<Integer>> bufferSeq = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).buffer(4);

bufferSeq.subscribe(list -> logger.info("window: {}", list));



관련글




리액터 모으기(aggregation) 연산


List 콜렉션으로 모으기: collectList()

Flux는 데이터를 콜렉션으로 모을 수 있는 기능을 제공한다. 이 중에서 List로 모아주는 collectList()는 다음과 같이 사용한다.


Mono<List<Integer>> mono = someFlux.collectList();

mono.subscribe(lst -> System.out.println(lst));


collectList()의 리턴 타입은 Mono<List<T>>이므로 Mono를 구독해서 값을 사용하면 된다.


Map 콜렉션으로 모으기: collectMap()

다음의 Flux#collectMap()을 이용해서 Map으로 모을 수도 있다.


  • Mono<Map<K, T>> collectMap(Function<? super T, ? extends K> keyExtractor)
  • Mono<Map<K, V>> collectMap(Function<? super T, ? extends K> keyExtractor,
                                              Function<? super T, ? extends V> valueExtractor)
  • Mono<Map<K, V>> collectMap(Function<? super T, ? extends K> keyExtractor,
                                              Function<? super T, ? extends V> valueExtractor,
                                              Supplier<Map<K, V>> mapSupplier)

각 인자는 다음과 같다.

  • keyExtractor : 데이터에서 맵의 키를 제공하는 함수
  • valueExtractor : 데이터에서 맵의 값을 제공하는 함수
  • mapSupplier : 사용할 Map 객체를 제공(mapSupplier가 없는 메서드는 기본으로 HashMap 사용)

다음 코드는 각 메서드의 사용 예이다.


// keyExtractor만 지정. 값은 그대로 사용.


Mono<Map<Integer, Tuple2<Integer, String>>> numTupMapMono =

        Flux.just(Tuples.of(1, "일"), Tuples.of(2, "이"), Tuples.of(3, "삼"), Tuples.of(4, "사"))

                .collectMap(x -> x.getT1()); // keyExtractor



// String을 리턴하는 valueExtractor 사용.


Mono<Map<Integer, String>> numLabelMapMono =

        Flux.just(Tuples.of(1, "일"), Tuples.of(2, "이"), Tuples.of(3, "삼"), Tuples.of(4, "사"))

                .collectMap(x -> x.getT1(), // keyExtractor

                        x -> x.getT2()); // valueExtractor



// Map으로 TreeMap 사용


Mono<Map<Integer, String>> numLabelTreeMapMono =

        Flux.just(Tuples.of(1, "일"), Tuples.of(2, "이"), Tuples.of(3, "삼"), Tuples.of(4, "사"))

                .collectMap(x -> x.getT1(), // keyExtractor

                        x -> x.getT2(), // valueExtractor

                        () -> new TreeMap<>()); // mapSupplier


collectMap은 중복된 키가 존재하면 마지막 데이터와 관련된 값이 사용된다. 예를 들어 아래 코드는 Flux가 생성하는 데이터는 4개지만 키로 사용하는 값이 중복되므로 실제 Map에는 2와 4 두 개의 데이터만 저장된다.


Flux.just(1, 2, 3, 4)
     .collectMap(x -> x % 2)
     .subscribe(map -> System.out.println(map)); // {0=4, 1=3}

Map의 값을 콜렉션으로 모으기: collectMultiMap()

collectMultiMap()을 사용하면 같은 키를 가진 데이터를 List로 갖는 Map을 생성할 수 있다. 다음은 예제 코드이다.


Mono<Map<Integer, Collection<Integer>>> oddEvenList =

        Flux.just(1, 2, 3, 4).collectMultimap(x -> x % 2);

oddEvenList.subscribe(map -> System.out.println(map)); // {0=[2, 4], 1=[1, 3]}


collectMultiMap() 메서드는 collectMap() 메서드와 동일한 파라미터를 갖는다.


개수 새기: count()

Flux#count() 메서드를 사용하면 개수를 제공하는 Mono를 리턴한다.


Mono<Long> countMono = Flux.just(1, 2, 3, 4).count();


누적 하기: reduce()

reduce()는 각 값에 연산을 누적해서 결과를 생성한다. Flux의 데이터를 이용해서 단일 값을 생성하는 범용 기능이라고 보면 된다. 첫 번째 살펴볼 reduce() 메서드 다음과 같다. 이 메서드는 Flux가 발생하는 데이터와 동일 타입으로 누적할 때 사용한다.

  • Mono<T> reduce(BiFunction<T, T, T> aggregator)

aggregator는 인자가 두 개인 함수이다. 이 함수의 첫 번째 인자는 지금까지 누적된 값을 받으며, 두 번째 인자는 누적할 데이터를 받는다. aggregator는 두 인자를 이용해서 새로운 누적 값을 리턴한다. 새 누적 값은 다음 데이터를 aggregator 함수로 누적할 때 첫 번째 인자로 사용된다.


예를 들어 간단한 곱셈 기능을 reduce()를 이용해서 다음과 같이 구현할 수 있다.


Mono<Integer> mulMono = Flux.just(1, 2, 3, 4).reduce((acc, ele) -> acc * ele);

mulMono.subscribe(sum -> System.out.println("sum : " + sum);


acc는 이전까지 누적된 값인데, 두 번째 데이터를 누적할 때 첫 번째 데이터를 누적된 값(acc)으로 사용한다. 위 코드는 다음과 같은 계산을 거쳐 최종 값으로 24를 출력한다.


acc1 = 1 // 첫 번째 값을 누적 값의 초기 값으로 사용

acc2 = aggregator(acc1, 2) // 1 * 2

acc3 = aggregator(acc2, 3) // 2 * 3

acc4 = aggregator(acc3, 4) // 6 * 4


누적 값의 초기 값을 지정하고 싶거나 데이터와 다른 타입으로 누적하고 싶다면 다음 reduce() 메서드를 사용한다.

  • Mono<A> reduce(A initial, BiFunction<A, ? super T, A> accumulator)
  • Mono<A> reduceWith(Supplier<A> initial, BiFunction<A, ? super T, A> accumulator)

reduce()의 initial은 초기 값이고, reduceWith()의 initial은 초기값을 제공하는 Supplier이다. 다음은 초기 값을 사용하는 reduce() 메서드의 사용예이다.


Mono<String> strMono = Flux.just(1, 2, 3, 4)

                                        .reduce("", (str, ele) -> str + "-" + ele.toString());

strMono.subscribe(System.out::println); // -1-2-3-4 출력


누적하면서 값 생성하기: scan()

데이터를 누적하면 중간 누적 결과를 데이터로 생성하고 싶다면 scan() 메서드를 사용한다. 최종 누적된 값 한 개만 발생하는 reduce()와 달리 scan()은 중간 결과를 포함한 여러 값을 생성하므로, scan()의 리턴 타입은 Flux이다. 다음은 같은 타입으로 누적한 결과를 발생하는 scan() 메서드이다.

  • Flux<T> scan(BiFunction<T, T, T> accumulator)

리턴 타입이 Flux인 것을 제외하면 reduce()와 동일하다. 


다음은 예제 코드이다.


Flux<Integer> seq = Flux.just(1, 2, 3, 4).scan((acc, x) -> acc * x);

seq.subscribe(System.out::println);


다음은 위 코드의 출력 결과이다. 중간 결과가 출력되는 것을 알 수 있다.


1

2

6

24


reduce()와 동일하게 누적 초기값을 갖는 메서드를 제공한다.

  • Flux<A> scan(A initial, BiFunction<A, ? super T, A> accumulator)
  • Flux<A> scanWith(Supplier<A> initial, BiFunction<A, ? super T, A> accumulator)

다음은 초기 값을 지정하는 사용 예이다.


Flux<Integer> seq = Flux.just(2, 3, 4).scan(1, (acc, x) -> acc * x);

seq.subscribe(System.out::println);


실행 결과는 다음과 같다.


1

2

6

24


결과를 보면 초기 값(1)도 시퀀스의 값으로 발생한 것을 알 수 있다.


데이터 조건 검사

모든/일부 데이터가 특정 조건을 충족하는지 검사할 때는 all()이나 any()를 사용한다.


Mono<Boolean> all = Flux.just(1, 2, 3, 4).all(x -> x > 2);

all.subscribe(b -> System.out.println("all: " + b)); // false


Mono<Boolean> any = Flux.just(1, 2, 3, 4).any(x -> x > 2);

any.subscribe(b -> System.out.println("any: " + b)); // true


데이터가 존재하는지 또는 특정 데이터를 포함하는지 검사할 때는 hasElements()나 hasElement()를 사용한다.


Mono<Boolean> hasElements = Flux.just(1, 2, 3, 4).hasElements();

hasElements.subscribe(b -> System.out.println("hasElements: " + b)); // true


Mono<Boolean> hasElement = Flux.just(1, 2, 3, 4).hasElement(3);

hasElement.subscribe(b -> System.out.println("hasElement: " + b)); // true


관련 글





병렬(Parallel) 처리

시퀀스는 순차적으로 next 신호를 발생하고 Subscriber는 순차적으로 신호를 처리한다. 리액터는 시퀀스가 발생하는 next 신호를 병렬로 처리할 수 있는 방법을 제공한다. 이 글에서는 Flux의 parallel()을 사용하는 방법과 zip()을 이용한 방법에 대해 살펴본다.


parallel()과 runOn()으로 Flux 병렬 처리하기

Flux#parallel()과 runOn()을 사용하면 Flux가 생성하는 next 신호를 병렬로 처리할 수 있다. 다음 예를 보자.


Flux.range(1, 20)

        .parallel(2) // 작업을 레일로 나누기만 함

        .runOn(Schedulers.newParallel("PAR", 2))  // 각 레일을 병렬로 실행

        .map(x -> {

            int sleepTime = nextSleepTime(x % 2 == 0 ? 50 : 100, x % 2 == 0 ? 150 : 300);

            logger.info("map1 {}, sleepTime {}", x, sleepTime);

            sleep(sleepTime);

            return String.format("%02d", x);

        })

        .subscribe(i -> logger.info("next {}", i) );


// nextSleepTime은 인자로 받은 두 정수 값 범위에 해당하는 임의의 값을 생성한다고 가정


Flux#parallel(int parallelism) 메서드는 Flux가 생성하는 next 신호를 parallelism 개수만큼 라운드 로빈 방식으로 신호를 나눈다. 분리한 신호는 일종의 신호를 전달할 레일(rail)을 구성한다. 위 코드는 2를 값으로 주었으므로 2개의 레일을 생성한다. 라운드 로빈 방식을 사용해서 각 레일에 값을 전달하므로 위 코드는 [1, 3, 5, .., 19]를 제공하는 레일과 [2, 4, 6, ..., 20]를 제공하는 레일을 생성한다.


parallel()로 여러 레일을 만든다고 해서 병렬로 신호를 처리하는 것은 아니다. parallel()은 병렬로 신호를 처리할 수 있는 ParallelFlux를 리턴하는데, ParallelFlux의 runOn() 메서드에 다중 쓰레드를 사용하는 스케줄러를 전달해야 병렬로 신호를 처리할 수 있다. 위 코드는 2개 쓰레드를 사용하는 parallel 스케줄러를 전달했으므로 동시에 2개 레일로부터 오는 신호를 처리하게 된다.


병렬로 처리되는 것을 확인하기 위해 map() 메서드는 값이 짝수인 경우 50~150 밀리초, 홀수인 경우 100~300 밀리초 동안 슬립하고 문자열로 변환한 값을 리턴하도록 구현했다. parallel()은 라운드 로빈 방식으로 레일을 나누므로 짝수 레일과 홀수 레일이 생성되므로 슬립 타임 구간이 작은 짝수 레일이 더 빨리 끝나게 된다.


실제 결과를 확인해보자.


13:45:14.272 [PAR-1] INFO parallel.ParallelTest - map1 1, sleepTime 117

13:45:14.272 [PAR-2] INFO parallel.ParallelTest - map1 2, sleepTime 96

13:45:14.378 [PAR-2] INFO parallel.ParallelTest - next 02

13:45:14.378 [PAR-2] INFO parallel.ParallelTest - map1 4, sleepTime 98

13:45:14.399 [PAR-1] INFO parallel.ParallelTest - next 01

13:45:14.399 [PAR-1] INFO parallel.ParallelTest - map1 3, sleepTime 268

13:45:14.477 [PAR-2] INFO parallel.ParallelTest - next 04

13:45:14.477 [PAR-2] INFO parallel.ParallelTest - map1 6, sleepTime 93

13:45:14.570 [PAR-2] INFO parallel.ParallelTest - next 06

...생략

13:45:14.868 [PAR-2] INFO parallel.ParallelTest - map1 16, sleepTime 50

13:45:14.905 [PAR-1] INFO parallel.ParallelTest - next 05

13:45:14.905 [PAR-1] INFO parallel.ParallelTest - map1 7, sleepTime 201

13:45:14.918 [PAR-2] INFO parallel.ParallelTest - next 16

13:45:14.918 [PAR-2] INFO parallel.ParallelTest - map1 18, sleepTime 122

13:45:15.040 [PAR-2] INFO parallel.ParallelTest - next 18

13:45:15.040 [PAR-2] INFO parallel.ParallelTest - map1 20, sleepTime 62

13:45:15.102 [PAR-2] INFO parallel.ParallelTest - next 20

13:45:15.106 [PAR-1] INFO parallel.ParallelTest - next 07

13:45:15.106 [PAR-1] INFO parallel.ParallelTest - map1 9, sleepTime 202

13:45:15.308 [PAR-1] INFO parallel.ParallelTest - next 09

13:45:15.308 [PAR-1] INFO parallel.ParallelTest - map1 11, sleepTime 131

13:45:15.439 [PAR-1] INFO parallel.ParallelTest - next 11

13:45:15.439 [PAR-1] INFO parallel.ParallelTest - map1 13, sleepTime 289

13:45:15.728 [PAR-1] INFO parallel.ParallelTest - next 13

13:45:15.728 [PAR-1] INFO parallel.ParallelTest - map1 15, sleepTime 288

13:45:16.017 [PAR-1] INFO parallel.ParallelTest - next 15

13:45:16.017 [PAR-1] INFO parallel.ParallelTest - map1 17, sleepTime 156

13:45:16.173 [PAR-1] INFO parallel.ParallelTest - next 17

13:45:16.173 [PAR-1] INFO parallel.ParallelTest - map1 19, sleepTime 247

13:45:16.420 [PAR-1] INFO parallel.ParallelTest - next 19


실행 결과를 보면 PAR-1 쓰레드는 홀수를 PAR-2는 짝수를 처리하는 것을 알 수 있다. 즉 쓰레드마다 한 레일을 처리하고 있다. 짝수인 경우 슬립 타임을 더 작은 범위로 주었으므로 짝수 레일을 처리한 PAR-2가 먼저 레일을 처리하고 있다.

아래와 같이 레일은 4개로 나누었는데 쓰레드가 2개인 병렬 스케줄러를 사용하면 어떻게 될까?

Flux.range(1, 20)
        .parallel(4)
        .runOn(Schedulers.newParallel("PAR", 2))
        .map(x -> {
            ...
        })
        .subscribe(i -> logger.info("next {}", i) );


이 경우 스케줄러는 2개의 레일을 먼저 처리한다. 한 레일에 남아 있는 데이터가 없으면 데이터가 남아 있는 다른 레일을 처리한다.


레일당 크기

ParallelFlux#runOn() 메서드는 기본적으로 한 레일 당 Queues.SMALL_BUFFER_SIZE 만큼의 데이터를 저장한다. (이 값은 reactor.bufferSize.small 시스템 프로퍼티 값을 사용하는데 이 값을 지정하지 않으면 256을 사용하고 이 값이 16보다 작으면 16을 사용한다.)


레일에 미리 채울 데이터 개수를 변경하려면 다음과 같이 runOn() 메서드의 두 번째 인자로 값을 주면 된다. 다음 코드는 레일에 미리 채울 값(prefetch)으로 2를 사용한 예이다.


Flux.range(1, 20)

        .parallel(4)

        .runOn(Schedulers.newParallel("PAR", 2), 2) // 레일에 미리 채울 값으로 2 사용

        .subscribe(x -> logger.info("next {}", x));


위 코드의 경우 최초에 각 레일에 다음과 같이 데이터가 채워진다.

레일0: 1, 5
레일1: 2, 6
레일2: 3, 7
레일3: 4, 8


스케줄러는 2개의 쓰레드를 사용하는데 두 쓰레드를 PAR-1, PAR-2라고 하자. 이 두 쓰레드가 처음에 각각 레일0과 레일1을 선택했다고 하자.


레일0: 1, 5 (PAR-1)
레일1: 2, 6 (PAR-2)
레일2: 3, 7
레일3: 4, 8

두 쓰레드가 레일의 데이터를 처리하면 상태는 다음과 같이 바뀐다.


레일0: (PAR-1)
레일1: (PAR-2)
레일2: 3, 7
레일3: 4, 8

이 상태에서 PAR-2가 레일1이 비어있는지 여부를 검사한다면 레일이 비워져 있으므로 다음 레일을 선택한다. 이때 레일3을 선택했다고 하자. 그리고 PAR-1이 레일0이 비어있는지 여부를 검사하기 전에 레일0과 레일1이 채워졌다고 하자. 그럼 상태는 다음과 같이 바뀐다.


레일0: 9 (PAR-1)
레일1: 10
레일2: 3, 7 
레일3: 4, 8 (PAR-2)

그러면 PAR-2는 4를 처리하고 PAR-1은 9를 처리한다. PAR-1이 9를 처리하는 동안에 레일0에 데이터가 채워지지 않았다면 다음 레일을 선택하는데 이때 레일1을 선택할 수 있다.

레일0: 11,
레일1: 10 (PAR-1)
레일2: 3, 7 
레일3: 8 (PAR-2)

이렇게 병렬 스케줄러의 쓰레드 개수가 레일 개수보다 작으면 그때 그때 레일의 데이터 개수에 따라 스케줄러가 선택하는 레일이 달라지게 된다.

Mono.zip()으로 병렬 처리하기

각 Mono의 구독 처리 쓰레드를 병렬 스케줄러로 실행하고 Mono.zip() 메서드를 이용해서 Mono를 묶으면 각 Mono를 병렬로 처리할 수 있다. 다음은 예제 코드이다.


Mono m1 = Mono.just(1).map(x -> {

    logger.info("1 sleep");

    sleep(1500);

    return x;

}).subscribeOn(Schedulers.parallel());


Mono m2 = Mono.just(2).map(x -> {

    logger.info("2 sleep");

    sleep(3000);

    return x;

}).subscribeOn(Schedulers.parallel());


Mono m3 = Mono.just(3).map(x -> {

    logger.info("3 sleep");

    sleep(2000);

    return x;

}).subscribeOn(Schedulers.parallel());


logger.info("Mono.zip(m1, m2, m3)");


Mono.zip(m1, m2, m3)

        .subscribe(tup -> logger.info("next: {}", tup);


위 코드에서 m1, m2, m3는 각각 1.5초, 3초, 2초간 슬립한다. 각각은 subscribeOn()을 이용해서 Parallel 스케줄러를 이용해서 구독 요청을 처리하도록 했다. 그리고 Mono.zip()으로 m1, m2, m3를 묶었다.


실제 실행 결과를 보면 m1, m2, m3가 슬립을 동시에 시작하고 약 3초 뒤에 세 Mono의 값을 묶은 Tuple3의 값을 출력하는 것을 알 수 있다. 이를 통해 m1, m2, m3를 동시에 실행했음을 확인할 수 있다.


16:12:34.424 [main] INFO parallel.ParallelTest - Mono.zip(m1, m2, m3)

16:12:34.447 [parallel-1] INFO parallel.ParallelTest - 1 sleep

16:12:34.447 [parallel-3] INFO parallel.ParallelTest - 3 sleep

16:12:34.447 [parallel-2] INFO parallel.ParallelTest - 2 sleep

16:12:37.469 [parallel-2] INFO parallel.ParallelTest - next: [1,2,3]



관련 글


리액터 쓰레드 스케줄링

리액터는 비동기 실행을 강제하지 않는다. 예를 들어 아래 코드를 보자.


Flux.range(1, 3)

        .map(i -> {

            logger.info("map {} to {}", i, i + 2);

            return i + 2;

        })

        .flatMap(i -> {

            logger.info("flatMap {} to Flux.range({}, {})", i, 1, i);

            return Flux.range(1, i);

        })

        .subscribe(i -> logger.info("next " + i));


위 코드에서 logger는 쓰레드 이름을 남기도록 설정한 로거라고 하자. 위 코드를 main 메서드에서 실행하면 다음과 같은 결과를 출력한다.


17:44:57.180 [main] INFO schedule.ScheduleTest - map 1 to 3

17:44:57.183 [main] INFO schedule.ScheduleTest - flatMap 3 to Flux.range(1, 3)

17:44:57.202 [main] INFO schedule.ScheduleTest - next 1

17:44:57.202 [main] INFO schedule.ScheduleTest - next 2

17:44:57.202 [main] INFO schedule.ScheduleTest - next 3

17:44:57.202 [main] INFO schedule.ScheduleTest - map 2 to 4

17:44:57.202 [main] INFO schedule.ScheduleTest - flatMap 4 to Flux.range(1, 4)

17:44:57.202 [main] INFO schedule.ScheduleTest - next 1

17:44:57.202 [main] INFO schedule.ScheduleTest - next 2

17:44:57.202 [main] INFO schedule.ScheduleTest - next 3

17:44:57.202 [main] INFO schedule.ScheduleTest - next 4

17:44:57.202 [main] INFO schedule.ScheduleTest - map 3 to 5

17:44:57.202 [main] INFO schedule.ScheduleTest - flatMap 5 to Flux.range(1, 5)

17:44:57.203 [main] INFO schedule.ScheduleTest - next 1

17:44:57.203 [main] INFO schedule.ScheduleTest - next 2

17:44:57.203 [main] INFO schedule.ScheduleTest - next 3

17:44:57.203 [main] INFO schedule.ScheduleTest - next 4

17:44:57.203 [main] INFO schedule.ScheduleTest - next 5


실행 결과를 보면 map(), flatMap(), subscribe()에 전달한 코드가 모두 main 쓰레드에서 실행된 것을 알 수 있다. 즉 map 연산, flatMap 연산뿐만 아니라 subscribe를 이용한 구독까지 모두 main 쓰레드가 실행한다.


스케줄러를 사용하면 구독이나 신호 처리를 별도 쓰레드로 실행할 수 있다.


publishOn을 이용한 신호 처리 쓰레드 스케줄링

publishOn() 메서드를 이용하면 next, complete, error신호를 별도 쓰레드로 처리할 수 있다. map(), flatMap() 등의 변환도 publishOn()이 지정한 쓰레드를 이용해서 처리한다. 다음 코드를 보자.


CountDownLatch latch = new CountDownLatch(1);

Flux.range(1, 6)

        .map(i -> {

            logger.info("map 1: {} + 10", i);

            return i + 10;

        })

        .publishOn(Schedulers.newElastic("PUB"), 2)

        .map(i -> { // publishOn에서 지정한 PUB 스케줄러가 실행

            logger.info("map 2: {} + 10", i);

            return i + 10;

        })

        .subscribe(new BaseSubscriber<Integer>() {

            @Override

            protected void hookOnSubscribe(Subscription subscription) {

                logger.info("hookOnSubscribe");

                requestUnbounded();

            }


            @Override

            protected void hookOnNext(Integer value) {

                logger.info("hookOnNext: " + value); // publishOn에서 지정한 스케줄러가 실행

            }


            @Override

            protected void hookOnComplete() {

                logger.info("hookOnComplete"); // publishOn에서 지정한 스케줄러가 실행

                latch.countDown();

            }

        });

latch.await();


publishOn()은 두 개의 인자를 받는다. 이 코드에서 첫 번째 인자인 Schedulers.newElastic("PUB")은 비동기로 신호를 처리할 스케줄러이다. 다양한 스케줄러가 존재하는데 이에 대해서는 뒤에서 다시 살펴본다. 일단 지금은 스케줄러가 별도 쓰레드를 이용해서 신호를 처리한다고 생각하면 된다.


두 번째 인자인 2는 스케줄러가 신호를 처리하기 전에 미리 가져올 (prefetch) 데이터 개수이다. 이는 스케줄러가 생성하는 비동기 경계 시점에 보관할 수 있는 데이터의 개수로 일종의 버퍼 크기가 된다.


위 코드를 실제로 실행하면 어떤 일이 벌어지는지 보자. 다음은 결과이다.


13:01:03.026 [main] INFO schedule.ScheduleTest - hookOnSubscribe

13:01:03.029 [main] INFO schedule.ScheduleTest - map 1: 1 + 10

13:01:03.030 [main] INFO schedule.ScheduleTest - map 1: 2 + 10


13:01:03.031 [PUB-2] INFO schedule.ScheduleTest - map 2: 11 + 10

13:01:03.037 [PUB-2] INFO schedule.ScheduleTest - hookOnNext: 21

13:01:03.037 [PUB-2] INFO schedule.ScheduleTest - map 2: 12 + 10

13:01:03.037 [PUB-2] INFO schedule.ScheduleTest - hookOnNext: 22

13:01:03.037 [PUB-2] INFO schedule.ScheduleTest - map 1: 3 + 10

13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - map 1: 4 + 10

13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - map 2: 13 + 10

13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - hookOnNext: 23

13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - map 2: 14 + 10

13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - hookOnNext: 24

13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - map 1: 5 + 10

13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - map 1: 6 + 10

13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - map 2: 15 + 10

13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - hookOnNext: 25

13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - map 2: 16 + 10

13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - hookOnNext: 26

13:01:03.038 [PUB-2] INFO schedule.ScheduleTest - hookOnComplete


최초에 2개를 미리 가져올 때를 제외하면 나머지는 모두 publishOn()으로 전달한 스케줄러의 쓰레드(쓰레드 이름이 "PUB"로 시작)가 처리하는 것을 알 수 있다.


publishOn()에 지정한 스케줄러는 다음 publishOn()을 설정할 때까지 적용된다. 예를 들어 다음과 같이 이름이 PUB1과 PUB2인 두 개의 스케줄러를 설정했다고 하자.


Flux.range(1, 6)

        .publishOn(Schedulers.newElastic("PUB1"), 2)

        .map(i -> {

            logger.info("map 1: {} + 10", i);

            return i + 10;

        })

        .publishOn(Schedulers.newElastic("PUB2"))

        .map(i -> {

            logger.info("map 2: {} + 10", i);

            return i + 10;

        })

        .subscribe(new BaseSubscriber<Integer>() {

            @Override

            protected void hookOnSubscribe(Subscription subscription) {

                logger.info("hookOnSubscribe");

                requestUnbounded();

            }


            @Override

            protected void hookOnNext(Integer value) {

                logger.info("hookOnNext: " + value);

            }


            @Override

            protected void hookOnComplete() {

                logger.info("hookOnComplete");

                latch.countDown();

            }

        });


이 코드를 실행한 결과는 다음과 같다.


13:38:14.957 [main] INFO schedule.ScheduleTest - hookOnSubscribe

13:38:14.960 [PUB1-4] INFO schedule.ScheduleTest - map 1: 1 + 10

13:38:14.963 [PUB1-4] INFO schedule.ScheduleTest - map 1: 2 + 10

13:38:14.963 [PUB2-3] INFO schedule.ScheduleTest - map 2: 11 + 10

13:38:14.963 [PUB1-4] INFO schedule.ScheduleTest - map 1: 3 + 10

13:38:14.964 [PUB1-4] INFO schedule.ScheduleTest - map 1: 4 + 10

13:38:14.964 [PUB1-4] INFO schedule.ScheduleTest - map 1: 5 + 10

13:38:14.964 [PUB1-4] INFO schedule.ScheduleTest - map 1: 6 + 10

13:38:14.969 [PUB2-3] INFO schedule.ScheduleTest - hookOnNext: 21

13:38:14.979 [PUB2-3] INFO schedule.ScheduleTest - map 2: 12 + 10

13:38:14.979 [PUB2-3] INFO schedule.ScheduleTest - hookOnNext: 22

...생략

13:38:15.021 [PUB2-3] INFO schedule.ScheduleTest - map 2: 16 + 10

13:38:15.021 [PUB2-3] INFO schedule.ScheduleTest - hookOnNext: 26

13:38:15.031 [PUB2-3] INFO schedule.ScheduleTest - hookOnComplete


결과를 보면 첫 번째 publishOn()과 두 번째 publishOn() 사이의 map() 처리는 PUB1 스케줄러가 실행하고 두 번째 publishOn() 이후의 map(), 신호 처리는 PUB2 스케줄러가 실행한 것을 알 수 있다.


subscribeOn을 이용한 구독 처리 쓰레드 스케줄링

subscribeOn()을 사용하면 Subscriber가 시퀀스에 대한 request 신호를 별도 스케줄러로 처리한다. 즉 시퀀스(Flux나 Mono)를 실행할 스케줄러를 지정한다. 다음은 subscribeOn()의 사용예이다.


CountDownLatch latch = new CountDownLatch(1);

Flux.range(1, 6)

        .log() // 보다 상세한 로그 출력 위함

        .subscribeOn(Schedulers.newElastic("SUB"))

        .map(i -> {

            logger.info("map: {} + 10", i);

            return i + 10;

        })

        .subscribe(new BaseSubscriber<Integer>() {

            @Override

            protected void hookOnSubscribe(Subscription subscription) {

                logger.info("hookOnSubscribe"); // main thread

                request(1);

            }


            @Override

            protected void hookOnNext(Integer value) {

                logger.info("hookOnNext: " + value); // SUB 쓰레드

                request(1);

            }


            @Override

            protected void hookOnComplete() {

                logger.info("hookOnComplete"); // SUB 쓰레드

                latch.countDown();

            }

        });


latch.await();


subscribeOn()으로 지정한 스케줄러는 시퀀스의 request 요청 처리뿐만 아니라 첫 번째 publishOn() 이전까지의 신호 처리를 실행한다. 따라서 위 코드를 실행하면 Flux.range()가 생성한 시퀀스의 신호 발생뿐만 아니라 map() 실행, Subscriber의 next, complete 신호 처리를 "SUB" 스케줄러가 실행한다. 참고로 시퀀스의 request 요청과 관련된 로그를 보기 위해 log() 메서드를 사용했다.


다음은 실행 결과이다.


14:56:24.996 [main] INFO schedule.ScheduleTest - hookOnSubscribe

14:56:25.005 [SUB-2] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)

14:56:25.010 [SUB-2] INFO reactor.Flux.Range.1 - | request(1)

14:56:25.010 [SUB-2] INFO reactor.Flux.Range.1 - | onNext(1)

14:56:25.011 [SUB-2] INFO schedule.ScheduleTest - map: 1 + 10

14:56:25.016 [SUB-2] INFO schedule.ScheduleTest - hookOnNext: 11

14:56:25.016 [SUB-2] INFO reactor.Flux.Range.1 - | request(1)

14:56:25.016 [SUB-2] INFO reactor.Flux.Range.1 - | onNext(2)

14:56:25.016 [SUB-2] INFO schedule.ScheduleTest - map: 2 + 10

14:56:25.017 [SUB-2] INFO schedule.ScheduleTest - hookOnNext: 12

...(생략)

14:56:25.017 [SUB-2] INFO reactor.Flux.Range.1 - | request(1)

14:56:25.017 [SUB-2] INFO reactor.Flux.Range.1 - | onNext(6)

14:56:25.017 [SUB-2] INFO schedule.ScheduleTest - map: 6 + 10

14:56:25.017 [SUB-2] INFO schedule.ScheduleTest - hookOnNext: 16

14:56:25.017 [SUB-2] INFO reactor.Flux.Range.1 - | request(1)

14:56:25.018 [SUB-2] INFO reactor.Flux.Range.1 - | onComplete()

14:56:25.018 [SUB-2] INFO schedule.ScheduleTest - hookOnComplete


실행 결과에서 Flux.Range 타입은 Flux.range() 메서드가 생성한 시퀀스 객체의 타입이다. 위 결과에서 Flux.Range.1의 reques(1), onNext(), onComplete() 로그는 Subscriber의 request 신호를 처리하는 로그이다. 이 로그를 보면 SUB 스케줄러가 해당 기능을 실행하고 있음을 알 수 있다. 또한 map()과 Subscriber의 신호 처리 메서드(hookOnNext, hookOnComplete)도 SUB 스케줄러가 실행하고 있다.


subscribeOn() + publishOn() 조합

앞서 말했듯이 subscribeOn으로 지정한 스케줄러는 첫 번째 publishOn이 올때까지 적용된다. 다음 코드를 통해 이를 확인할 수 있다.


CountDownLatch latch = new CountDownLatch(1);

Flux.range(1, 6)

        .log()

        .subscribeOn(Schedulers.newElastic("SUB"))

        .map(i -> {

            logger.info("map1: " + i + " --> " + (i + 20));

            return i + 20;

        })

        .map(i -> {

            logger.info("mapBySub: " + i + " --> " + (i + 100));

            return i + 100;

        })

        .publishOn(Schedulers.newElastic("PUB1"), 2)

        .map(i -> {

            logger.info("mapByPub1: " + i + " --> " + (i + 1000));

            return i + 1000;

        })

        .publishOn(Schedulers.newElastic("PUB2"), 2)

        .subscribe(new BaseSubscriber<Integer>() {

            @Override

            protected void hookOnSubscribe(Subscription subscription) {

                logger.info("hookOnSubscribe");

                request(1);

            }


            @Override

            protected void hookOnNext(Integer value) {

                logger.info("hookOnNext: " + value);

                request(1);

            }


            @Override

            protected void hookOnComplete() {

                logger.info("hookOnComplete");

                latch.countDown();

            }

        });


latch.await();


이 코드는 구독을 위한 "SUB" 스케줄러와 신호 처리를 위한 "PUB1", "PUB2" 스케줄러를 설정하고 있다. 


다음은 실행 결과이다.


15:10:05.660 [main] INFO schedule.ScheduleTest - hookOnSubscribe

15:10:05.681 [SUB-6] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)

15:10:05.687 [SUB-6] INFO reactor.Flux.Range.1 - | request(2)

15:10:05.688 [SUB-6] INFO reactor.Flux.Range.1 - | onNext(1)

15:10:05.718 [SUB-6] INFO schedule.ScheduleTest - map1: 1 --> 21

15:10:05.719 [SUB-6] INFO schedule.ScheduleTest - mapBySub: 21 --> 121

15:10:05.720 [SUB-6] INFO reactor.Flux.Range.1 - | onNext(2)

15:10:05.720 [SUB-6] INFO schedule.ScheduleTest - map1: 2 --> 22

15:10:05.720 [SUB-6] INFO schedule.ScheduleTest - mapBySub: 22 --> 122

15:10:05.721 [PUB1-5] INFO schedule.ScheduleTest - mapByPub1: 121 --> 1121

15:10:05.722 [PUB1-5] INFO schedule.ScheduleTest - mapByPub1: 122 --> 1122

15:10:05.734 [SUB-6] INFO reactor.Flux.Range.1 - | request(2)

15:10:05.735 [SUB-6] INFO reactor.Flux.Range.1 - | onNext(3)

15:10:05.735 [SUB-6] INFO schedule.ScheduleTest - map1: 3 --> 23

15:10:05.735 [SUB-6] INFO schedule.ScheduleTest - mapBySub: 23 --> 123

15:10:05.735 [SUB-6] INFO reactor.Flux.Range.1 - | onNext(4)

15:10:05.735 [SUB-6] INFO schedule.ScheduleTest - map1: 4 --> 24

15:10:05.735 [SUB-6] INFO schedule.ScheduleTest - mapBySub: 24 --> 124

15:10:05.736 [PUB2-4] INFO schedule.ScheduleTest - hookOnNext: 1121

15:10:05.736 [PUB2-4] INFO schedule.ScheduleTest - hookOnNext: 1122

15:10:05.736 [PUB1-5] INFO schedule.ScheduleTest - mapByPub1: 123 --> 1123

15:10:05.736 [PUB1-5] INFO schedule.ScheduleTest - mapByPub1: 124 --> 1124

15:10:05.736 [PUB2-4] INFO schedule.ScheduleTest - hookOnNext: 1123

15:10:05.736 [PUB2-4] INFO schedule.ScheduleTest - hookOnNext: 1124

15:10:05.736 [SUB-6] INFO reactor.Flux.Range.1 - | request(2)

15:10:05.736 [SUB-6] INFO reactor.Flux.Range.1 - | onNext(5)

15:10:05.736 [SUB-6] INFO schedule.ScheduleTest - map1: 5 --> 25

15:10:05.736 [SUB-6] INFO schedule.ScheduleTest - mapBySub: 25 --> 125

15:10:05.737 [SUB-6] INFO reactor.Flux.Range.1 - | onNext(6)

15:10:05.737 [PUB1-5] INFO schedule.ScheduleTest - mapByPub1: 125 --> 1125

15:10:05.737 [SUB-6] INFO schedule.ScheduleTest - map1: 6 --> 26

15:10:05.737 [SUB-6] INFO schedule.ScheduleTest - mapBySub: 26 --> 126

15:10:05.737 [PUB2-4] INFO schedule.ScheduleTest - hookOnNext: 1125

15:10:05.737 [PUB1-5] INFO schedule.ScheduleTest - mapByPub1: 126 --> 1126

15:10:05.737 [PUB2-4] INFO schedule.ScheduleTest - hookOnNext: 1126

15:10:05.737 [SUB-6] INFO reactor.Flux.Range.1 - | onComplete()

15:10:05.738 [PUB2-4] INFO schedule.ScheduleTest - hookOnComplete


실행 결과를 보면 첫 번째 publishOn()으로 PUB1 스케줄러를 지정하기 전까지는 SUB 스케줄러가 request 요청과 map1, mapBySub 변환을 처리하는 것을 확인할 수 있다.


[노트]

subscribeOn()이 publishOn() 뒤에 위치하면 실질적으로 prefetch할 때를 제외하면 적용되지 않는다. subscribeOn()은 원본 시퀀스의 신호 발생을 처리할 스케줄러를 지정하므로 시퀀스 생성 바로 뒤에 subscribeOn()을 지정하도록 하자. 또한 두 개 이상 subscribeOn()을 지정해도 첫 번째 subscribeOn()만 적용된다.


스케줄러 종류

스프링 리액터는 다음 스케줄러를 기본 제공한다.


  • Schedulers.immediate() : 현재 쓰레드에서 실행한다.
  • Schedulers.single() : 쓰레드가 한 개인 쓰레드 풀을 이용해서 실행한다. 즉 한 쓰레드를 공유한다.
  • Schedulers.elastic() : 쓰레드 풀을 이용해서 실행한다. 블로킹 IO를 리액터로 처리할 때 적합하다. 쓰레드가 필요하면 새로 생성하고 일정 시간(기본 60초) 이상 유휴 상태인 쓰레드는 제거한다. 데몬 쓰레드를 생성한다.
  • Schedulers.parallel() : 고정 크기 쓰레드 풀을 이용해서 실행한다. 병렬 작업에 적합하다.

single(), elastic(), parallel()은 매번 새로운 쓰레드 풀을 만들지 않고 동일한 쓰레드 풀을 리턴한다. 예를 들어 아래 코드에서 두 publishOn()은 같은 쓰레드 풀을 공유한다.


someFlux.publishOn(Schedulers.parallel())

            .map(...)

            .publishOn(Schedulers.parallel())

            .subscribe(...);


single(), elastic(), parallel()이 생성하는 쓰레드는 데몬 쓰레드로서 main 쓰레드가 종료되면 함께 종료된다.


같은 종류의 쓰레드 풀인데 새로 생성하고 싶다면 다음 메서드를 사용하면 된다.

  • newSingle(String name)
  • newSingle(String name, boolean daemon)
  • newElastic(String name)
  • newElastic(String name, int ttlSeconds)
  • newElastic(String name, int ttlSeconds, boolean daemon)
  • newParallel(String name)
  • newParallel(String name, int parallelism)
  • newParallel(String name, int parallelism, boolean daemon)

각 파라미터는 다음과 같다.

  • name : 쓰레드 이름으로 사용할 접두사이다.
  • daemon : 데몬 쓰레드 여부를 지정한다. 지정하지 않으면 false이다. 데몬 쓰레드가 아닌 경우 JVM 종료시에 생성한 스케줄러의 dispose()를 호출해서 풀에 있는 쓰레드를 종료해야 한다.
  • ttlSeconds : elastic 쓰레드 풀의 쓰레드 유휴 시간을 지정한다. 지정하지 않으면 60(초)이다.
  • parallelism : 작업 쓰레드 개수를 지정한다. 지정하지 않으면 Runtime.getRuntime().availableProcessors()이 리턴한 값을 사용한다.

newXXX() 로 생성하는 쓰레드 풀은 기본으로 데몬 쓰레드가 아니기 때문에 어플리케이션 종료시에는 다음과 같이 dispose() 메서드를 호출해서 쓰레드를 종료시켜 주어야 한다. 그렇지 않으면 어플리케이션이 종료되지 않는 문제가 발생할 수 있다.


// 비데몬 스케줄러 초기화

Scheduler scheduler = Schedulers.newElastic("SUB", 60, false);


// 비데몬 스케줄러 사용

someFlux.publishOn(scheduler)

            .map(...)

            .subscribe(...)


// 어플리케이션 종료시에 스케줄러 종료 처리

scheduler.dispose();


병렬 처리와 관련된 내용은 다음에 더 자세히 살펴본다.


일정 주기로 tick 발생: Flux.interval

Flux.interval()을 사용하면 일정 주기로 신호를 발생할 수 있다. 발생 순서에 따라 발생한 정수 값을 1씩 증가시킨다. 다음은 간단한 사용 예이다.


Flux.interval(Duration.ofSeconds(1)) // Flux<Long>

        .subscribe(tick -> System.out.println("Tick " + tick));


Thread.sleep(5000);


위 코드를 실행한 결과는 다음과 같다.

Tick 0
Tick 1
Tick 2
Tick 3
Tick 4


1초 간격으로 신호가 발생하는 것을 알 수 있다.


interval()은 Schedulers.parallel()를 사용해서 신호를 주기적으로 발생한다. 다른 스케줄러를 사용하고 싶다면 internval(Duration, Scheduler) 메서드를 사용하면 된다.


관련글

예전에 신림프로그래머 페이스북 그룹에서 진행한 "코틀린 인 액션" 책 스터디 정리한 자료


kia-ch02.pdf

kia-ch03.pdf

kia-ch04.pdf

kia-ch05.pdf

kia-ch06.pdf

kia-ch07.pdf

kia-ch08.pdf



+ Recent posts