Java/Reactive

간단한 Flux method 테스트 (Reactive WebFlux)

체리필터 2021. 3. 26. 08:10
반응형

Reative의 개념은 간단하지만 실제 사용해보지 않으면 익숙해지기 쉽지 않은 것 같다.

우선은 작은 것 부터 사용해 보고, 하나씩 익숙해지는 시간이 필요한 듯 하여 간단하게 어떻게 동작하는지 확인해 본다.

아래 내용은 "실전! 스프링 5를 활용한 리액티브 프로그래밍"을 따라 하면서 작성한 것이다. - www.kyobobook.co.kr/product/detailViewKor.laf?ejkGb=KOR&mallGb=KOR&barcode=9791158391591&orderClick=LET&Kc=

 

실전! 스프링 5를 활용한 리액티브 프로그래밍 - 교보문고

오늘날 기업은 어떤 상황에도 높은 응답성을 유지할 수 있는 새로운 유형의 시스템이 필요합니다. 리액티브 프로그래밍을 이용하면 이를 달성할 수 있습니다. 이러한 시스템 개발은 복잡하며

www.kyobobook.co.kr

관련된 원 소스는 github.com/wikibook/spring5-reactive 에 있다.

package com.example.demo.reactive;

import org.junit.Test;
import reactor.core.publisher.Flux;

public class FluxTest {
    @Test
    public void anyTest() {
        Flux.just(3, 5, 7, 9, 11, 15, 17)
                .any(e -> e % 2 == 0)
                .subscribe(System.out::println);
    }

    @Test
    public void reduceTest() {
        // 1 + 2 + 3 + 4 + 5 = 15
        Flux.range(1, 5)
                .reduce(0, (acc, elem) -> acc + elem)
                .subscribe(System.out::println);
    }

    @Test
    public void scanTest() {
        // 1 + 2 + 3 + 4 + 5 = 15
        Flux.range(1, 5)
                .scan(0, (acc, elem) -> acc + elem)
                .subscribe(System.out::println);
    }

    @Test
    public void concatTest() {
        Flux.concat(
                Flux.range(1, 3),
                Flux.range(4, 2),
                Flux.range(6, 5)
        ).subscribe(System.out::println);
    }

    @Test
    public void bufferTest() {
        Flux.range(1, 13)
                .buffer(4)
                .subscribe(System.out::println);
    }
}

 

anyTest

any 메소드 내에서 조건식이 참인 것이 있는지 판별하는 것으로 보인다.

위의 코드의 경우에는 주어진 flux stream 내에서 짝수가 있는지 판별하는 것인데 없다면 false를 출력하고 있다면 true를 출력한다.

reduceTest

reduce라는 것이 줄이다라는 뜻인데 원소를 하나씩 처리 하면서 주어진 동작을 하는 것이다. 위의 소스의 경우에는 초기값이 0으로 주어졌고 기존 값에 새로운 원소를 더해 나가는 것으로 결과는 15가 출력될 것이다.

즉 "1 + 2 + 3 + 4 + 5 = 15"과 동일한 동작을 보여준다.

scanTest

scan도 reduce와 비슷하게 동작하는데, 다만 중간에 변화하는 상태를 보여준다. reduceTest가 결과값인 15만 보여준 것에 비해 scanTest는 아래와 같이 보여준다.

더보기

0
1
3
6
10
15

 

concatTest

여러개의 Stream을 이어 붙여 주는 역할을 한다. 위의 소스에서는 [1, 2, 3] 과 [4, 5] 와 [6, 7, 8, 9, 10] 이란 스트림을 이어 붙여 준 결과를 리턴한다. 즉 1 ~ 10을 리턴하게 된다.

 

bufferTest

Stream을 특정 버퍼 개수만큼 잘라 보여준다. 소스에서는 buffer를 4로 주었기 때문에 1 ~ 13까지의 스트림이 아래와 같은 모습으로 보여진다.

더보기

[1, 2, 3, 4]
[5, 6, 7, 8]
[9, 10, 11, 12]
[13]

