/** @module DynamoReadQueue */
import DynamoDBWrapper from 'noodle-dynamo';
import { PROVISIONED_CAPACITY_UNITS, SINGLE_CAPACITY_UNIT_USED_TIME_MS } from '../../constants';
import { DataNotFound, InvalidQueueReadItem } from '../../errors';
import { isUndefined } from '../../methods';
/**
* @typedef ReadItem
* @type {object}
* @property {string} expression The expression to read the database with
* @property {object} expressionData The data to provide to the above expression
*/
/**
* Creates a new Queue for pushing data to Dynamo.
*/
class DynamoReadQueue {
/**
* Constructor for DynamoReadQueue
*
* @param {module:dynamo.DynamoCredentials} dynamoCredentials The credentials for a Dynamo table
* @param {string} dynamoRegion The region of the Dynamo table we're using
* @param {string} tableName The name of the table we want to store data in
*/
constructor(dynamoCredentials, dynamoRegion, tableName) {
this.dynamoClient = new DynamoDBWrapper(dynamoCredentials, dynamoRegion);
this.queue = [];
this.tableName = tableName;
// setup the event queue
setInterval(async () => {
await this.readBatch();
}, SINGLE_CAPACITY_UNIT_USED_TIME_MS);
}
/**
* Pushes item to be read into the queue
*
* @param {module:DynamoReadQueue.ReadItem} item An item to be read from Dynamo
* @param {Function} callback The method to be called once the item has been read
*/
push(item, callback) {
validateItem(item);
validateCallback(callback);
this.queue.push({ item, callback });
}
/**
* Method to push items to our queue
*
* @param {Array.<module:DynamoReadQueue.ReadItem>} batch A batch of items to push into the queue
* @param {Function} callback A method to be called for each read item
* @returns {void} Nothing
*/
pushBatch(batch, callback) {
validateCallback(callback);
batch.forEach((item) => validateItem(item));
const queueItems = batch.map((item) => ({
item,
callback,
}));
this.queue = this.queue.concat(queueItems);
}
/**
* Method to read a batch of items from dynamo
*
* @private
* @returns {void} Nothing
*/
async readBatch() {
if (this.queue.length > 0) {
const batch = this.queue.splice(0, PROVISIONED_CAPACITY_UNITS);
batch.forEach(async (queueItem) => {
try {
const readData = await this.dynamoClient.readTable(
this.tableName,
queueItem.item.expression,
queueItem.item.expressionData,
);
if (readData.Count === 0) {
throw new DataNotFound();
}
const dataItem = readData.Items;
queueItem.callback(dataItem);
} catch (e) {
queueItem.callback(e);
}
});
}
}
}
/**
* Validates that an item pushed to the queue is valid
*
* @private
* @param {module:DynamoReadQueue.ReadItem} item Specifies what to read from the database
*/
function validateItem(item) {
if (isUndefined(item.expression)) {
throw new InvalidQueueReadItem('Missing expression!');
}
if (isUndefined(item.expressionData)) {
throw new InvalidQueueReadItem('Missing expressionData!');
}
}
/**
* Validates that the callback fulfils the requirements for a read
*
* @private
* @param {Function} callback The method to validate
*/
function validateCallback(callback) {
if (isUndefined(callback)) {
throw new InvalidQueueReadItem('Missing callback!');
}
}
export default DynamoReadQueue;