CTFZone2018-Federation Workflow System

翻譯自https://github.com/p4-team/ctf/tree/master/2018-07-21-ctfzone-quals/crypto_federation
題目描述

The source code for the Federation Workflow System has been leaked online this night.
Our goal is to inspect it and gain access to their Top Secret documents.
nc crypto-04.v7frkwrfyhsjtbpfcppnu.ctfz.one 7331
考察的是在一些條件下,對(duì)AES-ECB的攻擊。
給了server.py 和 client.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import hmac
import socket
from hashlib import sha1
from Crypto.Cipher import AES
from struct import pack, unpack
from threading import Thread, Lock
from base64 import standard_b64encode
from time import time, sleep, strftime


class SecureServer:

    def __init__(self):
        self.msg_end = '</msg>'
        self.msg_not_found = 'NOT_FOUND'
        self.msg_wrong_pin = 'BAD_PIN'
        self.lock = Lock()
        self.log_path = '../top_secret/server.log'
        self.real_flag = '../top_secret/real.flag'
        self.aes_key = '../top_secret/aes.key'
        self.totp_key = 'totp.secret'
        self.files_available = [
                                    'lorem.txt',
                                    'flag.txt',
                                    'admin.txt',
                                    'password.txt'
                                ]

        self.host = '0.0.0.0'
        self.port = 7331
        self.buff_size = 1024

        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind((self.host, self.port))
        self.sock.listen(50)

        self.listener = Thread(target=self.listen)
        self.listener.daemon = True
        self.listener.start()

        self.log('Server started')

    def listen(self):
        while True:
            try:
                client, address = self.sock.accept()
                client.settimeout(30)
                sock_thread = Thread(target=self.handle, args=(client, address))
                sock_thread.daemon = True
                sock_thread.start()

                self.log('Client {0} connected'.format(address[0]))

            except Exception as ex:
                self.log(ex)

    def handle(self, client, address):
        data = self.recv_until(client, self.msg_end)
        self.log('Got message from client {0}: {1}'.format(address[0], data))

        args = data.split(' ', 1)
        command = args[0].strip()

        if command == 'list':
            self.send_list_files(client, address)
        elif command == 'login':
            self.send_login_time(client, address)
        elif command == 'file':
            if len(args) != 2:
                self.send(client, 'Bad request')
            else:
                self.send_file_data(args[1], client, address)
        elif command == 'admin':
            if len(args) != 2:
                self.send(client, 'Bad request')
            else:
                self.send_admin_token(args[1], client, address)
        else:
            self.send(client, 'Bad request or timed out')

        client.close()

    def send_list_files(self, client, address):
        self.send(client, ','.join(self.files_available))
        self.log('Sending available files list to client {0}'.format(address[0]))

    def send_login_time(self, client, address):
        self.send(client, int(time()))
        self.log('Client auth from {0}'.format(address[0]))

    def send_file_data(self, file, client, address):
        content = self.read_file(file)
        response = '{0}: {1}'.format(file, content)
        encrypted_response = self.encrypt(response)
        self.send(client, encrypted_response)
        self.log('Sending file "{0}" to client {1}'.format(file, address[0]))

    def send_admin_token(self, client_pin, client, address):
        try:
            if self.check_totp(client_pin):
                response = 'flag: {0}'.format(open(self.real_flag).read())
                self.send(client, response)
                self.log('Sending admin token to client {0}'.format(address[0]))
            else:
                self.send(client, self.msg_wrong_pin)
                self.log('Wrong pin from client {0}'.format(address[0]))

        except Exception as ex:
            self.log(ex)
            self.send(client, 'Bad request')

    def check_totp(self, client_pin):
        try:
            secret = open(self.totp_key).read()
            server_pin = self.totp(secret)
            return client_pin == server_pin

        except Exception as ex:
            self.log(ex)
            return False

    def totp(self, secret):
        counter = pack('>Q', int(time()) // 30)
        totp_hmac = hmac.new(secret.encode('UTF-8'), counter, sha1).digest()
        offset = totp_hmac[19] & 15
        totp_pin = str((unpack('>I', totp_hmac[offset:offset + 4])[0] & 0x7fffffff) % 1000000)
        return totp_pin.zfill(6)

    def encrypt(self, data):
        block_size = 16

        data = data.encode('utf-8')
        pad = block_size - len(data) % block_size
        data = data + (pad * chr(pad)).encode('utf-8')

        key = open(self.aes_key).read()
        cipher = AES.new(key, AES.MODE_ECB)

        return standard_b64encode(cipher.encrypt(data)).decode('utf-8')

    def read_file(self, file):
        try:
            clean_path = self.sanitize(file)
            if clean_path is not None:
                return open(clean_path).read()
            else:
                return self.msg_not_found

        except Exception as ex:
            self.log(ex)
            return self.msg_not_found

    def sanitize(self, file):
        try:
            if file.find('\x00') == -1:
                file_name = file
            else:
                file_name = file[:file.find('\x00')]

            file_path = os.path.realpath('files/{0}'.format(file_name))

            if file_path.startswith(os.getcwd()):
                return file_path
            else:
                return None

        except Exception as ex:
            self.log(ex)
            return None

    def send(self, client, data):
        client.send('{0}{1}'.format(data, self.msg_end).encode('UTF-8'))

    def recv_until(self, client, end):
        try:
            recv = client.recv(self.buff_size).decode('utf-8')
            while recv.find(end) == -1:
                recv += client.recv(self.buff_size).decode('utf-8')
            return recv[:recv.find(end)]

        except Exception as ex:
            self.log(ex)
            return ''

    def log(self, data):
        self.lock.acquire()
        print('[{0}] {1}'.format(strftime('%d.%m.%Y %H:%M:%S'), data))
        sys.stdout.flush()
        self.lock.release()


if __name__ == '__main__':
    secure_server = SecureServer()

    while True:
        try:
            sleep(1)
        except KeyboardInterrupt:
            secure_server.log('Server terminated')
            exit(0)
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import socket
from time import sleep
from Crypto.Cipher import AES
from base64 import standard_b64decode


class SecureClient:

    def __init__(self):
        self.msg_end = '</msg>'
        self.msg_wrong_pin = 'BAD_PIN'
        self.aes_key = 'aes.key'

        self.host = 'crypto-04.v7frkwrfyhsjtbpfcppnu.ctfz.one'
        self.port = 7331
        self.buff_size = 1024

        try:
            self.greeting()
        except KeyboardInterrupt:
            exit(0)

    def greeting(self):
        self.cls()

        print('\n   ==================================== !!! CONFIDENTIALITY NOTICE !!! ====================================')
        print('   ||                 You trying to access high confidential Federation workflow system.                 ||')
        print('   ||                 If you are not authorised to use this system leave it immediately.                 ||')
        print('   || Otherwise incident will be reported and you will be eliminated as it considered by Federation Law. ||')
        print('   ========================================================================================================\n')
        user_choice = input('   Do you want to proceed? (yes/no) > ')

        if user_choice.lower() == 'yes':
            print('   Checking user...')
            sleep(5)
            print('   SUCCESS: ACCESS GRANTED')
            print('   Last login time: {0}'.format(self.get_last_login()))
            sleep(1)
            self.cls()
            print('\n   Welcome, Head Consul.')
            self.main_menu()

        else:
            print('   Checking user...')
            sleep(5)
            print('   ERROR: UNAUTHORISED USER')
            sleep(1)

            print('\n   Reporting incident...')
            sleep(5)
            print('   SUCCESS: INCIDENT REPORTED')
            sleep(1)

            print('\n   Please stay in place and wait for Federation Security Department extraction team.\n')
            exit(0)

    def main_menu(self):
        while True:
            print("\n   You are authorised to:")
            print("      list - view list of available files")
            print("      file - request file from server")
            print("      admin - use administrative functions")
            print("      exit - exit workflow system")

            user_choice = input('\n   What do you want to do? (list/file/admin/exit) > ')

            self.cls()

            if user_choice.lower() == 'list':
                self.list_files()
            elif user_choice.lower() == 'file':
                self.view_file()
            elif user_choice.lower() == 'admin':
                self.admin()
            elif user_choice.lower() == 'exit':
                exit(0)
            else:
                print('\n   Unrecognized command, try again')

    def list_files(self):
        file_list = self.get_file_list()

        print('\n   You are authorised to view listed files:\n')
        for file in file_list:
            print('   - {0}'.format(file))

    def view_file(self):
        self.list_files()

        filename = input('\n   Which file you want to view? > ')
        file_content = self.send('file {0}'.format(filename))

        if len(file_content) > 0:
            plain_content = self.decrypt(file_content)
            if len(plain_content) > 0:
                print('\n   ========================================================================================================')
                print('   Content of {0}'.format(plain_content))
                print('   ========================================================================================================')
            else:
                print('\n   Seems like you have no decryption key, so you can\'t see any files.')
        else:
            print('\n   Error while requesting file')

    def admin(self):
        print('\n   Access to administrative functions requires additional security check.')
        pin = input('   Enter your administrative PIN > ')
        response = self.send('admin {0}'.format(pin))

        if response == self.msg_wrong_pin:
            print('\n   Wrong administrative PIN. Incident will be reported.')
        else:
            print('\n   High confidential administrative data: {0}'.format(response))

    def decrypt(self, data):
        try:
            key = open(self.aes_key).read()
            cipher = AES.new(key, AES.MODE_ECB)

            plain = cipher.decrypt(standard_b64decode(data)).decode('UTF-8')
            plain = plain[:-ord(plain[-1])]
            return plain

        except Exception as ex:
            return ''

    def get_last_login(self):
        return self.send('login')

    def get_file_list(self):
        files = self.send('list')

        if len(files) > 0:
            return files.split(',')
        else:
            return ['no files available']

    def send(self, data):
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.connect((self.host, self.port))
            sock.send('{0}{1}'.format(data, self.msg_end).encode('UTF-8'))
            response = self.recv_until(sock, self.msg_end)
            sock.close()
            return response

        except Exception as ex:
            return ''

    def recv_until(self, sock, end):
        try:
            recv = sock.recv(self.buff_size).decode('utf-8')
            while recv.find(end) == -1:
                recv += sock.recv(self.buff_size).decode('utf-8')
            return recv[:recv.find(end)]

        except Exception as ex:
            return ''

    def cls(self):
        os.system('cls' if os.name == 'nt' else 'clear')


if __name__ == '__main__':
    secure_client = SecureClient()

簡(jiǎn)單瀏覽一下客戶(hù)端的邏輯,客戶(hù)端的功能有:

  • 列出服務(wù)器上的文件
  • 得到AES-ECB 加密的文件內(nèi)容(但是不知道密鑰)
  • 如果我們能提供正確的OTP,那么可以登錄為Admin并且讀出flag

如果我們進(jìn)一步閱讀服務(wù)器的代碼,我們可以看到文件列表是硬編碼的,并不是很有用。其次我們可以看到發(fā)送來(lái)的并不完全是加密的文件,而是:

content = self.read_file(file)
response = '{0}: {1}'.format(file, content)
encrypted_response = self.encrypt(response)

在文件內(nèi)容外,起那面會(huì)加上我們提供的文件名!文件名會(huì)以一種奇怪的方式sanitized:

def sanitize(self, file):
    try:
        if file.find('\x00') == -1:
            file_name = file
        else:
            file_name = file[:file.find('\x00')]

        file_path = os.path.realpath('files/{0}'.format(file_name))

        if file_path.startswith(os.getcwd()):
            return file_path
        else:
            return None

可以看到只會(huì)去第一個(gè)null字節(jié)之前的內(nèi)容作為文件名。
我們可以繼續(xù)看看服務(wù)器上有哪些文件:

self.log_path = '../top_secret/server.log'
self.real_flag = '../top_secret/real.flag'
self.aes_key = '../top_secret/aes.key'
self.totp_key = 'totp.secret'

可以看到flag,aes_key 和log 都是不可達(dá)的,totp.secret文件是我們可以通過(guò)服務(wù)器請(qǐng)求到的。
現(xiàn)在讓我們來(lái)看一下admin指令。他會(huì)校驗(yàn)OTP,如果正確就會(huì)發(fā)來(lái) flag。OTP生成算法如下:

def totp(self, secret):
    counter = pack('>Q', int(time()) // 30)
    totp_hmac = hmac.new(secret.encode('UTF-8'), counter, sha1).digest()
    offset = ord(totp_hmac[19]) & 15
    totp_pin = str((unpack('>I', totp_hmac[offset:offset + 4])[0] & 0x7fffffff) % 1000000)
    return totp_pin.zfill(6)

該算法是基于時(shí)間的,我們可以通過(guò)get time 指令來(lái)獲得服務(wù)器上的時(shí)間。因此唯一未知的值就是secret,該值是從totp.secret文件中讀取的。如果我們能獲得該文件的內(nèi)容就可以計(jì)算出正確的OTP,從而以admin身份獲得flag。
在之前我們提到我們獲得的不僅是文件的內(nèi)容,還有文件名,而且我們可以在文件名之后加任意的nullbyte,仍能夠讀到正確的文件。
我們可以用上述的性質(zhì)來(lái)恢復(fù)加密的內(nèi)容!這是因?yàn)槲覀兛梢栽诳梢钥刂魄熬Y的情況下解密任何AES-ECB密文的后綴。思路如下:

  • 將第一個(gè)塊隨意的填充,只留最后一個(gè)字節(jié)為可變的
  • 窮舉256種情況,保持相同的前綴,只有最后一個(gè)字節(jié)不同,并加密,得到對(duì)應(yīng)的256種密文
  • 對(duì)于我們想要解密的后綴suffix,讓其第一字節(jié)為第一個(gè)塊的最后一個(gè)字節(jié),第一個(gè)塊與第2步保持相同的前綴,并加密
  • 把第三個(gè)塊加密的結(jié)果同第二步的256種情況進(jìn)行比較,即可會(huì)的后綴的第一個(gè)字節(jié)
  • 不停重復(fù)上述過(guò)程,每求出一個(gè)字節(jié),就在第一步種左移一個(gè)字節(jié),讓待求的字節(jié)占據(jù)第一個(gè)塊的最后一個(gè)字節(jié),這樣每窮舉256次即可求出一個(gè)字節(jié)。我們也可以擴(kuò)展這個(gè)方法來(lái)求超出一個(gè)塊長(zhǎng)度的后綴。
    代碼如下:
def brute_ecb_suffix(encrypt_function, block_size=16, expected_suffix_len=32, pad_char='A'):
    suffix = ""
    recovery_block = expected_suffix_len / block_size - 1
    for i in range(expected_suffix_len - len(suffix) - 1, -1, -1):
        data = pad_char * i
        correct = chunk(encrypt_function(data), block_size)[recovery_block]
        for character in range(256):
            c = chr(character)
            test = data + suffix + c
            try:
                encrypted = chunk(encrypt_function(test), block_size)[recovery_block]
                if correct == encrypted:
                    suffix += c
                    print('FOUND', expected_suffix_len - i, c)
                    break
            except:
                pass
    return suffix

對(duì)于這道題而言,應(yīng)該結(jié)合如下加密函數(shù):

def encrypt(pad):
    return send("file ../totp.secret\0\0" + pad).decode("base64")[16:]

這樣就能獲得totp.secret,并計(jì)算出OTP,使用admin指令

def main():
    # secret = brute_ecb_suffix(encrypt, 16, 64, '\0')[2:]
    secret = '0b25610980900cffe65bfa11c41512e28b0c96881a939a2d'
    result = send('login')
    time = int(result)
    print(send('admin ' + totp(secret, time)))
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 本文主要介紹移動(dòng)端的加解密算法的分類(lèi)、其優(yōu)缺點(diǎn)特性及應(yīng)用,幫助讀者由淺入深地了解和選擇加解密算法。文中會(huì)包含算法的...
    蘋(píng)果粉閱讀 11,678評(píng)論 5 29
  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,563評(píng)論 19 139
  • 這篇文章主要講述在Mobile BI(移動(dòng)商務(wù)智能)開(kāi)發(fā)過(guò)程中,在網(wǎng)絡(luò)通信、數(shù)據(jù)存儲(chǔ)、登錄驗(yàn)證這幾個(gè)方面涉及的加密...
    雨_樹(shù)閱讀 3,039評(píng)論 0 6
  • 目錄一、對(duì)稱(chēng)加密?1、對(duì)稱(chēng)加密是什么?2、對(duì)稱(chēng)加密的優(yōu)點(diǎn)?3、對(duì)稱(chēng)加密的問(wèn)題?4、對(duì)稱(chēng)加密的應(yīng)用場(chǎng)景?5、對(duì)稱(chēng)加密...
    意一ineyee閱讀 62,588評(píng)論 8 110
  • 水上萍閱讀 136評(píng)論 0 6

友情鏈接更多精彩內(nèi)容