import requests import logging from utils.redis_utils import RedisUtils from yzk_wechat_event.settings.constant import Constant logger = logging.getLogger('apps') class WechatWorkerUtil(object): base_url = 'https://qyapi.weixin.qq.com/cgi-bin' redis_util = RedisUtils() def __init__(self, corpid, secret): self.corpid = corpid self.secret = secret def get_access_token(self): key = f'{Constant.WECHAT_WORKER_TOKEN}{self.corpid}:{self.secret}' token = self.redis_util.get(key) if token: return token url = f'{self.base_url}/gettoken?corpid={self.corpid}&corpsecret={self.secret}' data = requests.get(url).json() if data.get('errcode') != 0: logger.error(f'获取token失败--->{data}') return token = data.get('access_token') self.redis_util.set_ex( key, Constant.WECHAT_WORKER_TOKEN_EXPIRES, token) return token def get_department_list(self, department_id=None): token = self.get_access_token() url = f'{self.base_url}/department/list?access_token={token}&id={department_id}' data = requests.get(url).json() if data.get('errcode') == 0: return True, data.get('department') logger.error(f'获取部门失败--->{data}') return False, data.get('errmsg') def get_sub_department_id(self, department_id=None, token=None): token = token or self.get_access_token() url = f'{self.base_url}/department/simplelist?access_token={token}&id={department_id}' data = requests.get(url).json() if data.get('errcode') == 0: department_list = data.get('department_id') return True, department_list return False, data.get('errmsg') def get_department_user(self, department_id, token=None): """ 获取部门成员 """ token = token or self.get_access_token() url = f'{self.base_url}/user/simplelist?access_token={token}&department_id={department_id}' data = requests.get(url).json() if data.get('errcode') == 0: user_list = data.get('userlist') return True, [user.get('userid') for user in user_list] logger.error(f'获取用户失败-->{data}') return False, data.get('errmsg') def get_department_user_detail(self, department_id, token=None): token = token or self.get_access_token() url = f'{self.base_url}/user/list?access_token={token}&department_id={department_id}' data = requests.get(url).json() if data.get('errcode') == 0: user_list = data.get('userlist') return True, user_list return False, data.get('errmsg') def get_department_user_by_ids(self, ids): token = self.get_access_token() user_ids = [] for department_id in ids: user_id_list = self.get_department_user(department_id, token) user_ids = [*user_ids, *user_id_list] return user_ids def get_dep_user_group_by_dep_id(self, ids): token = self.get_access_token() user_id_mapping = {} for department_id in ids: user_id_list = self.get_department_user(department_id, token) user_id_mapping[department_id] = user_id_list return user_id_mapping def get_user_fan_count(self, userid): token = self.get_access_token() url = f'{self.base_url}/externalcontact/list?access_token={token}&userid={userid}' data = requests.get(url).json() if data.get('errcode') == 0: external_userid = data.get('external_userid') return len(external_userid) return 0 def get_user_id_list(self, access_token=None, cursor=None): access_token = access_token or self.get_access_token() url = f'{self.base_url}/user/list_id?access_token={access_token}' data = { 'limit': 10000, 'cursor': cursor } res = requests.post(url=url, json=data).json() if res.get('errcode') == 0: return True, res.get('dept_user'), res.get('next_cursor') return False, res.get('errmsg'), None def get_user(self, user_id, access_token=None): access_token = access_token or self.get_access_token() url = f'{self.base_url}/user/get?access_token={access_token}&userid={user_id}' res = requests.get(url).json() if res.get('errcode') == 0: return res return def get_tags(self, access_token=None, body=None): access_token = access_token or self.get_access_token() url = f'{self.base_url}/externalcontact/get_corp_tag_list?access_token={access_token}' res = requests.post(url, json=body).json() if res.get('errcode') == 0: return True, res.get('tag_group') return False, res.get('errmsg') def get_external_contact_list(self, userid): access_token = self.get_access_token() url = f'{self.base_url}/externalcontact/list?access_token={access_token}&userid={userid}' res = requests.get(url).json() if res.get('errcode') == 0: return True, res.get('external_userid') return False, res.get('errmsg') def get_external_contact(self, external_userid, cursor=None): access_token = self.get_access_token() params = { 'access_token': access_token, 'external_userid': external_userid, } if cursor: params['cursor'] = cursor url = f'{self.base_url}/externalcontact/get' res = requests.get(url, params=params) res = res.json() if res.get('errcode') == 0: return True, (res.get('external_contact'), res.get('follow_user'), res.get('next_cursor')) return False, res.get('errmsg') def get_external_contact_batch_by_user(self, userid_list, cursor='', limit=100): url = f'{self.base_url}/externalcontact/batch/get_by_user?access_token={self.get_access_token()}' data = { 'userid_list': userid_list, 'cursor': cursor, 'limit': limit } res = requests.post(url, json=data).json() if res.get('errcode') == 0: return True, res.get('external_contact_list'), res.get('next_cursor') or None return False, res.get('errmsg'), None def add_contact_way( self, _type=2, scene=2, style=1, remark=None, skip_verify=True, state=None, user=None, party=None, is_temp=False, expires_in=None, chat_expires_in=None, unionid=None, is_exclusive=False, conclusions=None ): access_token = self.get_access_token() url = f'{self.base_url}/externalcontact/add_contact_way?access_token={access_token}' data = { 'type': _type, 'scene': scene, 'style': style, 'remark': remark, 'skip_verify': skip_verify, 'state': state, 'user': user, 'party': party, 'is_temp': is_temp, 'expires_in': expires_in, 'chat_expires_in': chat_expires_in, 'unionid': unionid, 'is_exclusive': is_exclusive, 'conclusions': conclusions } res = requests.post(url, json=data).json() if res.get('errcode') == 0: return True, (res.get('config_id'), res.get('qr_code')) return False, res.get('errmsg') def update_contact_way(self, config_id, user, skip_verify=True, style=1, remark=None, state=None, party=None, expires_in=None, chat_expires_in=None, unionid=None, conclusions=None): access_token = self.get_access_token() url = f'{self.base_url}/externalcontact/update_contact_way?access_token={access_token}' data = { 'config_id': config_id, 'remark': remark, 'user': user, 'skip_verify': skip_verify, 'style': style, 'state': state, 'party': party, 'expires_in': expires_in, 'chat_expires_in': chat_expires_in, 'unionid': unionid, 'conclusions': conclusions } res = requests.post(url, json=data).json() if res.get('errcode') == 0: return True, (res.get('config_id'), res.get('qr_code')) return False, res.get('errmsg') def get_external_groupchat_list(self, owner_filter=None, status_filter=0, cursor=None, limit=1000): access_token = self.get_access_token() url = f'{self.base_url}/externalcontact/groupchat/list?access_token={access_token}' data = { 'owner_filter': owner_filter, 'status_filter': status_filter, 'cursor': cursor, 'limit': limit, } res = requests.post(url, json=data).json() if res.get('errcode') == 0: return True, (res.get('group_chat_list'), res.get('next_cursor')) return False, res.get('errmsg') def get_external_groupchat(self, chat_id, need_name=1): access_token = self.get_access_token() url = f'{self.base_url}/externalcontact/groupchat/get?access_token={access_token}' data = { 'chat_id': chat_id, 'need_name': need_name, } res = requests.post(url, json=data).json() if res.get('errcode') == 0: return True, res.get('group_chat') return False, res.get('errmsg') def get_externalcontact_follow_user_list(self): access_token = self.get_access_token() url = f'{self.base_url}/externalcontact/get_follow_user_list?access_token={access_token}' res = requests.get(url=url).json() if res.get('errcode') == 0: return True, res.get('follow_user') return False, res.get('errmsg') def get_groupmsg_list_v2(self, start_time, end_time, chat_type='single', creator=None, filter_type=1, limit=100, cursor=None): """ access_token 是 调用接口凭证 chat_type 是 群发任务的类型,默认为single,表示发送给客户,group表示发送给客户群 start_time 是 群发任务记录开始时间 end_time 是 群发任务记录结束时间 creator 否 群发任务创建人企业账号id filter_type 否 创建人类型。0:企业发表 1:个人发表 2:所有,包括个人创建以及企业创建,默认情况下为所有类型 limit 否 返回的最大记录数,整型,最大值100,默认值50,超过最大值时取默认值 cursor 否 用于分页查询的游标,字符串类型,由上一次调用返回,首次调用可不填 """ access_token = self.get_access_token() url = f'{self.base_url}/externalcontact/get_groupmsg_list_v2?access_token={access_token}' data = { 'chat_type': chat_type, 'start_time': start_time, 'end_time': end_time, 'creator': creator, 'filter_type': filter_type, 'limit': limit, 'cursor': cursor, } res = requests.post(url, json=data).json() if res.get('errcode') == 0: return True, (res.get('group_msg_list'), res.get('next_cursor')) return False, (res.get('errmsg'), None) def get_groupmsg_task(self, msgid, limit=1000, cursor=None): access_token = self.get_access_token() url = f'{self.base_url}/externalcontact/get_groupmsg_task?access_token={access_token}' data = { 'msgid': msgid, 'limit': limit, 'cursor': cursor, } res = requests.post(url, json=data).json() if res.get('errcode') == 0: return True, (res.get('task_list'), res.get('next_cursor')) return False, (res.get('errmsg'), None) def get_groupmsg_send_result(self, msgid, userid, limit=1000, cursor=None): access_token = self.get_access_token() url = f'{self.base_url}/externalcontact/get_groupmsg_send_result?access_token={access_token}' data = { 'msgid': msgid, 'userid': userid, 'limit': limit, 'cursor': cursor, } res = requests.post(url, json=data).json() if res.get('errcode') == 0: return True, (res.get('send_list'), res.get('next_cursor')) return False, (res.get('errmsg'), None) def externalcontact_remark(self, userid, external_userid, remark, description=None, remark_company=None, remark_mobiles=None, remark_pic_mediaid=None): access_token = self.get_access_token() url = f'{self.base_url}/externalcontact/remark?access_token={access_token}' data = { 'userid': userid, 'external_userid': external_userid, 'remark': remark, 'description': description, 'remark_company': remark_company, 'remark_mobiles': remark_mobiles, 'remark_pic_mediaid': remark_pic_mediaid, } res = requests.post(url, json=data).json() if res.get('errcode') == 0: return True, res.get('result') return False, res.get('errmsg') def externalcontact_mark_tag(self, userid, external_userid, add_tag=None, remove_tag=None): access_token = self.get_access_token() url = f'{self.base_url}/externalcontact/mark_tag?access_token={access_token}' data = { 'userid': userid, 'external_userid': external_userid, 'add_tag': add_tag, 'remove_tag': remove_tag, } res = requests.post(url, json=data).json() if res.get('errcode') == 0: return True, res.get('result') return False, res.get('errmsg')