5장 Stream API (3/3) - Stream을 사용할 때에 순차 실행, 병렬 실행, 제한된 병렬 실행 구현하기
이 글은 Stream을 사용할 때에 순차 실행, 병렬 실행, 제한된 병렬 실행에 대해 다룬다. 또한 독자가 Node.js Stream에 대한 기초 지식이 있음을 전제로 작성되었음을 밝힌다.
참고 자료:
- Node.js에서의 스트림 자체에 대해서는 5장 Stream API (1/3) - 스트림 개요 및 Readable Stream
- Stream의 종류 4가지에 대해서는 5장 Stream API (2/3) - Node.js의 4가지 스트림 소개와 사용법
1. 여러 파일을 하나의 파일로 순차적으로 병합하는 방법
스트림은 당연하게도 비동기로 작동한다. 여러 개의 Redable Stream이 있고 하나의 Writable Stream이 있을 때, 각 작업들을 순차적으로 수행하는 방법이 있을까? 가능하다. 여러 개의 Readable 을 각각 Writable로 연결하고, Redable에 순서를 지정하면 된다. 아래 코드에 대한 설명은 주석으로 나타나 있으니 주석을 따라가기 바란다.
1 | const fromArray = require('from2-array'); |
2. 순서에 상관 없이 결과를 비동기로, 병렬적으로 한 파일에 출력하는 방법
http://thiswillbedownforsure.com is down
https://www.naver.com is up
https://www.google.com is up
위와 같이 특정 사이트 목록들에 대해 health check를 하고 그 결과를 파일로 출력하는 프로그램을 만든다고 하자. 굳이 Stream으로 만들 필요는 없겠지만 그렇게 해본다면 다음과 같은 코드를 생각해볼 수 있다.
일단 Transform 기반의 스트림을 하나 정의한다. 이 스트림은 request
의 콜백으로 스트림의 기능을 빌려주는 형태로 작동한다.
1 | // Transform 스트림을 하나 정의한다. |
위에서 정의한 스트림을 사용해 구현한다.
1 | /* |
3. (2)의 동시 실행 수를 제한하는 방법
비동기 요청 여러 개를 처리하는 일은 Node.js에선 매우 간단하다. Run to Completion이기 때문에 변수 하나로 비동기 작업의 개수를 정확히 세고 이 값에 기반해 의사 결정을 할 수 있다.
따라서 this.running
의 개수가 동시 실행 제한 개수에 도달한 경우 처리하지 않으면 된다. 좀 더 정확하게는, _transform
함수에서 해당 chunk의 처리가 완료됐음을 알리는 콜백을 호출하지 않고 보류하면 된다.
이 경우 해당 chunk를 처리한 결과는 다음 스트림으로 넘어가지 않으며 현재 chunk가 처리되지 않았기 때문에 추가적인 chunk가 스트림으로 전달되지도 않는다(스트림 내부 버퍼에 쌓인다).
만약 ReadableStream이 chunk를 생성하고 내보내는 속도가 우리의 스트림의 처리 속도보다 빠르다면 처리되지 않는 chunk는 Transform의 버퍼에 쌓이며 이내 백 프레셔가 발동되고 알아서 처리될 것이다. - pipe
로 연결하면 Node.js에서 자동으로 처리한다. 백 프래셔에 대해선 5장 Stream API (2/3) - Node.js의 4가지 스트림 소개와 사용법을 참고하라.
따라서 추가적으로 신경써야 하는 부분은 출력을 할 지 여부를 결정하는 것이다.
1 | constructor(concurrency, userTransform) { |
참고 자료 (이번 글만 특별히 도움이 됐는지와는 별개로 읽은 몇 개의 글을 링크한다.):
What’s the proper way to handle back-pressure in a node.js Transform stream?
Awesome Nodejs#Streams (Github Repo)
TODO:
- Stream 관련해서 자세한 자료보단 내부 구조를 코드 수준에서 확인하는 게 가장 좋을 것 같다.
- Back Pressure의 효과를 제대로 확인하기 위해선 디버거를 키고 스트림 객체를 살펴봐야 할 것 같다.
- Stream의 추상하된 구현체들을 가져다 쓸 수록 더욱 더 이해하기 어려워지는 것 같다.
- Stream을 3부작으로 나누어 작성하려고 했는데 한 10부작 까지는 나올 수도 있을 것 같다. 그만큼 부족하고, 글 쓰는 데도 매우 오래 걸린다.
5장 Stream API (3/3) - Stream을 사용할 때에 순차 실행, 병렬 실행, 제한된 병렬 실행 구현하기