Soilstate backend

Tutorial objectives

At the end of the tutorial on the backend you should be able to:

  • Understand how a packet is transformed from a LoRaWAN packet into a domain object
  • Understand how to encrypt a domain object and publish it to a durable queue
  • Understand how a durable queue is used to receive these domain objects and decrypt them
  • Understand how the front end application is served
  • Understand how Streambed user identity is used by the backend
  • Understand how the back end provides data to the front end

A quick overview

Let’s dive right in. Once we’re through this section, we’ll back off and go through the project step by step.

The back end service principally delivers an Akka HTTP route with a base URI path of / on an operator-configured port. Data is served at an /api path, which is secured with an HTTP Bearer Token. There is one exception: /api/login is used to authenticate a user and obtain a bearer token. Bearer tokens have a lease time and will expire, resulting in requiring a user to authenticate again after a period of time.

All traffic on a service gateway is encrypted using TLS.

After serving up the front end assets, the front end makes a Server Sent Event (SSE) request for data at the /api/soilstate URI path. Akka SSE is part of Akka HTTP and upon receiving a connection, it will subscribe to durable queue events on a DurableQueue using the soilstate-data-up-json topic.

A durable queue is a commit log where its elements are stored for a period of time. Kafka and Chronicle Queue are good examples of durable queues. Topics are addresses of queues and can be used to tail and append data from/to them. When a stream tailing service is not running its events are still stored so that when it starts, the service will start reading from the tail and build its state accordingly. Therefore, services can be started and stopped with no loss of data. This is known as event sourcing and is particularly useful when swapping out an old service with a new one.

Events will be decrypted using a secret key. If this key is unavailable to the logged in user then no decryption and therefore no events will be returned. Here is a sample of the decrypted JSON data returned to the front end over its TLS connection:

{
  "time": "2018-02-22T10:18:22.173768Z",
  "nwkAddr": "01645655",
  "temperature": 24.9,
  "moisturePercentage": 20.2
}

The front end receives meta data in relation to a soil moisture/temp sensor (provisioning sensors in general is outside of the scope of this tutorial, but it is a function provided with Streambed). These sensors are keyed by a network address (nwkAddr). An /api/end-devices URI path is provided for receiving SSE events on sensor meta data. Here is a sample of the payload describing an update to sensor positioning data:

{
  "nwkAddr": "01645655",
  "time": "2018-02-22T10:20:02.073263Z",
  "position": {
    "lat": 125.6,
    "lng": 10.1
  }
}

The transformer component

Streambed holds the notion of a “transformer” with an associated “domain model”. The responsibility of a transformer is to marshal a LoRaWAN packet into a normalized domain model representation. We do this so that various other services are able to consume the data more easily and de-coupled from the knowledge that it was source from LoRaWAN (Streambed isn’t tied to LoRaWAN and is able to support a variety of sensor-data-receiving methods).

We have a template so that you can quickly generate a transformer with an associated domain model. You will be prompted for the following information:

  • a project name - we use names like soilstate to represent Soil Moisture/Temperature sensors

  • an organization - the reverse domain notation used for declaring namespaces

  • an organizationName - the plain english name of your organization for use in copyright statements

  • a deviceType - an identifier representing the type of device you’re focused on

To generate the project (accept the defaults when prompted):

Maven/Java
docker run \
  --rm -it \
  -v $PWD:/g8out \
  moredip/giter8 \
  https://github.com/streambed/lora-transformer-ui-java.g8
sbt/Scala
sbt new https://github.com/streambed/lora-transformer-ui-scala.g8.git

Open the project up in your favorite editor or IDE. We will now go over some areas that will require your attention for every new type of sensor that requires a transformer.

Note

If your editor or IDE supports the ability to highlight “TODO” or “FIXME” comments then you can use that to quickly navigate to these areas.

The domain model

The domain model describes how sensor observations are represented in your programming language. The domain model is what you start with when needing to describe a new type of sensor to Str. Here is a Scala domain model for soil moisture/temperature:

Java

/** * Captures a temperature/moisture reading of soil. */ public final class SoilStateReading { public static final ObjectMapper mapper = new ObjectMapper(); static { mapper.registerModule(new JavaTimeModule()); mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); } private final Instant time; private final int nwkAddr; private final BigDecimal temperature; private final BigDecimal moisturePercentage; @JsonCreator public SoilStateReading(@JsonProperty(value = "time", required = true) Instant time, @JsonProperty(value = "nwkAddr", required = true) int nwkAddr, @JsonProperty(value = "temperature", required = true) BigDecimal temperature, @JsonProperty(value = "moisturePercentage", required = true) BigDecimal moisturePercentage) { this.time = time; this.nwkAddr = nwkAddr; this.temperature = temperature; this.moisturePercentage = moisturePercentage; } public Instant getTime() { return time; } public int getNwkAddr() { return nwkAddr; } public BigDecimal getTemperature() { return temperature; } public BigDecimal getMoisturePercentage() { return moisturePercentage; } /** * Construct from raw bytes */ public SoilStateReading(Instant time, int nwkAddr, byte[] payload) { this.time = time; this.nwkAddr = nwkAddr; if (payload.length >= 4) { ByteBuffer iter = ByteBuffer.wrap(payload); BigDecimal TEMP_OFFSET = BigDecimal.valueOf(40, 0); BigDecimal t = BigDecimal.valueOf(iter.getShort() & 0xFFFF, 1).subtract(TEMP_OFFSET); BigDecimal m = BigDecimal.valueOf(iter.getShort() & 0xFFFF, 1); this.temperature = t; this.moisturePercentage = m; } else { this.temperature = BigDecimal.ZERO; this.moisturePercentage = BigDecimal.ZERO; } } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; SoilStateReading that = (SoilStateReading) o; return nwkAddr == that.nwkAddr && Objects.equals(time, that.time) && Objects.equals(temperature, that.temperature) && Objects.equals(moisturePercentage, that.moisturePercentage); } @Override public int hashCode() { return Objects.hash(time, nwkAddr, temperature, moisturePercentage); } @Override public String toString() { return "SoilStateReading{" + "time=" + time + ", nwkAddr=" + nwkAddr + ", temperature=" + temperature + ", moisturePercentage=" + moisturePercentage + '}'; }
Scala
/**
  * Captures a temperature/moisture reading of soil.
  */
