"""Celery tasks and synchronous handlers for external-contact (add/delete) events and QR-code checks."""
import logging
from datetime import datetime, date, timedelta

from celery import shared_task
from django.db.models import Count, Sum, When, Case, Value

from apps.jqr.choices import JqrWechatbizuserinfoDeleteTypeChoices, JqrAddTypeChoices
from apps.jqr.pubsub import JQRQrcodeCallbackPubSub
from apps.jqr.utils import send_new_user_msg
from apps.jqr.ws import WS
from apps.qc.choices import QcWechatbizeventAddcontactIsDeleteChoices, QcQrcodesEditLogOperateTypeChoices, \
    QcQrcodesEditLogTypeChoices
from apps.qc.models import QcWechatbizeventAddcontact, QcQrcodes, QcWechatbizuserinfo, QcQrcodesEditLog, QcCorpinfo
from apps.jqr.models import JqrExternalFollowUser, JqrExternalUser, JqrHookUser
from apps.qc.utils import generate_qrcode_by_qrcode
from apps.warning.models import QcWarningRule, WarningLog, WarningSetting
from libs.wechat import WechatWorkerUtil
from utils.redis_lock import SimpleLock

logger = logging.getLogger('apps')


def _upsert_add_contact(data):
    """Create or refresh the add-contact row keyed by (userid, externaluserid, corpid).

    Every key of ``data`` is written to the row; the row is also marked as
    not deleted and its ``ctime`` is reset to the current time.
    """
    QcWechatbizeventAddcontact.objects.update_or_create(
        userid=data.get('userid'),
        externaluserid=data.get('externaluserid'),
        corpid=data.get('corpid'),
        defaults={
            **data,
            'isdelete': QcWechatbizeventAddcontactIsDeleteChoices.NOT_DELETE,
            'deletetime': None,
            'ctime': datetime.now(),
        }
    )


def _sync_external_contact(externaluserid, corpinfo):
    """Fetch an external contact page by page and upsert the contact and its follow relations.

    Stops (after logging) as soon as one page cannot be fetched.
    """
    corpid = corpinfo.get('corpid')
    appsecret = corpinfo.get('appsecret')
    wechat_worker = WechatWorkerUtil(corpid, appsecret)
    cursor = ''
    while True:
        success, result = wechat_worker.get_external_contact(externaluserid, cursor=cursor)
        if not success:
            logger.error(f'获取外部联系人信息失败,{result}')
            return
        external_contact, follow_user, cursor = result
        JqrExternalUser.objects.update_or_create(
            corpid=corpid, external_userid=externaluserid,
            defaults={
                **external_contact,
                'addtype': JqrAddTypeChoices.EVENT_CALLBACK,
            }
        )
        for follow_info in follow_user or []:
            # Only the tag ids are stored, not the full tag objects.
            follow_info['tags'] = [tag.get('tag_id') for tag in follow_info.get('tags') or []]
            JqrExternalFollowUser.objects.update_or_create(
                userid=follow_info.get('userid'), corpid=corpid, external_userid=externaluserid,
                defaults={
                    **follow_info,
                    'addtype': JqrAddTypeChoices.EVENT_CALLBACK,
                    'deletetype': 0,
                    'dtime': None
                }
            )
        # Stop on any falsy cursor: an empty-string cursor is also what the first
        # request sends, so looping on it would re-fetch the first page forever.
        if not cursor:
            return


def _mark_contact_deleted(data):
    """Flag the add-contact row and the follow relation described by ``data`` as deleted."""
    userid = data.get('userid')
    externaluserid = data.get('externaluserid')
    corpid = data.get('corpid')
    agentid = data.get('agentid')
    deleted_at = datetime.now()
    # Update the deletion fields of the add_contact table.
    QcWechatbizeventAddcontact.objects.filter(
        userid=userid, externaluserid=externaluserid, corpid=corpid, agentid=agentid,
    ).update(
        isdelete=QcWechatbizeventAddcontactIsDeleteChoices.DELETED,
        deletetime=deleted_at,
    )
    JqrExternalFollowUser.objects.filter(
        userid=userid, external_userid=externaluserid, corpid=corpid,
    ).update(
        deletetype=JqrWechatbizuserinfoDeleteTypeChoices.EVENT_CALLBACK,
        dtime=deleted_at,
    )


