Boofuzz源码解析

boofuzz源码分析

ftp_simple.py为例:

1
2
3
4
5
session = Session(target=Target(connection=TCPSocketConnection("127.0.0.1", 21)))  # 创建session

define_proto(session=session) # 定义proto

session.fuzz() # 开始模糊测试

其中,define_proto() 将调用session.connect()构造调用流图。如果connect(node1)的参数只有只有一个,那么该节点将保存在self.nodes变量中,同时会创建self.root到该节点的边;如果connect(node1, node2),node1和node2节点信息都将保存在self.nodes变量中,同时会创建node1到node2的边

1
2
3
4
session.connect(user)
session.connect(user, passw)
session.connect(passw, stor)
session.connect(passw, retr)

session.fuzz()核心代码在_main_fuzz_loop

_main_fuzz_loop中:

self.server_init():启动web_interface_thread(网络接口线程:默认为localhost:26000)

self._start_target():启动一个boofuzz.sessions.Target实例

相关变量

变量名 含义
self.num_cases_actually_fuzzed 实际fuzz的测试用例数
mutation_context 变异上下文
fuzz_case_iterator fuzz测试用例迭代器

遍历fuzz_case_iterator中的mutation_context

  • 如果设置了restart_interval,表明每次运行restart_interval个测试用例后重启target

  • _fuzz_current_case()函数对当前的测试用例进行模糊测试,而该测试用例由fuzz_case_iterator控制:

    • _pause_if_pause_flag_is_set()函数pause flag是否被唤起,如果被唤起,则进入一个无限循环中等待其变为False(Why?)

    • self._test_case_name()函数用于构造测试用例名,测试用例名字格式为:message_path:[qualified_name1:mutation.index1, qualified_name2:mutation.index2, ...],例如user:[user.key:0]

    • self._fuzz_data_logger.open_test_case()记录测试用例,默认是讲保存插入测试用例信息的sql语句添加到self._queue队列中。(在何处执行?在此处执行)

    • self._open_connection_keep_trying()尝试与服务器进行连接(建立套接字连接),如果是因为可用套接字数不够导致的错误,那就再进行50轮*5s的判断【如果在这段时间内有可以创建套接字,则继续进行,否则将报错】。

    • self._pre_send(target):不知道干啥的

    • self.transmit_fuzz()

      • self.fuzz_node.render():渲染模糊测试节点数据
      • 渲染完成后,由self.targets[0]来发送该数据
      • 如果self._receive_data_after_fuzzTrue,则将返回的信息保存到received变量中
    • self._check_for_passively_detected_failures():被动检查错误。首先,需要遍历target.monitors该数组两遍,第一遍检查所有的monitor,判断其是否报告一个错误,如果报告了错误,那么需要收集一个崩溃信息【不确定是否monitor一定会提供一个崩溃信息,但以防万一还是要检查一下】;在第二遍,我们尝试从未检测到崩溃的监视器中获取崩溃概要作为补充信息。如果未检测到错误,则输出”No crash detected.”,并返回是否崩溃的标志

    • 如果self._reuse_target_connection(重用目标连接)为假,那么直接关闭连接

    • 最后进行三个操作:self._process_failures()【处理错误】、self._fuzz_data_logger.close_test_case()【将日志写到数据库中】和self.export_file()【将对象值导出到本地磁盘/需要设置self.session_filename


详解fuzz_case_iterator

  • 最先传入的是self._generate_mutations_indefinitely(max_depth=max_depth) 【默认 max_depth 为 None
1
2
3
4
5
6
7
8
9
10
11
def _generate_mutations_indefinitely(self, max_depth=None, path=None):
"""Yield MutationContext with n mutations per message over all messages, with n increasing indefinitely."""
depth = 1
while max_depth is None or depth <= max_depth: # 当max_depth为None时,这里是一个无限循环
valid_case_found_at_this_depth = False # 一个标志位,表示是否在该深度找到合法的测试用例
for m in self._generate_n_mutations(depth=depth, path=path): # 调用_generate_n_mutations()生成可迭代数据
valid_case_found_at_this_depth = True # 将是否在该深度找到合法的测试用例的标志位设置为1
yield m # 生成器返回m
if not valid_case_found_at_this_depth: # 表示该层没有生成任何有效数据,则退出循环
break
depth += 1 # 深度++
  • 接着就是分析内层生成器,i.e. self._generate_n_mutations(depth=depth, path=path)
1
2
3
4
5
def _generate_n_mutations(self, depth, path):
"""Yield MutationContext with n mutations per message over all messages."""
for path in self._iterate_protocol_message_paths(path=path): # 遍历path(消息)
for m in self._generate_n_mutations_for_path(path, depth=depth): # 为每一个消息进行n次变异
yield m

