企业微信pc版的栏目中不定时会推送信息,这个信息为文本,如何接收信息就保存文本。
接受信息
@app.route('/blackcat/v1/receive_task', methods=['POST', 'GET'])
def receive():
try:
auth_verify = AuthVerify()
if request.method == "POST":
msg_signature = request.args.get('msg_signature')
timestamp = request.args.get('timestamp')
nonce = request.args.get('nonce')
request_data = request.data
param = auth_verify.verifi_content(msg_signature, timestamp, nonce, request_data)
submit_task_receive.delay(param)
if request.method == "GET":
建立一个对应消息处理机制,即收到什么信息,做什么处理
如发送消息
可用下面代码
发送信息
#! /usr/bin/env python
# -*- coding: UTF-8 -*-
import requests, sys
class SendWeiXinWork():
def __init__(self):
self.CORP_ID = "xxx" # 企业号的标识
self.SECRET = "xxx" # 管理组凭证密钥
self.AGENT_ID = xxx # 应用ID
self.token = self.get_token()
def get_token(self):
url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"
data = {
"corpid": self.CORP_ID,
"corpsecret": self.SECRET
}
req = requests.get(url=url, params=data)
res = req.json()
if res['errmsg'] == 'ok':
return res["access_token"]
else:
return res
def send_message(self, to_user, content):
url = "https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=%s" % self.token
data = {
# "touser": to_user, # 发送个人就填用户账号
"toparty": to_user, # 发送组内成员就填部门ID
"msgtype": "text",
"agentid": self.AGENT_ID,
"text": {"content": content},
"safe": "0"
}
req = requests.post(url=url, json=data)
res = req.json()
if res['errmsg'] == 'ok':
print("send message sucessed")
return "send message sucessed"
else:
return res
if __name__ == '__main__':
SendWeiXinWork = SendWeiXinWork()
SendWeiXinWork.send_message("2", "测试a")
这个不会,学习了
解决方法
直接获取企业微信信息推送到微信,
再根据自己的需求实际修改下即可
代码如下:
import json
import requests
# 发送的消息
message = '我就试一下'
def wx_push(message):
touser = '账号' # 多个接收者用 | 分隔
agentid = 'AgentId'
secret = 'Secret'
corpid = '企业ID'
json_dict = {
"touser": touser,
"msgtype": "text",
"agentid": agentid,
"text": {
"content": message
},
"safe": 0,
"enable_id_trans": 0,
"enable_duplicate_check": 0,
"duplicate_check_interval": 1800
}
response = requests.get(
f"https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={corpid}&corpsecret={secret}")
data = json.loads(response.text)
access_token = data['access_token']
json_str = json.dumps(json_dict)
response_send = requests.post(
f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}", data=json_str)
return json.loads(response_send.text)['errmsg'] == 'ok'
wx_push(message)
如有问题及时沟通