49 lines
1.3 KiB
Python
49 lines
1.3 KiB
Python
import base64
|
|
import hashlib
|
|
import hmac
|
|
import logging
|
|
import time
|
|
import urllib.parse
|
|
|
|
import requests
|
|
|
|
logger = logging.getLogger('apps')
|
|
|
|
|
|
def get_timestamp_and_sign(secret: str):
|
|
timestamp = str(round(time.time() * 1000))
|
|
secret_enc = secret.encode('utf-8')
|
|
string_to_sign = f'{timestamp}\n{secret}'
|
|
string_to_sign_enc = string_to_sign.encode('utf-8')
|
|
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
|
|
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
|
|
return timestamp, sign
|
|
|
|
|
|
class DingDing(object):
|
|
|
|
@classmethod
|
|
def send_message(cls, text, webhook, secret):
|
|
headers = {
|
|
"Content-Type": "application/json",
|
|
"Charset": "UTF-8"
|
|
}
|
|
# 构建请求数据
|
|
data = {
|
|
"msgtype": "text",
|
|
"text": {
|
|
"content": text
|
|
},
|
|
"at": {
|
|
"isAtAll": True
|
|
}
|
|
}
|
|
|
|
timestamp, sign = get_timestamp_and_sign(secret)
|
|
url = f'{webhook}×tamp={timestamp}&sign={sign}'
|
|
|
|
res = requests.post(json=data, url=url, headers=headers)
|
|
logger.info(f'钉钉消息发送--->{res.json()}')
|
|
res = res.json()
|
|
return res.get('errcode') == 0
|