Event Processing with Riemann
Riemann is a stream processing system that excels at collecting and processing events and logs from servers and systems. We successfully use Riemann in production as a core component of very large IT systems for log and metric processing and for monitoring.
We use Riemann to process events and forward them to long-term storage systems like Elasticsearch, where these events can be conveniently searched through user interfaces like Kibana. Additionally, we use Riemann to write metrics to time-series databases like InfluxDB; these time series can then be visualized through user interfaces like Grafana or can also be used to alert on errors and problems.
Today we'll show, using an application example, how to extract metrics from logs with Riemann.
Riemann
Riemann is written in Clojure and can also be configured and programmed in Clojure. In fact, the configuration of a Riemann instance is a Clojure program that controls event processing using a powerful and efficient stream processing language.
Riemann enables sending events through streams that can filter, modify, combine, aggregate, and project these events. There are dozens of built-in streams, and it's very easy to write custom streams, since a stream is simply a function that accepts an event.
Events
Events are represented in Riemann as maps, i.e., as collections of key-value pairs. Here, for example, is an event that represents a log line for a request to a web server:
(def request-event
  {:timestamp 1663577804126
   :method "GET"
   :request "/index.html"
   :requestor "192.168.1.23"
   :transaction "uid-82a9dda829"
   :service "webserver-request"
   :host "192.168.1.1"
   :time 1})
We bind the event to the name request-event so we can use it
for tests. Riemann defines some special fields like
:host and :service that appear in all events; otherwise there are
no restrictions on which fields and values an event can contain.
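Since events are ordinary Clojure maps, all the usual map operations apply to them, for example:
(:service request-event)
;; => "webserver-request"

(assoc request-event :ttl 60)
;; => the same event with an additional :ttl field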
Streams
As already mentioned, a stream in Riemann is a function that accepts an event. In Riemann, "stream" doesn't mean a sequence of data elements, which is what the term usually means in most contexts. Perhaps "event processor" would be a better name for what a stream actually is in Riemann: a callback that accepts events and in turn passes them on to other event-processor callbacks (and has no return value). However, we'll continue to use the term stream, as this is the common designation in Riemann.
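To make this concrete, here is a minimal sketch of a custom stream (the name log-and-forward is ours); it prints each event and forwards it to its child streams via riemann.streams/call-rescue, the helper Riemann's built-in streams use for forwarding:
(defn log-and-forward
  "A custom stream: a function of one event. Prints the event,
  then passes it on to all child streams."
  [& children]
  (fn [event]
    (prn event)
    (riemann.streams/call-rescue event children)))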
A simple stream is, for example, a where stream, which is built into Riemann:
(where <condition> <stream>)
A where stream is itself an event processor that accepts an event,
checks if the specified condition is true, and then calls the event processor
<stream> with the event.
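For example, a where stream that only lets events from a particular host through to prn (which is itself a valid stream, since it accepts a single event) could look like this:
(where (host "192.168.1.1")
  prn)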
With this we build a stream that separates log events from metrics: log events should be forwarded to an Elasticsearch server for long-term storage and metrics to an InfluxDB time-series database.
(streams
  (where
    (metric nil)
    elasticsearch-stream
    (else
      influxdb-stream)))
The where stream checks whether events flowing through match the predicate (metric nil). The predicate (metric nil) means that an event doesn't contain a :metric field, which in our example means that the event is not a metric. (Riemann's event taxonomy assumes that metrics are distinguished from other event types by the presence of the :metric field.) In this case, the event flows on to elasticsearch-stream, which finally sends it to the Elasticsearch server. The else branch is left with only metrics, which Riemann then forwards to InfluxDB via influxdb-stream.
The definitions of elasticsearch-stream and influxdb-stream look like this:
(def elasticsearch-stream
  (tap :elasticsearch-stream elasticsearch))

(def influxdb-stream
  (tap :influxdb-stream influxdb))
The tap in each of these definitions is a construct for intercepting events in test cases. We'll make use of this shortly.
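Conceptually, in test mode a tap records every event that reaches it under the given key and passes the event through unchanged; outside of test mode it simply behaves like the wrapped stream. A rough, simplified sketch of the test-mode behavior, with a hypothetical tap-results atom standing in for Riemann's internal bookkeeping:
(def tap-results (atom {}))  ; hypothetical stand-in for Riemann's internals

(defn tap-sketch
  "Records each event under the key k, then forwards it unchanged."
  [k child]
  (fn [event]
    (swap! tap-results update k (fnil conj []) event)
    (child event)))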
The actual definitions of elasticsearch and influxdb would exceed the scope of this blog article, so we'll simply rely on wishful thinking and assume that they do what they're supposed to: forward incoming events to Elasticsearch and InfluxDB, respectively. For those particularly interested: for these streams we use the implementation from our Riemann library active-riemann.
Tests
Unit tests are indispensable, so we also need to test our stream definition. Riemann ships with support for unit tests; we can run them from the command line with riemann test. First, though, we need to write the tests. The first test ensures that events that are not metrics end up only in Elasticsearch:
(deftest event-test
  (let [actual (inject! [request-event])]
    (is (= [request-event] (:elasticsearch-stream actual)))
    (is (empty? (:influxdb-stream actual)))))
The inject! function exists only to feed the streams in the context of unit tests. It sends a list of events into the defined streams. Here there's only one event in the list, the request-event defined above. The return value of inject! contains all events that passed the program points we marked with tap. Since we're listening with :elasticsearch-stream to our stream to Elasticsearch, we expect to find exactly this one event there, because our stream definition routed it there. And since we didn't inject a metric, we expect that no events passed :influxdb-stream, so this list must be empty, i.e., match the empty? predicate.
For the next test we want to inject a metric, so we create an example metric. We first build a helper function that abstracts over the shape of our metric. The metric has a :label field for the label and a :metric field for the metric value; the value is what we pass to the helper function:
(defn make-webserver-transaction-metric
  [duration]
  {:label "webserver-transaction-duration-milliseconds"
   :metric duration})
Then we generate our example metric with the value 116:
(def example-metric (make-webserver-transaction-metric 116))
For the next test case we inject this example metric:
(deftest metric-test
  (let [actual (inject! [example-metric])]
    (is (empty? (:elasticsearch-stream actual)))
    (is (= [example-metric] (:influxdb-stream actual)))))
Here we expect that no event passed :elasticsearch-stream, but that the example metric shows up at :influxdb-stream. All tests pass.
Web Server Logs
For our application example we assume that logs are generated on an imaginary web server. The requests make it into our Riemann system in the format described above. In addition to the requests, the web server also logs the delivery of the response. An event for this looks like this:
(def reply-event
  {:timestamp 1663577804242
   :transaction "uid-82a9dda829"
   :service "webserver-reply"
   :host "192.168.1.1"})
The web server links request and response with a unique
transaction ID in the :transaction field.
Determining Transaction Time
For our use case we're interested in how long the web server takes to answer a request. We want to measure and record this metric in order to monitor the web server's performance. This means we need to determine the elapsed time between the request and response events of a transaction and write it to our InfluxDB for storage, later visualization, or possibly also for alerting. If we subtract the two :timestamp values from each other, we get the transaction time in milliseconds. For our example this yields a transaction time of 116 milliseconds.
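A quick check with our two example events confirms this:
(- (:timestamp reply-event) (:timestamp request-event))
;; => (- 1663577804242 1663577804126)
;; => 116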
The crucial point is that we only compute the time difference between associated log entries, so we need to find the log entries that share the same transaction ID. Concretely, this means we need to remember each request until we see the matching response, i.e., the response with the same transaction ID as the request. Remembering something means state. Riemann brings support for storing state in the form of an index.
Index
The index is Riemann's built-in store for events. For each pair of :host and :service, Riemann stores the most recently seen event. We can query these events from the index, and events in the index also have an expiration date: the :ttl field sets the "time to live" of an event in the index, in seconds. We can also specify at what intervals expired events should be deleted. Riemann not only deletes these events but also sends notifications about them, and we can put this information to good use for our goal of determining the transaction time.
We initialize our index as follows:
(def default-ttl 60)
(def index
(do
(periodically-expire (/ default-ttl 10))
(index)))
(def index-stream
(sdo
(tap :index identity)
(default :ttl default-ttl index)))
First we define our default expiration time default-ttl as 60 seconds. Next we bind a new index to the name index: with periodically-expire we tell Riemann to check for expired events every six seconds, and the call to the Riemann function (index) creates the new index. We then wrap the index in a stream named index-stream. More precisely, our index-stream is an sdo composition of two streams: sdo is do for streams, so events flow into all streams in its body. Here an incoming event passes through the streams (tap :index identity) and (default :ttl default-ttl index):
- The stream (tap :index identity) is only there so we can listen in on our index stream in our unit tests.
- The stream (default :ttl default-ttl index) provides each event that doesn't yet have a :ttl field with a default value for that field, namely our defined default lifetime; the events are then passed on to the index, which stores them (see the sketch below).
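For illustration, default restricted to a single field behaves roughly like the following sketch (default-ttl-sketch is our own name; Riemann's real default stream is more general):
(defn default-ttl-sketch
  "Sketch of (default :ttl ttl ...): adds a :ttl to events that
  lack one, then forwards the event to all child streams."
  [ttl & children]
  (fn [event]
    (let [event (if (:ttl event)
                  event
                  (assoc event :ttl ttl))]
      (riemann.streams/call-rescue event children))))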
Writing to the Index
We need a stream that writes a request event to the index. We
call the stream store-requests-stream:
(def store-requests-stream
  (smap (fn [request]
          (clojure.set/rename-keys request {:transaction :service}))
        index-stream))
The stream uses an smap stream to apply the given function to each event, i.e., to each request, and then forwards the result into the index-stream. The function that smap applies transforms a request so that the index can store it correctly: as mentioned above, the index stores one event per pair of :host and :service. In order to really store all requests, we simply rename the :transaction field to :service, so the unique transaction ID becomes part of the key in the index. We no longer need the original value of :service. An indexed event looks like this, for example:
(def indexed-event
  {:timestamp 1663577804126
   :method "GET"
   :request "/index.html"
   :requestor "192.168.1.23"
   :service "uid-82a9dda829"
   :host "192.168.1.1"
   :time 1})
Reading from the Index
When response events arrive, we need to read associated requests from the index.
We call this stream lookup-requests-and-calculate-metric-stream; the
stream expects response events:
(def lookup-requests-and-calculate-metric-stream
  (smap (fn [reply]
          (when-let [request (riemann.index/lookup index
                                                   (:host reply)
                                                   (:transaction reply))]
            (riemann.index/delete index request)
            (make-webserver-transaction-metric (- (:timestamp reply)
                                                  (:timestamp request)))))
        reinject))
This stream also uses an smap stream to call a function on each response event and to reinject the result back into our streams via the reinject stream, i.e., push it back in at the front of our streams. Several things happen in the body of this function: riemann.index/lookup tries to find a possibly stored request based on the :host and :transaction fields of the response. If this succeeds, the function removes the found event from the index so that it can't expire later; after all, it has already been processed. The result of the function is a metric, created with make-webserver-transaction-metric, whose value is the difference of the timestamps of request and response. If no matching request is in the index, for example because our program started only after the request arrived, the function returns nil. This has the intended effect that nothing is reinjected, since smap doesn't pass nil on to its child streams.
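The nil-pruning behavior of smap is easy to observe in isolation; in this toy stream, only events carrying a :metric field ever reach prn:
(smap (fn [event]
        (when (:metric event)   ; nil for non-metrics, so smap drops them
          event))
      prn)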
Keeping the Index Clean
So that requests for which the web server, for whatever reason, never sent out a response don't accumulate, we've given each request in the index a maximum lifetime. When Riemann removes such expired requests from the index, it reinjects information about this into the streams, in the form of the request, additionally marked with a :state field with the value "expired". With the predicate (state "expired") we can filter these events and put this information to use. In case of such a timeout we want to emit a metric with a very large value, namely Double/MAX_VALUE in our number space. We call the metric timeout-metric:
(def timeout-metric
  (make-webserver-transaction-metric Double/MAX_VALUE))
We call the stream for this emit-timeout-metric-stream:
(def emit-timeout-metric-stream
  (smap (constantly timeout-metric) reinject))
For each incoming event, regardless of its contents, this smap stream reinjects the metric with Double/MAX_VALUE into the streams.
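This works because Clojure's constantly returns a function that ignores its arguments and always yields the same value:
((constantly timeout-metric) {:state "expired"})
;; => timeout-metric, regardless of the input event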
Don't Cross the Streams
With this we have all the components together; now we can supplement our original stream with a few switches to achieve our desired functionality:
(streams
  (where
    (metric nil)
    (sdo
      (where (not (state "expired")) elasticsearch-stream)
      (split
        (service "webserver-request") store-requests-stream
        (service "webserver-reply") lookup-requests-and-calculate-metric-stream
        (state "expired") emit-timeout-metric-stream))
    (else
      influxdb-stream)))
The case for metrics remains unchanged. The case for events that are not metrics becomes more complicated: the sdo indicates that the events should be passed on to multiple streams. First comes the stream that forwards to Elasticsearch. However, we don't want to see the information about expired requests there, so we filter it out with (not (state "expired")). Everything else, all requests and all responses, should end up in Elasticsearch. The next stream implements the calculation of the transaction metrics. With a split stream we split the event stream into requests with the predicate (service "webserver-request"), responses with the predicate (service "webserver-reply"), and expired events with the predicate (state "expired"):
- The requests end up in the index via the store-requests-stream defined above.
- The responses lead to metrics via lookup-requests-and-calculate-metric-stream and the requests previously stored in the index.
- The expired requests lead to metrics that mark the timeout.
"Why not?" - "That would be bad!"
Two unit tests safeguard our implementation. The first tests a complete transaction consisting of request and response:
(deftest transaction-test
  (let [actual (inject! [request-event reply-event])]
    (is (= [request-event reply-event]
           (:elasticsearch-stream actual)))
    (is (= [indexed-event]
           (:index actual)))
    (is (= [example-metric]
           (:influxdb-stream actual)))))
The test injects our example events for the request, request-event, and the response, reply-event. Our expectations:
- Both events are sent to Elasticsearch.
- The request ends up in the index in modified form as the indexed event indexed-event.
- The calculated metric example-metric reaches InfluxDB.
The second test verifies the behavior in the timeout case:
(deftest transaction-expired-test
  (let [actual (do
                 (inject! [request-event])
                 (riemann.time.controlled/advance! (* 2 default-ttl))
                 (inject! [reply-event]))]
    (is (= [request-event reply-event]
           (:elasticsearch-stream actual)))
    (is (= [indexed-event]
           (:index actual)))
    (is (= [timeout-metric]
           (:influxdb-stream actual)))))
Here we first inject only the request request-event and then advance the clock for this test with riemann.time.controlled/advance! far enough that the request in the index has definitely expired. Only then do we send the response reply-event, which then finds no matching request. Consequently, we see both the request and the response event in Elasticsearch, along with the indexed request event that passed through the index. However, the only metric that made it to InfluxDB is the timeout metric timeout-metric. In addition to the timeout case, this unit test even covers the behavior when no request is present in the index for a response event.
With this we've tested all branches of our streams and achieved our goal: we've derived metrics from incoming events with Riemann.
Conclusion
Riemann is performant and efficient in execution: our production systems process several thousand events per second despite large and complicated stream logic.
And Riemann is elegant to program: with its extensive stream processing language, typical event-processing tasks like filtering, enriching, combining, aggregating, and projecting are easily accomplished. And for everything else, we can flexibly extend the stream processing language.