Tagged: Reactive

Reactive Stream 간단 사용

Reactive Stream 을 사용하기 위해서는 다음과 같은 것을 구현해줘야 한다.

  • Publisher
  • Subscriber
  • Subscription

한가지 문제가 있다. 이것을 구현하기가 쉽지가 않다. 특히 Publisher 의 경우에는 Reactive Stream 의 기본 아이디어만 가지고 간단하게 구현할 수가 없다. 어떤 것을 Publisher 할지도 영향을 주는것도 문제지만 Reactive 의 Spec 을 마춰짜야 하는데 이게 쉽지가 안다.

Publisher 를 간단하게 구현 방법이 없을까? SubmissionPublisher 를 이용하면 간단하게 사용해 볼 수 있다.

이 문서는 SubmissionPublisher 를 이용해 어떻게 Reactive Stream 을 구현하는지를 설명한다.

SubmissionPublisher Class

이 클래스를 간단히 살펴보자.

이 클래스는 Publisher 인터페이스를 구현한 구현체 클래스 이다. 하지만 아주 간단하지만은 않다.

SubmissionPublisher 클래스가 가지는 메소드가 꽤 있다. 자세히 보면 get 혹은 is 로 시작하는 메소드가 많음을 알 수 있다. 이 클래스들은 Publisher 의 상태 정보를 체크하기 위한 것들이다. 반대로 set 으로 시작하는, 그러니까 뭔가를 지정하는 메소드는 많지가 않다.

이 클래스에 설명은 다음과 같이 시작한다.

Flow.Publisher that asynchronously issues submitted (non-null) items to current subscribers until it is closed. Each current subscriber receives newly submitted items in the same order unless drops or exceptions are encountered. Using a SubmissionPublisher allows item generators to act as compliant reactive-streams Publishers relying on drop handling and/or blocking for flow control.

Flow.Publisher 는 Null 이 아닌 제출된 아이템들을(item) 현재 구독자에게 닫힐때까지 비동기적으로 발행한다. 각각의 현재 구독자들은 삭제 또는 예외가 발생하지 않는한 새롭게 제출된 아이템들을 같은 순서로 받는다. SubmissionPublisher 를 사용하는 것은 아이템 생성기를 흐름제어를 위해 삭제 핸들링과 블럭킹을 필요로하는 reactive-streams Publisher 와 호환되는 것처럼 행동하게 해준다.

https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/SubmissionPublisher.html

어찌되었든 간에 이것을 이용하면 Reactive Streams Publisher 처럼 만들어 준다 것에 주목해야 한다.

SubmissionPublisher 사용하기

사용방법은 아주 간단하다.

SubmissionPublisher 의 객체를 하나 생성한다. 그리고 Reactive Stream 이 Publisher 에 요구하는 subscribe 메소드를 호출해 준다.

하지만 subscribe() 메소드는 인자값으로 Subscriber 클래스를 요구한다. Subscriber 클래스는 Interface 클래스여서 구현체 클래서를 넣어야 한다. 다양한 방법이 있지만 여기서는 다음과 같이 작성해 본다.

Reactive Stream 이 요구하는 Subscriber 에 요구 메소드들이(onNext, onError, onComplete) 모두 나온다. 다음과 같이 모두 구현해 준다.

위와같이 구현을 해주면 된다. 여기서 중요한 것이 존재하는데, Subscription 객체 변수를 반드시 써야 한다는 것이다. onSubscribe, onNext 메소드에서 공통으로 보이는 subscription.request(1) 이 바로 그것인데 onNext 메소드에 이를 사용하기 위해서 Subscriber 구현체에서 멤버변수를 만들어두고 onSubscribe 시에 주입된 객체변수를 할당해 준다.

이제 다음과 같이 publisher 의 submit 메소드를 사용해 아이템을 넣어준다.

