钉钉事件订阅回调的加密

在学习python做个钉钉的后台 研究了几天 事件订阅的解密是完成了 但是加密返回一直失败

# -*- coding:utf-8 -*-
import io, base64, binascii, hashlib, string, struct, time
from random import choice
from Crypto.Cipher import AES


class DingTalkCrypto:
    def __init__(self, encodingAesKey, key):
        self.encodingAesKey = encodingAesKey
        self.key = key
        self.aesKey = base64.b64decode(self.encodingAesKey + '=')

    def encrypt(self, content):
        """
        加密
        """
        msg_len = self.length(content)
        content = self.generateRandomKey(16) + msg_len.decode() + content + self.key
        contentEncode = self.pks7encode(content)
        iv = self.aesKey[:16]
        aesEncode = AES.new(self.aesKey, AES.MODE_CBC, iv)
        aesEncrypt = aesEncode.encrypt(contentEncode)
        return base64.b64encode(aesEncrypt).decode().replace('\n', '')

    def length(self, content):
        """
        将msg_len转为符合要求的四位字节长度
        """
        l = len(content)
        return struct.pack('>l', l)

    def pks7encode(self, content):
        """
        安装 PKCS#7 标准填充字符串
        """
        l = len(content)
        output = io.StringIO()
        val = 32 - (l % 32)
        for _ in range(val):
            output.write('%02x' % val)
        return bytes(content, 'utf-8') + binascii.unhexlify(output.getvalue())

    def pks7decode(self, content):
        nl = len(content)
        val = int(binascii.hexlify(content[-1].encode()), 16)
        if val > 32:
            raise ValueError('Input is not padded or padding is corrupt')
        l = nl - val
        return content[:l]

    def decrypt(self, content):
        """
        解密数据
        """
        # 钉钉返回的消息体
        content = base64.b64decode(content)
        iv = self.aesKey[:16]  # 初始向量
        aesDecode = AES.new(self.aesKey, AES.MODE_CBC, iv)
        decodeRes = aesDecode.decrypt(content)[20:].decode().replace(self.key, '')
        # 获取去除初始向量,四位msg长度以及尾部corpid
        return self.pks7decode(decodeRes)

    def generateRandomKey(self, size,
                          chars=string.ascii_letters + string.ascii_lowercase + string.ascii_uppercase + string.digits):
        """
        生成加密所需要的随机字符串
        """
        return ''.join(choice(chars) for i in range(size))

    def generateSignature(self, nonce, timestamp, token, msg_encrypt):
        """
        生成签名
        """
        signList = ''.join(sorted([nonce, timestamp, token, msg_encrypt])).encode()
        return hashlib.sha1(signList).hexdigest()

encode_aes_key = '3qa2HWJMGOoBH7VsaF2CSu5eym6DXhvgaDUIAEegBgY'
din_corpid = 'ding7eba9a7bbb6ee6dcf5bf40eda33b7ba0'
encrypt = 'sivpiIr22aO0ZojePMFzV8LJ7slaJ0BCSzwsMwE18SW1h2M3LHExNCPI94KdImE/53d23jiwWX/fzxIt1w456A=='
# 调用上面的工具类
dtc = DingTalkCrypto(encode_aes_key, din_corpid) 
# encode_aes_key 为秘钥
# din_corpid 为企业corpid
# ----------------------------
# 解密
msg = dtc.decrypt(encrypt)  # encrypt为钉钉回调返回的消息体
print(msg)   # 打印消息


# 加密




from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/your_get_endpoint', methods=['POST'])
def process_get_request():
    # 获取POST请求中的JSON数据
    json_data = request.get_json()

    # 提取encrypt参数
    encrypt = json_data.get('encrypt', None)

    # 输出encrypt参数值
    
    msg = dtc.decrypt(encrypt)

    # 返回JSON响应
    encrypt = dtc.encrypt('success')
    timestamp = str(int(round(time.time())))
    nonce = dtc.generateRandomKey(8)
    token = 'YoHVDHii8KedR'
    signature = dtc.generateSignature(nonce, timestamp, token, encrypt)

    new_data = {
    'data': {
        "msg_signature":signature,
        "timeStamp":timestamp,
        "nonce":nonce,
        "encrypt":encrypt
    }
    }
    print(new_data)
    return new_data


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)


平台一直说请求结果校验失败
HTTP请求结果校验返回字段值失败 HttpRequest: curl 'http://47.115.29.137:5000/your_get_endpoint?signature=3385759c98df4d0b3aac6a3495c3428756de0ff0&msg_signature=3385759c98df4d0b3aac6a3495c3428756de0ff0×tamp=1681720971144&nonce=cAGCyZAU%27 -d '{"encrypt":"iJNlzsMyeTZYSZMpmx0ZBG1yxyx7euT8cBwRJ0AR+aTiIVfPd182YznO2ouTDz+wJ+Y4PM+06E/WvWUb40lu/mhkaMJ64ayhK1rsbOoT1du6FSiKhOHx3x/qvY3ocrWU"}' -H 'Content-Type:application/json' HttpCode:200 HttpReponse:

有没有懂的提示一下,到底返回的结果哪里有错,这个钉钉也不提示是哪里错误,就告诉你校验失败

