Node.js & MongoDB

스트림 데이터 사용을 위한 Stream 모듈 사용하기

Node.js의 중요한 모듈 중에 하나로 Stream 모듈이 있다. 스트림은 파일 접근이나 HTTP 요청으로 데이터를 읽을 때를 포함한 많은 부분에 사용된다.

스트림을 사용하는 목적은 데이터 전송을 위한 공통 구조를 제공하기 위해서다. 스트림에서 데이터가 읽기 가능한 상태가 되면 data 이벤트가 발생하고, 오류가 발생하면 error 이벤트가 생성된다. 스트림에서 데이터를 처리하기 위한 리스너를 등록해 사용할 수도 있다.

스트림은 파일을 읽기 가능한 스트림으로 열거나 HTTP 요청을 읽기 가능한 스트림으로 만들어 필요한 정보를 읽을 수 있다. 추가적으로 커스텀 스트림을 만들어 사용할 수도 있다.

Readable 스트림

Readable 스트림은 다른 리소스로부터 애플리케이션으로 데이터를 쉽게 읽어 들일 수 있는 구조를 제공하기 위해 디자인됐다.

Readable 스트림은 데이터를 읽기 위해 read(size) 함수를 제공한다. size는 스트림에서 읽을 바이트의 크기를 지정한다. read()는 String 객체나 Buffer 객체, null을 반환한다. Readable 스트림은 다음 이벤트를 생성한다.

  • readable - 스트림에서 데이터 청크를 읽을 수 있을 때 생성
  • data - 데이터 이벤트 핸들러가 추가된 것을 제외하면 readable가 유사
  • end - 스트림에서 데이터가 더 이상 제공되지 않을 때 생성
  • close - 파일 같은 기본 리소스가 닫힐 때 생성
  • error - 데이터를 수신 시 오류가 발생한 경우 생성

다음은 Readable 스트림 객체가 제공하는 함수이다.

함수 설명
read([size]) 스트림에서 String이나 Buffer, null 형태의 데이터를 읽음. size 전달인자를 지정한 경우 읽는 데이터의 크기가 제한된다.
setEncoding(encoding) read() 요청 결과를 String 형태로 반환 시 인코딩 형태
pause() 객체에서 생성되는 data 이벤트를 중지
resume() 객체에서 생성되는 data 이벤트를 재개
pipe(destination, [options]) 출력 스트림을 destination 필드에 지정된 Writable 스트림 객체에 연결. {end:true}는 Readable이 끝나는 시점에 Writable 목적 스트림도 끝나는 것을 의미
unpipe([destination]) Writable 목적 스트림에서 객체 제거

사용자 맞춤형 Readable 스트림 객체를 구현하려면 util 모듈의 inherits() 함수를 사용하여 Readable 스트림의 기능을 상속받아야 한다.

var util = require('util');
util.inherits(MyReadableStream, stream.Readable);

위 코드를 수행한 후 객체의 call 함수 호출을 통해 객체를 생성할 수 있다.

stream.Readable.call(this, opt);

또한 _read() 함수를 구현해 Readable 스트림에서 데이터를 출력하기 위한 push()를 호출할 수 있다. push() 호출은 String 이나 Buffer, null을 사용해야 한다.

var stream = require('stream');
var util = require('util');
util.inherits(Answers, stream.Readable);
function Answers(opt) {
  stream.Readable.call(this, opt);
  this.quotes = ["yes", "no", "maybe"];
  this._index = 0;
}
Answers.prototype._read = function() {
  if (this._index > this.quotes.length){
    this.push(null);
  } else {
    this.push(this.quotes[this._index]);
    this._index += 1;
  }
};
var r = new Answers();
console.log("Direct read: " + r.read().toString());
r.on('data', function(data){
  console.log("Callback read: " + data.toString());
});
r.on('end', function(data){
  console.log("No more answers.");
});

위 코드는 Readable 스트림의 기본 구현 및 읽기 절차를 보여준다. Answers() 클래스는 Readable을 상속받고 Answers.prototype._read() 함수를 구현해 데이터를 추출한다. 직접 read() 호출을 통해 스트림의 처음 아이템을 읽은 후 data 이벤트 핸들러를 통해 남은 아이템을 읽는다.

다음은 수행 결과다.

Direct read: yes
Callback read: no
Callback read: maybe
No more answers.

Writable 스트림

Writable 스트림은 다른 코드 영역에서 읽기 쉬운 형태로 데이터 쓰기 구조를 제공하기 위해 디자인됐다.