submit 메소드는 아이템을 제출해 비동기적으로 Subscriber 에게 생성해준다. SubmissionPublisher 의 비동기는 Thread 를 이용한다. 그래서 모든 Thread 가 완료되기를 Main Thread 를 몇초간 중지줘야해서 Thread.sleep(100) 코드가 필요하다.

모든 비동기 제출이 완료되면 제출자를 닫아준다. publisher.close() 다.

이렇게 완료된 전체 코드는 다음과 같다.

Reactive Stream 을 어떻게하면 간단하게 사용해볼까. Reactive Stream 이 요구하는 최소사항을 어떻게하면 직접 사용해볼 수 있을까? 그렇다면 SubmissionPublisher 클래스를 활용하라.

자바 Reactive Stream 구현체

자바(Java) 에서 Reactive Stream 구현체를 살펴본다.

이전 글에서 Reative Stream 의 기본 아이디어를 설명했었다.

  • Flow control
  • Publisher – Subscriber pattern

또한 이 아이디어를 위한 포맷은 Publisher – Subscriber 이다. Reactive Stream 에서 이 모델은 1) Subscriber 가 Publisher 에게 가입을 요청하면 2) Publisher 는 Subscriber 와 통신을 위한 채널인 Subscription 을 생성하고 이를 통보한다. 이는 다음 그림과 같이 묘사할 수 있다.

출처:https://dzone.com/articles/reactive-streams-in-java-9

여기서 주목해야할 것이 Subscriber 는 Subcription 을 통해서 데이터를 요청하고 받게 된다는데 있다.

java.util.concurrent.Flow

자바에서 구현은 위 클래스에 구현 되어 있다. 그리고 이 클래스 첫줄에 다음과 같이 설명이 시작된다.

Interrelated interfaces and static methods for establishing flow-controlled components in which Publishers produce items consumed by one or more Subscribers, each managed by a Subscription

흐름 제어 컴포넌트를 설정하기 위한 (밀접하게) 상호연관된 인터페이스(Interface) 와 스태틱 메소드(Static method)

java.util.concurrent.Flow

주석에 내용에 등장하는 컴포넌트 Publisher, Subscriber, Subscription 이다. Flow.class 는 다음과 같이 구성되어 있다.

Publisher

FunctionalInterface 인게 눈에 뛴다. 이것은 오직 한개의 메소드만을 가진다는 것과 Lambda 식을 지원하게 된다. subscribe 메소드는 인자로 Subscriber 를 받는다.

중간에 주석에 주목할만한 내용이 나온다.

Subscribers may enable receiving items by invoking the request method of this Subscription, and may unsubscribe by invoking its cancel method.

Subscriber 는 이 Subcription 의 request 메소드를 호출함으로써 아이템들을 수신할 수 있으며 cancel 메소드를 호출함으로써 가입이 해제된다.

Subcriber 는 Subscription 의 request 메소드를 호출함으로써 데이터(아이템)을 수신한다는 것이다.

Subscriber

4개의 메소드를 가지고 있다. 이 메소드들이 다루는 것이 무엇인지를 아는것이 제일 중요하다. 각 메소드의 주석에는 Subscription 이 항상 나온다.

Method invoked with a Subscription’s next item. If this method throws an exception, resulting behavior is not guaranteed, but may cause the Subscription to be cancelled

Subscription’s 의 다음 아이템을 호출하는 메소드. 만약 이 메소드가 예외를 발생시키면, 최종 결과를 보장하지 않지만, Subcription 은 취소될 수 있다.

onNext

Subscriber 가 다음 아이템을 호출하는데 Publisher 를 대상으로 하지 않고 Subscription 을 대상으로 한다는 것을 꼭 기억해야 한다.

Subscription

Publisher 와 Subscriber 를 연결하는 메시지 제어를 하는게 목적이다 .request 메소드는 Subcription 에 대해 현재 처리되지 않은 요청을 위한 아이템의 개수를 추가하게 된다. 몇개의 아이템을 요청할 것인지를 요청하는 것이다.

