Implementing put and delete operations using BatchWriteItem

By Andres Jaimes


The BatchWriteItem operation lets us put or delete up to 25 items per request, in one or more tables. Amazon DynamoDB’s documentation says:

A single call to BatchWriteItem can write up to 16 MB of data, which can comprise as many as 25 put or delete requests. Individual items to be written can be as large as 400 KB.

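In this article we are going to go through the process of putting and deleting items in a table with a composite primary key: a partition key named myPartitionKey and a range key named myRangeKey, both strings, plus a required otherField attribute and an optional optionalField attribute.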

While we process a batch of items, some of them can fail. When that happens, we can implement a retry strategy as recommended by Amazon:

If DynamoDB returns any unprocessed items, you should retry the batch operation on those items. However, we strongly recommend that you use an exponential backoff algorithm. If you retry the batch operation immediately, the underlying read or write requests can still fail due to throttling on the individual tables. If you delay the batch operation using exponential backoff, the individual requests in the batch are much more likely to succeed.
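As a rough illustration of the exponential backoff recommended above, the delay before each retry can be computed by doubling a base delay and capping the result. This is only a sketch: the Backoff object, the delayFor function, and the default base and max values are assumptions made for this example; they are not part of the AWS SDK or of the code in this article.

```scala
import scala.concurrent.duration._

object Backoff {
  // Delay before retry number `attempt` (1-based): base * 2^(attempt - 1), capped at `max`.
  // The defaults are illustrative values, not recommendations from AWS.
  def delayFor(attempt: Int,
               base: FiniteDuration = 100.millis,
               max: FiniteDuration = 5.seconds): FiniteDuration = {
    require(attempt >= 1, "attempt is 1-based")
    // Limit the exponent so the multiplication stays well within range for small base delays.
    val exponent = math.min(attempt - 1, 20)
    val raw = base * (1L << exponent)
    if (raw > max) max else raw
  }
}
```

With the defaults, attempts 1, 2, and 3 wait 100 ms, 200 ms, and 400 ms, and later attempts are capped at 5 seconds. A production version would usually add random jitter as well.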

Implementation

Let’s start by adding some import statements.

import akka.actor.ActorSystem
import akka.pattern.after
import com.amazonaws.services.dynamodbv2.model.{AttributeValue, BatchWriteItemRequest, DeleteRequest, PutRequest, QueryRequest, WriteRequest}

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
  • ActorSystem and after imports are necessary to set up a retry scheduler for those cases when we have unprocessed items.
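  • Java conversions and converters are required for Java/Scala interaction. Note that scala.collection.JavaConversions (implicit conversions) is deprecated since Scala 2.12 and removed in 2.13; the explicit asJava and asScala calls from JavaConverters are the safer choice.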

We now declare class-level constants:

protected val MaxOpsPerBatch = 25
protected val MaxRetries = 3
protected val RetryDelay = 1.second
  • MaxOpsPerBatch is the maximum number of put or delete requests DynamoDB accepts per batch.
  • MaxRetries is the maximum number of attempts we make when we get unprocessed items.
  • RetryDelay is how long we’ll wait before resending a batch of unprocessed items. This can be implemented using an exponential backoff strategy, but it’s out of the scope of this article.

We’re now going to create the two functions that other classes will call.

// `ec` must be implicit: Future.sequence and map need an ExecutionContext in scope.
// `actorSystem` is assumed to be a member of the enclosing class.
def create(items: Seq[MyItem])(implicit ec: ExecutionContext): Future[Unit] = 
  Future.sequence(items.grouped(MaxOpsPerBatch).toList.map { group =>
    batchWriteItemWithRetries(toPutRequests(group), 1)(actorSystem, ec)
  }).map(_ => ())

def delete(partitionId: String, rangeIds: Seq[String])(implicit ec: ExecutionContext): Future[Unit] = 
  Future.sequence(rangeIds.grouped(MaxOpsPerBatch).toList.map { group =>
    batchWriteItemWithRetries(toDeleteRequests(partitionId, group), 1)(actorSystem, ec)
  }).map(_ => ())
  • create receives a sequence of items to insert into the database.
  • delete receives a partition key and a sequence of range keys.
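Both functions rely on grouped to split the input into batches. A minimal, standalone sketch of that step follows; the BatchChunking object and the chunk helper are names invented for this illustration.

```scala
object BatchChunking {
  val MaxOpsPerBatch = 25

  // Split any sequence into consecutive chunks of at most MaxOpsPerBatch elements.
  // The last chunk holds whatever is left over.
  def chunk[A](items: Seq[A]): List[Seq[A]] =
    items.grouped(MaxOpsPerBatch).toList
}
```

For example, BatchChunking.chunk(1 to 60) yields three chunks of 25, 25, and 10 elements, and an empty input yields no chunks at all, so no request is sent.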

Both functions receive a sequence of any length, which we divide into chunks of at most MaxOpsPerBatch items (25 in our example) so that each chunk fits in a single DynamoDB batch. Each chunk then has to be wrapped in an instance of java.util.Map[String, java.util.List[WriteRequest]], keyed by table name. A couple of functions help us with this process:

protected def toPutRequests(items: Seq[MyItem]): java.util.Map[String, java.util.List[WriteRequest]] = {
  val writeRequests = items.map { item =>
    val putRequest = new PutRequest()
      .addItemEntry("myPartitionKey", new AttributeValue().withS(item.partitionId))
      .addItemEntry("myRangeKey", new AttributeValue().withS(item.rangeId))
      .addItemEntry("otherField", new AttributeValue().withS(item.otherField))
    // maybe add an optional field too...
    item.optionalField.foreach { value => putRequest.addItemEntry("optionalField", new AttributeValue().withS(value)) }
    new WriteRequest().withPutRequest(putRequest)
  }.asJava
  Map("tableName" -> writeRequests).asJava
}

protected def toDeleteRequests(partitionId: String, rangeIds: Seq[String]): java.util.Map[String, java.util.List[WriteRequest]] = {
  val writeRequests = rangeIds.map { rangeId =>
    val deleteRequest = new DeleteRequest()
      .addKeyEntry("myPartitionKey", new AttributeValue().withS(partitionId))
      .addKeyEntry("myRangeKey", new AttributeValue().withS(rangeId))
    new WriteRequest().withDeleteRequest(deleteRequest)
  }.asJava
  Map("tableName" -> writeRequests).asJava
}

A WriteRequest wraps either a put request or a delete request, but not both, and a single batch can mix the two kinds. In this case we are implementing a couple of functions: one for put requests and one for delete requests. We make sure that the return type is java.util.Map[String, java.util.List[WriteRequest]] so we avoid transformations in the functions that deal with the actual data submission.

Finally, we need a function that performs the actual data submission to DynamoDB and a function that retries the operation if we have unprocessed items:

protected def batchWriteItemWithRetries(writeRequests: java.util.Map[String, java.util.List[WriteRequest]], attempt: Int, retryDelay: FiniteDuration = RetryDelay)(implicit actorSystem: ActorSystem, ec: ExecutionContext): Future[Unit] =
  batchWriteItem(writeRequests, attempt).recoverWith {
    // Only unprocessed items trigger a retry. Any other failure, including
    // MaxRetriesException, is not matched here and propagates to the caller.
    case e: UnprocessedItemsException =>
      after(retryDelay, actorSystem.scheduler) {
        batchWriteItemWithRetries(e.unprocessedItems, attempt + 1, retryDelay)
      }
  }

protected def batchWriteItem(writeRequests: java.util.Map[String, java.util.List[WriteRequest]], attempt: Int)(implicit ec: ExecutionContext): Future[Unit] = Future {
  // Nothing to send for an empty request map.
  if (!writeRequests.isEmpty) {
    val result = dynamodb.getClient.batchWriteItem(
      new BatchWriteItemRequest().withRequestItems(writeRequests))
    val unprocessed = result.getUnprocessedItems
    if (!unprocessed.isEmpty) {
      if (attempt + 1 > MaxRetries)
        throw MaxRetriesException(s"max retries...")
      else throw UnprocessedItemsException(unprocessed) // carries the items to retry
    }
  }
}

batchWriteItemWithRetries makes a call to batchWriteItem. If the function returns successfully, then we’re done. However, if we catch an UnprocessedItemsException, we wait for retryDelay before resubmitting only the unprocessed items. The attempt number increases every time we call batchWriteItem.

batchWriteItem makes the actual DynamoDB call and throws an exception if there are unprocessed items: MaxRetriesException when we have reached the maximum number of attempts, UnprocessedItemsException otherwise.
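The retry flow described above can also be tried out without DynamoDB or Akka. The sketch below is an approximation under stated assumptions: RetrySketch and writeWithRetries are hypothetical names, the write parameter stands in for the DynamoDB call and returns the items it did not process, and a blocking Thread.sleep stands in for akka.pattern.after.

```scala
import scala.concurrent.{ExecutionContext, Future, blocking}
import scala.concurrent.duration._

object RetrySketch {
  // `write` is a hypothetical stand-in for the batch call: it returns the
  // items that were NOT processed, mirroring getUnprocessedItems.
  def writeWithRetries[A](items: Seq[A], maxAttempts: Int, delay: FiniteDuration)
                         (write: Seq[A] => Future[Seq[A]])
                         (implicit ec: ExecutionContext): Future[Unit] = {
    def loop(pending: Seq[A], attempt: Int): Future[Unit] =
      if (pending.isEmpty) Future.successful(())
      else write(pending).flatMap { unprocessed =>
        if (unprocessed.isEmpty) Future.successful(())
        else if (attempt >= maxAttempts)
          Future.failed(new RuntimeException(
            s"${unprocessed.size} items still unprocessed after $attempt attempts"))
        else
          // Wait, then resubmit only the leftovers (sleep replaces a real scheduler here).
          Future(blocking(Thread.sleep(delay.toMillis)))
            .flatMap(_ => loop(unprocessed, attempt + 1))
      }
    loop(items, 1)
  }
}
```

Passing a fake write function that processes only a couple of items per call makes it easy to check that the loop resubmits the leftovers and gives up after maxAttempts.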

Our UnprocessedItemsException class looks like this:

case class UnprocessedItemsException(unprocessedItems: java.util.Map[String, java.util.List[WriteRequest]]) extends Exception

It has a java.util.Map[String, java.util.List[WriteRequest]] parameter so we can pass any unprocessed items back to the calling function. MaxRetriesException is not shown here; any exception type that takes a message will do.

Removing all items that match a partition key

I was wondering whether there was a way to delete a batch of rows by specifying only a partition key (for tables that have a partition/range pair of keys), but Amazon states clearly in their docs that this is not possible:

DeleteRequest - Perform a DeleteItem operation on the specified item. The item to be deleted is identified by a Key subelement: Key - A map of primary key attribute values that uniquely identify the item. Each entry in this map consists of an attribute name and an attribute value. For each primary key, you must provide all of the key attributes. For example, with a simple primary key, you only need to provide a value for the partition key. For a composite primary key, you must provide values for both the partition key and the sort key.

So we have to create a delete request for each item. Luckily, we can implement it using BatchWriteItem as well.

Our first step is to identify all the rows that match our partition key.

def findByPartitionId(myPartitionKey: String)(implicit ec: ExecutionContext): Future[Seq[MyItem]] = {
  val request: QueryRequest = new QueryRequest()
    .withTableName("tableName")
    .withKeyConditionExpression("myPartitionKey = :myPartitionKey")
    .withExpressionAttributeValues(Map(
      ":myPartitionKey" -> new AttributeValue().withS(myPartitionKey)).asJava)

  Future {
    // getItems returns a java.util.List of java.util.Map, so convert explicitly.
    val result = dynamodb.getClient.query(request).getItems.asScala.toList
    result map { i: java.util.Map[String, AttributeValue] => /* instantiate resulting rows */ }
  }
}

Once we have a list of keys, we can go through a process similar to the previous one, where we submit all of those rows in batches. A single query call returns at most 1 MB of data, so we may have to query multiple times to find every matching item in the table. findRows is in charge of this iteration.

def delete(myPartitionId: String)(implicit requestId: String, ec: ExecutionContext): Future[Unit] = {
  val MaxIterations = 100 // make sure we don't go into an infinite loop

  def deleteRows(myPartitionId: String, rangeIds: Seq[String]): Future[Unit] =
    Future.sequence(rangeIds.grouped(MaxOpsPerBatch).toList.map { group =>
      batchWriteItemWithRetries(toDeleteRequests(myPartitionId, group), 1)(actorSystem, ec)
    }).map(_ => ())

  def findRows(myPartitionId: String, iteration: Int): Future[Unit] = {
    if (iteration > MaxIterations) Future.successful(())
    else for {
      items <- findByPartitionId(myPartitionId)
      _ <- items.toList match {
        case Nil => Future.successful(()) // nothing left to delete
        case _ :: _ => for {
          _ <- deleteRows(myPartitionId, items.map(_.rangeId))
          _ <- findRows(myPartitionId, iteration + 1) // query again for remaining rows
        } yield ()
      }
    } yield ()
  }

  findRows(myPartitionId, 1)
}

findRows repeatedly queries for rows whose partition key is myPartitionId, stopping when a query returns nothing or after MaxIterations rounds. For each page of results, deleteRows calls our previous batchWriteItemWithRetries function to perform the actual row deletion.
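The query-then-delete loop can be exercised in isolation with an in-memory stand-in for the table. In this sketch, DrainSketch, drain, fetch, and delete are all hypothetical names: fetch plays the role of findByPartitionId and delete plays the role of deleteRows.

```scala
import scala.concurrent.{ExecutionContext, Future}

object DrainSketch {
  // Fetch a page and delete it, repeating until a fetch comes back empty
  // or the iteration cap is reached. Returns how many items were deleted.
  def drain[A](maxIterations: Int)
              (fetch: () => Future[Seq[A]], delete: Seq[A] => Future[Unit])
              (implicit ec: ExecutionContext): Future[Int] = {
    def loop(iteration: Int, deleted: Int): Future[Int] =
      if (iteration > maxIterations) Future.successful(deleted)
      else fetch().flatMap { page =>
        if (page.isEmpty) Future.successful(deleted)
        else delete(page).flatMap(_ => loop(iteration + 1, deleted + page.size))
      }
    loop(1, 0)
  }
}
```

The iteration cap has the same purpose as MaxIterations: if deletions silently fail and the same page keeps coming back, the loop still terminates.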
