Source: classes/dynamo/DynamoWriteQueue.js

/** @module DynamoWriteQueue */

import sizeof from 'sizeof';
import DynamoDBWrapper from 'noodle-dynamo';

import { SizeExceeded } from '../../errors';

import {
  PROVISIONED_CAPACITY_UNITS,
  SINGLE_CAPACITY_UNIT_USED_TIME_MS,
  MAX_WRITE_DATA_SIZE_BYTES,
} from '../../constants';

/**
 * @typedef WriteQueueItem
 * @type {object}
 * @property {object} item The item to store in the database
 * @property {Function} callback The callback to call once the item has been written
 */

/**
 * Creates a new Queue for pushing data to Dynamo.
 */
class DynamoWriteQueue {
  /**
   * Constructor for DynamoWriteQueue. Creates the Dynamo client, an empty
   * queue, and starts a timer that flushes one batch of queued items every
   * SINGLE_CAPACITY_UNIT_USED_TIME_MS milliseconds.
   *
   * @param {module:dynamo.DynamoCredentials} dynamoCredentials The credentials for a Dynamo table
   * @param {string} dynamoRegion The region of the Dynamo table we're using
   * @param {string} tableName The name of the table we want to store data in
   */
  constructor(dynamoCredentials, dynamoRegion, tableName) {
    this.dynamoClient = new DynamoDBWrapper(dynamoCredentials, dynamoRegion);
    this.queue = [];
    this.tableName = tableName;

    // Keep the timer handle so stop() can cancel it; an active interval
    // otherwise keeps the event loop (and the process) alive.
    this.interval = setInterval(() => {
      // persistBatch only rejects when a user callback throws; log it here so
      // the rejection does not go unhandled inside the timer.
      this.persistBatch().catch((error) => {
        console.error('DynamoWriteQueue: error while persisting batch', error);
      });
    }, SINGLE_CAPACITY_UNIT_USED_TIME_MS);
  }

  /**
   * Stops the periodic flush timer. Items still in the queue are left
   * untouched and are NOT written. Calling this more than once is harmless.
   *
   * @returns {void} Nothing
   */
  stop() {
    clearInterval(this.interval);
    this.interval = null;
  }

  /**
   * Method to push a single item to our queue
   *
   * @param {object} item Any item that we want to push to Dynamo
   * @param {?Function} callback A method to call with the write result once
   *  the item has been stored
   * @returns {void} Nothing
   * @throws {SizeExceeded} If the measured size of the item is greater than
   *  MAX_WRITE_DATA_SIZE_BYTES
   */
  push(item, callback) {
    // Same boundary as pushBatch: only items strictly larger than the limit
    // are rejected.
    if (sizeof.sizeof(item) > MAX_WRITE_DATA_SIZE_BYTES) {
      throw new SizeExceeded();
    }

    this.queue.push({ item, callback });
  }

  /**
   * Method to push several items to our queue at once. The batch is validated
   * as a whole: if any item is too large nothing is queued.
   *
   * @param {Array.<object>} batch A batch of items to push into the queue
   * @param {?Function} callback A method to call with the write result once
   *  an item has been stored; it is invoked once per item in the batch
   * @returns {void} Nothing
   * @throws {SizeExceeded} If the measured size of any item is greater than
   *  MAX_WRITE_DATA_SIZE_BYTES
   */
  pushBatch(batch, callback) {
    if (batch.some((item) => sizeof.sizeof(item) > MAX_WRITE_DATA_SIZE_BYTES)) {
      throw new SizeExceeded();
    }

    batch.forEach((item) => {
      this.queue.push({ item, callback });
    });
  }

  /**
   * Method to push items to dynamo. This will take a chunk of at most
   * PROVISIONED_CAPACITY_UNITS items from the front of the queue and execute
   * a write to Dynamo for each of them in parallel. The returned promise
   * settles only after every write in the chunk has finished.
   *
   * A failed write is logged and its item is dropped (it is not re-queued);
   * the item's callback is only invoked for successful writes.
   *
   * @private
   * @returns {Promise<void>} Resolves once all writes of the chunk have settled
   */
  async persistBatch() {
    if (this.queue.length === 0) {
      return;
    }

    const batch = this.queue.splice(0, PROVISIONED_CAPACITY_UNITS);

    await Promise.all(
      batch.map(async ({ item, callback }) => {
        let result;

        try {
          result = await this.dynamoClient.writeTable(this.tableName, item);
        } catch (error) {
          // One failed write must not abort the rest of the chunk or surface
          // as an unhandled rejection.
          console.error(
            `DynamoWriteQueue: failed to write item to table "${this.tableName}"`,
            error,
          );
          return;
        }

        // Invoked outside the try block so that an exception thrown by the
        // callback is not mistaken for a failed write.
        if (callback) {
          callback(result);
        }
      }),
    );
  }
}

export default DynamoWriteQueue;