From f9d4bb7a0c536b60777060c610ca0b5426089fa8 Mon Sep 17 00:00:00 2001 From: AKW <2497744746@qq.com> Date: Tue, 19 Dec 2023 16:03:02 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8E=A5=E7=B2=89=E6=8A=A5=E8=AD=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/jqr/models.py | 38 +------ apps/jqr/tasks.py | 208 +++++++++++++++++++++++++++++-------- apps/qc/models.py | 3 + libs/wechat/worker.py | 33 ++++++ start-celeryworker | 4 +- yzk_wechat_event/celery.py | 27 +++-- 6 files changed, 225 insertions(+), 88 deletions(-) diff --git a/apps/jqr/models.py b/apps/jqr/models.py index 3ad86ff..d35a96c 100644 --- a/apps/jqr/models.py +++ b/apps/jqr/models.py @@ -232,6 +232,9 @@ class JqrNewUserSendGroupMsg(JqrBaseSendMsg): priority = models.IntegerField(verbose_name='优先级', default=0) type = models.IntegerField(verbose_name='类型', choices=JqrNewUserSendGroupMsgTypeChoices.choices, default=JqrNewUserSendGroupMsgTypeChoices.PRIVATE) + remark = models.CharField( + max_length=128, verbose_name='备注模板', null=True, blank=True) + tags = models.JSONField(verbose_name='标签', null=True, blank=True) class Meta: db_table = 'jqr_newusersendgroupmsg' @@ -243,20 +246,6 @@ class JqrNewUserSendGroupMsg(JqrBaseSendMsg): return self.taskname -class JqrNewUserOrderSendGroupMsg(JqrBaseSendMsg): - # 优先级 - priority = models.IntegerField(verbose_name='优先级', default=0) - - class Meta: - db_table = 'jqr_newuserordersendgroupmsg' - verbose_name = 'JQR New User Order Send Group Message' - verbose_name_plural = 'JQR New User Order Send Group Messages' - ordering = ['-ctime'] - - def __str__(self): - return self.taskname - - class JqrTimePrivateSendGroupMsg(JqrBaseSendMsg): sendtype = models.IntegerField(verbose_name='发送类型,1=高级群发,2=极速群发', choices=JqrSendGroupMsgSendTypeChoices.choices, @@ -280,27 +269,6 @@ class JqrTimePrivateSendGroupMsg(JqrBaseSendMsg): return self.taskname -class JqrTimeQunSendGroupMsg(JqrBaseSendMsg): - sendtype = models.IntegerField(verbose_name='发送类型,1=高级群发,2=极速群发', - choices=JqrSendGroupMsgSendTypeChoices.choices, - default=JqrSendGroupMsgSendTypeChoices.ADVANCED) - needsid = models.IntegerField(verbose_name='是否需要sid', choices=JqrSendGroupMsgNeedSidChoices.choices, - default=JqrSendGroupMsgNeedSidChoices.NOT_NEED) - timetype = models.IntegerField(verbose_name='发送时间类型', choices=JqrSendGroupMsgTimeTypeChoices.choices, - default=None) - timejson = models.JSONField(verbose_name='发送时间对象', default=None) - sendtime = models.DateTimeField(verbose_name='下次具体发送时间', null=True, blank=True) - - class Meta: - db_table = 'jqr_timequnsendgroupmsg' - verbose_name = 'JQR Time Private Send Group Message' - verbose_name_plural = 'JQR Time Private Send Group Messages' - ordering = ['-ctime'] - - def __str__(self): - return self.taskname - - class JqrKeywordSendGroupMsg(JqrBaseSendMsg): type = models.IntegerField(verbose_name='类型', choices=JqrKeywordSendGroupMsgTypeChoices.choices, default=JqrKeywordSendGroupMsgTypeChoices.PRIVATE) diff --git a/apps/jqr/tasks.py b/apps/jqr/tasks.py index fa1f9e5..66e26b7 100644 --- a/apps/jqr/tasks.py +++ b/apps/jqr/tasks.py @@ -10,7 +10,7 @@ from apps.qc.choices import QcWechatbizeventAddcontactIsDeleteChoices, QcQrcodes from apps.qc.models import QcWechatbizeventAddcontact, QcQrcodes, QcWechatbizuserinfo, QcQrcodesEditLog, QcCorpinfo from apps.jqr.models import JqrExternalFollowUser, JqrExternalUser from apps.qc.utils import generate_qrcode_by_qrcode -from apps.warning.models import QcWarningRule +from apps.warning.models import QcWarningRule, WarningLog from libs.wechat import WechatWorkerUtil logger = logging.getLogger('apps') @@ -23,8 +23,7 @@ def save_add_contact(data, corpinfo, *args, **kwargs): if state and state.startswith('mg') and '_' in state: [_, _, qrcodeid] = state.split('_') data['qrcodeid'] = qrcodeid - # TODO 异步执行 - check_qrcode(qrcodeid, userid, corpinfo) + externaluserid = data.get('externaluserid') corpid = data.get('corpid') QcWechatbizeventAddcontact.objects.update_or_create( @@ -38,6 +37,11 @@ def save_add_contact(data, corpinfo, *args, **kwargs): 'ctime': datetime.now(), } ) + qrcodeid = data.get('qrcodeid') + if qrcodeid: + check_qrcode.delay(qrcodeid, userid, corpinfo) + tag_remark_contact.delay(qrcodeid, userid, externaluserid, corpinfo) + alarm_userid_by_qrcodeid.delay(qrcodeid) edit_add_contact.delay(data, corpinfo, *args, **kwargs) @@ -103,13 +107,18 @@ def delete_add_contact(data, corpinfo, *args, **kwargs): def check_qrcode(qrcodeid, userid, corpinfo): qrcode = QcQrcodes.objects.get(id=qrcodeid) users = qrcode.userids + if not users: + return + online_userid_map = {user.get('userid'): user for user in users if + user.get('isonline') == QcQrcodesEditLogOperateTypeChoices.OFFLINE} corpid = qrcode.corpid userinfo = QcWechatbizuserinfo.objects.get(corpid=corpid, userid=userid) today = date.today() # 账号当前活码的接粉情况 - count_info = QcWechatbizeventAddcontact.objects.filter(corpid=corpinfo.get('corpid'), userid=userid).values( + user_count_info = QcWechatbizeventAddcontact.objects.filter(corpid=corpid, + userid__in=list(online_userid_map.keys())).values( 'corpid', - 'userid').aggregate( + 'userid').annotate( total_count=Count('*'), today_count=Sum( Case( @@ -129,11 +138,13 @@ def check_qrcode(qrcodeid, userid, corpinfo): default=Value(0), ) ) - ) + ).values('userid', 'total_count', 'today_count', 'qrcode_count', 'qrcode_today_count') + user_count_info_map = {user_count.get('userid'): user_count for user_count in user_count_info} + count_info = user_count_info_map.get(userid) # 该账号总共加了多少人 - total_count = count_info.get('total_count', 0) + total_count = count_info.get('total_count') # 该账号今天加了多少人 - today_count = count_info.get('today_count', 0) + today_count = count_info.get('today_count') # 该账号当前活码加了多少人 qrcode_count = count_info.get('qrcode_count') # 该账号今天当前活码加了多少人 @@ -143,27 +154,31 @@ def check_qrcode(qrcodeid, userid, corpinfo): # 该用户总加粉总上限 totallimitaddusercount = userinfo.totallimitaddusercount # 该用户当前活码上限 - user_qrcode_limitcount = None - for user in users: - if user.get('userid') == userid: - user_qrcode_limitcount = user.get('limitcount') - break + user_qrcode_limitcount = online_userid_map.get(userid, {}).get('limitcount') - user_count = len(users) - offline_user_count = len( - [user for user in users if user.get('isonline') == QcQrcodesEditLogOperateTypeChoices.OFFLINE]) # 该账号是否需要下线 need_offline = False + warning_text_list = [] + # 当前接粉号 + current_user = online_userid_map.get(userid, {}) # 该账号今天当前活码加的人数 大于 该用户当前活码上限 - if user_qrcode_limitcount is not None and qrcode_today_count > user_qrcode_limitcount > 0: + if user_qrcode_limitcount is not None and qrcode_today_count >= user_qrcode_limitcount > 0: + warning_text_list.append( + f'账号 {current_user.get("username")}-{current_user.get("alias")}-{userid} 今天 {qrcode.name}活码加的人数: {qrcode_today_count} 大于 该用户当前活码上限: {user_qrcode_limitcount}') need_offline = True - if total_count is not None and total_count > totallimitaddusercount > 0: + if total_count is not None and total_count >= totallimitaddusercount > 0: + warning_text_list.append( + f'账号 {current_user.get("username")}-{current_user.get("alias")}-{userid} 总共加了 {total_count} 人 大于 该用户总共加粉上限: {totallimitaddusercount}') need_offline = True - if today_count is not None and today_count > limitaddusercount > 0: + if today_count is not None and today_count >= limitaddusercount > 0: + warning_text_list.append( + f'账号 {current_user.get("username")}-{current_user.get("alias")}-{userid} 今天加了 {today_count} 人 大于 该用户今天加粉上限: {limitaddusercount}') need_offline = True if need_offline: - if user_count - offline_user_count > 1: + warning_setting_id = qrcode.warning_setting_id + # 最后一个接粉号不下线 + if len(online_userid_map) > 1: for user in users: if user.get('userid') == userid: user['isonline'] = QcQrcodesEditLogOperateTypeChoices.OFFLINE @@ -171,26 +186,132 @@ def check_qrcode(qrcodeid, userid, corpinfo): qrcode.save() # 重新生成活码, 请求企微api更新活码 generate_qrcode_by_qrcode(qrcode, corpinfo, is_update=True) - # 生成活码修改记录 - QcQrcodesEditLog.objects.create( - userid=userid, + # 生成活码修改记录 + QcQrcodesEditLog.objects.create( + userid=userid, + uid=qrcode.uid, + corpid=qrcode.corpid, + agentid=qrcode.agentid, + qrcodeid=qrcode.pk, + detail={ + 'totallimit': totallimitaddusercount, + 'daliylimit': limitaddusercount, + 'qrcodelimit': user_qrcode_limitcount, + 'addcount': total_count, + 'qrcodeaddcount': qrcode_count, + 'qrcode_today_count': qrcode_today_count, + }, + type=QcQrcodesEditLogTypeChoices.AUTO, + operatetype=QcQrcodesEditLogOperateTypeChoices.OFFLINE, + ) + WarningLog.objects.create( + text=','.join(warning_text_list), uid=qrcode.uid, - corpid=qrcode.corpid, - agentid=qrcode.agentid, - qrcodeid=qrcode.pk, - detail={ - 'totallimit': totallimitaddusercount, - 'daliylimit': limitaddusercount, - 'qrcodelimit': user_qrcode_limitcount, - 'addcount': total_count, - 'qrcodeaddcount': qrcode_count, - 'qrcode_today_count': qrcode_today_count, + log_info={ + 'corpid': corpid, + 'userid': userid, }, - type=QcQrcodesEditLogTypeChoices.AUTO, - operatetype=QcQrcodesEditLogOperateTypeChoices.OFFLINE, + warning_setting_id=warning_setting_id, ) +@shared_task(name='alarm_userid_by_qrcodeid', queue='qrcode') +def alarm_userid_by_qrcodeid(qrcodeid): + qrcode = QcQrcodes.objects.get(id=qrcodeid) + users = qrcode.userids + if not users: + return + # 活码成员报警检测 + checkminutes = qrcode.checkminutes + now = datetime.now() + checkminutes_ago = now - timedelta(minutes=checkminutes) + online_userid_map = {user.get('userid'): user for user in users if + user.get('isonline') == QcQrcodesEditLogOperateTypeChoices.OFFLINE} + need_check_userid_map = {} + for userid, user in online_userid_map.items(): + starttime = user.get('starttime') + if starttime is None: + continue + try: + starttime = datetime.fromisoformat(starttime) + if starttime < checkminutes_ago: + need_check_userid_map[userid] = user + except Exception as e: + logger.error(e) + continue + if len(need_check_userid_map.keys()) == 0: + return + corpid = qrcode.corpid + # 当前活码在线接粉号的接粉情况 + user_count_info = QcWechatbizeventAddcontact.objects.filter(corpid=corpid, + userid__in=list(need_check_userid_map.keys()), + qrcodeid=qrcodeid, + createtime__gt=checkminutes_ago.timestamp()).values( + 'corpid', + 'userid').annotate( + qrcode_count=Count('*'), + ).values('qrcode_count', 'userid') + need_check_user_count = len(need_check_userid_map.keys()) + total_qrcode_count = sum([user_count.get('qrcode_count') for user_count in user_count_info]) + average_qrcode_count = total_qrcode_count / need_check_user_count + offlinechecknum = qrcode.offlinechecknum + warning_logs = [] + for user_count in user_count_info: + qrcode_count = user_count.get('qrcode_count', 0) + if (qrcode_count - average_qrcode_count) < offlinechecknum: + user = need_check_userid_map.get(user_count.get('userid')) + text = f"账号 {user.get('userid')} {user.get('username')} {user.get('alias')} 在渠道活码 {qrcode.name} 接粉异常" + log_info = { + 'corpid': corpid, + 'userid': user_count.get('userid'), + } + warning_log = WarningLog( + uid=qrcode.uid, + warning_setting_id=qrcode.warning_setting_id, + log_info=log_info, + text=text, + ) + warning_logs.append(warning_log) + WarningLog.objects.bulk_create(warning_logs) + qrcode.lastchecktime = now + qrcode.save() + + +@shared_task(name='tag_remark_contact', queue='tag_remark_contact') +def tag_remark_contact(qrcodeid, userid, externaluserid, corpinfo): + corpid = corpinfo.get('corpid') + appsecret = corpinfo.get('appsecret') + qrcode = QcQrcodes.objects.get(id=qrcodeid) + remark = qrcode.remark + wechat_worker = WechatWorkerUtil(corpid, appsecret) + if remark: + # '{添加日期} -{昵称}' + if '{添加日期}' in remark: + now = datetime.now().strftime('%Y%m%d') + remark = remark.replace('{添加日期}', now) + if '{昵称}' in remark or '{性别}' in remark: + success, data = wechat_worker.get_external_contact(externaluserid) + if success: + (external_contact, _, _) = data + name = external_contact.get('name') + remark = remark.replace('{昵称}', name) + gender = external_contact.get('gender') + # gender 外部联系人性别 0-未知 1-男性 2-女性 + if '{性别}' in remark: + if gender == 1: + remark = remark.replace('{性别}', '男') + elif gender == 2: + remark = remark.replace('{性别}', '女') + else: + remark = remark.replace('{性别}', '未知') + wechat_worker.externalcontact_remark(userid, externaluserid, remark) + + tags = qrcode.tags + if tags: + tag_ids = [tag.get('id') for tag in tags] + wechat_worker.externalcontact_mark_tag(userid, externaluserid, tag_ids) + + # 接粉号报警检测任务 @shared_task(name='check_follow_user', queue='check_follow_user') def check_follow_user(): @@ -199,11 +320,8 @@ def check_follow_user(): count = warning_rules.count() for i in range(0, count, batch_size): for warning_rule in warning_rules[i:i + batch_size]: + warning_setting_id = warning_rule.warning_setting_id users = warning_rule.userids - users = [{"alias": "饿了么小霸王155", "corpid": "ww4450b72962300373", "userid": "mg12976", - "username": "[155]5G-8-1-汪琦"}, - {"alias": "饿了么小霸王157", "corpid": "ww4450b72962300373", "userid": "mg04598", - "username": "[157]5G-3-2-汪潇"}] if users and len(users) > 0: corpid = users[0]["corpid"] userid_map = {user.get('userid'): user for user in users} @@ -221,6 +339,7 @@ def check_follow_user(): ) ), ).values('userid', 'warning_interval_count') + warning_logs = [] for user_count in user_count_info: if user_count.get('warning_interval_count') < 1: user = userid_map.get(user_count.get('userid')) @@ -229,13 +348,20 @@ def check_follow_user(): 'corpid': corpid, 'userid': user_count.get('userid'), } - - print(warning_rule.warning_setting) + warning_log = WarningLog( + uid=warning_rule.uid, + warning_setting_id=warning_setting_id, + log_info=log_info, + text=text, + ) + warning_logs.append(warning_log) + WarningLog.objects.bulk_create(warning_logs) # 活码上线 @shared_task(name='online_qrcode', queue='qrcode') def online_qrcode(): + logger.info('开始活码上线---') batch_size = 1000 qrcodes = QcQrcodes.objects.all() for i in range(0, qrcodes.count(), batch_size): diff --git a/apps/qc/models.py b/apps/qc/models.py index 7d936b2..9453af2 100644 --- a/apps/qc/models.py +++ b/apps/qc/models.py @@ -1,6 +1,7 @@ import uuid from .choices import * +from ..warning.models import WarningSetting class QcCorpinfo(models.Model): @@ -84,6 +85,8 @@ class QcQrcodes(models.Model): ctime = models.DateTimeField(verbose_name='创建时间', auto_now_add=True) utime = models.DateTimeField(verbose_name='更新时间', auto_now=True) offlinechecknum = models.IntegerField(verbose_name='同组进粉差()人时报警') + warning_setting = models.ForeignKey(WarningSetting, on_delete=models.DO_NOTHING, db_constraint=False, + verbose_name='报警配置', related_name='qrcodes', null=True, blank=True) class Meta: db_table = 'qc_qrcodes' diff --git a/libs/wechat/worker.py b/libs/wechat/worker.py index 2836c6c..8c3412a 100644 --- a/libs/wechat/worker.py +++ b/libs/wechat/worker.py @@ -309,3 +309,36 @@ class WechatWorkerUtil(object): if res.get('errcode') == 0: return True, (res.get('send_list'), res.get('next_cursor')) return False, (res.get('errmsg'), None) + + def externalcontact_remark(self, userid, external_userid, remark, description=None, remark_company=None, + remark_mobiles=None, + remark_pic_mediaid=None): + access_token = self.get_access_token() + url = f'{self.base_url}/externalcontact/remark?access_token={access_token}' + data = { + 'userid': userid, + 'external_userid': external_userid, + 'remark': remark, + 'description': description, + 'remark_company': remark_company, + 'remark_mobiles': remark_mobiles, + 'remark_pic_mediaid': remark_pic_mediaid, + } + res = requests.post(url, json=data).json() + if res.get('errcode') == 0: + return True, res.get('result') + return False, res.get('errmsg') + + def externalcontact_mark_tag(self, userid, external_userid, add_tag=None, remove_tag=None): + access_token = self.get_access_token() + url = f'{self.base_url}/externalcontact/mark_tag?access_token={access_token}' + data = { + 'userid': userid, + 'external_userid': external_userid, + 'add_tag': add_tag, + 'remove_tag': remove_tag, + } + res = requests.post(url, json=data).json() + if res.get('errcode') == 0: + return True, res.get('result') + return False, res.get('errmsg') diff --git a/start-celeryworker b/start-celeryworker index be27168..eff9642 100644 --- a/start-celeryworker +++ b/start-celeryworker @@ -5,7 +5,7 @@ set -o errexit set -o nounset -#watchmedo auto-restart -d yzk_wechat_event/ -p "*.py" -- celery multi start 6 -A yzk_wechat_event worker -l info --logfile=logs/celery.log --concurrency=4 -Q celery,contact,qrcode & -watchmedo auto-restart -d yzk_wechat_event/ -p "*.py" -- celery -A yzk_wechat_event worker -l info --logfile=logs/celery.log --concurrency=4 -Q celery,contact,qrcode,check_follow_user & +#watchmedo auto-restart -d yzk_wechat_event/ -p "*.py" -- celery multi start 6 -A yzk_wechat_event worker -l info --logfile=logs/celery.log --concurrency=4 -Q celery,contact,qrcode,check_follow_user, tag_remark_contact & +watchmedo auto-restart -d yzk_wechat_event/ -p "*.py" -- celery -A yzk_wechat_event worker -l info --logfile=logs/celery.log --concurrency=4 -Q celery,contact,qrcode,check_follow_user, tag_remark_contact & celery -A yzk_wechat_event beat -l info --logfile=logs/celery_beat.log & tail -f logs/*.log diff --git a/yzk_wechat_event/celery.py b/yzk_wechat_event/celery.py index 963930a..cc61c2a 100644 --- a/yzk_wechat_event/celery.py +++ b/yzk_wechat_event/celery.py @@ -3,6 +3,7 @@ import os from datetime import timedelta from celery import Celery +from celery.schedules import crontab os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'yzk_wechat_event.settings.base') @@ -14,19 +15,25 @@ app.autodiscover_tasks() # 定时任务的调度列表,用于注册定时任务 app.conf.beat_schedule = { - # 'warning_log': { - # 'task': 'warning_log', - # 'schedule': timedelta(minutes=1), - # }, + 'online_qrcode': { + 'task': 'online_qrcode', + 'schedule': crontab(minute="0", hour="0"), + }, + 'check_follow_user': { + 'task': 'check_follow_user', + 'schedule': timedelta(minutes=1), + }, } task_routes = { - 'save_add_contact': 'contact', - 'edit_add_contact': 'contact', - 'delete_add_contact': 'contact', - 'check_qrcode': 'qrcode', - 'check_follow_user': 'check_follow_user', - 'online_qrcode': 'qrcode', + 'save_add_contact': {'queue': 'contact'}, + 'edit_add_contact': {'queue': 'contact'}, + 'delete_add_contact': {'queue': 'contact'}, + 'check_qrcode': {'queue': 'contact'}, + 'check_follow_user': {'queue': 'check_follow_user'}, + 'online_qrcode': {'queue': 'qrcode'}, + 'alarm_userid_by_qrcodeid': {'queue': 'qrcode'}, + 'tag_remark_contact': {'queue': 'tag_remark_contact'}, } # app.conf.task_routes = task_routes