Spring

Reactive Stream 이란

메모는나의열정 2021. 10. 2. 13:20
반응형

토비 강의1

1. 리액티브 프로그래밍이 나오게된이유

먼저 리액티브 프로그래밍을 이야기하기전에 상대성 개념을 이해야한다. 상대성개념의 대표적인 예는 pull과 push이다. 이 두개의 개념을 자바에서 잘보여주는 예가 Iterable과 observable이다.

상대성(duality)

먼저 풀방식에 iterable을 구현하면 아래와같다.

Iterable

public class IterableMain {

    public static void main(String[] args) {

        Iterable<Integer> iter = new Iterable<>(){

            @Override
            public Iterator<Integer> iterator() {
                return new Iterator<Integer>() {

                    int i = 0;
                    final static int MAX = 10;
                    @Override
                    public boolean hasNext() {
                        return i < MAX;
                    }

                    @Override
                    public Integer next() {
                        return ++i;
                    }
                };
            }
        };

        for (Integer i : iter){
            System.out.println(i);
        }

        for(Iterator<Integer> it = iter.iterator(); it.hasNext();){
            System.out.println(it.next()); // 데이터를 polling 해가는 방식
        } 
    }
}

위에 Iterable에서 데이터를 가져오려면 it.next()메소드를 통해서 pull해오는 방식으로 데이터들을 불러온다. 반대로 Observable 개념은 push 개념으로 데이터를 보낸다.

그다음은 push 방식에 옵저번 패턴을 구현해보겠다.

Untitled

Observable

public class ObservableMain {

    static class IntObservable extends Observable implements Runnable{
        public void run() {

            for(int i = 1; i <= 10; i++){
                setChanged(); //publisher, observable
                notifyObservers(i);
            }

        }
    }

    public static void main(String[] args) throws IOException {
        Observer ob = new Observer() {
            @Override
            public void update(Observable o, Object arg) { // subscriber // observer

                System.out.println(Thread.currentThread().getName() + " "  + arg);

            }
        };

        IntObservable io = new IntObservable();
        io.addObserver(ob);

        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(io);
        System.out.println(Thread.currentThread().getName() + " "  + "EXIT");
    }
}

위에보면 notifyObservers를 통해 데이터를 보내주는것을 볼 수 있다. 이게 push 개념이다. 위에 코드를 돌리면 아래와같은 결과를 얻을 수 있다.

main EXIT
pool-1-thread-1 1
pool-1-thread-1 2
pool-1-thread-1 3
pool-1-thread-1 4
pool-1-thread-1 5
pool-1-thread-1 6
pool-1-thread-1 7
pool-1-thread-1 8
pool-1-thread-1 9
pool-1-thread-1 10

이게 두개의 상대성 개념을 비교한 개념이다. poll 방식을 보면 push보다 몇가지 장점을 가진다. 예를들어서 브로드캐스트 방식으로 데이터를 처리할 수 있다. 하지만 orserve 패턴은 아래와같은 단점을가진다.

  • 기존 옵저버 패턴의 문제점은 Complete 를 가지고 있지 않는 단점이 있었음
  • Error 처리를 하는데 어려움을 가지고 있었음

이러한 단점을 보안하는게 Reactive Stream이다.

2. Reactive Stream 이란

논블록킹 비동기 스트림의 프로그래밍 표준을 Reactive Stream이라한다. JVM과 Javascript 진형에서 만든 리액티브 프로그래밍 표준이다. (www.reactive-streams.org),

  • ReactiveX
    • rx java
    • rx javascript
    • 등등

Ractive 프로그래밍을 하기위해서는 표준 인터페잇를 구현해줘야한다. 인터페이스느 크게 4가지가있다

  • Processor<T, R>
  • Publisher
  • Subuscriber
  • Subscription

publisher는 연속적으로 넘어가는 무한한 엘리먼트들을 제공하는 역할을 한다. Subscriber는 제공받은 엘리먼트들을 다루는역할를한다.

Publisher.subscriber(Subscriber)

onSubscribe onNext* (onError | onComplete)?
  • onSubscribe 반드시 호출되어야하는 메소드이다.
  • onNext는 0~N까지 호출될 수 있다.
  • onError, onComplete둘중에 하나만 호출될 수 있다.

Untitled

  • Subscription은 Publisher와 Suberscriber를 연결하는 역할을한다.
  • Publisher가 처리해야할 데이터가 100만개인경우 이것을 한번에 푸쉬되지 않게 subscription에서 백프레쉘어라는 개념으로 개수를 제어할 수 있다.
  • Subscription을 통해 엘리먼트 갯수를 지정할 수 있다.
    • request값에 값을 지정하면 그갯수만큼 subscriber로 넘어간다.
public class PubSub {

    public static void main(String[] args){
        // publisher <- Observable
        // subscriber <- observer

        List<Integer> iter = Arrays.asList(1, 2, 3, 4, 5);

        Publisher p = new Publisher<>() {
            @Override
            public void subscribe(Subscriber subscriber) {

                Iterator<Integer> it = iter.iterator();

                subscriber.onSubscribe(new Flow.Subscription() {

                    @Override
                    public void request(long n) {

                        try {
                            while (n-- > 0) {
                                if (it.hasNext()) {
                                    subscriber.onNext(it.next());
                                } else {
                                    subscriber.onComplete();
                                    break;
                                }
                            }
                        } catch (Exception e) {
                            subscriber.onError(e);
                        }
                    }

                    @Override
                    public void cancel() {

                    }
                });
            }
        };

        Subscriber<Integer> subscriber = new Subscriber<Integer>(){

            Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) { // 반드시 호출되는 메소드
                System.out.println("onSubscription");
                this.subscription = subscription;
                this.subscription.request(1);
            }

            @Override
            public void onNext(Integer item) {
                System.out.println("onNext " + item);
                this.subscription.request(1);
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("onError");
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };

        p.subscribe(subscriber);

    }
}
반응형