take(pattern)과 차이점은, 사가가 아직 처리할 준비가 되지 않았다면 (예를 들어 API 호출에 block됨)
actionChannel은 들어오는 메세지를 버퍼링 할 수 있음
eventChannel 팩토리를 사용해 외부 이벤트 연결하기
actionChannel처럼, eventChannel은 리덕스 스토어가 아닌 외부 이벤트를 위한 채널을 생성.
일정 간격마다 채널 생성 하기 예제
import { eventChannel, END } from 'redux-saga';
function countdown(secs) {
return eventChannel(emitter => {
const iv = setInterval(() => {
secs -= 1;
if(secs > 0) {
emitter(secs)
} else {
// 채널이 닫히도록 함.
emitter(END)
}
}, 1000);
// subscriber는 unsubscribe 함수를 리턴해야함.
})
}
eventChannel
첫번째 인자는 subscriber 함수
구독자의 역할은
외부 이벤트 소스 초기화 (setInterval)
제겅된 emitter를 실행하여 소스에서 채널로 들어오는 모든 이벤트를 라우팅.
매 초마다 emitter를 호출.
emitter(END) : 채널 소비자에게 채널 폐쇄됨을 알림.
채널 사용 방법 보자
// 초 간격으로 이벤트 채널 생성하기
function countdown(seconds) { ... }
export function* saga() {
const chan = yield call(countdown, value)
try {
while (true) {
finally block으로 점프하여 sga가 종료하도록 유도
// take(END) will cause the saga to terminate by jumping to the finally block
let seconds = yield take(chan)
console.log(`countdown: ${seconds}`)
}
} finally {
console.log('countdown terminated')
}
}
take(chan)을 yield하고 있음.
메시지가 채널에 들어가기 전까지 사가는 block됨.
// creates an event Channel from an interval of seconds
function countdown(seconds) { ... }
export function* saga() {
const chan = yield call(countdown, value)
try {
while (true) {
let seconds = yield take(chan)
console.log(`countdown: ${seconds}`)
}
} finally {
if (yield cancelled()) {
chan.close()
console.log('countdown cancelled')
}
}
}
countdown의 interval 종료시, countdown 함수는 emitter(END)를 호출하여 이벤트 채널 폐쇄
채널 닫으면 take에 block된 모든 사가들을 종료시키는 효과 있음.
사가 종료시 finally 구간으로 점프.
구독자는 unsubscribe 함수를 반환.
이벤트 소스가 완료되기 전에 채널 구독을 취소하는데 사용
이벤트 채널의 메시지를 소비하는 사가 내에서 이벤트 소스 완료되기 전에 일찍 나가기를 원하면 chan.close()를 호출해 채널 폐쇄, 구독 취소 가능.
사가 간 통신체 채널 사용하기
action channel, event channel 외에 어떤 거에도 연결되지 않은 채널 직접 생성 가능.
watch와 fork패턴이 동시에 실행되는 태스크의 개수에 제한 없이 동시에 다수 요청 처리 가능.
actionChannel 이펙트를 이용하여 한 번에 하나의 태스크만 실행되도록 제한 가능.
최대 3개의 태스크를 한번에 실행해보자.
3개보다 적은 태스크를 실행하고 있다면, 요청 즉시 실행. 그게 아니라면 3개의 슬롯 중에 1개의 태스크가 끝나기전까지 대기열에 요청 집어넣을거임.
import {channel} from 'redux-saga';
function* watchRequests() {
// create a channel to queue incoming requests
const chan = yield call(channel)
// create 3 worker 'threads'
for (var i = 0; i < 3; i++) {
yield fork(handleRequest, chan)
}
while (true) {
const {payload} = yield take('REQUEST')
yield put(chan, payload)
}
}
function* handleRequest(chan) {
while (true) {
const payload = yield take(chan)
// process the request
}
}
3개의 워커 saga를 fork.
생산된 채널이 fork된 모든 saga에 인수로 들어감.
watchRequests는 3개의 워커 사가들에게 일을 dispatch하기 위해 채널을 사용.
각 request action마다 saga는 채널에 payload를 put할거임.
이는 할 일이 없는 워커 사가에게 전달.
이게 아니라면 워커 사가가 준비 될 때까지 채널 대기열에 들어감.
while 반복으로 3개의 워커가 req 액션을 가져가거나, 요청으로 만들어진 메시지가 있을 때까지 block됨.
이 메커니즘이 3개의 워커 칸 자동 로드밸런싱을 해줌.
WebSocket 이벤트에 이벤트 채널 이용하는 예제도 보자. server message ping을 기다리고, pong message를 약간의 딜레이 후에 회신한다고 하자.
import { take, put, call, apply, delay } from 'redux-saga/effects'
import { eventChannel } from 'redux-saga'
import { createWebSocketConnection } from './socketConnection'
// 이 함수는 주어진 소켓에서 이벤트 채널을 생성한다.
// ping events 구독을 세팅한다.
function createSocketChannel(socket) {
// 이벤트 채널은 구독자 함수를 가지고,
// 구독자 함수는 메세지를 채널에 전달하기 위해 emit을 인자로 받는다.
// `eventChannel` takes a subscriber function
// the subscriber function takes an `emit` argument to put messages onto the channel
return eventChannel(emit => {
const pingHandler = (event) => {
// 채널에 이벤트 payload 전달하기
// 사가에게 이 payload를 리턴된 채널에 전달하도록 허용
// puts event payload into the channel
// this allows a Saga to take this payload from the returned channel
emit(event.payload)
}
const errorHandler = (errorEvent) => {
// create an Error object and put it into the channel
emit(new Error(errorEvent.reason))
}
// setup the subscription
socket.on('ping', pingHandler)
socket.on('error', errorHandler)
// 구독자는 반드시 unsubscribe 함수를 리턴해야 한다
// sage는 channel.close method호출할 때 실행된다.
// the subscriber must return an unsubscribe function
// this will be invoked when the saga calls `channel.close` method
const unsubscribe = () => {
socket.off('ping', pingHandler)
}
return unsubscribe
})
}
// reply with a `pong` message by invoking `socket.emit('pong')`
function* pong(socket) {
yield delay(5000)
yield apply(socket, socket.emit, ['pong']) // call `emit` as a method with `socket` as context
}
export function* watchOnPings() {
const socket = yield call(createWebSocketConnection)
const socketChannel = yield call(createSocketChannel, socket)
while (true) {
try {
// An error from socketChannel will cause the saga jump to the catch block
const payload = yield take(socketChannel)
yield put({ type: INCOMING_PONG_PAYLOAD, payload })
yield fork(pong, socket)
} catch(err) {
console.error('socket error:', err)
// socketChannel is still open in catch block
// if we want end the socketChannel, we need close it explicitly
// socketChannel.close()
}
}
}
Summary
채널은
외부 이벤트 소스
사가 간 통신
특정 작업을 대기열에 넣을 때에도 사용.
actionchannel
eventChannel
channel 팩토리 함수를 이용하여 채널 만듬. 사가 간 통신을 위해 take/put 사용.
action channel
들어오는 메시지를 큐에 버퍼링할 수 있음.
event Channel
리덕스 스토어가 아닌 외부 이벤트(emitter를 setInterval에 전달) 를 위한 채널 생성.
saga에서 channel 가져와서 call을 통해 직접 생성.
보통 한번에 1개의 태스크만 실행되도록 함.
fork(generator func, channel)하여 여러개의 태스크를 한번에 실행 가능.