package storage
Storage provides mechanisms to persist and retrieve arbitrary data. This complements com.cisco.streambed.durablequeue.DurableQueue and can be used as a snapshotting mechanism, a common pattern in event sourcing and CQRS systems.
By default, an implementation using the local filesystem is provided; see, for example, com.cisco.streambed.storage.fs.FileSystemRawStorage.
Type Members
- abstract class RawStorage extends AnyRef
RawStorage is responsible for taking an identifier and a Source of bits and persisting them. It's also responsible for retrieving those bits given the same identifier.
Implementations need not concern themselves with concurrency matters. Concurrency is handled by the Storage class, which is the interface that consumers of the storage mechanism interact with.
To implement, subclasses should extend this class and define clean, source, and sink methods.
- trait StateCodec[A] extends AnyRef
A StateCodec defines functions required to serialize and deserialize arbitrary data in a streaming fashion.
- final class Storage extends AnyRef
Provides an interface to the Storage mechanism that allows concurrent read/write access to the underlying RawStorage instance.
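The division of labour described for RawStorage (persist bytes under an identifier, retrieve them, clean them up) can be sketched with the JDK alone. This is a minimal illustration, not the streambed API: the class names SketchRawStorage and FileSketchRawStorage, the String identifiers, and the use of InputStream in place of a streaming Source are all assumptions made for the example, and the real method signatures are likely to differ.

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path, StandardCopyOption}

// Hypothetical stand-in for illustration only; NOT the streambed RawStorage API.
abstract class SketchRawStorage {
  /** Persist every byte read from `in` under `id`. */
  def sink(id: String, in: InputStream): Unit

  /** Open the bytes stored under `id`, or None if nothing is stored. */
  def source(id: String): Option[InputStream]

  /** Remove whatever is stored under `id`. */
  def clean(id: String): Unit
}

// A file-per-identifier implementation. As in the description above, it does
// no locking of its own; concurrency is assumed to be handled by a wrapper.
final class FileSketchRawStorage(root: Path) extends SketchRawStorage {
  private def pathFor(id: String): Path = root.resolve(id)

  def sink(id: String, in: InputStream): Unit = {
    Files.copy(in, pathFor(id), StandardCopyOption.REPLACE_EXISTING)
    ()
  }

  def source(id: String): Option[InputStream] = {
    val path = pathFor(id)
    if (Files.exists(path)) Some(Files.newInputStream(path)) else None
  }

  def clean(id: String): Unit = {
    Files.deleteIfExists(pathFor(id))
    ()
  }
}

object RawStorageSketch {
  /** Stores a value, reads it back, cleans it, and reports both results. */
  def roundTrip(): (Option[String], Boolean) = {
    val storage = new FileSketchRawStorage(Files.createTempDirectory("raw-storage-sketch"))
    storage.sink("snapshot-1", new ByteArrayInputStream("state".getBytes(UTF_8)))
    val loaded = storage.source("snapshot-1").map { in =>
      try new String(in.readAllBytes(), UTF_8) finally in.close()
    }
    storage.clean("snapshot-1")
    (loaded, storage.source("snapshot-1").isEmpty)
  }
}
```

Keeping the raw layer free of locking keeps each backend small, which is presumably why the documentation assigns concurrency to the Storage class.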
Value Members
- object RawStorage
- object Snapshots
Types and functions for managing snapshots. Snapshots dramatically speed up the sourcing of events. The actual load/save mechanism is an outside concern to the domain of this module. This module is focused on how events can flow from and to storage and become aggregated.
A Snapshot's events are loaded and emitted as Snapshots.StoredEvent, followed by a Snapshots.StoredEventsDone on completion. New events are then sourced and emitted as Snapshots.StreamedEvents. A typical stream looks like:
    Snapshots
      .fromStorage(storageLoader)
      .flatMapConcat(
        initialState =>
          tailFromOffset(initialState.offset, finite)
            .scan((initialState, Option.empty[DashboardsEvents.Event])) {
              case ((state, _), (element, offset)) =>
                val newState = nextState(state, element, offset)
                (newState, Some(element))
            }
            .drop(1)
            .wireTap(Snapshots.toStorage(storageSaver))
            .collect(Snapshots.streamedEvent)
            .prepend(Snapshots.storedEvents(source(initialState), None)))
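To see why a snapshot speeds up event sourcing, the idea behind the stream above can be reduced to plain collections: restore the state held in the snapshot, then fold in only the events recorded after the snapshot's offset. The sketch below does not use the Snapshots API; the Snapshot and Event case classes and the counter domain are hypothetical, and treating the snapshot offset as exclusive (only later offsets are replayed) is an assumption.

```scala
// Hypothetical domain for illustration: a counter built from increment events.
final case class Snapshot(count: Long, offset: Long)
final case class Event(delta: Long)

object SnapshotReplaySketch {
  // Plays the role of `nextState` in the stream above: fold one event into the state.
  def nextState(state: Snapshot, event: Event, offset: Long): Snapshot =
    Snapshot(state.count + event.delta, offset)

  // Resume from a snapshot: only events after the snapshot's offset are applied.
  def restore(snapshot: Snapshot, log: Seq[(Event, Long)]): Snapshot =
    log.iterator
      .filter { case (_, offset) => offset > snapshot.offset }
      .foldLeft(snapshot) { case (state, (event, offset)) =>
        nextState(state, event, offset)
      }

  // An event log as (event, offset) pairs.
  val log: Seq[(Event, Long)] =
    Seq((Event(1L), 0L), (Event(2L), 1L), (Event(3L), 2L), (Event(4L), 3L))

  // Replaying everything from an empty state...
  val fromScratch: Snapshot = restore(Snapshot(0L, -1L), log)

  // ...reaches the same state as resuming from a snapshot taken at offset 1,
  // which only has to apply the two events that follow it.
  val fromSnapshot: Snapshot = restore(Snapshot(3L, 1L), log)
}
```

Both paths end in the same state; the snapshot path simply skips the events that were already folded in when the snapshot was saved.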
- object StateCodec
- object Storage
Storage provides arbitrary ephemeral data storage. One intended use-case is to persist snapshots of read models that are computed by consuming DurableQueue topics.
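One way a front end could offer concurrent read/write access over a store that does no locking itself is a read/write lock: many readers may load at once, while a save excludes everyone else. The sketch below is only an illustration of that idea over an in-memory map; GuardedStoreSketch and its save and load methods are hypothetical names, and nothing here is taken from how Storage is actually implemented.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.mutable

// Hypothetical illustration only; NOT the streambed Storage implementation.
final class GuardedStoreSketch {
  private val lock = new ReentrantReadWriteLock()
  private val data = mutable.Map.empty[String, Array[Byte]]

  /** Replace the bytes stored under `id`; excludes readers and other writers. */
  def save(id: String, bytes: Array[Byte]): Unit = {
    lock.writeLock().lock()
    try data.update(id, bytes.clone()) // defensive copy of the caller's array
    finally lock.writeLock().unlock()
  }

  /** Read the bytes stored under `id`; may run alongside other readers. */
  def load(id: String): Option[Array[Byte]] = {
    lock.readLock().lock()
    try data.get(id).map(_.clone())
    finally lock.readLock().unlock()
  }
}

object GuardedStoreUsage {
  /** Saves a read-model snapshot twice and returns the latest stored value. */
  def latest(): Option[String] = {
    val store = new GuardedStoreSketch
    store.save("dashboard-read-model", "v1".getBytes(UTF_8))
    store.save("dashboard-read-model", "v2".getBytes(UTF_8))
    store.load("dashboard-read-model").map(new String(_, UTF_8))
  }
}
```

A single lock is the simplest choice; a lock per identifier would allow unrelated snapshots to be written in parallel at the cost of more bookkeeping.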