最近在为awd pwn题写checker,然后我写了个能检测pwn题全部功能是否可以正常使用的脚本,被反馈还需要改进下:

  • 不可以用pwntools库
  • 不能让选手直接nop free,要不选手体验差

pwntools禁用

对我这种fw pwn手来说,没了pwntools就是没了胳膊,干啥啥不行。不过对于checker来说,只要有网络通信功能就行,于是找到了一个平替,zio

1
2
3
4
5
6
7
8
9
10
11
12
13
from zio import *

is_local = True

if is_local:
io = zio('./buggy-server') # used for local pwning development
else:
io = zio(('1.2.3.4', 1337)) # used to exploit remote service

io.read_until(b'Welcome Banner')
io.write(your_awesome_ropchain_or_shellcode)
# hey, we got an interactive shell!
io.interact()

我猜不让用pwntools的原因大概是因为现在python2装不上pwntools了吧,我测试了下,现在ubuntu 16.04装不上,18.04还是可以装上的。

反正第一个问题算是解决了。

不能 nop free

第二个问题。

首先考虑到,这场awd是要给每个选手一台服务器维护的,我作为一个checker怎么能拿到选手服务器的pwn题?打awd的时候我可以一个exp打过去拿到shell再把pwn题传过来,但作为checker的话可想而知洞会被补上。想来想去只能上ssh了,上个公钥在选手服务器。

1
2
3
4
5
6
7
8
import paramiko
def down_from_remote(host, remotepath, localpath, port=22):
keyfile = open('./awd_rsa', 'r')
private_key = paramiko.RSAKey.from_private_key(keyfile)
t = paramiko.Transport((host, port))
t.connect(username='root', pkey=private_key)
sftp = paramiko.SFTPClient.from_transport(t)
sftp.get(remotepath, localpath)

然后考虑free的问题。

赛场上遇到给服务器的、pwn题有uaf的话,要我修我也是先nop(bushi),当然更优美地修的话就是加段置0的汇编,然后call free的时候跳到那里,例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
.eh_frame:0000000000401378 ; =============== S U B R O U T I N E =======================================
.eh_frame:0000000000401378
.eh_frame:0000000000401378
.eh_frame:0000000000401378 sub_401378 proc near ; CODE XREF: sub_400AEF+73竊叢
.eh_frame:0000000000401378 mov rax, [rbp-8] ; Keypatch modified this from:
.eh_frame:0000000000401378 ; db 14h
.eh_frame:0000000000401378 ; db 0
.eh_frame:0000000000401378 ; db 0
.eh_frame:0000000000401378 ; db 0
.eh_frame:000000000040137C mov rax, ds:ptr[rax*8] ; Keypatch modified this from:
.eh_frame:000000000040137C ; db 0
.eh_frame:000000000040137C ; db 0
.eh_frame:000000000040137C ; db 0
.eh_frame:000000000040137C ; db 0
.eh_frame:000000000040137C ; db 1
.eh_frame:000000000040137C ; db 7Ah
.eh_frame:000000000040137C ; db 52h
.eh_frame:000000000040137C ; db 0
.eh_frame:0000000000401384 mov rdi, rax ; Keypatch modified this from:
.eh_frame:0000000000401384 ; db 1
.eh_frame:0000000000401384 ; db 78h
.eh_frame:0000000000401384 ; db 10h
.eh_frame:0000000000401387 call _free ; Keypatch modified this from:
.eh_frame:0000000000401387 ; add [rbx], ebx
.eh_frame:0000000000401387 ; or al, 7
.eh_frame:0000000000401387 ; or [rax+14100701h], dl
.eh_frame:000000000040138C mov rax, [rbp-8] ; Keypatch modified this from:
.eh_frame:000000000040138C sub_401378 endp ; nop
.eh_frame:000000000040138C ; db 1
.eh_frame:000000000040138C ; db 7
.eh_frame:000000000040138C ; db 10h
.eh_frame:0000000000401390 mov rbx, 8
.eh_frame:0000000000401397 mul ebx ; Keypatch filled range [0x401397:0x401396] (0 bytes), replaced:
.eh_frame:0000000000401397 ;
.eh_frame:0000000000401397 ; Keypatch modified this from:
.eh_frame:0000000000401397 ; db 0C7h
.eh_frame:0000000000401397 ; db 0
.eh_frame:0000000000401397 ; Keypatch modified this from:
.eh_frame:0000000000401397 ; mul eax
.eh_frame:0000000000401399 add rax, 6020E0h ; Keypatch modified this from:
.eh_frame:0000000000401399 ; db 0
.eh_frame:0000000000401399 ; db 0
.eh_frame:0000000000401399 ; db 0
.eh_frame:0000000000401399 ; db 0
.eh_frame:0000000000401399 ; db 0E9h
.eh_frame:0000000000401399 ; db 0C5h
.eh_frame:000000000040139F mov qword ptr [rax], 0 ; Keypatch modified this from:
.eh_frame:000000000040139F ; db 0F7h
.eh_frame:000000000040139F ; db 0FFh
.eh_frame:000000000040139F ; db 0FFh
.eh_frame:000000000040139F ; db 0
.eh_frame:000000000040139F ; db 0
.eh_frame:000000000040139F ; db 0
.eh_frame:000000000040139F ; db 0
.eh_frame:00000000004013A6 jmp loc_400B67 ; Keypatch modified this from:
.eh_frame:00000000004013A6 ; db 0
.eh_frame:00000000004013A6 ; db 0
.eh_frame:00000000004013A6 ; db 14h
.eh_frame:00000000004013A6 ; db 0
.eh_frame:00000000004013A6 ; db 0
.eh_frame:00000000004013A6 ; Keypatch modified this from:
.eh_frame:00000000004013A6 ; call loc_400B67
.eh_frame:00000000004013A6 ; ---------------------------------------------------------------------------

