Accessing the Akka-Persistence Snapshot-Store Future-API

6 Feb
2021

At my current project, we use snapshots for versioning certain important states in our application. As we have the requirement to “jump back” to these states for some specialised queries, we needed a way to access a specific snapshot, without having to create a Persistent-Actor / Eventsourced-Behaviour that could mess up the journal of that Entity or would start to consume the event-log after loading the snapshot.

So, basically what we needed is the future-api that is already defined in SnapshotStore.scala in Akka.

Unfortunatelly, this API is not easily accessible, as it can only be implemented inside an Actor and we’re not supposed to access an actors internal state from the outside, thus making it impossible to use the already provided Future-based API.

As a workaround, I’ve created these “helpers” to easily access the snapshot-store and interact with its API.

package akka.persistence
// code to accompany the blog post at
// https://dominikdorn.com/2021/02/accessing-the-akka-persistence-snapshot-store-future-api/
// This class gives you access to the future based API of a SnapshotStore by providing the SnapshotStore's ActorRef
private[akka] object SnapshotStoreAccessor {
def apply(ref: ActorRef)(
implicit timeout: Timeout = Timeout(5.seconds)): SnapshotStoreBase =
new SnapshotStoreBase {
import SnapshotProtocol._
override def loadAsync(
persistenceId: String,
criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] =
(ref ? SnapshotProtocol.LoadSnapshot(persistenceId, criteria = criteria, toSequenceNr = criteria.maxSequenceNr))
.mapTo[SnapshotProtocol.Response]
.flatMap {
case LoadSnapshotResult(snapshot, _) => Future.successful(snapshot)
case LoadSnapshotFailed(cause) => Future.failed(cause)
case _ => Future.failed(new IllegalStateException("unexpected result"))
}(ExecutionContext.parasitic)
override def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] =
(ref ? SnapshotProtocol.SaveSnapshot(metadata, snapshot))
.mapTo[SnapshotProtocol.Response]
.flatMap {
case SaveSnapshotSuccess(_) => Future.successful(())
case SaveSnapshotFailure(_, cause) => Future.failed(cause)
case _ => Future.failed(new IllegalStateException("unexpected result"))
}(ExecutionContext.parasitic)
override def deleteAsync(metadata: SnapshotMetadata): Future[Unit] =
(ref ? SnapshotProtocol.DeleteSnapshot(metadata))
.mapTo[SnapshotProtocol.Response]
.flatMap {
case DeleteSnapshotSuccess(_) => Future.unit
case DeleteSnapshotFailure(_, cause) => Future.failed(cause)
case _ => Future.failed(new IllegalStateException("unexpected result"))
}(ExecutionContext.parasitic)
override def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] =
(ref ? SnapshotProtocol.DeleteSnapshots(persistenceId, criteria))
.mapTo[SnapshotProtocol.Response]
.flatMap {
case DeleteSnapshotsSuccess(_) => Future.unit
case DeleteSnapshotsFailure(_, cause) => Future.failed(cause)
case _ => Future.failed(new IllegalStateException("unexpected result"))
}(ExecutionContext.parasitic)
}
}

Here, I first extracted (bascially copy-pasted) the future based API from SnapshotStore.scala, so I can easily reference the type later – I called this trait SnapshotStoreBase.

Then I created the “SnapshotStoreAccessor”, which basically given the ActorRef of a SnapshotStore gives you access to its Future-based API.

package com.dominikdorn.akkatest
import akka.actor.ActorSystem
import akka.persistence.SnapshotSelectionCriteria
import akka.dominikdorn.SnapshotStoreGetter
import com.dominikdorn.akkatest.testentity.FilledState
class Example {
def main(args: Array[String]): Unit = {
implicit val actorSystem = ActorSystem.create()
implicit val ec = actorSystem.dispatcher
val snapshotStore = SnapshotStoreGetter.getSnapshotStore("jdbc-snapshot-store")
val maybeSnapshot = snapshotStore.loadAsync("my-entity-1", SnapshotSelectionCriteria(maxSequenceNr = 500))
val filledState = maybeSnapshot.collect{
case Some(selectedSnapshot) => selectedSnapshot.snapshot.asInstanceOf[FilledState]
}
filledState.foreach(stateOfSnapshot => {
println("the name of the entity is = " + stateOfSnapshot.name)
})
}
}
view raw Example.scala hosted with ❤ by GitHub

So by now, we have a trait that models the future-based way of accessing the Snapshot-Store and an implementation that allows us to access this using the Ask-Pattern.

What is still missing is a way to get a hold on the ActorRef of the SnapshotStore, so we can use that one together with the SnapshotStoreAccessor.

For this, I created the SnapshotStoreGetter:

package akka.dominikdorn
// code to accompany the blog post at
// https://dominikdorn.com/2021/02/accessing-the-akka-persistence-snapshot-store-future-api/
object SnapshotStoreGetter {
private def getSnapshotStoreActorRef(system: ActorSystem, snapshotPluginId: String): ActorRef =
Persistence(system).snapshotStoreFor(snapshotPluginId)
private def getSnapshotStoreFor(ref: ActorRef)(
implicit ec: ExecutionContext): SnapshotStoreBase = SnapshotStoreAccessor(ref)
def getSnapshotStore(snapshotPluginId: String)(implicit ac: ActorSystem): SnapshotStoreBase = {
val ref = getSnapshotStoreActorRef(ac, snapshotPluginId)
getSnapshotStoreFor(ref)(ac.dispatcher)
}
// zio helper
def snapshotStore(snapshotPluginId: String): ZIO[Has[ActorSystem], Nothing, SnapshotStoreBase] =
for {
actorSystem < ZIO.service[ActorSystem]
ref = getSnapshotStoreActorRef(actorSystem, snapshotPluginId)
snapshotStore < ZIO.fromFuture(implicit ec => Future.successful(getSnapshotStoreFor(ref))).orDie
} yield snapshotStore
}

Here, we first have a Method “getSnapshotStoreActorRef” which uses an internal Akka API to get the reference to the SnapshotStore actor of the given “snapshotStorePluginId”. As the underlying method in akka.persistence.Persistence is scoped to the akka package, we have to make sure this accessor is also in the akka package.
The “getSnapshotStoreFor” method than just takes that ActorRef and created the SnapshotStoreAccessor. Finally, the “getSnapshotStore” is our public API, giving access to the future-based API by just providing the ID and an implicit ActorSystem.

So how do we now use this to access our Snapshot? Easy! Just look at this example:

package com.dominikdorn.akkatest
import akka.actor.ActorSystem
import akka.persistence.SnapshotSelectionCriteria
import akka.dominikdorn.SnapshotStoreGetter
import com.dominikdorn.akkatest.testentity.FilledState
class Example {
def main(args: Array[String]): Unit = {
implicit val actorSystem = ActorSystem.create()
implicit val ec = actorSystem.dispatcher
val snapshotStore = SnapshotStoreGetter.getSnapshotStore("jdbc-snapshot-store")
val maybeSnapshot = snapshotStore.loadAsync("my-entity-1", SnapshotSelectionCriteria(maxSequenceNr = 500))
val filledState = maybeSnapshot.collect{
case Some(selectedSnapshot) => selectedSnapshot.snapshot.asInstanceOf[FilledState]
}
filledState.foreach(stateOfSnapshot => {
println("the name of the entity is = " + stateOfSnapshot.name)
})
}
}
view raw Example.scala hosted with ❤ by GitHub

Comment Form

top