13의 경우 4개가 차지 않았지만 Stream이 더 이상 없으므로 1개만 보여지게 된다.


package com.example.demo.reactive;

import org.junit.Test;
import reactor.core.publisher.Flux;

public class FluxTest {
    @Test
    public void windowTest() {
        Flux<Flux<Integer>> windowedFlux = Flux.range(101, 20).windowUntil(this::isPrime, true);

        windowedFlux.subscribe(window -> window.collectList().subscribe(System.out::println));
    }

    private boolean isPrime(Integer integer) {
        return (new java.math.BigInteger(String.valueOf(integer))).isProbablePrime(100);
    }

    @Test
    public void groupbyTest() {
        Flux.range(1, 7)
                .groupBy(e -> e % 2 == 0 ? "Even" : "Odd")
                .subscribe(groupFlux -> groupFlux
                        .scan(new LinkedList<>(), (list, elem) -> {
                            list.add(elem);
                            if (list.size() > 2) {
                                list.remove(0);
                            }
                            return list;
                        })
                        .filter(arr -> !arr.isEmpty())
                        .subscribe(data -> {
                            System.out.println("Key : " + groupFlux.key() + ", Data : " + data);
                        })
                );
    }
}

 

windowTest

bufferTest와 비슷하게 일정 단위로 잘라 처리하는 것을 windowing 한다고 하는 것 같다. 영어사전 정의에 보면 "1. [컴퓨터]윈도잉 ((두 개 이상의 서로 다른 데이터를 윈도를 사용하여 동시에 한 화면에 표시하는 것))" 이라고 나와 있다.

어째든 위의 코드는 isPrime 이란 메소드에서 정의한 내용을 기초로 그룹화 하고 있다. isPrime 메소드는 소수인지의 여부를 판단하고 있다. 참고로 소수란 "1과 그 수 자신 이외의 자연수로는 나눌 수 없는 자연수"이다.

그리고 위의 isPrime 메소드는 zetawiki.com/wiki/%EC%9E%90%EB%B0%94_isPrime() 에서 가져왔다.

windowUntil 메소드의 2번째 파라미터에 true를 넘긴 것은 어느 시점에 Stream을 잘라 그룹화 할 것인지 정의하는 파라미터이다. reactor.core.publisher.Flux.java 파일의 windowUntil 메소드를 보면 cutBefore 파라미터에 따라 FluxWindowPredicate Object 생성 시 다음과 같이 넘기고 있다.

cutBefore ? FluxBufferPredicate.Mode.UNTIL_CUT_BEFORE : FluxBufferPredicate.Mode.UNTIL

어째든 다시 정리해 보면 소수가 나오면, 소수 이전에 짤라서 그룹화를 하는 것이다.

테스트 코드를 돌린 결과는 아래와 같다.

더보기

[]
[101, 102]
[103, 104, 105, 106]
[107, 108]
[109, 110, 111, 112]
[113, 114, 115, 116, 117, 118, 119, 120]

101, 103, 107, 109, 113 등이 소수이므로 그 이전에서 잘라 새로운 Stream을 만들고 있다.

groupbyTest

groupBy 메소드는 조건에 따라 그룹화 하는 메소드로 보인다.

소스코드를 보면 1 ~ 7의 스트림 중에 홀수와 짝수를 그룹으로 분리하고 있다. 중간에 size가 2보다 큰 경우 첫 번째 요소를 제거하는 것을 제외하고 테스트를 돌려보면 다음과 같이 화면에 보이는 것을 볼 수 있다.

더보기

Key : Odd, Data : [1]
Key : Even, Data : [2]
Key : Odd, Data : [1, 3]
Key : Even, Data : [2, 4]
Key : Odd, Data : [1, 3, 5]
Key : Even, Data : [2, 4, 6]
Key : Odd, Data : [1, 3, 5, 7]

중간에 사이즈 체크하는 소스가 있다면 원소 수가 2개가 넘지 않도록 조절하기 때문에 다음과 같이 나오게 된다.

더보기

