Reactive Stream 간단 사용
Reactive Stream 을 사용하기 위해서는 다음과 같은 것을 구현해줘야 한다.
- Publisher
- Subscriber
- Subscription
한가지 문제가 있다. 이것을 구현하기가 쉽지가 않다. 특히 Publisher 의 경우에는 Reactive Stream 의 기본 아이디어만 가지고 간단하게 구현할 수가 없다. 어떤 것을 Publisher 할지도 영향을 주는것도 문제지만 Reactive 의 Spec 을 마춰짜야 하는데 이게 쉽지가 안다.
Publisher 를 간단하게 구현 방법이 없을까? SubmissionPublisher 를 이용하면 간단하게 사용해 볼 수 있다.
이 문서는 SubmissionPublisher 를 이용해 어떻게 Reactive Stream 을 구현하는지를 설명한다.
SubmissionPublisher Class
이 클래스를 간단히 살펴보자.
1 |
public class SubmissionPublisher<T> implements Publisher<T>, AutoCloseable { |
이 클래스는 Publisher 인터페이스를 구현한 구현체 클래스 이다. 하지만 아주 간단하지만은 않다.
SubmissionPublisher 클래스가 가지는 메소드가 꽤 있다. 자세히 보면 get 혹은 is 로 시작하는 메소드가 많음을 알 수 있다. 이 클래스들은 Publisher 의 상태 정보를 체크하기 위한 것들이다. 반대로 set 으로 시작하는, 그러니까 뭔가를 지정하는 메소드는 많지가 않다.
이 클래스에 설명은 다음과 같이 시작한다.
A
Flow.Publisher
that asynchronously issues submitted (non-null) items to current subscribers until it is closed. Each current subscriber receives newly submitted items in the same order unless drops or exceptions are encountered. Using a SubmissionPublisher allows item generators to act as compliant reactive-streams Publishers relying on drop handling and/or blocking for flow control.Flow.Publisher 는 Null 이 아닌 제출된 아이템들을(item) 현재 구독자에게 닫힐때까지 비동기적으로 발행한다. 각각의 현재 구독자들은 삭제 또는 예외가 발생하지 않는한 새롭게 제출된 아이템들을 같은 순서로 받는다. SubmissionPublisher 를 사용하는 것은 아이템 생성기를 흐름제어를 위해 삭제 핸들링과 블럭킹을 필요로하는 reactive-streams Publisher 와 호환되는 것처럼 행동하게 해준다.
https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/SubmissionPublisher.html
어찌되었든 간에 이것을 이용하면 Reactive Streams Publisher 처럼 만들어 준다 것에 주목해야 한다.
SubmissionPublisher 사용하기
사용방법은 아주 간단하다.
1 |
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(); |
SubmissionPublisher 의 객체를 하나 생성한다. 그리고 Reactive Stream 이 Publisher 에 요구하는 subscribe 메소드를 호출해 준다.
1 |
publisher.subscribe() |
하지만 subscribe() 메소드는 인자값으로 Subscriber 클래스를 요구한다. Subscriber 클래스는 Interface 클래스여서 구현체 클래서를 넣어야 한다. 다양한 방법이 있지만 여기서는 다음과 같이 작성해 본다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
publisher.subscribe(new Subscriber<Integer>() { @Override public void onSubscribe(Subscription subscription) { // TODO Auto-generated method stub } @Override public void onNext(Integer item) { // TODO Auto-generated method stub } @Override public void onError(Throwable throwable) { // TODO Auto-generated method stub } @Override public void onComplete() { // TODO Auto-generated method stub } |
Reactive Stream 이 요구하는 Subscriber 에 요구 메소드들이(onNext, onError, onComplete) 모두 나온다. 다음과 같이 모두 구현해 준다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
publisher.subscribe(new Subscriber<Integer>() { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { // TODO Auto-generated method stub this.subscription = subscription; subscription.request(1); } @Override public void onNext(Integer item) { // TODO Auto-generated method stub System.out.println("Received item:" + item); subscription.request(1); } @Override public void onError(Throwable throwable) { // TODO Auto-generated method stub System.out.println("Error occurred: " + throwable.getMessage()); } @Override public void onComplete() { // TODO Auto-generated method stub System.out.println("Subscriber is complete"); } }); |
위와같이 구현을 해주면 된다. 여기서 중요한 것이 존재하는데, Subscription 객체 변수를 반드시 써야 한다는 것이다. onSubscribe, onNext 메소드에서 공통으로 보이는 subscription.request(1) 이 바로 그것인데 onNext 메소드에 이를 사용하기 위해서 Subscriber 구현체에서 멤버변수를 만들어두고 onSubscribe 시에 주입된 객체변수를 할당해 준다.
이제 다음과 같이 publisher 의 submit 메소드를 사용해 아이템을 넣어준다.
1 2 3 4 5 |
for (int i = 0; i < 10; i++) { publisher.submit(i); } Thread.sleep(100); publisher.close(); |
submit 메소드는 아이템을 제출해 비동기적으로 Subscriber 에게 생성해준다. SubmissionPublisher 의 비동기는 Thread 를 이용한다. 그래서 모든 Thread 가 완료되기를 Main Thread 를 몇초간 중지줘야해서 Thread.sleep(100) 코드가 필요하다.
모든 비동기 제출이 완료되면 제출자를 닫아준다. publisher.close() 다.
이렇게 완료된 전체 코드는 다음과 같다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(); publisher.subscribe(new Subscriber<Integer>() { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { // TODO Auto-generated method stub this.subscription = subscription; subscription.request(1); } @Override public void onNext(Integer item) { // TODO Auto-generated method stub System.out.println("Received item:" + item); subscription.request(1); } @Override public void onError(Throwable throwable) { // TODO Auto-generated method stub System.out.println("Error occurred: " + throwable.getMessage()); } @Override public void onComplete() { // TODO Auto-generated method stub System.out.println("Subscriber is complete"); } }); for (int i = 0; i < 10; i++) { publisher.submit(i); } Thread.sleep(100); publisher.close(); |
Reactive Stream 을 어떻게하면 간단하게 사용해볼까. Reactive Stream 이 요구하는 최소사항을 어떻게하면 직접 사용해볼 수 있을까? 그렇다면 SubmissionPublisher 클래스를 활용하라.