Skip to content

设计一种基于pow(Proof of Work)的接口验证

pow介绍

pow是工作量证明,用于验证区块链的区块是否合法。 作用是验证矿工的工作量,保证矿工在一定时间内进行了计算工作。 从接口保护角度来看可以通过这个方式增加接口滥用者的成本。

PoW 算法要求客户端在发起请求之前进行一定的计算,服务器只在计算结果符合要求时才处理请求。该计算需要一定的时间和资源,这样可以减轻大量自动化请求的压力,增加攻击者的攻击成本。

pow的实现

服务端

生成随机挑战: 服务器生成一个随机的挑战值 challenge,可以是当前时间戳、随机数、或请求上下文的哈希值。 设定难度: 设置一个难度值 difficulty,这表示客户端需要找到一个满足特定条件的值。比如难度要求结果的哈希前缀有一定数量的零。 发送挑战: 服务器将 challenge 和 difficulty 作为响应发回客户端,请求客户端完成 PoW。 python实现

python
# 服务端生成挑战
def generate_challenge():
    timestamp = str(int(time.time()))  # 使用当前时间戳作为挑战
    challenge = hashlib.sha256(timestamp.encode()).hexdigest()
    return challenge

java实现

java
    public static String generateChallenge() {
        long timestamp = System.currentTimeMillis() / 1000; // Get current timestamp in seconds
        String challenge = String.valueOf(timestamp);

        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(challenge.getBytes());
            StringBuilder hexString = new StringBuilder();

            for (byte b : hash) {
                String hex = Integer.toHexString(0xff & b);
                if (hex.length() == 1) {
                    hexString.append('0');
                }
                hexString.append(hex);
            }
            return hexString.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException(e);
        }
    }

js实现

javascript

async function generateChallenge() {
    const timestamp = Math.floor(Date.now() / 1000).toString(); // Get current timestamp in seconds
    const encoder = new TextEncoder();
    const data = encoder.encode(timestamp);
    
    const hashBuffer = await crypto.subtle.digest('SHA-256', data); // Generate SHA-256 hash
    const hashArray = Array.from(new Uint8Array(hashBuffer)); // Convert buffer to byte array
    const challenge = hashArray.map(b => ('0' + b.toString(16)).slice(-2)).join(''); // Convert bytes to hex string

    return challenge;
}

客户端

客户端部分: 计算 nonce: 客户端接收到 challenge 后,通过暴力搜索找到一个值 nonce,使得将 challenge + nonce 的哈希值满足服务器设定的难度要求。 提交结果: 客户端将 nonce 发送回服务器。

python实现

python
# 客户端寻找 nonce
def find_nonce(challenge, difficulty):
    nonce = 0
    while True:
        test_value = f"{challenge}{nonce}".encode()
        test_hash = hashlib.sha256(test_value).hexdigest()
        if test_hash.startswith(difficulty):
            return nonce, test_hash
        nonce += 1

java实现

java

    public static String[] findNonce(String challenge, String difficulty) throws NoSuchAlgorithmException {
        long nonce = 0;
        while (true) {
            String testValue = challenge + nonce;
            String testHash = sha256(testValue);
            if (testHash.startsWith(difficulty)) {
                return new String[]{String.valueOf(nonce), testHash};
            }
            nonce++;
        }
    }

    private static String sha256(String input) throws NoSuchAlgorithmException {
        MessageDigest md = MessageDigest.getInstance("SHA-256");
        byte[] hashBytes = md.digest(input.getBytes());
        StringBuilder hexString = new StringBuilder();
        for (byte b : hashBytes) {
            String hex = Integer.toHexString(0xff & b);
            if (hex.length() == 1) hexString.append('0');
            hexString.append(hex);
        }
        return hexString.toString();
    }

js实现

javascript
async function findNonce(challenge, difficulty) {
    let nonce = 0;
    while (true) {
        const testValue = `${challenge}${nonce}`;
        const testHash = await sha256(testValue);
        if (testHash.startsWith(difficulty)) {
            return [nonce, testHash];
        }
        nonce++;
    }
}

async function sha256(message) {
    const msgBuffer = new TextEncoder().encode(message);
    const hashBuffer = await crypto.subtle.digest('SHA-256', msgBuffer);
    const hashArray = Array.from(new Uint8Array(hashBuffer));
    const hashHex = hashArray.map(b => b.toString(16).padStart(2, '0')).join('');
    return hashHex;
}

验证部分

验证结果: 服务器验证 challenge + nonce 的哈希是否满足设定的难度。如果验证通过,服务器才处理客户端请求,否则拒绝。

python实现

python
# 服务端验证结果
def verify_pow(challenge, nonce, difficulty):
    test_value = f"{challenge}{nonce}".encode()
    test_hash = hashlib.sha256(test_value).hexdigest()
    return test_hash.startswith(difficulty)

java实现

java
    // Server-side verification of the proof of work
    public static boolean verifyPow(String challenge, String nonce, String difficulty) {
        try {
            String testValue = challenge + nonce;
            String testHash = sha256(testValue);
            return testHash.startsWith(difficulty);
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
            return false;
        }
    }

    // Helper method to compute SHA-256 hash
    private static String sha256(String input) throws NoSuchAlgorithmException {
        MessageDigest digest = MessageDigest.getInstance("SHA-256");
        byte[] encodedhash = digest.digest(input.getBytes(StandardCharsets.UTF_8));
        return bytesToHex(encodedhash);
    }

    // Helper method to convert bytes to hexadecimal string
    private static String bytesToHex(byte[] hash) {
        StringBuilder hexString = new StringBuilder(2 * hash.length);
        for (byte b : hash) {
            String hex = Integer.toHexString(0xff & b);
            if (hex.length() == 1) {
                hexString.append('0');
            }
            hexString.append(hex);
        }
        return hexString.toString();
    }

js实现

javascript
function verifyPow(challenge, nonce, difficulty) {
    const testValue = `${challenge}${nonce}`;
    const testHash = CryptoJS.SHA256(testValue).toString();
    return testHash.startsWith(difficulty);
}

算法优化与扩展思考

动态难度调整:根据服务器当前的负载自动调整 difficulty,负载高时增加难度,负载低时降低难度。 超时机制:可以为每个挑战设置一个过期时间,客户端需要在有效期内提交结果。 哈希算法:使用更高效的哈希算法,或者考虑使用加密原语,如 SHA3 或 Keccak。 扩展保护:可以为关键接口(如登录、表单提交)增加 PoW,防止暴力破解。 这种 PoW 机制可以帮助在一定程度上缓解恶意请求带来的压力。