Writable 스트림은 데이터를 스트림에 쓰기 위한 write(chunk, [encoding], [callback]) 함수를 제공한다. chunk에는 쓰여질 데이터를 말하고, encoding은 문자열 인코딩, callback은 데이터를 성공적으로 썼을 때 수행할 콜백 함수를 지정한다. write() 함수는 데이터가 성공적으로 쓰이면 true를 반환한다. Writable 스트림은 다음 이벤트를 발생한다.

  • drain - write() 호출이 false를 반환한 이후 다시 데이터를 쓰기 가능한 상태라는 것을 리스너 함수에 알려주기 위해 사용한다.
  • finish - 모든 데이터가 비워지고 더 이상 쓸 데이터가 없을 경우 Writable 객체에 end()가 호출된 후 생성된다.
  • pipe - Readable 스트림에 Writable 목적지가 추가되면 pipe() 함수 호출 후 생성된다.
  • unpipe - Readable 스트림에서 Writable 목적지제거를 위해 unpipe() 호출 후 생성된다.

다음은 Writable 스트림 객체 상에서 사용 가능한 함수를 보여준다.

함수 설명
write(chunk, [encoding], [callback]) 스트림 객체의 데이터 위치에 데이터 청크를 쓴다. 데이터는 String이나 Buffer 형태다. encoding이 지정되면 문자열 데이터의 인코딩 정보로 사용된다. callback이 지정되면 데이터가 비워진 이후 호출된다.
end(chunk, [encoding], [callback]) 데이터를 더 이상 수용하지 않고 finish 이벤트를 보내는 것을 제외하면 write()와 동일하다.

Writable 스트림 객체를 만들려면 Writable 스트림의 기능을 상속받고 util 모듈의 inherits() 함수를 사용하면 생성 가능한다.

var util = require('util');
util.inherits(MyWritableStream, stream.Writable);

이제 객체 호출을 통해 객체를 생성할 수 있다.

stream.Writable.call(this, opt);

_write(data, encoding, callback) 함수를 구현해 Writable 객체에 데이터를 저장할 수 있다.

var stream = require('stream');
var util = require('util');
util.inherits(Writer, stream.Writable);
function Writer(opt) {
  stream.Writable.call(this, opt);
  this.data = new Array();
}
Writer.prototype._write = function(data, encoding, callback) {
  this.data.push(data.toString('utf8'));
  console.log("Adding: " + data);
  callback();
};
var w = new Writer();
for (var i=1; i<=5; i++){
  w.write("Item" + i, 'utf8');
}
w.end("ItemLast");
console.log(w.data);

다음은 실행결과다.

Adding: Item1
Adding: Item2
Adding: Item3
Adding: Item4
Adding: Item5
Adding: ItemLast
[ 'Item1', 'Item2', 'Item3', 'Item4', 'Item5', 'ItemLast' ]

Duplex 스트림

Duplex 스트림은 Readable과 Writable 기능을 합친 스트림이다. TCP 소켓 연결이 Duplex 스트림의 좋은 예다. 소켓 연결이 생성된 후에는 읽기와 쓰기가 가능하다.

Duplex 스트림 객체를 구현하려면 다음과 같이 한다.

var util = require('util');
util.inherits(MyDuplexStream, stream.Duplex);

객체 호출을 통해 객체를 생성한다.

stream.Duplex.call(this, opt);

Duplex 스트림을 구현 시 read(size)_write(data, encoding, callback)을 모두 구현해야 한다.

var stream = require('stream');
var util = require('util');
util.inherits(Duplexer, stream.Duplex);
function Duplexer(opt) {
  stream.Duplex.call(this, opt);
  this.data = [];
}
Duplexer.prototype._read = function readItem(size) {
  var chunk = this.data.shift();
  if (chunk == "stop"){
    this.push(null);
  } else{
    if(chunk){
      this.push(chunk);
    } else {
      setTimeout(readItem.bind(this), 500, size);
    }
  }
};
Duplexer.prototype._write = function(data, encoding, callback) {
  this.data.push(data);
  callback();
};
var d = new Duplexer();
d.on('data', function(chunk){
  console.log('read: ', chunk.toString());
});
d.on('end', function(){
  console.log('Message Complete');
});
d.write("I think, ");
d.write("therefore ");
d.write("I am.");
d.write("Rene Descartes");
d.write("stop");

다음은 실행결과다.

read:  I think, therefore
read:  I am.
read:  Rene Descartes
Message Complete