Node.js + MongoDB + AngularJS – 5 在Node.js中处理数据I0-2

释放双眼,带上耳机,听听看~!

3. 使用Stream模块来传送数据

Stream模块是Node.js的一个重要模块。数据流是可读、可写,或即可读又可写的内存结构。
流的目的是提供一种从一个地方向另一个地方传送数据的通用机制。
流一般用于HTTP数据和文件。

3.1 Readable流

Readable流旨在提供一种机制,以方便地读取从其他来源进入应用程序的数据。
常见实例:

  • 在客户端的HTTP相应
  • 在服务器的HTTP请求
  • fs读取流
  • zlib流
  • crypto(加密)流
  • TCP套接字
  • 子进程的stdout和stderr
  • process.stdin

Readable流公开了以下事件:

  • readable: 在数据块可以从流中读取的时候发出
  • data:类似于readable;不同之处在于,当数据的事件处理程序被连接时,流被转成流动的模式,并且数据处理程序被连续地调用,直到所有的数据都被用尽
  • end:当数据将不再被提供时由流发出
  • close:当底层的自愿,如文件,已关闭时发出
  • error:当在接受数据中出现错误时发出

Readable流对象提供的方法;

read([size])
从流中读取数据。这些数据可以是String、Buffer或者null(null表示没有剩下任何更多的数据)。如果指定size参数,那么被读取的数据将将仅限于那个字节数
setEncoding(encoding)
设置从read()请求读取返回string时使用的编码
pause()
暂停从该对象发出的data事件
resume()
恢复从该事件发出的data事件
pipe(desctination, [options])
把这个流的输出传输到一个由destination(目的地)指定的Writeable流对象。options是一个Javascript对象。例如,{end: true}当Readable结束时就结束Writable的目的地
unpipe([desctination])
从Writeable目的地断开这一对象


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
1var stream = require('stream') ;
2var util = require('util') ;
3util.inherits(Answers, stream.Readable) ;
4
5function Answers(opt) {
6    stream.Readable.call(this, opt) ;
7    this.quotes = ['yes', 'no', 'maybe'] ;
8    this._index = 0 ;
9}
10
11Answers.prototype._read = function () {
12    if (this._index > this.quotes.length) {
13        this.push(null) ;
14    } else {
15        this.push( this.quotes[this._index] ) ;
16        this._index += 1 ;
17    }
18} ;
19
20var r = new Answers() ;
21
22console.log('Direct read: ' + r.read().toString()) ;
23
24r.on('data', function (data) {
25    console.log('Callback read: ' + data.toString()) ;
26}) ;
27
28r.on('end', function (data) {
29    console.log('No more answers') ;
30}) ;
31

3.2 Writeable流

Writeable流旨在提供把数据写入一种可以轻松地在代码的另一个区域被使用的机制。
常用实例:

  • 客户端上的HTTP请求
  • 服务器上的HTTP相应
  • fs写入流
  • zlib流
  • crypto流
  • TCP套接字
  • 子进程的stdin
  • process.stdout 和 process.stderr

Writeable流公开的事件:

  • drain:在write()调用返回false后,当准备好开始写更多的数据时,发出此事件通知监听器
  • finish:当end()在Writeabel对象上被调用,所有的数据都被刷新,并且不会有更多的数据将被接受时发出此事件
  • pipe:当pipe()方法在Readable上被调用,以添加此Writeable为目的地时,发出此事件
  • unpipie:当unpipe()方法在Readable流上被调用,以删除此Writeable为目的地时,发出此事件

Writeable流对象的方法:

