# yzk_wechat_event/libs/wechat/worker.py
#
# NOTE: residue from the repository web view, kept here as comments only:
#   345 lines | 14 KiB | Python
#   Raw Permalink Normal View History
#   2023-12-13 11:41:22 +08:00
import requests
import logging
from utils.redis_utils import RedisUtils
from yzk_wechat_event.settings.constant import Constant
# Module-level logger; everything in this file logs under the 'apps' logger name.
logger = logging.getLogger('apps')
class WechatWorkerUtil(object):
    """Thin client for the WeChat Work HTTP API served at ``base_url``.

    Most public methods return a tuple whose first element is a success flag
    (``errcode == 0`` in the API response).  On success the remaining
    element(s) carry the requested payload; on failure they carry the API's
    ``errmsg``.  The exact tuple shape differs per method and is documented on
    each one.  Exceptions: ``get_access_token``, ``get_user``,
    ``get_user_fan_count`` and the two ``*_by_ids`` / ``*_by_dep_id``
    aggregators return plain values.
    """

    base_url = 'https://qyapi.weixin.qq.com/cgi-bin'
    # Class attribute: one RedisUtils instance shared by every client instance.
    redis_util = RedisUtils()
    # Seconds to wait for the API before requests raises a Timeout.  Without a
    # timeout a stalled connection would block the caller indefinitely.
    request_timeout = 30

    def __init__(self, corpid, secret):
        """Store the credentials used to obtain access tokens."""
        self.corpid = corpid
        self.secret = secret

    # ------------------------------------------------------------------ #
    # Internal helpers
    # ------------------------------------------------------------------ #
    def _request(self, method, path, access_token=None, params=None, body=None):
        """Call ``base_url + path`` and return the decoded JSON response.

        :param method: HTTP verb, ``'GET'`` or ``'POST'``.
        :param path: API path starting with ``/``.
        :param access_token: token to use; fetched via ``get_access_token``
            when falsy.
        :param params: extra query-string values.  Entries whose value is
            ``None`` are omitted by requests, so optional parameters are never
            sent as the literal text ``None``.
        :param body: JSON body for POST calls (``None`` sends no body).
        """
        query = {'access_token': access_token or self.get_access_token()}
        if params:
            query.update(params)
        response = requests.request(
            method,
            f'{self.base_url}{path}',
            params=query,
            json=body,
            timeout=self.request_timeout,
        )
        return response.json()

    @staticmethod
    def _succeeded(payload):
        """Return True when the API response reports ``errcode == 0``."""
        return payload.get('errcode') == 0

    # ------------------------------------------------------------------ #
    # Token
    # ------------------------------------------------------------------ #
    def get_access_token(self):
        """Return an access token, using the Redis cache when possible.

        :return: the token, or ``None`` when the API reports an error (the
            error response is logged).
        """
        # NOTE(review): the cache key embeds the corp secret.  Left unchanged
        # because changing the key would orphan already-cached tokens and may
        # break other readers of the same key.
        cache_key = f'{Constant.WECHAT_WORKER_TOKEN}{self.corpid}:{self.secret}'
        cached = self.redis_util.get(cache_key)
        if cached:
            return cached

        data = requests.get(
            f'{self.base_url}/gettoken',
            params={'corpid': self.corpid, 'corpsecret': self.secret},
            timeout=self.request_timeout,
        ).json()
        if data.get('errcode') != 0:
            logger.error(f'获取token失败--->{data}')
            return None

        token = data.get('access_token')
        self.redis_util.set_ex(
            cache_key, Constant.WECHAT_WORKER_TOKEN_EXPIRES, token)
        return token

    # ------------------------------------------------------------------ #
    # Departments and members
    # ------------------------------------------------------------------ #
    def get_department_list(self, department_id=None):
        """Fetch departments.

        :param department_id: optional department id; omitted from the request
            when ``None``.
        :return: ``(True, department)`` or ``(False, errmsg)``.
        """
        data = self._request('GET', '/department/list', params={'id': department_id})
        if self._succeeded(data):
            return True, data.get('department')
        logger.error(f'获取部门失败--->{data}')
        return False, data.get('errmsg')

    def get_sub_department_id(self, department_id=None, token=None):
        """Fetch the ``department_id`` entries from ``department/simplelist``.

        :param department_id: optional department id; omitted when ``None``.
        :param token: access token to reuse; fetched when falsy.
        :return: ``(True, department_id_list)`` or ``(False, errmsg)``.
        """
        data = self._request(
            'GET', '/department/simplelist', access_token=token, params={'id': department_id})
        if self._succeeded(data):
            return True, data.get('department_id')
        return False, data.get('errmsg')

    def get_department_user(self, department_id, token=None):
        """Fetch the user ids of a department's members.

        :param department_id: department to query.
        :param token: access token to reuse; fetched when falsy.
        :return: ``(True, [userid, ...])`` or ``(False, errmsg)``; failures are
            logged.
        """
        data = self._request(
            'GET', '/user/simplelist', access_token=token,
            params={'department_id': department_id})
        if self._succeeded(data):
            # A successful response without 'userlist' yields an empty list
            # instead of raising on iteration over None.
            members = data.get('userlist') or []
            return True, [member.get('userid') for member in members]
        logger.error(f'获取用户失败-->{data}')
        return False, data.get('errmsg')

    def get_department_user_detail(self, department_id, token=None):
        """Fetch the ``userlist`` entries of ``user/list`` for a department.

        :return: ``(True, userlist)`` or ``(False, errmsg)``.
        """
        data = self._request(
            'GET', '/user/list', access_token=token,
            params={'department_id': department_id})
        if self._succeeded(data):
            return True, data.get('userlist')
        return False, data.get('errmsg')

    def get_department_user_by_ids(self, ids):
        """Return a flat list of member user ids across several departments.

        ``get_department_user`` returns ``(ok, payload)``; only successful
        payloads are merged.  Departments whose lookup fails are skipped (the
        failure is already logged by ``get_department_user``).

        :param ids: iterable of department ids.
        :return: list of user ids.
        """
        token = self.get_access_token()
        user_ids = []
        for department_id in ids:
            ok, members = self.get_department_user(department_id, token)
            if ok:
                user_ids.extend(members)
        return user_ids

    def get_dep_user_group_by_dep_id(self, ids):
        """Map each department id to the result of ``get_department_user``.

        NOTE(review): each value is the raw ``(ok, payload)`` tuple returned by
        ``get_department_user`` (user-id list on success, errmsg on failure),
        not a bare list.  Kept as-is because callers may unpack the tuple;
        confirm against callers before changing.

        :param ids: iterable of department ids.
        :return: ``{department_id: (ok, payload)}``.
        """
        token = self.get_access_token()
        return {
            department_id: self.get_department_user(department_id, token)
            for department_id in ids
        }

    def get_user_id_list(self, access_token=None, cursor=None):
        """Fetch one page (limit 10000) of ``user/list_id``.

        :param access_token: token to reuse; fetched when falsy.
        :param cursor: paging cursor sent in the request body.
        :return: ``(True, dept_user, next_cursor)`` or
            ``(False, errmsg, None)``.
        """
        res = self._request(
            'POST', '/user/list_id', access_token=access_token,
            body={'limit': 10000, 'cursor': cursor})
        if self._succeeded(res):
            return True, res.get('dept_user'), res.get('next_cursor')
        return False, res.get('errmsg'), None

    def get_user(self, user_id, access_token=None):
        """Fetch a single user.

        :return: the full response dict on success, otherwise ``None``.
        """
        res = self._request(
            'GET', '/user/get', access_token=access_token, params={'userid': user_id})
        if self._succeeded(res):
            return res
        return None

    # ------------------------------------------------------------------ #
    # External contacts
    # ------------------------------------------------------------------ #
    def get_user_fan_count(self, userid):
        """Return how many external user ids ``externalcontact/list`` reports.

        :return: the count, or ``0`` when the API call fails.
        """
        ok, external_ids = self.get_external_contact_list(userid)
        if not ok:
            return 0
        return len(external_ids or [])

    def get_tags(self, access_token=None, body=None):
        """Call ``externalcontact/get_corp_tag_list`` with ``body`` as JSON.

        :return: ``(True, tag_group)`` or ``(False, errmsg)``.
        """
        res = self._request(
            'POST', '/externalcontact/get_corp_tag_list',
            access_token=access_token, body=body)
        if self._succeeded(res):
            return True, res.get('tag_group')
        return False, res.get('errmsg')

    def get_external_contact_list(self, userid):
        """Fetch the external user ids attached to ``userid``.

        :return: ``(True, external_userid)`` or ``(False, errmsg)``.
        """
        res = self._request('GET', '/externalcontact/list', params={'userid': userid})
        if self._succeeded(res):
            return True, res.get('external_userid')
        return False, res.get('errmsg')

    def get_external_contact(self, external_userid, cursor=None):
        """Fetch one external contact.

        :param cursor: paging cursor; only sent when truthy.
        :return: ``(True, (external_contact, follow_user, next_cursor))`` or
            ``(False, errmsg)``.
        """
        res = self._request(
            'GET', '/externalcontact/get',
            # "or None" keeps an empty-string cursor out of the query string.
            params={'external_userid': external_userid, 'cursor': cursor or None})
        if self._succeeded(res):
            return True, (res.get('external_contact'), res.get('follow_user'), res.get('next_cursor'))
        return False, res.get('errmsg')

    def get_external_contact_batch_by_user(self, userid_list, cursor='', limit=100):
        """Fetch one page of external contacts for several users.

        :return: ``(True, external_contact_list, next_cursor_or_None)`` or
            ``(False, errmsg, None)``.  A falsy ``next_cursor`` is normalised
            to ``None``.
        """
        res = self._request(
            'POST', '/externalcontact/batch/get_by_user',
            body={
                'userid_list': userid_list,
                'cursor': cursor,
                'limit': limit,
            })
        if self._succeeded(res):
            return True, res.get('external_contact_list'), res.get('next_cursor') or None
        return False, res.get('errmsg'), None

    def add_contact_way(
            self,
            _type=2,
            scene=2,
            style=1,
            remark=None,
            skip_verify=True,
            state=None,
            user=None,
            party=None,
            is_temp=False,
            expires_in=None,
            chat_expires_in=None,
            unionid=None,
            is_exclusive=False,
            conclusions=None
    ):
        """Call ``externalcontact/add_contact_way``.

        Every argument is forwarded verbatim in the JSON body (``_type`` is
        sent under the key ``type``).

        :return: ``(True, (config_id, qr_code))`` or ``(False, errmsg)``.
        """
        res = self._request(
            'POST', '/externalcontact/add_contact_way',
            body={
                'type': _type,
                'scene': scene,
                'style': style,
                'remark': remark,
                'skip_verify': skip_verify,
                'state': state,
                'user': user,
                'party': party,
                'is_temp': is_temp,
                'expires_in': expires_in,
                'chat_expires_in': chat_expires_in,
                'unionid': unionid,
                'is_exclusive': is_exclusive,
                'conclusions': conclusions,
            })
        if self._succeeded(res):
            return True, (res.get('config_id'), res.get('qr_code'))
        return False, res.get('errmsg')

    def update_contact_way(self, config_id, user, skip_verify=True, style=1, remark=None, state=None, party=None,
                           expires_in=None, chat_expires_in=None, unionid=None, conclusions=None):
        """Call ``externalcontact/update_contact_way``.

        Every argument is forwarded verbatim in the JSON body.

        :return: ``(True, (config_id, qr_code))`` read from the response, or
            ``(False, errmsg)``.
        """
        res = self._request(
            'POST', '/externalcontact/update_contact_way',
            body={
                'config_id': config_id,
                'remark': remark,
                'user': user,
                'skip_verify': skip_verify,
                'style': style,
                'state': state,
                'party': party,
                'expires_in': expires_in,
                'chat_expires_in': chat_expires_in,
                'unionid': unionid,
                'conclusions': conclusions,
            })
        if self._succeeded(res):
            return True, (res.get('config_id'), res.get('qr_code'))
        return False, res.get('errmsg')

    def get_externalcontact_follow_user_list(self):
        """Call ``externalcontact/get_follow_user_list``.

        :return: ``(True, follow_user)`` or ``(False, errmsg)``.
        """
        res = self._request('GET', '/externalcontact/get_follow_user_list')
        if self._succeeded(res):
            return True, res.get('follow_user')
        return False, res.get('errmsg')

    def externalcontact_remark(self, userid, external_userid, remark, description=None, remark_company=None,
                               remark_mobiles=None,
                               remark_pic_mediaid=None):
        """Call ``externalcontact/remark``; arguments go verbatim into the body.

        :return: ``(True, res.get('result'))`` or ``(False, errmsg)``.
        """
        res = self._request(
            'POST', '/externalcontact/remark',
            body={
                'userid': userid,
                'external_userid': external_userid,
                'remark': remark,
                'description': description,
                'remark_company': remark_company,
                'remark_mobiles': remark_mobiles,
                'remark_pic_mediaid': remark_pic_mediaid,
            })
        if self._succeeded(res):
            return True, res.get('result')
        return False, res.get('errmsg')

    def externalcontact_mark_tag(self, userid, external_userid, add_tag=None, remove_tag=None):
        """Call ``externalcontact/mark_tag``; arguments go verbatim into the body.

        :return: ``(True, res.get('result'))`` or ``(False, errmsg)``.
        """
        res = self._request(
            'POST', '/externalcontact/mark_tag',
            body={
                'userid': userid,
                'external_userid': external_userid,
                'add_tag': add_tag,
                'remove_tag': remove_tag,
            })
        if self._succeeded(res):
            return True, res.get('result')
        return False, res.get('errmsg')

    # ------------------------------------------------------------------ #
    # Customer group chats
    # ------------------------------------------------------------------ #
    def get_external_groupchat_list(self, owner_filter=None, status_filter=0, cursor=None, limit=1000):
        """Fetch one page of ``externalcontact/groupchat/list``.

        :return: ``(True, (group_chat_list, next_cursor))`` or
            ``(False, errmsg)``.
        """
        res = self._request(
            'POST', '/externalcontact/groupchat/list',
            body={
                'owner_filter': owner_filter,
                'status_filter': status_filter,
                'cursor': cursor,
                'limit': limit,
            })
        if self._succeeded(res):
            return True, (res.get('group_chat_list'), res.get('next_cursor'))
        return False, res.get('errmsg')

    def get_external_groupchat(self, chat_id, need_name=1):
        """Fetch one group chat via ``externalcontact/groupchat/get``.

        :return: ``(True, group_chat)`` or ``(False, errmsg)``.
        """
        res = self._request(
            'POST', '/externalcontact/groupchat/get',
            body={'chat_id': chat_id, 'need_name': need_name})
        if self._succeeded(res):
            return True, res.get('group_chat')
        return False, res.get('errmsg')

    # ------------------------------------------------------------------ #
    # Group-send (mass message) records
    # ------------------------------------------------------------------ #
    def get_groupmsg_list_v2(self, start_time, end_time, chat_type='single', creator=None, filter_type=1, limit=100,
                             cursor=None):
        """Fetch one page of group-send task records.

        :param start_time: start of the record time range.
        :param end_time: end of the record time range.
        :param chat_type: task type; ``single`` sends to customers, ``group``
            sends to customer groups.
        :param creator: enterprise account id of the task creator.
        :param filter_type: creator type: 0 = sent by the enterprise,
            1 = sent by an individual, 2 = all.  The original notes say the
            API defaults to all types; this method defaults to 1.
        :param limit: maximum number of records.  The original notes say the
            API caps this at 100 and falls back to its own default of 50 when
            exceeded; this method defaults to 100.
        :param cursor: paging cursor returned by the previous call; may be
            omitted on the first call.
        :return: ``(True, (group_msg_list, next_cursor))`` or
            ``(False, (errmsg, None))``.
        """
        res = self._request(
            'POST', '/externalcontact/get_groupmsg_list_v2',
            body={
                'chat_type': chat_type,
                'start_time': start_time,
                'end_time': end_time,
                'creator': creator,
                'filter_type': filter_type,
                'limit': limit,
                'cursor': cursor,
            })
        if self._succeeded(res):
            return True, (res.get('group_msg_list'), res.get('next_cursor'))
        return False, (res.get('errmsg'), None)

    def get_groupmsg_task(self, msgid, limit=1000, cursor=None):
        """Fetch one page of ``externalcontact/get_groupmsg_task``.

        :return: ``(True, (task_list, next_cursor))`` or
            ``(False, (errmsg, None))``.
        """
        res = self._request(
            'POST', '/externalcontact/get_groupmsg_task',
            body={'msgid': msgid, 'limit': limit, 'cursor': cursor})
        if self._succeeded(res):
            return True, (res.get('task_list'), res.get('next_cursor'))
        return False, (res.get('errmsg'), None)

    def get_groupmsg_send_result(self, msgid, userid, limit=1000, cursor=None):
        """Fetch one page of ``externalcontact/get_groupmsg_send_result``.

        :return: ``(True, (send_list, next_cursor))`` or
            ``(False, (errmsg, None))``.
        """
        res = self._request(
            'POST', '/externalcontact/get_groupmsg_send_result',
            body={
                'msgid': msgid,
                'userid': userid,
                'limit': limit,
                'cursor': cursor,
            })
        if self._succeeded(res):
            return True, (res.get('send_list'), res.get('next_cursor'))
        return False, (res.get('errmsg'), None)