Reactive Stream 간단 사용

Reactive Stream 을 사용하기 위해서는 다음과 같은 것을 구현해줘야 한다.

  • Publisher
  • Subscriber
  • Subscription

한가지 문제가 있다. 이것을 구현하기가 쉽지가 않다. 특히 Publisher 의 경우에는 Reactive Stream 의 기본 아이디어만 가지고 간단하게 구현할 수가 없다. 어떤 것을 Publisher 할지도 영향을 주는것도 문제지만 Reactive 의 Spec 을 마춰짜야 하는데 이게 쉽지가 안다.

Publisher 를 간단하게 구현 방법이 없을까? SubmissionPublisher 를 이용하면 간단하게 사용해 볼 수 있다.

이 문서는 SubmissionPublisher 를 이용해 어떻게 Reactive Stream 을 구현하는지를 설명한다.

SubmissionPublisher Class

이 클래스를 간단히 살펴보자.

이 클래스는 Publisher 인터페이스를 구현한 구현체 클래스 이다. 하지만 아주 간단하지만은 않다.

SubmissionPublisher 클래스가 가지는 메소드가 꽤 있다. 자세히 보면 get 혹은 is 로 시작하는 메소드가 많음을 알 수 있다. 이 클래스들은 Publisher 의 상태 정보를 체크하기 위한 것들이다. 반대로 set 으로 시작하는, 그러니까 뭔가를 지정하는 메소드는 많지가 않다.

이 클래스에 설명은 다음과 같이 시작한다.

Flow.Publisher that asynchronously issues submitted (non-null) items to current subscribers until it is closed. Each current subscriber receives newly submitted items in the same order unless drops or exceptions are encountered. Using a SubmissionPublisher allows item generators to act as compliant reactive-streams Publishers relying on drop handling and/or blocking for flow control.

Flow.Publisher 는 Null 이 아닌 제출된 아이템들을(item) 현재 구독자에게 닫힐때까지 비동기적으로 발행한다. 각각의 현재 구독자들은 삭제 또는 예외가 발생하지 않는한 새롭게 제출된 아이템들을 같은 순서로 받는다. SubmissionPublisher 를 사용하는 것은 아이템 생성기를 흐름제어를 위해 삭제 핸들링과 블럭킹을 필요로하는 reactive-streams Publisher 와 호환되는 것처럼 행동하게 해준다.

https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/SubmissionPublisher.html

어찌되었든 간에 이것을 이용하면 Reactive Streams Publisher 처럼 만들어 준다 것에 주목해야 한다.

SubmissionPublisher 사용하기

사용방법은 아주 간단하다.

SubmissionPublisher 의 객체를 하나 생성한다. 그리고 Reactive Stream 이 Publisher 에 요구하는 subscribe 메소드를 호출해 준다.

하지만 subscribe() 메소드는 인자값으로 Subscriber 클래스를 요구한다. Subscriber 클래스는 Interface 클래스여서 구현체 클래서를 넣어야 한다. 다양한 방법이 있지만 여기서는 다음과 같이 작성해 본다.

Reactive Stream 이 요구하는 Subscriber 에 요구 메소드들이(onNext, onError, onComplete) 모두 나온다. 다음과 같이 모두 구현해 준다.

위와같이 구현을 해주면 된다. 여기서 중요한 것이 존재하는데, Subscription 객체 변수를 반드시 써야 한다는 것이다. onSubscribe, onNext 메소드에서 공통으로 보이는 subscription.request(1) 이 바로 그것인데 onNext 메소드에 이를 사용하기 위해서 Subscriber 구현체에서 멤버변수를 만들어두고 onSubscribe 시에 주입된 객체변수를 할당해 준다.

이제 다음과 같이 publisher 의 submit 메소드를 사용해 아이템을 넣어준다.

submit 메소드는 아이템을 제출해 비동기적으로 Subscriber 에게 생성해준다. SubmissionPublisher 의 비동기는 Thread 를 이용한다. 그래서 모든 Thread 가 완료되기를 Main Thread 를 몇초간 중지줘야해서 Thread.sleep(100) 코드가 필요하다.

모든 비동기 제출이 완료되면 제출자를 닫아준다. publisher.close() 다.

이렇게 완료된 전체 코드는 다음과 같다.

Reactive Stream 을 어떻게하면 간단하게 사용해볼까. Reactive Stream 이 요구하는 최소사항을 어떻게하면 직접 사용해볼 수 있을까? 그렇다면 SubmissionPublisher 클래스를 활용하라.

Post a comment

You may use the following HTML:
<a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code class="" title="" data-url=""> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong> <pre class="" title="" data-url=""> <span class="" title="" data-url="">