Table of Contents
Reactive Stream 을 사용하기 위해서는 다음과 같은 것을 구현해줘야 한다.
- Publisher
- Subscriber
- Subscription
한가지 문제가 있다. 이것을 구현하기가 쉽지가 않다. 특히 Publisher 의 경우에는 Reactive Stream 의 기본 아이디어만 가지고 간단하게 구현할 수가 없다. 어떤 것을 Publisher 할지도 영향을 주는것도 문제지만 Reactive 의 Spec 을 마춰짜야 하는데 이게 쉽지가 안다.
Publisher 를 간단하게 구현 방법이 없을까? SubmissionPublisher 를 이용하면 간단하게 사용해 볼 수 있다.
이 문서는 SubmissionPublisher 를 이용해 어떻게 Reactive Stream 을 구현하는지를 설명한다.
SubmissionPublisher Class
이 클래스를 간단히 살펴보자.
public class SubmissionPublisher<T> implements Publisher<T>, AutoCloseable {
이 클래스는 Publisher 인터페이스를 구현한 구현체 클래스 이다. 하지만 아주 간단하지만은 않다.
SubmissionPublisher 클래스가 가지는 메소드가 꽤 있다. 자세히 보면 get 혹은 is 로 시작하는 메소드가 많음을 알 수 있다. 이 클래스들은 Publisher 의 상태 정보를 체크하기 위한 것들이다. 반대로 set 으로 시작하는, 그러니까 뭔가를 지정하는 메소드는 많지가 않다.
이 클래스에 설명은 다음과 같이 시작한다.
A
Flow.Publisherthat asynchronously issues submitted (non-null) items to current subscribers until it is closed. Each current subscriber receives newly submitted items in the same order unless drops or exceptions are encountered. Using a SubmissionPublisher allows item generators to act as compliant reactive-streams Publishers relying on drop handling and/or blocking for flow control.Flow.Publisher 는 Null 이 아닌 제출된 아이템들을(item) 현재 구독자에게 닫힐때까지 비동기적으로 발행한다. 각각의 현재 구독자들은 삭제 또는 예외가 발생하지 않는한 새롭게 제출된 아이템들을 같은 순서로 받는다. SubmissionPublisher 를 사용하는 것은 아이템 생성기를 흐름제어를 위해 삭제 핸들링과 블럭킹을 필요로하는 reactive-streams Publisher 와 호환되는 것처럼 행동하게 해준다.
https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/SubmissionPublisher.html
어찌되었든 간에 이것을 이용하면 Reactive Streams Publisher 처럼 만들어 준다 것에 주목해야 한다.
SubmissionPublisher 사용하기
사용방법은 아주 간단하다.
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
SubmissionPublisher 의 객체를 하나 생성한다. 그리고 Reactive Stream 이 Publisher 에 요구하는 subscribe 메소드를 호출해 준다.
publisher.subscribe()
하지만 subscribe() 메소드는 인자값으로 Subscriber 클래스를 요구한다. Subscriber 클래스는 Interface 클래스여서 구현체 클래서를 넣어야 한다. 다양한 방법이 있지만 여기서는 다음과 같이 작성해 본다.
publisher.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription subscription) {
// TODO Auto-generated method stub
}
@Override
public void onNext(Integer item) {
// TODO Auto-generated method stub
}
@Override
public void onError(Throwable throwable) {
// TODO Auto-generated method stub
}
@Override
public void onComplete() {
// TODO Auto-generated method stub
}
Reactive Stream 이 요구하는 Subscriber 에 요구 메소드들이(onNext, onError, onComplete) 모두 나온다. 다음과 같이 모두 구현해 준다.
publisher.subscribe(new Subscriber<Integer>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
// TODO Auto-generated method stub
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(Integer item) {
// TODO Auto-generated method stub
System.out.println("Received item:" + item);
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
// TODO Auto-generated method stub
System.out.println("Error occurred: " + throwable.getMessage());
}
@Override
public void onComplete() {
// TODO Auto-generated method stub
System.out.println("Subscriber is complete");
}
});
위와같이 구현을 해주면 된다. 여기서 중요한 것이 존재하는데, Subscription 객체 변수를 반드시 써야 한다는 것이다. onSubscribe, onNext 메소드에서 공통으로 보이는 subscription.request(1) 이 바로 그것인데 onNext 메소드에 이를 사용하기 위해서 Subscriber 구현체에서 멤버변수를 만들어두고 onSubscribe 시에 주입된 객체변수를 할당해 준다.
이제 다음과 같이 publisher 의 submit 메소드를 사용해 아이템을 넣어준다.
for (int i = 0; i < 10; i++) {
publisher.submit(i);
}
Thread.sleep(100);
publisher.close();
submit 메소드는 아이템을 제출해 비동기적으로 Subscriber 에게 생성해준다. SubmissionPublisher 의 비동기는 Thread 를 이용한다. 그래서 모든 Thread 가 완료되기를 Main Thread 를 몇초간 중지줘야해서 Thread.sleep(100) 코드가 필요하다.
모든 비동기 제출이 완료되면 제출자를 닫아준다. publisher.close() 다.
이렇게 완료된 전체 코드는 다음과 같다.
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
publisher.subscribe(new Subscriber<Integer>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
// TODO Auto-generated method stub
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(Integer item) {
// TODO Auto-generated method stub
System.out.println("Received item:" + item);
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
// TODO Auto-generated method stub
System.out.println("Error occurred: " + throwable.getMessage());
}
@Override
public void onComplete() {
// TODO Auto-generated method stub
System.out.println("Subscriber is complete");
}
});
for (int i = 0; i < 10; i++) {
publisher.submit(i);
}
Thread.sleep(100);
publisher.close();
Reactive Stream 을 어떻게하면 간단하게 사용해볼까. Reactive Stream 이 요구하는 최소사항을 어떻게하면 직접 사용해볼 수 있을까? 그렇다면 SubmissionPublisher 클래스를 활용하라.