final case class SoilStateReading(
    time: Instant,
    nwkAddr: Int,
    temperature: BigDecimal,
    moisturePercentage: BigDecimal
)

Each reading will therefore have a:

  • time expressed in UTC form - this is the time that it is received at the transformer (described in the next section)
  • nwkAddr, which is the address of the device, distinct within the network managed by Streambed
  • temperature, the soil temperature in degrees celsius
  • moisturePercentage, the moisture of the soil expressed in percentage terms, with 100% being fully saturated

After declaring a domain object, we must also declare a means to marshal it from a sequence of bytes. LoRaWAN will always ensure that the entire byte sequence from the sensor is delivered. We use an alternate constructor to form the domain object from the byte sequence:

Java

/** * Construct from raw bytes */ public SoilStateReading(Instant time, int nwkAddr, byte[] payload) { this.time = time; this.nwkAddr = nwkAddr; if (payload.length >= 4) { ByteBuffer iter = ByteBuffer.wrap(payload); BigDecimal TEMP_OFFSET = BigDecimal.valueOf(40, 0); BigDecimal t = BigDecimal.valueOf(iter.getShort() & 0xFFFF, 1).subtract(TEMP_OFFSET); BigDecimal m = BigDecimal.valueOf(iter.getShort() & 0xFFFF, 1); this.temperature = t; this.moisturePercentage = m; } else { this.temperature = BigDecimal.ZERO; this.moisturePercentage = BigDecimal.ZERO; } }
Scala
val TempOffset = BigDecimal(400, 1)
val Zero       = BigDecimal(0, 1)

/**
  * Construct from a decrypted LoRaWAN ConfirmedDataUp/UnconfirmedDataUp FRMPayload
  */
def apply(time: Instant, nwkAddr: Int, payload: Array[Byte]): SoilStateReading = {
  val (temperature, moisturePercentage) = if (payload.length >= 4) {
    val iter = ByteBuffer.wrap(payload)
    val t    = BigDecimal(iter.getShort & 0xffff, 1) - TempOffset
    val m    = BigDecimal(iter.getShort & 0xffff, 1)
    t -> m
  } else
    Zero -> Zero
  new SoilStateReading(time, nwkAddr, temperature, moisturePercentage)
}

This sensor conveys both the temperature and moisture within 4 bytes. As you can see, LoRaWAN payloads are generally quite small and less than 12 bytes. Temperature is the first 2 bytes and offset at value of 400 as per the sensor manufacturer’s specifications. If there’s a problem with decoding the bytes (perhaps there aren’t enough bytes given a buggy sensor) then the observation is rendered having values of 0. These could easily be filtered out and dropped by a transformer (described next, although we don’t drop them).

The next step when developing the domain model is to provide a means to marshal domain objects to and from JSON. We do this using spray-json as is illustrated below:

Java
public static final ObjectMapper mapper = new ObjectMapper();

static {
    mapper.registerModule(new JavaTimeModule());
    mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
}

private final Instant time;

private final int nwkAddr;

private final BigDecimal temperature;

private final BigDecimal moisturePercentage;

@JsonCreator
public SoilStateReading(@JsonProperty(value = "time", required = true) Instant time,
                        @JsonProperty(value = "nwkAddr", required = true) int nwkAddr,
                        @JsonProperty(value = "temperature", required = true) BigDecimal temperature,
                        @JsonProperty(value = "moisturePercentage", required = true) BigDecimal moisturePercentage) {
    this.time = time;
    this.nwkAddr = nwkAddr;
    this.temperature = temperature;
    this.moisturePercentage = moisturePercentage;
}
Scala
/**
  * A JSON codec for [[SoilStateReading]]
  */
object SoilStateReadingJsonProtocol extends DefaultJsonProtocol {
  implicit val instantFormat: JsonFormat[Instant] =
    new JsonFormat[Instant] {
      override def write(obj: Instant): JsValue =
        JsString(obj.toString)
      override def read(json: JsValue): Instant =
        Instant.parse(json.convertTo[String])
    }

  implicit val soilStateReadingFormat: JsonFormat[SoilStateReading] =
    jsonFormat4(SoilStateReading.apply)
}

We next provide conveniences for appending and tailing streams of observations. Appender and tailer utilities are useful though as there can be one or more services producing or consuming soil moisture/temperature sensor observations, each doing something different with the data. One example of an appender is the transformer service described later.

Here is our implementation of a function providing a tailer:

Java

/** * Conveniently tail, decrypt and decode readings. Yields the reading and its offset. */ public static Flow<DurableQueue.Received, Tuple2<SoilStateReading, Long>, NotUsed> tailer(Function<String, CompletionStage<Either<Principal.FailureResponse, Principal.SecretRetrieved>>> getSecret) { return Flow.<DurableQueue.Received>create() .mapAsync(1, e -> { ByteString encryptedData = e.data(); long o = e.offset(); return Crypto .decrypt(getSecret.apply(KEY_PATH), encryptedData) .thenCompose(e1 -> { if (e1.isRight()) { return CompletableFuture.completedFuture(new Tuple2<>(e1.right().get(), o)); } else if (e1.isLeft() && e1.left().get().getClass() == Principal.Unauthorized.class) { return CompletableFuture.completedFuture(new Tuple2<>(ByteString$.MODULE$.empty(), o)); } else { throw new RuntimeException(e1.left().get().toString()); } }); }) .collect(new JavaPartialFunction<Tuple2<ByteString, Long>, Tuple2<ByteString, Long>>() { @Override public Tuple2<ByteString, Long> apply(Tuple2<ByteString, Long> x, boolean isCheck) { if (x._1.nonEmpty()) { return x; } else { throw noMatch(); } } }) .map(e -> { ByteString data = e._1(); long o = e._2(); return new Tuple2<>(mapper.readValue(data.toArray(), SoilStateReading.class), o); }) .withAttributes(ActorAttributes.withSupervisionStrategy(Supervision.getResumingDecider())); }
Scala
/**
  * Conveniently tail, decrypt and decode readings. Yields the reading and its offset.
  */