1⃣ self._iterate_protocol_message_paths(path=path)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def _iterate_protocol_message_paths(self, path=None):
"""
Iterates over protocol and yields a path (list of Connection) leading to a given message).

Args:
path (list of Connection): Provide a specific path to yield only that specific path.

Yields:
list of Connection: List of edges along the path to the current one being fuzzed.

Raises:
exception.SulleyRuntimeError: If no requests defined or no targets specified
"""
# we can't fuzz if we don't have at least one target and one request.
if not self.targets:
raise exception.SullyRuntimeError("No targets specified in session")

if not self.edges_from(self.root.id):
raise exception.SullyRuntimeError("No requests specified in session")

if path is not None:
yield path
else:
for x in self._iterate_protocol_message_paths_recursive(this_node=self.root, path=[]): # 最关键的是该函数
yield x

该函数self._iterate_protocol_message_paths_recursive的作用是返回一个以边构成的路径(请求序列)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def _iterate_protocol_message_paths_recursive(self, this_node, path):
"""Recursive helper for _iterate_protocol.

Args:
this_node (node.Node): Current node that is being fuzzed.
path (list of Connection): List of edges along the path to the current one being fuzzed.

Yields:
list of Connection: List of edges along the path to the current one being fuzzed.
"""
# step through every edge from the current node.
for edge in self.edges_from(this_node.id): # 遍历请求依赖流图中以this_node.id起始的边(从root开始)
# keep track of the path as we fuzz through it, don't count the root node.
# we keep track of edges as opposed to nodes because if there is more then one path through a set of
# given nodes we don't want any ambiguity.
path.append(edge)

message_path = self._message_path_to_str(path)
logging.debug("fuzzing: {0}".format(message_path))
self.fuzz_node = self.nodes[path[-1].dst] # 与this_node连接的边的下一个节点(该节点为fuzz节点)

yield path

# recursively fuzz the remainder of the nodes in the session graph.
for x in self._iterate_protocol_message_paths_recursive(self.fuzz_node, path):
yield x

# finished with the last node on the path, pop it off the path stack.
if path:
path.pop()

以boofuzz的提供的ftp脚本ftp_simple.py为例,其请求依赖流图如下所示:

graph TB
a((root))-->b((user))-->c((pass))-->d((stor))
c-->e((retr))

_iterate_protocol_message_paths_recursive生成器将会生成path有:[user], [user, pass], [user, pass, stor], [user, pass, retr]

2⃣ self._generate_n_mutations_for_path(path, depth=depth)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def _generate_n_mutations_for_path(self, path, depth):
"""Yield MutationContext with n mutations for a specific message.

Args:
path (list of Connection): Nodes (Requests) along the path to the current one being fuzzed.
depth (int): Yield sets of depth mutations.

Yields:
MutationContext: A MutationContext containing one mutation.
"""
for mutations in self._generate_n_mutations_for_path_recursive(path, depth=depth):
if not self._mutations_contain_duplicate(mutations):
self.total_mutant_index += 1
yield MutationContext(message_path=path, mutations={n.qualified_name: n for n in mutations})

该函数self._generate_n_mutations_for_path_recursive的作用是

1
2
3
4
5
6
7
8
9
10
11
def _generate_n_mutations_for_path_recursive(self, path, depth, skip_elements=None):
if skip_elements is None:
skip_elements = set()
if depth == 0:
yield []
return
new_skip = skip_elements.copy()
for mutations in self._generate_mutations_for_request(path=path, skip_elements=skip_elements):
new_skip.update(m.qualified_name for m in mutations)
for ms in self._generate_n_mutations_for_path_recursive(path, depth=depth - 1, skip_elements=new_skip):
yield mutations + ms

self._generate_mutations_for_request –> self.fuzz_node.get_mutations –> self.mutations –> item.get_mutations()–>mutations() [in string.py]

这里需要注意的是,boofuzz/primitives/string.py中定义了一些字典值,存放在变量self._fuzz_library

1
2
3
4
5
6
7
8
9
10
11
12
# store fuzz_library as a class variable to avoid copying the ~70MB structure across each instantiated primitive.
# Has to be sorted to avoid duplicates
_fuzz_library = [
"!@#$%%^#$%#$@#$%$$@#$%^^**(()",
"", # strings ripped from spike (and some others I added)
"$(reboot)",
"$;reboot",
...
"|touch /tmp/SULLEY", # command injection.
"||reboot;",
"||reboot|",
]

该调用链每次会构造一个变异类,然后在_main_fuzz_loop模糊测试大循环中使用:

1
2
3
4
def _main_fuzz_loop(self, fuzz_case_iterator):
...
for mutation_context in fuzz_case_iterator: # fuzz_case_iterator --> 变异产生迭代器
...

总结

简单来说,boofuzz总体流程是:1. 遍历请求序列流图(树)并构造序列;2. 产生变异数据;3. 发送到目标服务器

boofuzz整体框架如下图所示:


Boofuzz源码解析
http://bladchan.github.io/2022/09/27/Boofuzz源码解析/
作者
bladchan
发布于
2022年9月27日
许可协议