diff options
Diffstat (limited to 'deps/npm/lib/search/all-package-metadata.js')
-rw-r--r-- | deps/npm/lib/search/all-package-metadata.js | 311 |
1 file changed, 311 insertions, 0 deletions
diff --git a/deps/npm/lib/search/all-package-metadata.js b/deps/npm/lib/search/all-package-metadata.js new file mode 100644 index 0000000000..ce917f5ef0 --- /dev/null +++ b/deps/npm/lib/search/all-package-metadata.js @@ -0,0 +1,311 @@ +'use strict' + +var fs = require('graceful-fs') +var path = require('path') +var mkdir = require('mkdirp') +var chownr = require('chownr') +var npm = require('../npm.js') +var log = require('npmlog') +var cacheFile = require('npm-cache-filename') +var getCacheStat = require('../cache/get-stat.js') +var mapToRegistry = require('../utils/map-to-registry.js') +var jsonstream = require('JSONStream') +var writeStreamAtomic = require('fs-write-stream-atomic') +var ms = require('mississippi') +var sortedUnionStream = require('sorted-union-stream') +var once = require('once') + +// Returns a sorted stream of all package metadata. Internally, takes care of +// maintaining its metadata cache and making partial or full remote requests, +// according to staleness, validity, etc. +// +// The local cache must hold certain invariants: +// 1. It must be a proper JSON object +// 2. It must have its keys lexically sorted +// 3. The first entry must be `_updated` with a millisecond timestamp as a val. +// 4. 
// It must include all entries that exist in the metadata endpoint as of
// the value in `_updated`
module.exports = allPackageMetadata
// Returns an object-mode stream that emits one package-metadata entry at a
// time, freshest-available. `staleness` is the maximum age, in seconds, the
// local cache may reach before a (partial or full) registry update is made.
// Errors from setup or from either data source are re-emitted on the
// returned stream.
function allPackageMetadata (staleness) {
  var stream = ms.through.obj()

  mapToRegistry('-/all', npm.config, function (er, uri, auth) {
    if (er) return stream.emit('error', er)

    var cacheBase = cacheFile(npm.config.get('cache'))(uri)
    var cachePath = path.join(cacheBase, '.cache.json')

    createEntryStream(cachePath, uri, auth, staleness, function (err, entryStream, latest, newEntries) {
      if (err) return stream.emit('error', err)
      log.silly('all-package-metadata', 'entry stream created')
      if (entryStream && newEntries) {
        // Fresh entries came from the registry: tee them into the local
        // cache while also passing them through to the caller.
        createCacheWriteStream(cachePath, latest, function (err, writeStream) {
          if (err) return stream.emit('error', err)
          log.silly('all-package-metadata', 'output stream created')
          ms.pipeline.obj(entryStream, writeStream, stream)
        })
      } else if (entryStream) {
        // Cache-only data (or a skipped remote request): nothing to write.
        ms.pipeline.obj(entryStream, stream)
      } else {
        stream.emit('error', new Error('No search sources available'))
      }
    })
  })
  return stream
}

// Creates a stream of the latest available package metadata.
// Metadata will come from a combination of the local cache and remote data.
// `cb(err, entryStream, latest, newEntries)` -- `latest` is the millisecond
// timestamp the data is current to, and `newEntries` is true when entries
// came from the registry (and therefore should be written back to the cache).
module.exports._createEntryStream = createEntryStream
function createEntryStream (cachePath, uri, auth, staleness, cb) {
  createCacheEntryStream(cachePath, function (err, cacheStream, cacheLatest) {
    cacheLatest = cacheLatest || 0
    if (err) {
      // A missing/corrupt cache is recoverable: a full fetch rebuilds it.
      log.warn('', 'Failed to read search cache. Rebuilding')
      log.silly('all-package-metadata', 'cache read error: ', err)
    }
    createEntryUpdateStream(uri, auth, staleness, cacheLatest, function (err, updateStream, updatedLatest) {
      updatedLatest = updatedLatest || 0
      var latest = updatedLatest || cacheLatest
      if (!cacheStream && !updateStream) {
        return cb(new Error('No search sources available'))
      }
      if (err) {
        // A failed update is likewise recoverable when the cache was usable.
        log.warn('', 'Search data request failed, search might be stale')
        log.silly('all-package-metadata', 'update request error: ', err)
      }
      if (cacheStream && updateStream) {
        // Deduped, unioned, sorted stream from the combination of both.
        cb(null,
          createMergedStream(cacheStream, updateStream),
          latest,
          !!updatedLatest)
      } else {
        // Either one works if one or the other failed
        cb(null, cacheStream || updateStream, latest, !!updatedLatest)
      }
    })
  })
}

// Merges `a` and `b` into one stream, dropping duplicates in favor of entries
// in `b`. Both input streams should already be individually sorted, and the
// returned output stream will have semantics resembling the merge step of a
// plain old merge sort.
module.exports._createMergedStream = createMergedStream
function createMergedStream (a, b) {
  linkStreams(a, b)
  // Entries are keyed (and deduplicated) by package name.
  return sortedUnionStream(b, a, function (pkg) { return pkg.name })
}

