개발자를위한레디스 6장 - 레디스를 메시지 브로커로로 사용하기
레디스를 메시지 브로커로로 사용하기
레디스를 메시지 브로커로로 사용하기
메시지 브로커는 크게 메시징 큐와 이벤트 스트림으로 나눌 수 있습니다.
메시징 큐와 이벤트 스트림
메시징 큐
생산자
(producer)는 소비자의 큐로 데이터를 직접 푸시소비자
(consumer)가 데이터를 읽어갈 때 큐에서 데이터를 삭제- 1:1 상황에서 한 서비스가 다른 서비스에게 동작을 지시할 때 유용
이벤트 스트림
생산자
(publisher)는 스트림의 특정 저장소에 하나의 메시지를 보낼 수 있고, 소비자들은 스트림에서 같은 메시지를 풀(pull)구독자
(subscriber)가 읽어간 데이터는 저장소의 설정에 따라 특정 기간 동안 저장될 수 있음- n:n 상황에서 유리
레디스를 메시지 브로커로 사용하기
레디스에서 제공하는 pub/sub를 사용하면 빠르고 간단한 방식으로 메시지를 전달할 수 있는 메시지 브로커를 구현할 수 있습니다.
- 일회성 : 모든 데이터는 한 전 채널 전체에 전파된 뒤 삭제.
=> fire-and-forget 패턴이 필요한 상황에서 유용 (ex. 알림 서비스)
레디스의 자료구조를 활용해서 메시지 브로커를 구현할 수 있습니다.
- 메시징 큐: list 이용
- 스트림 플랫폼: stream 이용
레디스의 pub/sub
- 레디스에서 pub/sub은 매우 가벼움
- 최소한의 메시지 전달 기능만 제공하며 메타데이터(어떤 구독자가 메시지를 읽어가는지, 정상적으로 모든 구독자에게 메시지가 전달됐는지 등)는 저장하지 않음
- 단순히 메시지의 통로 역할만 수행
메시지 publish하기
hello라는 채널을 수신하고 있는 모든 서버들에 world라는 메시지가 전파됩니다.
1
2
> PUBLISH hello world
(integer) 1 # 메시지를 수신한 구독자의 수
메시지 구독하기
SUBSCRIBE
커맨드를 통해 특정 채널을 구독합니다. (event1, event2 동시 구독)
1
2
3
4
5
6
7
8
> SUBSCRIBE event1 event2
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "event1"
3) (integer) 1
1) "subscribe"
2) "event2"
3) (integer) 2
PSUBSCRIBE를 통해 앞부분이 mail-로 시작하는 모든 채널에 전파된 메시지를 모두 수신할 수 있습니다.
1
2
3
4
5
> PSUBSCRIBE mail-*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "mail-*"
3) (integer) 1
클러스터 구조에서의 pub/sub
클러스터
는 레디스가 자체적으로 제공하는 데이터 분산 형태의 구조를 의미합니다.
레디스 7 이하 버전에서는 하나의 노드에 메시지를 발행하면 메시지는 모든 노드에 전파됩니다. 따라서, 모든 레디스 노드에 복제되는 방식이므로 불필요한 리소스 사용과 네트워크 부하가 발생할 수 있습니다.
sharded pub/sub
위와 같은 비효율을 해결하기 위해 레디스 7.0
부터 sharded pub/sub 기능이 도입되었습니다.
SPUBLISH 커맨드로 발행된 메시지는 모든 노드에 전파되지 않으며, 노드의 복제본에만 전달됩니다.
1
2
3
4
10.0.0.1:6379> SPUBLISH apple a
-> Redirected to slot [7092] located at 10.0.0.2:6379
(integer) 1
10.0.0.2:6379>
데이터를 전파하려고 할 때, 연결된 노드에서 지정한 채널에 전파할 수 없다는 메시지와 함께 연결된 노드로 리다이렉트됩니다.
1
2
3
4
5
6
7
8
9
10
10.0.0.1:6379> SSUBSCRIBE apple
Reading messages... (press Ctrl-C to quit)
-> Redirected to slot [7092] located at 10.0.0.2:6379
Reading messages... (press Ctrl-C to quit)
1) "ssubscribe"
2) "apple"
3) (integer) 1
1) "smessage"
2) "apple"
3) "a"
apple 채널은 apple 키 값을 할당받을 수 있는 슬롯을 포함한 마스터 노드에 연결될 수 있도록 리다이렉트됩니다.
레디스의 list를 메시징 큐로 사용하기
list는 큐로 사용하기 적절한 자료 구조
list의 EX 기능
- 트위터는 각 유저의 타임라인 캐시 데이터를 레디스에서 list 자료 구조로 관리
- 유저A가 새로운 트윗을 작성하면 그 데이터는 A를 팔로우하는 유저의 타임라인 캐시에 저장
RPUSH는 list가 이미 존재할 때에만 아이템을 추가합니다. (자주 들어오지 않는 유저의 캐시라인은 관리할 필요가 없으므로)
1
2
3
4
5
6
7
8
> RPUSHX Timelinecache:userB data3
(integer) 26
> RPUSH Timelinecache:userC data3
(integer) 5
> RPUSH Timelinecache:userD data3
(integer) 0
list의 블로킹 기능
레디스르 이벤트 큐로 사용할 경우 블로킹 기능을 유용하게 사용 가능합니다. 이벤트 기반 구조에서 이벤트 루프를 돌며 신규로 처리할 이벤트가 있는지 체크합니다. 이벤트 루프는 이벤트 큐에 새 이벤트가 있는지 체크하며, 새로운 이벤트가 없을 경우 정해진 시간(polling interval)동안 대기한 뒤 다시 이벤트 큐에 데이터가 있는지 확인하는 과정을 반복합니다.
이러한 작업을 ‘polling’이라고 하며, 폴링 프로세스가 진행되는 동안 어플리케이션과 큐의 리소스가 불필요하게 소모될 수 있습니다. 또한 이벤트 큐에 이벤트가 들어왔을 수 있지만, 폴링 인터벌 시간 동안은 대기한 뒤 다시 확인하는 과정을 거치기 때문에 이벤트를 즉시 처리할 수 없는 단점이 있습니다.
이때, 리스트의 블로킹 기능을 사용하면 불필요함을 줄일 수 있습니다.
- 예시 : queue:a에 데이터가 입력될 때까지 최대 5초 동안 대기하고, 5초가 경과하면 nil을 반환
1
2
3
> BRPOP queue:a 5
1) "queue:a"
2) "data"
BRPOP은 2개의 데이터를 반환 (키 값, 반환된 데이터의 값)
1
2
3
4
> BRPOP queue:a queue:b queue:c timeout 1000
1) "queue:b"
2) "DATA" # queue:b에 신규로 들어온 DATA라는 값을 반환
(19.89s) # 19.98초 동안 세 리스트에 데이터가 입력되는 것을 기다림
BRPOP은 1,000초 동안 queue:a, queue:b, queue:c 중 어느 하나라도 데이터가 들어올 때까지 기다린 뒤, 그 중 하나의 리스트에 데이터가 들어오면 해당 값을 읽어옵니다.
list를 이용한 원형 큐
특정 아이템을 계속해서 반복 접근해야 하는 클라이언트, 혹은 여러 개의 클라이언트가 병렬적으로 같은 아이템에 접근해야 하는 클라이언트에서는 원형 큐를 이용해 아이템을 처리할 수 있습니다.
list에서 RPOPLPUSH
커맨드를 사용해서 간편하게 원형 큐를 사용할 수 있습니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
> LPUSH clist A
(integer) 1
> LPUSH clist B
(integer) 2
> LPUSH clist C
(integer) 3
> LRANGE clist 0 -1 # CBA
1) "C"
2) "B"
3) "A"
> RPOPLPUSH clist clist
"A"
> LRANGE clist 0 -1 # ACB
1) "A"
2) "C"
3) "B"
Stream
레디스의 Stream과 아파치 카프카
레디스 5.0에서 새로 추가된 자료 구조인 Stream은 대용량, 대규모의 메시징 데이터를 빠르게 처리할 수 있도록 설계되었습니다.
스트림이란?
스트림이란 연속적인 데이터의 흐름, 일정한 데이터 조각의 연속을 의미합니다.
메시지(데이터)의 저장과 식별
- 카프카에서 스트림 데이터는 토픽이라는 개념에 저장
- 레디스에서는 하나의 stream 자료 구조가 하나의 stream을 의미.
- 레디스 stream에서 각 메시지는 시간과 관련된 유니크한 ID를 가지며, 이 값은 중복되지 않음(
- )( - <같은 milliseconds에="" 저장된="" 순서="">)같은>
스트림 생성과 데이터 입력
카프카에서는 각 스트림은 토픽이라는 이름으로 관리됩니다. producer는 데이터를 토픽에 푸시하며, consumer는 토픽에서 데이터를 읽습니다.
1
2
3
4
5
6
7
-- 토픽 생성
$ kafka-topics --zookeeper 127.0.0.1:6000 --topic Email --create partitions 1 --replication-factor 1
-- 데이터 추가
$ kafka-console-consuer --brokers-list 127.0.0.1:7000 --topic Email
> "I am first email"
> "I am second email"
레디스에서는 XADD 커맨드를 이용해 새로운 이름의 stream에 데이터를 저장하면 데이터의 저장과 동시에 stream 자료 구조가 생성됩니다.
1
2
> XADD Email * subject "first" body "hello?"
"1659114481311-0"
데이터 조회
카프카
에서 소비자는 특정 토픽을 실시간으로 리스닝하며, 새롭게 토픽에 저장되는 메시지를 전달받을 수 있습니다.(--from-beginning
은 카프카에 저장되어있는 모든 데이터를 처음부터 읽겠다를 의미)1
$ kafka-console-producer --bootstrap-server 127.0.0.1:7000 --topic Email --from-beginning
레디스
stream에서는 실시간으로 처리되는 데이터를 리스닝하거나, ID를 이용해 필요한 데이터를 검색할 수 있습니다.
실시간 리스닝
XREAD 커맨드를 통해 실시간으로 stream에 저장되는 데이터를 읽어올 수 있습니다. BLOCK milliseconds가 0이면 가져올데이터가없더라도 무한 listening입니다. 1000이면 1초를 기다립니다.
1
2
3
4
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
# 가져올 데이터가 없더라도 무한 listeneing. Email stream key에서 ID가 0보다 큰값 read.
> XREAD BLOCK 0 STREAMS Email 0
특정한 데이터 조회
XRANGE 커맨드를 통해 ID를 이용해 원하는 시간대의 데이터를 조회할 수 있습니다.
1
2
XRANGE key start end [COUNT count]
XREVRANGE key end start [COUNT count]
소비자와 소비자 그룹
팬아웃(fan-out) : 같은 데이터를 여러 소비자에게 전달하는 것
- 카프카에서는 같은 토픽을 여러 개의 소비자가 일거악게 함으로써 간단하게 팬아웃할 수 있음
- 레디스 stream에서도 XREAD 커맨드를 여러 소비자가 수행한다면 팬아웃이 가능
- 카프카에서는 소비자 그룹에 여러 소비자를 추가할 수 있으며, 이때 소비자는 토픽 내의 파티션과 일대일로 연결
- 레디스 stream은 카프카와는 달리 메시지가 전달되는 순서를 신경 쓰지 않아도 된다. 소비자 그룹 내의 한 소비자는 다른 소비자가 아직 읽지 않은 데이터만을 읽음
레디스 stream에서 XGROUP 커맨드로 소비자 그룹을 생성할 수 있습니다.
1
2
> XGROUP CREATE <stream key> <group name> $
> XGROUP CREATE Email EmailServiceGroup $ # 현재시점에서 소비자 그룹 생성.('$'는 현재시점을 의미)
ACK와 보류 리스트
메시지 브로커는 각 소비자에게 어떤 메시지까지 전달됐고, 전달된 메시지의 처리 유무를 인지하고 있어야 합니다.
메시지의 재할당
XCLAIM 커맨드를 사용할 때에는 최소 대기 시간을 지정해야합니다. 이 메시지가 보류 상태로 머무른 시간이 최소 대기 시간을 초과한 경우에만 소유권을 변경할 수 있도록 해서 같은 메시지가 2개의 다른 소비자에게 중복으로 할당되는 것을 방지할 수 있습니다.
1
XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ... <ID-N>
메시지의 자동 재할당
소비자가 직접 보류했던 메시지 중 하나를 자동으로 가져와 처리할 수 있도록 합니다. 할당 대기 중인 다음 메시지의 ID를 반환하는 방식으로 동작하기 때문에 반복적 호출이 가능합니다.
1
XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT count] [JUSTID]
메시지의 수동 재할당
메시지를 재할당할 경우 1씩 증가.(counter++). 하지만 계속 할당되면서 counter가 특정 값에 도달하면 이 메시지를 특수한 다른 stream으로 보내, 관리자가 추후에 처리할 수 있도록 한다. 이러한 메시지를 dead letter
라고함.
stream 상태 확인
XINFO 커맨드를 통해 stream의 여러 상태를 확인할 수 있고, 사용할 수 있는 기능은 help 커맨드로 확인할 수 있습니다