Diffstat (limited to 'deps/npm/lib/search/all-package-metadata.js')
 deps/npm/lib/search/all-package-metadata.js | 290 ++++++++++----------
 1 file changed, 145 insertions(+), 145 deletions(-)
diff --git a/deps/npm/lib/search/all-package-metadata.js b/deps/npm/lib/search/all-package-metadata.js
index 5a27bdbcee..5883def5c7 100644
--- a/deps/npm/lib/search/all-package-metadata.js
+++ b/deps/npm/lib/search/all-package-metadata.js
@@ -1,21 +1,28 @@
'use strict'
-var fs = require('graceful-fs')
-var path = require('path')
-var mkdir = require('mkdirp')
-var chownr = require('chownr')
-var npm = require('../npm.js')
-var log = require('npmlog')
-var cacheFile = require('npm-cache-filename')
-var correctMkdir = require('../utils/correct-mkdir.js')
-var mapToRegistry = require('../utils/map-to-registry.js')
-var jsonstream = require('JSONStream')
-var writeStreamAtomic = require('fs-write-stream-atomic')
-var ms = require('mississippi')
-var sortedUnionStream = require('sorted-union-stream')
-var once = require('once')
-var gunzip = require('../utils/gunzip-maybe')
+const BB = require('bluebird')
+const cacheFile = require('npm-cache-filename')
+const chownr = BB.promisify(require('chownr'))
+const correctMkdir = BB.promisify(require('../utils/correct-mkdir.js'))
+const figgyPudding = require('figgy-pudding')
+const fs = require('graceful-fs')
+const JSONStream = require('JSONStream')
+const log = require('npmlog')
+const mkdir = BB.promisify(require('mkdirp'))
+const ms = require('mississippi')
+const npmFetch = require('libnpm/fetch')
+const path = require('path')
+const sortedUnionStream = require('sorted-union-stream')
+const url = require('url')
+const writeStreamAtomic = require('fs-write-stream-atomic')
+
+const statAsync = BB.promisify(fs.stat)
+
+const APMOpts = figgyPudding({
+ cache: {},
+ registry: {}
+})
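
The new APMOpts pudding filters whatever options bag callers pass down to just the keys this module reads. A minimal sketch of how figgy-pudding behaves here, with illustrative values (the paths are made up):

    const figgyPudding = require('figgy-pudding')

    const APMOpts = figgyPudding({
      cache: {},
      registry: {}
    })

    const opts = APMOpts({
      cache: '/tmp/npm-cache',                 // illustrative path
      registry: 'https://registry.npmjs.org/',
      unrelated: true
    })
    console.log(opts.cache)    // '/tmp/npm-cache'
    console.log(opts.registry) // 'https://registry.npmjs.org/'
    // `unrelated` was never declared in the pudding, so it is not exposed.
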
// Returns a sorted stream of all package metadata. Internally, takes care of
// maintaining its metadata cache and making partial or full remote requests,
// according to staleness, validity, etc.
@@ -27,63 +34,70 @@ var gunzip = require('../utils/gunzip-maybe')
// 4. It must include all entries that exist in the metadata endpoint as of
// the value in `_updated`
module.exports = allPackageMetadata
-function allPackageMetadata (staleness) {
- var stream = ms.through.obj()
-
- mapToRegistry('-/all', npm.config, function (er, uri, auth) {
- if (er) return stream.emit('error', er)
-
- var cacheBase = cacheFile(npm.config.get('cache'))(uri)
- var cachePath = path.join(cacheBase, '.cache.json')
+function allPackageMetadata (opts) {
+ const staleness = opts.staleness
+ const stream = ms.through.obj()
- createEntryStream(cachePath, uri, auth, staleness, function (err, entryStream, latest, newEntries) {
- if (err) return stream.emit('error', err)
- log.silly('all-package-metadata', 'entry stream created')
- if (entryStream && newEntries) {
- createCacheWriteStream(cachePath, latest, function (err, writeStream) {
- if (err) return stream.emit('error', err)
- log.silly('all-package-metadata', 'output stream created')
- ms.pipeline.obj(entryStream, writeStream, stream)
- })
- } else if (entryStream) {
- ms.pipeline.obj(entryStream, stream)
- } else {
- stream.emit('error', new Error('No search sources available'))
- }
- })
- })
+ opts = APMOpts(opts)
+ const cacheBase = cacheFile(path.resolve(path.dirname(opts.cache)))(url.resolve(opts.registry, '/-/all'))
+ const cachePath = path.join(cacheBase, '.cache.json')
+ createEntryStream(
+ cachePath, staleness, opts
+ ).then(({entryStream, latest, newEntries}) => {
+ log.silly('all-package-metadata', 'entry stream created')
+ if (entryStream && newEntries) {
+ return createCacheWriteStream(cachePath, latest, opts).then(writer => {
+ log.silly('all-package-metadata', 'output stream created')
+ ms.pipeline.obj(entryStream, writer, stream)
+ })
+ } else if (entryStream) {
+ ms.pipeline.obj(entryStream, stream)
+ } else {
+ stream.emit('error', new Error('No search sources available'))
+ }
+ }).catch(err => stream.emit('error', err))
return stream
}
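
The refactor keeps the external contract: callers synchronously get an object-mode stream back, while the promise chain wires it up (or errors it) behind the scenes. A hypothetical consumer, with assumed option values:

    const allPackageMetadata = require('./all-package-metadata.js')

    const stream = allPackageMetadata({
      cache: '/path/to/cache/_cacache',        // assumed cache path
      registry: 'https://registry.npmjs.org/',
      staleness: 600                           // seconds before refetching
    })
    stream.on('data', pkg => console.log(pkg.name))
    stream.on('error', err => console.error('search failed:', err.message))
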
// Creates a stream of the latest available package metadata.
// Metadata will come from a combination of the local cache and remote data.
module.exports._createEntryStream = createEntryStream
-function createEntryStream (cachePath, uri, auth, staleness, cb) {
- createCacheEntryStream(cachePath, function (err, cacheStream, cacheLatest) {
+function createEntryStream (cachePath, staleness, opts) {
+ return createCacheEntryStream(
+ cachePath, opts
+ ).catch(err => {
+ log.warn('', 'Failed to read search cache. Rebuilding')
+ log.silly('all-package-metadata', 'cache read error: ', err)
+ return {}
+ }).then(({
+ updateStream: cacheStream,
+ updatedLatest: cacheLatest
+ }) => {
cacheLatest = cacheLatest || 0
- if (err) {
- log.warn('', 'Failed to read search cache. Rebuilding')
- log.silly('all-package-metadata', 'cache read error: ', err)
- }
- createEntryUpdateStream(uri, auth, staleness, cacheLatest, function (err, updateStream, updatedLatest) {
+ return createEntryUpdateStream(staleness, cacheLatest, opts).catch(err => {
+ log.warn('', 'Search data request failed, search might be stale')
+ log.silly('all-package-metadata', 'update request error: ', err)
+ return {}
+ }).then(({updateStream, updatedLatest}) => {
updatedLatest = updatedLatest || 0
- var latest = updatedLatest || cacheLatest
+ const latest = updatedLatest || cacheLatest
if (!cacheStream && !updateStream) {
- return cb(new Error('No search sources available'))
- }
- if (err) {
- log.warn('', 'Search data request failed, search might be stale')
- log.silly('all-package-metadata', 'update request error: ', err)
+ throw new Error('No search sources available')
}
if (cacheStream && updateStream) {
// Deduped, unioned, sorted stream from the combination of both.
- cb(null,
- createMergedStream(cacheStream, updateStream),
+ return {
+ entryStream: createMergedStream(cacheStream, updateStream),
latest,
- !!updatedLatest)
+ newEntries: !!updatedLatest
+ }
} else {
// Either one works if one or the other failed
- cb(null, cacheStream || updateStream, latest, !!updatedLatest)
+ return {
+ entryStream: cacheStream || updateStream,
+ latest,
+ newEntries: !!updatedLatest
+ }
}
})
})
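
Each data source is made optional by catching its failure and resolving to `{}`, so the destructured stream comes out `undefined` instead of rejecting the whole chain. The pattern in isolation (a generic sketch, names invented):

    const BB = require('bluebird')

    function optionalSource (fn) {
      return BB.try(fn).catch(err => {
        console.warn('source unavailable:', err.message)
        return {} // destructuring a stream off this yields undefined
      })
    }

    optionalSource(() => BB.reject(new Error('registry down')))
      .then(({updateStream}) => console.log(updateStream)) // undefined, no crash
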
@@ -96,66 +110,51 @@ function createEntryStream (cachePath, uri, auth, staleness, cb) {
module.exports._createMergedStream = createMergedStream
function createMergedStream (a, b) {
linkStreams(a, b)
- return sortedUnionStream(b, a, function (pkg) { return pkg.name })
+ return sortedUnionStream(b, a, ({name}) => name)
}
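
sorted-union-stream takes two streams already sorted by the same key plus a key extractor, and emits a single deduplicated, still-sorted stream. A standalone sketch using mississippi passthroughs as the two sources:

    const ms = require('mississippi')
    const sortedUnionStream = require('sorted-union-stream')

    // Two object streams, each pre-sorted by `name`.
    const cache = ms.through.obj()
    const fresh = ms.through.obj()

    const union = sortedUnionStream(fresh, cache, ({name}) => name)
    union.on('data', pkg => console.log(pkg.name, pkg.from))

    cache.write({name: 'a', from: 'cache'})
    cache.write({name: 'b', from: 'cache'})
    cache.end()
    fresh.write({name: 'b', from: 'update'})
    fresh.end()
    // => one entry per name: 'a' from the cache, and a single 'b'.
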
// Reads the local index and returns a stream that spits out package data.
module.exports._createCacheEntryStream = createCacheEntryStream
-function createCacheEntryStream (cacheFile, cb) {
+function createCacheEntryStream (cacheFile, opts) {
log.verbose('all-package-metadata', 'creating entry stream from local cache')
log.verbose('all-package-metadata', cacheFile)
- fs.stat(cacheFile, function (err, stat) {
- if (err) return cb(err)
+ return statAsync(cacheFile).then(stat => {
// TODO - This isn't very helpful if `cacheFile` is empty or just `{}`
- var entryStream = ms.pipeline.obj(
+ const entryStream = ms.pipeline.obj(
fs.createReadStream(cacheFile),
- jsonstream.parse('*'),
+ JSONStream.parse('*'),
// I believe this passthrough is necessary cause `jsonstream` returns
// weird custom streams that behave funny sometimes.
ms.through.obj()
)
- extractUpdated(entryStream, 'cached-entry-stream', cb)
+ return extractUpdated(entryStream, 'cached-entry-stream', opts)
})
}
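
Per the cache writer further down, the on-disk `.cache.json` is one JSON object whose first key is the numeric `_updated` timestamp, followed by one entry per package; `JSONStream.parse('*')` emits each value in file order. A sketch (package names illustrative):

    const fs = require('graceful-fs')
    const JSONStream = require('JSONStream')

    // Shape of the cache being parsed (values abridged):
    //   {"_updated": 1234567890, "left-pad": {"name": "left-pad", ...}, ...}
    fs.createReadStream('.cache.json')
      .pipe(JSONStream.parse('*'))
      .on('data', value => {
        // emits 1234567890 first, then each package document in file order
      })
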
// Stream of entry updates from the server. If `latest` is `0`, streams the
// entire metadata object from the registry.
module.exports._createEntryUpdateStream = createEntryUpdateStream
-function createEntryUpdateStream (all, auth, staleness, latest, cb) {
+function createEntryUpdateStream (staleness, latest, opts) {
log.verbose('all-package-metadata', 'creating remote entry stream')
- var params = {
- timeout: 600,
- follow: true,
- staleOk: true,
- auth: auth,
- streaming: true
- }
- var partialUpdate = false
+ let partialUpdate = false
+ let uri = '/-/all'
if (latest && (Date.now() - latest < (staleness * 1000))) {
// Skip the request altogether if our `latest` isn't stale.
log.verbose('all-package-metadata', 'Local data up to date, skipping update')
- return cb(null)
+ return BB.resolve({})
} else if (latest === 0) {
log.warn('', 'Building the local index for the first time, please be patient')
log.verbose('all-package-metadata', 'No cached data: requesting full metadata db')
} else {
log.verbose('all-package-metadata', 'Cached data present with timestamp:', latest, 'requesting partial index update')
- all += '/since?stale=update_after&startkey=' + latest
+ uri += '/since?stale=update_after&startkey=' + latest
partialUpdate = true
}
- npm.registry.request(all, params, function (er, res) {
- if (er) return cb(er)
+ return npmFetch(uri, opts).then(res => {
log.silly('all-package-metadata', 'request stream opened, code:', res.statusCode)
- // NOTE - The stream returned by `request` seems to be very persnickety
- // and this is almost a magic incantation to get it to work.
- // Modify how `res` is used here at your own risk.
- var entryStream = ms.pipeline.obj(
- res,
- ms.through(function (chunk, enc, cb) {
- cb(null, chunk)
- }),
- gunzip(),
- jsonstream.parse('*', function (pkg, key) {
+ let entryStream = ms.pipeline.obj(
+ res.body,
+ JSONStream.parse('*', (pkg, key) => {
if (key[0] === '_updated' || key[0][0] !== '_') {
return pkg
}
@@ -164,9 +163,12 @@ function createEntryUpdateStream (all, auth, staleness, latest, cb) {
if (partialUpdate) {
// The `/all/since` endpoint doesn't return `_updated`, so we
// just use the request's own timestamp.
- cb(null, entryStream, Date.parse(res.headers.date))
+ return {
+ updateStream: entryStream,
+ updatedLatest: Date.parse(res.headers.get('date'))
+ }
} else {
- extractUpdated(entryStream, 'entry-update-stream', cb)
+ return extractUpdated(entryStream, 'entry-update-stream', opts)
}
})
}
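
The gunzip-and-passthrough incantations disappear because the fetch-style response from `libnpm/fetch` exposes an already-decoded body stream and a Headers-like object with a `.get()` accessor, rather than the raw stream `npm.registry.request` produced. Roughly (registry URL assumed):

    const npmFetch = require('libnpm/fetch')
    const JSONStream = require('JSONStream')

    npmFetch('/-/all', {
      registry: 'https://registry.npmjs.org/' // relative paths resolve against this
    }).then(res => {
      console.log('date:', res.headers.get('date'))
      res.body                        // already-decoded readable stream
        .pipe(JSONStream.parse('*'))
        .on('data', pkg => { /* one package document at a time */ })
    })
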
@@ -175,36 +177,37 @@ function createEntryUpdateStream (all, auth, staleness, latest, cb) {
// first returned entries. This is the "latest" unix timestamp for the metadata
// in question. This code does a bit of juggling with the data streams
// so that we can pretend that field doesn't exist, but still extract `latest`
-function extractUpdated (entryStream, label, cb) {
- cb = once(cb)
+function extractUpdated (entryStream, label, opts) {
log.silly('all-package-metadata', 'extracting latest')
- function nope (msg) {
- return function () {
- log.warn('all-package-metadata', label, msg)
- entryStream.removeAllListeners()
- entryStream.destroy()
- cb(new Error(msg))
- }
- }
- var onErr = nope('Failed to read stream')
- var onEnd = nope('Empty or invalid stream')
- entryStream.on('error', onErr)
- entryStream.on('end', onEnd)
- entryStream.once('data', function (latest) {
- log.silly('all-package-metadata', 'got first stream entry for', label, latest)
- entryStream.removeListener('error', onErr)
- entryStream.removeListener('end', onEnd)
- // Because `.once()` unpauses the stream, we re-pause it after the first
- // entry so we don't vomit entries into the void.
- entryStream.pause()
- if (typeof latest === 'number') {
- // The extra pipeline is to return a stream that will implicitly unpause
- // after having an `.on('data')` listener attached, since using this
- // `data` event broke its initial state.
- cb(null, ms.pipeline.obj(entryStream, ms.through.obj()), latest)
- } else {
- cb(new Error('expected first entry to be _updated'))
+ return new BB((resolve, reject) => {
+ function nope (msg) {
+ return function () {
+ log.warn('all-package-metadata', label, msg)
+ entryStream.removeAllListeners()
+ entryStream.destroy()
+ reject(new Error(msg))
+ }
}
+ const onErr = nope('Failed to read stream')
+ const onEnd = nope('Empty or invalid stream')
+ entryStream.on('error', onErr)
+ entryStream.on('end', onEnd)
+ entryStream.once('data', latest => {
+ log.silly('all-package-metadata', 'got first stream entry for', label, latest)
+ entryStream.removeListener('error', onErr)
+ entryStream.removeListener('end', onEnd)
+ if (typeof latest === 'number') {
+ // The extra pipeline is to return a stream that will implicitly unpause
+ // after having an `.on('data')` listener attached, since using this
+ // `data` event broke its initial state.
+ resolve({
+ updateStream: entryStream.pipe(ms.through.obj()),
+ updatedLatest: latest
+ })
+ } else {
+ reject(new Error('expected first entry to be _updated'))
+ }
+ })
})
}
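
With a promise's settle-once semantics standing in for `once(cb)`, the event juggling reduces to this shape (a generic sketch, not the exported API):

    const BB = require('bluebird')

    // Settle exactly once: resolve on the first `data` event,
    // reject on an error or a premature end.
    function firstEntry (stream) {
      return new BB((resolve, reject) => {
        const onErr = () => reject(new Error('Failed to read stream'))
        const onEnd = () => reject(new Error('Empty or invalid stream'))
        stream.on('error', onErr)
        stream.on('end', onEnd)
        stream.once('data', first => {
          stream.removeListener('error', onErr)
          stream.removeListener('end', onEnd)
          resolve(first)
        })
      })
    }
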
@@ -213,44 +216,43 @@ function extractUpdated (entryStream, label, cb) {
// The stream is also passthrough, so entries going through it will also
// be output from it.
module.exports._createCacheWriteStream = createCacheWriteStream
-function createCacheWriteStream (cacheFile, latest, cb) {
- _ensureCacheDirExists(cacheFile, function (err) {
- if (err) return cb(err)
+function createCacheWriteStream (cacheFile, latest, opts) {
+ return _ensureCacheDirExists(cacheFile, opts).then(() => {
log.silly('all-package-metadata', 'creating output stream')
- var outStream = _createCacheOutStream()
- var cacheFileStream = writeStreamAtomic(cacheFile)
- var inputStream = _createCacheInStream(cacheFileStream, outStream, latest)
+ const outStream = _createCacheOutStream()
+ const cacheFileStream = writeStreamAtomic(cacheFile)
+ const inputStream = _createCacheInStream(
+ cacheFileStream, outStream, latest
+ )
// Glue together the various streams so they fail together.
// `cacheFileStream` errors are already handled by the `inputStream`
// pipeline
- var errEmitted = false
- linkStreams(inputStream, outStream, function () { errEmitted = true })
+ let errEmitted = false
+ linkStreams(inputStream, outStream, () => { errEmitted = true })
- cacheFileStream.on('close', function () { !errEmitted && outStream.end() })
+ cacheFileStream.on('close', () => !errEmitted && outStream.end())
- cb(null, ms.duplex.obj(inputStream, outStream))
+ return ms.duplex.obj(inputStream, outStream)
})
}
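
The returned value is still a single duplex: writes enter the stringify/tee pipeline, reads come off the re-parsed output side. A toy illustration of `ms.duplex.obj` gluing two halves together (the trivial pipe stands in for the real pipeline):

    const ms = require('mississippi')

    const inStream = ms.through.obj()   // stands in for the stringify/tee pipeline
    const outStream = ms.through.obj()  // stands in for the re-parsed output
    inStream.pipe(outStream)            // trivial glue, illustration only

    const dup = ms.duplex.obj(inStream, outStream)
    dup.on('data', pkg => console.log('read back:', pkg.name))
    dup.write({name: 'demo'})
    dup.end()
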
-function _ensureCacheDirExists (cacheFile, cb) {
+function _ensureCacheDirExists (cacheFile, opts) {
var cacheBase = path.dirname(cacheFile)
log.silly('all-package-metadata', 'making sure cache dir exists at', cacheBase)
- correctMkdir(npm.cache, function (er, st) {
- if (er) return cb(er)
- mkdir(cacheBase, function (er, made) {
- if (er) return cb(er)
- chownr(made || cacheBase, st.uid, st.gid, cb)
+ return correctMkdir(opts.cache).then(st => {
+ return mkdir(cacheBase).then(made => {
+ return chownr(made || cacheBase, st.uid, st.gid)
})
})
}
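
Each helper is converted once at require time with `BB.promisify`. One detail worth noting: mkdirp's callback reports the first directory it actually created, which is why the code falls back to `cacheBase` when nothing was made. A sketch with an assumed path:

    const BB = require('bluebird')
    const mkdir = BB.promisify(require('mkdirp'))

    mkdir('/tmp/example/cache').then(made => {
      // `made` is the first directory created, or null if the
      // whole path already existed -- hence `made || cacheBase` above.
      console.log('created:', made)
    })
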
function _createCacheOutStream () {
+ // NOTE: this looks goofy, but it's necessary in order to get
+ // JSONStream to play nice with the rest of everything.
return ms.pipeline.obj(
- // These two passthrough `through` streams compensate for some
- // odd behavior with `jsonstream`.
ms.through(),
- jsonstream.parse('*', function (obj, key) {
+ JSONStream.parse('*', (obj, key) => {
// This stream happens to get _updated passed through it, for
// implementation reasons. We make sure to filter it out cause
// the fact that it comes through
@@ -263,9 +265,9 @@ function _createCacheOutStream () {
}
function _createCacheInStream (writer, outStream, latest) {
- var updatedWritten = false
- var inStream = ms.pipeline.obj(
- ms.through.obj(function (pkg, enc, cb) {
+ let updatedWritten = false
+ const inStream = ms.pipeline.obj(
+ ms.through.obj((pkg, enc, cb) => {
if (!updatedWritten && typeof pkg === 'number') {
// This is the `_updated` value getting sent through.
updatedWritten = true
@@ -277,13 +279,11 @@ function _createCacheInStream (writer, outStream, latest) {
cb(null, [pkg.name, pkg])
}
}),
- jsonstream.stringifyObject('{', ',', '}'),
- ms.through(function (chunk, enc, cb) {
+ JSONStream.stringifyObject('{', ',', '}'),
+ ms.through((chunk, enc, cb) => {
// This tees off the buffer data to `outStream`, and then continues
// the pipeline as usual
- outStream.write(chunk, enc, function () {
- cb(null, chunk)
- })
+ outStream.write(chunk, enc, () => cb(null, chunk))
}),
// And finally, we write to the cache file.
writer
@@ -300,14 +300,14 @@ function linkStreams (a, b, cb) {
if (err !== lastError) {
lastError = err
b.emit('error', err)
- cb(err)
+ cb && cb(err)
}
})
b.on('error', function (err) {
if (err !== lastError) {
lastError = err
a.emit('error', err)
- cb(err)
+ cb && cb(err)
}
})
}
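
For context, a hypothetical standalone use of the linking pattern (assuming `linkStreams` is in scope): errors on either stream are mirrored to the other, the `lastError` guard stops them ping-ponging, and the now-optional callback fires once per new error.

    const ms = require('mississippi')
    const a = ms.through.obj()
    const b = ms.through.obj()

    linkStreams(a, b, () => console.log('link saw an error'))
    b.on('error', err => console.log('b received:', err.message))
    a.emit('error', new Error('boom')) // mirrored onto b exactly once
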