주요글: 도커 시작하기

이전 글(스프링 웹플럭스(WebFlux) SSE(Server Sent Event) 구현 1)에서 작성한 SSE 예제는 현실적이지 않다. 서로 다른 브라우저에서 /stocks/123 으로 연결하면 서로 다른 값이 표시된다. 실제라면 둘 다 같은 값을 출력해야 한다. 그래서 이 글에서는 조금 더 현실적인 예를 만들어보려 한다. 여기서 만들 예는 사용자 ID별로 데이터를 전송하는 기능이다.

여기서 구현할 클래스는 세 개다.

  • UserChannel : 사용자 ID별로 메시지를 보내기 위한 채널이다. Flux를 이용해서 메시지를 전송한다.
  • UserChannels : 사용자 ID에 대한 UserChannel을 관리한다.
  • UserChannelApi : SSE를 사용자 ID 별로 메시지를 전송하는 기능을 제공한다.

UserChannel 클래스

UserChannel 클래스는 다음과 같다.

public class UserChannel {
    private Flux<String> flux;
    private FluxSink<String> sink;
    private UnicastProcessor<String> processor;

    public UserChannel() {
        processor = UnicastProcessor.create();
        this.sink = processor.sink();
        this.flux = processor.share();
    }

    public void send(String message) {
        sink.next(message);
    }

    public Flux<String> toFlux() {
        return flux;
    }
}

하나의 데이터를 여러 클라이언트에 전송하기 위해 UnicastProcessor와 Flux#share()를 사용해서 Flux를 생성했다. 이 코드는 데이터를 전송하기 위해 UnicastProcessor#sink()로 구한 FluxSink를 사용한다.

send() 메서드는 FluxSink#next() 메서드를 이용해서 데이터를 Flux에 데이터를 보낸다.

UserChannels 클래스

UserChannels 클래스는 UserChannel을 맵으로 관리한다.

public class UserChannels {
    private ConcurrentHashMap<Long, UserChannel> map = new ConcurrentHashMap<>();

    public UserChannel connect(Long userId) {
        return map.computeIfAbsent(userId, key -> new UserChannel());
    }

    public void send(Long userId, String message) {
        UserChannel userChannel = map.get(userId);
        if (userChannel != null) {
            userChannel.send(message);
        }
    }
}

UserChannelApi 클래스

UserChannelApi가 제공하는 API는 두 개다.

  • GET /channels/users/{userId}/messages : 특정 사용자의 메시지를 수신하기 위한 SSE 구현
  • POST /channels/users/{userId}/messages : 특정 사용자에 메시지를 보내기 위한 API

다음은 두 API의 구현이다.

@RestController
public class UserChannelApi {
    private UserChannels channels = new UserChannels();

    @GetMapping("/channels/users/{userId}/messages")
    public Flux<ServerSentEvent<String>> connect(
            @PathVariable("userId") Long userId) {
        return channels.connect(userId) // UserChannel 리턴
            .toFlux() // Flux 리턴
            .map(str -> ServerSentEvent.builder(str).build());
    }

    @PostMapping(path = "/channels/users/{userId}/messages", 
                 consumes = MediaType.TEXT_PLAIN_VALUE)
    public void send(@PathVariable("userId") Long userId, 
                     @RequestBody String message) {
        channels.send(userId, message);
    }
}

connect() 메서드는 클라이언트를 특정 사용자 채널에 연결하고 채널의 메시지를 ServerSentEvent로 변환해서 클라이언트에 전송한다.

send() 메서드는 특정 사용자 채널에 메시지를 전송한다. @PostMapping 애노테이션의 consumes 값이 text/plain이므로 HTTP 요청 몸체로 받은 값을 문자열로 처리한다.

실행

구현을 했으니 동작을 확인할 차례다. 서버를 구동하고 크롬과 같이 SSE를 지원하는 브라우저를 두 개 띄운다. 그리고 Talend API Tester나 Postman 등 send() API를 호출하기 위한 도구를 준비한다. 참고로 서버 포트는 18080으로 변경했다.

첫 번째 브라우저에서 http://localhost:18080/channels/users/1/messages 에 접속한다. 그리고 POST로 데이터를 두 개 전송한다. 그러면 첫 번째 브라우저에 POST로 전송한 두 문자열이 표시되는 것을 확인할 수 있다.

두 번째 브라우저에서 접속한 뒤에 다시 POST로 메시지를 몇 개 더 전송한다. 아래 그림과 같이 첫 번째 브라우저와 두 번째 브라우저가 동일 데이터를 수신한 것을 확인할 수 있다.

관련 링크

+ Recent posts