-
Notifications
You must be signed in to change notification settings - Fork 3
/
index.js
74 lines (65 loc) · 1.92 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
const { Writable, Readable } = require('streamx')
class WriteStream extends Writable {
constructor (feed, opts) {
super()
this.feed = feed
this.maxBlockSize = (opts && opts.maxBlockSize) || 0
}
_writev (batch, cb) {
this.feed.append(this.maxBlockSize ? this._ensureMaxSize(batch) : batch, cb)
}
_ensureMaxSize (batch) {
for (let i = 0; i < batch.length; i++) {
let blk = batch[i]
if (blk.length > this.maxBlockSize) {
const chunked = []
while (blk.length > this.maxBlockSize) {
chunked.push(blk.slice(0, this.maxBlockSize))
blk = blk.slice(this.maxBlockSize)
}
if (blk.length) chunked.push(blk)
batch.splice(i, 1, ...chunked)
i += chunked.length - 1
}
}
return batch
}
}
class ReadStream extends Readable {
constructor (feed, opts = {}) {
super()
this.feed = feed
this.start = opts.start || 0
this.end = typeof opts.end === 'number' ? opts.end : -1
this.live = !!opts.live
this.snapshot = opts.snapshot !== false
this.tail = !!opts.tail
this.index = this.start
this.options = { wait: opts.wait !== false, ifAvailable: !!opts.ifAvailable, valueEncoding: opts.valueEncoding }
}
_open (cb) {
this.feed.ready((err) => {
if (err) return cb(err)
if (this.end === -1) {
if (this.live) this.end = Infinity
else if (this.snapshot) this.end = this.feed.length
if (this.start > this.end) this.push(null)
}
if (this.tail) this.start = this.feed.length
this.index = this.start
cb(null)
})
}
_read (cb) {
if (this.index === this.end || (this.end === -1 && this.index >= this.feed.length)) {
this.push(null)
return cb(null)
}
this.feed.get(this.index++, this.options, (err, block) => {
if (err) return cb(err)
this.push(block)
cb(null)
})
}
}
module.exports = { WriteStream, ReadStream }