Rework #6

Merged
monoid merged 38 commits from dev into main 2024-04-17 01:45:37 +09:00
3 changed files with 93 additions and 55 deletions
Showing only changes of commit 385391334a - Show all commits

View File

@@ -5,7 +5,7 @@
"main": "build/app.js", "main": "build/app.js",
"scripts": { "scripts": {
"compile": "swc src --out-dir dist", "compile": "swc src --out-dir dist",
"dev": "nodemon -r @swc-node/register --exec node app.ts", "dev": "nodemon -r @swc-node/register --enable-source-maps --exec node app.ts",
"start": "node build/app.js" "start": "node build/app.js"
}, },
"author": "", "author": "",

View File

@@ -1,26 +1,33 @@
import { type Context, DefaultContext, DefaultState, Next } from "koa"; import { type Context, DefaultContext, DefaultState, Next } from "koa";
import Router from "koa-router"; import Router from "koa-router";
import { createReadableStreamFromZip, entriesByNaturalOrder, readZip, type ZipAsync } from "../util/zipwrap"; import { createReadableStreamFromZip, entriesByNaturalOrder, readZip } from "../util/zipwrap";
import type { ContentContext } from "./context"; import type { ContentContext } from "./context";
import { since_last_modified } from "./util"; import { since_last_modified } from "./util";
import type { ZipReader } from "@zip.js/zip.js";
import type { FileHandle } from "node:fs/promises";
import { Readable, Writable } from "node:stream";
/** /**
* zip stream cache. * zip stream cache.
*/ */
const ZipStreamCache: {
const ZipStreamCache: { [path: string]: [ZipAsync, number] } = {}; [path: string]: [{
reader: ZipReader<FileHandle>,
handle: FileHandle
}, number]
} = {};
async function acquireZip(path: string) { async function acquireZip(path: string) {
if (!(path in ZipStreamCache)) { if (!(path in ZipStreamCache)) {
const ret = await readZip(path); const obj = await readZip(path);
ZipStreamCache[path] = [ret, 1]; ZipStreamCache[path] = [obj, 1];
// console.log(`acquire ${path} 1`); // console.log(`acquire ${path} 1`);
return ret; return obj.reader;
} }
const [ret, refCount] = ZipStreamCache[path]; const [ret, refCount] = ZipStreamCache[path];
ZipStreamCache[path] = [ret, refCount + 1]; ZipStreamCache[path] = [ret, refCount + 1];
// console.log(`acquire ${path} ${refCount + 1}`); // console.log(`acquire ${path} ${refCount + 1}`);
return ret; return ret.reader;
} }
function releaseZip(path: string) { function releaseZip(path: string) {
@@ -29,7 +36,10 @@ function releaseZip(path: string) { @@ -29,7 +36,10 @@ function releaseZip(path: string) {
const [ref, refCount] = obj; const [ref, refCount] = obj;
// console.log(`release ${path} : ${refCount}`); // console.log(`release ${path} : ${refCount}`);
if (refCount === 1) { if (refCount === 1) {
ref.close(); const { reader, handle } = ref;
reader.close().then(() => {
handle.close();
});
delete ZipStreamCache[path]; delete ZipStreamCache[path];
} else { } else {
ZipStreamCache[path] = [ref, refCount - 1]; ZipStreamCache[path] = [ref, refCount - 1];
@@ -41,33 +51,35 @@ async function renderZipImage(ctx: Context, path: string, page: number) { @@ -41,33 +51,35 @@ async function renderZipImage(ctx: Context, path: string, page: number) {
// console.log(`opened ${page}`); // console.log(`opened ${page}`);
const zip = await acquireZip(path); const zip = await acquireZip(path);
const entries = (await entriesByNaturalOrder(zip)).filter((x) => { const entries = (await entriesByNaturalOrder(zip)).filter((x) => {
const ext = x.name.split(".").pop(); const ext = x.filename.split(".").pop();
return ext !== undefined && image_ext.includes(ext); return ext !== undefined && image_ext.includes(ext);
}); });
if (0 <= page && page < entries.length) { if (0 <= page && page < entries.length) {
const entry = entries[page]; const entry = entries[page];
const last_modified = new Date(entry.time); const last_modified = entry.lastModDate;
if (since_last_modified(ctx, last_modified)) { if (since_last_modified(ctx, last_modified)) {
return; return;
} }
const read_stream = await createReadableStreamFromZip(zip, entry); const read_stream = await createReadableStreamFromZip(zip, entry);
/** Exceptions (ECONNRESET, ECONNABORTED) may be thrown when processing this request const nodeReadableStream = new Readable();
* for reasons such as when the browser unexpectedly closes the connection. nodeReadableStream._read = () => { };
* Once such an exception is raised, the stream is not properly destroyed,
* so there is a problem with the zlib stream being accessed even after the stream is closed.
* So it waits for 100 ms and releases it.
* Additionally, there is a risk of memory leak because zlib stream is not properly destroyed.
* @todo modify function 'stream' in 'node-stream-zip' library to prevent memory leak */
read_stream.once("close", () => {
setTimeout(() => {
releaseZip(path);
}, 100);
});
ctx.body = read_stream; read_stream.pipeTo(new WritableStream({
ctx.response.length = entry.size; write(chunk) {
nodeReadableStream.push(chunk);
},
close() {
nodeReadableStream.push(null);
setTimeout(() => {
releaseZip(path);
}, 100);
},
}));
ctx.body = nodeReadableStream;
ctx.response.length = entry.uncompressedSize;
// console.log(`${entry.name}'s ${page}:${entry.size}`); // console.log(`${entry.name}'s ${page}:${entry.size}`);
ctx.response.type = entry.name.split(".").pop() as string; ctx.response.type = entry.filename.split(".").pop() as string;
ctx.status = 200; ctx.status = 200;
ctx.set("Date", new Date().toUTCString()); ctx.set("Date", new Date().toUTCString());
ctx.set("Last-Modified", last_modified.toUTCString()); ctx.set("Last-Modified", last_modified.toUTCString());

View File

@@ -1,33 +1,59 @@
import type { ZipEntry } from "node-stream-zip"; import { type FileHandle, open } from "node:fs/promises";
import { ReadStream } from "node:fs";
import { orderBy } from "natural-orderby"; import { orderBy } from "natural-orderby";
import StreamZip from "node-stream-zip"; import { ZipReader, Reader, type Entry } from "@zip.js/zip.js";
export type ZipAsync = InstanceType<typeof StreamZip.async>; class FileReader extends Reader<FileHandle> {
export async function readZip(path: string): Promise<ZipAsync> { private fd: FileHandle;
return new StreamZip.async({ private offset: number;
file: path, constructor(fd: FileHandle) {
storeEntries: true, super(fd);
}); this.fd = fd;
this.offset = 0;
}
async init(): Promise<void> {
this.offset = 0;
this.size = (await this.fd.stat()).size;
}
close(): void {
this.fd.close();
}
async readUint8Array(index: number, length: number): Promise<Uint8Array> {
const buffer = new Uint8Array(length);
const buf = await this.fd.read(buffer, 0, length, index);
if (buf.bytesRead !== length) {
console.error(`read error: ${buf.bytesRead} !== ${length}`);
throw new Error("read error");
}
return buffer;
}
} }
export async function entriesByNaturalOrder(zip: ZipAsync) {
const entries = await zip.entries(); export async function readZip(path: string): Promise<{
const ret = orderBy(Object.values(entries), (v) => v.name); reader: ZipReader<FileHandle>
handle: FileHandle
}> {
const fd = await open(path);
const reader = new ZipReader(new FileReader(fd), {
useCompressionStream: true,
preventClose: false,
});
return { reader, handle: fd };
}
export async function entriesByNaturalOrder(zip: ZipReader<FileHandle>) {
// console.log(zip);
const entries = await zip.getEntries();
// console.log(entries.map((v) => v.filename));
const ret = orderBy(entries, (v) => v.filename);
return ret; return ret;
} }
export async function createReadableStreamFromZip(zip: ZipAsync, entry: ZipEntry): Promise<NodeJS.ReadableStream> { export async function createReadableStreamFromZip(_zip: ZipReader<FileHandle>, entry: Entry): Promise<ReadableStream> {
return await zip.stream(entry); if (entry.getData === undefined) {
} throw new Error("entry.getData is undefined");
export async function readAllFromZip(zip: ZipAsync, entry: ZipEntry): Promise<Buffer> { }
const stream = await createReadableStreamFromZip(zip, entry); const stream = new TransformStream<Uint8Array, Uint8Array>();
const chunks: Uint8Array[] = []; entry.getData(stream.writable);
return new Promise((resolve, reject) => { return stream.readable;
stream.on("data", (data) => {
chunks.push(data);
});
stream.on("error", (err) => reject(err));
stream.on("end", () => resolve(Buffer.concat(chunks)));
});
} }