206 lines
5.6 KiB
JavaScript
206 lines
5.6 KiB
JavaScript
'use strict'
|
|
|
|
const events = require('events')
|
|
|
|
const contentPath = require('./path')
|
|
const fs = require('fs/promises')
|
|
const { moveFile } = require('@npmcli/fs')
|
|
const { Minipass } = require('minipass')
|
|
const Pipeline = require('minipass-pipeline')
|
|
const Flush = require('minipass-flush')
|
|
const path = require('path')
|
|
const ssri = require('ssri')
|
|
const uniqueFilename = require('unique-filename')
|
|
const fsm = require('fs-minipass')
|
|
|
|
module.exports = write
|
|
|
|
// Cache of move operations in process so we don't duplicate
|
|
const moveOperations = new Map()
|
|
|
|
async function write (cache, data, opts = {}) {
|
|
const { algorithms, size, integrity } = opts
|
|
|
|
if (typeof size === 'number' && data.length !== size) {
|
|
throw sizeError(size, data.length)
|
|
}
|
|
|
|
const sri = ssri.fromData(data, algorithms ? { algorithms } : {})
|
|
if (integrity && !ssri.checkData(data, integrity, opts)) {
|
|
throw checksumError(integrity, sri)
|
|
}
|
|
|
|
for (const algo in sri) {
|
|
const tmp = await makeTmp(cache, opts)
|
|
const hash = sri[algo].toString()
|
|
try {
|
|
await fs.writeFile(tmp.target, data, { flag: 'wx' })
|
|
await moveToDestination(tmp, cache, hash, opts)
|
|
} finally {
|
|
if (!tmp.moved) {
|
|
await fs.rm(tmp.target, { recursive: true, force: true })
|
|
}
|
|
}
|
|
}
|
|
return { integrity: sri, size: data.length }
|
|
}
|
|
|
|
module.exports.stream = writeStream
|
|
|
|
// writes proxied to the 'inputStream' that is passed to the Promise
|
|
// 'end' is deferred until content is handled.
|
|
class CacacheWriteStream extends Flush {
|
|
constructor (cache, opts) {
|
|
super()
|
|
this.opts = opts
|
|
this.cache = cache
|
|
this.inputStream = new Minipass()
|
|
this.inputStream.on('error', er => this.emit('error', er))
|
|
this.inputStream.on('drain', () => this.emit('drain'))
|
|
this.handleContentP = null
|
|
}
|
|
|
|
write (chunk, encoding, cb) {
|
|
if (!this.handleContentP) {
|
|
this.handleContentP = handleContent(
|
|
this.inputStream,
|
|
this.cache,
|
|
this.opts
|
|
)
|
|
}
|
|
return this.inputStream.write(chunk, encoding, cb)
|
|
}
|
|
|
|
flush (cb) {
|
|
this.inputStream.end(() => {
|
|
if (!this.handleContentP) {
|
|
const e = new Error('Cache input stream was empty')
|
|
e.code = 'ENODATA'
|
|
// empty streams are probably emitting end right away.
|
|
// defer this one tick by rejecting a promise on it.
|
|
return Promise.reject(e).catch(cb)
|
|
}
|
|
// eslint-disable-next-line promise/catch-or-return
|
|
this.handleContentP.then(
|
|
(res) => {
|
|
res.integrity && this.emit('integrity', res.integrity)
|
|
// eslint-disable-next-line promise/always-return
|
|
res.size !== null && this.emit('size', res.size)
|
|
cb()
|
|
},
|
|
(er) => cb(er)
|
|
)
|
|
})
|
|
}
|
|
}
|
|
|
|
function writeStream (cache, opts = {}) {
|
|
return new CacacheWriteStream(cache, opts)
|
|
}
|
|
|
|
async function handleContent (inputStream, cache, opts) {
|
|
const tmp = await makeTmp(cache, opts)
|
|
try {
|
|
const res = await pipeToTmp(inputStream, cache, tmp.target, opts)
|
|
await moveToDestination(
|
|
tmp,
|
|
cache,
|
|
res.integrity,
|
|
opts
|
|
)
|
|
return res
|
|
} finally {
|
|
if (!tmp.moved) {
|
|
await fs.rm(tmp.target, { recursive: true, force: true })
|
|
}
|
|
}
|
|
}
|
|
|
|
async function pipeToTmp (inputStream, cache, tmpTarget, opts) {
|
|
const outStream = new fsm.WriteStream(tmpTarget, {
|
|
flags: 'wx',
|
|
})
|
|
|
|
if (opts.integrityEmitter) {
|
|
// we need to create these all simultaneously since they can fire in any order
|
|
const [integrity, size] = await Promise.all([
|
|
events.once(opts.integrityEmitter, 'integrity').then(res => res[0]),
|
|
events.once(opts.integrityEmitter, 'size').then(res => res[0]),
|
|
new Pipeline(inputStream, outStream).promise(),
|
|
])
|
|
return { integrity, size }
|
|
}
|
|
|
|
let integrity
|
|
let size
|
|
const hashStream = ssri.integrityStream({
|
|
integrity: opts.integrity,
|
|
algorithms: opts.algorithms,
|
|
size: opts.size,
|
|
})
|
|
hashStream.on('integrity', i => {
|
|
integrity = i
|
|
})
|
|
hashStream.on('size', s => {
|
|
size = s
|
|
})
|
|
|
|
const pipeline = new Pipeline(inputStream, hashStream, outStream)
|
|
await pipeline.promise()
|
|
return { integrity, size }
|
|
}
|
|
|
|
async function makeTmp (cache, opts) {
|
|
const tmpTarget = uniqueFilename(path.join(cache, 'tmp'), opts.tmpPrefix)
|
|
await fs.mkdir(path.dirname(tmpTarget), { recursive: true })
|
|
return {
|
|
target: tmpTarget,
|
|
moved: false,
|
|
}
|
|
}
|
|
|
|
async function moveToDestination (tmp, cache, sri, opts) {
|
|
const destination = contentPath(cache, sri)
|
|
const destDir = path.dirname(destination)
|
|
if (moveOperations.has(destination)) {
|
|
return moveOperations.get(destination)
|
|
}
|
|
moveOperations.set(
|
|
destination,
|
|
fs.mkdir(destDir, { recursive: true })
|
|
.then(async () => {
|
|
await moveFile(tmp.target, destination, { overwrite: false })
|
|
tmp.moved = true
|
|
return tmp.moved
|
|
})
|
|
.catch(err => {
|
|
if (!err.message.startsWith('The destination file exists')) {
|
|
throw Object.assign(err, { code: 'EEXIST' })
|
|
}
|
|
}).finally(() => {
|
|
moveOperations.delete(destination)
|
|
})
|
|
|
|
)
|
|
return moveOperations.get(destination)
|
|
}
|
|
|
|
function sizeError (expected, found) {
|
|
/* eslint-disable-next-line max-len */
|
|
const err = new Error(`Bad data size: expected inserted data to be ${expected} bytes, but got ${found} instead`)
|
|
err.expected = expected
|
|
err.found = found
|
|
err.code = 'EBADSIZE'
|
|
return err
|
|
}
|
|
|
|
function checksumError (expected, found) {
|
|
const err = new Error(`Integrity check failed:
|
|
Wanted: ${expected}
|
|
Found: ${found}`)
|
|
err.code = 'EINTEGRITY'
|
|
err.expected = expected
|
|
err.found = found
|
|
return err
|
|
}
|