
github.com/npm/cli.git
author     Michael Perrotte <mike@npmjs.com>    2020-03-03 03:50:01 +0300
committer  isaacs <i@izs.me>                    2020-05-08 04:11:53 +0300
commit     504384bbdc8b51d311f79da2d41d37f2e70ba933 (patch)
tree       7b6b56a7c8ca97e902ac08552f76aaa24d2a275b /lib/search
parent     cd754bc9d90cfdb0e78c034ec604d9259bbfda53 (diff)
fix: updated usage of libnpmsearch

- Removed figgy-pudding, bluebird, mississippi.
- Refactor configs so that we have all the search related options grouped under a npm.flatOptions.search object.
- Remove the old slow "all docs" search that doesn't work on the public registry anyway.
Diffstat (limited to 'lib/search')
 -rw-r--r--  lib/search/all-package-metadata.js    325
 -rw-r--r--  lib/search/all-package-search.js        50
 -rw-r--r--  lib/search/format-package-stream.js     60
3 files changed, 38 insertions(+), 397 deletions(-)
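
The commit message above describes the direction of the refactor, but the wiring of lib/search.js itself is not part of this diff. The following is only a hedged sketch of how the new flow might look: options grouped under flatOptions.search, results fetched via libnpmsearch, and output written through the formatter rewritten later in this diff. The search() wrapper, the assumed shape of flatOptions.search, and the stdout plumbing are illustrative assumptions, not code from this commit.

// Hypothetical sketch only -- not part of this diff.
// Assumes libnpmsearch's default export is an async search(query, opts)
// and that search-related options live under flatOptions.search, as the
// commit message describes.
const libnpmsearch = require('libnpmsearch')
const formatPackageStream = require('./search/format-package-stream.js')

async function search (args, flatOptions) {
  const opts = flatOptions.search // e.g. { limit, description, exclude, ... } (assumed shape)
  const results = await libnpmsearch(args.join(' '), opts)

  // The formatter (see lib/search/format-package-stream.js below) is a
  // Minipass stream: write package objects in, read formatted text out.
  const format = formatPackageStream({ json: flatOptions.json, ...opts })
  format.on('data', chunk => process.stdout.write(chunk))
  for (const pkg of results) {
    format.write(pkg)
  }
  format.end()
}
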
diff --git a/lib/search/all-package-metadata.js b/lib/search/all-package-metadata.js
deleted file mode 100644
index 5a6064401..000000000
--- a/lib/search/all-package-metadata.js
+++ /dev/null
@@ -1,325 +0,0 @@
-'use strict'
-
-const BB = require('bluebird')
-
-const cacheFile = require('npm-cache-filename')
-const chownr = BB.promisify(require('chownr'))
-const correctMkdir = BB.promisify(require('../utils/correct-mkdir.js'))
-const figgyPudding = require('figgy-pudding')
-const fs = require('graceful-fs')
-const JSONStream = require('JSONStream')
-const log = require('npmlog')
-const mkdir = BB.promisify(require('gentle-fs').mkdir)
-const ms = require('mississippi')
-const npmFetch = require('npm-registry-fetch')
-const path = require('path')
-const sortedUnionStream = require('sorted-union-stream')
-const url = require('url')
-const writeStreamAtomic = require('fs-write-stream-atomic')
-
-const statAsync = BB.promisify(fs.stat)
-
-const APMOpts = figgyPudding({
- cache: {},
- registry: {}
-})
-// Returns a sorted stream of all package metadata. Internally, takes care of
-// maintaining its metadata cache and making partial or full remote requests,
-// according to staleness, validity, etc.
-//
-// The local cache must hold certain invariants:
-// 1. It must be a proper JSON object
-// 2. It must have its keys lexically sorted
-// 3. The first entry must be `_updated` with a millisecond timestamp as a val.
-// 4. It must include all entries that exist in the metadata endpoint as of
-// the value in `_updated`
-module.exports = allPackageMetadata
-function allPackageMetadata (opts) {
- const staleness = opts.staleness
- const stream = ms.through.obj()
-
- opts = APMOpts(opts)
- const cacheBase = cacheFile(path.resolve(path.dirname(opts.cache)))(url.resolve(opts.registry, '/-/all'))
- const cachePath = path.join(cacheBase, '.cache.json')
- createEntryStream(
- cachePath, staleness, opts
- ).then(({entryStream, latest, newEntries}) => {
- log.silly('all-package-metadata', 'entry stream created')
- if (entryStream && newEntries) {
- return createCacheWriteStream(cachePath, latest, opts).then(writer => {
- log.silly('all-package-metadata', 'output stream created')
- ms.pipeline.obj(entryStream, writer, stream)
- })
- } else if (entryStream) {
- ms.pipeline.obj(entryStream, stream)
- } else {
- stream.emit('error', new Error('No search sources available'))
- }
- }).catch(err => stream.emit('error', err))
- return stream
-}
-
-// Creates a stream of the latest available package metadata.
-// Metadata will come from a combination of the local cache and remote data.
-module.exports._createEntryStream = createEntryStream
-function createEntryStream (cachePath, staleness, opts) {
- return createCacheEntryStream(
- cachePath, opts
- ).catch(err => {
- log.warn('', 'Failed to read search cache. Rebuilding')
- log.silly('all-package-metadata', 'cache read error: ', err)
- return {}
- }).then(({
- updateStream: cacheStream,
- updatedLatest: cacheLatest
- }) => {
- cacheLatest = cacheLatest || 0
- return createEntryUpdateStream(staleness, cacheLatest, opts).catch(err => {
- log.warn('', 'Search data request failed, search might be stale')
- log.silly('all-package-metadata', 'update request error: ', err)
- return {}
- }).then(({updateStream, updatedLatest}) => {
- updatedLatest = updatedLatest || 0
- const latest = updatedLatest || cacheLatest
- if (!cacheStream && !updateStream) {
- throw new Error('No search sources available')
- }
- if (cacheStream && updateStream) {
- // Deduped, unioned, sorted stream from the combination of both.
- return {
- entryStream: createMergedStream(cacheStream, updateStream),
- latest,
- newEntries: !!updatedLatest
- }
- } else {
- // Either one works if one or the other failed
- return {
- entryStream: cacheStream || updateStream,
- latest,
- newEntries: !!updatedLatest
- }
- }
- })
- })
-}
-
-// Merges `a` and `b` into one stream, dropping duplicates in favor of entries
-// in `b`. Both input streams should already be individually sorted, and the
-// returned output stream will have semantics resembling the merge step of a
-// plain old merge sort.
-module.exports._createMergedStream = createMergedStream
-function createMergedStream (a, b) {
- linkStreams(a, b)
- return sortedUnionStream(b, a, ({name}) => name)
-}
-
-// Reads the local index and returns a stream that spits out package data.
-module.exports._createCacheEntryStream = createCacheEntryStream
-function createCacheEntryStream (cacheFile, opts) {
- log.verbose('all-package-metadata', 'creating entry stream from local cache')
- log.verbose('all-package-metadata', cacheFile)
- return statAsync(cacheFile).then(stat => {
- // TODO - This isn't very helpful if `cacheFile` is empty or just `{}`
- const entryStream = ms.pipeline.obj(
- fs.createReadStream(cacheFile),
- JSONStream.parse('*'),
- // I believe this passthrough is necessary cause `jsonstream` returns
- // weird custom streams that behave funny sometimes.
- ms.through.obj()
- )
- return extractUpdated(entryStream, 'cached-entry-stream', opts)
- })
-}
-
-// Stream of entry updates from the server. If `latest` is `0`, streams the
-// entire metadata object from the registry.
-module.exports._createEntryUpdateStream = createEntryUpdateStream
-function createEntryUpdateStream (staleness, latest, opts) {
- log.verbose('all-package-metadata', 'creating remote entry stream')
- let partialUpdate = false
- let uri = '/-/all'
- if (latest && (Date.now() - latest < (staleness * 1000))) {
- // Skip the request altogether if our `latest` isn't stale.
- log.verbose('all-package-metadata', 'Local data up to date, skipping update')
- return BB.resolve({})
- } else if (latest === 0) {
- log.warn('', 'Building the local index for the first time, please be patient')
- log.verbose('all-package-metadata', 'No cached data: requesting full metadata db')
- } else {
- log.verbose('all-package-metadata', 'Cached data present with timestamp:', latest, 'requesting partial index update')
- uri += '/since?stale=update_after&startkey=' + latest
- partialUpdate = true
- }
- return npmFetch(uri, opts).then(res => {
- log.silly('all-package-metadata', 'request stream opened, code:', res.statusCode)
- let entryStream = ms.pipeline.obj(
- res.body,
- JSONStream.parse('*', (pkg, key) => {
- if (key[0] === '_updated' || key[0][0] !== '_') {
- return pkg
- }
- })
- )
- if (partialUpdate) {
- // The `/all/since` endpoint doesn't return `_updated`, so we
- // just use the request's own timestamp.
- return {
- updateStream: entryStream,
- updatedLatest: Date.parse(res.headers.get('date'))
- }
- } else {
- return extractUpdated(entryStream, 'entry-update-stream', opts)
- }
- })
-}
-
-// Both the (full) remote requests and the local index have `_updated` as their
-// first returned entries. This is the "latest" unix timestamp for the metadata
-// in question. This code does a bit of juggling with the data streams
-// so that we can pretend that field doesn't exist, but still extract `latest`
-function extractUpdated (entryStream, label, opts) {
- log.silly('all-package-metadata', 'extracting latest')
- return new BB((resolve, reject) => {
- function nope (msg) {
- return function () {
- log.warn('all-package-metadata', label, msg)
- entryStream.removeAllListeners()
- entryStream.destroy()
- reject(new Error(msg))
- }
- }
- const onErr = nope('Failed to read stream')
- const onEnd = nope('Empty or invalid stream')
- entryStream.on('error', onErr)
- entryStream.on('end', onEnd)
- entryStream.once('data', latest => {
- log.silly('all-package-metadata', 'got first stream entry for', label, latest)
- entryStream.removeListener('error', onErr)
- entryStream.removeListener('end', onEnd)
- if (typeof latest === 'number') {
- // The extra pipeline is to return a stream that will implicitly unpause
- // after having an `.on('data')` listener attached, since using this
- // `data` event broke its initial state.
- resolve({
- updateStream: entryStream.pipe(ms.through.obj()),
- updatedLatest: latest
- })
- } else {
- reject(new Error('expected first entry to be _updated'))
- }
- })
- })
-}
-
-// Creates a stream that writes input metadata to the current cache.
-// Cache updates are atomic, and the stream closes when *everything* is done.
-// The stream is also passthrough, so entries going through it will also
-// be output from it.
-module.exports._createCacheWriteStream = createCacheWriteStream
-function createCacheWriteStream (cacheFile, latest, opts) {
- return _ensureCacheDirExists(cacheFile, opts).then(({uid, gid}) => {
- log.silly('all-package-metadata', 'creating output stream')
- const outStream = _createCacheOutStream()
- const cacheFileStream = writeStreamAtomic(cacheFile)
- const inputStream = _createCacheInStream(
- cacheFileStream, outStream, latest
- )
-
- // Glue together the various streams so they fail together.
- // `cacheFileStream` errors are already handled by the `inputStream`
- // pipeline
- let errEmitted = false
- linkStreams(inputStream, outStream, () => { errEmitted = true })
-
- cacheFileStream.on('close', () => {
- if (!errEmitted) {
- if (typeof uid === 'number' &&
- typeof gid === 'number' &&
- process.getuid &&
- process.getgid &&
- (process.getuid() !== uid || process.getgid() !== gid)) {
- chownr.sync(cacheFile, uid, gid)
- }
- outStream.end()
- }
- })
-
- return ms.duplex.obj(inputStream, outStream)
- })
-}
-
-// return the {uid,gid} that the cache should have
-function _ensureCacheDirExists (cacheFile, opts) {
- var cacheBase = path.dirname(cacheFile)
- log.silly('all-package-metadata', 'making sure cache dir exists at', cacheBase)
- return correctMkdir(opts.cache).then(st => {
- return mkdir(cacheBase).then(made => {
- return chownr(made || cacheBase, st.uid, st.gid)
- }).then(() => ({ uid: st.uid, gid: st.gid }))
- })
-}
-
-function _createCacheOutStream () {
- // NOTE: this looks goofy, but it's necessary in order to get
- // JSONStream to play nice with the rest of everything.
- return ms.pipeline.obj(
- ms.through(),
- JSONStream.parse('*', (obj, key) => {
- // This stream happens to get _updated passed through it, for
- // implementation reasons. We make sure to filter it out cause
- // the fact that it comes t
- if (typeof obj === 'object') {
- return obj
- }
- }),
- ms.through.obj()
- )
-}
-
-function _createCacheInStream (writer, outStream, latest) {
- let updatedWritten = false
- const inStream = ms.pipeline.obj(
- ms.through.obj((pkg, enc, cb) => {
- if (!updatedWritten && typeof pkg === 'number') {
- // This is the `_updated` value getting sent through.
- updatedWritten = true
- return cb(null, ['_updated', pkg])
- } else if (typeof pkg !== 'object') {
- this.emit('error', new Error('invalid value written to input stream'))
- } else {
- // The [key, val] format is expected by `jsonstream` for object writing
- cb(null, [pkg.name, pkg])
- }
- }),
- JSONStream.stringifyObject('{', ',', '}'),
- ms.through((chunk, enc, cb) => {
- // This tees off the buffer data to `outStream`, and then continues
- // the pipeline as usual
- outStream.write(chunk, enc, () => cb(null, chunk))
- }),
- // And finally, we write to the cache file.
- writer
- )
- inStream.write(latest)
- return inStream
-}
-
-// Links errors between `a` and `b`, preventing cycles, and calls `cb` if
-// an error happens, once per error.
-function linkStreams (a, b, cb) {
- var lastError = null
- a.on('error', function (err) {
- if (err !== lastError) {
- lastError = err
- b.emit('error', err)
- cb && cb(err)
- }
- })
- b.on('error', function (err) {
- if (err !== lastError) {
- lastError = err
- a.emit('error', err)
- cb && cb(err)
- }
- })
-}
diff --git a/lib/search/all-package-search.js b/lib/search/all-package-search.js
deleted file mode 100644
index fef343bcb..000000000
--- a/lib/search/all-package-search.js
+++ /dev/null
@@ -1,50 +0,0 @@
-var ms = require('mississippi')
-var allPackageMetadata = require('./all-package-metadata')
-var packageFilter = require('./package-filter.js')
-
-module.exports = allPackageSearch
-function allPackageSearch (opts) {
- var searchSection = (opts.unicode ? '🤔 ' : '') + 'search'
-
- // Get a stream with *all* the packages. This takes care of dealing
- // with the local cache as well, but that's an internal detail.
- var allEntriesStream = allPackageMetadata(opts)
-
- // Grab a stream that filters those packages according to given params.
- var filterStream = streamFilter(function (pkg) {
- opts.log.gauge.pulse('search')
- opts.log.gauge.show({section: searchSection, logline: 'scanning ' + pkg.name})
- // Simply 'true' if the package matches search parameters.
- var match = packageFilter(pkg, opts.include, opts.exclude, {
- description: opts.description
- })
- return match
- })
- return ms.pipeline.obj(allEntriesStream, filterStream)
-}
-
-function streamFilter (filter) {
- return ms.through.obj(function (data, enc, cb) {
- if (filter(data)) {
- this.push(standardizePkg(data))
- }
- cb()
- })
-}
-
-function standardizePkg (data) {
- return {
- name: data.name,
- description: data.description,
- maintainers: (data.maintainers || []).map(function (m) {
- return { username: m.name, email: m.email }
- }),
- keywords: data.keywords || [],
- version: Object.keys(data.versions || {})[0] || [],
- date: (
- data.time &&
- data.time.modified &&
- new Date(data.time.modified)
- ) || null
- }
-}
diff --git a/lib/search/format-package-stream.js b/lib/search/format-package-stream.js
index bb0f552ba..0a013e3de 100644
--- a/lib/search/format-package-stream.js
+++ b/lib/search/format-package-stream.js
@@ -1,8 +1,11 @@
'use strict'
-var ms = require('mississippi')
-var jsonstream = require('JSONStream')
-var columnify = require('columnify')
+// XXX these output classes should not live in here forever. it'd be good to
+// split them out, perhaps to libnpmsearch
+
+const Pipeline = require('minipass-pipeline')
+const Minipass = require('minipass')
+const columnify = require('columnify')
// This module consumes package data in the following format:
//
@@ -18,29 +21,42 @@ var columnify = require('columnify')
// The returned stream will format this package data
// into a byte stream of formatted, displayable output.
-module.exports = formatPackageStream
-function formatPackageStream (opts) {
- opts = opts || {}
- if (opts.json) {
- return jsonOutputStream()
- } else {
- return textOutputStream(opts)
+module.exports = (opts = {}) =>
+ opts.json ? new JSONOutputStream() : new TextOutputStream(opts)
+
+class JSONOutputStream extends Minipass {
+ constructor () {
+ super()
+ this._didFirst = false
+ }
+ write (obj) {
+ if (!this._didFirst) {
+ super.write('[\n')
+ this._didFirst = true
+ } else {
+ super.write('\n,\n')
+ }
+ try {
+ return super.write(JSON.stringify(obj))
+ } catch (er) {
+ return this.emit('error', er)
+ }
+ }
+ end () {
+ super.write(this._didFirst ? ']\n' : '\n]\n')
}
}
-function jsonOutputStream () {
- return ms.pipeline.obj(
- ms.through.obj(),
- jsonstream.stringify('[', ',', ']'),
- ms.through()
- )
-}
+class TextOutputStream extends Minipass {
+ constructor (opts) {
+ super()
+ this._opts = opts
+ this._line = 0
+ }
-function textOutputStream (opts) {
- var line = 0
- return ms.through.obj(function (pkg, enc, cb) {
- cb(null, prettify(pkg, ++line, opts))
- })
+ write (pkg) {
+ return super.write(prettify(pkg, ++this._line, this._opts))
+ }
}
function prettify (data, num, opts) {