def tailer(
    getSecret: Principal.GetSecret
): Flow[DurableQueue.Received, (SoilStateReading, Long), NotUsed] =
  Flow[DurableQueue.Received]
    .mapAsync(1) {
      case DurableQueue.Received(_, bytes, o, _, _) =>
        Crypto
          .decrypt(getSecret(SoilStateKey), bytes)
          .collect {
            case Right(encrypted)                => Some(encrypted) -> o
            case Left(_: Principal.Unauthorized) => None            -> o
          }(ExecutionContext.parasitic)
    }
    .collect { case (Some(encrypted), carry) => encrypted -> carry }
    .map {
      case (data, o) =>
        import SoilStateReadingJsonProtocol._
        (data.utf8String.parseJson.convertTo[SoilStateReading], o)
    }
    .withAttributes(
      ActorAttributes.supervisionStrategy(Supervision.resumingDecider)
    )

The tailer function will decrypt data from the durable queue using an address to a secret key, SoilStateKey (discussed next). The Streams.decrypter flow will call on the secrets manager with this key. How this call is made depends on the implementation of the GetSecret function which is passed in e.g. a call is made to IOx’s Secure Storage Service for IOx targets. Decrypted bytes are then marshaled to JSON and, if successful, flow out downstream from this tailer. The Supervision.resumingDecider is there to ignore observations that cannot be decoded for any reason e.g. unable to decrypt or unable to be parsed as JSON (Akka streams normally fail if there’s an exception).

Note

By default, every packet that is received by LoRaWAN will have an application payload encrypted in relation to a device’s network address (nwkAddr). For our application, we have decided that once decrypted from LoRaWAN, we will publish them to a durable queue and encrypt them in relation to the application has a whole. This is a design decision and should you need to encrypt all observations then you can form the key passed to GetSecret with the device’s network address (passed in as the first parameter of the DurableQueue.Received object - which we’re ignoring in our example).

As a final step, we provide some constants representing the name of the topic that our sensor observations can be tailed from, and the key used to encrypt/decrypt them:

Java
/**
 * The topic used to send encrypted domain object data to and consume it from
 */
public static final String DATA_UP_JSON_TOPIC = "soilstate-data-up-json";

/**
 * The path of the secret to be used for encrypting and decrypting the domain object
 */
public static final String KEY_PATH = "secrets.soilstate.key";
Scala
val SoilStateDataUpJsonTopic: DurableQueue.Topic =
  "soilstate-data-up-json"

val SoilStateKey: String =
  "secrets.soilstate.key"

The transformer

The transformer’s role is to tail the durable queue that LoRaWAN sensor events are appended to and call upon the domain model’s functions to marshal them from byte form to a domain object while also decrypting them. We then serialize to JSON, encrypt using the soilstate application’s key, and append to another durable queue. The following code represents the transformer in its entirety:

Java

/** * Run the transformation process to convert from LoRaWAN packets * to their soilstate domain object expressed in json and then * re-published. The packets received on this topic have already * been verified by an NS and so do not require MIC or counter * verification. Any MacPayload data that is not ConfirmedDataUp * or UnconfirmedDataUp can also be safely ignored as it should * not be received here. */ public final class SoilStateTransformer { /** * The durable queue topic where LoRaWAN packets are consumed from */ static final String DATA_UP_MAC_PAYLOAD_TOPIC = "soilstate-data-up-mac-payload"; /** * Provides a source to perform the transformation. */ public static Source<Span, NotUsed> source(DurableQueue durableQueue, Function<String, CompletionStage<Either<Principal.FailureResponse, Principal.SecretRetrieved>>> getSecret) { Tracer tracer = OpenTelemetry.getTracer(SoilStateTransformer.class.getName()); Flow<DurableQueue.Received, Span, NotUsed> transform = Flow.<DurableQueue.Received>create() .named("soilstate") .log("soilstate") .collectType(DurableQueue.Received.class) .map(received -> { ByteString data = received.data(); Span span = tracer .spanBuilder("soilstate-transformation") .addLink(tracer.getCurrentSpan().getContext()) .startSpan(); try (Scope ignored = tracer.withSpan(span)) { Propagation.context(received.headers(), Context.current()); } return new Tuple2<>(data, new Tuple2<>(received, span)); }) .via(Streams.dataUpDecoder(getSecret)) .map(e -> { int nwkAddr = e._1()._1(); ByteString payload = e._1()._3(); Tuple2<DurableQueue.Received, Span> carry = e._2(); return new Tuple2<>( new SoilStateReading( Instant.now(), nwkAddr, payload.toArray()), carry); }) .via(SoilStateReading.appender(getSecret)) .via(durableQueue.flow()) .collect(new JavaPartialFunction<DurableQueue.CommandReply<Tuple2<DurableQueue.Received, Span>>, Tuple2<DurableQueue.Committable, Span>>() { @Override public Tuple2<DurableQueue.Committable, Span> apply(DurableQueue.CommandReply<Tuple2<DurableQueue.Received, Span>> x, boolean isCheck) { if (x.event() instanceof DurableQueue.SendAck$) { Tuple2<DurableQueue.Received, Span> carry = x.carry().get(); return new Tuple2<>(carry._1(), carry._2()); } else throw noMatch(); } }) .via(durableQueue.commit(UuidOps.v5(SoilStateTransformer.class))) .wireTap(Span::end); return Source.fromFuture(durableQueue.offset(DATA_UP_MAC_PAYLOAD_TOPIC, UuidOps.v5(SoilStateTransformer.class))) .flatMapConcat(offset -> durableQueue.source( DATA_UP_MAC_PAYLOAD_TOPIC, offset, false )) .via(transform.asScala()); } }
Scala
/**
  * Run the transformation process to convert from LoRaWAN packets
  * to their soilstate domain object expressed in json and then
  * re-published. The packets received on this topic have already
  * been verified by an NS and so do not require MIC or counter
  * verification. Any MacPayload data that is not ConfirmedDataUp
  * or UnconfirmedDataUp can also be safely ignored as it should
  * not be received here.
  */