Key : Odd, Data : [1]
Key : Even, Data : [2]
Key : Odd, Data : [1, 3]
Key : Even, Data : [2, 4]
Key : Odd, Data : [3, 5]
Key : Even, Data : [4, 6]
Key : Odd, Data : [5, 7]


package com.example.demo.reactive;

import org.junit.Test;
import reactor.core.publisher.Flux;

public class FluxTest {
    @Test
    public void flatmapTest() {
        Flux.just("user-1", "user-2", "user-3")
                .flatMap(
                        u -> {
                            System.out.println("User : " + u);
                            Flux<String> bookFlux = requestBooks(u);
                            Flux<String> mapFlux = bookFlux.map(b -> u + "/" + b);

                            return mapFlux;
                        }
                ).subscribe(data -> {
                    System.out.println("Data : " + data);
                }, error -> {
                    System.out.println(error);
                })
        ;
    }

    Random random = new Random();
    private Flux<String> requestBooks(String user) {
        return Flux
                .range(1, random.nextInt(3) + 1)
                .map(i -> "book-" + i);
    }
}

flatmapTest

flatMap의 동작 원리를 보여주는 이미지이다. 관련해서 flatMapSequential, concatMap 등이 있는데 차이점은 다음의 링크에서 확인해 볼 수 있다.

tech.kakao.com/2018/05/29/reactor-programming/

두 개의 Stream을 하나로 합쳐 주는 기능인데 이것이 어떻게 동작하는지 소스를 통해 알 수 있다.

requestBooks에서 생성된 Flux와 flatmapTest에서 생성된 user Flux를 하나의 Flux로 합쳐주는 것이다.

실행하게 될 경우 random.nextInt에 의해 임의로 생성된 book의 개수만큼 user에 맵팽되게 된다. 따라서 실행 될 때마다 다른 값이 맵핑되게 된다.

더보기

실행 결과 1

User : user-1
Data : user-1/book-1
Data : user-1/book-2
Data : user-1/book-3
User : user-2
Data : user-2/book-1
Data : user-2/book-2
Data : user-2/book-3
User : user-3
Data : user-3/book-1
Data : user-3/book-2
Data : user-3/book-3

실행 결과 2

User : user-1
Data : user-1/book-1
Data : user-1/book-2
User : user-2
Data : user-2/book-1
Data : user-2/book-2
Data : user-2/book-3
User : user-3
Data : user-3/book-1
Data : user-3/book-2


package com.example.demo.reactive;

import org.junit.Test;
import reactor.core.publisher.Flux;

public class FluxTest {
    @Test
    public void sampleTest() {
        Flux.range(1, 1000)
//                .delayElements(Duration.ofNanos(10))
                .sample(Duration.ofNanos(100))
                .subscribe(System.out::println);
    }

    @Test
    public void doOnEachTest() {
        Flux.just(1, 2, 3)
                .concatWith(Flux.error(new RuntimeException("Conn error")))
                .doOnEach(System.out::println)
                .subscribe();
    }

    @Test
    public void signalTest() {
        Flux.range(1, 3)
                .doOnNext(e -> System.out.println("data : " + e))
                .materialize()
                .doOnNext(e -> System.out.println("signal : " + e))
                .dematerialize()
                .collectList()
                .subscribe(e -> System.out.println("result : " + e));
    }
}

SampleTest

전체 스트림 중 일부분만 추출하여 사용하고자 하는 경우 sample을 사용할 수 있다. 책에 나온 예제에서는 delay를 1 millimillisecond를 주었고 sample은 20millisecond 마다 가져오는 것으로 하였지만 컴퓨터의 성능 상 그 전에 끝나는 듯 하여 추출이 되지 않았다.

따라서 지연을 하지 않고 100nanosecond 마다 추출을 하는 것으로 테스트를 하면 아래와 같이 나온다. 물론 sample 이기에 실행 마다 다른 결과물이 나오게 된다.

더보기

1
162
205
246
288
329
374
419
460
498
523
556
600
646
726
809
862
948
1000