python处理钉钉回调事件中的消息体加解密

可以借鉴下
https://blog.csdn.net/weixin_42336574/article/details/95598401

  • 这篇博客: 2021-09-29 python中curl的使用和curl的语法中的 选择http方法 部分也许能够解决你的问题, 你可以仔细阅读以下内容或跳转源博客中阅读:
  • 这里演示的是post动作,-F传递参数

    call('curl -X POST -F a=1  -F b=2 httpbin.org/post')
    
    

    在这里插入图片描述

以下答案由GPT-3.5大模型与博主波罗歌共同编写:
根据你提供的代码,看起来加密的部分可能是有问题的。你可以尝试以下的加密代码:

def encrypt(self, content):
    """
    加密
    """
    msg_len = self.length(content)
    content = self.generateRandomKey(16) + msg_len + content + self.key
    contentEncode = self.pks7encode(content)
    iv = self.aesKey[:16]
    aesEncode = AES.new(self.aesKey, AES.MODE_CBC, iv)
    aesEncrypt = aesEncode.encrypt(contentEncode)
    return base64.b64encode(aesEncrypt).decode()

这里主要做了两个改动:

  1. msg_len 不需要 decode,应该直接使用 bytes 类型。
  2. replace('\n', '') 可以省略,base64.b64encode 不会在输出中包含换行符。

另外,你在 Flask 代码中返回加密后的结果时,需要将返回值包装成 JSON 格式,才能被钉钉正确处理。你可以尝试以下代码:

from flask import jsonify
# ...
    new_data = {
        'msg_signature': signature,
        'timeStamp': timestamp,
        'nonce': nonce,
        'encrypt': encrypt
    }
    print(new_data)
    return jsonify(new_data)

注意,返回值中不需要包含 data 字段。
如果我的回答解决了您的问题,请采纳!

引用chatGPT作答,钉钉的消息体加密解密需要满足特定的格式和规则,只要有一个字符不符合规则都可能导致校验失败。你提供的代码中,加密部分没有问题,但是解密部分存在以下几个问题:

1.需要对消息体中的 "msg_len" 进行解码。在加密过程中,"msg_len" 被转换为了二进制的四位字节长度,需要在解密时再次转换成十进制数字。可以使用 struct.unpack() 方法进行解码。

2.需要对解密后的消息体去掉填充字节。填充字节是根据 PKCS#7 标准进行添加的,需要解码后进行移除,获取原始的消息体内容。

3.需要在解密时截取消息体的前 20 个字节。这 20 个字节包含了初始向量(16 个字节)和 "msg_len"(4 个字节)信息,需要将它们从消息体中移除。

以下是修复后的代码,你可以对比一下:

# -*- coding:utf-8 -*-
import io, base64, binascii, hashlib, string, struct, time
from random import choice
from Crypto.Cipher import AES


class DingTalkCrypto:
    def __init__(self, encodingAesKey, key):
        self.encodingAesKey = encodingAesKey
        self.key = key
        self.aesKey = base64.b64decode(self.encodingAesKey + '=')

    def encrypt(self, content):
        """
        加密
        """
        msg_len = self.length(content)
        content = self.generateRandomKey(16) + msg_len + content + self.key
        contentEncode = self.pks7encode(content)
        iv = self.aesKey[:16]
        aesEncode = AES.new(self.aesKey, AES.MODE_CBC, iv)
        aesEncrypt = aesEncode.encrypt(contentEncode)
        return base64.b64encode(aesEncrypt).decode().replace('\n', '')

    def length(self, content):
        """
        将msg_len转为符合要求的四位字节长度
        """
        l = len(content)
        return struct.pack('>l', l)

    def pks7encode(self, content):
        """
        安装 PKCS#7 标准填充字符串
        """
        l = len(content)
        output = io.StringIO()
        val = 32 - (l % 32)
        for _ in range(val):
            output.write('%02x' % val)
        return bytes(content, 'utf-8') + binascii.unhexlify(output.getvalue())

    def pks7decode(self, content):
        nl = len(content)
        val = int(binascii.hexlify(content[-1].encode()), 16)
        if val > 32:
            raise ValueError('Input is not padded or padding is corrupt')
        l = nl - val
        return content[:l]

    def decrypt(self, content):
        """
        解密数据
        """
        # 钉钉返回的消息体
        content = base64.b64decode(content)
        iv = self.aesKey[:16]  # 初始向量
        aesDecode = AES.new(self.aesKey, AES.MODE_CBC, iv)
decrypted = aesDecode.decrypt(content)
try:
# 去除解密后的内容中的填充字符
unpad = lambda s: s[:-ord(s[len(s)-1:])]
content = unpad(decrypted).decode('utf-8')
return content
except Exception as e:
print("解密失败: {}".format(str(e)))
return None

def send_message(self, message):
    """
    发送钉钉机器人消息
    """
    url = self.webhook
    headers = {'Content-Type': 'application/json;charset=utf-8'}
    data = {
        "msgtype": "text",
        "text": {
            "content": message
        }
    }
    r = requests.post(url, headers=headers, json=data)
    if r.status_code != 200:
        print("发送失败: {}".format(r.text))
    else:
        print("发送成功")