Cold sequence/Hot sequence

2025. 1. 30. 19:35·개념 저장소/spring reactive

1.Sequence란

Sequence는 시간의 흐름에 따라 순차적으로 발생하는 데이터 스트림을 의미합니다.

Reactor에서 제공하는 Flux와 Mono는 데이터 흐름의 성격에 따라 Hot Sequence 또는 Cold Sequence로 분류됩니다.

  • Cold Sequence: Subscriber가 구독할 때마다 새로운 데이터 스트림을 생성하는 방식
  • Hot Sequence: 데이터 스트림이 이미 동작하고 있으며, Subscriber가 중간에 참여하는 방식

2. Cold Sequence: Subscriber별로 독립적인 데이터 스트림

2.1 특징

Cold Sequence는 구독할 때마다 새로운 데이터 스트림을 생성합니다. Subscriber마다 별도의 데이터 소스를 통해 독립적인 데이터를 받게 됩니다.

2.2 동작 방식

@Slf4j
public class ColdSequenceExample {
	public static void main(String[] args) {
		Flux<String> coldFlux = Flux.fromIterable(Arrays.asList("RED", "YELLOW", "PINK"))
			.map(String::toLowerCase);

		coldFlux.subscribe(country -> log.info("# Subscriber1: {}", country));
		log.info("-------------------------");
		coldFlux.subscribe(country -> log.info("# Subscriber2: {}", country));
	}
}

// 출력
19:28:54.295 [main] INFO org.jamin.reactive01.section5.ColdSequenceExample -- # Subscriber1: red
19:28:54.298 [main] INFO org.jamin.reactive01.section5.ColdSequenceExample -- # Subscriber1: yellow
19:28:54.298 [main] INFO org.jamin.reactive01.section5.ColdSequenceExample -- # Subscriber1: pink
19:28:54.299 [main] INFO org.jamin.reactive01.section5.ColdSequenceExample -- -------------------------
19:28:54.299 [main] INFO org.jamin.reactive01.section5.ColdSequenceExample -- # Subscriber2: red
19:28:54.299 [main] INFO org.jamin.reactive01.section5.ColdSequenceExample -- # Subscriber2: yellow
19:28:54.299 [main] INFO org.jamin.reactive01.section5.ColdSequenceExample -- # Subscriber2: pink

각 Subscriber가 독립적으로 같은 데이터를 받습니다.

2.3 장점과 단점

장점:

  • 각 Subscriber가 독립적으로 데이터를 처리할 수 있어 데이터 무결성이 보장됨
  • 필요한 시점에 데이터를 요청하므로 지연 평가가 가능

단점:

  • 동일한 데이터를 여러 번 생성해야 하므로 리소스 낭비 가능성이 있음
  • 실시간 데이터 스트림에는 적합하지 않음

3. Hot Sequence: 공유되는 데이터 스트림

3.1 특징

Hot Sequence는 데이터 스트림이 이미 동작 중이며, 구독자는 중간에 합류할 수 있습니다.이미 생성된 데이터 스트림을 여러 Subscriber가 공유하는 방식입니다.

3.2 동작 방식

@Slf4j
public class HotSequenceExample {
	public static void main(String[] args) throws InterruptedException {
		Flux<String> concertFlux =
			Flux.fromStream(Stream.of("Singer A", "Singer B", "Singer C", "Singer D", "Singer E"))
				.delayElements(Duration.ofSeconds(1)).share();  //  share() 원본 Flux를 여러 Subscriber가 공유한다.

		concertFlux.subscribe(singer -> log.info("# Subscriber1 is watching {}'s song.", singer));

		Thread.sleep(2500);

		concertFlux.subscribe(singer -> log.info("# Subscriber2 is watching {}'s song.", singer));

		Thread.sleep(3000);
	}
}

// 출력
19:32:53.511 [parallel-1] INFO org.jamin.reactive01.section5.HotSequenceExample -- # Subscriber1 is watching Singer A's song.
19:32:54.523 [parallel-2] INFO org.jamin.reactive01.section5.HotSequenceExample -- # Subscriber1 is watching Singer B's song.
19:32:55.527 [parallel-3] INFO org.jamin.reactive01.section5.HotSequenceExample -- # Subscriber1 is watching Singer C's song.
19:32:55.527 [parallel-3] INFO org.jamin.reactive01.section5.HotSequenceExample -- # Subscriber2 is watching Singer C's song.
19:32:56.540 [parallel-4] INFO org.jamin.reactive01.section5.HotSequenceExample -- # Subscriber1 is watching Singer D's song.
19:32:56.540 [parallel-4] INFO org.jamin.reactive01.section5.HotSequenceExample -- # Subscriber2 is watching Singer D's song.
19:32:57.549 [parallel-5] INFO org.jamin.reactive01.section5.HotSequenceExample -- # Subscriber1 is watching Singer E's song.
19:32:57.549 [parallel-5] INFO org.jamin.reactive01.section5.HotSequenceExample -- # Subscriber2 is watching Singer E's song.

이전 데이터를 다시 받을 수 없고, 현재 진행 중인 스트림만 받을 수 있음을 알 수 있습니다.

3.3 장점과 단점

장점:

  • 리소스를 절약할 수 있음 (한 번의 데이터 생성으로 여러 Subscriber가 공유)
  • 실시간 데이터 스트림 처리에 유리 (예: WebSocket, Kafka, Sensor Data)

단점:

  • Subscriber가 중간에 합류하면 이전 데이터를 놓칠 수 있음
  • 스트림이 종료되지 않으면 Subscriber가 언제까지나 기다려야 할 수도 있음

4. Cold Sequence와 Hot Sequence의 변환

4.1 Cold -> Hot 변환

Cold Sequence를 Hot Sequence로 변환하려면 publish() 또는 share()를 사용합니다.

Flux<Integer> coldFlux = Flux.range(1, 5).publish().autoConnect();
Flux<Integer> hotFlux = Flux.range(1, 5).share();

 

4.2 Hot -> Cold 변환

Hot Sequence를 다시 Cold Sequence로 만들려면 cache() 메서드를 사용합니다.

Flux<Integer> coldAgain = hotFlux.cache();

 


 

'개념 저장소 > spring reactive' 카테고리의 다른 글

SINKS  (0) 2025.02.11
backpressure  (0) 2025.02.01
Mono와 Flux  (0) 2025.01.29
리액티브 스트림 구성요소  (0) 2025.01.25
리액티브 시스템이란  (1) 2025.01.21
'개념 저장소/spring reactive' 카테고리의 다른 글
  • SINKS
  • backpressure
  • Mono와 Flux
  • 리액티브 스트림 구성요소
jamin
jamin
  • jamin
    jamin
    jamin
  • 전체
    오늘
    어제
    • 전체 (82)
      • 트러블슈팅 (31)
        • 회사 (25)
        • 개인 (6)
      • 개념 저장소 (19)
        • coroutine (10)
        • spring reactive (9)
        • network (0)
      • 코딩 테스트 (31)
  • 태그

    DP
    코딩테스트
    instancio
    spring boot
    분리집합
    백준 23758
    cluster mode
    코테
    경로압축
    정렬
    대용량 데이터
    그리디
    error alram
    coroutine
    그리디 알고리즘
    누적합
    Kotlin
    mirroring mode
    BFS
    다이나믹 프로그래밍
    batch
    백준
    sinks
    JPA
    다익스트라
    reactive
    수학
    spring reactive
    dfs
    log
  • 최근 글

  • hELLO· Designed By정상우.v4.10.3
jamin
Cold sequence/Hot sequence
상단으로

티스토리툴바