SubmissionPublisher

SubmissionPublisher 는 non-null 아이템을 생성해주는 Publisher 이다. 생성을 해준다고 해서 자동으로 생성해주는 것이 아니라 아이템을 제출해야 한다. 이 제출된 아이템은 순차적으로 Publish 해준다.

submit

이 메소드는 비동기적으로 Subscriber의 onNext 메소드를 호출함으로써 인수로 받은 아이템을 Subscriber 에 아이템을 발행한다.

Reactive Stream Family

Reactive Stream 에는 다양한 프레임워크, 라이브러리들이 존재한다. 이것들은 각자의 고유한 이름과 특징을 가지고 있다. 어떤 것들이 있는지 간단하게 정리해 본다.

Reactive Streams

저수준의 규약으로 주로 자바 인터페이스(Interface) 로 구현 되어 있다. 명시적인 back-pressure 으로 Publisher 와 Subscriber 의 기본 빌딩 블록을 표현한다. Java 9 에서 java.util.concurrent.Flow 로 구현되어 있다.

RxJava

이것은 Reactive Extension 이다. Neflix 에서 개발해 오픈소스로 전화하면서 세상에 알려졌다. ReactiveX 라고도 불린다. 브릿지 Reactive Stream 이라고 말하기도 하는데, Reactive Stream 을 위한 타입 전환을 지원한다.

Reactor

자바 프레임워크 이다. Spring 배포로 유명한 피보탈(Pivotal) 에서 제작해 오픈소스로 배포하고 있다.

Spring Framework

HTTP 서버/클라이언트을 툴을 비롯해 리액트 기능을 가지고 있다. Spring 의 특징은 어노테이션을 이용한 기능 주입등을 사용할 수 있다. 비동기 HTTP 서버에서 구동가능하다. (Netty, Undertow, Tomcat 8 이상)

Akka Stream

Actor Model 을 기반으로 애플리케이션 개발을 위한 Java, Scala 를 위한 툴킷이다.

이것을 정리하는 이유가 있다. Reactive Stream 은 스펙이기 때문이다. 이 스펙을 만족하면 Reactive Stream 이라고 할 수 있다. 물론 자바의 저수준 인터페이스 이름도 Reactive Stream 이라 헷깔릴 수 있지만 Reactive Stream 스펙을 만족하는 다양하고 라이브러리, 프레임워크, 툴킷들이 있다.

Reactive Stream 기본 아이디어

자바 9에서 소개된 Reactive Stream 은 비동기(Asynchronously), 논 블럭킹(Non-Blocking)이 특징이다.

그렇다면 왜 이것이 특징이되어야만 했나. 이 특징에 반대되는 개념은 블럭킹(Blocking) 이며 블럭킹을 발생시키는 요인은 동시식 요청인 것이다. 결과적으로 이 둘을 해결해야만 하는 과제를 안고 있었는데 Reactive Stream 이 이 문제를 해결했다고 볼수 있다.

이 글에서 Reactive Stream 에서 핵심 특징의 아이디어를 아주 가볍게 고찰해 본다.

일반적인 데이터 처리 흐름.

데이터를 처리를 어떻게 할까? 좀 더 정확하게 표현을 한다면 데이터 처리를 어떠한 형태를 가지고 할까?

대부분 위와 같은 형태를 가진다. 첫째로 데이터 처리를 요청하는 무언가가 있다. 이를 ‘데이터 처리 요청자’라고 하자. 둘째로 데이터를 처리하는 무언가가 있다. 이를 ‘데이터 처리자’라고 하자. 데이터 처리 요청자는 데이터 처리를 담당하는 데이터 처리자에게 데이터를 던진다. 그러면 데이터 처리자는 요청이 들어온 순서대로 데이터를 처리하게 된다.