write(chunk, [encoding], [callback])
将数据块写入流对象的数据位置。该数据可以是字符串或缓冲区。如果指定encoding,那么将其用于对字符串数据的编码。如果指定callback,那么它在数据已被刷新后被调用
end(chunk, [encoding], [callback])
与write()相同,除了它把Writeable对象置于不再接受数据的状态,并发送finish事件外


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
1var stream = require('stream') ;
2var util = require('util') ;
3
4util.inherits(Writer, stream.Writable) ;
5
6function Writer(opt) {
7    stream.Writable.call(this, opt) ;
8    this.data = new Array() ;
9}
10
11Writer.prototype._write = function (data, encoding, callback) {
12   this.data.push( data.toString('utf8') ) ;
13    console.log('Adding: ' + data) ;
14    callback() ;
15} ;
16
17var w = new Writer() ;
18for(var i = 1; i <= 5; i++ ) {
19    w.write('Item' + i, 'utf8') ;
20}
21
22w.end('ItemLast') ;
23
24console.log(w.data) ;
25

3.3 Duplex流

Duplex(双向)流是结合可读写功能的流。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
1var stream = require('stream') ;
2var util = require('util') ;
3
4util.inherits(Duplexer, stream.Duplex) ;
5
6function Duplexer(opt) {
7    stream.Duplex.call(this, opt) ;
8    this.data = [] ;
9}
10
11Duplexer.prototype._read = function readItem(size) {
12   var chunk = this.data.shift() ;
13    if (chunk == 'stop') {
14        this.push(null) ;
15    } else {
16        if (chunk) {
17            this.push(chunk) ;
18        } else {
19            setTimeout(readItem.bind(this), 500, size) ;
20        }
21    }
22} ;
23
24Duplexer.prototype._write = function (data, encoding, callback) {
25   this.data.push(data) ;
26    callback() ;
27} ;
28
29var d = new Duplexer() ;
30d.on('data', function (chunk) {
31    console.log('read: ', chunk.toString()) ;
32}) ;
33d.on('end', function () {
34    console.log('Message Complete') ;
35}) ;
36d.write('I think, ') ;
37d.write('therefore ') ;
38d.write('I am, ') ;
39d.write('Rene descartes') ;
40d.write('stop') ;
41

3.4 Transform流

Transform(变换)流扩展了Duplex流,但它修改Writeable流和Readable流之间的数据。当你需要修改从一个系统到另一个系统的数据时,此流会非常有用。
Duplex和Transform流之间的一个主要区别是:在Transform流中不需要实现_read()和_write()原型方法。这些被作为直通函数提供。相反,你要实现_transform(chunk, encoding, callback)和_finish(callback)方法。此_transform()方法应该接受来自write()请求的数据,对其修改,并推出修改后的数据。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
1var stream = require('stream') ;
2var util = require('util') ;
3
4util.inherits(JSONObjectStream, stream.Transform) ;
5
6function JSONObjectStream(opt) {
7    stream.Transform.call(this, opt) ;
8}
9
10JSONObjectStream.prototype._transform = function (data, encoding, callback) {
11    object = data ? JSON.parse(data.toString()) : '' ;
12    this.emit('object', object) ;
13    object.handled  = true ;
14    this.push(JSON.stringify(object)) ;
15    callback() ;
16} ;
17
18JSONObjectStream.prototype._finish = function (cb) {
19    cb() ;
20} ;
21
22var tc = new JSONObjectStream() ;
23
24tc.on('object', function (object) {
25    console.log('Name: %s', object.name) ;
26    console.log('Color: %s', object.color) ;
27}) ;
28
29tc.on('data', function (data) {
30    console.log('Data: %s', data.toString()) ;
31}) ;
32
33tc.write('{"name": "Carolinus", "color": "Green"}') ;
34tc.write('{"name": "Solarius", "color": "Blue"}') ;
35tc.write('{"name": "Lo Tea Zhao", "color": "Gold"}') ;
36tc.write('{"name": "Ommadon", "color": "Red"}') ;
37

3.5 把Readable流用管道输送到Writeable流

