object Snapshots
Types and functions for managing snapshots. Snapshots dramatically speed up the sourcing of events. The actual load/save mechanism is an outside concern to the domain of this module. This module is focused on how events can flow from and to storage and become aggregated.
A Snapshot's events are loaded and emitted as Snapshots.StoredEvent, followed by a Snapshots.StoredEventsDone on completion. New events are then sourced and emitted as Snapshots.StreamedEvents. A typical stream looks like:
Snapshots .fromStorage(storageLoader) .flatMapConcat( initialState => tailFromOffset(initialState.offset, finite) .scan((initialState, Option.empty[DashboardsEvents.Event])) { case ((state, _), (element, offset)) => val newState = nextState(state, element, offset) (newState, Some(element)) } .drop(1) .wireTap(Snapshots.toStorage(storageSaver)) .collect(Snapshots.streamedEvent) .prepend(Snapshots.storedEvents(source(initialState), None)))
- Alphabetic
- By Inheritance
- Snapshots
- AnyRef
- Any
- Hide All
- Show All
- Public
- Protected
Type Members
- trait Aggregatable[A] extends AnyRef
An Aggregatable is a structure for aggregated state that can have a iterable of elements along with some offset that they represent.
An Aggregatable is a structure for aggregated state that can have a iterable of elements along with some offset that they represent. An offset is opaque to the snapshotting mechanism and can mean whatever it needs to mean to be able to source new events given a point in time.
- A
The type of the elements to represent an aggregation of
- sealed trait SnapshotEvent[A] extends AnyRef
An event that can be omitted by storage events.
An event that can be omitted by storage events.
- A
The type of the elements to represent
- final case class StoredEvent[A](element: A) extends SnapshotEvent[A] with Product with Serializable
The event was sourced from storage.
The event was sourced from storage.
- A
The type of the elements to represent
- final case class StoredEventsDone[A, B](offset: Option[Long], context: Option[B]) extends SnapshotEvent[A] with Product with Serializable
Signals that no more events are source from storage, along with an optional context value that can be used for any purpose.
Signals that no more events are source from storage, along with an optional context value that can be used for any purpose.
- A
The type of the elements to represent
- B
The type of the context
- final case class StreamedEvent[A](element: A, offset: Option[Long]) extends SnapshotEvent[A] with Product with Serializable
A regular streamed event i.e.
A regular streamed event i.e. not loaded by storage.
- A
The type of the elements to represent
Value Members
- final def !=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def ##: Int
- Definition Classes
- AnyRef → Any
- final def ==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def asInstanceOf[T0]: T0
- Definition Classes
- Any
- def clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.CloneNotSupportedException]) @native()
- final def eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def equals(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef → Any
- def finalize(): Unit
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.Throwable])
- def fromStorage[A <: Aggregatable[_]](storageLoader: Future[A]): Source[A, NotUsed]
Source an Aggregatable value from storage.
Source an Aggregatable value from storage.
- A
The type of the Aggregatable
- storageLoader
A source of having loaded the initial snapshot, which is an Aggregatable
- returns
A source of the Aggregatable
- final def getClass(): Class[_ <: AnyRef]
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
- def hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
- final def isInstanceOf[T0]: Boolean
- Definition Classes
- Any
- final def ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- final def notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
- final def notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
- def storedEvents[A <: Aggregatable[B], B, Mat](elements: Source[B, Mat], offset: Option[Long], context: Option[_]): Source[SnapshotEvent[B], Mat]
Source the elements of an Aggregatable that represent stored events and map them to StoredEvents followed by a StoredEventsDone on completion.
Source the elements of an Aggregatable that represent stored events and map them to StoredEvents followed by a StoredEventsDone on completion.
- A
The type of the Aggregatable
- B
The type of the element of the Aggregatable
- elements
the source elements to be emitted as StoredEvents
- offset
An offset to emitted with each StoredEvent
- context
some optional context to record in the StoredEventsDone element
- def streamedEvent[A <: Aggregatable[B], B]: PartialFunction[(A, Option[B]), StreamedEvent[B]]
Partial function for testing an Aggregatable that has some associated element and, if so, maps it to a StreamedEvent.
Partial function for testing an Aggregatable that has some associated element and, if so, maps it to a StreamedEvent. Useful in the context of a Stream's "collect" style of stage.
- A
The type of the Aggregatable
- B
The type of the element of the Aggregatable
- returns
A StreamedEvent along with the current offset, if there is an element.
- final def synchronized[T0](arg0: => T0): T0
- Definition Classes
- AnyRef
- def toStorage[A <: Aggregatable[_]](storageSaver: (A) => Future[Done]): Sink[Tuple2[A, _], Future[Done]]
Save an Aggregatable to storage.
Save an Aggregatable to storage.
- A
The type of the Aggregatable
- storageSaver
A function to perform the actual saving to storage
- returns
A sink that performs the side-effecting saving to storage
- def toString(): String
- Definition Classes
- AnyRef → Any
- final def wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException]) @native()