object SoilStateTransformer {

  /**
    * The durable queue topic where transformations are published to
    */
  val SoilStateDataUpMacPayloadTopic: DurableQueue.Topic =
    "soilstate-data-up-mac-payload"

  /**
    * Provides a source to perform the transformation.
    */
  def source(durableQueue: DurableQueue, getSecret: Principal.GetSecret)(implicit
      mat: Materializer
  ): Source[Span, NotUsed] = {
    val tracer = OpenTelemetry.getTracer(SoilStateTransformer.getClass.getName)

    val transform = Flow[DurableQueue.Received]
      .named("soilstate")
      .log("soilstate", identity)
      .map { received =>
        val span =
          tracer
            .spanBuilder("soilstate-transformer")
            .addLink(tracer.getCurrentSpan.getContext)
            .startSpan()
        val _ = Using.resource(tracer.withSpan(span)) { _ =>
          val _ = Propagation.context(received.headers, Context.current())
        }

        (received.data, received -> span)
      }
      .via(LoRaStreams.dataUpDecoder(getSecret))
      .map {
        case ((nwkAddr, _, payload), (received, span)) =>
          (SoilStateReading(Instant.now(), nwkAddr, payload.toArray), (received, span))
      }
      .via(SoilStateReading.appender(getSecret))
      .via(durableQueue.flow)
      .collect { case DurableQueue.CommandReply(DurableQueue.SendAck, Some(carry)) => carry }
      .via(durableQueue.commit(UuidOps.v5(SoilStateTransformer.getClass)))
      .wireTap(_.`end`())

    Source
      .future(
        durableQueue
          .offset(SoilStateDataUpMacPayloadTopic, UuidOps.v5(SoilStateTransformer.getClass))
      )
      .flatMapConcat(offset =>
        durableQueue.source(
          SoilStateDataUpMacPayloadTopic,
          offset,
          finite = false
        )
      )
      .via(transform)
  }
}

