在学习python做个钉钉的后台 研究了几天 事件订阅的解密是完成了 但是加密返回一直失败
# -*- coding:utf-8 -*-
import io, base64, binascii, hashlib, string, struct, time
from random import choice
from Crypto.Cipher import AES
class DingTalkCrypto:
def __init__(self, encodingAesKey, key):
self.encodingAesKey = encodingAesKey
self.key = key
self.aesKey = base64.b64decode(self.encodingAesKey + '=')
def encrypt(self, content):
"""
加密
"""
msg_len = self.length(content)
content = self.generateRandomKey(16) + msg_len.decode() + content + self.key
contentEncode = self.pks7encode(content)
iv = self.aesKey[:16]
aesEncode = AES.new(self.aesKey, AES.MODE_CBC, iv)
aesEncrypt = aesEncode.encrypt(contentEncode)
return base64.b64encode(aesEncrypt).decode().replace('\n', '')
def length(self, content):
"""
将msg_len转为符合要求的四位字节长度
"""
l = len(content)
return struct.pack('>l', l)
def pks7encode(self, content):
"""
安装 PKCS#7 标准填充字符串
"""
l = len(content)
output = io.StringIO()
val = 32 - (l % 32)
for _ in range(val):
output.write('%02x' % val)
return bytes(content, 'utf-8') + binascii.unhexlify(output.getvalue())
def pks7decode(self, content):
nl = len(content)
val = int(binascii.hexlify(content[-1].encode()), 16)
if val > 32:
raise ValueError('Input is not padded or padding is corrupt')
l = nl - val
return content[:l]
def decrypt(self, content):
"""
解密数据
"""
# 钉钉返回的消息体
content = base64.b64decode(content)
iv = self.aesKey[:16] # 初始向量
aesDecode = AES.new(self.aesKey, AES.MODE_CBC, iv)
decodeRes = aesDecode.decrypt(content)[20:].decode().replace(self.key, '')
# 获取去除初始向量,四位msg长度以及尾部corpid
return self.pks7decode(decodeRes)
def generateRandomKey(self, size,
chars=string.ascii_letters + string.ascii_lowercase + string.ascii_uppercase + string.digits):
"""
生成加密所需要的随机字符串
"""
return ''.join(choice(chars) for i in range(size))
def generateSignature(self, nonce, timestamp, token, msg_encrypt):
"""
生成签名
"""
signList = ''.join(sorted([nonce, timestamp, token, msg_encrypt])).encode()
return hashlib.sha1(signList).hexdigest()
encode_aes_key = '3qa2HWJMGOoBH7VsaF2CSu5eym6DXhvgaDUIAEegBgY'
din_corpid = 'ding7eba9a7bbb6ee6dcf5bf40eda33b7ba0'
encrypt = 'sivpiIr22aO0ZojePMFzV8LJ7slaJ0BCSzwsMwE18SW1h2M3LHExNCPI94KdImE/53d23jiwWX/fzxIt1w456A=='
# 调用上面的工具类
dtc = DingTalkCrypto(encode_aes_key, din_corpid)
# encode_aes_key 为秘钥
# din_corpid 为企业corpid
# ----------------------------
# 解密
msg = dtc.decrypt(encrypt) # encrypt为钉钉回调返回的消息体
print(msg) # 打印消息
# 加密
from flask import Flask, request, jsonify
app = Flask(__name__)
@app.route('/your_get_endpoint', methods=['POST'])
def process_get_request():
# 获取POST请求中的JSON数据
json_data = request.get_json()
# 提取encrypt参数
encrypt = json_data.get('encrypt', None)
# 输出encrypt参数值
msg = dtc.decrypt(encrypt)
# 返回JSON响应
encrypt = dtc.encrypt('success')
timestamp = str(int(round(time.time())))
nonce = dtc.generateRandomKey(8)
token = 'YoHVDHii8KedR'
signature = dtc.generateSignature(nonce, timestamp, token, encrypt)
new_data = {
'data': {
"msg_signature":signature,
"timeStamp":timestamp,
"nonce":nonce,
"encrypt":encrypt
}
}
print(new_data)
return new_data
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
平台一直说请求结果校验失败
HTTP请求结果校验返回字段值失败 HttpRequest: curl 'http://47.115.29.137:5000/your_get_endpoint?signature=3385759c98df4d0b3aac6a3495c3428756de0ff0&msg_signature=3385759c98df4d0b3aac6a3495c3428756de0ff0×tamp=1681720971144&nonce=cAGCyZAU%27 -d '{"encrypt":"iJNlzsMyeTZYSZMpmx0ZBG1yxyx7euT8cBwRJ0AR+aTiIVfPd182YznO2ouTDz+wJ+Y4PM+06E/WvWUb40lu/mhkaMJ64ayhK1rsbOoT1du6FSiKhOHx3x/qvY3ocrWU"}' -H 'Content-Type:application/json' HttpCode:200 HttpReponse:
有没有懂的提示一下,到底返回的结果哪里有错,这个钉钉也不提示是哪里错误,就告诉你校验失败
python处理钉钉回调事件中的消息体加解密
可以借鉴下
https://blog.csdn.net/weixin_42336574/article/details/95598401
这里演示的是post动作,-F传递参数
call('curl -X POST -F a=1 -F b=2 httpbin.org/post')
以下答案由GPT-3.5大模型与博主波罗歌共同编写:
根据你提供的代码,看起来加密的部分可能是有问题的。你可以尝试以下的加密代码:
def encrypt(self, content):
"""
加密
"""
msg_len = self.length(content)
content = self.generateRandomKey(16) + msg_len + content + self.key
contentEncode = self.pks7encode(content)
iv = self.aesKey[:16]
aesEncode = AES.new(self.aesKey, AES.MODE_CBC, iv)
aesEncrypt = aesEncode.encrypt(contentEncode)
return base64.b64encode(aesEncrypt).decode()
这里主要做了两个改动:
msg_len
不需要 decode,应该直接使用 bytes 类型。replace('\n', '')
可以省略,base64.b64encode
不会在输出中包含换行符。另外,你在 Flask 代码中返回加密后的结果时,需要将返回值包装成 JSON 格式,才能被钉钉正确处理。你可以尝试以下代码:
from flask import jsonify
# ...
new_data = {
'msg_signature': signature,
'timeStamp': timestamp,
'nonce': nonce,
'encrypt': encrypt
}
print(new_data)
return jsonify(new_data)
注意,返回值中不需要包含 data
字段。
如果我的回答解决了您的问题,请采纳!
引用chatGPT作答,钉钉的消息体加密解密需要满足特定的格式和规则,只要有一个字符不符合规则都可能导致校验失败。你提供的代码中,加密部分没有问题,但是解密部分存在以下几个问题:
1.需要对消息体中的 "msg_len" 进行解码。在加密过程中,"msg_len" 被转换为了二进制的四位字节长度,需要在解密时再次转换成十进制数字。可以使用 struct.unpack() 方法进行解码。
2.需要对解密后的消息体去掉填充字节。填充字节是根据 PKCS#7 标准进行添加的,需要解码后进行移除,获取原始的消息体内容。
3.需要在解密时截取消息体的前 20 个字节。这 20 个字节包含了初始向量(16 个字节)和 "msg_len"(4 个字节)信息,需要将它们从消息体中移除。
以下是修复后的代码,你可以对比一下:
# -*- coding:utf-8 -*-
import io, base64, binascii, hashlib, string, struct, time
from random import choice
from Crypto.Cipher import AES
class DingTalkCrypto:
def __init__(self, encodingAesKey, key):
self.encodingAesKey = encodingAesKey
self.key = key
self.aesKey = base64.b64decode(self.encodingAesKey + '=')
def encrypt(self, content):
"""
加密
"""
msg_len = self.length(content)
content = self.generateRandomKey(16) + msg_len + content + self.key
contentEncode = self.pks7encode(content)
iv = self.aesKey[:16]
aesEncode = AES.new(self.aesKey, AES.MODE_CBC, iv)
aesEncrypt = aesEncode.encrypt(contentEncode)
return base64.b64encode(aesEncrypt).decode().replace('\n', '')
def length(self, content):
"""
将msg_len转为符合要求的四位字节长度
"""
l = len(content)
return struct.pack('>l', l)
def pks7encode(self, content):
"""
安装 PKCS#7 标准填充字符串
"""
l = len(content)
output = io.StringIO()
val = 32 - (l % 32)
for _ in range(val):
output.write('%02x' % val)
return bytes(content, 'utf-8') + binascii.unhexlify(output.getvalue())
def pks7decode(self, content):
nl = len(content)
val = int(binascii.hexlify(content[-1].encode()), 16)
if val > 32:
raise ValueError('Input is not padded or padding is corrupt')
l = nl - val
return content[:l]
def decrypt(self, content):
"""
解密数据
"""
# 钉钉返回的消息体
content = base64.b64decode(content)
iv = self.aesKey[:16] # 初始向量
aesDecode = AES.new(self.aesKey, AES.MODE_CBC, iv)
decrypted = aesDecode.decrypt(content)
try:
# 去除解密后的内容中的填充字符
unpad = lambda s: s[:-ord(s[len(s)-1:])]
content = unpad(decrypted).decode('utf-8')
return content
except Exception as e:
print("解密失败: {}".format(str(e)))
return None
def send_message(self, message):
"""
发送钉钉机器人消息
"""
url = self.webhook
headers = {'Content-Type': 'application/json;charset=utf-8'}
data = {
"msgtype": "text",
"text": {
"content": message
}
}
r = requests.post(url, headers=headers, json=data)
if r.status_code != 200:
print("发送失败: {}".format(r.text))
else:
print("发送成功")