본문 바로가기
카테고리 없음

Rxjava - subscribeOn과 observeOn의 차이

by 너츠너츠 2023. 5. 9.

subscribeOn()과 observeOn()의 차이점

subscribeOn은 여러분 호출되더라도 맨 처음의 호출만 영향을 주며 어디에 위치하든 상관없습니다.

observeOn은 여러번 호출될 수 있으며 이후에 실행되는 연산에 영향을 주므로 위치가 중요합니다.

 

핵심은 observeOn()이 호출되는 위치입니다. observeOn()이 실행된 시점부터 뒷 로직들은 mainThread에서 돌아가서 시간이 오래 걸리는 로직의 경우 mainThread를 잡고 있어서 문제가 발생할 수 있습니다.

// # 1
myService.getUsers()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .flatMap(Observable::fromIterable)
    .filter(User::isMember)
    .map(this::saveToCache)
    .toList()
    .subscribe(View::showUser);
    
    
// # 2
myService.getUsers()
    .subscribeOn(Schedulers.io())
    .flatMap(Observable::fromIterable)
    .filter(User::isMember)
    .map(this::saveToCache)
    .toList()
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(View::showUser);

 

예제 코드

ArrayList<MyShape> shapes = new ArrayList<>();
shapes.add(new MyShape("Red","Ball"));
shapes.add(new MyShape("Green","Ball"));
shapes.add(new MyShape("Blue","Ball"));

Observable.fromIterable(shapes)
	.subscribeOn(Schedulers.computation())             // (A)
	.subscribeOn(Schedulers.io())                      // (B)
	// 1. 현재 스레드(main)에서 Observable을 구독
	.doOnSubscribe(data -> MyUtil.printData("doOnSubscribe"))
	// 2. (A)에 의해 computation 스케줄러에서 데이터 흐름 발생, (B)는 영향 X
	.doOnNext(data -> MyUtil.printData("doOnNext", data))
	// 3. (C)에 의해 map 연산이 new thread에서 실행
	.observeOn(Schedulers.newThread())                  // (C)
	.map(data -> {data.shape = "Square"; return data;})
	.doOnNext(data -> MyUtil.printData("map(Square)", data))
	// 4. (D)에 의해 map 연산이 new thread에서 실행
	.observeOn(Schedulers.newThread())                  // (D)
	.map(data -> {data.shape = "Triangle"; return data;})
	.doOnNext(data -> MyUtil.printData("map(Triangle)", data))
	// 5. (E)에 의해 new thread에서 데이터 소비(subscribe)
	.observeOn(Schedulers.newThread())                  // (E)
	.subscribe(data -> MyUtil.printData("subscribe", data));
[실행결과]
main | doOnSubscribe |
RxComputationThreadPool-1 | doOnNext | MyShape{color='Red', shape='Ball'}
RxComputationThreadPool-1 | doOnNext | MyShape{color='Green', shape='Ball'}
RxComputationThreadPool-1 | doOnNext | MyShape{color='Blue', shape='Ball'}
RxNewThreadScheduler-1 | map(Square) | MyShape{color='Red', shape='Square'}
RxNewThreadScheduler-1 | map(Square) | MyShape{color='Green', shape='Square'}
RxNewThreadScheduler-1 | map(Square) | MyShape{color='Blue', shape='Square'}
RxNewThreadScheduler-2 | map(Triangle) | MyShape{color='Red', shape='Triangle'}
RxNewThreadScheduler-2 | map(Triangle) | MyShape{color='Green', shape='Triangle'}
RxNewThreadScheduler-2 | map(Triangle) | MyShape{color='Blue', shape='Triangle'}
RxNewThreadScheduler-3 | subscribe | MyShape{color='Red', shape='Triangle'}
RxNewThreadScheduler-3 | subscribe | MyShape{color='Green', shape='Triangle'}
RxNewThreadScheduler-3 | subscribe | MyShape{color='Blue', shape='Triangle'}
class MyShape{
    String color;
    String shape;

    MyShape(String color, String shape) {
        this.color = color;
        this.shape = shape;
    }

    @Override
    public String toString() {
        return "MyShape{" +
                "color='" + color + '\'' +
                ", shape='" + shape + '\'' +
                '}';
    }
}

class MyUtil {
    static void printData(String message) {
        System.out.println(""+Thread.currentThread().getName()+" | "+message+" | ");
    }

    static void printData(String message, Object obj) {
        System.out.println(""+Thread.currentThread().getName()+" | "+message+" | " +obj.toString());
    }
}

 

<참고>

https://choi-dev.tistory.com/m/139

https://4z7l.github.io/2020/12/18/rxjava-6.html

반응형

댓글