翻譯自https://github.com/p4-team/ctf/tree/master/2018-07-21-ctfzone-quals/crypto_federation
題目描述
The source code for the Federation Workflow System has been leaked online this night.
Our goal is to inspect it and gain access to their Top Secret documents.
nc crypto-04.v7frkwrfyhsjtbpfcppnu.ctfz.one 7331
考察的是在一些條件下,對AES-ECB的攻擊。
給了server.py 和 client.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import hmac
import socket
from hashlib import sha1
from Crypto.Cipher import AES
from struct import pack, unpack
from threading import Thread, Lock
from base64 import standard_b64encode
from time import time, sleep, strftime
class SecureServer:
    """Threaded TCP server of the Federation Workflow System challenge.

    Every connection carries a single request terminated by ``</msg>``.
    Supported commands are ``list``, ``login``, ``file <name>`` and
    ``admin <pin>``; every reply is terminated by ``</msg>`` as well.

    NOTE: ``sanitize`` and ``encrypt`` are deliberately left as weak as in
    the original challenge (path check by prefix, AES-ECB over
    attacker-controlled data) because they are what the task is about.
    """

    def __init__(self):
        """Set up configuration, bind the listening socket and start accepting."""
        self.msg_end = '</msg>'
        self.msg_not_found = 'NOT_FOUND'
        self.msg_wrong_pin = 'BAD_PIN'
        self.lock = Lock()
        self.log_path = '../top_secret/server.log'
        self.real_flag = '../top_secret/real.flag'
        self.aes_key = '../top_secret/aes.key'
        self.totp_key = 'totp.secret'
        self.files_available = [
            'lorem.txt',
            'flag.txt',
            'admin.txt',
            'password.txt'
        ]
        self.host = '0.0.0.0'
        self.port = 7331
        self.buff_size = 1024
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind((self.host, self.port))
        self.sock.listen(50)
        # Accept connections in a daemon thread so the main thread stays free.
        self.listener = Thread(target=self.listen)
        self.listener.daemon = True
        self.listener.start()
        self.log('Server started')

    def listen(self):
        """Accept clients forever, serving each one in its own daemon thread."""
        while True:
            try:
                client, address = self.sock.accept()
                client.settimeout(30)
                sock_thread = Thread(target=self.handle, args=(client, address))
                sock_thread.daemon = True
                sock_thread.start()
                self.log('Client {0} connected'.format(address[0]))
            except Exception as ex:
                self.log(ex)

    def handle(self, client, address):
        """Read one request from ``client``, dispatch it, then close the socket."""
        try:
            data = self.recv_until(client, self.msg_end)
            self.log('Got message from client {0}: {1}'.format(address[0], data))
            args = data.split(' ', 1)
            command = args[0].strip()
            if command == 'list':
                self.send_list_files(client, address)
            elif command == 'login':
                self.send_login_time(client, address)
            elif command == 'file':
                if len(args) != 2:
                    self.send(client, 'Bad request')
                else:
                    self.send_file_data(args[1], client, address)
            elif command == 'admin':
                if len(args) != 2:
                    self.send(client, 'Bad request')
                else:
                    self.send_admin_token(args[1], client, address)
            else:
                self.send(client, 'Bad request or timed out')
        except Exception as ex:
            # E.g. the peer vanished before the reply could be written.
            self.log(ex)
        finally:
            # Always release the socket, even when dispatching failed.
            client.close()

    def send_list_files(self, client, address):
        """Reply with the comma-separated list of advertised file names."""
        self.send(client, ','.join(self.files_available))
        self.log('Sending available files list to client {0}'.format(address[0]))

    def send_login_time(self, client, address):
        """Reply with the server's current Unix time in whole seconds."""
        self.send(client, int(time()))
        self.log('Client auth from {0}'.format(address[0]))

    def send_file_data(self, file, client, address):
        """Reply with the encrypted string ``'<file>: <content>'``.

        ``file`` is the raw name sent by the client and is echoed into the
        plaintext before encryption.
        """
        content = self.read_file(file)
        response = '{0}: {1}'.format(file, content)
        encrypted_response = self.encrypt(response)
        self.send(client, encrypted_response)
        self.log('Sending file "{0}" to client {1}'.format(file, address[0]))

    def send_admin_token(self, client_pin, client, address):
        """Reply with the real flag if ``client_pin`` is the current TOTP pin."""
        try:
            if self.check_totp(client_pin):
                with open(self.real_flag) as flag_file:
                    response = 'flag: {0}'.format(flag_file.read())
                self.send(client, response)
                self.log('Sending admin token to client {0}'.format(address[0]))
            else:
                self.send(client, self.msg_wrong_pin)
                self.log('Wrong pin from client {0}'.format(address[0]))
        except Exception as ex:
            self.log(ex)
            self.send(client, 'Bad request')

    def check_totp(self, client_pin):
        """Return True when ``client_pin`` equals the pin derived from the secret file.

        Any error (e.g. unreadable secret file) is logged and treated as a
        failed check.
        """
        try:
            with open(self.totp_key) as secret_file:
                secret = secret_file.read()
            server_pin = self.totp(secret)
            return client_pin == server_pin
        except Exception as ex:
            self.log(ex)
            return False

    def totp(self, secret, timestamp=None):
        """Return the zero-padded six-digit HMAC-SHA1 one-time pin.

        :param secret: shared secret as text; it is UTF-8 encoded for the HMAC.
        :param timestamp: Unix time to derive the pin for; defaults to the
            current time. The pin changes every 30 seconds.
        """
        if timestamp is None:
            timestamp = time()
        counter = pack('>Q', int(timestamp) // 30)
        totp_hmac = hmac.new(secret.encode('UTF-8'), counter, sha1).digest()
        # Dynamic truncation: the low nibble of the last digest byte selects
        # the 4-byte window that becomes the pin.
        offset = totp_hmac[19] & 15
        totp_pin = str((unpack('>I', totp_hmac[offset:offset + 4])[0] & 0x7fffffff) % 1000000)
        return totp_pin.zfill(6)

    def encrypt(self, data):
        """Return base64 text of ``data`` encrypted with AES-ECB.

        The UTF-8 encoded data is padded to a multiple of 16 bytes with
        ``pad`` bytes of value ``pad`` (a full extra block when already
        aligned). The key is read from ``self.aes_key`` on every call.
        """
        block_size = 16
        data = data.encode('utf-8')
        pad = block_size - len(data) % block_size
        data = data + (pad * chr(pad)).encode('utf-8')
        with open(self.aes_key) as key_file:
            key = key_file.read()
        cipher = AES.new(key, AES.MODE_ECB)
        return standard_b64encode(cipher.encrypt(data)).decode('utf-8')

    def read_file(self, file):
        """Return the content of the requested file, or ``NOT_FOUND``.

        ``NOT_FOUND`` is returned both when ``sanitize`` rejects the name and
        when reading fails for any reason.
        """
        try:
            clean_path = self.sanitize(file)
            if clean_path is None:
                return self.msg_not_found
            with open(clean_path) as requested:
                return requested.read()
        except Exception as ex:
            self.log(ex)
            return self.msg_not_found

    def sanitize(self, file):
        """Resolve a client-supplied name to a path, or None if rejected.

        Everything from the first NUL byte on is ignored. The remainder is
        resolved relative to ``files/`` and accepted only when the real path
        starts with the current working directory.
        """
        try:
            nul_at = file.find('\x00')
            file_name = file if nul_at == -1 else file[:nul_at]
            file_path = os.path.realpath('files/{0}'.format(file_name))
            if file_path.startswith(os.getcwd()):
                return file_path
            return None
        except Exception as ex:
            self.log(ex)
            return None

    def send(self, client, data):
        """Send ``data`` followed by the message terminator, UTF-8 encoded."""
        # sendall() keeps writing until the whole reply is out; send() may
        # transmit only part of it.
        client.sendall('{0}{1}'.format(data, self.msg_end).encode('UTF-8'))

    def recv_until(self, client, end):
        """Receive until ``end`` shows up and return the text before it.

        Returns ``''`` when the peer closes the connection before sending
        the terminator, or on any socket/decoding error (which is logged).
        """
        try:
            terminator = end.encode('utf-8')
            received = b''
            while terminator not in received:
                piece = client.recv(self.buff_size)
                if not piece:
                    # Empty read means the peer closed: stop instead of
                    # spinning on recv() forever.
                    return ''
                received += piece
            # Decode once at the end so a multi-byte character split across
            # two reads cannot break decoding.
            return received[:received.find(terminator)].decode('utf-8')
        except Exception as ex:
            self.log(ex)
            return ''

    def log(self, data):
        """Print a timestamped line to stdout; serialised across threads."""
        with self.lock:
            print('[{0}] {1}'.format(strftime('%d.%m.%Y %H:%M:%S'), data))
            sys.stdout.flush()
if __name__ == '__main__':
    secure_server = SecureServer()
    # All serving happens in daemon threads; the main thread only has to stay
    # alive until the operator interrupts it.
    try:
        while True:
            sleep(1)
    except KeyboardInterrupt:
        secure_server.log('Server terminated')
        # sys.exit rather than the interactive-only exit() helper.
        sys.exit(0)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import socket
from time import sleep
from Crypto.Cipher import AES
from base64 import standard_b64decode
class SecureClient:
    """Interactive console client for the Federation Workflow System server.

    Each request opens a fresh TCP connection, sends one ``</msg>``-terminated
    command and reads one ``</msg>``-terminated reply.
    """

    def __init__(self):
        """Store connection settings and start the interactive session."""
        self.msg_end = '</msg>'
        self.msg_wrong_pin = 'BAD_PIN'
        self.aes_key = 'aes.key'
        self.host = 'crypto-04.v7frkwrfyhsjtbpfcppnu.ctfz.one'
        self.port = 7331
        self.buff_size = 1024
        try:
            self.greeting()
        except KeyboardInterrupt:
            exit(0)

    def greeting(self):
        """Show the confidentiality banner and enter the menu on ``yes``."""
        self.cls()
        print('\n ==================================== !!! CONFIDENTIALITY NOTICE !!! ====================================')
        print(' || You trying to access high confidential Federation workflow system. ||')
        print(' || If you are not authorised to use this system leave it immediately. ||')
        print(' || Otherwise incident will be reported and you will be eliminated as it considered by Federation Law. ||')
        print(' ========================================================================================================\n')
        user_choice = input(' Do you want to proceed? (yes/no) > ')
        if user_choice.lower() == 'yes':
            print(' Checking user...')
            sleep(5)
            print(' SUCCESS: ACCESS GRANTED')
            print(' Last login time: {0}'.format(self.get_last_login()))
            sleep(1)
            self.cls()
            print('\n Welcome, Head Consul.')
            self.main_menu()
        else:
            print(' Checking user...')
            sleep(5)
            print(' ERROR: UNAUTHORISED USER')
            sleep(1)
            print('\n Reporting incident...')
            sleep(5)
            print(' SUCCESS: INCIDENT REPORTED')
            sleep(1)
            print('\n Please stay in place and wait for Federation Security Department extraction team.\n')
            exit(0)

    def main_menu(self):
        """Loop over the menu until the user chooses ``exit``."""
        while True:
            print("\n You are authorised to:")
            print(" list - view list of available files")
            print(" file - request file from server")
            print(" admin - use administrative functions")
            print(" exit - exit workflow system")
            user_choice = input('\n What do you want to do? (list/file/admin/exit) > ')
            self.cls()
            choice = user_choice.lower()
            if choice == 'list':
                self.list_files()
            elif choice == 'file':
                self.view_file()
            elif choice == 'admin':
                self.admin()
            elif choice == 'exit':
                exit(0)
            else:
                print('\n Unrecognized command, try again')

    def list_files(self):
        """Print the file names reported by the server."""
        file_list = self.get_file_list()
        print('\n You are authorised to view listed files:\n')
        for file in file_list:
            print(' - {0}'.format(file))

    def view_file(self):
        """Ask for a file name, request it and print the decrypted reply."""
        self.list_files()
        filename = input('\n Which file you want to view? > ')
        file_content = self.send('file {0}'.format(filename))
        if not file_content:
            print('\n Error while requesting file')
            return
        plain_content = self.decrypt(file_content)
        if plain_content:
            print('\n ========================================================================================================')
            print(' Content of {0}'.format(plain_content))
            print(' ========================================================================================================')
        else:
            print('\n Seems like you have no decryption key, so you can\'t see any files.')

    def admin(self):
        """Ask for the administrative PIN and print the server's answer."""
        print('\n Access to administrative functions requires additional security check.')
        pin = input(' Enter your administrative PIN > ')
        response = self.send('admin {0}'.format(pin))
        if response == self.msg_wrong_pin:
            print('\n Wrong administrative PIN. Incident will be reported.')
        else:
            print('\n High confidential administrative data: {0}'.format(response))

    def decrypt(self, data):
        """Decrypt a base64 AES-ECB reply with the local key file.

        Returns the plaintext with its padding removed (the last character's
        code point is the number of padding characters), or ``''`` when the
        key is missing or decryption fails in any way.
        """
        try:
            with open(self.aes_key) as key_file:
                key = key_file.read()
            cipher = AES.new(key, AES.MODE_ECB)
            plain = cipher.decrypt(standard_b64decode(data)).decode('UTF-8')
            return plain[:-ord(plain[-1])]
        except Exception:
            return ''

    def get_last_login(self):
        """Return the server's reply to the ``login`` command."""
        return self.send('login')

    def get_file_list(self):
        """Return the server's file names, or a placeholder list when empty."""
        files = self.send('list')
        if files:
            return files.split(',')
        return ['no files available']

    def send(self, data):
        """Send one command over a new connection and return the reply text.

        Returns ``''`` on any connection or protocol error.
        """
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                sock.connect((self.host, self.port))
                # sendall() writes the whole request; send() may stop early.
                sock.sendall('{0}{1}'.format(data, self.msg_end).encode('UTF-8'))
                return self.recv_until(sock, self.msg_end)
            finally:
                # Close the socket even when connecting or sending failed.
                sock.close()
        except Exception:
            return ''

    def recv_until(self, sock, end):
        """Receive until ``end`` shows up and return the text before it.

        Returns ``''`` when the peer closes the connection before sending
        the terminator, or on any socket/decoding error.
        """
        try:
            terminator = end.encode('utf-8')
            received = b''
            while terminator not in received:
                piece = sock.recv(self.buff_size)
                if not piece:
                    # Empty read means the peer closed: stop instead of
                    # spinning on recv() forever.
                    return ''
                received += piece
            # Decode once at the end so a multi-byte character split across
            # two reads cannot break decoding.
            return received[:received.find(terminator)].decode('utf-8')
        except Exception:
            return ''

    def cls(self):
        """Clear the terminal (``cls`` on Windows, ``clear`` elsewhere)."""
        os.system('cls' if os.name == 'nt' else 'clear')
if __name__ == '__main__':
    # Constructing the client immediately starts the interactive session.
    secure_client = SecureClient()
簡單瀏覽一下客戶端的邏輯,客戶端的功能有:
- 列出服務器上的文件
- 得到AES-ECB 加密的文件內容(但是不知道密鑰)
- 如果我們能提供正確的OTP,那麼可以登錄為Admin並且讀出flag
如果我們進一步閱讀服務器的代碼,我們可以看到文件列表是硬編碼的,並不是很有用。其次我們可以看到發送來的並不完全是加密的文件,而是:
content = self.read_file(file)
response = '{0}: {1}'.format(file, content)
encrypted_response = self.encrypt(response)
在文件內容之外,前面還會加上我們提供的文件名!文件名會以一種奇怪的方式被sanitize:
def sanitize(self, file):
try:
if file.find('\x00') == -1:
file_name = file
else:
file_name = file[:file.find('\x00')]
file_path = os.path.realpath('files/{0}'.format(file_name))
if file_path.startswith(os.getcwd()):
return file_path
else:
return None
可以看到只會取第一個null字節之前的內容作為文件名。
我們可以繼續看看服務器上有哪些文件:
self.log_path = '../top_secret/server.log'
self.real_flag = '../top_secret/real.flag'
self.aes_key = '../top_secret/aes.key'
self.totp_key = 'totp.secret'
可以看到flag、aes_key 和log 都是不可達的,而totp.secret文件是我們可以通過服務器請求到的。
現在讓我們來看一下admin指令。它會校驗OTP,如果正確就會發來 flag。OTP生成算法如下:
def totp(self, secret):
    """Return the zero-padded six-digit one-time pin for the current time.

    The pin is derived from HMAC-SHA1 over the number of 30-second steps
    since the Unix epoch, keyed with the UTF-8 encoded ``secret``.
    """
    counter = pack('>Q', int(time()) // 30)
    totp_hmac = hmac.new(secret.encode('UTF-8'), counter, sha1).digest()
    # Indexing a bytes digest already yields an int on Python 3 (ord() would
    # raise TypeError); this matches the full server listing.
    offset = totp_hmac[19] & 15
    totp_pin = str((unpack('>I', totp_hmac[offset:offset + 4])[0] & 0x7fffffff) % 1000000)
    return totp_pin.zfill(6)
該算法是基於時間的,我們可以通過login指令來獲得服務器上的時間。因此唯一未知的值就是secret,該值是從totp.secret文件中讀取的。如果我們能獲得該文件的內容就可以計算出正確的OTP,從而以admin身份獲得flag。
在之前我們提到我們獲得的不僅是文件的內容,還有文件名,而且我們可以在文件名之後加任意數量的null字節,仍能夠讀到正確的文件。
我們可以用上述的性質來恢復加密的內容!這是因為在可以控制前綴的情況下,我們可以解密任何AES-ECB密文的後綴。思路如下:
- 將第一個塊隨意地填充,只留最後一個字節為可變的
- 窮舉256種情況,保持相同的前綴,只有最後一個字節不同,並加密,得到對應的256種密文
- 對於我們想要解密的後綴suffix,讓其第一字節成為第一個塊的最後一個字節,第一個塊與第2步保持相同的前綴,並加密
- 把第三步加密的結果同第二步的256種情況進行比較,即可獲得後綴的第一個字節
- 不停重複上述過程,每求出一個字節,就在第一步中左移一個字節,讓待求的字節占據第一個塊的最後一個字節,這樣每窮舉256次即可求出一個字節。我們也可以擴展這個方法來求超出一個塊長度的後綴。
代碼如下:
def brute_ecb_suffix(encrypt_function, block_size=16, expected_suffix_len=32, pad_char='A'):
    """Recover the unknown plaintext that an ECB oracle appends to our input.

    :param encrypt_function: callable taking the attacker-controlled text and
        returning the ciphertext of that text followed by the unknown suffix.
    :param block_size: cipher block size in bytes.
    :param expected_suffix_len: upper bound on the suffix length to recover;
        presumably meant to be a multiple of ``block_size`` - verify for
        other values.
    :param pad_char: filler character used to align the unknown bytes.
    :return: the recovered suffix; positions where no candidate matched are
        skipped, so the result may be shorter than ``expected_suffix_len``.
    """
    suffix = ""
    # Floor division: "/" yields a float on Python 3, which cannot be used
    # as a list index.
    recovery_block = expected_suffix_len // block_size - 1
    for i in range(expected_suffix_len - len(suffix) - 1, -1, -1):
        data = pad_char * i
        # Reference block: padding + known suffix + exactly one unknown byte.
        correct = chunk(encrypt_function(data), block_size)[recovery_block]
        for character in range(256):
            c = chr(character)
            test = data + suffix + c
            try:
                encrypted = chunk(encrypt_function(test), block_size)[recovery_block]
            except Exception:
                # Best effort: a candidate the oracle cannot handle is simply
                # skipped. KeyboardInterrupt is no longer swallowed.
                continue
            if correct == encrypted:
                suffix += c
                print('FOUND', expected_suffix_len - i, c)
                break
    return suffix
對於這道題而言,應該結合如下加密函數:
def encrypt(pad):
    """Encryption oracle for ``brute_ecb_suffix`` built on the ``file`` command.

    :param pad: attacker-controlled text placed after the NUL bytes of the
        file name; the server ignores it when opening the file but echoes it
        into the plaintext it encrypts.
    :return: the decoded ciphertext without its first 16-byte block.
    """
    # Local import: this snippet has no import block of its own.
    import base64

    # "../totp.secret" plus two NUL bytes is exactly 16 bytes, i.e. one full
    # block, which is dropped so that block 0 of the result starts at ``pad``.
    # base64.b64decode replaces the Python-2-only str.decode("base64").
    return base64.b64decode(send("file ../totp.secret\0\0" + pad))[16:]
這樣就能獲得totp.secret,並計算出OTP,使用admin指令
def main():
    """Fetch the server time, derive the matching OTP and request the flag."""
    # The secret was recovered once through the ECB oracle and then
    # hard-coded; [2:] drops the ': ' separator that precedes the content:
    # secret = brute_ecb_suffix(encrypt, 16, 64, '\0')[2:]
    secret = '0b25610980900cffe65bfa11c41512e28b0c96881a939a2d'
    # 'login' makes the server answer with its current Unix time. The local
    # name avoids shadowing the ``time`` module/function name.
    server_time = int(send('login'))
    print(send('admin ' + totp(secret, server_time)))