// src/icarus/backend/replicator.js
import {EventEmitter} from 'events'
import ExponentialBackoff from '../../lib/backoff-wrapper'
import {getRemoteDB} from '../../lib/db'
/**
* Handles replication between a local and remote PouchDB database.
* Keeps track of state, uses an {@link ExponentialBackoff} to prevent
* connection saturation, serving as a wrapper over PouchDB replication.
*
* The source database is set once in the constructor and remains unchanged
* and unchangeable during operation.
*/
export default class Replicator extends EventEmitter {
  /**
   * Wires the backoff events to the replication life cycle; does not start
   * replicating until {@link Replicator#replicate} is called.
   * @param {Logger} logger Bunyan logger instance.
   * @param {PouchDB} sourceDB Database to be replicated.
   */
  constructor(logger, sourceDB) {
    super()
    /**
     * Logger instance.
     * @type {Bunyan}
     */
    this._log = logger
    /**
     * Replicator status. Values set by this class: 'inactive', 'idle',
     * 'connecting', 'active', 'pause' and 'cleanup'.
     * @type {String}
     */
    this._state = 'inactive'
    /**
     * Source database.
     * @type {PouchDB}
     */
    this._sourceDB = sourceDB
    /**
     * Target database. Stays undefined until {@link Replicator#replicate} is called.
     * @type {PouchDB|undefined}
     */
    this._targetDB = undefined
    /**
     * ExponentialBackoff instance.
     * Keeps track of the replication backoff.
     * @type {ExponentialBackoff}
     */
    this._backoff = new ExponentialBackoff()
    /**
     * PouchDB replication instance/ID of the live replication, if one is running.
     * @type {PouchDBReplicationID|undefined}
     */
    this._replication = undefined

    // A retry is due: (re)create the replicator on the next tick of the event loop
    this._backoff.on('retry', retry => setImmediate(() => this._ensureReplication(retry)))
    this._backoff.on('backoff', (retry, delay) => {
      // Cancels current replicator if there is one (no need in keeping an errored replicator)
      this._cancelCurrentReplicator()
      // Don't signal we're idle (waiting for a retry) if we were stopped or are going away
      if (this._state === 'inactive' || this._state === 'cleanup') {
        return
      }
      this._log.debug('Backing off from replication', {retry, delay})
      this._updateState('idle')
    })
  }

  /**
   * Stops the replication process: cancels the current replicator (if any)
   * and resets the backoff.
   * @emits state('inactive')
   */
  stop() {
    this._log.info('Stopping replication')
    this._updateState('inactive')
    this._cancelCurrentReplicator()
    this._backoff.reset()
  }

  /**
   * Cancels and forgets the current live replicator, if there is one.
   * Its listeners are removed first so that the cancellation itself does not
   * trigger another backoff through the 'error'/'complete' handlers.
   */
  _cancelCurrentReplicator() {
    if (this._replication) {
      // Prevent the replicator from triggering the backoff
      this._replication.removeAllListeners()
      // Get rid of it
      this._replication.cancel()
      this._replication = undefined
    }
  }

  /**
   * Stops current replication and attempts last-minute replication.
   * Last-minute replication is done in "one-shot" mode: replicates whatever data it has
   * and immediately stops without listening for changes in the local database.
   * When no target database was ever configured (replicate() was never called)
   * the last-minute replication is skipped, since there is nowhere to replicate to.
   * Failures of the last-minute replication are logged, never thrown.
   * @emits state('inactive')
   * @emits state('cleanup')
   * @returns {Promise}
   */
  async cleanup() {
    // Before doing anything, stop!
    this.stop()
    this._log.info('cleanup')
    this._updateState('cleanup')

    // Nothing to flush to: avoid handing an undefined target to PouchDB
    if (!this._targetDB) {
      this._log.info('Last minute replication skipped, no target database')
      return
    }

    let ok = false
    try {
      // Before going away attempt one last replication to get the remaining data out
      ok = (await this._createReplicator({live: false, retry: true})).ok
    } catch (err) {
      this._log.warn('Error in last-minute replication', {err})
    }
    if (ok) {
      this._log.info('Last minute replication successful')
    } else {
      this._log.warn('Last minute replication failed')
    }
  }

