yzk_wechat_event/libs/kz/kz.py

80 lines
2.4 KiB
Python
Raw Normal View History

2023-12-13 11:41:22 +08:00
import hashlib
import json
import logging
import urllib.parse
from django.conf import settings
import requests
# Module-level logger shared by this file; it is registered under the fixed
# logger name 'apps' rather than this module's dotted path.
loger = logging.getLogger('apps')
class KZ:
    """Small client for the HTTP API served at ``https://cloud.kuaizhan.com/api``.

    Every request carries an ``appKey`` plus a ``sign`` parameter computed by
    :meth:`sign_top_request` from the other parameters and the shared secret.
    """

    # Seconds to wait for the remote API. ``requests`` applies no timeout by
    # default, so without this a stalled server would block the caller forever.
    timeout = 10

    def __init__(self, appkey=None, secret=None):
        """Create a client.

        Args:
            appkey: API application key. When ``None`` it is read from
                ``settings.KZ['appkey']``.
            secret: API signing secret. When ``None`` it is read from
                ``settings.KZ['secret']``.
        """
        # Credentials are resolved at call time, not as default-argument
        # expressions, so importing this module does not require configured
        # settings and later settings overrides are honoured.
        self.appkey = settings.KZ.get('appkey') if appkey is None else appkey
        self.secret = settings.KZ.get('secret') if secret is None else secret
        self.base_url = 'https://cloud.kuaizhan.com/api'

    @staticmethod
    def _as_wire_text(value):
        """Return *value* in the textual form that is both signed and sent.

        Booleans become ``'true'`` / ``'false'`` and numbers become their
        ``str()`` form; every other value is returned unchanged.
        """
        if isinstance(value, bool):
            return json.dumps(value)
        if isinstance(value, (int, float)):
            return str(value)
        return value

    @classmethod
    def _prepare_params(cls, params):
        """Drop ``None`` values and normalize the rest for signing and sending.

        Using one normalized dict for both steps guarantees that the value
        covered by the signature is exactly the value put on the wire.
        """
        return {
            key: cls._as_wire_text(value)
            for key, value in params.items()
            if value is not None
        }

    def sign_top_request(self, params, secret=None):
        """Compute the request signature for *params*.

        The signature is the hex MD5 digest of
        ``secret + key1 + value1 + key2 + value2 + ... + secret`` with keys
        taken in sorted order.

        Args:
            params: Mapping of request parameters. Entries whose key or value
                is empty/blank, and any entry named ``"sign"``, are ignored.
            secret: Signing secret; falls back to ``self.secret`` when falsy.

        Returns:
            The signature as a 32-character lowercase hexadecimal string.
        """
        secret = secret or self.secret
        # Step 1: visit the parameters in sorted key order.
        # Step 2: concatenate every name and value between two copies of the
        # secret.
        pieces = [secret]
        for key in sorted(params.keys()):
            # Numbers are turned into text before the emptiness check so that
            # a literal 0 is signed (it is also transmitted) instead of being
            # mistaken for "no value".
            value = self._as_wire_text(params[key])
            if key and str(key).strip() and value and str(value).strip() and key != "sign":
                pieces.append(str(key) + str(value))
        pieces.append(secret)
        # Step 3: hash the concatenation with MD5.
        return hashlib.md5(''.join(pieces).encode('utf-8')).hexdigest()

    def gen_short_link(
        self,
        link,
        appkey=None,
        response_format='json',
        callback=None,
        expire=None,
        domain=None,
        use_domain_pool=None,
    ):
        """Request a short link for *link* from ``/v1/km/genShortLink``.

        Args:
            link: The URL to shorten; it is percent-quoted before signing.
            appkey: Overrides ``self.appkey`` for this call when truthy.
            response_format: Sent as the ``format`` parameter.
            callback: Sent as the ``callback`` parameter when not ``None``.
            expire: Sent as the ``expire`` parameter when not ``None``.
            domain: Sent as the ``domain`` parameter when not ``None``.
            use_domain_pool: Sent as the ``useDomainPool`` parameter when not
                ``None``.

        Returns:
            ``(True, url)`` when the response ``code`` is ``0``; otherwise
            ``(False, response_dict)`` after logging the failure.
        """
        # NOTE(review): the link is quoted here and then URL-encoded again by
        # requests when the query string is built, i.e. it is double-encoded
        # on the wire. Kept as-is; confirm this is what the server expects.
        link = urllib.parse.quote(link)
        params = self._prepare_params({
            'appKey': appkey or self.appkey,
            'link': link,
            'format': response_format,
            'callback': callback,
            'expire': expire,
            'domain': domain,
            'useDomainPool': use_domain_pool,
        })
        params['sign'] = self.sign_top_request(params, secret=self.secret)
        url = f'{self.base_url}/v1/km/genShortLink'
        res = requests.get(url, params=params, timeout=self.timeout)
        data = res.json()
        if data.get('code') == 0:
            return True, data.get('url')
        loger.error(f'获取快码失败---->{data}')
        return False, data

    def change_domain_https_forward(self, site_id=8081085619, https_forward=True, domain='elmbwc.kuaizhan.com'):
        """Call the API that switches HTTPS redirection for a domain.

        Posts to ``/v1/tbk/changeDomainHttpsForward``.

        Args:
            site_id: Sent as the ``siteId`` parameter.
            https_forward: Sent as the ``httpsForward`` parameter.
            domain: Sent as the ``domain`` parameter.

        Returns:
            The decoded JSON response body.
        """
        data = self._prepare_params({
            'appKey': self.appkey,
            'siteId': site_id,
            'httpsForward': https_forward,
            'domain': domain,
        })
        data['sign'] = self.sign_top_request(data)
        url = f'{self.base_url}/v1/tbk/changeDomainHttpsForward'
        res = requests.post(url, data=data, timeout=self.timeout)
        result = res.json()
        # Logged and returned rather than printed, so callers can inspect it.
        loger.info('changeDomainHttpsForward response: %s', result)
        return result