Process finished with exit code 0

doOnEachTest

sequence를 처리하는 동안 doOnNext, doOnError, doOnComplete, doOnSubscribe, doOnRequest, doOnCancel 등의 이벤트가 발생한다.

이러한 이벤트 발생 시 어떠한 일들이 발생하는지 확인하기 위해 doOnEach라는 것을 사용할 수 있다.

위의 소스코드를 실행하게 되면 아래처럼 1, 2, 3의 next event와 concatWith로 연결된 error 이벤트를 볼 수 있게 된다.

doOnEach_onNext(1)
doOnEach_onNext(2)
doOnEach_onNext(3)
onError(java.lang.RuntimeException: Conn error)
08:28:40.259 [main] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: Conn error
Caused by: java.lang.RuntimeException: Conn error
	at com.example.demo.reactive.FluxTest.doOnEachTest(FluxTest.java:117)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)

signalTest

Data stream을 Signal Stream으로 변환할 때 materialize를 다시 Data stream으로 되돌릴 때 dematerialize를 사용한다.

materialize는 stream을 signal로 바꿔 준다.
dematerialize는 signal을 stream으로 바꿔 준다.

위의 소스코드를 실행하면 아래와 같이 나온다.

더보기

data : 1
signal : onNext(1)
data : 2
signal : onNext(2)
data : 3
signal : onNext(3)
signal : onComplete()
result : [1, 2, 3]


package com.example.demo.reactive;

import org.junit.Test;
import reactor.core.publisher.Flux;

public class FluxTest {
    @Test
    public void pushTest() {
        Flux.push(emitter -> IntStream
                .range(2000, 3000)
                .forEach(emitter::next))
//            .delayElements(Duration.ofMillis(1))
                .subscribe(e -> log.debug("push data : {}", e));

        log.debug("======================================================");

        Flux.push(emitter -> {
            for (int i = 2000; i < 3000; i++) {
                emitter.next(i);

                if (i > 2500) {
                    emitter.complete();
                }
            }
        }).subscribe(e -> log.debug("push data : {}", e));
    }

    @Test
    public void generateTest() {
        Flux.generate(() -> Tuples.of(0L, 1L),
                (state, sink) -> {
                    log.debug("generated value : {}", state.getT2());
                    sink.next(state.getT2());
                    long newValue = state.getT1() + state.getT2();

                    return Tuples.of(state.getT2(), newValue);
                })
//                .delayElements(Duration.ofMillis(1))
                .take(7)
                .subscribe(e -> log.debug("onNext : {}", e));
    }
}

pushTest

배압(backpressure)가 구현된 reactive stream을 만든다 

위 그림에서 볼 수 있는 것 처럼 push 메소드는 reactive stream을 만드는데 배압까지 구현이 되도록 만든다는 것을 알 수 있다. subscribe를 한 이후 request 요청 개수만큼만 전달하고 있다.

어째든 이러한 개념을 생각해 두고 위 소스 코드를 실행하면 2000 ~ 2999까지 출력되는 것을 볼 수 있다.

11:50:48.849 [main] DEBUG com.example.demo.reactive.FluxTest - push data : 2000
11:50:48.851 [main] DEBUG com.example.demo.reactive.FluxTest - push data : 2001
11:50:48.851 [main] DEBUG com.example.demo.reactive.FluxTest - push data : 2002
11:50:48.851 [main] DEBUG com.example.demo.reactive.FluxTest - push data : 2003
11:50:48.851 [main] DEBUG com.example.demo.reactive.FluxTest - push data : 2004
...
11:50:48.876 [main] DEBUG com.example.demo.reactive.FluxTest - push data : 2998
11:50:48.877 [main] DEBUG com.example.demo.reactive.FluxTest - push data : 2999

돌아가는 내부의 모습을 좀 더 자세히 보기 위해 일부러 for 문을 돌리면서 emitter.complete를 호출하게 되면 그 상태에서 바로 종료가 되는 것을 볼 수 있다.

