Introduction

The challenge as hosted on Hack The Box entails the following:

Challenge name: Digital Safety Annex
Challenge Points: 20 points
Challenge Diff: Very Easy
Challenge Creator: [L1ynx](https://app.hackthebox.com/users/1024403)
Challenge Description: `Here at D.S.A we store all your super secret information in a secure vault until you provide us with proof you are who you say you are. We even use SHA256 instead of the weak SHA1! We are so confident, we invite all who wish to show us otherwise!`

The challenge focuses on recovering the DSA private key from signatures whose nonce is drawn from a small range.

The Challenge

In the zip file we have:

╰─[:)] ls -la
total 20
drwxr-xr-x 1 pyp pyp   72 Feb 23 13:53 .
drwxr-xr-x 1 pyp pyp  102 Mar 14 16:47 ..
-rw-r--r-- 1 pyp pyp  505 Feb 23 13:53 _account.py
-rw-r--r-- 1 pyp pyp 2191 Feb 23 13:53 _annex.py
-rw-r--r-- 1 pyp pyp 1085 Feb 23 13:53 _dsa.py
-rw-r--r-- 1 pyp pyp 5094 Feb 23 13:53 server.py

Looking at the script(s):

server.py

from secret import FLAG, HTB_PASSWD
from _annex import Annex
import re

def show_menu():
    return input("""
Welcome to the Digital Safety Annex!
We will keep your data safe so you don't have to worry!

[0] Create Account 
[1] Store Secret
[2] Verify Secret
[3] Download Secret
[4] Developer Note
[5] Exit

[+] Option > """)

def input_number(prompt):
    n = input(prompt)
    return int(n) if n.isdigit() else None

def main():
    annex = Annex()
    annex.create_account("Admin", "5up3r_53cur3_P45sw0r6")
    annex.create_account("ElGamalSux", HTB_PASSWD)
    annex.sign("ElGamalSux", "DSA is a way better algorithm", HTB_PASSWD)
    annex.sign("Admin", "Testing signing feature", "5up3r_53cur3_P45sw0r6")
    annex.sign("ElGamalSux", "I doubt anyone could beat it", HTB_PASSWD)
    annex.sign("Admin", "I should display the user log and make sure its working", "5up3r_53cur3_P45sw0r6")
    annex.sign("ElGamalSux", "To prove it, I'm going to upload my most precious item! No one but me will be able to get it!", HTB_PASSWD)
    annex.sign("ElGamalSux", FLAG, HTB_PASSWD)

    account_username = ""

    while True:
        user_inp = show_menu()
        
        if user_inp == '0':
            account_username = annex.create_account()
        
        elif user_inp == '1':
            if not account_username:
                print("\n[!] You need to create an account with the Annex first before you can store any secrets!")
            else:
                while True:
                    message = input("\nPlease enter you super secret message: ")
                    r, s = annex.sign(account_username, message)

                    if r > 0 and s > 0:
                        break

                print(f"\n[+] Here is your signature (r, s): ({r}, {s})")
                print("Keep your signature safe!!")
        
        elif user_inp == '2':
            signature = input("\nPlease enter the signature (r,s) separated by a comma: ")
            signature = re.search(r'^(\d+),(\d+)$', signature)

            if not signature:
                print("\n[!] Sorry, need a valid signature to verify message!")
                continue
            
            message = input("\nPlease enter the message you wish to verify: ").strip()
            if not message:
                print("\n[!] Sorry, need a valid message to continue verification!")
                continue

            signature = (int(signature.group(1)), int(signature.group(2)))

            if annex.verify(message, signature):
                print("[+] Message has been successfully verified!")
            else:
                print("[!] Message could not be verified! Please make sure your signature and messages are correct!")

        elif user_inp == '3':
            uname = input("\nPlease enter the username that stored the message: ")
            if not uname in annex.vault:
                print("\n[!] Sorry, need valid existing username to download secret!")
                continue

            req_id = input("\nPlease enter the message's request id: ")
            if not req_id.isdigit() or not (0 <= int(req_id) < len(annex.vault[uname])):
                print("\n[!] Sorry, need valid request id to download secret!")
                continue
            
            req_id = int(req_id)
            if uname == account_username:
                account = annex.users[account_username]

                if account.login():
                    h, msg, sig = annex.vault[uname][req_id]
                    print(f"\n[+] Here is your message: {msg}")
                else:
                    print("[-] Invalid username and/or password!")
            else:
                k = input_number("\nPlease enter the message's nonce value: ")
                if not k:
                    print("\n[!] Sorry, need a valid nonce to download secret!")
                    continue
            
                x = input_number("\n[+] Please enter the private key: ")
                if not x:
                    print("\n[!] Sorry, need a valid private key to download secret!")
                    continue

                annex.download(x, k, req_id, uname)
        
        elif user_inp == '4':
            print("\nWe are here to prove that DSA is waaayyyy better than El Gamal!\nWe also modified our signature algorithm to use the super secure SHA-256. No way you can bypass our authentication. If you must try, be sure to bring tissues for your tears of failure.\nI'll throw you a bone, these are public record anyway:\n")

            p, q, g = annex.dsa.get_public_params()

            print(f'{p = }')
            print(f'{q = }')
            print(f'{g = }')

            inp = input("[+] Test user log (y/n): ").lower()
            if inp == 'y':
                if annex.users['Admin'].login():
                    print(f'\n{annex.user_log}')
        
        elif user_inp == '5':
            print("[!] Leaving the Annex. Thanks for choosing DSA!")
            break
        
        else:
            print("[!] Invalid option.")


if __name__ == "__main__":
    main()

A few side modules provide supporting functionality. We will look at the key ones and provide further insight. Let us dive right in:

  • We have a menu that allows us to:
    • Create an account through the Annex class. It checks whether the username already exists and requires both a username and a password.
    • Store a secret. This option requires that we create a user first. It then uses the sign method to sign the message with the DSA (Digital Signature Algorithm) parameters. The upper bound of the nonce k depends on the length of the username, so we can influence the nonce range by choosing the username. The method signs the message hash, stores the entry with self.vault[account.username].append((h, msg, (r, s))), and gives us back r and s.
    • Verify a message and its signature using the verify method. Verification fails if the signature does not match the hash of the provided message.
    • Download a message. We can access the message of any user, but if that user is not us we must provide the nonce (k) and the private key (x), together with a valid request id for that user's vault. The message is only released after the server checks that these values reproduce the stored signature.
    • Obtain the public parameters and, after logging in as Admin, display the user log.
    • Exit the application.

There is a lot going on here, but I’ll focus on the key areas.

The attack

Let us first understand the scheme in use: the Digital Signature Algorithm.

  1. It is an asymmetric signature scheme (the private key signs, the public key verifies) that relies on the discrete logarithm problem and modular exponentiation.

$$private \space key = x$$ => x is chosen at random from the range [1, q - 1].

$$public \space key = y = g^{x} \space mod \space p$$ => g is a generator and p is a prime number, hence the public key lives within a finite field.

$$random \space nonce = k$$ => k is a random nonce; it must be kept secret and unique per signature (that is why ours is drawn from a random range).

  2. Signing computes the following:

$$r = (g^{k} \space mod \space p) \space mod \space q$$ => q is another prime number, hence our value is still within a finite field.

$$hash \space function = H(m)$$ => The hash function can be any cryptographic hash, such as SHA-256.

$$s = (k^{-1} \cdot (H(m) + x \cdot r)) \space mod \space q$$ => Once you are comfortable with modular arithmetic, this equation can be rearranged to solve for any one of its terms.

The output is the signature (r, s), which is available to us.

  3. Verification computes the following:

$$w \equiv s^{-1} \space mod \space q$$

$$u_{1} \equiv H(m) \cdot w \space mod \space q$$

$$u_{2} \equiv r \cdot w \space mod \space q$$

$$v \equiv (g^{u_{1}} \cdot y^{u_{2}} \space mod \space p) \space mod \space q$$

The value of v must be equal to r. These are the basics required to understand the attack that follows.
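To make the signing and verification equations concrete, here is a minimal sketch in Python. The tiny parameters (p = 607, q = 101, g = 64), the private key 57 and the helper names are assumptions chosen for illustration; they are not taken from the challenge, which uses a 2048-bit key. The retry loop mirrors the server's habit of re-signing until r and s are both positive.

```python
import hashlib

# Toy parameters, assumed for illustration only (far too small to be secure):
# q divides p - 1 and g = 2^((p - 1) / q) mod p generates the subgroup of order q.
P, Q, G = 607, 101, 64


def H(message):
    """SHA-256 digest of the message, read as an integer."""
    return int(hashlib.sha256(message.encode()).hexdigest(), 16)


def sign(message, x, k):
    """Sign with private key x and nonce k; returns (r, s)."""
    r = pow(G, k, P) % Q
    s = (pow(k, -1, Q) * (H(message) + x * r)) % Q
    return r, s


def sign_valid(message, x):
    """Walk through nonces until r and s are both non-zero.
    A real signer draws k at random; the fixed order here is a toy shortcut."""
    for k in range(2, Q):
        r, s = sign(message, x, k)
        if r > 0 and s > 0:
            return r, s
    raise ValueError("no usable nonce found")


def verify(message, sig, y):
    """Check that v == r for the public key y."""
    r, s = sig
    if not (0 < r < Q and 0 < s < Q):
        return False
    w = pow(s, -1, Q)
    u1 = (H(message) * w) % Q
    u2 = (r * w) % Q
    v = (pow(G, u1, P) * pow(y, u2, P)) % P % Q
    return v == r


x = 57            # hypothetical private key
y = pow(G, x, P)  # matching public key
sig = sign_valid("hello annex", x)
print(sig, verify("hello annex", sig, y))
```

The three-argument pow with a negative exponent computes the modular inverse and needs Python 3.8 or newer.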

Let us focus on the vulnerable part:


class DSA:
    def __init__(self, key_size=2048):
        key = PrimeGenerator.generate(key_size)
        self.p, self.q = key.p, key.q
        self.x, self.y, self.g = self.generate_keys()
        self.k_min = 65500

class Account:
    def __init__(self, username="default-user", passwd=""):
        self.username = username
        self.password = sha256(passwd.encode()).digest()
        self.k_max = int(len(self.username) ** 6)
        if self.k_max < 65536:
            self.k_max += 1000000
        self.stored_msgs = 0
  • From the above, we note the following: if k_max is lower than 65536, 1,000,000 is added to it, so the range becomes:
(65500, k_max + 1,000,000) # Very small range we can bruteforce

The nonce range is very small, so we can easily bruteforce it. If we know the username that signed a message, it is computationally feasible to recover the nonce of that message.
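As a quick check of how small the search space is, the sketch below recomputes the bounds from the Account and DSA constructors for the two usernames created in server.py. The function name nonce_range is my own; only the constants come from the challenge source.

```python
K_MIN = 65500  # DSA.k_min in the challenge source


def nonce_range(username):
    """Return (k_min, k_max) for a username, following Account.__init__."""
    k_max = len(username) ** 6
    if k_max < 65536:
        k_max += 1000000
    return K_MIN, k_max


for name in ("Admin", "ElGamalSux"):
    low, high = nonce_range(name)
    print(f"{name}: nonce between {low} and {high} ({high - low} candidates)")
```

Both users end up with fewer than a million candidate nonces, which a single modular exponentiation per candidate can cover in minutes.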

Let us see how this affects the security:

elif user_inp == '3':
	uname = input("\nPlease enter the username that stored the message: ")
	if not uname in annex.vault:
		print("\n[!] Sorry, need valid existing username to download secret!")
		continue

	req_id = input("\nPlease enter the message's request id: ")
	if not req_id.isdigit() or not (0 <= int(req_id) < len(annex.vault[uname])):
		print("\n[!] Sorry, need valid request id to download secret!")
		continue
	
	req_id = int(req_id)
	if uname == account_username:
		account = annex.users[account_username]

		if account.login():
			h, msg, sig = annex.vault[uname][req_id]
			print(f"\n[+] Here is your message: {msg}")
		else:
			print("[-] Invalid username and/or password!")
	else:
		k = input_number("\nPlease enter the message's nonce value: ")
		if not k:
			print("\n[!] Sorry, need a valid nonce to download secret!")
			continue
	
		x = input_number("\n[+] Please enter the private key: ")
		if not x:
			print("\n[!] Sorry, need a valid private key to download secret!")
			continue

		annex.download(x, k, req_id, uname)
  • Here we see that to request another user’s message we need to provide their nonce and private key. The download method then validates them:
def download(self, priv, nonce, req_id, username):
	h, m, sig = self.vault[username][req_id]
	
	p, q, g = self.dsa.get_public_params()

	rp = pow(g, nonce, p) % q
	sp = (pow(nonce, -1, q) * (int(h, 16) + priv * rp)) % q

	new_sig = (str(rp), str(sp))

	if new_sig == sig:
		print(f"[+] Here is your super secret message: {m}")
	else:
		print(f"[!] Invalid private key or nonce value! This attempt has been recorded!")
  • It fetches h (the hash), m (the message) and sig (the signature) from the user's vault.
  • It reads the public parameters.
  • It uses the supplied nonce to calculate a candidate value rp for r.
  • It calculates a candidate value sp for s from the supplied private key and rp.
  • The message is only released if the candidates match the stored signature (both r and s).
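The same check can be mirrored offline, which lets us validate a candidate (private key, nonce) pair before sending it to the server. The sketch below is an assumption-laden stand-in: the function name reproduces_signature and the toy values (p = 607, q = 101, g = 64, hash "4d", key 57, nonce 23) are mine, while the arithmetic and the comparison against a tuple of strings follow the download method shown above.

```python
def reproduces_signature(params, h_hex, sig, priv, nonce):
    """Mirror of Annex.download: rebuild (r, s) and compare with the stored one."""
    p, q, g = params
    rp = pow(g, nonce, p) % q
    sp = (pow(nonce, -1, q) * (int(h_hex, 16) + priv * rp)) % q
    # The stored signature is compared as a tuple of decimal strings.
    return (str(rp), str(sp)) == sig


# Toy setup (hypothetical values, not from the challenge).
params = (607, 101, 64)
h_hex = "4d"
x, k = 57, 23
r = pow(params[2], k, params[0]) % params[1]
s = (pow(k, -1, params[1]) * (int(h_hex, 16) + x * r)) % params[1]
stored_sig = (str(r), str(s))

print(reproduces_signature(params, h_hex, stored_sig, x, k))      # correct pair
print(reproduces_signature(params, h_hex, stored_sig, x + 1, k))  # wrong key
```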

The check above has a logic flaw: we control the nonce and the private key, and therefore rp and sp. To match r we need to know the nonce, which the small range already covers. To match s we need to recover the correct private key for the scheme. The server signs a handful of messages at startup and lets Admin, whose password is hardcoded, display the user log:

    annex = Annex()
    annex.create_account("Admin", "5up3r_53cur3_P45sw0r6")
    annex.create_account("ElGamalSux", HTB_PASSWD)
    
    annex.sign("ElGamalSux", "DSA is a way better algorithm", HTB_PASSWD)
    annex.sign("Admin", "Testing signing feature", "5up3r_53cur3_P45sw0r6")
    annex.sign("ElGamalSux", "I doubt anyone could beat it", HTB_PASSWD)
    annex.sign("Admin", "I should display the user log and make sure its working", "5up3r_53cur3_P45sw0r6")
    annex.sign("ElGamalSux", "To prove it, I'm going to upload my most precious item! No one but me will be able to get it!", HTB_PASSWD)
    annex.sign("ElGamalSux", FLAG, HTB_PASSWD)

[SNIPPED]
	inp = input("[+] Test user log (y/n): ").lower()
	if inp == 'y':
		if annex.users['Admin'].login():
			print(f'\n{annex.user_log}')

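By logging in as Admin we obtain the user log, which holds the signature and hash of every stored message. From a single entry we can recover the nonce and then the private key, which is shared by every user of the Annex.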

Proof of Concept

Let us first look at the nonce value and how we can recover it:

sage: user_data = {tusers[0] : (65500, int(len(tusers[0]) ** 6)), tusers[1] : (65500, int(len(tusers[1]) ** 6) + 1000000)}
sage: user_data
{'ElGamalSux': (65500, 1000000), 'Admin': (65500, 1015625)}

sage: server_data = [((1198823310564388738726423815822015944396540162735471771670317374382, 8617581960463884940022600671833636061966856101645767492419589096962), 'a0aad39c9280260016dabaed8c
....: a0c16d812ed8f2ccaa79eed07908f6bc74fb48'), ((5074211728851381403862561403816386507166607496350359751310893811807, 6079219462476548854085179811795954768703390028457124286987866618315),
....: 'b61c744b656adfca5049503c898073fefce49413e072505541e78460c02345ac'), ((6502220926242497519125502743995012448020689148738997561959988030770, 7408595859826152859162307386903060366843399
....: 851654922814033478414982), '625e8f4530e14927bd3095b35917e127a056517ba11fa7570deae5485a1a8503'), ((7259862359084842021638241881816713528524689931266447219086010824891, 1248792770144665
....: 592769817463893894433441487548742762675796847684363), '0e9722da720ecebce84ce77efa3f047e7a9b1c1fb11264be7d5cc1adbb8a73a5'), ((4422239719408158035833240710299160340188033406398317870357
....: 63522248, 229087328524806697582319236597735331461244697551390029482352282418), '36f6e72c03df0167409761fa929d4574b13a7d513a903a8c49193836c0cb34cd'), ((707734469455541114437035942306240
....: 2582458988405149942149126781953287, 13331993912133513525281914797728613586837432403987353012778918310907), '520c08ed5af48f940ff2a92b919a3e7d06ffb04b7b6d154034baaf2fa39fc918')]
sage: admin_val = server_data[1] # 2nd message comes from the Admin user

sage: 
....: p = 242134561560675...8052177
....: q = 13760695642377604420356033604062307536790922953086914973981531459641
....: g = 2367019128854...63598626583386026572171

sage: admin_r = admin_val[0][0]
sage: h = int(admin_val[1], 16)

sage: [k for k in range(user_data['Admin'][0], user_data['Admin'][1]) if pow(g,k,p)%q == admin_r]
[492627]

We recover the nonce used for that message. The bruteforce takes a minute or two, and the same search over a user's range recovers the nonce of any other message.
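A possible speed-up, not used in the session above, is to avoid a full modular exponentiation per candidate: compute g^k once for the lower bound and then multiply by g for each following k. The sketch below is a minimal version of that idea; find_nonce and the toy parameters are assumptions for illustration.

```python
def find_nonce(p, q, g, r, k_min, k_max):
    """Return the first k in [k_min, k_max) with (g^k mod p) mod q == r, else None."""
    acc = pow(g, k_min, p)  # g^k_min, computed once
    for k in range(k_min, k_max):
        if acc % q == r:
            return k
        acc = (acc * g) % p  # step from g^k to g^(k + 1)
    return None


# Toy run with hypothetical parameters (p = 607, q = 101, g = 64).
p, q, g = 607, 101, 64
target_r = pow(g, 23, p) % q
found = find_nonce(p, q, g, target_r, 5, 100)
print(found, pow(g, found, p) % q == target_r)
```

With a q as small as the toy one, several k values can map to the same r, so the first hit is not guaranteed to be the signer's nonce; with the challenge's q such collisions are not a practical concern.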

To recover the private key, we rearrange the signing equation for x:

$$s \equiv k^{-1} \cdot (H(m) + x \cdot r) \space mod \space q$$

$$s \cdot k \equiv H(m) + x \cdot r \space mod \space q$$

$$s \cdot k - H(m) \equiv x \cdot r \space mod \space q$$

$$\therefore \space x \equiv r^{-1} \cdot (s \cdot k - H(m)) \space mod \space q$$

That is how we recover the private key:

sage: h = int(admin_val[1], 16)
sage: admin_r = admin_val[0][0]
sage: admin_s = admin_val[0][1]
sage: 
sage: k = 492627
sage: x = (pow(admin_r,-1,q) * (admin_s * k - h) % q) % q
sage: x
12674568732834892565505953030495533518446262158276581567970812923111
sage: 1 <= x <= (q - 1)
True
sage: s = (pow(k, -1, q) * (h + x * admin_r)) % q
sage: s == admin_s
True

The recomputed signature matches, so we have recovered the private key! With that out of the way, we can go ahead and write the solution.
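For readers without Sage, the same recovery can be reproduced in plain Python. This is a self-contained sketch on toy values that I picked for illustration (p = 607, q = 101, g = 64, private key 57, nonce 23, hash value 77); none of them come from the challenge.

```python
def recover_private_key(q, k, r, s, h):
    """Solve s = k^-1 * (h + x * r) mod q for x, given a known nonce k."""
    return (pow(r, -1, q) * (s * k - h)) % q


# Toy signature made with a known key, so the result can be checked.
p, q, g = 607, 101, 64
x, k, h = 57, 23, 77
r = pow(g, k, p) % q
s = (pow(k, -1, q) * (h + x * r)) % q

recovered = recover_private_key(q, k, r, s, h)
print(recovered == x)
```

The recovery only needs r to be invertible modulo q, which holds for any non-zero r because q is prime.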

The Solution

The Python script below puts everything together:

#!/usr/bin/env python3

# Modules
from pwn import remote, process, args, context
from ast import literal_eval
import asyncio
import multiprocessing
from functools import partial

context.log_level = 'error'
# Functions
async def brute_nonce_chunk(params, k_min, k_max, r, chunk_size=10000):
    """Process a chunk of nonce values with early return"""
    p, q, g = params
    for nonce in range(k_min, min(k_max, k_min + chunk_size)):
        if pow(g, nonce, p) % q == r:
            return nonce
    return None


def process_chunk(params, k_min, r, chunk_size=10000):
    """Non-async version for multiprocessing"""
    p, q, g = params
    k_max = k_min + chunk_size
    for nonce in range(k_min, k_max):
        if pow(g, nonce, p) % q == r:
            return nonce
    return None


async def recover_x(q, nonce, r, s, h):
    x = (pow(r, -1, q) * (s * nonce - h) % q) % q
    return x

def formatter(number:int)->str:
    # Format => 2444...3423
    return f"{number}"[:4] + "..." + f"{number}"[-4:]

async def parallel_brute_force(params, k_max, r, num_processes=None):
    """Use multiprocessing for CPU-bound task"""
    if num_processes is None:
        num_processes = multiprocessing.cpu_count()
    
    print(f"[*] Using {num_processes} CPU cores for parallel processing")
    
    chunk_size = 100000
    with multiprocessing.Pool(processes=num_processes) as pool:
        func = partial(process_chunk, params, r=r, chunk_size=chunk_size)
        
        # Create chunks of ranges to check
        chunks_start = range(0, k_max, chunk_size)
        
        # Process chunks with early termination
        for i, result in enumerate(pool.imap_unordered(func, chunks_start)):
            if i % 10 == 0:
                print(f"[*] Processed {i * chunk_size}/{k_max} iterations...")
            if result is not None:
                pool.terminate()  # Stop all other workers
                return result
    
    return None


async def main():
    HOST, PORT = "127.0.0.1:1337".split(":")
    rc = remote(HOST, PORT) if args.REMOTE else process(["python3", "server.py"])

    # Register an account
    rc.sendlineafter(b'> ', b'0')
    rc.sendlineafter(b': ', b'admin')
    rc.sendlineafter(b': ', b'admin')

    # Recover the users data
    rc.sendlineafter(b'> ', b'4')
    p = rc.recvline_contains(b'p = ').decode().split(' = ')[1].strip()
    q = rc.recvline_contains(b'q = ').decode().split(' = ')[1].strip()
    g = rc.recvline_contains(b'g = ').decode().split(' = ')[1].strip()

    p, q, g = int(p), int(q), int(g)
    print(f"[+] p: {formatter(p)}\n[+] q: {formatter(q)}\n[+] g: {formatter(g)}")

    # Receive the Users logs
    rc.sendlineafter(b':', b'y')
    rc.sendlineafter(b':', b'5up3r_53cur3_P45sw0r6')  # Send for Admin

    users_data = literal_eval(rc.recvline_contains(b'[').decode())
    flag_message = users_data[5]  # The last message contains the flag

    r = int(flag_message[0][0])
    s = int(flag_message[0][1])
    h = int(flag_message[1], 16)

    print(f"[+] r: {formatter(r)}\n[+] s: {formatter(s)}\n[+] h: {formatter(h)}")

    # Recover the nonce
    user = 'ElGamalSux'
    k_max = len(user) ** 6
    if k_max < 65536:
        k_max += 1000000

    print(f"[*] Brute forcing nonce with a maximum of {k_max} iterations")
    
    # Use multiprocessing for the CPU-intensive task
    nonce = await parallel_brute_force((p, q, g), k_max, r)

    if not nonce:
        print("[!] Failed to recover the nonce")
        return

    print(f"\n[+] Nonce: {nonce}")

    # Recover the private key
    x = await recover_x(q, nonce, r, s, h)
    print(f"[+] Private Key: {x}")

    rc.sendlineafter(b'> ', b'3')
    rc.sendlineafter(b': ', user.encode())
    rc.sendlineafter(b': ', b'3')
    rc.sendlineafter(b': ', str(nonce).encode())
    rc.sendlineafter(b': ', str(x).encode())

    flag = literal_eval(rc.recvline_contains(b'message:').split(b': ')[-1].strip().decode())
    flag = flag.decode() if isinstance(flag, bytes) else flag
    print(f"[+] Flag: {flag}")


if __name__ == '__main__':
    asyncio.run(main())

Running it locally:

╰─[:(] % python3 sol.py

[+] p: 2287...3083
[+] q: 1534...2439
[+] g: 1980...3467
[+] r: 3135...8151
[+] s: 9415...8082
[+] h: 3711...8520
[*] Brute forcing nonce with a maximum of 1000000 iterations
[*] Using 8 CPU cores for parallel processing
[*] Processed 0/1000000 iterations...

[+] Nonce: 246668
[+] Private Key: 7477315000306870354424831117598304789270452318296472821366969323379
[+] Flag: HTB{FAKE_FLAG_FOR_DEMO_PURPOSES}

Let us run it remotely:

╰─[:)] % python3 ../sol1.py REMOTE
[+] p: 2537...5939
[+] q: 1688...1877
[+] g: 1710...7448
[+] r: 7159...6936
[+] s: 1498...5272
[+] h: 1022...0549
[*] Brute forcing nonce with a maximum of 1000000 iterations
[*] Using 8 CPU cores for parallel processing
[*] Processed 0/1000000 iterations...

[+] Nonce: 197062
[+] Private Key: 8631038904157924481272260663384257655768068000003883843717927178323
[+] Flag: HTB{1_Gu3ss_d54_15_N07_4s_s3CuR3_A5_1_7h0u647}

We get back the flag:

[+] Flag: HTB{1_Gu3ss_d54_15_N07_4s_s3CuR3_A5_1_7h0u647}

Notes:

  • By understanding how the DSA scheme works, we can rearrange its modular equations to recover the hidden parameters.
  • When the nonce is drawn from a small range, the entire scheme is broken: once the nonce is bruteforced, the private key follows directly from a single signature.

“What you get by achieving your goals is not as important as what you become by achieving your goals.” - Zig Ziglar