// Reads the local index and returns a stream that spits out package data.
// `cb(err, entryStream, latest)` -- see `extractUpdated` for how the leading
// `_updated` timestamp is peeled off the front of the stream.
module.exports._createCacheEntryStream = createCacheEntryStream
function createCacheEntryStream (cacheFile, cb) {
  log.verbose('all-package-metadata', 'creating entry stream from local cache')
  log.verbose('all-package-metadata', cacheFile)
  fs.stat(cacheFile, function (err, stat) {
    if (err) return cb(err)
    // TODO - This isn't very helpful if `cacheFile` is empty or just `{}`
    var entryStream = ms.pipeline.obj(
      fs.createReadStream(cacheFile),
      jsonstream.parse('*'),
      // I believe this passthrough is necessary cause `jsonstream` returns
      // weird custom streams that behave funny sometimes.
      ms.through.obj()
    )
    extractUpdated(entryStream, 'cached-entry-stream', cb)
  })
}

// Stream of entry updates from the server. If `latest` is `0`, streams the
// entire metadata object from the registry; otherwise requests only entries
// changed since `latest` (a millisecond timestamp) via the `/since` endpoint.
// Calls `cb(null)` with no stream at all when the cache is still fresh.
module.exports._createEntryUpdateStream = createEntryUpdateStream
function createEntryUpdateStream (all, auth, staleness, latest, cb) {
  log.verbose('all-package-metadata', 'creating remote entry stream')
  var params = {
    timeout: 600,
    follow: true,
    staleOk: true,
    auth: auth,
    streaming: true
  }
  var partialUpdate = false
  if (latest && (Date.now() - latest < (staleness * 1000))) {
    // Skip the request altogether if our `latest` isn't stale.
    log.verbose('all-package-metadata', 'Local data up to date, skipping update')
    return cb(null)
  } else if (latest === 0) {
    log.warn('', 'Building the local index for the first time, please be patient')
    log.verbose('all-package-metadata', 'No cached data: requesting full metadata db')
  } else {
    log.verbose('all-package-metadata', 'Cached data present with timestamp:', latest, 'requesting partial index update')
    all += '/since?stale=update_after&startkey=' + latest
    partialUpdate = true
  }
  npm.registry.request(all, params, function (er, res) {
    if (er) return cb(er)
    log.silly('all-package-metadata', 'request stream opened, code:', res.statusCode)
    // NOTE - The stream returned by `request` seems to be very persnickety
    // and this is almost a magic incantation to get it to work.
    // Modify how `res` is used here at your own risk.
    var entryStream = ms.pipeline.obj(
      res,
      ms.through(function (chunk, enc, cb) {
        cb(null, chunk)
      }),
      jsonstream.parse('*', function (pkg, key) {
        // Keep the `_updated` timestamp; drop any other underscore-prefixed
        // (private/internal) top-level keys.
        if (key[0] === '_updated' || key[0][0] !== '_') {
          return pkg
        }
      })
    )
    if (partialUpdate) {
      // The `/all/since` endpoint doesn't return `_updated`, so we
      // just use the request's own timestamp.
      cb(null, entryStream, Date.parse(res.headers.date))
    } else {
      extractUpdated(entryStream, 'entry-update-stream', cb)
    }
  })
}

// Both the (full) remote requests and the local index have `_updated` as their
// first returned entries. This is the "latest" unix timestamp for the metadata
// in question. This code does a bit of juggling with the data streams
// so that we can pretend that field doesn't exist, but still extract `latest`.
// `cb(err, stream, latest)` -- `cb` fires at most once (guarded by `once`).
function extractUpdated (entryStream, label, cb) {
  cb = once(cb)
  log.silly('all-package-metadata', 'extracting latest')
  // Shared failure path: tear the stream down and report `msg` exactly once.
  function nope (msg) {
    return function () {
      log.warn('all-package-metadata', label, msg)
      entryStream.removeAllListeners()
      entryStream.destroy()
      cb(new Error(msg))
    }
  }
  var onErr = nope('Failed to read stream')
  var onEnd = nope('Empty or invalid stream')
  entryStream.on('error', onErr)
  entryStream.on('end', onEnd)
  entryStream.once('data', function (latest) {
    log.silly('all-package-metadata', 'got first stream entry for', label, latest)
    entryStream.removeListener('error', onErr)
    entryStream.removeListener('end', onEnd)
    // Because `.once()` unpauses the stream, we re-pause it after the first
    // entry so we don't vomit entries into the void.
    entryStream.pause()
    if (typeof latest === 'number') {
      // The extra pipeline is to return a stream that will implicitly unpause
      // after having an `.on('data')` listener attached, since using this
      // `data` event broke its initial state.
      cb(null, ms.pipeline.obj(entryStream, ms.through.obj()), latest)
    } else {
      cb(new Error('expected first entry to be _updated'))
    }
  })
}

