Browse Source

feat(queue): return promise from queue

Delegate to p-queue with concurrency=1
pull/11/head
Barrie Treloar 1 year ago
committed by Nicolas Petton
parent
commit
775b61b7a1
4 changed files with 56 additions and 46 deletions
  1. +4
    -20
      server/helpers/queue.js
  2. +27
    -0
      server/package-lock.json
  3. +1
    -0
      server/package.json
  4. +24
    -26
      server/spec/helpers/queue-spec.js

+ 4
- 20
server/helpers/queue.js View File

@ -1,7 +1,8 @@
const {default: PQueue} = require("p-queue");
/**
* Return a queued version of the function `f`.
*/
// TODO: Return a promise
const queued = f => (q => () => q(f))(queue());
/**
@ -17,26 +18,9 @@ const queued = f => (q => () => q(f))(queue());
* Registered workers are automatically evaluated in sequence.
*/
const queue = () => {
let workers = [];
let running = false;
const run = async () => {
if (running) return;
running = true;
try {
while (workers.length) {
await workers.shift()();
}
}
finally { running = false; }
};
const pqueue = new PQueue({concurrency: 1});
// TODO: Return a promise, to make it async and enable promise error handling
return f => {
workers.push(f);
setTimeout(run, 0);
};
return f => pqueue.add(() => f());
};
module.exports = { queued, queue };

+ 27
- 0
server/package-lock.json View File

@ -359,6 +359,11 @@
"integrity": "sha1-Cr9PHKpbyx96nYrMbepPqqBLrJs=",
"dev": true
},
"eventemitter3": {
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-4.0.0.tgz",
"integrity": "sha512-qerSRB0p+UDEssxTtm6EDKcE7W4OaoisfIMl4CngyEhjpYglocpNg6UEqCvemdGhosAsg4sO2dXJOdyBifPGCg=="
},
"external-editor": {
"version": "3.0.3",
"resolved": "https://registry.npmjs.org/external-editor/-/external-editor-3.0.3.tgz",
@ -755,6 +760,28 @@
"integrity": "sha1-u+Z0BseaqFxc/sdm/lc0VV36EnQ=",
"dev": true
},
"p-finally": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/p-finally/-/p-finally-1.0.0.tgz",
"integrity": "sha1-P7z7FbiZpEEjs0ttzBi3JDNqLK4="
},
"p-queue": {
"version": "6.2.0",
"resolved": "https://registry.npmjs.org/p-queue/-/p-queue-6.2.0.tgz",
"integrity": "sha512-B2LXNONcyn/G6uz2UBFsGjmSa0e/br3jznlzhEyCXg56c7VhEpiT2pZxGOfv32Q3FSyugAdys9KGpsv3kV+Sbg==",
"requires": {
"eventemitter3": "^4.0.0",
"p-timeout": "^3.1.0"
}
},
"p-timeout": {
"version": "3.2.0",
"resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-3.2.0.tgz",
"integrity": "sha512-rhIwUycgwwKcP9yTOOFK/AKsAopjjCakVqLHePO3CC6Mir1Z99xT+R63jZxAT5lFZLa2inS5h+ZS2GvR99/FBg==",
"requires": {
"p-finally": "^1.0.0"
}
},
"parent-module": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/parent-module/-/parent-module-1.0.1.tgz",


+ 1
- 0
server/package.json View File

@ -28,6 +28,7 @@
"dependencies": {
"chrome-remote-interface": "^0.27.2",
"node-fetch": "^2.2.0",
"p-queue": "^6.2.0",
"semver": "^5.5.0",
"source-map": "^0.7.3"
},


+ 24
- 26
server/spec/helpers/queue-spec.js View File

@ -10,12 +10,12 @@ describe("Running queues", () => {
let result = [];
let workers = expected.map(n => () => result.push(n));
workers.forEach(q);
let promises = workers.map(q);
setTimeout(() => {
Promise.all(promises).then(() => {
expect(result).toEqual(expected);
done();
}, 100);
});
});
it("should run async workers in sequence", (done) => {
@ -25,15 +25,16 @@ describe("Running queues", () => {
let result = [];
expected.forEach((n) => q(async () => {
await timeout(20);
let promises = expected.map((n) => q(async () => {
// make the first ones slower than the later ones
await timeout((3-n)*20);
result.push(n);
}));
setTimeout(() => {
Promise.all(promises).then(() => {
expect(result).toEqual(expected);
done();
}, 200);
});
});
it("should run again when adding new workers", (done) => {
@ -46,24 +47,23 @@ describe("Running queues", () => {
result.push(n);
});
workers.forEach(q);
let promises = workers.map(q);
setTimeout(() => {
Promise.all(promises).then(() => {
expect(result).toEqual([ 1, 2, 3 ]);
done();
}, 100);
});
let newWorkers = [ 4, 5, 6 ].map(n => async () => {
await timeout(20);
result.push(n);
});
setTimeout(() => newWorkers.forEach(q), 300);
promises = newWorkers.map(q);
setTimeout(() => {
Promise.all(promises).then(() => {
expect(result).toEqual([ 1, 2, 3, 4, 5, 6 ]);
done();
}, 400);
});
});
it("queues should not evaluate other queues workers", (done) => {
@ -76,19 +76,19 @@ describe("Running queues", () => {
let result1 = [];
let result2 = [];
expected1.forEach(n => q1(() => result1.push(n)));
expected2.forEach(n => q2(() => result2.push(n)));
let promises1 = expected1.map(n => q1(() => result1.push(n)));
let promises2 = expected2.map(n => q2(() => result2.push(n)));
setTimeout(() => {
Promise.all([...promises1, ...promises2]).then(() => {
expect(result1).toEqual(expected1);
expect(result2).toEqual(expected2);
done();
}, 100);
});
});
});
describe("queued functions", () => {
it("can queue functions", (done) => {
it("can queue functions", async (done) => {
let count = 0;
let increase = () => {
@ -97,13 +97,11 @@ describe("queued functions", () => {
let queuedIncrease = queued(increase);
queuedIncrease();
queuedIncrease();
queuedIncrease();
await queuedIncrease();
await queuedIncrease();
await queuedIncrease();
setTimeout(() => {
expect(count).toBe(3);
done();
}, 10);
expect(count).toBe(3);
done();
});
})

Loading…
Cancel
Save