Reactive Stream 이란
토비 강의1
1. 리액티브 프로그래밍이 나오게된이유
먼저 리액티브 프로그래밍을 이야기하기전에 상대성 개념을 이해야한다. 상대성개념의 대표적인 예는 pull과 push이다. 이 두개의 개념을 자바에서 잘보여주는 예가 Iterable과 observable이다.
먼저 풀방식에 iterable을 구현하면 아래와같다.
Iterable
public class IterableMain {
public static void main(String[] args) {
Iterable<Integer> iter = new Iterable<>(){
@Override
public Iterator<Integer> iterator() {
return new Iterator<Integer>() {
int i = 0;
final static int MAX = 10;
@Override
public boolean hasNext() {
return i < MAX;
}
@Override
public Integer next() {
return ++i;
}
};
}
};
for (Integer i : iter){
System.out.println(i);
}
for(Iterator<Integer> it = iter.iterator(); it.hasNext();){
System.out.println(it.next()); // 데이터를 polling 해가는 방식
}
}
}
위에 Iterable에서 데이터를 가져오려면 it.next()메소드를 통해서 pull해오는 방식으로 데이터들을 불러온다. 반대로 Observable 개념은 push 개념으로 데이터를 보낸다.
그다음은 push 방식에 옵저번 패턴을 구현해보겠다.
Observable
public class ObservableMain {
static class IntObservable extends Observable implements Runnable{
public void run() {
for(int i = 1; i <= 10; i++){
setChanged(); //publisher, observable
notifyObservers(i);
}
}
}
public static void main(String[] args) throws IOException {
Observer ob = new Observer() {
@Override
public void update(Observable o, Object arg) { // subscriber // observer
System.out.println(Thread.currentThread().getName() + " " + arg);
}
};
IntObservable io = new IntObservable();
io.addObserver(ob);
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(io);
System.out.println(Thread.currentThread().getName() + " " + "EXIT");
}
}
위에보면 notifyObservers를 통해 데이터를 보내주는것을 볼 수 있다. 이게 push 개념이다. 위에 코드를 돌리면 아래와같은 결과를 얻을 수 있다.
main EXIT
pool-1-thread-1 1
pool-1-thread-1 2
pool-1-thread-1 3
pool-1-thread-1 4
pool-1-thread-1 5
pool-1-thread-1 6
pool-1-thread-1 7
pool-1-thread-1 8
pool-1-thread-1 9
pool-1-thread-1 10
이게 두개의 상대성 개념을 비교한 개념이다. poll 방식을 보면 push보다 몇가지 장점을 가진다. 예를들어서 브로드캐스트 방식으로 데이터를 처리할 수 있다. 하지만 orserve 패턴은 아래와같은 단점을가진다.
- 기존 옵저버 패턴의 문제점은 Complete 를 가지고 있지 않는 단점이 있었음
- Error 처리를 하는데 어려움을 가지고 있었음
이러한 단점을 보안하는게 Reactive Stream이다.
2. Reactive Stream 이란
논블록킹 비동기 스트림의 프로그래밍 표준을 Reactive Stream이라한다. JVM과 Javascript 진형에서 만든 리액티브 프로그래밍 표준이다. (www.reactive-streams.org),
- ReactiveX
- rx java
- rx javascript
- 등등
Ractive 프로그래밍을 하기위해서는 표준 인터페잇를 구현해줘야한다. 인터페이스느 크게 4가지가있다
- Processor<T, R>
- Publisher
- Subuscriber
- Subscription
publisher는 연속적으로 넘어가는 무한한 엘리먼트들을 제공하는 역할을 한다. Subscriber는 제공받은 엘리먼트들을 다루는역할를한다.
Publisher.subscriber(Subscriber)
onSubscribe onNext* (onError | onComplete)?
- onSubscribe 반드시 호출되어야하는 메소드이다.
- onNext는 0~N까지 호출될 수 있다.
- onError, onComplete둘중에 하나만 호출될 수 있다.
- Subscription은 Publisher와 Suberscriber를 연결하는 역할을한다.
- Publisher가 처리해야할 데이터가 100만개인경우 이것을 한번에 푸쉬되지 않게 subscription에서 백프레쉘어라는 개념으로 개수를 제어할 수 있다.
- Subscription을 통해 엘리먼트 갯수를 지정할 수 있다.
- request값에 값을 지정하면 그갯수만큼 subscriber로 넘어간다.
public class PubSub {
public static void main(String[] args){
// publisher <- Observable
// subscriber <- observer
List<Integer> iter = Arrays.asList(1, 2, 3, 4, 5);
Publisher p = new Publisher<>() {
@Override
public void subscribe(Subscriber subscriber) {
Iterator<Integer> it = iter.iterator();
subscriber.onSubscribe(new Flow.Subscription() {
@Override
public void request(long n) {
try {
while (n-- > 0) {
if (it.hasNext()) {
subscriber.onNext(it.next());
} else {
subscriber.onComplete();
break;
}
}
} catch (Exception e) {
subscriber.onError(e);
}
}
@Override
public void cancel() {
}
});
}
};
Subscriber<Integer> subscriber = new Subscriber<Integer>(){
Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) { // 반드시 호출되는 메소드
System.out.println("onSubscription");
this.subscription = subscription;
this.subscription.request(1);
}
@Override
public void onNext(Integer item) {
System.out.println("onNext " + item);
this.subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
};
p.subscribe(subscriber);
}
}