// Creates a stream that writes input metadata to the current cache.
// Cache updates are atomic, and the stream closes when *everything* is done.
// The stream is also passthrough, so entries going through it will also
// be output from it.
module.exports._createCacheWriteStream = createCacheWriteStream
// `cb(err, stream)` -- `stream` is an object-mode duplex: entries written to
// it are serialized into the cache file at `cacheFile` (atomically, via
// `writeStreamAtomic`) and re-emitted object-by-object on the readable side.
// `latest` is the millisecond timestamp recorded as the `_updated` entry.
function createCacheWriteStream (cacheFile, latest, cb) {
  _ensureCacheDirExists(cacheFile, function (err) {
    if (err) return cb(err)
    log.silly('all-package-metadata', 'creating output stream')
    var outStream = _createCacheOutStream()
    var cacheFileStream = writeStreamAtomic(cacheFile)
    var inputStream = _createCacheInStream(cacheFileStream, outStream, latest)

    // Glue together the various streams so they fail together.
    // `cacheFileStream` errors are already handled by the `inputStream`
    // pipeline
    var errEmitted = false
    linkStreams(inputStream, outStream, function () { errEmitted = true })

    // Only end the readable side once the atomic file write has fully
    // landed on disk, and only if nothing errored along the way.
    cacheFileStream.on('close', function () { !errEmitted && outStream.end() })

    cb(null, ms.duplex.obj(inputStream, outStream))
  })
}

// Makes sure the directory that will hold `cacheFile` exists and is owned by
// the same uid/gid that owns the npm cache itself (matters when npm runs as
// root but the cache belongs to another user).
function _ensureCacheDirExists (cacheFile, cb) {
  var cacheBase = path.dirname(cacheFile)
  log.silly('all-package-metadata', 'making sure cache dir exists at', cacheBase)
  getCacheStat(function (er, st) {
    if (er) return cb(er)
    mkdir(cacheBase, function (er, made) {
      if (er) return cb(er)
      chownr(made || cacheBase, st.uid, st.gid, cb)
    })
  })
}

// Readable side of the cache-write duplex: re-parses the serialized JSON
// text teed off by `_createCacheInStream` and emits the package objects.
function _createCacheOutStream () {
  return ms.pipeline.obj(
    // These two passthrough `through` streams compensate for some
    // odd behavior with `jsonstream`.
    ms.through(),
    jsonstream.parse('*', function (obj, key) {
      // This stream happens to get _updated passed through it, for
      // implementation reasons. We make sure to filter it out cause
      // the fact that it comes through at all is an implementation
      // detail -- consumers should only ever see package objects.
      if (typeof obj === 'object') {
        return obj
      }
    }),
    ms.through.obj()
  )
}

// Writable side of the cache-write duplex: accepts package objects (plus a
// leading numeric `_updated` timestamp), serializes everything as one JSON
// object, tees the serialized text into `outStream`, and writes it through
// `writer` (the atomic cache-file stream).
function _createCacheInStream (writer, outStream, latest) {
  var updatedWritten = false
  var inStream = ms.pipeline.obj(
    ms.through.obj(function (pkg, enc, cb) {
      if (!updatedWritten && typeof pkg === 'number') {
        // This is the `_updated` value getting sent through.
        updatedWritten = true
        return cb(null, ['_updated', pkg])
      } else if (typeof pkg !== 'object') {
        // NOTE(review): this branch never calls `cb`, so the pipeline stalls
        // after the error fires -- presumably fine since the emitted error
        // tears the whole linked pipeline down; confirm before changing.
        this.emit('error', new Error('invalid value written to input stream'))
      } else {
        // The [key, val] format is expected by `jsonstream` for object writing
        cb(null, [pkg.name, pkg])
      }
    }),
    jsonstream.stringifyObject('{', ',', '}'),
    ms.through(function (chunk, enc, cb) {
      // This tees off the buffer data to `outStream`, and then continues
      // the pipeline as usual
      outStream.write(chunk, enc, function () {
        cb(null, chunk)
      })
    }),
    // And finally, we write to the cache file.
    writer
  )
  // Prime the pipeline with the timestamp so `_updated` is the first entry,
  // satisfying the cache invariant documented at the top of this file.
  inStream.write(latest)
  return inStream
}

// Links errors between `a` and `b`, preventing cycles, and calls `cb` if
// an error happens, once per error. `cb` is optional: `createMergedStream`
// calls `linkStreams(a, b)` with no callback, which previously made any
// stream error throw `TypeError: cb is not a function` from inside the
// error handler instead of propagating the original error.
function linkStreams (a, b, cb) {
  cb = cb || function () {}
  var lastError = null
  a.on('error', function (err) {
    if (err !== lastError) {
      lastError = err
      b.emit('error', err)
      cb(err)
    }
  })
  b.on('error', function (err) {
    if (err !== lastError) {
      lastError = err
      a.emit('error', err)
      cb(err)
    }
  })
}