cover

最近在为awd pwn题写checker,然后我写了个能检测pwn题全部功能是否可以正常使用的脚本,被反馈还需要改进下:

  • 不可以用pwntools库
  • 不能让选手直接nop free,要不选手体验差

项目代码已开源:https://github.com/Q1IQ/AWD-PWN-Checker

pwntools禁用

对我这种fw pwn手来说,没了pwntools就是没了胳膊,干啥啥不行。不过对于checker来说,只要有网络通信功能就行,于是找到了一个平替,zio

from zio import *

is_local = True

if is_local:
    io = zio('./buggy-server')            # used for local pwning development
else:
    io = zio(('1.2.3.4', 1337))           # used to exploit remote service

io.read_until(b'Welcome Banner')
io.write(your_awesome_ropchain_or_shellcode)
# hey, we got an interactive shell!
io.interact()

我猜不让用pwntools的原因大概是因为现在python2装不上pwntools了吧,我测试了下,现在ubuntu 16.04装不上,18.04还是可以装上的。

反正第一个问题算是解决了。

不能 nop free

第二个问题。

首先考虑到,这场awd是要给每个选手一台服务器维护的,我作为一个checker怎么能拿到选手服务器的pwn题?打awd的时候我可以一个exp打过去拿到shell再把pwn题传过来,但作为checker的话可想而知洞会被补上。想来想去只能上ssh了,上个公钥在选手服务器。

import paramiko
def down_from_remote(host, remotepath, localpath, port=22):
    keyfile = open('./awd_rsa', 'r')
    private_key = paramiko.RSAKey.from_private_key(keyfile)
    t = paramiko.Transport((host, port))
    t.connect(username='root', pkey=private_key)
    sftp = paramiko.SFTPClient.from_transport(t)
    sftp.get(remotepath, localpath)

然后考虑free的问题。

赛场上遇到给服务器的、pwn题有uaf的话,要我修我也是先nop(bushi),当然更优美地修的话就是加段置0的汇编,然后call free的时候跳到那里,例:

.eh_frame:0000000000401378 ; =============== S U B R O U T I N E =======================================
.eh_frame:0000000000401378
.eh_frame:0000000000401378
.eh_frame:0000000000401378 sub_401378 proc near ; CODE XREF: sub_400AEF+73竊叢
.eh_frame:0000000000401378 mov rax, [rbp-8] ; Keypatch modified this from:
.eh_frame:0000000000401378 ; db 14h
.eh_frame:0000000000401378 ; db 0
.eh_frame:0000000000401378 ; db 0
.eh_frame:0000000000401378 ; db 0
.eh_frame:000000000040137C mov rax, ds:ptr[rax*8] ; Keypatch modified this from:
.eh_frame:000000000040137C ; db 0
.eh_frame:000000000040137C ; db 0
.eh_frame:000000000040137C ; db 0
.eh_frame:000000000040137C ; db 0
.eh_frame:000000000040137C ; db 1
.eh_frame:000000000040137C ; db 7Ah
.eh_frame:000000000040137C ; db 52h
.eh_frame:000000000040137C ; db 0
.eh_frame:0000000000401384 mov rdi, rax ; Keypatch modified this from:
.eh_frame:0000000000401384 ; db 1
.eh_frame:0000000000401384 ; db 78h
.eh_frame:0000000000401384 ; db 10h
.eh_frame:0000000000401387 call _free ; Keypatch modified this from:
.eh_frame:0000000000401387 ; add [rbx], ebx
.eh_frame:0000000000401387 ; or al, 7
.eh_frame:0000000000401387 ; or [rax+14100701h], dl
.eh_frame:000000000040138C mov rax, [rbp-8] ; Keypatch modified this from:
.eh_frame:000000000040138C sub_401378 endp ; nop
.eh_frame:000000000040138C ; db 1
.eh_frame:000000000040138C ; db 7
.eh_frame:000000000040138C ; db 10h
.eh_frame:0000000000401390 mov rbx, 8
.eh_frame:0000000000401397 mul ebx ; Keypatch filled range [0x401397:0x401396] (0 bytes), replaced:
.eh_frame:0000000000401397 ;
.eh_frame:0000000000401397 ; Keypatch modified this from:
.eh_frame:0000000000401397 ; db 0C7h
.eh_frame:0000000000401397 ; db 0
.eh_frame:0000000000401397 ; Keypatch modified this from:
.eh_frame:0000000000401397 ; mul eax
.eh_frame:0000000000401399 add rax, 6020E0h ; Keypatch modified this from:
.eh_frame:0000000000401399 ; db 0
.eh_frame:0000000000401399 ; db 0
.eh_frame:0000000000401399 ; db 0
.eh_frame:0000000000401399 ; db 0
.eh_frame:0000000000401399 ; db 0E9h
.eh_frame:0000000000401399 ; db 0C5h
.eh_frame:000000000040139F mov qword ptr [rax], 0 ; Keypatch modified this from:
.eh_frame:000000000040139F ; db 0F7h
.eh_frame:000000000040139F ; db 0FFh
.eh_frame:000000000040139F ; db 0FFh
.eh_frame:000000000040139F ; db 0
.eh_frame:000000000040139F ; db 0
.eh_frame:000000000040139F ; db 0
.eh_frame:000000000040139F ; db 0
.eh_frame:00000000004013A6 jmp loc_400B67 ; Keypatch modified this from:
.eh_frame:00000000004013A6 ; db 0
.eh_frame:00000000004013A6 ; db 0
.eh_frame:00000000004013A6 ; db 14h
.eh_frame:00000000004013A6 ; db 0
.eh_frame:00000000004013A6 ; db 0
.eh_frame:00000000004013A6 ; Keypatch modified this from:
.eh_frame:00000000004013A6 ; call loc_400B67
.eh_frame:00000000004013A6 ; ---------------------------------------------------------------------------