그런데, 데이터 처리 요청자가 갑자기 한꺼번에 많은 데이터를 던지면 어떻게 될까? 아니면 데이터 처리 요청자 여러개가 하나의 데이터 처리자에게 데이터를 던지면 어떻게 될까?

데이터 처리자는 일을 처리하는데 필요한 자원이 한정되어 있다. 순차적으로 일을 처리하는 데이터 처리자는 한꺼번에 밀려드는 데이터를 쌓아두게 된다. 또, 데이터 처리 요청자는 요청한 데이터가 모두 처리되어 돌아올때까지 기달려야 한다.

이렇게 되면 전체적으로 시스템의 성능이 느려지게 된다. 모든 데이터처리가 동기화되어서 작동되는 방식이며 데이터 처리자가 모든 것을 처리할때까지 요청자는 다른 요청을 모두 차단(Block) 하게 된다.

Reactive Stream 은 이러한 문제를 해결하고자 하는게 핵심 포인트라고 보면 된다.

기본 아이디어 – 데이터 흐름을 뒤집다.

데이터 처리자의 자원은 한정되어 있다. 자원이 할당되어져야 데이터 처리자는 데이터를 처리하게 된다.

그렇다면 데이터 처리자가 자원이 할당 되었을때에 데이터 처리자가 데이터 요청자에 데이터를 가지고 가면 될 것이 아닌가? 이렇게 하기 위해서는 데이터 처리자가 데이터 요청자를 알고 있어야 한다.

이러한 아이디어는 Reactive Stream 에서는 다음과 같은 것이 핵심적인 키 포인트로 정리가 된다.

  • Flow Control
  • Publish-Subscribe pattern

기존의 데이터 흐름 제어를 완전히 뒤집은 새로운 아이디어다.

Reactive Stream 작동 방법

Reactive Stream 에서 구현은 기본 아이디어를 바탕으로 하지만 약간 다르다.

Reactive Stream 에서는 Subcriber 가 Publisher 에게 직접적으로 데이터를 요청하는 것이 아니라 어느정도 데이터 처리가 가능한지에 대한 정보를 알려준다. 그렇게되면 Publisher 는 처리가 가능한 정도만 데이터를 보내게 된다.

Publisher 와 Subscriber 사이에 정보를 교환하기 위한 일종의 채널이 필요하게 된다. 이러한 채널을 Subscription 이라고 한다. 이러한 Subscription 은 Subscriber 가 Publisher 에 가입하는 순간 생성된다.

Subscription 은 Publisher 와 Subcriber 간의 통신 채널이다. Subcriber 는 Subscription 을 통해 몇개나 받을 수 있는지에 대한 정보를 통지한다. 그러면 Publisher 는 다음과 같은 정보를 Subcriber 에게 전송한다.

  • Subscribed with subscription
  • error
  • complete
  • cancel

Publisher 가 complete 나 cancel 을 통지하면 채널은 해제되고 Publisher 와 Subscriber 관계는 끝이 난다.

Back-Pressure

역압이라고 번역하던데, 의미를 파악하기 어려운 용어들이다. 이 용어에 대한 정의는 다음에서 찾을 수 있다 .

This back-pressure is an important feedback mechanism that allows systems to gracefully respond to load rather than collapse under it

Back-Pressure 는 중요한 피드백 매커니즘인데, 대량으로 데이터를 수신한 Subscriber 가 과부하에 응답을 못하거나 시스템이 다운되지 않고 정상적으로 응답하게 해준다.

https://www.reactivemanifesto.org/glossary#Back-Pressure

Back-Pressure 는 매커니즘인데, 앞에서 설명한 어쩔땐 pull-based 혹은 push-based 로 작동되게하는 것이 바로 Back-Pressure 다.

여기서 한가지 중요한 포인트가 있다. Back-Pressure 는 성능을 보장하지 않는다는 것이다. 시스템을 보호해 어떻게든 응답을 보장하는게 목적인 것이지 Subscriber 가 응답을 빠르게 해야한다는 것은 아니다.