자바 Reactive Stream 구현체
자바(Java) 에서 Reactive Stream 구현체를 살펴본다.
이전 글에서 Reative Stream 의 기본 아이디어를 설명했었다.
- Flow control
- Publisher – Subscriber pattern
또한 이 아이디어를 위한 포맷은 Publisher – Subscriber 이다. Reactive Stream 에서 이 모델은 1) Subscriber 가 Publisher 에게 가입을 요청하면 2) Publisher 는 Subscriber 와 통신을 위한 채널인 Subscription 을 생성하고 이를 통보한다. 이는 다음 그림과 같이 묘사할 수 있다.
여기서 주목해야할 것이 Subscriber 는 Subcription 을 통해서 데이터를 요청하고 받게 된다는데 있다.
java.util.concurrent.Flow
자바에서 구현은 위 클래스에 구현 되어 있다. 그리고 이 클래스 첫줄에 다음과 같이 설명이 시작된다.
Interrelated interfaces and static methods for establishing flow-controlled components in which Publishers produce items consumed by one or more Subscribers, each managed by a Subscription
흐름 제어 컴포넌트를 설정하기 위한 (밀접하게) 상호연관된 인터페이스(Interface) 와 스태틱 메소드(Static method)
java.util.concurrent.Flow
주석에 내용에 등장하는 컴포넌트 Publisher, Subscriber, Subscription 이다. Flow.class 는 다음과 같이 구성되어 있다.
Publisher
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
@FunctionalInterface public static interface Publisher<T> { /** * Adds the given Subscriber if possible. If already * subscribed, or the attempt to subscribe fails due to policy * violations or errors, the Subscriber's {@code onError} * method is invoked with an {@link IllegalStateException}. * Otherwise, the Subscriber's {@code onSubscribe} method is * invoked with a new {@link Subscription}. Subscribers may * enable receiving items by invoking the {@code request} * method of this Subscription, and may unsubscribe by * invoking its {@code cancel} method. * * @param subscriber the subscriber * @throws NullPointerException if subscriber is null */ public void subscribe(Subscriber<? super T> subscriber); } |
FunctionalInterface 인게 눈에 뛴다. 이것은 오직 한개의 메소드만을 가진다는 것과 Lambda 식을 지원하게 된다. subscribe 메소드는 인자로 Subscriber 를 받는다.
중간에 주석에 주목할만한 내용이 나온다.
Subscribers may enable receiving items by invoking the request method of this Subscription, and may unsubscribe by invoking its cancel method.
Subscriber 는 이 Subcription 의 request 메소드를 호출함으로써 아이템들을 수신할 수 있으며 cancel 메소드를 호출함으로써 가입이 해제된다.
Subcriber 는 Subscription 의 request 메소드를 호출함으로써 데이터(아이템)을 수신한다는 것이다.
Subscriber
1 2 3 4 5 6 7 8 9 10 |
public static interface Subscriber<T> { public void onSubscribe(Subscription subscription); public void onNext(T item); public void onError(Throwable throwable); public void onComplete(); } |
4개의 메소드를 가지고 있다. 이 메소드들이 다루는 것이 무엇인지를 아는것이 제일 중요하다. 각 메소드의 주석에는 Subscription 이 항상 나온다.
Method invoked with a Subscription’s next item. If this method throws an exception, resulting behavior is not guaranteed, but may cause the Subscription to be cancelled
Subscription’s 의 다음 아이템을 호출하는 메소드. 만약 이 메소드가 예외를 발생시키면, 최종 결과를 보장하지 않지만, Subcription 은 취소될 수 있다.
onNext
Subscriber 가 다음 아이템을 호출하는데 Publisher 를 대상으로 하지 않고 Subscription 을 대상으로 한다는 것을 꼭 기억해야 한다.
Subscription
1 2 3 4 5 6 |
public static interface Subscription { public void request(long n); public void cancel(); } |
Publisher 와 Subscriber 를 연결하는 메시지 제어를 하는게 목적이다 .request 메소드는 Subcription 에 대해 현재 처리되지 않은 요청을 위한 아이템의 개수를 추가하게 된다. 몇개의 아이템을 요청할 것인지를 요청하는 것이다.
SubmissionPublisher
SubmissionPublisher 는 non-null 아이템을 생성해주는 Publisher 이다. 생성을 해준다고 해서 자동으로 생성해주는 것이 아니라 아이템을 제출해야 한다. 이 제출된 아이템은 순차적으로 Publish 해준다.
submit
1 2 3 |
public int submit(T item) { return doOffer(item, Long.MAX_VALUE, null); } |
이 메소드는 비동기적으로 Subscriber의 onNext 메소드를 호출함으로써 인수로 받은 아이템을 Subscriber 에 아이템을 발행한다.