检测的话直接检测代码段 call free 这句有没有被改是不行的,毕竟我patch也得改这句。

检测call free 是不是变成了nop指令也不行,因为除了nop也可以改成别的啊,虽然在不知道checker逻辑的情况下选手应该很难意识到这一点,但做checker嘛就得狠点吧大概(bushi)。

也想到可以用系统调用,结果发现不行,malloc的时候有brk的系统调用如图,free是没走系统调用的。

这里还有一个思路,就是checker可以一直malloc->free->malloc->free->malloc达到一定次数,如果选手把free nop了,堆空间就会超过上次brk的地方,进程就会再调用一次brk申请空间,如果free没有被nop就不会出现这样的情况,这样就区别开来了。然而我出的题限制了分配的次数,而且这个方法也不够普适。

又想到可以看进程的内存布局。这样我需要先找到使用题目端口的进程:

然后过滤checker ip,就能知道是哪个进程在和checker通信,继而查看进程里的内存。

也可以直接执行从选手那传过来的二进制,但是要直接执行的话怎么也得弄个沙箱吧,万一选手整了个什么了不得的东西,而且开销很大。

还有种办法是在选手服务器那边执行下看看进程堆内存,大概可行?但总感觉动选手的环境不应该是checker干的事,而且还是加一个进程进去,不过已经是能想到的比较可行的方案了。

然后我也看了Flappypig战队提出来的这个lowbits leak check,但是关键就在于:

  • 那么这就给了我们在CTF线下赛中一种针对堆漏洞的Checker的思路,我们在程序交互中预先在每次malloc后,把堆地址的低12bit输出。

我太懒了,我懒得改我费老劲出的题,我就想把checker改了交差。

。。。

所以为啥好多awd pwn都要求选手提供patch好的文件给主办方帮你替换,人工看的话是一件多方便的事情。

所以最终方案是:先检查选手有没有修改call free,再检查有没有\x90(前提是那一句里原先就没有90),再看进程的内存布局,多因子检查。

def check_free(check_elf, ordinary_elf, call_free_address, size=5):
    # check call free change
    check_free_data = check_elf.get_content_from_virtual_address(
        call_free_address, size)
    ordinary_data = ordinary_elf.get_content_from_virtual_address(
        call_free_address, size)

    # equal => no change
    if operator.eq(check_free_data, ordinary_data):
        return True

    # if has 90 => nop free
    if 0x90 in check_free_data:
        return False
    # temporary
    return True

def check_free_from_remote(host, pwnport, local, port=22):
    keyfile = open('./awd_rsa', 'r')
    private_key = paramiko.RSAKey.from_private_key(keyfile)
    # connect to host
    io = zio((host, pwnport))

    # get pid infomation
    s = paramiko.SSHClient()
    s.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    s.connect(hostname=host, port=port, username='root', pkey=private_key)  #
    stdin, stdout, stderr = s.exec_command(
        "lsof -i:8888|grep "+local+"|grep  -v 'timeout' |awk '{print $2}'|head -n 1")

    checker_pwn_pid = stdout.read().decode().strip()

    # initialize heap
    io.read_until(b'Input your choice:')
    io.write('c')  # function [c]create a book
    io.read_until(b'Which book do you want to create?')
    io.writeline('0')
    io.read_until(b'Input your choice:')
    io.write('c')  # function [c]create a book
    io.read_until(b'Which book do you want to create?')
    io.writeline('1')
    # heap info
    stdin, stdout, stderr = s.exec_command(
        "cat /proc/{0}/maps".format(checker_pwn_pid))
    map_info = stdout.read().decode().split('\n')

    heap_info = ''
    if '[heap]' in map_info[3]:
        heap_info = map_info[3]
    else:
        for i in map_info:
            if '[heap]' in i:
                heap_info = i
    # malloc may be nopped
    if heap_info == '':
        io.close()
        return False

    # heap addr
    heap_addr_start, heap_addr_end = [int(i, 16) for i in re.match(
        "\w*-\w*", heap_info).group(0).split('-')]
    # check mem
    sftp = s.open_sftp()
    io.read_until(b'Input your choice:')
    io.write('d')  # fuction [d]delete
    io.read_until(b'Which book do you want to delete?')
    io.writeline('1')
    io.read_until(b'Input your choice:')
    io.write('d')  # fuction [d]delete
    io.read_until(b'Which book do you want to delete?')
    io.writeline('0')
    # if free is there, heap should be bin->0->1
    stdin, stdout, stderr = s.exec_command(
        "lsof -i:8888")
    with sftp.file("/proc/{0}/mem".format(checker_pwn_pid), mode='rb') as file:
        file.seek(heap_addr_start+0x8)
        chuck_1_size = int(str(unpack("<Q", file.read(8))[0]), 10)
        chuck_1_fd = int(str(unpack("<Q", file.read(8))[0]), 10)
        if (chuck_1_fd == (heap_addr_start+0x70)) and (chuck_1_size == 0x71):
            io.close()
            return True
        else:
            io.close()
            return False

不能改大malloc size

跟free相比情况就少多了,malloc相关的代码块甚至是所在的函数都不允许改动即可。

不能扰乱plt表和got表

plt 和 got 表也不许改。

import lief
def check_got(check_elf, ordinary_elf):
    section = check_elf.get_section('.got.plt')
    got_address = section.virtual_address
    if(compare_data(check_elf, ordinary_elf, got_address, section.size)):
        return True
    else:
        return False


def check_plt(check_elf, ordinary_elf):
    section = check_elf.get_section('.plt')
    plt_address = section.virtual_address
    if(compare_data(check_elf, ordinary_elf, plt_address, section.size)):
        return True
    else:
        return False