In our previous blog post, we introduced the concept of event-driven architecture (EDA) and provided a high-level reference architecture for an EDA built using the core technologies within the Streamlio platform.
Figure 1: Streamlio EDA Reference Architecture
In this blog post, we will take a deep dive into simple event processors and the capabilities provided by the Streamlio platform to support them.
We have already established that the defining feature of an event-driven architecture is the central importance of events. Within an EDA, event consumers react to the arrival of events following a programming style referred to as event-based programming (EBP). In contrast to batch-oriented or procedural programming, in EBP the software system executes processing in response to receiving one or more event notifications, and communicates with other software components entirely via events in an asynchronous manner.
While not all event-based applications are the same, they typically follow the structure shown in the figure below: event producers introduce events into an event processing framework in the middle, which is responsible for persisting the events and delivering them to the event consumers.
Figure 2: Event-Based Architecture
In addition to its event-routing responsibilities, the intermediate event processing framework also hosts the components that we will refer to as event processors. Event processors ingest events and can forward events or emit new ones, so they are in some sense both event consumers and producers. However, we won’t refer to these event processors as either event producers or event consumers as we wish to distinguish them from entities that live outside of the event processing framework.
Within an EDA, event processors generally fall into one of the following categories:
As one can imagine, an event-based application typically consists of a number of event processors arranged in a specific sequence or flow. We refer to this collection of event producers, event processors, and event consumers as an event processing network. These event processing networks are composed to solve one or more specific business problems.
As you can see below in Figure 3, the external event producers and consumers are at the edges, with several event processors in between. The lines show the flow of the events between the event processors. They are referred to as implicit channels and are used to push events directly from one event processor to another. When implemented with Apache Pulsar, these implicit channels are implemented as topics.
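To make the idea of processors connected by topics concrete, here is a minimal in-memory sketch. It is not the Pulsar API: the `EventNetwork` class, its method names, and the topic names used with it are all hypothetical, and delivery here is synchronous, whereas a real framework delivers events asynchronously and persists them.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Toy stand-in for a topic-based event processing network (hypothetical, not the Pulsar API). */
class EventNetwork {
    /** A processor together with the topic its results are pushed to. */
    private static final class Subscription {
        final Function<String, String> processor;
        final String outputTopic;

        Subscription(Function<String, String> processor, String outputTopic) {
            this.processor = processor;
            this.outputTopic = outputTopic;
        }
    }

    // Processors subscribed to each topic.
    private final Map<String, List<Subscription>> subscriptions = new HashMap<>();
    // Every event that reached a topic, so that consumers can read it back.
    private final Map<String, List<String>> delivered = new HashMap<>();

    /** Wires a processor between an input topic and an output topic. */
    void connect(String inputTopic, Function<String, String> processor, String outputTopic) {
        subscriptions.computeIfAbsent(inputTopic, t -> new ArrayList<>())
                .add(new Subscription(processor, outputTopic));
    }

    /** Records the event on the topic, then pushes it through every subscribed processor. */
    void publish(String topic, String event) {
        delivered.computeIfAbsent(topic, t -> new ArrayList<>()).add(event);
        List<Subscription> subscribers =
                subscriptions.getOrDefault(topic, Collections.<Subscription>emptyList());
        for (Subscription s : subscribers) {
            String result = s.processor.apply(event);
            if (result != null) { // a null result means "emit nothing"
                publish(s.outputTopic, result);
            }
        }
    }

    /** Returns the events seen on a topic, in arrival order. */
    List<String> eventsOn(String topic) {
        return delivered.getOrDefault(topic, Collections.<String>emptyList());
    }
}
```

Connecting a cleansing processor from one topic to a second, and an alerting processor from the second to a third, gives a two-stage network in which each topic plays the role of an implicit channel.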
There is also another method of inter-event-processor communication depicted in Figure 3: shared state management. It is not uncommon for an event processor to need to retain computational state across multiple events. The event processing framework needs to provide a mechanism for persisting this state and allowing it to be accessed by event processors directly. This shared state facility provides another mechanism for sharing information between event processors and enables stateful event processing, which we will discuss in the next section.
Figure 3: Event Processing Network
Note that multiple event-based applications can be associated with a single event type. In the above diagram, we depict the chaining of one event-based application (in blue) to a different application. The output of the first application is fed to both event consumer 1 as well as to another event processor for further processing before being sent to event consumer 2.
There are a variety of reasons why you would want to chain event-based applications together. For instance, consider a scenario in which you are monitoring an IoT sensor reading for patterns or abnormalities, while at the same time you wish to retain these events in long-term storage (e.g. HDFS or Amazon S3) for your data science team to use for model training.
The first sequence of event processors would handle the ETL-type processing of the events to transform them into a consumable format. Those records would be sent to event consumer 1, which in this case would be HDFS. At the same time, we also want to forward these cleansed events to our secondary sequence of processors that implements the anomaly detection workflow. In the next section we will explore using Apache Pulsar Functions as a framework for implementing event-based processing using simple programming logic functions.
Apache Pulsar Functions provides an easy-to-use framework that developers can use to create and deploy processing logic that is executed by the Apache Pulsar messaging system. With Pulsar Functions, you can write functions of any level of complexity in Java or Python and deploy them to a Pulsar cluster without needing to run a separate stream processing engine. Pulsar Functions are lightweight compute processes that:
Figure 4: The Pulsar Functions Programming Model
Pulsar Functions can be written in Java and Python. In both Java and Python you have two options for writing a Function:
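// Option 1: the language-native interface
import java.util.function.Function;

public class EchoFunction implements Function<String, String> {
    @Override
    public String apply(String input) {
        // Logic here; an echo function simply returns its input
        return input;
    }
}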
// Option 2: the Pulsar Functions SDK interface, which adds a Context parameter
package org.apache.pulsar.functions.api;

public interface Function<I, O> {
    O process(I input, Context context) throws Exception;
}
As you can see, the language-native approach provides a clean, API-free way of writing Pulsar Functions, and is ideal for the development of stateless event processors. However, the trade-off for this simplicity is the lack of access to any previous state information.
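As a small illustration of the language-native style, the sketch below implements a stateless function using only the JDK. The class name `ExclamationFunction` and its behavior are chosen purely for illustration.

```java
import java.util.function.Function;

/** Stateless language-native function: appends an exclamation mark to each event. */
class ExclamationFunction implements Function<String, String> {
    @Override
    public String apply(String input) {
        // The output depends only on the current input; no prior state is consulted.
        return input + "!";
    }
}
```

Because the class depends on nothing beyond `java.util.function.Function`, it can be unit tested by calling `apply` directly, without a running Pulsar cluster.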
After you have compiled and tested your Pulsar Functions, you will need to deploy them to a Pulsar cluster. Pulsar Functions were built to support multiple deployment scenarios. Currently there are two ways to run Pulsar Functions:
Local Run Mode: If you run a Pulsar Function in this mode, it will run on the machine from which the command was issued (e.g., your laptop or an edge node). Here’s an example local run command:
$ bin/pulsar-admin functions localrun \
  --py myfunc.py \
  --className myfunc.SomeFunction \
  --inputs input-topic-1 \
  --outputs output-topic-1
Cluster Mode: When you run a Pulsar Function in cluster mode, the function code will be uploaded to a broker in a Pulsar cluster and run alongside the broker rather than in your local environment. You can run a function in cluster mode using the create command as shown below, executed on the Pulsar broker node.
$ bin/pulsar-admin functions create \
  --jar target/my-functions.jar \
  --className org.example.functions.MyFunction \
  --inputs input-topic-1 \
  --outputs output-topic-1 \
  --parallelism 4 \
  --cpu 2 \
  --ram 8589934592 \
  --disk 10737418240
The command above will launch 4 instances of the org.example.functions.MyFunction class on the Pulsar broker, each with 2 CPU cores, 8 GB of RAM, and 10 GB of disk space. (Note that the RAM and disk settings are specified in bytes, and that the CPU and disk settings are only enforced in Docker environments.)
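Since the RAM and disk values must be given in bytes, it can help to compute them rather than type them by hand. The helper below is a hypothetical utility, not part of Pulsar, and it assumes binary gigabytes (1024 × 1024 × 1024 bytes), which is what the values in the command above correspond to.

```java
/** Hypothetical helper for converting gigabytes to the byte counts used on the command line. */
class ResourceSizes {
    // One binary gigabyte (GiB) in bytes; long arithmetic avoids int overflow.
    static final long BYTES_PER_GIB = 1024L * 1024L * 1024L;

    static long gibToBytes(long gib) {
        return gib * BYTES_PER_GIB;
    }
}
```

With this helper, `ResourceSizes.gibToBytes(8)` yields 8589934592 and `ResourceSizes.gibToBytes(10)` yields 10737418240, matching the RAM and disk arguments shown earlier.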
Lastly, there is also a way to provide user configuration properties when creating a Pulsar Function, which is useful when you need to reuse a function. We pass in a collection of key-value pairs in the command below by specifying a JSON string for the --userConfig property. These values will be accessible at runtime via the Context object for Pulsar Functions that utilize the Pulsar Functions SDK, which we will examine in the next section.
$ bin/pulsar-admin functions create \
  --jar target/my-functions.jar \
  --className org.example.functions.MyFunction \
  --inputs input-topic-1 \
  --outputs output-topic-1 \
  --parallelism 4 \
  --cpu 2 \
  --ram 8589934592 \
  --disk 10737418240 \
  --userConfig '{"key-1": "value-1", "key-2": "value-2"}'
The additional Context object defined in both the Java and Python SDKs provides a wide variety of information and functionality to the function, including the ability to persist intermediate results that can be used to provide stateful event processing. The following is a sample of the information contained within the context object:
In this section we will cover a few usage patterns that leverage these features of the Context object.
When you run or update Pulsar Functions created using the SDK, you can pass arbitrary key/values to them via the command line with the --userConfig flag. Key/values must be specified as JSON. Here’s an example of a function creation command that passes a user config key/value to a function:
$ bin/pulsar-admin functions create \
  --name word-filter \
  --userConfig '{"filter": "$.Sensors[?(@.Type==\"Temp\")]"}' \
  # Other function configs
This feature allows us to write generic functions that can be used multiple times, but with a slightly different configuration. For instance, let’s say you want to write a function that filters JSON events based on a JSON path expression. When an event arrives, the contents are compared to the configured expression, and those entries that don’t match are filtered out.
Obviously the behavior of this function depends entirely upon the JSON path expression it is filtering on. In order to make it more reusable, with the Pulsar SDK we can defer specifying this value until it is deployed.
As we can see in the following example, the value of the JSON path filter to use isn’t known at compile time, and is instead pulled from the context using the getUserConfigValueOrDefault method.
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import com.jayway.jsonpath.JsonPath;

public class JsonPathFilterFunction implements Function<String, String> {
    @Override
    public String process(String input, Context context) throws Exception {
        // Get the filter from the user config, defaulting to the document root
        String filter = context.getUserConfigValueOrDefault("filter", "$").toString();
        // Apply the JSON path expression and emit the matching content
        Object filtered = JsonPath.read(input, filter);
        return filtered.toString();
    }
}
Stateful event processors require a memory of prior events in order to generate their output. The ability to store state is a key building block for processing multiple events. Within the Apache Pulsar Functions framework, this state information is stored in a dedicated key-value store built on top of Apache BookKeeper. The Pulsar SDK provides access to the state information via the Context object.
Figure 5: Apache Pulsar State Management
Let’s look at a simple example to illustrate the idea of a stateful event processor. Suppose we have an application that receives temperature reading events from an IoT sensor, and we are interested in knowing the average temperature of the sensor. We can use an event processor to continuously recompute this value using the following function.
import java.nio.ByteBuffer;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class AvgTempFunction implements Function<Float, Float> {
    @Override
    public Float process(Float currentTemp, Context context) {
        // Increment and get counter
        context.incrCounter("num-measurements", 1);
        long n = context.getCounter("num-measurements");
        // Read the old average; this assumes the ByteBuffer-based getState/putState
        // methods of the Context API. No average has been stored before the first event.
        float oldAverage = 0f;
        if (n > 1) {
            oldAverage = context.getState("avg-temp").getFloat(0);
        }
        // Calculate new average based on old average and count
        float newAverage = (oldAverage * (n - 1) + currentTemp) / n;
        ByteBuffer updated = ByteBuffer.allocate(Float.BYTES);
        updated.putFloat(0, newAverage);
        context.putState("avg-temp", updated);
        return newAverage;
    }
}
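The update rule used by this function, new average = (old average × (n − 1) + current reading) / n, can be checked in isolation. The sketch below is a standalone illustration in which a plain `Map` stands in for the function's state store. The `RunningAverage` class is hypothetical and does not use the Pulsar API.

```java
import java.util.HashMap;
import java.util.Map;

/** Incremental mean with a Map standing in for the state store (hypothetical, not the Pulsar API). */
class RunningAverage {
    private final Map<String, Double> state = new HashMap<>();

    double update(double reading) {
        // Increment the measurement count and read back the new value.
        double n = state.merge("num-measurements", 1.0, Double::sum);
        // Before the first reading there is no stored average, so start from zero.
        double oldAverage = state.getOrDefault("avg-temp", 0.0);
        // Re-weight the old average by the previous count and fold in the new reading.
        double newAverage = (oldAverage * (n - 1) + reading) / n;
        state.put("avg-temp", newAverage);
        return newAverage;
    }
}
```

Only two values are retained between events, the count and the current average, so the amount of stored state stays constant no matter how many readings arrive.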
Pulsar Functions can publish results to one or more output topics, but this isn’t required. It is also possible to have functions that simply produce a log, write results to an external database, or just monitor the stream for anomalies. The following is a function that simply logs each event it receives.
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.slf4j.Logger;

public class LogFunction implements Function<String, Void> {
    @Override
    public Void process(String input, Context context) throws Exception {
        // The logger is supplied by the function's context
        Logger LOG = context.getLogger();
        LOG.info("Received {}", input);
        return null;
    }
}
When using Java functions in which the return output type is Void, the function must always return null. For functions that don’t have an output type of Void, you can return null when you don’t want to produce an output event, e.g. when you are applying a filter and don’t want an event to be processed.
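The filtering case can be sketched without any Pulsar dependency. In the example below, `HighTempFilter` returns null for readings it wants to drop, and `FilterDriver` is a toy stand-in for a runtime that only publishes non-null results. Both class names and the threshold are hypothetical and serve only to illustrate the convention.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Passes readings above a threshold and returns null for everything else. */
class HighTempFilter implements Function<Double, Double> {
    private final double threshold;

    HighTempFilter(double threshold) {
        this.threshold = threshold;
    }

    @Override
    public Double apply(Double reading) {
        // Returning null signals that no output event should be produced.
        return reading > threshold ? reading : null;
    }
}

/** Toy driver that mimics a runtime publishing only non-null results (illustrative assumption). */
class FilterDriver {
    static <I, O> List<O> run(Function<I, O> function, List<I> inputs) {
        List<O> published = new ArrayList<>();
        for (I input : inputs) {
            O output = function.apply(input);
            if (output != null) {
                published.add(output);
            }
        }
        return published;
    }
}
```

Running the filter with a threshold of 30.0 over the readings 25.0, 31.5, and 40.0 leaves only 31.5 and 40.0 in the published list.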
As we saw in Figure 4, Pulsar Functions can consume events from multiple topics, so let’s take a look at how one goes about writing a function that can do this:
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class MultiTopicFunction implements Function<String, String> {
    @Override
    public String process(String input, Context context) throws Exception {
        // Determine which input topic this event arrived on
        String sourceTopic = context.getSourceTopic();
        if (sourceTopic.equals("TopicA")) {
            // parse as TopicA Object
        } else if (sourceTopic.equals("TopicB")) {
            // parse as Topic B Object
        } else if (sourceTopic.equals("TopicC")) {
            // parse as Topic C Object
        }
        // ...
    }
}
As you can see from the code example, we first get the name of the input topic from the Context object itself, and then based upon that information parse/handle the event accordingly.
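When the number of input topics grows, the chain of if/else branches can be replaced by a lookup table keyed on topic name. The sketch below shows one possible way to structure this in plain Java. `TopicDispatcher` and its methods are hypothetical, and the source topic is passed in as a plain string rather than read from a Context object.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Routes each event to a handler chosen by its source topic (hypothetical helper). */
class TopicDispatcher {
    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    /** Registers the handler for one topic; returns this so calls can be chained. */
    TopicDispatcher register(String topic, Function<String, String> handler) {
        handlers.put(topic, handler);
        return this;
    }

    /** Looks up the handler for the source topic and applies it to the event. */
    String dispatch(String sourceTopic, String input) {
        Function<String, String> handler = handlers.get(sourceTopic);
        if (handler == null) {
            // Fail loudly rather than silently dropping events from unexpected topics.
            throw new IllegalArgumentException("No handler for topic " + sourceTopic);
        }
        return handler.apply(input);
    }
}
```

The trade-off is that each handler must share the same input and output types, whereas the explicit branches in the function above can parse each topic's payload into a different object type.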
The Apache Pulsar SDK provides a metrics collection mechanism that you can use to record any user-defined metrics you choose. In the following example, we keep one metric to track the total number of times the function was called, and another to track the number of times it was called with invalid input. For instructions on reading and using metrics, see the Monitoring guide.
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class MetricFunction implements Function<Integer, Void> {
    @Override
    public Void process(Integer input, Context context) throws Exception {
        // Count every invocation
        context.recordMetric("invocation count", 1);
        // Count invocations that received a negative (invalid) value
        if (input < 0) {
            context.recordMetric("Invalid data", 1);
        }
        return null;
    }
}
In this blog post, we took a deep dive into the simple event processing capabilities provided by the Streamlio platform based on Apache Pulsar Functions.
We introduced the concept of event-based programming, and examined how many individual functions can be interconnected to form event processing networks. Lastly, we covered the Apache Pulsar Functions SDK and some of the best practices for utilizing the state management features provided by the Functions SDK.
----------------------------------------------------
Thanks!
David Kjerrumgaard