from celery import shared_task from datetime import datetime, date, timedelta import logging from django.db.models import Count, Sum, When, Case, Value from apps.jqr.choices import JqrWechatbizuserinfoDeleteTypeChoices, JqrAddTypeChoices from apps.jqr.pubsub import JQRQrcodeCallbackPubSub from apps.qc.choices import QcWechatbizeventAddcontactIsDeleteChoices, QcQrcodesEditLogOperateTypeChoices, \ QcQrcodesEditLogTypeChoices from apps.qc.models import QcWechatbizeventAddcontact, QcQrcodes, QcWechatbizuserinfo, QcQrcodesEditLog, QcCorpinfo from apps.jqr.models import JqrExternalFollowUser, JqrExternalUser from apps.qc.utils import generate_qrcode_by_qrcode from apps.warning.models import QcWarningRule, WarningLog, WarningSetting from libs.wechat import WechatWorkerUtil logger = logging.getLogger('apps') @shared_task(name='save_add_contact', queue='contact') def save_add_contact(data, corpinfo, *args, **kwargs): state = data.get('state') userid = data.get('userid') if state and state.startswith('mg') and '_' in state: [_, _, qrcodeid] = state.split('_') data['qrcodeid'] = qrcodeid externaluserid = data.get('externaluserid') corpid = data.get('corpid') QcWechatbizeventAddcontact.objects.update_or_create( userid=userid, externaluserid=externaluserid, corpid=corpid, defaults={ **data, 'isdelete': QcWechatbizeventAddcontactIsDeleteChoices.NOT_DELETE, 'deletetime': None, 'ctime': datetime.now(), } ) qrcodeid = data.get('qrcodeid') if qrcodeid: check_qrcode.delay(qrcodeid, userid, corpinfo) alarm_userid_by_qrcodeid.delay(qrcodeid) # 打标签备注 tag_remark_contact(qrcodeid, userid, externaluserid, corpinfo) # 修复客户关系 edit_add_contact(data, corpinfo, *args, **kwargs) # TODO 请求API 发送消息 def save_add_contact_by_channel(data, corpinfo, *args, **kwargs): userid = data.get('userid') externaluserid = data.get('externaluserid') corpid = data.get('corpid') QcWechatbizeventAddcontact.objects.update_or_create( userid=userid, externaluserid=externaluserid, corpid=corpid, defaults={ **data, 'isdelete': QcWechatbizeventAddcontactIsDeleteChoices.NOT_DELETE, 'deletetime': None, 'ctime': datetime.now(), } ) qrcodeid = data.get('qrcodeid') if qrcodeid: JQRQrcodeCallbackPubSub.publish({ 'qrcodeid': qrcodeid, 'userid': userid, 'externaluserid': externaluserid, 'corpinfo': corpinfo, 'handler': f'{qrcode_channel_handler.__module__}.{qrcode_channel_handler.__name__}' }) # 修复客户关系 edit_add_contact(data, corpinfo, *args, **kwargs) # TODO 请求API 发送消息 @shared_task(name='edit_add_contact', queue='contact') def edit_add_contact(data, corpinfo, *args, **kwargs): # 更新 关系表 externaluserid = data.get('externaluserid') corpid = corpinfo.get('corpid') appsecret = corpinfo.get('appsecret') wechat_worker = WechatWorkerUtil(corpid, appsecret) cursor = '' while cursor is not None: success, data = wechat_worker.get_external_contact(externaluserid, cursor=cursor) if not success: logger.error(f'获取外部联系人信息失败,{data}') return (external_contact, follow_user, cursor) = data JqrExternalUser.objects.update_or_create( corpid=corpid, external_userid=externaluserid, defaults={ **external_contact, 'addtype': JqrAddTypeChoices.EVENT_CALLBACK, 'deletetype': None, 'dtime': None } ) for follow_info in follow_user: userid = follow_info.get('userid') follow_info['tags'] = [tag.get('tag_id') for tag in follow_info.get('tags', [])] JqrExternalFollowUser.objects.update_or_create( userid=userid, corpid=corpid, external_userid=externaluserid, defaults={ **follow_info, 'addtype': JqrAddTypeChoices.EVENT_CALLBACK, 'deletetype': None, 'dtime': None } ) def edit_add_contact_by_channel(data, corpinfo, *args, **kwargs): # 更新 关系表 externaluserid = data.get('externaluserid') corpid = corpinfo.get('corpid') appsecret = corpinfo.get('appsecret') wechat_worker = WechatWorkerUtil(corpid, appsecret) cursor = '' while cursor is not None: success, data = wechat_worker.get_external_contact(externaluserid, cursor=cursor) if not success: logger.error(f'获取外部联系人信息失败,{data}') return (external_contact, follow_user, cursor) = data JqrExternalUser.objects.update_or_create( corpid=corpid, external_userid=externaluserid, defaults={ **external_contact, 'addtype': JqrAddTypeChoices.EVENT_CALLBACK, 'deletetype': None, 'dtime': None } ) for follow_info in follow_user: userid = follow_info.get('userid') follow_info['tags'] = [tag.get('tag_id') for tag in follow_info.get('tags', [])] JqrExternalFollowUser.objects.update_or_create( userid=userid, corpid=corpid, external_userid=externaluserid, defaults={ **follow_info, 'addtype': JqrAddTypeChoices.EVENT_CALLBACK, 'deletetype': None, 'dtime': None } ) @shared_task(name='delete_add_contact', queue='contact') def delete_add_contact(data, corpinfo, *args, **kwargs): userid = data.get('userid') externaluserid = data.get('externaluserid') corpid = data.get('corpid') agentid = data.get('agentid') # 更新add_contact 表删除字段 QcWechatbizeventAddcontact.objects.filter(userid=userid, externaluserid=externaluserid, corpid=corpid, agentid=agentid).update( isdelete=QcWechatbizeventAddcontactIsDeleteChoices.DELETED, deletetime=datetime.now() ) JqrExternalFollowUser.objects.filter(userid=userid, external_userid=externaluserid, corpid=corpid).update( deletetype=JqrWechatbizuserinfoDeleteTypeChoices.EVENT_CALLBACK, dtime=datetime.now() ) def delete_add_contact_by_channel(data, corpinfo, *args, **kwargs): userid = data.get('userid') externaluserid = data.get('externaluserid') corpid = data.get('corpid') agentid = data.get('agentid') # 更新add_contact 表删除字段 QcWechatbizeventAddcontact.objects.filter(userid=userid, externaluserid=externaluserid, corpid=corpid, agentid=agentid).update( isdelete=QcWechatbizeventAddcontactIsDeleteChoices.DELETED, deletetime=datetime.now() ) JqrExternalFollowUser.objects.filter(userid=userid, external_userid=externaluserid, corpid=corpid).update( deletetype=JqrWechatbizuserinfoDeleteTypeChoices.EVENT_CALLBACK, dtime=datetime.now() ) @shared_task(name='check_qrcode', queue='qrcode') def check_qrcode(qrcodeid, userid, corpinfo): qrcode = QcQrcodes.objects.get(id=qrcodeid) users = qrcode.userids if not users: return online_userid_map = {user.get('userid'): user for user in users if user.get('isonline') == QcQrcodesEditLogOperateTypeChoices.OFFLINE} corpid = qrcode.corpid userinfo = QcWechatbizuserinfo.objects.get(corpid=corpid, userid=userid) today = date.today() # 账号当前活码的接粉情况 count_info = QcWechatbizeventAddcontact.objects.filter(corpid=corpid, userid=userid).values( 'corpid', 'userid').aggregate( total_count=Count('*'), today_count=Sum( Case( When(ctime__date=today, then=Value(1)), default=Value(0), ) ), qrcode_count=Sum( Case( When(qrcodeid=qrcodeid, then=Value(1)), default=Value(0), ) ), qrcode_today_count=Sum( Case( When(qrcodeid=qrcodeid, ctime__date=today, then=Value(1)), default=Value(0), ) ) ) # 该账号总共加了多少人 total_count = count_info.get('total_count') # 该账号今天加了多少人 today_count = count_info.get('today_count') # 该账号当前活码加了多少人 qrcode_count = count_info.get('qrcode_count') # 该账号今天当前活码加了多少人 qrcode_today_count = count_info.get('qrcode_today_count') # 该用户所有活码每日新增好友上限 limitaddusercount = userinfo.limitaddusercount # 该用户总加粉总上限 totallimitaddusercount = userinfo.totallimitaddusercount # 该用户当前活码上限 user_qrcode_limitcount = online_userid_map.get(userid, {}).get('limitcount') # 该账号是否需要下线 need_offline = False warning_text_list = [] # 当前接粉号 current_user = online_userid_map.get(userid, {}) # 该账号今天当前活码加的人数 大于 该用户当前活码上限 if user_qrcode_limitcount is not None and qrcode_today_count >= user_qrcode_limitcount > 0: warning_text_list.append( f'账号 {current_user.get("username")}-{current_user.get("alias")}-{userid} 今天 {qrcode.name}活码加的人数: {qrcode_today_count} 大于 该用户当前活码上限: {user_qrcode_limitcount}') need_offline = True if total_count is not None and total_count >= totallimitaddusercount > 0: warning_text_list.append( f'账号 {current_user.get("username")}-{current_user.get("alias")}-{userid} 总共加了 {total_count} 人 大于 该用户总共加粉上限: {totallimitaddusercount}') need_offline = True if today_count is not None and today_count >= limitaddusercount > 0: warning_text_list.append( f'账号 {current_user.get("username")}-{current_user.get("alias")}-{userid} 今天加了 {today_count} 人 大于 该用户今天加粉上限: {limitaddusercount}') need_offline = True if need_offline: warning_setting_id = qrcode.warning_setting_id # 最后一个接粉号不下线 if len(online_userid_map) > 1: for user in users: if user.get('userid') == userid: user['isonline'] = QcQrcodesEditLogOperateTypeChoices.OFFLINE user['endtime'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') qrcode.save() # 重新生成活码, 请求企微api更新活码 generate_qrcode_by_qrcode(qrcode, corpinfo, is_update=True) # 生成活码修改记录 QcQrcodesEditLog.objects.create( userid=userid, uid=qrcode.uid, corpid=qrcode.corpid, agentid=qrcode.agentid, qrcodeid=qrcode.pk, detail={ 'totallimit': totallimitaddusercount, 'daliylimit': limitaddusercount, 'qrcodelimit': user_qrcode_limitcount, 'addcount': total_count, 'qrcodeaddcount': qrcode_count, 'qrcode_today_count': qrcode_today_count, }, type=QcQrcodesEditLogTypeChoices.AUTO, operatetype=QcQrcodesEditLogOperateTypeChoices.OFFLINE, ) WarningLog.objects.create( text=','.join(warning_text_list), uid=qrcode.uid, log_info={ 'corpid': corpid, 'userid': userid, }, warning_setting_id=warning_setting_id, ) @shared_task(name='alarm_userid_by_qrcodeid', queue='qrcode') def alarm_userid_by_qrcodeid(qrcodeid): qrcode = QcQrcodes.objects.get(id=qrcodeid) users = qrcode.userids if not users: return # 活码成员报警检测 checkminutes = qrcode.checkminutes now = datetime.now() checkminutes_ago = now - timedelta(minutes=checkminutes) online_userid_map = {user.get('userid'): user for user in users if user.get('isonline') == QcQrcodesEditLogOperateTypeChoices.OFFLINE} need_check_userid_map = {} for userid, user in online_userid_map.items(): starttime = user.get('starttime') if starttime is None: continue try: starttime = datetime.fromisoformat(starttime) if starttime < checkminutes_ago: need_check_userid_map[userid] = user except Exception as e: logger.error(e) continue if len(need_check_userid_map.keys()) == 0: return corpid = qrcode.corpid # 当前活码在线接粉号的接粉情况 user_count_info = QcWechatbizeventAddcontact.objects.filter(corpid=corpid, userid__in=list(need_check_userid_map.keys()), qrcodeid=qrcodeid, createtime__gt=checkminutes_ago.timestamp()).values( 'corpid', 'userid').annotate( qrcode_count=Count('*'), ).values('qrcode_count', 'userid') need_check_user_count = len(need_check_userid_map.keys()) total_qrcode_count = sum([user_count.get('qrcode_count') for user_count in user_count_info]) average_qrcode_count = total_qrcode_count / need_check_user_count offlinechecknum = qrcode.offlinechecknum warning_logs = [] for user_count in user_count_info: qrcode_count = user_count.get('qrcode_count', 0) if (qrcode_count - average_qrcode_count) < offlinechecknum: user = need_check_userid_map.get(user_count.get('userid')) text = f"账号 {user.get('userid')} {user.get('username')} {user.get('alias')} 在渠道活码 {qrcode.name} 接粉异常" log_info = { 'corpid': corpid, 'userid': user_count.get('userid'), } warning_log = WarningLog( uid=qrcode.uid, warning_setting_id=qrcode.warning_setting_id, log_info=log_info, text=text, ) warning_logs.append(warning_log) WarningLog.objects.bulk_create(warning_logs) qrcode.lastchecktime = now qrcode.save() @shared_task(name='tag_remark_contact', queue='tag_remark_contact') def tag_remark_contact(qrcodeid, userid, externaluserid, corpinfo): corpid = corpinfo.get('corpid') appsecret = corpinfo.get('appsecret') qrcode = QcQrcodes.objects.get(id=qrcodeid) remark = qrcode.remark wechat_worker = WechatWorkerUtil(corpid, appsecret) if remark: # '{添加日期} -{昵称}' if '{添加日期}' in remark: now = datetime.now().strftime('%Y%m%d') remark = remark.replace('{添加日期}', now) if '{昵称}' in remark or '{性别}' in remark: success, data = wechat_worker.get_external_contact(externaluserid) if success: (external_contact, _, _) = data name = external_contact.get('name') remark = remark.replace('{昵称}', name) gender = external_contact.get('gender') # gender 外部联系人性别 0-未知 1-男性 2-女性 if '{性别}' in remark: if gender == 1: remark = remark.replace('{性别}', '男') elif gender == 2: remark = remark.replace('{性别}', '女') else: remark = remark.replace('{性别}', '未知') wechat_worker.externalcontact_remark(userid, externaluserid, remark) tags = qrcode.tags if tags: tag_ids = [tag.get('id') for tag in tags] wechat_worker.externalcontact_mark_tag(userid, externaluserid, tag_ids) # 接粉号报警检测任务 @shared_task(name='check_follow_user', queue='check_follow_user') def check_follow_user(): batch_size = 1000 warning_rules = QcWarningRule.objects.filter(is_on=True, is_delete=False) count = warning_rules.count() now = datetime.now() for i in range(0, count, batch_size): for warning_rule in warning_rules[i:i + batch_size]: warning_setting_id = warning_rule.warning_setting_id users = warning_rule.userids if users and len(users) > 0: corpid = users[0]["corpid"] userid_map = {user.get('userid'): user for user in users} warning_interval = warning_rule.warning_interval now = datetime.now() warning_interval_ago = now - timedelta(minutes=warning_interval) user_count_info = QcWechatbizeventAddcontact.objects.filter(corpid=corpid, userid__in=list(userid_map.keys())).values( 'corpid', 'userid').annotate( warning_interval_count=Sum( Case( When(ctime__gte=warning_interval_ago, then=Value(1)), default=Value(0), ) ), ).values('userid', 'warning_interval_count') warning_logs = [] warning_setting = WarningSetting.objects.get(id=warning_setting_id) for user_count in user_count_info: if user_count.get('warning_interval_count') < 1: user = userid_map.get(user_count.get('userid')) text = f""" 报警名称:{warning_setting.name} 用户id: {user.get('userid')} 用户昵称:{user.get('username')} 用户别名:{user.get('alias')} 持续未进粉时间:{warning_interval}分钟 检测间隔:5分钟 本次检测时间:{now.strftime("%Y-%m-%d %H:%M:%S")} """ log_info = { 'corpid': corpid, 'userid': user_count.get('userid'), } warning_log = WarningLog( uid=warning_rule.uid, warning_setting_id=warning_setting_id, log_info=log_info, text=text, ) warning_logs.append(warning_log) WarningLog.objects.bulk_create(warning_logs) # 活码上线 @shared_task(name='online_qrcode', queue='qrcode') def online_qrcode(): logger.info('开始活码上线---') batch_size = 1000 qrcodes = QcQrcodes.objects.all() for i in range(0, qrcodes.count(), batch_size): for qrcode in qrcodes[i:i + batch_size]: users = qrcode.userids need_generate_code = False # 获取下线的用户 userids = [] for user in users: if user.get('isonline') == QcQrcodesEditLogOperateTypeChoices.OFFLINE: userids.append(user.get('userid')) need_generate_code = True user['isonline'] = QcQrcodesEditLogOperateTypeChoices.ONLINE user['endtime'] = None # 上线时间 user['starttime'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') if need_generate_code: qrcode.save() corp = QcCorpinfo.objects.get(corpid=qrcode.corpid, agentid=qrcode.agentid) # 重新生成活码, 请求企微api更新活码 generate_qrcode_by_qrcode(qrcode, corp, is_update=True) # 生成活码修改记录 for userid in userids: QcQrcodesEditLog.objects.create( userid=userid, uid=qrcode.uid, corpid=qrcode.corpid, agentid=qrcode.agentid, qrcodeid=qrcode.pk, detail={ 'totallimit': 0, 'daliylimit': 0, 'qrcodelimit': 0, 'addcount': 0, 'qrcodeaddcount': 0, 'qrcode_today_count': 0, }, type=QcQrcodesEditLogTypeChoices.AUTO, operatetype=QcQrcodesEditLogOperateTypeChoices.ONLINE, ) def qrcode_channel_handler(qrcodeid, userid, externaluserid, corpinfo): check_qrcode(qrcodeid, userid, corpinfo) alarm_userid_by_qrcodeid(qrcodeid) # 打标签备注 tag_remark_contact(qrcodeid, userid, externaluserid, corpinfo)