Skip to content

Commit

Permalink
Fix autoack bugs (#185)
Browse files Browse the repository at this point in the history
* acksFromNode should fetch previous indexer node

* pass pending flag to shouldAckHeads

* make tests less timing dependent
  • Loading branch information
chm-diederichs authored Oct 18, 2024
1 parent db7bb65 commit 2fb87ca
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 41 deletions.
7 changes: 4 additions & 3 deletions lib/consensus.js
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,12 @@ module.exports = class Consensus {
for (const idx of this.indexers) {
if (idx === view.writer) continue

const next = view.clock.get(idx.core.key)
const length = view.clock.get(idx.core.key)
if (!length) continue

if (target.clock.includes(idx.core.key, next)) continue
if (target.clock.includes(idx.core.key, length)) continue

const head = idx.get(next)
const head = idx.get(length - 1)
if (!head) continue

if (head.clock.includes(target.writer.core.key, target.length)) {
Expand Down
4 changes: 2 additions & 2 deletions lib/linearizer.js
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ module.exports = class Linearizer {
}

// ack if any head is closer to confirming a value
_shouldAckHeads (writer) {
_shouldAckHeads (writer, pending) {
const prev = writer.head()

for (const head of this.heads) {
Expand All @@ -246,7 +246,7 @@ module.exports = class Linearizer {
if (visited.has(node)) continue
visited.add(node)

if (node.value !== null) {
if (pending || node.value !== null) {
const acks = this.consensus.acksFromNode(node, head)
const prevAcks = this.consensus.acksFromNode(node, prev)

Expand Down
77 changes: 41 additions & 36 deletions test/autoack.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,23 @@ test('autoack - simple', async t => {
})

test('autoack - 5 writers', async t => {
t.plan(26)
t.plan(21)

const ackInterval = 50

const { bases } = await create(5, t, {
ackInterval: 10,
ackInterval,
ackThreshold: 0
})

const [a, b, c, d, e] = bases

t.teardown(replicate(bases))

await addWriterAndSync(a, b)
await addWriterAndSync(a, c)
await addWriterAndSync(a, d)
await addWriterAndSync(a, e)
addWriter(a, b)
addWriter(a, c)
addWriter(a, d)
await addWriter(a, e)

await sync(bases)

Expand All @@ -70,30 +72,18 @@ test('autoack - 5 writers', async t => {
t.is(d.view.indexedLength, 0)
t.is(e.view.indexedLength, 0)

await new Promise(resolve => setTimeout(resolve, 1000))
await poll(() => bases.reduce((acc, b) => acc && b.view.indexedLength === 1, true), ackInterval)

let alen = a.local.length
let blen = b.local.length
let clen = c.local.length
let dlen = d.local.length
let elen = e.local.length
// allow acks to settle
await new Promise(resolve => setTimeout(resolve, 10 * ackInterval))

await new Promise(resolve => setTimeout(resolve, 300))

// check that acks stop
t.is(a.local.length, alen)
t.is(b.local.length, blen)
t.is(c.local.length, clen)
t.is(d.local.length, dlen)
t.is(e.local.length, elen)

alen = a.local.length
blen = b.local.length
clen = c.local.length
dlen = d.local.length
elen = e.local.length
const alen = a.local.length
const blen = b.local.length
const clen = c.local.length
const dlen = d.local.length
const elen = e.local.length

await new Promise(resolve => setTimeout(resolve, 300))
await new Promise(resolve => setTimeout(resolve, 10 * ackInterval))

// check that acks stop
t.is(a.local.length, alen)
Expand All @@ -102,8 +92,6 @@ test('autoack - 5 writers', async t => {
t.is(d.local.length, dlen)
t.is(e.local.length, elen)

await sync([a, b, c, d, e])

t.is(a.linearizer.indexers.length, 5)
t.is(b.linearizer.indexers.length, 5)
t.is(c.linearizer.indexers.length, 5)
Expand All @@ -120,19 +108,21 @@ test('autoack - 5 writers', async t => {
test('autoack - concurrent', async t => {
t.plan(10)

const ackInterval = 100

const { bases } = await create(5, t, {
ackInterval: 20,
ackInterval,
ackThreshold: 0
})

const [a, b, c, d, e] = bases

t.teardown(replicate(bases))

await addWriterAndSync(a, b)
await addWriterAndSync(a, c)
await addWriterAndSync(a, d)
await addWriterAndSync(a, e)
addWriter(a, b)
addWriter(a, c)
addWriter(a, d)
await addWriter(a, e)

await sync(bases)

Expand All @@ -158,8 +148,8 @@ test('autoack - concurrent', async t => {
t.is(c.local.length, clen)
t.is(d.local.length, dlen)
t.is(e.local.length, elen)
}, 500)
}, 2000)
}, 10 * ackInterval)
}, 40 * ackInterval)

async function message (w, n) {
for (let i = 0; i < n; i++) {
Expand Down Expand Up @@ -570,3 +560,18 @@ test('autoack - minority indexers with non-indexer tails', async t => {
function getWriter (base, writer) {
return base.activeWriters.get(writer.core.key)
}

function poll (fn, interval) {
return new Promise(resolve => {
const int = setInterval(check, interval)

function check () {
if (fn()) done()
}

function done () {
clearInterval(int)
resolve()
}
})
}

0 comments on commit 2fb87ca

Please sign in to comment.