-
RxJava 리액티브 연산자 입문RxJava 2023. 2. 25. 15:50
RxJava 리액티브 연산자 입문
- ReactiveX 문서의 연산자 부분을 살펴보면 다양한 연산자(operator) 함수가 존재합니다.
- 리액티브 연산자의 특징은 언어 특성과 크게 연관이 없다는 것입니다. ReactiveX는 자바뿐만 아니라 자바스크립트, 닷넷, 스칼라, 클로저, 스위프트의 리액티브 연산자 목록을 함께 제공하기 때문입니다.
- 리액티브 연산자 분류
- 연산자
- 생성(Creating) 연산자 : Observable, Single 클래스 등으로 데이터의 흐름을 만들어내는 함수.
- 변환(Transforming) 연산자 : 어떤 입력을 받아서 원하는 출력 결과를 내는 전통적인 의미의 함수.
- 필터(Filter) 연산자 : 입력 데이터 중에 원하는 데이터만 걸러내는 함수.
- 합성(Combining) 연산자 : 생성, 변환, 필터 연산자가 주로 단일 Observable을 다룬다면, 합성 연산자는 여러 Observable을 조합하는 역할을 하는 함수.
- 오류 처리(Error Handling) 연산자 : 예외 처리를 담당하는 함수.
- 유틸리티(Utility) 연산자 : 주요 연산자로는 subscribeOn()과 observeOn() 등이 있으며, 비동기 프로그래밍을 지원
- 조건(Conditional) 연산자 : Observable의 흐름을 제어하는 역할.
- 수학과 집합형(Mathematical and Aggregate) 연산자 : 수학 함수와 연관 있는 연산자.
- 배압(Back pressure) 연산자 : 배압 이슈에 대응하는 연산자.
map() 함수
- map() 함수는 입력값을 어떤 함수에 넣어서 원하는 값으로 변환하는 함수.
ex) String을 String으로 변환 / String을 Integer나 다른 객체로 변환 - map() 함수는 입력 데이터와 그것을 변환해줄 함수를 이어주는 중개업자가 있다고 생각하자!
>> map() 함수의 마블 다이어그램- 앞서 말했던 중개업자는 마블 다이어그램의 map{} 사이에 있습니다.
>> map() 함수의 예시
public void marbleDiagram() { String[] balls = {"1", "2", "3", "5"}; Observable<String> source = Observable.fromArray(balls) .map(ball -> ball + "<>"); source.subscribe(System.out::println); } >>1<> >>2<> >>3<> >>5<>
>> map() 함수의 Example Code
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper)); }
>> map() 함수의 원형
- @CheckReturnValue는 반환값을 확인한다는 의미입니다.
- @SchedulerSupport는 스케줄러를 지원하지 않는다는 의미 → 현재 스레드에서 실행한다는 의미입니다.
- map() 함수는 매개변수로 Fuction 인터페이스 타입을 받습니다.
Function<? super T, ? extends R> mapper
>> map() 함수의 인자
- 위 코드는 람다식인데, RxJava의 람다 표현식은 아래의 제네릭 함수형 인터페이스 중 하나로 나타냅니다.
Predicate<T> : t값을 받아서 참이나 거짓을 반환 ( boolean test(T t) ) Consumer<T> : t값을 받아서 처리하고 반환값은 없음 ( void accept(T t) ) Function<T, R> : t값을 받아서 결과 R 반환 ( R apply(T t) )
public void mapFunction() { Function<String, String> getDiamond = ball -> ball + "<>"; String[] balls = {"1", "2", "3", "5"}; Observable<String> source = Observable.fromArray(balls).map(getDiamond); source.subscribe(System.out::println); } >>1<> >>2<> >>3<> >>5<>
>> 위 marbleDiagram() 예제를 Function 인터페이스를 이용해 분리
- map() 함수는 입력 타입과 반환 타입이 반드시 같을 필요는 없습니다.
public void mappingType() { Function<String, Integer> ballToIndex = ball -> { switch(ball) { case "RED" : return 1; case "YELLOW" : return 2; case "GREEN" : return 3; case "BLUE" : return 5; default : return -1; } }; String[] balls = {"RED", "YELLOW", "GREEN", "BLUE"}; Observable<Integer> source = Observable.fromArray(balls).map(ballToIndex); source.subscribe(System.out::println); } >>1 >>2 >>3 >>5
flatMap() 함수
- flatMap() 함수는 map() 함수를 좀 더 발전시킨 함수입니다.
- map() 함수는 원하는 입력값을 어떤 함수에 넣어서 변환할 수 있는 일대일 함수
- flatMap() 함수는 원하는 입력값을 어떤 함수에 넣더라도 결과가 Observable로 나오는 일대다 혹은 일대일 Observable 함수
>> flatMap() 함수의 마블 다이어그램@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {}
>> flatMap() 함수의 원형- @CheckReturnValue는 반환값을 확인한다는 의미입니다.
- @SchedulerSupport는 스케줄러를 지원하지 않는다는 의미 → 현재 스레드에서 실행한다는 의미입니다.
- 위 원형을 압축해서 요약하면 Observable flatMap(Function <T, ObservableSource>)입니다. “T를 넣으면 여러 개의 R이 나오도록 매핑한다.”는 의미입니다.
public void marbleDiagram() { Function<String, Observable<String>> getDoubleDiamonds = ball -> Observable.just(ball + "<>", ball + "<>"); String[] balls = {"1", "3", "5"}; Observable<String> source = Observable.fromArray(balls) .flatMap(getDoubleDiamonds); //Observable<String> source = Observable.fromArray(balls) // .flatMap(ball -> Observable.just(ball + "<>", ball + "<>"); source.subscribe(System.out::println); } >>1<> >>1<> >>3<> >>3<> >>5<> >>5<>
>> Example Code
- Observable Type인 source는 balls 배열 값을 가져온 후 getDoubleDiamonds() 함수를 활용해 flatMap() 함수를 호출합니다.
- getDoubleDiamonds() 함수는 String을 넣으면 “여러 개의 String을 발행하는 Observable이 나오는구나”라고 생각하면 됩니다.
- getDoubleDiamonds() 함수는 ball을 입력받아서 ball + ”<>“을 두 번 발행합니다.
구구단 만들기
public void plainJava() { Scanner in = new Scanner(System.in); System.out.println("Gugudan Input : "); int dan = Integer.parseInt(in.nextLine()); for(int row = 1; row <= 9; row++) { System.out.println(dan + " * " + row + " = " + dan * row); } } >>Gugudan Input : 7 >>7 * 1 = 7 >>7 * 2 = 14 >>7 * 3 = 21 >>7 * 4 = 28 >>7 * 5 = 35 >>7 * 6 = 42 >>7 * 7 = 49 >>7 * 8 = 56 >>7 * 9 = 63
>> Java코드로 작성한 구구단
Step 1 : for문을 Observable로 변환하기
public void reactiveV1() { Scanner in = new Scanner(System.in); System.out.println("Gugudan Input : "); int dan = Integer.parseInt(in.nextLine()); Observable<Integer> source = Observable.range(1, 9); source.subscribe(row -> System.out.println(dan + " * " + row + " = " + dan * row)); } >>Gugudan Input : 7 >>7 * 1 = 7 >>7 * 2 = 14 >>7 * 3 = 21 >>7 * 4 = 28 >>7 * 5 = 35 >>7 * 6 = 42 >>7 * 7 = 49 >>7 * 8 = 56 >>7 * 9 = 63
>> for문 Observable 변환
- Observable.range(start, count)는 start 숫자부터 count 개수만큼의 숫자 값을 발행합니다.
Step 2 : 사용자 함수 정의하기
- 1개의 입력을 받아서 9개의 줄을 출력하는 함수가 필요합니다.
- map() 함수는 1개를 입력받아서 1개의 결과만 만들어내므로 이 예제에는 적합하지 않습니다.
- 따라서, 한 개의 Integer 값을 입력하면 여러 개의 String을 발행하는 flatMap() 함수를 활용하기에 적합합니다.
public void reactiveV2() { Scanner in = new Scanner(System.in); System.out.println("Gugudan Input : "); int dan = Integer.parseInt(in.nextLine()); Function<Integer, Observable<String>> gugudan = num -> Observable.range(1, 9).map(row -> num + " * " + row + " = " + num * row); Observable<String> source = Observable.just(dan).flatMap(gugudan); source.subscribe(System.out::println); } >>Gugudan Input : 7 >>7 * 1 = 7 >>7 * 2 = 14 >>7 * 3 = 21 >>7 * 4 = 28 >>7 * 5 = 35 >>7 * 6 = 42 >>7 * 7 = 49 >>7 * 8 = 56 >>7 * 9 = 63
>> flatMap() 함수를 활용한 코드 변환
- gugudan() 함수를 만들어 두면 subscribe() 메서드를 호출해 출력하기만 하면 됩니다. → 코드 재활용성 증대
- 디버깅 도구들을 추가해 예외 처리하기도 쉽습니다.
filter() 함수
- filter() 함수는 Observable에서 원하는 데이터만 걸러내는 역할을 합니다.
- 필요없는 데이터는 제거하고 필요한 데이터만 filter() 함수를 통과
>> filter() 함수의 마블 다이어그램
- 위 다이어그램을 보면 filter() 함수는 오직 동그란 모양의 원만 통과시킵니다.
>> filter() 함수의 예시- 위 예시를 보면 인자로 받은 x의 값이 10보다 큰 경우 통과됩니다.
public void marbleDiagram() { String[] objs = {"1 CIRCLE", "2 DIAMOND", "3 TRIANGLE", "4 DIAMOND", "5 CIRCLE", "6 HEXAGON"}; Observable<String> source = Observable.fromArray(objs).filter(obj -> obj.endsWith("CIRCLE")); source.subscribe(System.out::println); } >>1 CIRCLE >>5 CIRCLE
>> filter() 함수의 활용 예
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Observable<T> filter(Predicate<? super T> predicate) {}
>> filter() 함수의 원형
- filter() 함수의 원형을 보면, 함수형 인터페이스인 Predicate를 인자로 넣습니다. Predicate는 진위 판별이라는 뜻이 있으며 boolean 값을 리턴하는 특수한 함수형 인터페이스입니다.
public void marbleDiagram() { String[] objs = {"1 CIRCLE", "2 DIAMOND", "3 TRIANGLE", "4 DIAMOND", "5 CIRCLE", "6 HEXAGON"}; Predicate<String> filterCircle = obj -> obj.endsWith("CIRCLE"); Observable<String> source = Observable.fromArray(objs).filter(filterCircle); source.subscribe(System.out::println); } >>1 CIRCLE >>5 CIRCLE
- filter() 함수와 비슷한 함수들
- first(default) 함수 : Observable의 첫 번째 값을 필터함. 만약 값없이 완료되면 대신 기본값을 리턴함.
- last(default) 함수 : Observable의 마지막 값을 필터함. 만약 값없이 완료되면 대신 기본갑을 리턴함.
- take(N) 함수 : 최초 N개 값만 가져옴.
- takeLast(N) 함수 : 마지막 N개 값만 가져옴.
- skip(N) 함수 : 최초 N개 값을 건너뜀.
- skipLast(N) 함수 : 마지막 N개 값을 건너뜀.
public void otherFilters() { Integer[] numbers = {100, 200, 300, 400, 500}; Single<Integer> single; Observable<Integer> source; //1. first single = Observable.fromArray(numbers).first(-1); single.subscribe(data -> System.out.println("first() value = " + data)); //2. last single = Observable.fromArray(numbers).last(999); single.subscribe(data -> System.out.println("last() value = " + data)); //3. take(N) source = Observable.fromArray(numbers).take(3); source.subscribe(data -> System.out.println("take(3) values = " + data)); //4. takeLast(N) source = Observable.fromArray(numbers).takeLast(3); source.subscribe(data -> System.out.println("takeLast(3) values = " + data)); //5. skip(N) source = Observable.fromArray(numbers).skip(2); source.subscribe(data -> System.out.println("skip(2) values = " + data)); //6. skipLast(N) source = Observable.fromArray(numbers).skipLast(2); source.subscribe(data -> System.out.println("skipLast(2) values = " + data)); } >>first() value = 100 >>last() value = 500 >>take(3) values = 100 >>take(3) values = 200 >>take(3) values = 300 >>takeLast(3) values = 300 >>takeLast(3) values = 400 >>takeLast(3) values = 500 >>skip(2) values = 300 >>skip(2) values = 400 >>skip(2) values = 500 >>skipLast(2) values = 100 >>skipLast(2) values = 200 >>skipLast(2) values = 300
>> filter() 함수와 비슷한 함수들의 활용 예
reduce() 함수
- reduce() 함수는 발행한 데이터를 모두 사용하여 어떤 최종 결과 데이터를 합성할 때 활용합니다.
- 함수형 프로그래밍의 가장 기본 연산자인 map/filter/reduce 패턴을 이루는 마지막 필수 함수입니다.
>> reduce() 함수의 마블 다이어그램
>> reduce() 함수의 예시public void marbleDiagram() { String[] balls = {"1", "3", "5"}; Maybe<String> source = Observable.fromArray(balls) .reduce((ball1, ball2) -> ball2 + "(" + ball1 + ")"); source.subscribe(System.out::println); } >> 5(3(1))
>> reduce() 함수의 활용 예
- reduce() 함수를 호출하면 인자로 넘긴 람다 표현식에 의해 결과 없이 완료될 수도 있습니다. 따라서 Observable이 아니라 Maybe 객체로 리턴됩니다.
Maybe 클래스 Single 클래스와 마찬가지로 최대 데이터 하나를 가질 수 있지만 데이터 발행 없이 바로 데이터 발생을 완료할 수도 있는 클래스.
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Maybe<T> reduce(BiFunction<T, T, T> reducer) {}
>> reduce() 함수의 원형
- reduce() 함수의 원형을 보면, BiFunction 인터페이스(첫 번째 인자로 받은 T와 그것의 결과로 나오는 두 번째 T를 기반으로 새로운 Observable을 만드는 인터페이스)를 인자로 활용합니다.
public void marbleDiagram() { String[] balls = {"1", "3", "5"}; BiFunction<String, String, String> mergeBalls = (ball1, ball2) -> ball2 + "(" + ball1 + ")"; Maybe<String> source = Observable.fromArray(balls) .reduce(mergeBalls); source.subscribe(System.out::println); } >> 5(3(1))
>> reduce() 함수의 람다 표현식 분리
데이터 쿼리하기
map(), filter(), reduce() 함수를 활용하여 간단한 데이터 쿼리 예제 만들기.
상점에서 발생한 오늘 매출 - TV : $2,500 - Camera : $300 - TV : $1,600 - Phone : $800 오늘 발생한 TV 매출의 총합은? 1. 전체 매출 데이터를 입력함. 2. 매출 데이터 중 TV 매출을 필터링함. 3. TV 매출의 합을 구함.
public void run() { List<Pair<String, Integer>> sales = new ArrayList<>(); sales.add(Pair.create("TV", 2500)); sales.add(Pair.create("Camera", 300)); sales.add(Pair.create("TV", 1600)); sales.add(Pair.create("Phone", 800)); Maybe<Integer> tvSales = Observable.fromIterable(sales) .filter(sale -> "TV".equals(sale.getKey())) .map(sale -> sale.getValue()) .reduce((sale1, sale2) -> sale1 + sale2); tvSales.subscribe(tot -> System.out.println("TV Sales : $" + tot)); } >> TV Sales : $4100
>> TV 매출의 총합을 구하는 예제
- key로 String, value로 Integer인 Pair 타입의 List 객체를 정의합니다.
- 각각 TV, Camera, Phone 제품의 매출액을 List 객체에 추가합니다.
- TV의 매출액은 int 타입이므로 Maybe로 정의합니다.
- Observable이 아니라 Maybe 클래스를 사용하는 이유는 결과 계산을 위한 reduce() 함수를 호출하기 위해서.
- TV의 매출을 취합하기 위해 filter 함수를 활용합니다.
- Pair 클래스에서 왼쪽 값(Key)을 얻기 위해서는 getLeft()/getKey() 메서드를, 오른쪽 값을 얻기 위해서는 getRight()/getValue() 메서드를 호출합니다.
- map() 함수를 호출하여 매출액만 매핑합니다.
- reduce() 함수를 활용하여 두 값을 더해줍니다.
'RxJava' 카테고리의 다른 글
Reactive - 결합 연산자 (1) 2023.02.26 Reactive - 생성 연산자 (0) 2023.02.26 Reactive - 조건 연산자 (0) 2023.02.26 Reactive - 변환 연산자 (0) 2023.02.26 Rxjava Observable 만들어보기! (0) 2023.02.24