11:50:48.878 [main] DEBUG com.example.demo.reactive.FluxTest - push data : 2000
11:50:48.878 [main] DEBUG com.example.demo.reactive.FluxTest - push data : 2001
11:50:48.878 [main] DEBUG com.example.demo.reactive.FluxTest - push data : 2002
11:50:48.878 [main] DEBUG com.example.demo.reactive.FluxTest - push data : 2003
11:50:48.878 [main] DEBUG com.example.demo.reactive.FluxTest - push data : 2004
11:50:48.878 [main] DEBUG com.example.demo.reactive.FluxTest - push data : 2005
...
11:50:48.883 [main] DEBUG com.example.demo.reactive.FluxTest - push data : 2499
11:50:48.883 [main] DEBUG com.example.demo.reactive.FluxTest - push data : 2500
11:50:48.883 [main] DEBUG com.example.demo.reactive.FluxTest - push data : 2501

Process finished with exit code 0

 

참고로 Flux의 range는 아래 처럼 정의 되어 있다. 즉 두 번째 파라미터가 몇 개를 생성할지 정하는 count 이다.

	public static Flux<Integer> range(int start, int count) {
		if (count == 1) {
			return just(start);
		}
		if (count == 0) {
			return empty();
		}
		return onAssembly(new FluxRange(start, count));
	}

반면에 IntStream의 range는 endExclusive로 "마지막 숫자를 포함하지 않는 미만" 이란 의미이다.

    public static IntStream range(int startInclusive, int endExclusive) {
        if (startInclusive >= endExclusive) {
            return empty();
        } else {
            return StreamSupport.intStream(
                    new Streams.RangeIntSpliterator(startInclusive, endExclusive, false), false);
        }
    }

 

generateTest

초기값을 주고 값을 생성한 다음, next 이벤트를 발생시킨다.

Flux를 만들어 낼 수 있는 genrate라는 메소드가 있다.

위의 소스에서 state에 초기값 0, 1이 들어가 있고, 이 값 중 2번째 값을 이용해서 next 이벤트를 발생 시킨다. 다음 값으로는 2번째 값과 두 값을 합친 값을 Tuple로 넘기게 된다. 이런식으로 해서 피보나치 수열을 만들어 낼 수 있다.

중간에 next 메소드를 이벤트를 실행 시켰기 때문에, generated value와 subscribe 에서 보여지는 value가 바로 보여지게 된다.

take(7)을 이용해서 7번만 값을 취하게 만든다. 이 소스를 실행하게 되면 아래와 같이 출력된다.

14:22:38.202 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
14:22:38.212 [main] DEBUG com.example.demo.reactive.FluxTest - generated value : 1
14:22:38.213 [main] DEBUG com.example.demo.reactive.FluxTest - onNext : 1
14:22:38.213 [main] DEBUG com.example.demo.reactive.FluxTest - generated value : 1
14:22:38.213 [main] DEBUG com.example.demo.reactive.FluxTest - onNext : 1
14:22:38.213 [main] DEBUG com.example.demo.reactive.FluxTest - generated value : 2
14:22:38.213 [main] DEBUG com.example.demo.reactive.FluxTest - onNext : 2
14:22:38.213 [main] DEBUG com.example.demo.reactive.FluxTest - generated value : 3
14:22:38.213 [main] DEBUG com.example.demo.reactive.FluxTest - onNext : 3
14:22:38.213 [main] DEBUG com.example.demo.reactive.FluxTest - generated value : 5
14:22:38.213 [main] DEBUG com.example.demo.reactive.FluxTest - onNext : 5
14:22:38.213 [main] DEBUG com.example.demo.reactive.FluxTest - generated value : 8
14:22:38.213 [main] DEBUG com.example.demo.reactive.FluxTest - onNext : 8
14:22:38.213 [main] DEBUG com.example.demo.reactive.FluxTest - generated value : 13
14:22:38.213 [main] DEBUG com.example.demo.reactive.FluxTest - onNext : 13

 

 

테스트 하면서 계속 덧 붙일 예정...

 

반응형
1 2 3 4 5