检测的话直接检测代码段 call free这句有没有被改是不行的,毕竟我patch也得改这句。

检测call free是不是变成了nop指令也不行,因为除了nop也可以改成别的啊,虽然在不知道checker逻辑的情况下选手应该很难意识到这一点,但做checker嘛就得狠点吧大概(bushi)。

也想到可以用系统调用,结果发现不行,malloc的时候有brk的系统调用如图,free是没走系统调用的。

这里还有一个思路,就是checker可以一直malloc->free->malloc->free->malloc达到一定次数,如果选手把free nop了,堆空间就会超过上次brk的地方,进程就会再调用一次brk申请空间,如果free没有被nop就不会出现这样的情况,这样就区别开来了。然而我出的题限制了分配的次数,而且这个方法也不够普适。

又想到可以看进程的内存布局。这样我需要先找到使用题目端口的进程:

然后过滤checker ip,就能知道是哪个进程在和checker通信,继而查看进程里的内存。

也可以直接执行从选手那传过来的二进制,但是要直接执行的话怎么也得弄个沙箱吧,万一选手整了个什么了不得的东西,而且开销很大。

还有种办法是在选手服务器那边执行下看看进程堆内存,大概可行?但总感觉动选手的环境不应该是checker干的事,而且还是加一个进程进去,不过已经是能想到的比较可行的方案了。

然后我也看了Flappypig战队提出来的这个lowbits leak check,但是关键就在于:

  • 那么这就给了我们在CTF线下赛中一种针对堆漏洞的Checker的思路,我们在程序交互中预先在每次malloc后,把堆地址的低12bit输出。

我太懒了,我懒得改我费老劲出的题,我就想把checker改了交差。

。。。

所以为啥好多awd pwn都要求选手提供patch好的文件给主办方帮你替换,人工看的话是一件多方便的事情。

所以最终方案是:先检查选手有没有修改call free,再检查有没有\x90(前提是那一句里原先就没有90),再看进程的内存布局,多因子检查。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def check_free(check_elf, ordinary_elf, call_free_address, size=5):
# check call free change
check_free_data = check_elf.get_content_from_virtual_address(
call_free_address, size)
ordinary_data = ordinary_elf.get_content_from_virtual_address(
call_free_address, size)

# equal => no change
if operator.eq(check_free_data, ordinary_data):
return True

# if has 90 => nop free
if 0x90 in check_free_data:
return False
# temporary
return True

def check_free_from_remote(host, pwnport, local, port=22):
keyfile = open('./awd_rsa', 'r')
private_key = paramiko.RSAKey.from_private_key(keyfile)
# connect to host
io = zio((host, pwnport))

# get pid infomation
s = paramiko.SSHClient()
s.set_missing_host_key_policy(paramiko.AutoAddPolicy())
s.connect(hostname=host, port=port, username='root', pkey=private_key) #
stdin, stdout, stderr = s.exec_command(
"lsof -i:8888|grep "+local+"|grep -v 'timeout' |awk '{print $2}'|head -n 1")

checker_pwn_pid = stdout.read().decode().strip()

# initialize heap
io.read_until(b'Input your choice:')
io.write('c') # function [c]create a book
io.read_until(b'Which book do you want to create?')
io.writeline('0')
io.read_until(b'Input your choice:')
io.write('c') # function [c]create a book
io.read_until(b'Which book do you want to create?')
io.writeline('1')
# heap info
stdin, stdout, stderr = s.exec_command(
"cat /proc/{0}/maps".format(checker_pwn_pid))
map_info = stdout.read().decode().split('\n')

heap_info = ''
if '[heap]' in map_info[3]:
heap_info = map_info[3]
else:
for i in map_info:
if '[heap]' in i:
heap_info = i
# malloc may be nopped
if heap_info == '':
io.close()
return False

# heap addr
heap_addr_start, heap_addr_end = [int(i, 16) for i in re.match(
"\w*-\w*", heap_info).group(0).split('-')]
# check mem
sftp = s.open_sftp()
io.read_until(b'Input your choice:')
io.write('d') # fuction [d]delete
io.read_until(b'Which book do you want to delete?')
io.writeline('1')
io.read_until(b'Input your choice:')
io.write('d') # fuction [d]delete
io.read_until(b'Which book do you want to delete?')
io.writeline('0')
# if free is there, heap should be bin->0->1
stdin, stdout, stderr = s.exec_command(
"lsof -i:8888")
with sftp.file("/proc/{0}/mem".format(checker_pwn_pid), mode='rb') as file:
file.seek(heap_addr_start+0x8)
chuck_1_size = int(str(unpack("<Q", file.read(8))[0]), 10)
chuck_1_fd = int(str(unpack("<Q", file.read(8))[0]), 10)
if (chuck_1_fd == (heap_addr_start+0x70)) and (chuck_1_size == 0x71):
io.close()
return True
else:
io.close()
return False

不能改大malloc size

跟free相比情况就少多了,malloc相关的代码块甚至是所在的函数都不允许改动即可。

不能扰乱plt表和got表

plt 和 got 表也不许改。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import lief
def check_got(check_elf, ordinary_elf):
section = check_elf.get_section('.got.plt')
got_address = section.virtual_address
if(compare_data(check_elf, ordinary_elf, got_address, section.size)):
return True
else:
return False


def check_plt(check_elf, ordinary_elf):
section = check_elf.get_section('.plt')
plt_address = section.virtual_address
if(compare_data(check_elf, ordinary_elf, plt_address, section.size)):
return True
else:
return False