본문 바로가기
부트캠프/백

Reactive Programming

by 티코딩 2023. 1. 28.
public class HelloReactorExample {
    public static void main(String[] args) throws InterruptedException {
        Flux    // (1)
            .just("Hello", "Reactor")               // (2)
            .map(message -> message.toUpperCase())  // (3)
            .publishOn(Schedulers.parallel())       // (4)
            .subscribe(System.out::println,         // (5)
                    error -> System.out.println(error.getMessage()),  // (6)
                    () -> System.out.println("# onComplete"));        // (7)

        Thread.sleep(100L);
    }
}

그간 내가 배웠던 Spring MVC기반 애플리케이션은 명령형프로그래밍이었던것에 반해, 오늘배운 Reactive Programming은 선언형 프로그래밍이다.

사실 명령형, 선언형 프로그래밍이란 말이 생소했다.

명령형 프로그래밍이란, 코드가 어떻게 실행되어야 하는지 구체적인 로직이 코드에 드러난다. 예시로는 리스트선언후, 향상된 for문을 이용해 넣는 방식이 나왔다. 

선언형 프로그래밍이란, 필요한 동작을 람다 표현식으로 정의후, 동작수행은 연산메서드(Operation) 체인에게 위임한다. 예시로는 리스트를 선언하고, Java8 부터 지원하는 Stream을 사용해 for문을 대체했다. stream을 사용해 내부에 보이지않는 반복자를 사용한것이다.

 

ㅇ 리액티브 시스템(Reactive System)

Reactive라는건 무슨 반응을 말하는 것일까?

-> 클라이언트의 요청에 대한 반응이다.

리액티브 시스템은 클라이언트의 요청에 대한 응답 대기 시간을 최소화 할 수 있도록 요청 쓰레드가 차단되지 않게 함으로써(Non-Blocking) 클라이언트에게 즉각적으로 반응하도록 구성된 시스템이라고 볼 수 있다.

 

ㅇ 리액티브 스트림즈(Reactive Streams)

리액티브 프로그래밍을 위한 표준사양. 컴포넌트로는 크게 Publisher, Subscriber, Subscription, Processor이 있다.

Publisher - 데이터소스로부터 데이터를 내보낸다.(emit 한다.)

Subscriber - emit된 데이터를 소비하는 역할을함.

Subscription - Subscriber의 구독 자체를 표현한 인터페이스.

Processor - Publisher,Subscriber 둘다 상속받으며 둘의 역할을 동시에 한다.

 

ㅇ 리액티브 스트림즈의 구현체

Project Reactor, RxJava, Java Flow API

나는 Project Reactor(이하 Reactor)에 대해 배웠다.

 

ㅇ Reactor

리액티브 스트림즈 표준사양을 구현한 구현체중 하나. Spring 5 부터 지원한다. Reactor는 완전한 Non-Blocking을 지원함. 

Publisher타입으로 Mono, Flux를 제공함. Mono는 0개 혹은 1개의 데이터를 처리하고, Flux는 여러개의 데이터를 처리한다.

public class HelloReactorExample {
    public static void main(String[] args) throws InterruptedException {
        Flux    // (1)
            .just("Hello", "Reactor")               // (2)
            .map(message -> message.toUpperCase())  // (3)
            .publishOn(Schedulers.parallel())       // (4)
            .subscribe(System.out::println,         // (5)
                    error -> System.out.println(error.getMessage()),  // (6)
                    () -> System.out.println("# onComplete"));        // (7)

        Thread.sleep(100L);
    }
}

스케줄러 - Reactor Sequece상 처리되는 동작들을 하나 이상의 쓰레드에서 동작하도록 별도의 쓰레드를 제공해줌. 복잡한 멀티쓰레딩 프로세스를 단순하게 해준다.

스케줄러 전용 Operator - subscribeOn() : 구독직후에 수행, publishOn() : publishOn()기준으로 Downstream쪽 쓰레드가 publishOn()에서 스케줄러로 지정한 쓰레드로 변경됨.

 

 

ㅇ Reactor의 다양한 Operator

Reactor은 Operator로 시작해 Operator로 끝난다.

중요한것만 정리하자면, 일단 먼저 상황별 분류해서 보자

ㅇ 새로운 Sequence를 생성하고자 할경우

fromStream()

fromIterable()

create()

 

ㅇ 기존 Sequence에서 변환 작업이 필요한 경우

map()

flatMap()

concat()

zip()

 

ㅇ Sequence 내부의 동작을 확인 하고자 할 경우

doOnNext()

log()

 

ㅇ Sequence에서 데이터 필터링이 필요한 경우

filter()

take()

 

ㅇ 에러를 처리 하고자 할 경우

error()

timeout()

 

'부트캠프 > ' 카테고리의 다른 글

클라우드 컴퓨팅, AWS의 중요개념들  (1) 2023.02.01
Section3-1  (0) 2023.01.29
12.6 Framework, SpringFramework, POJO  (0) 2022.12.10
11.29 네트워크,HTTP  (0) 2022.11.29
11.24 시간복잡도, Greedy, 구현  (0) 2022.11.25