-
Notifications
You must be signed in to change notification settings - Fork 0
/
abortable-iterator.ts
144 lines (127 loc) · 3.84 KB
/
abortable-iterator.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
/**
* @file abort-iterator.ts
* @description Creates an iterator that is abortable. Based on the npm package by the same name
* @copyright MIT Alan Shaw. Ported to Deno by Brandon Kalinowski.
* @version 3.0.0
*/
export class AbortError extends Error {
type = "aborted";
code: string | number;
constructor(message?: string, code?: string | number) {
super(message || "The operation was aborted");
this.code = code || "ABORT_ERR";
}
}
/**
* If the passed object is an (async) iterable, then get the iterator
* If it's probably an iterator already (i.e. has next function) return it
* else throw
*/
function getIterator<T, TReturn = any, TNext = unknown>(
obj: Iterable<T> | Iterator<T, TReturn, TNext>,
): Iterator<T, TReturn, TNext> {
if (obj) {
//@ts-ignore -- safe
if (typeof obj[Symbol.iterator] === "function") {
//@ts-ignore -- safe
return obj[Symbol.iterator]();
//@ts-ignore -- safe
} else if (typeof obj[Symbol.asyncIterator] === "function") {
//@ts-ignore -- safe
return obj[Symbol.asyncIterator]();
//@ts-ignore -- safe
} else if (typeof obj.next === "function") {
//@ts-ignore -- safe
return obj; // probably an iterator
}
}
throw new Error("argument is not an iterator or iterable");
}
interface AbortableOptions {
// deno-lint-ignore ban-types
onAbort?: Function;
abortMessage?: string;
abortCode?: string | number;
returnOnAbort?: boolean;
}
/** Wrap an iterator to make it abortable, allow cleanup when aborted via onAbort */
export const toAbortableSource = <T>(
source: Iterable<T> | Iterator<T>,
signal: AbortSignal,
options: AbortableOptions = {},
) => {
return toMultiAbortableSource(
source,
Array.isArray(signal) ? signal : [{ signal, options }],
);
};
const toMultiAbortableSource = <T>(
theSource: Iterable<T> | Iterator<T>,
signals: { signal: AbortSignal; options: AbortableOptions }[],
) => {
const source = getIterator(theSource);
signals = signals.map(({ signal, options }) => ({
signal,
options: options || {},
}));
async function* abortable() {
// deno-lint-ignore ban-types
let nextAbortHandler: Function | undefined;
const abortHandler = () => {
if (nextAbortHandler) nextAbortHandler();
};
for (const { signal } of signals) {
signal.addEventListener("abort", abortHandler);
}
while (true) {
let result;
try {
for (const { signal, options } of signals) {
if (signal.aborted) {
const { abortMessage, abortCode } = options!;
throw new AbortError(abortMessage, abortCode);
}
}
// TODO: improve promise type
const abort = new Promise<any>((_resolve, reject) => {
nextAbortHandler = () => {
const abortedSignal = signals.find(({ signal }) =>
signal.aborted
);
const msg = abortedSignal?.options.abortMessage;
const code = abortedSignal?.options.abortCode;
reject(new AbortError(msg, code));
};
});
// Race the iterator and the abort signals
result = await Promise.race([abort, source.next()]);
nextAbortHandler = undefined;
} catch (err) {
for (const { signal } of signals) {
signal.removeEventListener("abort", abortHandler);
}
// Might not have been aborted by a known signal
const aborter = signals.find(({ signal }) => signal.aborted)!;
const isKnownAborter = err.type === "aborted" && aborter;
if (isKnownAborter && aborter.options?.onAbort) {
// Do any custom abort handling for the iterator
await aborter.options.onAbort(source);
}
// End the iterator if it is a generator
if (typeof source.return === "function") {
await source.return();
}
if (isKnownAborter && aborter.options?.returnOnAbort) {
return;
}
throw err;
}
if (result.done) break;
yield result.value;
}
for (const { signal } of signals) {
signal.removeEventListener("abort", abortHandler);
}
}
return abortable();
};