To break it down:

  1. We declare the topic that we tail when consuming observations. The queue of the topic is appended to by the LoRaWAN Network Server.

    Java
    /**
     * The durable queue topic where LoRaWAN packets are consumed from
     */
    static final String DATA_UP_MAC_PAYLOAD_TOPIC = "soilstate-data-up-mac-payload";
    Scala
    /**
      * The durable queue topic where transformations are published to
      */
    val SoilStateDataUpMacPayloadTopic: DurableQueue.Topic =
      "soilstate-data-up-mac-payload"
  2. We declare a function that will return an Akka Stream’s Source - ignore the type of source that it returns as it is unimportant. This Source will be “materialized” by the caller - materialization is the act of running a stream.

    Java

    /** * Provides a source to perform the transformation. */ public static Source<Span, NotUsed> source(DurableQueue durableQueue, Function<String, CompletionStage<Either<Principal.FailureResponse, Principal.SecretRetrieved>>> getSecret) {
    Scala
    /**
      * Provides a source to perform the transformation.
      */
    def source(durableQueue: DurableQueue, getSecret: Principal.GetSecret)(implicit
        mat: Materializer
    ): Source[Span, NotUsed] = {
  3. Durable queues have an offset to start tailing from. The offset is typically used to expresss resumption from where the last successful tailing run from. We do this by using a resumableSource. This way, if the service is restarted, it will not start from the beginning of the durable queue, but from the last place that it successfully processed.

    Java
    return Source.fromFuture(durableQueue.offset(DATA_UP_MAC_PAYLOAD_TOPIC, UuidOps.v5(SoilStateTransformer.class)))
            .flatMapConcat(offset -> durableQueue.source(
                    DATA_UP_MAC_PAYLOAD_TOPIC,
                    offset,
                    false
            ))
            .via(transform.asScala());
    Scala
    Source
      .future(
        durableQueue
          .offset(SoilStateDataUpMacPayloadTopic, UuidOps.v5(SoilStateTransformer.getClass))
      )
      .flatMapConcat(offset =>
        durableQueue.source(
          SoilStateDataUpMacPayloadTopic,
          offset,
          finite = false
        )
      )
      .via(transform)
    Note

    Given a restart, it is entirely possible for a transformer to publish an observation that it published before. Streambed takes the view that preventing duplication is almost impossible, particularly for distributed systems. For example, imagine that there are multiple xDP based routers in a network and two of them receive the same LoRaWAN packet, which is entirely possible and indeed common. Suppose also that those two routers do not communicate with each other given the lack of network connectivity. There is nothing that can be done to prevent any duplication in a distributed system - you can only minimize it. So, rather than try and avoid duplication, Streambed pushes the problem to the point of rendering the information where it is easy to deal with e.g. a user interface.

  4. A Flow is passed into the source which extracts the LoRaWAN payload bytes (data) and any header information added by the LoRaWAN Network Server.

    Java
    Tracer tracer = OpenTelemetry.getTracer(SoilStateTransformer.class.getName());
    
    Flow<DurableQueue.Received, Span, NotUsed> transform = Flow.<DurableQueue.Received>create()
            .named("soilstate")
            .log("soilstate")
            .collectType(DurableQueue.Received.class)
    Scala
    val transform = Flow[DurableQueue.Received]
      .named("soilstate")
      .log("soilstate", identity)

    Headers may include OpenTracing span information. Spans permit an event to be traced across a distributed set of services and can be very useful in determining the health of a system, particularly when bad things happen. The Streambed LoRaWAN Network Server will initiate a span upon a LoRaWAN event occurring and transformers can carry that data through to other components, even on different networks. The Headers.spanContext invocation is a streambed utility that conveniently forms a span out of header information.

  5. Optional step: Given this tracing information, we then call a function that instruments our flow by calling open tracing.

    Java
    .map(received -> {
        ByteString data = received.data();
    
        Span span = tracer
                .spanBuilder("soilstate-transformation")
                .addLink(tracer.getCurrentSpan().getContext())
                .startSpan();
    
        try (Scope ignored = tracer.withSpan(span)) {
            Propagation.context(received.headers(), Context.current());
        }
    
        return new Tuple2<>(data, new Tuple2<>(received, span));
    })
    Scala
    .map { received =>
      val span =
        tracer
          .spanBuilder("soilstate-transformer")
          .addLink(tracer.getCurrentSpan.getContext)
          .startSpan()
      val _ = Using.resource(tracer.withSpan(span)) { _ =>
        val _ = Propagation.context(received.headers, Context.current())
      }
    
      (received.data, received -> span)
    }
  6. The call to LoRaStreams.dataUpDecoder decrypts and decodes a LoRaWAN packet:

    Java
    .via(Streams.dataUpDecoder(getSecret))
    Scala
    .via(LoRaStreams.dataUpDecoder(getSecret))

    Given a sequence of bytes, the above will decode them as either a LoRaWAN ConfirmedDataUp or UnconfirmedDataUp message and return the device’s nwkAddr along with a decrypted payload.

  7. This next line transforms a sequence of bytes into the domain object that we declared earlier:

    Java
    .map(e -> {
        int nwkAddr = e._1()._1();
        ByteString payload = e._1()._3();
        Tuple2<DurableQueue.Received, Span> carry = e._2();
        return new Tuple2<>(
                new SoilStateReading(
                        Instant.now(),
                        nwkAddr,
                        payload.toArray()), carry);
    })
    Scala
    .map {
      case ((nwkAddr, _, payload), (received, span)) =>
        (SoilStateReading(Instant.now(), nwkAddr, payload.toArray), (received, span))
    }
    Note

    LoRaWAN devices generally don’t convey time given clock-skew. So we note the time received within the transformer as it is an application consideration i.e. we may not actually care about the time an observation was received (although we do generally).

  8. Encode as JSON, encrypt and form a request to append

    Java
    .via(SoilStateReading.appender(getSecret))
    Scala
    .via(SoilStateReading.appender(getSecret))
    Note

    JSON isn’t a strict requirement but is used to promote interoperability, particularly given that it is quite common for data to be consumed outside of Streambed.

  9. Append to the durable queue:

    Java
    .via(durableQueue.flow())
    Scala
    .via(durableQueue.flow)
  10. This stage will store the offset of the LoRaWAN payload that we originally received so that we can avoid transforming the same data multiple times.

    Java
    .collect(new JavaPartialFunction<DurableQueue.CommandReply<Tuple2<DurableQueue.Received, Span>>, Tuple2<DurableQueue.Committable, Span>>() {
        @Override
        public Tuple2<DurableQueue.Committable, Span> apply(DurableQueue.CommandReply<Tuple2<DurableQueue.Received, Span>> x, boolean isCheck) {
            if (x.event() instanceof DurableQueue.SendAck$) {
                Tuple2<DurableQueue.Received, Span> carry = x.carry().get();
    
                return new Tuple2<>(carry._1(), carry._2());
            } else
                throw noMatch();
        }
    })
    .via(durableQueue.commit(UuidOps.v5(SoilStateTransformer.class)))
    Scala
    .collect { case DurableQueue.CommandReply(DurableQueue.SendAck, Some(carry)) => carry }
    .via(durableQueue.commit(UuidOps.v5(SoilStateTransformer.getClass)))
  11. Our final stage will call our instrumentation code again; this time signifying that the transformation has completed.

    Java
    .wireTap(Span::end);
    
    Scala
    .wireTap(_.`end`())

At this point, the source stages are complete and the current offset in the durable queue is persisted.

Transformers are a service in their own right, and can be housed within the same process as the backend web service, or in a process of its own. We have decided to run it within its own process. To break it down:

  1. All streambed applications will have an entry point (main method):

    Java

    /** * Bootstraps our application and handles signals */ public class SoilStateServerEntryPoints { private static ApplicationProcess applicationProcess = null; public static void main(String[] args) { applicationProcess = new ApplicationProcess(new SoilStateServer()); applicationProcess.main(args); } public static void trap(int signal) { if (applicationProcess != null) { applicationProcess.trap(signal); } } }
    Scala
    /**
      * Bootstraps our application and handles signals
      */
    object SoilstateServerEntryPoints {
      private lazy val applicationProcess = ApplicationProcess(SoilStateServer)
    
      def main(args: Array[String]): Unit =
        applicationProcess.main(args)
    
      def trap(signal: Int): Unit =
        applicationProcess.trap(signal)
    }

    A static application process is created which sets up an application’s environment. The static main method is as per the regular JVM entrypoint. trap is provided when run within Streambed. Streambed will call trap to forward on any operating system signals to your process, for each ApplicationProcess provides a great deal of default behavior.

  2. This next line declares an instance of a streambed Application and mixes in traits (interfaces) of DurableQueueProvider and SecretStoreProvider.

    Java

    /** * This is our main entry point to the application being responsible for serving assets as well as providings the UI's * RESTful endpoints */ public class SoilStateServer extends Application implements DurableQueueProvider, RawStorageProvider, SecretStoreProvider { // Snapshots are saved after processing this many events. private static final long SAVE_EVERY = 1000; // Wire in DQ Server/Client as our durable queue @Override public Client acquireDurableQueue(ActorSystem system) { return DurableQueueProvider.super.acquireDurableQueue(system); } // Wire in FileSystemRawStorage as our raw storage @Override public FileSystemRawStorage acquireRawStorage(ActorSystem system) { return RawStorageProvider.super.acquireRawStorage(system); } // Wire in the IOx secret store as our secret store @Override public IOxSecretStore acquireSecretStore(ActorSystem system) { return SecretStoreProvider.super.acquireSecretStore(system); }
    Scala
    /**
      * The Soil moisture/temp application
      */
    object SoilStateServer
        extends Application
        with DurableQueueProvider
        with RawStorageProvider
        with SecretStoreProvider {

    How these traits are imported declares whether to use ChronicleQueue for the durable queue and IOx for secret storage. When targeting IOx, these are exactly what we require.

    Java
    import com.cisco.streambed.durablequeue.remote.Client;
    import com.cisco.streambed.durablequeue.remote.DurableQueueProvider;
    Scala
    import com.cisco.streambed.durablequeue.chroniclequeue.DurableQueueProvider
    import com.cisco.streambed.storage.fs.RawStorageProvider
    import com.cisco.streambed.identity.iox.SecretStoreProvider

    Other platforms would be backed by something else e.g. Kafka and Hashicorp Vault.

  3. The final step is to have the ApplicationProcess main method establish an Open Tracing tracer and associate it with the ability to have its telemetry inspected over a Unix Domain Socket.

    Java
    // Start the transformer's telemetry reporting up as we are running it within the same process
    // We have no metrics, hence the Source.empty for the Unix Domain Socket reporter.
    
    Pair<StreamMetricExporter, StreamSpanExporter> exporters = TelemetryConfig.create(system);
    
    TelemetryReporter
            .report(exporters.first().source().asJava(), exporters.second().source().asJava(), system)
            .thenApply(serverBindings -> {
                serverBindings.forEach(b -> log.info("Telemetry listening on {}", b));
                return serverBindings;
            });
    
    // Start the transformer itself
    RestartSource.withBackoff(minBackoff, maxBackoff, backoffRandomFactor, () ->
            SoilStateTransformer
                    .source(context.durableQueue(), Principal.toJava(context.principal().getSecret()))
    ).runWith(Sink.ignore(), system);
    
    Scala

    val (metricExporter, spanExporter) = TelemetryConfig() val _ = TelemetryReporter .report(metricExporter.source, spanExporter.source) .foreach { bs => bs.foreach { b => system.log.info("Telemetry listening on {}", b) } }(ExecutionContext.parasitic) val _ = RestartSource .withBackoff(minBackoff, maxBackoff, backoffRandomFactor) { () => SoilStateTransformer.source(durableQueue, principal.getSecret) } .runWith(Sink.ignore)

As a final step, the transformer is hosted by a RestartSource so that any abnormal failures will cause it to be restarted.

The transformer is now ready to be run. Startup the sandbox:

sandbox | docker-compose -p xdp-sandbox -f - up

Start the transformer using your favorite IDE or development tools. It will connect to the sandbox based services.

Note

When starting the transformer, or any Streambed based service alongside the sandbox, you can use Jaeger tracing to trace activity instead of polluting your beautiful code with logging statements. To do this simple enable Jaeger tracing by declaring the property, -Djaeger.enabled=true, for your application. When you started the sandbox, it also started the Jaeger tracing UI and reported the address of where it resides. We will refer to Jaeger again later.

Back now at the command line, we will send an event to the transformer, as if it came from the LoRaWAN Network Server. We will also observe the event being emitted from the transformer. To do this we will use Streambed’s MQTT gateway service. Please type in the following:

Note

Remember, when prompted for a username and password, use the ones that the sandbox informs you of.

Note

The MQTT configuration is only for testing so that we can verify data flowing through. The configuration supports your development workflow.

lora type add \
  soilstate-data-up-mac-payload \
  "Soil Moisture/Temp"
streambed secret add secrets.soilstate.key 2B7E151628AED2A6ABF7158809CF4F3C
lora end-device add \
  soilstate-data-up-mac-payload \
  v1 \
  abp \
  --dev-eui 0000000045645655 \
  --dev-addr 45645655 \
  --app-s-key 2B7E151628AED2A6ABF7158809CF4F3C \
  --nwk-s-key 2B7E151628AED2A6ABF7158809CF4F3C
streambed mqtt add down \
  soilstate-down \
  mqtt-soilstate-up \
  soilstate-data-up-mac-payload \
  --data-is-binary
streambed mqtt add up \
  soilstate-up \
  --secret-path=secrets.soilstate.key \
  soilstate-data-up-json \
  mqtt-soilstate-down

OK, that may seem daunting! Let’s break it down:

  1. We declare a new type of device using lora type add.
  2. We add a secret to be used for encryption having transformed from the soilstate-data-up-mac-payload topic.
  3. We add a sensor to the system, associating it with the soilstate-data-up-json topic. This will, in turn, populate the secret at secrets.lora.AppSKey.01645655 with 2B7E151628AED2A6ABF7158809CF4F3C.
  4. We then add a downstream route to the MQTT gateway so that what we publish to the mqtt-soilstate-up MQTT topic will be encrypted with the key at secrets.lora.AppSKey.01645655 and appended to the soilstate-data-up-mac-payload topic. This route represents our pretence to be a LoRaWAN Network Server, hence the --data-is-binary. This then permits us to convey Base64 encoded binary data to MQTT, which’ll end up as raw bytes when presented to our transformer - exactly as when received from a LoRaWAN Network Server.
  5. We finally add an upstream route to the MQTT gateway so that data appended to soilstate-data-up-json will be emitted on the MQTT topic, mqtt-soilstate-down.

Now that we have our transformer running, and Streambed is provisioned with the information it needs, we are in a position to observe sensor data flowing from a “pretend” LoRaWAN Network Server, through the transformer, and then emitted into another MQTT topic.

First, you’ll need some MQTT command line utilities, namely mosquitto_sub and mosquitto_pub which will be used to subscribe to MQTT topics and publish to them respectively. If you don’t already have them then they can be found at the Eclipse Mosquitto website.

From one terminal window, subscribe to observing all topics of Mosquitto:

mosquitto_sub -v -h localhost -p 1883 -t '#'

From another terminal window, we can now publish some data as if we are the LoRaWAN Network Server:

mosquitto_pub -t mqtt-soilstate-up -m '{"key":23352917,"data":"gFVWZEUAAgAPXkErnCVxx9fZN6IMGBfBKYKrHQ7T4w=="}'

The key will be interpretted by the MQTT gateway as the sensor’s nwkAddr field (in hex, 0x1645655) and the data is the Base64 encoding of a LoRaWAN packet as received by a LoRaWAN packet forwarder.

When published to Mosquitto, you should then see output as follows:

mqtt-soilstate-up {"key":23352917,"data":"gFVWZEUAAgAPXkErnCVxx9fZN6IMGBfBKYKrHQ7T4w=="}
mqtt-soilstate-down {"key":23352917,"data":{"time":"2018-08-10T02:41:09.111Z","nwkAddr":23352917,"temperature":24.9,"moisturePercentage":20.2}}

This shows us the message that we published the mqtt-soilstate-up Mosquitto topic. It also shows us what was published to the mqtt-soilstate-down, which is the output from the transformer. Here’s a flow of what just happened:

mqtt-soilstate-up --> soilstate-data-up-mac-payload -+
                                                     |
                                                     v
                                                 transform
                                                     |
                                                     |
    mqtt-soilstate-down <-- soilstate-data-up-json <-+

… where soilstate-data-up-mac-payload and soilstate-data-up-json are our durable queue topics.

Note

If you enabled Jaeger tracing with the transformer as described in an earlier note, you can also use its UI to report on the transformation including how long it took.

Congratulations! You’ve now tested your transformer without leaving the comfort of your regular development environment!

Serving the user interface

Your application is now able to transform LoRaWAN packets into a domain object. The next step is to be able to serve any web assets that your application has to run within a web browser, and to provide the following APIs:

  • authentication - permits the user to login
  • end device events - sensor meta data
  • observations - sensor readings

Let’s first see how HTTP is served:

Java
// In order to access all directives we need an instance where the routes are define.
Routes routes = new Routes();

final Flow<HttpRequest, HttpResponse, NotUsed> routeFlow = routes.createMainRoute(context).flow(system);

HttpServerConfig
        .bindRoute(routeFlow, context)
        .thenApply(serverBindings -> {
            serverBindings.forEach(b -> log.info("Server listening on {}", b));

            return serverBindings;
        })
        .exceptionally(failure -> {
            log.error(failure, "Bind failed, exiting");
            System.exit(1);
            return null;
        });

Akka HTTP provides an embedded, Reactive-Streams-based, fully asynchronous HTTP/1.1 server implemented on top of Akka Streams (also used in the transformer). The code above binds the server to a network interface and configures all HTTP traffic to go through a “route” declaration. This route declaration declares how requests are handled, and is shown in its entirety below:

Java
// Describe the HTTP routes
private static class Routes extends AllDirectives {
    private Route createMainRoute(ApplicationContext context) {

        ActorSystem system = context.system();
        DurableQueue durableQueue = context.durableQueue();
        Storage storage = context.storage();
        UserIdentityService identityService = UserIdentityService.apply(context);

        return concat(
                path(PathMatchers.segment("api").slash("login"), () ->
                        post(() ->
                                entity(Jackson.unmarshaller(Credentials.class), credentials -> {
                                    CompletionStage<String> result = FutureConverters
                                            .toJava(identityService.authenticate(credentials.getUsername(), credentials.getPassword()))
                                            .thenApply(r -> OptionConverters.toJava(r).orElseThrow(RuntimeException::new));
                                    return onComplete(result, maybeResult ->
                                            maybeResult
                                                    .map(this::complete)
                                                    .recover(new PFBuilder<Throwable, Route>()
                                                            .matchAny(ex -> complete(StatusCodes.UNAUTHORIZED, "Bad credentials"))
                                                            .build())
                                                    .get());
                                }))),

                pathPrefix("api", () ->
                        authenticateOAuth2Async("secured api", identityService::verifier, principal -> concat(
                                path("end-devices", () -> concat(

                                        get(() ->
                                                completeOK(
                                                        EndDeviceService.events(
                                                                durableQueue,
                                                                storage,
                                                                Principal.toJava(principal.getSecret()),
                                                                eventsProcessed -> false,
                                                                system)
                                                                .map(t -> ServerSentEventMarshaller.toServerSentEvent(t._1(), t._2()))
                                                                .keepAlive(Duration.of(10, ChronoUnit.SECONDS), ServerSentEvent::heartbeat),
                                                        EventStreamMarshalling.toEventStream())))),
                                path("soilstate", () -> concat(

                                        get(() ->
                                                completeOK(
                                                        SoilstateService.events(
                                                                durableQueue,
                                                                storage,
                                                                Principal.toJava(principal.getSecret()),
                                                                eventsProcessed -> false,
                                                                system)
                                                                .map(t -> ServerSentEventMarshaller.toServerSentEvent(t._1(), t._2()))
                                                                .keepAlive(Duration.of(10, ChronoUnit.SECONDS), ServerSentEvent::heartbeat),
                                                        EventStreamMarshalling.toEventStream()))))))),

                // Explicitly list out all SPA routes so that refreshing works as expected

                pathEndOrSingleSlash(() -> getFromResource("dist/index.html")),

                pathPrefix("end-devices", () -> getFromResource("dist/index.html")),

                getFromResourceDirectory("dist")
        ).seal();
    }
}

Web assets

Breaking this down, the last section declares how web assets (your mobile application) will be served:

Java
// Explicitly list out all SPA routes so that refreshing works as expected

pathEndOrSingleSlash(() -> getFromResource("dist/index.html")),

pathPrefix("end-devices", () -> getFromResource("dist/index.html")),

getFromResourceDirectory("dist")

The first 2 lines declare how certain paths should be handled with respect to serving up index.html. Your application may have an effect on these lines, and you typically have the first line that causes a single slash to serve up the main page. The second line is specific to our Angular application example where end-devices is a path that should always serve up the index page.

The last line will always be present and translates a path into a file at the dist folder. Our convention is that your frontend’s web assets will always be found at the dist folder on the class path. The build file of the project will copy files front frontend/dist into dist on the class path. Therefore, your workflow for building a release must cause frontend/dist to be populated with the web assets that this backend will serve up.

Authentication

Whenever a request to the /api path is made, so long as it is not /api/login, then an OAuth 2.0 token is expected given that the paths are “protected” i.e. your user must be logged in.

Here is the authentication route declaration:

Java
path(PathMatchers.segment("api").slash("login"), () ->
        post(() ->
                entity(Jackson.unmarshaller(Credentials.class), credentials -> {
                    CompletionStage<String> result = FutureConverters
                            .toJava(identityService.authenticate(credentials.getUsername(), credentials.getPassword()))
                            .thenApply(r -> OptionConverters.toJava(r).orElseThrow(RuntimeException::new));
                    return onComplete(result, maybeResult ->
                            maybeResult
                                    .map(this::complete)
                                    .recover(new PFBuilder<Throwable, Route>()
                                            .matchAny(ex -> complete(StatusCodes.UNAUTHORIZED, "Bad credentials"))
                                            .build())
                                    .get());
                }))),

These lines will accept an HTTP POST request that supplies a “credentials” JSON object containing a user name and password:

Java

/** * Required for authentication */ public class Credentials { private final String username; private final String password; @JsonCreator public Credentials( @JsonProperty(value = "username", required = true) String username, @JsonProperty(value = "password", required = true) String password) { this.username = username; this.password = password; } public String getUsername() { return username; } public String getPassword() { return password; } }

The “unmarshaller” will automatically convert the credentials from their JSON form into the Credentials object and then pass them onto the Streambed “identity” service for authentication. A successful authentication will cause an OAuth token to be replied. Any frontend can then supply this token with other requests to /api.

Note

Tokens “expire” and when they do, subsequent requests to /api will reply that a new token is required i.e. the user is required to login again.

Note

All traffic is passed over TLS and so passing credentials is done so in an encrypted manner.

End device events

End device events describe metadata in relation to a sensor. The aim is to return these events to the frontend for those devices that the user is authorized to observe. The route declaration will cause an HTTP GET request to be made on end device events, which will then present a continuous stream of replies, known as Server Sent Events. The following code shows how the end device events route is declared:

Java
path("end-devices", () -> concat(

        get(() ->
                completeOK(
                        EndDeviceService.events(
                                durableQueue,
                                storage,
                                Principal.toJava(principal.getSecret()),
                                eventsProcessed -> false,
                                system)
                                .map(t -> ServerSentEventMarshaller.toServerSentEvent(t._1(), t._2()))
                                .keepAlive(Duration.of(10, ChronoUnit.SECONDS), ServerSentEvent::heartbeat),
                        EventStreamMarshalling.toEventStream())))),

The GET request will cause EndDeviceService.events to be called which, in turn, will return a stream of end device events. The events will then be converted into Server Sent Events and replied to the frontend. We also use a keepAlive so that clients are able to reliably detect whether a connection remains open.

The EndDeviceService is as follows (please read the comment):

Java

/** * Manages services in relation to end devices. */ class EndDeviceService { /** * Snapshots for this service are stored in storage with the following id */ private static final UUID latestPositionsId = UuidOps.v5(EndDeviceService.class, "latestPositions"); /** * Subscribe to the end device events queue returning a source of events and their offsets in the queue. * <p> * To optimize state rebuild time for the UI, we pick up from the latest snapshot, and defer the * decision of whether to save snapshots to a provided function. * <p> * After emitting the latest position for all sensors given the latest snapshot, we then continue to tail * the queue and directly emit events. */ static Source<Tuple2<EndDeviceEvents.Event, Long>, NotUsed> events(DurableQueue durableQueue, Storage storage, Function<String, CompletionStage<Either<Principal.FailureResponse, Principal.SecretRetrieved>>> getSecret, Function<Long, Boolean> shouldSave, ActorSystem system) { ExecutionContext ec = system.dispatcher(); return Source .fromSourceCompletionStage( FutureConverters .toJava(storage.load(LatestDevicePositions.getStateCodec(), latestPositionsId, ec)) .thenApply(maybeSnapshot -> Source.single( maybeSnapshot.isDefined() ? maybeSnapshot.get() : new LatestDevicePositions() )) ) .flatMapConcat(initialState -> Source .from(initialState.getPositions().values()) .map(reading -> new Tuple2<EndDeviceEvents.Event, Long>(reading, initialState.getOffset().orElse(0L))) .concat( durableQueue.source(EndDeviceEvents.EventTopic(), OptionConverters$.MODULE$.toScala(initialState.getOffset())).asJava() .dropWhile(r -> initialState.getOffset().isPresent() && initialState.getOffset().getAsLong() == r.offset()) .via(EndDeviceEvents.tailer(getSecret)) .scan(new Tuple3<>(initialState, Optional.<Tuple2<EndDeviceEvents.Event, Long>>empty(), 0L), (state, eventWithOffset) -> { final LatestDevicePositions latestPositions = state._1(); long eventsProcessed = state._3(); latestPositions.update(eventWithOffset); eventsProcessed++; if (shouldSave.apply(eventsProcessed)) { storage.save( LatestDevicePositions.getStateCodec(), latestPositionsId, latestPositions, ec ); } return new Tuple3<>(latestPositions, Optional.of(eventWithOffset), eventsProcessed); }) .map(Tuple3::_2) .filter(Optional::isPresent) .map(Optional::get) ) ) .mapMaterializedValue(v -> NotUsed.getInstance()); } }

Akka streams is used here to ensure that only what the frontend requires is what the frontend receives. This helps manage the amount of data transferred over the network by keeping it minimal. In the case of our frontend application, we are only ever interested in position updates (the sensor has been moved) and whether it has been removed from the system.

Observations

Sensor observations are served in a very similar way to the meta data of end device events. In our application’s case, we are only ever interested in rendering the latest observations. We therefore reduce the observations that we have down to the latest ones. Your application may have different requirements e.g. it may want the last two weeks of observations so that it can render a graph over time.

Packaging and deploying

Let’s now see how we can package our application, ready for deployment. We package as a Docker image that tends to have an assembly of all of our application’s dependencies in one jar file.

To package:

Maven
mvn package docker:build
sbt
sbt fdpSoilStateTransformer/docker:publishLocal

Let’s suppose the image we’ve built is streambed/fdpsoilstateui:0.5.1-SNAPSHOT. Our goal is now to run the image within the context of the sandbox that we already have running:

docker run \
  --rm \
  --network xdp-sandbox_default \
  streambed/fdpsoilstateui:0.5.1-SNAPSHOT

The xdp-sandbox_default is the network that the sandbox uses.

That’s it! The application should behave as it did before i.e. using mosquitto_pub and friends.