Add-on Builder 2.0 provides capabilities to build modular inputs without writing any code. In this post however, we focus on using an advanced feature of Splunk’s Add-on Builder 2.0 to write custom python while taking advantage of its powerful helper functions.
NB: Future versions of Add-on Builder will obviate the need for some of the techniques mentioned below, most notably techniques in step #6 & step #8.
There is a veritable cornucopia of useful resources for building modular inputs at docs.splunk.com, dev.splunk.com, blogs.splunk.com, and more. This post certainly isn’t meant to replace those. No no, this post will simply walk you through leveraging Splunk Add-on Builder 2.0 to create custom code to query an API.
In this post we will create a modular input using some custom code to have more control while also leveraging Splunk Add-on Builder’s powerful helper functions. We’ll additionally explore some caveats between test mode and final-cut behavior.
If you’re looking for Parts I-III of this blog, they do not strictly exist. Neither did Leonard Parts I-V.
For a backstory, Part I would have used the Add-on Builder feature to Add a data input using a REST API and Part II would have used the feature to add a data input using shell commands. Part III would have therefore described adding a data input by writing your own code. They’re well described in those linked docs though, so we start where those stories lead us by expanding on Part III in this Part IV installment. Kind of A New Hope for our custom code.
You may have seen the post announcing Splunk’s Add-on Builder 2.0. If not, that would be a good pre-read too.
Step 1 – Install Add-on Builder v. 2.0
Step 2 – Read through your API documentation
Step 3 – Create Your Add-On
Step 4 – Create Input
Step 5 – Initialize Parameters
Step 6 – Custom Code Primer: Single Instance Mode
Step 7 – Custom Code Auto Generated
Step 8 – Customizing The Auto Generated Code
Step 9 – Entering test values
Step 10 – Run Test
Step 11 – Save Work
Step 12 – Finish
Step 13 – Restart
Step 14 – Cooking With Gas
Download & install Add-on Builder version 2.0, which we’ll henceforth refer to as AoB
You know what? This should actually be first, so you can decide how to implement this in AoB. The quicker add a data input using a REST API option may be in play. Yet here we are. For this example, we’ll use HackerNews, because I haven’t previously implemented it and it doesn’t require oAuth (I hope to release a Part V to review oAuth before 2017). Here is some documentation about the HackerNews API: https://github.com/HackerNews/API
Reading through it the first time I note we don’t need an API token or access key. I also notice we need to query max number of records each time, as text. The data itself will be returned in json format. We’ll want to use checkpointing to see how far we read in previous queries to see if we need to read more, etc. Fortunately, the AoB custom python option provides us helper functions to do these things, as we’ll see later in this example.
Click “Create an add-on”
Fill in Add-on details and click Create
Now we’ll define the input. Follow the workflow to create a custom Python input (Configure Data Collection -> Add Data -> Modular Input Code)
Now we need to specify the data input properties. NB: Default collection interval is 30 seconds, I adjusted to 300.
Next, data input variables are defined.
These are the per-instance variables configured in settings -> data inputs. There will be one of these for each user-configured input.
Generally this would be where user-specific information lives (e.g. API tokens, etc).
As this is a simple example, we will simply use “number of records to query each time” as a way to demo.
Finally, we’ll define the Add-on Setup Parameters. These are the global input parameters defined in the Add-on’s setup page. These will be the universal settings available to all inputs configured using this modular input.
In this example, we’ll specify an API base URI and the API version.
Modular inputs have a lot of plasticity. One such flexibility is that they can execute in single or multiple instance modes.
AoB 2.0 ‘my custom python’ feature always leverages single instance mode. It is statically defined in supporting-code that is automatically generated for you. It is not recommended to modify that code as it is re-generated each time you save your custom code in step 8.
It is mentioned here so that you understand that single instance mode only runs your custom code once for ALL its defined inputs. This means if you have three inputs, say foo, bar, and baz (each having their own stanzas in inputs.conf), your custom code will need to embed its logic within a loop that iterates over each stanza.
Don’t worry, we’ll solve for that in step 8 in an explicit example.
To further understand this topic, you may read Splunk’s mod input documentation that reviews single & multiple instance modes of a script.
NB: There are plans to make this easier in a future AoB version, this post is specifically written with AoB 2.0 in mind.
This is the code that is generated automatically. Notice all the guidance provided that is commented out, just ready for you to un-comment and use.
Review it here or in your browser, and skip to step 8.
# encoding = utf-8 import os
import sys
import time
import datetime
IMPORTANT
Edit only the validate_input and collect_events functions.
Do not edit any other part in this file.
This file is generated only once when creating
the modular input.
def validate_input(helper, definition):
“””Implement your own validation logic to validate the input stanza configurations”””
# This example accesses the modular input variable
# query_max = definition.parameters.get(‘query_max’, None)
pass
def collect_events(helper, inputs, ew):
“””Implement your data collection logic here”””
# The following example accesses the configurations and arguments
# Get the arguments of this input
# opt_query_max = helper.get_arg(‘query_max’)
# Get options from setup page configuration
# Get the loglevel from the setup page
# loglevel = helper.get_log_level()
# Proxy setting configuration
# proxy_settings = helper.get_proxy()
# User credentials
# account = helper.get_user_credential(“username”)
# Global variable configuration
# global_api_uri_base = helper.get_global_setting(“api_uri_base”)
# global_api_version = helper.get_global_setting(“api_version”)
# Write to the log for this modular input
# helper.log_error(“log message”)
# helper.log_info(“log message”)
# helper.log_debug(“log message”)
# Set the log level for this modular input
# helper.set_log_level(‘debug’)
# helper.set_log_level(‘info’)
# helper.set_log_level(‘warning’)
# helper.set_log_level(‘error’)
# helper function to send http request
# response = helper.send_http_request(url, method, parameters=None, payload=None,
# headers=None, cookies=None, verify=True, cert=None, timeout=None, use_proxy=True)
# get the response headers
# r_headers = response.headers
# get the response body as text
# r_text = response.text
# get response body as json. If the body text is not a json string, raise a ValueError
# r_json = response.json()
# get response cookies
# r_cookies = response.cookies
# get redirect history
# historical_responses = response.history
# get response status code
# r_status = response.status_code
# check the response status, if the status is not sucessful, raise requests.HTTPError
# response.raise_for_status()
#
# checkpoint related helper functions
# save checkpoint
# helper.save_check_point(key, state)
# delete checkpoint
# helper.delete_check_point(key)
# get checkpoint
# state = helper.get_check_point(key)
#
”’
# The following example writes a random number as an event
import random
data = str(random.randint(0,100))
event = helper.new_event(source=helper.get_input_name(), index=helper.get_output_index(), sourcetype=helper.get_sourcetype(), data=data)
try:
ew.write_event(event)
except Exception as e:
raise e
”’
|
Here we update auto generated code with our logic.
This is just a quick example so I skipped many important elements of a prod solution including (but not limited to) validation of inputs, logging verbosity flexibility, error handling opportunities, etc.
# encoding = utf-8import osimport sysimport timeimport datetime''' IMPORTANT Edit only the validate_input and collect_events functions. Do not edit any other part in this file. This file is generated only once when creating the modular input.'''def validate_input(helper, definition): """Implement your own validation logic to validate the input stanza configurations""" # This example accesses the modular input variable # query_max = definition.parameters.get('query_max', None) passdef collect_events(helper, inputs, ew): # We import json library for use in massaging data before writing the event import json # Return all the stanzas (per step #6) stanzas = helper.input_stanzas # Iterate through each defined Stanza (per step #6) # NB: I only ident this with two spaces so I don't have to re-ident everything else for stanza in stanzas: # Another two-space identation keeps all the "give-me" code from step #7 in-play # without more indenting exercises helper.log_info('current stanza is: {}'.format(stanza)) """Implement your data collection logic here""" # The following example accesses the args per defined input opt_query_max = helper.get_arg('query_max') # Test mode will yield single instance value, but once deployed, # args are returned in dictionary so we take either one if type(opt_query_max) == dict: opt_query_max = int(opt_query_max[stanza]) else: opt_query_max = int(opt_query_max) # Fetch global variable configuration (add-on setup page vars) # same as above regarding dictionary check global_api_uri_base = helper.get_global_setting("api_uri_base") if type(global_api_uri_base) == dict: global_api_uri_base = global_api_uri_base[stanza] global_api_version = helper.get_global_setting("api_version") if type(global_api_version) == dict: global_api_version = global_api_version[stanza] # now we construct the actual URI from those global vars api_uri = '/'.join([global_api_uri_base, 'v' + global_api_version]) helper.log_info('api uri: {}'.format(api_uri)) # set method & define url for initial API query method = 'GET' url = '/'.join([api_uri, 'maxitem.json?print=pretty']) # submit query response = helper.send_http_request(url, method, parameters=None, payload=None, headers=None, cookies=None, verify=True, cert=None, timeout=None, use_proxy=True) # store total number of entries available from API num_entries = int(response.text) helper.log_info('number of entries available: {}'.format(num_entries)) # get checkpoint or make one up if it doesn't exist state = helper.get_check_point('stanza' + '_max_id') if not state: # get some backlog if it doesn't exist by multiplying number of queries by 10 # and subtracting from total number of entries available state = num_entries - (10 * opt_query_max) if state < 0: state = 0 helper.log_info('fetched checkpoint value for {}_max_id: {}'.format(stanza, state)) # Start a loop to grab up to number of queries per invocation without # exceeding number of entries available count = 0 while (count < opt_query_max) or (count + state > num_entries): helper.log_info('while loop using count: {}, opt_query_max: {}, state: {}, and num_entries: {}'.format(count, opt_query_max, state, num_entries)) count += 1 # update url to examine actual record instead of getting number of entries url = '/'.join([api_uri, 'item', str(state + count) + '.json?print=pretty']) response = helper.send_http_request(url, method, parameters=None, payload=None, headers=None, cookies=None, verify=True, cert=None, timeout=None, use_proxy=True) # store result as python dictionary r_json = response.json() # massage epoch to a human readable datetime and stash it in key named the same if r_json['time']: r_json['datetime'] = datetime.datetime.fromtimestamp(r_json['time']).strftime('%Y-%m-%d %H:%M:%S') helper.log_info('item {} is: {}'.format(state + count, r_json)) # format python dict to json proper data = json.dumps(r_json) # similar to getting args for input instance, find sourcetype & index # regardless of if we're in test mode (single value) or running as input (dict of values) st = helper.get_sourcetype() if type(st) == dict: st = st[stanza] idx = helper.get_output_index() if type(idx) == dict: idx = idx[stanza] # write event to index if all goes well # NB: source is modified to reflect input instance in addition to input type event = helper.new_event(source=helper.get_input_name() + ':' + stanza, index=idx, sourcetype=st, data=data) try: ew.write_event(event) # assuming everything went well, increment checkpoint value by 1 state += 1 except Exception as e: raise e # write new checkpoint value helper.log_info('saving check point for stanza {} @ {}'.format(stanza + '_max_id', state)) helper.save_check_point('stanza' + '_max_id', state) |
Now that we’ve copied pasta and modified for our own purposes, we can test!
Be sure to update the forms on the tabs Data input Definition & Add-on Setup Parameters to the left of the Code Editor to make sure test mode has parameters with which to work.
I suppose this could have been included in step #9. Run the test once you’ve entered the parameters on both tabs per step #9 by clicking on that Test button.
In AoB 2.0, events successfully written to Splunk will be displayed in Output window on the right hand side.
A log for your mod input instance will be in $SPLUNK_HOME/var/log/splunk/<ta_name>_<input_name>.log
In my example, it lives at
/opt/splunk/var/log/splunk/ta_hackernews_hackernews.log
Click the Save button
Click the Finish button
Re-start Splunk if you haven’t been prompted to do so by now
Your custom code is setup. It is considered a good practice to create a sandbox index for which to test your new add-on. You can keep tweaking it via AoB until you get everything as you want, cleaning the sandbox index as needed, before validating & packaging (using AoB features, of course)
If you get stuck or have challenges, check the AoB Docs and explore Splunk Answers for Add-on Builder. If you don’t find what you need there, post a new question (be sure to tag it with “Splunk Add-on Builder”)
----------------------------------------------------
Thanks!
Ashok Sankar
The Splunk platform removes the barriers between data and action, empowering observability, IT and security teams to ensure their organizations are secure, resilient and innovative.
Founded in 2003, Splunk is a global company — with over 7,500 employees, Splunkers have received over 1,020 patents to date and availability in 21 regions around the world — and offers an open, extensible data platform that supports shared data across any environment so that all teams in an organization can get end-to-end visibility, with context, for every interaction and business process. Build a strong data foundation with Splunk.