可以用流对象做的最酷的东西之一是通过pipe(writeableStream, [options])函数把Readable流链接到Writeable流。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
1var stream = require( 'stream' ) ;
2var util = require( 'util' ) ;
3
4util.inherits( Reader, stream.Readable ) ;
5util.inherits( Writer, stream.Writable ) ;
6
7function Reader(opt) {
8    stream.Readable.call(this, opt) ;
9    this._index = 1 ;
10}
11
12Reader.prototype._read = function (size) {
13   var i = this._index++ ;
14    if( i > 10 ) {
15        this.push(null) ;
16    } else {
17        this.push('Item ' + i.toString()) ;
18    }
19} ;
20
21function Writer(opt) {
22    stream.Writable.call(this, opt) ;
23    this._index = 1 ;
24}
25
26Writer.prototype._write = function (data, encoding, callback) {
27   console.log(data.toString()) ;
28    callback() ;
29} ;
30
31var r = new Reader() ;
32var w = new Writer() ;
33
34r.pipe( w ) ;
35

4. 用zlib压缩与解压缩数据

支持如下这些压缩算法:

  • gzip/gunzip: 标准gzip压缩
  • deflate/inflate: 基于Huffman编码的标准deflate压缩算法
  • deflateRaw/inflateRaw:针对原始缓冲区的deflate压缩算法

4.1 压缩和解压缩缓冲区


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
1var zlib = require('zlib') ;
2var input = '......................text.....................' ;
3
4zlib.deflate(input, function (err, buffer) {
5    if(!err) {
6        console.log('deflate (%s): ', buffer.length, buffer.toString('base64')) ;
7
8        zlib.inflate(buffer, function (err, buffer) {
9            if(!err) {
10                console.log('inflate (%s): ', buffer.length, buffer.toString()) ;
11            }
12        }) ;
13
14        zlib.unzip(buffer, function (err, buffer) {
15            if (!err) {
16                console.log('unzip deflate (%s): ', buffer.length, buffer.toString()) ;
17            }
18        }) ;
19    }
20}) ;
21
22zlib.deflateRaw(input, function (err, buffer) {
23    if(!err) {
24        console.log('deflateRaw (%s): ', buffer.length, buffer.toString('base64')) ;
25
26        zlib.inflateRaw(buffer, function (err, buffer) {
27            console.log('inflateRaw (%s): ', buffer.length, buffer.toString()) ;
28        }) ;
29
30    }
31}) ;
32
33zlib.gzip(input, function (err, buffer) {
34    if(!err) {
35        console.log('gzip (%s): ', buffer.length, buffer.toString('base64')) ;
36
37        zlib.gunzip(buffer, function (err, buffer) {
38            if(!err) {
39                console.log('gunzip (%s): ', buffer.length, buffer.toString()) ;
40            }
41        }) ;
42
43        zlib.unzip(buffer, function (err, buffer) {
44            if(!err) {
45                console.log('unzip gzip (%s): ', buffer.length, buffer.toString()) ;
46            }
47        }) ;
48    }
49}) ;
50

4.2 压缩/解压缩流


1
2
3
4
5
6
7
8
9
10
11
12
13
14
1var zlib = require('zlib') ;
2var gzip = zlib.createGzip() ;
3var fs = require('fs') ;
4var inFile = fs.createReadStream('zlib_file.js') ;
5var outFile = fs.createWriteStream('zlib_file.gz') ;
6
7inFile.pipe(gzip).pipe(outFile) ;
8setTimeout(function () {
9    var gunzip = zlib.createUnzip({flush: zlib.Z_FULL_FLUSH}) ;
10    var inFile = fs.createReadStream('zlib_file.gz') ;
11    var outFile = fs.createWriteStream('zlib_file.unzipped') ;
12    inFile.pipe(gunzip).pipe(outFile) ;
13}, 3000) ;
14

给TA打赏
共{{data.count}}人
人已打赏
安全技术

对称加密和分组加密中的四种模式(ECB、CBC、CFB、OFB)

2021-8-18 16:36:11

安全技术

C++ 高性能服务器网络框架设计细节

2022-1-11 12:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索