def _count_when(**conditions):
    """Build a conditional-count aggregate: sum of 1 for rows matching ``conditions``, else 0."""
    return Sum(Case(When(**conditions, then=Value(1)), default=Value(0)))


def _limit_reached(count, limit):
    """Return True when ``limit`` is a positive number and ``count`` has reached it.

    A missing (None) count or limit never counts as reached.
    """
    return count is not None and limit is not None and count >= limit > 0


@shared_task(name='save_add_contact', queue='contact')
def save_add_contact(data, corpinfo, *args, **kwargs):
    """Persist an add-contact event and trigger the QR-code follow-up work.

    Args:
        data: event payload dict; reads ``state``, ``userid``, ``externaluserid``,
            ``corpid`` and ``qrcodeid``. ``qrcodeid`` is written into it when it
            can be derived from ``state``.
        corpinfo: dict forwarded to the follow-up tasks.
    """
    state = data.get('state')
    userid = data.get('userid')
    if state and state.startswith('mg') and '_' in state:
        segments = state.split('_')
        if len(segments) == 3:
            data['qrcodeid'] = segments[2]
        else:
            # A state with a different number of segments must not abort
            # saving the contact itself.
            logger.warning('unexpected add-contact state %r, qrcodeid not derived', state)
    externaluserid = data.get('externaluserid')
    _upsert_add_contact(data)
    qrcodeid = data.get('qrcodeid')
    if qrcodeid:
        check_qrcode.delay(qrcodeid, userid, corpinfo)
        alarm_userid_by_qrcodeid.delay(qrcodeid)
        # Apply tags and remark (called directly, not queued).
        tag_remark_contact(qrcodeid, userid, externaluserid, corpinfo)
    # Repair the customer relation tables.
    edit_add_contact(data, corpinfo, *args, **kwargs)
    # TODO: call the API to send a message


def save_add_contact_by_channel(data, corpinfo, *args, **kwargs):
    """Persist an add-contact event, publish the QR-code callback and greet new users.

    Args:
        data: event payload dict; reads ``userid``, ``externaluserid``, ``corpid``,
            ``qrcodeid`` and ``createtime``.
        corpinfo: dict forwarded to the publisher and to the relation sync.
    """
    userid = data.get('userid')
    externaluserid = data.get('externaluserid')
    corpid = data.get('corpid')
    _upsert_add_contact(data)
    qrcodeid = data.get('qrcodeid')
    if qrcodeid:
        JQRQrcodeCallbackPubSub.publish({
            'qrcodeid': qrcodeid,
            'userid': userid,
            'externaluserid': externaluserid,
            'corpinfo': corpinfo,
            'handler': f'{qrcode_channel_handler.__module__}.{qrcode_channel_handler.__name__}'
        })
    # Repair the customer relation tables.
    edit_add_contact(data, corpinfo, *args, **kwargs)
    # Converting the external user id through WS.transfer_external_userid_to_vid
    # is currently disabled.

    # Send the new-customer welcome message.
    hook_user = JqrHookUser.objects.filter(corpid=corpid, userid=userid).first()
    if hook_user is None:
        return
    # Only greet when the hook user's utime lies within the last 9 minutes.
    nine_minute_ago = datetime.now() - timedelta(minutes=9)
    createtime = data.get('createtime')
    lock = SimpleLock(f'{corpid}:{userid}:{externaluserid}:{createtime}')
    # The lock is taken before the other conditions are evaluated, as before.
    success = lock.try_lock(60 * 10)
    if hook_user.utime > nine_minute_ago and hook_user.new_user and success:
        send_new_user_msg(corpid, userid, external_userid=externaluserid, uid=hook_user.uid)


@shared_task(name='edit_add_contact', queue='contact')
def edit_add_contact(data, corpinfo, *args, **kwargs):
    """Refresh the external-contact relation tables for ``data['externaluserid']``.

    Args:
        data: event payload dict; only ``externaluserid`` is read.
        corpinfo: dict providing ``corpid`` and ``appsecret``.
    """
    _sync_external_contact(data.get('externaluserid'), corpinfo)


