package storage
Storage provides mechanisms to persist and retrieve arbitrary data. This complements com.cisco.streambed.durablequeue.DurableQueue and can be used a snapshotting mechanism, a common pattern in event sourcing and CQRS systems.
By default, an implementation using the local filesystem is provided. E.g. see "com.cisco.streambed.storage.fs.FileSystemRawStorage" elsewhere.
- Alphabetic
- By Inheritance
- storage
- AnyRef
- Any
- Hide All
- Show All
- Public
- Protected
Type Members
- abstract class RawStorage extends AnyRef
RawStorageis responsible for taking an identifier and aSourceof bits and persisting them.RawStorageis responsible for taking an identifier and aSourceof bits and persisting them. It's also responsible for retrieving those bits given the same identifier.Implementations need not concern themselves with concurrency matters. Concurrency is handled by the
Storageclass, which is the interface that consumers of the storage mechanism interact with.To implement, subclasses should extend this class and define
clean,source, andsinkmethods. - trait StateCodec[A] extends AnyRef
A
StateCodecdefines functions required to serialize and deserialize arbitrary data in a streaming fashion. - final class Storage extends AnyRef
Provides an interface to the
Storagemechanism that allows concurrent read/write access to the underlyingRawStorageinstance.
Value Members
- object RawStorage
- object Snapshots
Types and functions for managing snapshots.
Types and functions for managing snapshots. Snapshots dramatically speed up the sourcing of events. The actual load/save mechanism is an outside concern to the domain of this module. This module is focused on how events can flow from and to storage and become aggregated.
A Snapshot's events are loaded and emitted as Snapshots.StoredEvent, followed by a Snapshots.StoredEventsDone on completion. New events are then sourced and emitted as Snapshots.StreamedEvents. A typical stream looks like:
Snapshots .fromStorage(storageLoader) .flatMapConcat( initialState => tailFromOffset(initialState.offset, finite) .scan((initialState, Option.empty[DashboardsEvents.Event])) { case ((state, _), (element, offset)) => val newState = nextState(state, element, offset) (newState, Some(element)) } .drop(1) .wireTap(Snapshots.toStorage(storageSaver)) .collect(Snapshots.streamedEvent) .prepend(Snapshots.storedEvents(source(initialState), None))) - object StateCodec
- object Storage
Storageprovides arbitrary ephemeral data storage.Storageprovides arbitrary ephemeral data storage. One intended use-case is to persist snapshots of read models that are computed by consumingDurableQueuetopics.