  /**
   * Starts replication to a remote database.
   * Any replication already running is stopped first. Does nothing once the
   * replicator is in the 'cleanup' state.
   * @param {String} dbUrl Name/URL of the remote database.
   * @param {String} [username] Database username, if applicable.
   * @param {String} [password] Database password, if applicable.
   * @emits state(state): the state of the replicator.
   * @returns {Promise} resolved when the replication connection succeeds for the first time
   * (immediately when the replicator is in the 'cleanup' state).
   */
  replicate(dbUrl, username, password) {
    // Don't try replicating when we're going away
    if (this._state === 'cleanup') {
      return Promise.resolve()
    }
    // Stop any already running replication
    this.stop()
    this._log.info('Live replication triggered', {dbUrl, username})
    this._updateState('idle')
    // Create the DB object here to avoid memory leaks
    this._targetDB = getRemoteDB(dbUrl, username, password)
    // Kick the process into action
    return new Promise(resolve => {
      this._backoff.once('success', resolve)
      this._backoff.backoff()
    })
  }

  /**
   * Restarts the replication process when unintentionally stopped.
   * Does nothing when the replicator is inactive, cleaning up, or has no target database.
   * @param {Number} retry Retry counter
   * @emits state('connecting')
   */
  _ensureReplication(retry) {
    // Don't re-create a replicator that is not meant to be
    if (this._state === 'inactive' || this._state === 'cleanup' || !this._targetDB) {
      return
    }
    // Cancel current replicator if there is one
    this._cancelCurrentReplicator()
    this._log.info('Attempting replication', {retry})
    // (re)create replicator and listen to events for state magic
    this._updateState('connecting')
    this._replication = this._createReplicator()
      .on('paused', err => {
        // A pause without an error counts as a successful connection
        if (!err) {
          this._backoff.success()
        }
        this._updateState('pause', err)
      })
      .on('active', () => {
        this._backoff.success()
        this._updateState('active')
      })
      .on('denied', err => {
        // This is not fatal for the replicator but is a pretty bad sign
        this._log.error('Replication denied', err)
      })
      .once('error', err => {
        // This instance is bad, discard and recreate it with exponential backoff
        this._log.warn('Replication error', err)
        this._backoff.backoff()
      })
      .once('complete', () => {
        // A live replication is not expected to complete: recreate it with exponential backoff
        this._log.warn('Replication canceled/completed')
        this._backoff.backoff()
      })
  }

  /**
   * Creates a new PouchDB replicator from the source to the target database.
   * Remember that this class is simply a wrapper.
   * The defaults below apply only when `opts` is omitted entirely; a provided
   * `opts` object is passed to PouchDB as-is.
   * @param {Object} [opts] PouchDB replication options.
   * @param {Boolean} [opts.live=true] Keep the replication running, pushing changes as they happen.
   * @param {Boolean} [opts.retry=false] Enable PouchDB's built-in backoff algorithm.
   * @param {Number} [opts.batch_size=20] Number of documents per replication batch.
   * @param {Number} [opts.batches_limit=5] Number of batches buffered at a time.
   * @returns {PouchDBReplicationID}
   */
  // PouchDB's option is named `batches_limit`; the former `batch_limit` key is not a PouchDB option
  _createReplicator(opts = {live: true, retry: false, batch_size: 20, batches_limit: 5}) { // eslint-disable-line camelcase
    return this._sourceDB.replicate.to(
      this._targetDB,
      opts
    )
  }

  /**
   * Updates the Replicator's {@link Replicator#_state} and notifies listeners.
   * @param {String} state New state.
   * @param {Object} [moreLogData] Extra data to be included in the log.
   * @emits state(state)
   */
  _updateState(state, moreLogData) {
    // Prevent infinite loops of the log replicator flipping between active and inactive
    // Just log active/paused states when you're not switching between them or when errors occur
    if ((state !== 'active' && state !== 'pause') || (this._state !== 'active' && this._state !== 'pause') || moreLogData) {
      this._log.info('updateState', moreLogData, {state})
    }
    this._state = state
    this.emit('state', state)
  }
}