def edit_add_contact_by_channel(data, corpinfo, *args, **kwargs):
    """Plain-function counterpart of ``edit_add_contact`` (same arguments and effect)."""
    _sync_external_contact(data.get('externaluserid'), corpinfo)


@shared_task(name='delete_add_contact', queue='contact')
def delete_add_contact(data, corpinfo, *args, **kwargs):
    """Mark a contact relation as deleted.

    Args:
        data: event payload dict; reads ``userid``, ``externaluserid``, ``corpid``
            and ``agentid``.
        corpinfo: unused here; kept for a uniform handler signature.
    """
    _mark_contact_deleted(data)


def delete_add_contact_by_channel(data, corpinfo, *args, **kwargs):
    """Plain-function counterpart of ``delete_add_contact`` (same arguments and effect)."""
    _mark_contact_deleted(data)


@shared_task(name='check_qrcode', queue='qrcode')
def check_qrcode(qrcodeid, userid, corpinfo):
    """Check an account's add-contact counts against its limits and take it offline if needed.

    When any limit is reached a ``WarningLog`` is written; when more than one
    entry is in the selected member map, the account is additionally marked
    offline on the QR code, the code is regenerated and an edit log is stored.

    Args:
        qrcodeid: primary key of the ``QcQrcodes`` row.
        userid: account that just received a contact.
        corpinfo: forwarded to ``generate_qrcode_by_qrcode``.

    Raises:
        QcQrcodes.DoesNotExist / QcWechatbizuserinfo.DoesNotExist: when the
            QR code or the account row cannot be found.
    """
    qrcode = QcQrcodes.objects.get(id=qrcodeid)
    users = qrcode.userids
    if not users:
        return
    # NOTE(review): the map is named "online" but selects members whose
    # 'isonline' equals ...OperateTypeChoices.OFFLINE — confirm the intended value.
    online_userid_map = {
        user.get('userid'): user
        for user in users
        if user.get('isonline') == QcQrcodesEditLogOperateTypeChoices.OFFLINE
    }
    corpid = qrcode.corpid
    userinfo = QcWechatbizuserinfo.objects.get(corpid=corpid, userid=userid)
    today = date.today()
    # Add-contact statistics of this account.
    count_info = QcWechatbizeventAddcontact.objects.filter(corpid=corpid, userid=userid).values(
        'corpid', 'userid').aggregate(
        total_count=Count('*'),
        today_count=_count_when(ctime__date=today),
        qrcode_count=_count_when(qrcodeid=qrcodeid),
        qrcode_today_count=_count_when(qrcodeid=qrcodeid, ctime__date=today),
    )
    # Contacts added by this account in total.
    total_count = count_info.get('total_count')
    # Contacts added by this account today.
    today_count = count_info.get('today_count')
    # Contacts added by this account through this QR code.
    qrcode_count = count_info.get('qrcode_count')
    # Contacts added by this account through this QR code today.
    qrcode_today_count = count_info.get('qrcode_today_count')
    # Daily limit of this account across all QR codes.
    limitaddusercount = userinfo.limitaddusercount
    # Overall limit of this account.
    totallimitaddusercount = userinfo.totallimitaddusercount
    # The account's entry on this QR code and its per-code limit.
    current_user = online_userid_map.get(userid, {})
    user_qrcode_limitcount = current_user.get('limitcount')

    account = f'账号 {current_user.get("username")}-{current_user.get("alias")}-{userid}'
    warning_text_list = []
    if _limit_reached(qrcode_today_count, user_qrcode_limitcount):
        warning_text_list.append(
            f'{account} 今天 {qrcode.name}活码加的人数: {qrcode_today_count} 大于 该用户当前活码上限: {user_qrcode_limitcount}')
    if _limit_reached(total_count, totallimitaddusercount):
        warning_text_list.append(
            f'{account} 总共加了 {total_count} 人 大于 该用户总共加粉上限: {totallimitaddusercount}')
    if _limit_reached(today_count, limitaddusercount):
        warning_text_list.append(
            f'{account} 今天加了 {today_count} 人 大于 该用户今天加粉上限: {limitaddusercount}')
    # Each reached limit contributes exactly one message, so a non-empty list
    # means the account needs to go offline.
    if not warning_text_list:
        return

    # The last remaining account is not taken offline.
    if len(online_userid_map) > 1:
        for user in users:
            if user.get('userid') == userid:
                user['isonline'] = QcQrcodesEditLogOperateTypeChoices.OFFLINE
                user['endtime'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        qrcode.save()
        # Regenerate the QR code (updates it through the WeCom API).
        generate_qrcode_by_qrcode(qrcode, corpinfo, is_update=True)
        # Record the QR-code modification.
        QcQrcodesEditLog.objects.create(
            userid=userid,
            uid=qrcode.uid,
            corpid=qrcode.corpid,
            agentid=qrcode.agentid,
            qrcodeid=qrcode.pk,
            detail={
                'totallimit': totallimitaddusercount,
                'daliylimit': limitaddusercount,
                'qrcodelimit': user_qrcode_limitcount,
                'addcount': total_count,
                'qrcodeaddcount': qrcode_count,
                'qrcode_today_count': qrcode_today_count,
            },
            type=QcQrcodesEditLogTypeChoices.AUTO,
            operatetype=QcQrcodesEditLogOperateTypeChoices.OFFLINE,
        )
    # NOTE(review): the warning is written whenever a limit is reached, even if
    # the account was kept online as the last one — confirm this is intended.
    WarningLog.objects.create(
        text=','.join(warning_text_list),
        uid=qrcode.uid,
        log_info={
            'corpid': corpid,
            'userid': userid,
        },
        warning_setting_id=qrcode.warning_setting_id,
    )


@shared_task(name='alarm_userid_by_qrcodeid', queue='qrcode')
def alarm_userid_by_qrcodeid(qrcodeid):
    """Write warning logs for QR-code members whose recent add-contact count looks abnormal.

    Only members whose ``starttime`` is older than ``qrcode.checkminutes`` minutes
    are considered. ``qrcode.lastchecktime`` is updated once the check has run.

    Args:
        qrcodeid: primary key of the ``QcQrcodes`` row.

    Raises:
        QcQrcodes.DoesNotExist: when the QR code cannot be found.
    """
    qrcode = QcQrcodes.objects.get(id=qrcodeid)
    users = qrcode.userids
    if not users:
        return
    # Alarm detection window for the QR-code members.
    now = datetime.now()
    checkminutes_ago = now - timedelta(minutes=qrcode.checkminutes)
    # NOTE(review): selects members whose 'isonline' equals
    # ...OperateTypeChoices.OFFLINE although the name says "online" — confirm.
    online_userid_map = {
        user.get('userid'): user
        for user in users
        if user.get('isonline') == QcQrcodesEditLogOperateTypeChoices.OFFLINE
    }
    need_check_userid_map = {}
    for userid, user in online_userid_map.items():
        starttime = user.get('starttime')
        if starttime is None:
            continue
        try:
            started_at = datetime.fromisoformat(starttime)
            if started_at < checkminutes_ago:
                need_check_userid_map[userid] = user
        except (TypeError, ValueError) as e:
            # Unparseable or incomparable start time: skip this member.
            logger.error(e)
    if not need_check_userid_map:
        return

    corpid = qrcode.corpid
    # Add-contact counts of the selected members on this QR code within the
    # window. Evaluated once; the rows are used for both the sum and the loop.
    user_count_info = list(
        QcWechatbizeventAddcontact.objects.filter(
            corpid=corpid,
            userid__in=list(need_check_userid_map),
            qrcodeid=qrcodeid,
            createtime__gt=checkminutes_ago.timestamp(),
        ).values('corpid', 'userid').annotate(
            qrcode_count=Count('*'),
        ).values('qrcode_count', 'userid')
    )
    total_qrcode_count = sum(user_count.get('qrcode_count') for user_count in user_count_info)
    average_qrcode_count = total_qrcode_count / len(need_check_userid_map)
    offlinechecknum = qrcode.offlinechecknum
    warning_logs = []
    # NOTE(review): members without any row in the window are absent from
    # user_count_info and are therefore never flagged here — confirm intent.
    for user_count in user_count_info:
        qrcode_count = user_count.get('qrcode_count', 0)
        if (qrcode_count - average_qrcode_count) < offlinechecknum:
            user = need_check_userid_map.get(user_count.get('userid'))
            text = f"账号 {user.get('userid')} {user.get('username')} {user.get('alias')} 在渠道活码 {qrcode.name} 接粉异常"
            warning_logs.append(WarningLog(
                uid=qrcode.uid,
                warning_setting_id=qrcode.warning_setting_id,
                log_info={
                    'corpid': corpid,
                    'userid': user_count.get('userid'),
                },
                text=text,
            ))
    WarningLog.objects.bulk_create(warning_logs)
    qrcode.lastchecktime = now
    qrcode.save()


@shared_task(name='tag_remark_contact', queue='tag_remark_contact')
def tag_remark_contact(qrcodeid, userid, externaluserid, corpinfo):
    """Apply the QR code's remark template and tags to a newly added external contact.

    The remark template may contain the placeholders ``{添加日期}`` (current date
    as YYYYMMDD), ``{昵称}`` (contact name) and ``{性别}`` (contact gender).

    Args:
        qrcodeid: primary key of the ``QcQrcodes`` row.
        userid: account that added the contact.
        externaluserid: the external contact.
        corpinfo: dict providing ``corpid`` and ``appsecret``.

    Raises:
        QcQrcodes.DoesNotExist: when the QR code cannot be found.
    """
    corpid = corpinfo.get('corpid')
    appsecret = corpinfo.get('appsecret')
    qrcode = QcQrcodes.objects.get(id=qrcodeid)
    remark = qrcode.remark
    wechat_worker = WechatWorkerUtil(corpid, appsecret)
    if remark:
        if '{添加日期}' in remark:
            remark = remark.replace('{添加日期}', datetime.now().strftime('%Y%m%d'))
        if '{昵称}' in remark or '{性别}' in remark:
            success, result = wechat_worker.get_external_contact(externaluserid)
            if success:
                external_contact, _, _ = result
                # A contact without a name must not break the replacement.
                remark = remark.replace('{昵称}', external_contact.get('name') or '')
                # gender of the external contact: 0 unknown, 1 male, 2 female
                gender_text = {1: '男', 2: '女'}.get(external_contact.get('gender'), '未知')
                remark = remark.replace('{性别}', gender_text)
        wechat_worker.externalcontact_remark(userid, externaluserid, remark)
    tags = qrcode.tags
    if tags:
        tag_ids = [tag.get('id') for tag in tags]
        wechat_worker.externalcontact_mark_tag(userid, externaluserid, tag_ids)


# Alarm detection task for contact-receiving accounts.
@shared_task(name='check_follow_user', queue='check_follow_user')
def check_follow_user():
    """Write a warning log for every watched account that received no contact recently.

    For each active warning rule, every account listed in the rule is checked for
    add-contact rows created within the rule's ``warning_interval`` minutes.
    Accounts with no such row — including accounts with no rows at all — get a
    ``WarningLog`` entry.
    """
    batch_size = 1000
    # Explicit ordering keeps the slice-based batching stable.
    warning_rules = QcWarningRule.objects.filter(is_on=True, is_delete=False).order_by('pk')
    count = warning_rules.count()
    for offset in range(0, count, batch_size):
        for warning_rule in warning_rules[offset:offset + batch_size]:
            users = warning_rule.userids
            if not users:
                continue
            warning_setting_id = warning_rule.warning_setting_id
            warning_setting = WarningSetting.objects.filter(id=warning_setting_id).first()
            if warning_setting is None:
                # One broken rule must not stop the remaining rules.
                logger.error('warning setting %s of warning rule %s not found',
                             warning_setting_id, warning_rule.pk)
                continue
            corpid = users[0]["corpid"]
            userid_map = {user.get('userid'): user for user in users}
            warning_interval = warning_rule.warning_interval
            now = datetime.now()
            warning_interval_ago = now - timedelta(minutes=warning_interval)
            # Contacts per account inside the interval. order_by() clears any
            # default ordering so rows are grouped by userid only.
            recent_counts = dict(
                QcWechatbizeventAddcontact.objects.filter(
                    corpid=corpid,
                    userid__in=list(userid_map),
                    ctime__gte=warning_interval_ago,
                ).order_by().values('userid').annotate(
                    warning_interval_count=Count('*'),
                ).values_list('userid', 'warning_interval_count')
            )
            checked_at = now.strftime("%Y-%m-%d %H:%M:%S")
            warning_logs = []
            for userid, user in userid_map.items():
                # Accounts missing from the query result have zero recent contacts.
                if recent_counts.get(userid, 0) >= 1:
                    continue
                text = (
                    f" 报警名称:{warning_setting.name}"
                    f" 用户id: {user.get('userid')}"
                    f" 用户昵称:{user.get('username')}"
                    f" 用户别名:{user.get('alias')}"
                    f" 持续未进粉时间:{warning_interval}分钟"
                    f" 检测间隔:5分钟"
                    f" 本次检测时间:{checked_at} "
                )
                warning_logs.append(WarningLog(
                    uid=warning_rule.uid,
                    warning_setting_id=warning_setting_id,
                    log_info={
                        'corpid': corpid,
                        'userid': userid,
                    },
                    text=text,
                ))
            if warning_logs:
                WarningLog.objects.bulk_create(warning_logs)


# Bring QR-code members back online.
@shared_task(name='online_qrcode', queue='qrcode')
def online_qrcode():
    """Switch every offline member of every QR code back online.

    For each QR code with at least one offline member: the members are marked
    online with a fresh ``starttime``, the code is saved and regenerated, and
    one edit log per re-enabled member is written.
    """
    logger.info('开始活码上线---')
    batch_size = 1000
    # Explicit ordering keeps the slice-based batching stable.
    qrcodes = QcQrcodes.objects.all().order_by('pk')
    for offset in range(0, qrcodes.count(), batch_size):
        for qrcode in qrcodes[offset:offset + batch_size]:
            # A QR code without a member list has nothing to bring online.
            users = qrcode.userids or []
            # Collect the offline members and flip them to online.
            userids = []
            for user in users:
                if user.get('isonline') == QcQrcodesEditLogOperateTypeChoices.OFFLINE:
                    userids.append(user.get('userid'))
                    user['isonline'] = QcQrcodesEditLogOperateTypeChoices.ONLINE
                    user['endtime'] = None
                    # Online time.
                    user['starttime'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            if not userids:
                continue
            qrcode.save()
            corp = QcCorpinfo.objects.get(corpid=qrcode.corpid, agentid=qrcode.agentid)
            # Regenerate the QR code (updates it through the WeCom API).
            # NOTE(review): a QcCorpinfo instance is passed here while check_qrcode
            # passes its corpinfo argument — confirm the helper accepts both.
            generate_qrcode_by_qrcode(qrcode, corp, is_update=True)
            # Record the QR-code modification for each re-enabled member.
            for userid in userids:
                QcQrcodesEditLog.objects.create(
                    userid=userid,
                    uid=qrcode.uid,
                    corpid=qrcode.corpid,
                    agentid=qrcode.agentid,
                    qrcodeid=qrcode.pk,
                    detail={
                        'totallimit': 0,
                        'daliylimit': 0,
                        'qrcodelimit': 0,
                        'addcount': 0,
                        'qrcodeaddcount': 0,
                        'qrcode_today_count': 0,
                    },
                    type=QcQrcodesEditLogTypeChoices.AUTO,
                    operatetype=QcQrcodesEditLogOperateTypeChoices.ONLINE,
                )


def qrcode_channel_handler(qrcodeid, userid, externaluserid, corpinfo):
    """Run the QR-code follow-up steps in-process: limit check, alarm check, tag/remark.

    Its dotted path is published as the ``handler`` of the QR-code callback message.
    """
    check_qrcode(qrcodeid, userid, corpinfo)
    alarm_userid_by_qrcodeid(qrcodeid)
    # Apply tags and remark.
    tag_remark_contact(qrcodeid, userid, externaluserid, corpinfo)