Soilstate backend
Tutorial objectives
At the end of the tutorial on the backend you should be able to:
- Understand how a packet is transformed from a LoRaWAN packet into a domain object
- Understand how to encrypt a domain object and publish it to a durable queue
- Understand how a durable queue is used to receive these domain objects and decrypt them
- Understand how the front end application is served
- Understand how Streambed user identity is used by the backend
- Understand how the back end provides data to the front end
A quick overview
Let’s dive right in. Once we’re through this section, we’ll back off and go through the project step by step.
The back end service principally delivers an Akka HTTP route with a base URI path of / on an operator-configured port. Data is served at an /api path, which is secured with an HTTP bearer token. There is one exception: /api/login is used to authenticate a user and obtain a bearer token. Bearer tokens have a lease time and will expire, requiring a user to authenticate again after a period of time.
All traffic on a service gateway is encrypted using TLS.
After serving up the front end assets, the front end makes a Server Sent Event (SSE) request for data at the /api/soilstate URI path. Akka SSE is part of Akka HTTP and, upon receiving a connection, it will subscribe to durable queue events on a DurableQueue using the soilstate-data-up-json topic.
A durable queue is a commit log where elements are stored for a period of time. Kafka and Chronicle Queue are good examples of durable queues. Topics are addresses of queues and can be used to tail data from them and append data to them. When a tailing service is not running, its events are still stored so that when it starts, the service will resume reading from the queue and build its state accordingly. Therefore, services can be started and stopped with no loss of data. This is known as event sourcing and is particularly useful when swapping out an old service with a new one.
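To make resumption concrete, here is a minimal sketch in Scala of how a service picks up from where it left off, using the DurableQueue operations that appear later in this tutorial (the service object MyTailingService is illustrative):

// Identify this service so the queue can remember its committed offset
val serviceId = UuidOps.v5(MyTailingService.getClass) // illustrative service object

Source
  .future(durableQueue.offset("soilstate-data-up-json", serviceId))
  .flatMapConcat(offset =>
    // Tail from the last committed offset; finite = false keeps the tail open for new events
    durableQueue.source("soilstate-data-up-json", offset, finite = false)
  )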
Events will be decrypted using a secret key. If this key is unavailable to the logged in user then nothing can be decrypted and therefore no events will be returned. Here is a sample of the decrypted JSON data returned to the front end over its TLS connection:
{
  "time": "2018-02-22T10:18:22.173768Z",
  "nwkAddr": "01645655",
  "temperature": 24.9,
  "moisturePercentage": 20.2
}
The front end receives meta data in relation to a soil moisture/temp sensor (provisioning sensors in general is outside of the scope of this tutorial, but it is a function provided with Streambed). These sensors are keyed by a network address (nwkAddr). An /api/end-devices URI path is provided for receiving SSE events on sensor meta data. Here is a sample of the payload describing an update to sensor positioning data:
{
  "nwkAddr": "01645655",
  "time": "2018-02-22T10:20:02.073263Z",
  "position": {
    "lat": 125.6,
    "lng": 10.1
  }
}
The transformer component
Streambed holds the notion of a “transformer” with an associated “domain model”. The responsibility of a transformer is to marshal a LoRaWAN packet into a normalized domain model representation. We do this so that various other services are able to consume the data more easily, decoupled from the knowledge that it was sourced from LoRaWAN (Streambed isn’t tied to LoRaWAN and is able to support a variety of sensor-data-receiving methods).
We have a template so that you can quickly generate a transformer with an associated domain model. You will be prompted for the following information:
- a project name - we use names like soilstate to represent Soil Moisture/Temperature sensors
- an organization - the reverse domain notation used for declaring namespaces
- an organizationName - the plain English name of your organization for use in copyright statements
- a deviceType - an identifier representing the type of device you’re focused on
To generate the project (accept the defaults when prompted):
- Maven/Java

docker run \
  --rm -it \
  -v $PWD:/g8out \
  moredip/giter8 \
  https://github.com/streambed/lora-transformer-ui-java.g8

- sbt/Scala

sbt new https://github.com/streambed/lora-transformer-ui-scala.g8.git
Open the project up in your favorite editor or IDE. We will now go over some areas that will require your attention for every new type of sensor that requires a transformer.
If your editor or IDE supports the ability to highlight “TODO” or “FIXME” comments then you can use that to quickly navigate to these areas.
The domain model
Maven/Java: https://github.com/streambed/fdp-soilstate-ui-java
sbt/Scala: https://github.com/streambed/fdp-soilstate-ui-scala
The domain model describes how sensor observations are represented in your programming language. The domain model is what you start with when needing to describe a new type of sensor to Streambed. Here is the domain model for soil moisture/temperature:
- Java

/**
 * Captures a temperature/moisture reading of soil.
 */
public final class SoilStateReading {

    public static final ObjectMapper mapper = new ObjectMapper();

    static {
        mapper.registerModule(new JavaTimeModule());
        mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
    }

    private final Instant time;
    private final int nwkAddr;
    private final BigDecimal temperature;
    private final BigDecimal moisturePercentage;

    @JsonCreator
    public SoilStateReading(@JsonProperty(value = "time", required = true) Instant time,
                            @JsonProperty(value = "nwkAddr", required = true) int nwkAddr,
                            @JsonProperty(value = "temperature", required = true) BigDecimal temperature,
                            @JsonProperty(value = "moisturePercentage", required = true) BigDecimal moisturePercentage) {
        this.time = time;
        this.nwkAddr = nwkAddr;
        this.temperature = temperature;
        this.moisturePercentage = moisturePercentage;
    }

    public Instant getTime() { return time; }

    public int getNwkAddr() { return nwkAddr; }

    public BigDecimal getTemperature() { return temperature; }

    public BigDecimal getMoisturePercentage() { return moisturePercentage; }

    /**
     * Construct from raw bytes
     */
    public SoilStateReading(Instant time, int nwkAddr, byte[] payload) {
        this.time = time;
        this.nwkAddr = nwkAddr;
        if (payload.length >= 4) {
            ByteBuffer iter = ByteBuffer.wrap(payload);
            BigDecimal TEMP_OFFSET = BigDecimal.valueOf(40, 0);
            BigDecimal t = BigDecimal.valueOf(iter.getShort() & 0xFFFF, 1).subtract(TEMP_OFFSET);
            BigDecimal m = BigDecimal.valueOf(iter.getShort() & 0xFFFF, 1);
            this.temperature = t;
            this.moisturePercentage = m;
        } else {
            this.temperature = BigDecimal.ZERO;
            this.moisturePercentage = BigDecimal.ZERO;
        }
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        SoilStateReading that = (SoilStateReading) o;
        return nwkAddr == that.nwkAddr &&
                Objects.equals(time, that.time) &&
                Objects.equals(temperature, that.temperature) &&
                Objects.equals(moisturePercentage, that.moisturePercentage);
    }

    @Override
    public int hashCode() {
        return Objects.hash(time, nwkAddr, temperature, moisturePercentage);
    }

    @Override
    public String toString() {
        return "SoilStateReading{" +
                "time=" + time +
                ", nwkAddr=" + nwkAddr +
                ", temperature=" + temperature +
                ", moisturePercentage=" + moisturePercentage +
                '}';
    }
}

- Scala

/**
 * Captures a temperature/moisture reading of soil.
 */
final case class SoilStateReading(
    time: Instant,
    nwkAddr: Int,
    temperature: BigDecimal,
    moisturePercentage: BigDecimal
)
Each reading will therefore have a:

- time expressed in UTC form - this is the time that it is received at the transformer (described in the next section)
- nwkAddr, which is the address of the device, distinct within the network managed by Streambed
- temperature, the soil temperature in degrees Celsius
- moisturePercentage, the moisture of the soil expressed in percentage terms, with 100% being fully saturated
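For example, the JSON sample shown earlier corresponds to the following instance (an illustrative construction; note that the nwkAddr of 01645655 is hexadecimal):

SoilStateReading(
  Instant.parse("2018-02-22T10:18:22.173768Z"),
  0x01645655, // the JSON "01645655" network address as an Int
  BigDecimal("24.9"),
  BigDecimal("20.2")
)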
After declaring a domain object, we must also declare a means to marshal it from a sequence of bytes. LoRaWAN will always ensure that the entire byte sequence from the sensor is delivered. We use an alternate constructor to form the domain object from the byte sequence:
- Java

/**
 * Construct from raw bytes
 */
public SoilStateReading(Instant time, int nwkAddr, byte[] payload) {
    this.time = time;
    this.nwkAddr = nwkAddr;
    if (payload.length >= 4) {
        ByteBuffer iter = ByteBuffer.wrap(payload);
        BigDecimal TEMP_OFFSET = BigDecimal.valueOf(40, 0);
        BigDecimal t = BigDecimal.valueOf(iter.getShort() & 0xFFFF, 1).subtract(TEMP_OFFSET);
        BigDecimal m = BigDecimal.valueOf(iter.getShort() & 0xFFFF, 1);
        this.temperature = t;
        this.moisturePercentage = m;
    } else {
        this.temperature = BigDecimal.ZERO;
        this.moisturePercentage = BigDecimal.ZERO;
    }
}

- Scala

val TempOffset = BigDecimal(400, 1)
val Zero = BigDecimal(0, 1)

/**
 * Construct from a decrypted LoRaWAN ConfirmedDataUp/UnconfirmedDataUp FRMPayload
 */
def apply(time: Instant, nwkAddr: Int, payload: Array[Byte]): SoilStateReading = {
  val (temperature, moisturePercentage) =
    if (payload.length >= 4) {
      val iter = ByteBuffer.wrap(payload)
      val t = BigDecimal(iter.getShort & 0xffff, 1) - TempOffset
      val m = BigDecimal(iter.getShort & 0xffff, 1)
      t -> m
    } else Zero -> Zero
  new SoilStateReading(time, nwkAddr, temperature, moisturePercentage)
}
This sensor conveys both the temperature and moisture within 4 bytes. As you can see, LoRaWAN payloads are generally quite small - less than 12 bytes. Temperature occupies the first 2 bytes and is offset by a value of 400 as per the sensor manufacturer’s specifications; moisture occupies the next 2 bytes. If there’s a problem with decoding the bytes (perhaps there aren’t enough bytes given a buggy sensor) then the observation is rendered with values of 0. These could easily be filtered out and dropped by a transformer (described next), although we don’t drop them.
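To make the wire format concrete, here is a worked decode using the Scala apply function above; the payload bytes are illustrative:

// 2 bytes of temperature: 0x0289 = 649 tenths => 64.9 - 40.0 = 24.9 degrees Celsius
// 2 bytes of moisture:    0x00CA = 202 tenths => 20.2%
val payload = Array[Byte](0x02, 0x89.toByte, 0x00, 0xCA.toByte)
val reading = SoilStateReading(Instant.now(), 0x01645655, payload)
// reading.temperature == BigDecimal("24.9") && reading.moisturePercentage == BigDecimal("20.2")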
The next step when developing the domain model is to provide a means to marshal domain objects to and from JSON. We do this using Jackson for Java and spray-json for Scala, as illustrated below:
- Java

public static final ObjectMapper mapper = new ObjectMapper();

static {
    mapper.registerModule(new JavaTimeModule());
    mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
}

private final Instant time;
private final int nwkAddr;
private final BigDecimal temperature;
private final BigDecimal moisturePercentage;

@JsonCreator
public SoilStateReading(@JsonProperty(value = "time", required = true) Instant time,
                        @JsonProperty(value = "nwkAddr", required = true) int nwkAddr,
                        @JsonProperty(value = "temperature", required = true) BigDecimal temperature,
                        @JsonProperty(value = "moisturePercentage", required = true) BigDecimal moisturePercentage) {
    this.time = time;
    this.nwkAddr = nwkAddr;
    this.temperature = temperature;
    this.moisturePercentage = moisturePercentage;
}

- Scala

/**
 * A JSON codec for [[SoilStateReading]]
 */
object SoilStateReadingJsonProtocol extends DefaultJsonProtocol {
  implicit val instantFormat: JsonFormat[Instant] = new JsonFormat[Instant] {
    override def write(obj: Instant): JsValue = JsString(obj.toString)
    override def read(json: JsValue): Instant = Instant.parse(json.convertTo[String])
  }

  implicit val soilStateReadingFormat: JsonFormat[SoilStateReading] =
    jsonFormat4(SoilStateReading.apply)
}
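With the protocol’s implicits in scope, round-tripping the reading we constructed earlier through JSON is then a couple of standard spray-json calls (a sketch):

import spray.json._
import SoilStateReadingJsonProtocol._

val json = reading.toJson.compactPrint // e.g. {"time":"2018-02-22T10:18:22.173768Z",...}
val roundTripped = json.parseJson.convertTo[SoilStateReading] // == reading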
We next provide conveniences for appending to and tailing streams of observations. Appender and tailer utilities are useful because there can be one or more services producing or consuming soil moisture/temperature sensor observations, each doing something different with the data. One example of an appender is the transformer service described later.
Here is our implementation of a function providing a tailer:
- Java

/**
 * Conveniently tail, decrypt and decode readings. Yields the reading and its offset.
 */
public static Flow<DurableQueue.Received, Tuple2<SoilStateReading, Long>, NotUsed> tailer(
        Function<String, CompletionStage<Either<Principal.FailureResponse, Principal.SecretRetrieved>>> getSecret) {
    return Flow.<DurableQueue.Received>create()
            .mapAsync(1, e -> {
                ByteString encryptedData = e.data();
                long o = e.offset();
                return Crypto
                        .decrypt(getSecret.apply(KEY_PATH), encryptedData)
                        .thenCompose(e1 -> {
                            if (e1.isRight()) {
                                return CompletableFuture.completedFuture(new Tuple2<>(e1.right().get(), o));
                            } else if (e1.isLeft() && e1.left().get().getClass() == Principal.Unauthorized.class) {
                                return CompletableFuture.completedFuture(new Tuple2<>(ByteString$.MODULE$.empty(), o));
                            } else {
                                throw new RuntimeException(e1.left().get().toString());
                            }
                        });
            })
            .collect(new JavaPartialFunction<Tuple2<ByteString, Long>, Tuple2<ByteString, Long>>() {
                @Override
                public Tuple2<ByteString, Long> apply(Tuple2<ByteString, Long> x, boolean isCheck) {
                    if (x._1.nonEmpty()) {
                        return x;
                    } else {
                        throw noMatch();
                    }
                }
            })
            .map(e -> {
                ByteString data = e._1();
                long o = e._2();
                return new Tuple2<>(mapper.readValue(data.toArray(), SoilStateReading.class), o);
            })
            .withAttributes(ActorAttributes.withSupervisionStrategy(Supervision.getResumingDecider()));
}

- Scala

/**
 * Conveniently tail, decrypt and decode readings. Yields the reading and its offset.
 */
def tailer(
    getSecret: Principal.GetSecret
): Flow[DurableQueue.Received, (SoilStateReading, Long), NotUsed] =
  Flow[DurableQueue.Received]
    .mapAsync(1) {
      case DurableQueue.Received(_, bytes, o, _, _) =>
        Crypto
          .decrypt(getSecret(SoilStateKey), bytes)
          .collect {
            case Right(encrypted)                => Some(encrypted) -> o
            case Left(_: Principal.Unauthorized) => None -> o
          }(ExecutionContext.parasitic)
    }
    .collect { case (Some(encrypted), carry) => encrypted -> carry }
    .map {
      case (data, o) =>
        import SoilStateReadingJsonProtocol._
        (data.utf8String.parseJson.convertTo[SoilStateReading], o)
    }
    .withAttributes(
      ActorAttributes.supervisionStrategy(Supervision.resumingDecider)
    )
The tailer function will decrypt data from the durable queue using an address to a secret key, SoilStateKey (discussed next). The decryption step will call on the secrets manager with this key. How this call is made depends on the implementation of the GetSecret function which is passed in e.g. a call is made to IOx’s Secure Storage Service for IOx targets. Decrypted bytes are then unmarshaled from JSON and, if successful, flow out downstream from this tailer. The Supervision.resumingDecider is there to ignore observations that cannot be decoded for any reason e.g. unable to decrypt or unable to be parsed as JSON (Akka Streams would normally fail the stream if there’s an exception).
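Putting the pieces together, consuming readings might look like the following sketch, assuming a DurableQueue instance, a starting offset, a Principal.GetSecret, and an implicit materializer as used elsewhere in this tutorial:

durableQueue
  .source(SoilStateDataUpJsonTopic, offset, finite = false) // the topic constant defined below
  .via(SoilStateReading.tailer(getSecret))
  .runForeach { case (reading, o) =>
    println(s"offset $o: $reading") // or push to an SSE connection, as the backend does
  }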
By default, every packet that is received by LoRaWAN will have an application payload encrypted in relation to a device’s network address (nwkAddr). For our application, we have decided that once packets are decrypted from LoRaWAN, we will publish them to a durable queue encrypted in relation to the application as a whole. This is a design decision; should you need to encrypt observations on a per-device basis then you can form the key passed to GetSecret with the device’s network address (passed in as the first parameter of the DurableQueue.Received object - which we’re ignoring in our example).
As a final step, we provide some constants representing the name of the topic that our sensor observations can be tailed from, and the key used to encrypt/decrypt them:
- Java

/**
 * The topic used to send encrypted domain object data to and consume it from
 */
public static final String DATA_UP_JSON_TOPIC = "soilstate-data-up-json";

/**
 * The path of the secret to be used for encrypting and decrypting the domain object
 */
public static final String KEY_PATH = "secrets.soilstate.key";

- Scala

val SoilStateDataUpJsonTopic: DurableQueue.Topic = "soilstate-data-up-json"
val SoilStateKey: String = "secrets.soilstate.key"
The transformer
The transformer’s role is to tail the durable queue that LoRaWAN sensor events are appended to and call upon the domain model’s functions to marshal them from byte form to a domain object while also decrypting them. We then serialize to JSON, encrypt using the soilstate application’s key, and append to another durable queue. The following code represents the transformer in its entirety:
- Java

/**
 * Run the transformation process to convert from LoRaWAN packets
 * to their soilstate domain object expressed in json and then
 * re-published. The packets received on this topic have already
 * been verified by an NS and so do not require MIC or counter
 * verification. Any MacPayload data that is not ConfirmedDataUp
 * or UnconfirmedDataUp can also be safely ignored as it should
 * not be received here.
 */
public final class SoilStateTransformer {

    /**
     * The durable queue topic where LoRaWAN packets are consumed from
     */
    static final String DATA_UP_MAC_PAYLOAD_TOPIC = "soilstate-data-up-mac-payload";

    /**
     * Provides a source to perform the transformation.
     */
    public static Source<Span, NotUsed> source(DurableQueue durableQueue,
                                               Function<String, CompletionStage<Either<Principal.FailureResponse, Principal.SecretRetrieved>>> getSecret) {
        Tracer tracer = OpenTelemetry.getTracer(SoilStateTransformer.class.getName());

        Flow<DurableQueue.Received, Span, NotUsed> transform =
                Flow.<DurableQueue.Received>create()
                        .named("soilstate")
                        .log("soilstate")
                        .collectType(DurableQueue.Received.class)
                        .map(received -> {
                            ByteString data = received.data();
                            Span span = tracer
                                    .spanBuilder("soilstate-transformation")
                                    .addLink(tracer.getCurrentSpan().getContext())
                                    .startSpan();
                            try (Scope ignored = tracer.withSpan(span)) {
                                Propagation.context(received.headers(), Context.current());
                            }
                            return new Tuple2<>(data, new Tuple2<>(received, span));
                        })
                        .via(Streams.dataUpDecoder(getSecret))
                        .map(e -> {
                            int nwkAddr = e._1()._1();
                            ByteString payload = e._1()._3();
                            Tuple2<DurableQueue.Received, Span> carry = e._2();
                            return new Tuple2<>(
                                    new SoilStateReading(Instant.now(), nwkAddr, payload.toArray()),
                                    carry);
                        })
                        .via(SoilStateReading.appender(getSecret))
                        .via(durableQueue.flow())
                        .collect(new JavaPartialFunction<DurableQueue.CommandReply<Tuple2<DurableQueue.Received, Span>>, Tuple2<DurableQueue.Committable, Span>>() {
                            @Override
                            public Tuple2<DurableQueue.Committable, Span> apply(
                                    DurableQueue.CommandReply<Tuple2<DurableQueue.Received, Span>> x,
                                    boolean isCheck) {
                                if (x.event() instanceof DurableQueue.SendAck$) {
                                    Tuple2<DurableQueue.Received, Span> carry = x.carry().get();
                                    return new Tuple2<>(carry._1(), carry._2());
                                } else {
                                    throw noMatch();
                                }
                            }
                        })
                        .via(durableQueue.commit(UuidOps.v5(SoilStateTransformer.class)))
                        .wireTap(Span::end);

        return Source.fromFuture(durableQueue.offset(DATA_UP_MAC_PAYLOAD_TOPIC, UuidOps.v5(SoilStateTransformer.class)))
                .flatMapConcat(offset ->
                        durableQueue.source(
                                DATA_UP_MAC_PAYLOAD_TOPIC,
                                offset,
                                false))
                .via(transform.asScala());
    }
}

- Scala

/**
 * Run the transformation process to convert from LoRaWAN packets
 * to their soilstate domain object expressed in json and then
 * re-published. The packets received on this topic have already
 * been verified by an NS and so do not require MIC or counter
 * verification. Any MacPayload data that is not ConfirmedDataUp
 * or UnconfirmedDataUp can also be safely ignored as it should
 * not be received here.
 */
object SoilStateTransformer {

  /**
   * The durable queue topic where LoRaWAN packets are consumed from
   */
  val SoilStateDataUpMacPayloadTopic: DurableQueue.Topic = "soilstate-data-up-mac-payload"

  /**
   * Provides a source to perform the transformation.
   */
  def source(durableQueue: DurableQueue, getSecret: Principal.GetSecret)(
      implicit mat: Materializer
  ): Source[Span, NotUsed] = {
    val tracer = OpenTelemetry.getTracer(SoilStateTransformer.getClass.getName)

    val transform = Flow[DurableQueue.Received]
      .named("soilstate")
      .log("soilstate", identity)
      .map { received =>
        val span = tracer
          .spanBuilder("soilstate-transformer")
          .addLink(tracer.getCurrentSpan.getContext)
          .startSpan()
        val _ = Using.resource(tracer.withSpan(span)) { _ =>
          val _ = Propagation.context(received.headers, Context.current())
        }
        (received.data, received -> span)
      }
      .via(LoRaStreams.dataUpDecoder(getSecret))
      .map {
        case ((nwkAddr, _, payload), (received, span)) =>
          (SoilStateReading(Instant.now(), nwkAddr, payload.toArray), (received, span))
      }
      .via(SoilStateReading.appender(getSecret))
      .via(durableQueue.flow)
      .collect {
        case DurableQueue.CommandReply(DurableQueue.SendAck, Some(carry)) => carry
      }
      .via(durableQueue.commit(UuidOps.v5(SoilStateTransformer.getClass)))
      .wireTap(_.`end`())

    Source
      .future(
        durableQueue
          .offset(SoilStateDataUpMacPayloadTopic, UuidOps.v5(SoilStateTransformer.getClass))
      )
      .flatMapConcat(offset =>
        durableQueue.source(
          SoilStateDataUpMacPayloadTopic,
          offset,
          finite = false
        )
      )
      .via(transform)
  }
}
To break it down:
- We declare the topic that we tail when consuming observations. The queue of the topic is appended to by the LoRaWAN Network Server.

- Java

/**
 * The durable queue topic where LoRaWAN packets are consumed from
 */
static final String DATA_UP_MAC_PAYLOAD_TOPIC = "soilstate-data-up-mac-payload";

- Scala

/**
 * The durable queue topic where LoRaWAN packets are consumed from
 */
val SoilStateDataUpMacPayloadTopic: DurableQueue.Topic = "soilstate-data-up-mac-payload"
- We declare a function that will return an Akka Streams Source - ignore the exact type of source that it returns as it is unimportant. This Source will be “materialized” by the caller - materialization is the act of running a stream.

- Java

/**
 * Provides a source to perform the transformation.
 */
public static Source<Span, NotUsed> source(DurableQueue durableQueue,
                                           Function<String, CompletionStage<Either<Principal.FailureResponse, Principal.SecretRetrieved>>> getSecret) {

- Scala

/**
 * Provides a source to perform the transformation.
 */
def source(durableQueue: DurableQueue, getSecret: Principal.GetSecret)(
    implicit mat: Materializer
): Source[Span, NotUsed] = {
- Durable queues have an offset to start tailing from. The offset is typically used to express resumption from where the last successful tailing run finished. We do this by using a resumable source: looking up the last committed offset and starting the queue's source from there. This way, if the service is restarted, it will not start from the beginning of the durable queue, but from the last place that it successfully processed.

- Java

return Source.fromFuture(durableQueue.offset(DATA_UP_MAC_PAYLOAD_TOPIC, UuidOps.v5(SoilStateTransformer.class)))
        .flatMapConcat(offset ->
                durableQueue.source(
                        DATA_UP_MAC_PAYLOAD_TOPIC,
                        offset,
                        false))
        .via(transform.asScala());

- Scala

Source
  .future(
    durableQueue
      .offset(SoilStateDataUpMacPayloadTopic, UuidOps.v5(SoilStateTransformer.getClass))
  )
  .flatMapConcat(offset =>
    durableQueue.source(
      SoilStateDataUpMacPayloadTopic,
      offset,
      finite = false
    )
  )
  .via(transform)
Note: Given a restart, it is entirely possible for a transformer to publish an observation that it has published before. Streambed takes the view that preventing duplication is almost impossible, particularly for distributed systems. For example, imagine that there are multiple xDP based routers in a network and two of them receive the same LoRaWAN packet, which is entirely possible and indeed common. Suppose also that those two routers do not communicate with each other given the lack of network connectivity. Duplication in a distributed system cannot be entirely prevented - it can only be minimized. So, rather than try to avoid duplication, Streambed pushes the problem to the point of rendering the information, where it is easy to deal with e.g. in a user interface.
- A Flow is passed into the source which extracts the LoRaWAN payload bytes (data) and any header information added by the LoRaWAN Network Server.

- Java

Tracer tracer = OpenTelemetry.getTracer(SoilStateTransformer.class.getName());

Flow<DurableQueue.Received, Span, NotUsed> transform =
        Flow.<DurableQueue.Received>create()
                .named("soilstate")
                .log("soilstate")
                .collectType(DurableQueue.Received.class)

- Scala

val transform = Flow[DurableQueue.Received]
  .named("soilstate")
  .log("soilstate", identity)
Headers may include OpenTracing span information. Spans permit an event to be traced across a distributed set of services and can be very useful in determining the health of a system, particularly when bad things happen. The Streambed LoRaWAN Network Server will initiate a span upon a LoRaWAN event occurring and transformers can carry that data through to other components, even on different networks. The Headers.spanContext invocation is a Streambed utility that conveniently forms a span out of header information.
- Optional step: Given this tracing information, we then call a function that instruments our flow by calling OpenTracing.
- Java

.map(received -> {
    ByteString data = received.data();
    Span span = tracer
            .spanBuilder("soilstate-transformation")
            .addLink(tracer.getCurrentSpan().getContext())
            .startSpan();
    try (Scope ignored = tracer.withSpan(span)) {
        Propagation.context(received.headers(), Context.current());
    }
    return new Tuple2<>(data, new Tuple2<>(received, span));
})

- Scala

.map { received =>
  val span = tracer
    .spanBuilder("soilstate-transformer")
    .addLink(tracer.getCurrentSpan.getContext)
    .startSpan()
  val _ = Using.resource(tracer.withSpan(span)) { _ =>
    val _ = Propagation.context(received.headers, Context.current())
  }
  (received.data, received -> span)
}
- The call to LoRaStreams.dataUpDecoder decrypts and decodes a LoRaWAN packet:

- Java

.via(Streams.dataUpDecoder(getSecret))

- Scala

.via(LoRaStreams.dataUpDecoder(getSecret))
Given a sequence of bytes, the above will decode them as either a LoRaWAN ConfirmedDataUp or UnconfirmedDataUp message and return the device’s nwkAddr along with a decrypted payload.
- This next line transforms a sequence of bytes into the domain object that we declared earlier:

- Java

.map(e -> {
    int nwkAddr = e._1()._1();
    ByteString payload = e._1()._3();
    Tuple2<DurableQueue.Received, Span> carry = e._2();
    return new Tuple2<>(
            new SoilStateReading(Instant.now(), nwkAddr, payload.toArray()),
            carry);
})

- Scala

.map {
  case ((nwkAddr, _, payload), (received, span)) =>
    (SoilStateReading(Instant.now(), nwkAddr, payload.toArray), (received, span))
}
Note: LoRaWAN devices generally don’t convey time given clock-skew, so we note the time received within the transformer. How time is recorded is an application consideration i.e. we may not actually care about the time an observation was received (although we generally do).
- Encode as JSON, encrypt and form a request to append:

- Java

.via(SoilStateReading.appender(getSecret))

- Scala

.via(SoilStateReading.appender(getSecret))
Note: JSON isn’t a strict requirement but is used to promote interoperability, particularly given that it is quite common for data to be consumed outside of Streambed.
- Append to the durable queue:

- Java

.via(durableQueue.flow())

- Scala

.via(durableQueue.flow)
- This stage will store the offset of the LoRaWAN payload that we originally received so that we can avoid transforming the same data multiple times.

- Java

.collect(new JavaPartialFunction<DurableQueue.CommandReply<Tuple2<DurableQueue.Received, Span>>, Tuple2<DurableQueue.Committable, Span>>() {
    @Override
    public Tuple2<DurableQueue.Committable, Span> apply(
            DurableQueue.CommandReply<Tuple2<DurableQueue.Received, Span>> x,
            boolean isCheck) {
        if (x.event() instanceof DurableQueue.SendAck$) {
            Tuple2<DurableQueue.Received, Span> carry = x.carry().get();
            return new Tuple2<>(carry._1(), carry._2());
        } else {
            throw noMatch();
        }
    }
})
.via(durableQueue.commit(UuidOps.v5(SoilStateTransformer.class)))

- Scala

.collect {
  case DurableQueue.CommandReply(DurableQueue.SendAck, Some(carry)) => carry
}
.via(durableQueue.commit(UuidOps.v5(SoilStateTransformer.getClass)))
- Our final stage will call our instrumentation code again; this time signifying that the transformation has completed.

- Java

.wireTap(Span::end);

- Scala

.wireTap(_.`end`())
At this point, the source stages are complete and the current offset in the durable queue is persisted.
Transformers are a service in their own right and can be housed within the same process as the backend web service, or in a process of their own. We have decided to run ours within its own process. To break it down:

- All Streambed applications will have an entry point (main method):

- Java

/**
 * Bootstraps our application and handles signals
 */
public class SoilStateServerEntryPoints {

    private static ApplicationProcess applicationProcess = null;

    public static void main(String[] args) {
        applicationProcess = new ApplicationProcess(new SoilStateServer());
        applicationProcess.main(args);
    }

    public static void trap(int signal) {
        if (applicationProcess != null) {
            applicationProcess.trap(signal);
        }
    }
}

- Scala

/**
 * Bootstraps our application and handles signals
 */
object SoilstateServerEntryPoints {
  private lazy val applicationProcess = ApplicationProcess(SoilStateServer)

  def main(args: Array[String]): Unit =
    applicationProcess.main(args)

  def trap(signal: Int): Unit =
    applicationProcess.trap(signal)
}
A static application process is created which sets up an application’s environment. The static main method is as per the regular JVM entry point. trap is provided for when the application runs within Streambed: Streambed will call trap to forward on any operating system signals to your process. ApplicationProcess provides a great deal of default behavior.

- This next declaration creates an instance of a Streambed Application and mixes in the DurableQueueProvider, RawStorageProvider and SecretStoreProvider traits (interfaces).

- Java

/**
 * This is our main entry point to the application, being responsible for serving assets
 * as well as providing the UI's RESTful endpoints
 */
public class SoilStateServer extends Application
        implements DurableQueueProvider, RawStorageProvider, SecretStoreProvider {

    // Snapshots are saved after processing this many events.
    private static final long SAVE_EVERY = 1000;

    // Wire in DQ Server/Client as our durable queue
    @Override
    public Client acquireDurableQueue(ActorSystem system) {
        return DurableQueueProvider.super.acquireDurableQueue(system);
    }

    // Wire in FileSystemRawStorage as our raw storage
    @Override
    public FileSystemRawStorage acquireRawStorage(ActorSystem system) {
        return RawStorageProvider.super.acquireRawStorage(system);
    }

    // Wire in the IOx secret store as our secret store
    @Override
    public IOxSecretStore acquireSecretStore(ActorSystem system) {
        return SecretStoreProvider.super.acquireSecretStore(system);
    }

- Scala

/**
 * The Soil moisture/temp application
 */
object SoilStateServer
    extends Application
    with DurableQueueProvider
    with RawStorageProvider
    with SecretStoreProvider {
How these traits are imported declares whether to use ChronicleQueue for the durable queue and IOx for secret storage. When targeting IOx, these are exactly what we require.
- Java

import com.cisco.streambed.durablequeue.remote.Client;
import com.cisco.streambed.durablequeue.remote.DurableQueueProvider;

- Scala

import com.cisco.streambed.durablequeue.chroniclequeue.DurableQueueProvider
import com.cisco.streambed.storage.fs.RawStorageProvider
import com.cisco.streambed.identity.iox.SecretStoreProvider
Other platforms would be backed by something else e.g. Kafka and Hashicorp Vault.
- The final step is to have the ApplicationProcess main method establish an OpenTracing tracer and associate it with the ability to have its telemetry inspected over a Unix Domain Socket.

- Java

// Start the transformer's telemetry reporting up as we are running it within the same process
// We have no metrics, hence the Source.empty for the Unix Domain Socket reporter.
Pair<StreamMetricExporter, StreamSpanExporter> exporters = TelemetryConfig.create(system);
TelemetryReporter
        .report(exporters.first().source().asJava(), exporters.second().source().asJava(), system)
        .thenApply(serverBindings -> {
            serverBindings.forEach(b -> log.info("Telemetry listening on {}", b));
            return serverBindings;
        });

// Start the transformer itself
RestartSource.withBackoff(minBackoff, maxBackoff, backoffRandomFactor, () ->
        SoilStateTransformer
                .source(context.durableQueue(), Principal.toJava(context.principal().getSecret()))
).runWith(Sink.ignore(), system);

- Scala

val (metricExporter, spanExporter) = TelemetryConfig()
val _ = TelemetryReporter
  .report(metricExporter.source, spanExporter.source)
  .foreach { bs =>
    bs.foreach(b => system.log.info("Telemetry listening on {}", b))
  }(ExecutionContext.parasitic)

val _ = RestartSource
  .withBackoff(minBackoff, maxBackoff, backoffRandomFactor) { () =>
    SoilStateTransformer.source(durableQueue, principal.getSecret)
  }
  .runWith(Sink.ignore)
As a final step, the transformer is hosted by a RestartSource so that any abnormal failures will cause it to be restarted.
The transformer is now ready to be run. Start up the sandbox:
sandbox | docker-compose -p xdp-sandbox -f - up
Start the transformer using your favorite IDE or development tools. It will connect to the sandbox based services.
When starting the transformer, or any Streambed based service alongside the sandbox, you can use Jaeger tracing to trace activity instead of polluting your beautiful code with logging statements. To do this, simply enable Jaeger tracing by declaring the property -Djaeger.enabled=true for your application. When you started the sandbox, it also started the Jaeger tracing UI and reported the address of where it resides. We will refer to Jaeger again later.
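For example, when launching the transformer from the command line (the jar name here is illustrative):

java -Djaeger.enabled=true -jar fdp-soilstate-transformer.jar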
Back now at the command line, we will send an event to the transformer, as if it came from the LoRaWAN Network Server. We will also observe the event being emitted from the transformer. To do this we will use Streambed’s MQTT gateway service. Please type in the following:
Remember, when prompted for a username and password, use the ones that the sandbox informs you of.
The MQTT configuration is only for testing so that we can verify data flowing through. The configuration supports your development workflow.
lora type add \
soilstate-data-up-mac-payload \
"Soil Moisture/Temp"
streambed secret add secrets.soilstate.key 2B7E151628AED2A6ABF7158809CF4F3C
lora end-device add \
soilstate-data-up-mac-payload \
v1 \
abp \
--dev-eui 0000000045645655 \
--dev-addr 45645655 \
--app-s-key 2B7E151628AED2A6ABF7158809CF4F3C \
--nwk-s-key 2B7E151628AED2A6ABF7158809CF4F3C
streambed mqtt add down \
soilstate-down \
mqtt-soilstate-up \
soilstate-data-up-mac-payload \
--data-is-binary
streambed mqtt add up \
soilstate-up \
--secret-path=secrets.soilstate.key \
soilstate-data-up-json \
mqtt-soilstate-down
OK, that may seem daunting! Let’s break it down:
- We declare a new type of device using lora type add.
- We add a secret to be used for encrypting observations once they have been transformed from the soilstate-data-up-mac-payload topic.
- We add a sensor to the system, associating it with the soilstate-data-up-json topic. This will, in turn, populate the secret at secrets.lora.AppSKey.01645655 with 2B7E151628AED2A6ABF7158809CF4F3C.
- We then add a downstream route to the MQTT gateway so that what we publish to the mqtt-soilstate-up MQTT topic will be encrypted with the key at secrets.lora.AppSKey.01645655 and appended to the soilstate-data-up-mac-payload topic. This route represents our pretence to be a LoRaWAN Network Server, hence the --data-is-binary flag. This permits us to convey Base64 encoded binary data to MQTT, which will end up as raw bytes when presented to our transformer - exactly as when received from a LoRaWAN Network Server.
- We finally add an upstream route to the MQTT gateway so that data appended to soilstate-data-up-json will be emitted on the MQTT topic, mqtt-soilstate-down.
Now that we have our transformer running, and Streambed is provisioned with the information it needs, we are in a position to observe sensor data flowing from a “pretend” LoRaWAN Network Server, through the transformer, and then emitted into another MQTT topic.
First, you’ll need some MQTT command line utilities, namely mosquitto_sub and mosquitto_pub, which will be used to subscribe to MQTT topics and publish to them respectively. If you don’t already have them then they can be found at the Eclipse Mosquitto website.
From one terminal window, subscribe to all Mosquitto topics:
mosquitto_sub -v -h localhost -p 1883 -t '#'
From another terminal window, we can now publish some data as if we are the LoRaWAN Network Server:
mosquitto_pub -t mqtt-soilstate-up -m '{"key":23352917,"data":"gFVWZEUAAgAPXkErnCVxx9fZN6IMGBfBKYKrHQ7T4w=="}'
The key will be interpreted by the MQTT gateway as the sensor’s nwkAddr field (in hex, 0x1645655) and the data is the Base64 encoding of a LoRaWAN packet as received by a LoRaWAN packet forwarder.
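If you’re curious, you can inspect the raw packet bytes yourself. The first byte, 0x80, is the LoRaWAN MHDR for ConfirmedDataUp, and the next four bytes are the DevAddr 0x45645655 in little-endian form (its lower 25 bits being the nwkAddr, 0x01645655):

echo 'gFVWZEUAAgAPXkErnCVxx9fZN6IMGBfBKYKrHQ7T4w==' | base64 -d | xxd
# 00000000: 8055 5664 4500 0200 0f5e 412b 9c25 71c7  .UVdE....^A+.%q.
# ...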
When published to Mosquitto, you should then see output as follows:
mqtt-soilstate-up {"key":23352917,"data":"gFVWZEUAAgAPXkErnCVxx9fZN6IMGBfBKYKrHQ7T4w=="}
mqtt-soilstate-down {"key":23352917,"data":{"time":"2018-08-10T02:41:09.111Z","nwkAddr":23352917,"temperature":24.9,"moisturePercentage":20.2}}
This shows us the message that we published to the mqtt-soilstate-up Mosquitto topic. It also shows us what was published to mqtt-soilstate-down, which is the output from the transformer. Here’s a flow of what just happened:
mqtt-soilstate-up --> soilstate-data-up-mac-payload -+
                                                     |
                                                     v
                                                 transform
                                                     |
                                                     |
mqtt-soilstate-down <-- soilstate-data-up-json <-----+
… where soilstate-data-up-mac-payload and soilstate-data-up-json are our durable queue topics.
If you enabled Jaeger tracing with the transformer as described in an earlier note, you can also use its UI to report on the transformation including how long it took.
Congratulations! You’ve now tested your transformer without leaving the comfort of your regular development environment!
Serving the user interface
Your application is now able to transform LoRaWAN packets into a domain object. The next step is to serve the web assets that your application needs to run within a web browser, and to provide the following APIs:
- authentication - permits the user to login
- end device events - sensor meta data
- observations - sensor readings
Let’s first see how HTTP is served:
- Java

// In order to access all directives we need an instance where the routes are defined.
Routes routes = new Routes();
final Flow<HttpRequest, HttpResponse, NotUsed> routeFlow =
        routes.createMainRoute(context).flow(system);

HttpServerConfig
        .bindRoute(routeFlow, context)
        .thenApply(serverBindings -> {
            serverBindings.forEach(b -> log.info("Server listening on {}", b));
            return serverBindings;
        })
        .exceptionally(failure -> {
            log.error(failure, "Bind failed, exiting");
            System.exit(1);
            return null;
        });
Akka HTTP provides an embedded, Reactive-Streams-based, fully asynchronous HTTP/1.1 server implemented on top of Akka Streams (also used in the transformer). The code above binds the server to a network interface and configures all HTTP traffic to go through a “route” declaration. This route declaration declares how requests are handled, and is shown in its entirety below:
- Java

// Describe the HTTP routes
private static class Routes extends AllDirectives {

    private Route createMainRoute(ApplicationContext context) {
        ActorSystem system = context.system();
        DurableQueue durableQueue = context.durableQueue();
        Storage storage = context.storage();
        UserIdentityService identityService = UserIdentityService.apply(context);

        return concat(
                path(PathMatchers.segment("api").slash("login"), () ->
                        post(() ->
                                entity(Jackson.unmarshaller(Credentials.class), credentials -> {
                                    CompletionStage<String> result = FutureConverters
                                            .toJava(identityService.authenticate(
                                                    credentials.getUsername(), credentials.getPassword()))
                                            .thenApply(r -> OptionConverters.toJava(r)
                                                    .orElseThrow(RuntimeException::new));
                                    return onComplete(result, maybeResult -> maybeResult
                                            .map(this::complete)
                                            .recover(new PFBuilder<Throwable, Route>()
                                                    .matchAny(ex -> complete(StatusCodes.UNAUTHORIZED, "Bad credentials"))
                                                    .build())
                                            .get());
                                }))),
                pathPrefix("api", () ->
                        authenticateOAuth2Async("secured api", identityService::verifier, principal ->
                                concat(
                                        path("end-devices", () -> concat(
                                                get(() -> completeOK(
                                                        EndDeviceService.events(
                                                                durableQueue,
                                                                storage,
                                                                Principal.toJava(principal.getSecret()),
                                                                eventsProcessed -> false,
                                                                system)
                                                                .map(t -> ServerSentEventMarshaller.toServerSentEvent(t._1(), t._2()))
                                                                .keepAlive(Duration.of(10, ChronoUnit.SECONDS), ServerSentEvent::heartbeat),
                                                        EventStreamMarshalling.toEventStream())))),
                                        path("soilstate", () -> concat(
                                                get(() -> completeOK(
                                                        SoilstateService.events(
                                                                durableQueue,
                                                                storage,
                                                                Principal.toJava(principal.getSecret()),
                                                                eventsProcessed -> false,
                                                                system)
                                                                .map(t -> ServerSentEventMarshaller.toServerSentEvent(t._1(), t._2()))
                                                                .keepAlive(Duration.of(10, ChronoUnit.SECONDS), ServerSentEvent::heartbeat),
                                                        EventStreamMarshalling.toEventStream()))))))),
                // Explicitly list out all SPA routes so that refreshing works as expected
                pathEndOrSingleSlash(() -> getFromResource("dist/index.html")),
                pathPrefix("end-devices", () -> getFromResource("dist/index.html")),
                getFromResourceDirectory("dist")
        ).seal();
    }
}
Web assets
Breaking this down, the last section declares how web assets (your mobile application) will be served:
- Java

// Explicitly list out all SPA routes so that refreshing works as expected
pathEndOrSingleSlash(() -> getFromResource("dist/index.html")),
pathPrefix("end-devices", () -> getFromResource("dist/index.html")),
getFromResourceDirectory("dist")
The first 2 lines declare how certain paths should be handled with respect to serving up index.html. Your application may have an effect on these lines; you will typically keep the first line, which causes a single slash to serve up the main page. The second line is specific to our Angular application example, where end-devices is a path that should always serve up the index page.

The last line will always be present and translates a path into a file in the dist folder. Our convention is that your frontend's web assets will always be found in the dist folder on the class path. The build file of the project will copy files from frontend/dist into dist on the class path. Therefore, your workflow for building a release must cause frontend/dist to be populated with the web assets that this backend will serve up.
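For example, with an npm-based frontend, the release workflow might include a step like this before packaging the backend (illustrative; your frontend’s build tooling may differ):

cd frontend
npm install
npm run build   # emits the web assets into frontend/dist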
Authentication
Whenever a request to the /api
path is made, so long as it is not /api/login
, then an OAuth 2.0 token is expected given that the paths are “protected” i.e. your user must be logged in.
Here is the authentication route declaration:
- Java

path(PathMatchers.segment("api").slash("login"), () ->
        post(() ->
                entity(Jackson.unmarshaller(Credentials.class), credentials -> {
                    CompletionStage<String> result = FutureConverters
                            .toJava(identityService.authenticate(credentials.getUsername(), credentials.getPassword()))
                            .thenApply(r -> OptionConverters.toJava(r).orElseThrow(RuntimeException::new));
                    return onComplete(result, maybeResult -> maybeResult
                            .map(this::complete)
                            .recover(new PFBuilder<Throwable, Route>()
                                    .matchAny(ex -> complete(StatusCodes.UNAUTHORIZED, "Bad credentials"))
                                    .build())
                            .get());
                }))),
These lines will accept an HTTP POST request that supplies a “credentials” JSON object containing a username and password:
- Java

/**
 * Required for authentication
 */
public class Credentials {

    private final String username;
    private final String password;

    @JsonCreator
    public Credentials(
            @JsonProperty(value = "username", required = true) String username,
            @JsonProperty(value = "password", required = true) String password) {
        this.username = username;
        this.password = password;
    }

    public String getUsername() { return username; }

    public String getPassword() { return password; }
}
The “unmarshaller” will automatically convert the credentials from their JSON form into the Credentials object and then pass them on to the Streambed “identity” service for authentication. A successful authentication will cause an OAuth token to be replied. Any frontend can then supply this token with other requests to /api.

Tokens “expire” and when they do, subsequent requests to /api will reply that a new token is required i.e. the user is required to login again.

All traffic is passed over TLS and so credentials are conveyed in an encrypted manner.
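From a client’s perspective the flow looks like the following sketch; the host, port and credentials are placeholders - use those reported by your sandbox:

TOKEN=$(curl -sk -X POST https://localhost:9872/api/login \
  -H 'Content-Type: application/json' \
  -d '{"username":"admin","password":"password"}')

curl -skN -H "Authorization: Bearer $TOKEN" https://localhost:9872/api/soilstate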
End device events
End device events describe metadata in relation to a sensor. The aim is to return these events to the frontend for those devices that the user is authorized to observe. The route declaration will cause an HTTP GET request to be made on end device events, which will then present a continuous stream of replies, known as Server Sent Events. The following code shows how the end device events route is declared:
- Java

path("end-devices", () -> concat(
        get(() -> completeOK(
                EndDeviceService.events(
                        durableQueue,
                        storage,
                        Principal.toJava(principal.getSecret()),
                        eventsProcessed -> false,
                        system)
                        .map(t -> ServerSentEventMarshaller.toServerSentEvent(t._1(), t._2()))
                        .keepAlive(Duration.of(10, ChronoUnit.SECONDS), ServerSentEvent::heartbeat),
                EventStreamMarshalling.toEventStream())))),
The GET request will cause EndDeviceService.events to be called which, in turn, will return a stream of end device events. The events will then be converted into Server Sent Events and replied to the frontend. We also use a keepAlive so that clients are able to reliably detect whether a connection remains open.
The EndDeviceService is as follows (please read the comment):
- Java

/**
 * Manages services in relation to end devices.
 */
class EndDeviceService {

    /**
     * Snapshots for this service are stored in storage with the following id
     */
    private static final UUID latestPositionsId =
            UuidOps.v5(EndDeviceService.class, "latestPositions");

    /**
     * Subscribe to the end device events queue returning a source of events and their offsets in the queue.
     * <p>
     * To optimize state rebuild time for the UI, we pick up from the latest snapshot, and defer the
     * decision of whether to save snapshots to a provided function.
     * <p>
     * After emitting the latest position for all sensors given the latest snapshot, we then continue to tail
     * the queue and directly emit events.
     */
    static Source<Tuple2<EndDeviceEvents.Event, Long>, NotUsed> events(
            DurableQueue durableQueue,
            Storage storage,
            Function<String, CompletionStage<Either<Principal.FailureResponse, Principal.SecretRetrieved>>> getSecret,
            Function<Long, Boolean> shouldSave,
            ActorSystem system) {

        ExecutionContext ec = system.dispatcher();

        return Source
                .fromSourceCompletionStage(
                        FutureConverters
                                .toJava(storage.load(LatestDevicePositions.getStateCodec(), latestPositionsId, ec))
                                .thenApply(maybeSnapshot -> Source.single(
                                        maybeSnapshot.isDefined() ? maybeSnapshot.get() : new LatestDevicePositions())))
                .flatMapConcat(initialState ->
                        Source
                                .from(initialState.getPositions().values())
                                .map(reading -> new Tuple2<EndDeviceEvents.Event, Long>(
                                        reading, initialState.getOffset().orElse(0L)))
                                .concat(
                                        durableQueue
                                                .source(EndDeviceEvents.EventTopic(),
                                                        OptionConverters$.MODULE$.toScala(initialState.getOffset()))
                                                .asJava()
                                                .dropWhile(r -> initialState.getOffset().isPresent() &&
                                                        initialState.getOffset().getAsLong() == r.offset())
                                                .via(EndDeviceEvents.tailer(getSecret))
                                                .scan(
                                                        new Tuple3<>(initialState, Optional.<Tuple2<EndDeviceEvents.Event, Long>>empty(), 0L),
                                                        (state, eventWithOffset) -> {
                                                            final LatestDevicePositions latestPositions = state._1();
                                                            long eventsProcessed = state._3();
                                                            latestPositions.update(eventWithOffset);
                                                            eventsProcessed++;
                                                            if (shouldSave.apply(eventsProcessed)) {
                                                                storage.save(
                                                                        LatestDevicePositions.getStateCodec(),
                                                                        latestPositionsId,
                                                                        latestPositions,
                                                                        ec);
                                                            }
                                                            return new Tuple3<>(latestPositions, Optional.of(eventWithOffset), eventsProcessed);
                                                        })
                                                .map(Tuple3::_2)
                                                .filter(Optional::isPresent)
                                                .map(Optional::get)))
                .mapMaterializedValue(v -> NotUsed.getInstance());
    }
}
Akka Streams is used here to ensure that the frontend receives only what it requires. This helps keep the amount of data transferred over the network minimal. In the case of our frontend application, we are only ever interested in position updates (the sensor has been moved) and whether a sensor has been removed from the system.
Observations
Sensor observations are served in a very similar way to the meta data of end device events. In our application’s case, we are only ever interested in rendering the latest observations. We therefore reduce the observations that we have down to the latest ones. Your application may have different requirements e.g. it may want the last two weeks of observations so that it can render a graph over time.
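As a sketch of that reduction, keeping only the latest reading per device is a small fold over the (reading, offset) stream produced by the tailer described earlier; the surrounding setup is as in the previous sketches:

durableQueue
  .source(SoilStateDataUpJsonTopic, offset, finite = false)
  .via(SoilStateReading.tailer(getSecret))
  .scan(Map.empty[Int, SoilStateReading]) {
    case (latest, (reading, _)) => latest.updated(reading.nwkAddr, reading)
  } // each element is now the map of latest readings keyed by nwkAddr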
Packaging and deploying
Let’s now see how we can package our application, ready for deployment. We package it as a Docker image containing an assembly of all of our application’s dependencies in one jar file.
To package:
- Maven

mvn package docker:build

- sbt

sbt fdpSoilStateTransformer/docker:publishLocal
Let’s suppose the image we’ve built is streambed/fdpsoilstateui:0.5.1-SNAPSHOT. Our goal is now to run the image within the context of the sandbox that we already have running:
docker run \
--rm \
--network xdp-sandbox_default \
streambed/fdpsoilstateui:0.5.1-SNAPSHOT
The xdp-sandbox_default network is the one that the sandbox uses.
That’s it! The application should behave as it did before i.e. using mosquitto_pub
and friends.