yzk_wechat_event/apps/jqr/tasks.py

516 lines
22 KiB
Python
Raw Normal View History

2023-12-13 15:27:31 +08:00
from celery import shared_task
2023-12-15 17:42:32 +08:00
from datetime import datetime, date, timedelta
2023-12-13 17:25:23 +08:00
import logging
2023-12-15 17:42:32 +08:00
from django.db.models import Count, Sum, When, Case, Value
2023-12-14 18:43:01 +08:00
from apps.jqr.choices import JqrWechatbizuserinfoDeleteTypeChoices, JqrAddTypeChoices
2023-12-25 11:06:08 +08:00
from apps.jqr.pubsub import JQRQrcodeCallbackPubSub
2023-12-27 15:26:58 +08:00
from apps.jqr.utils import send_new_user_msg
2023-12-25 14:35:54 +08:00
from apps.jqr.ws import WS
2023-12-15 17:42:32 +08:00
from apps.qc.choices import QcWechatbizeventAddcontactIsDeleteChoices, QcQrcodesEditLogOperateTypeChoices, \
QcQrcodesEditLogTypeChoices
from apps.qc.models import QcWechatbizeventAddcontact, QcQrcodes, QcWechatbizuserinfo, QcQrcodesEditLog, QcCorpinfo
2023-12-14 18:43:01 +08:00
from apps.jqr.models import JqrExternalFollowUser, JqrExternalUser
2023-12-15 17:42:32 +08:00
from apps.qc.utils import generate_qrcode_by_qrcode
2023-12-22 14:09:31 +08:00
from apps.warning.models import QcWarningRule, WarningLog, WarningSetting
2023-12-14 18:43:01 +08:00
from libs.wechat import WechatWorkerUtil
2023-12-13 17:25:23 +08:00
logger = logging.getLogger('apps')
2023-12-13 15:27:31 +08:00
2023-12-14 18:43:01 +08:00
@shared_task(name='save_add_contact', queue='contact')
def save_add_contact(data, corpinfo, *args, **kwargs):
state = data.get('state')
2023-12-15 17:42:32 +08:00
userid = data.get('userid')
2023-12-15 09:30:38 +08:00
if state and state.startswith('mg') and '_' in state:
2023-12-14 18:43:01 +08:00
[_, _, qrcodeid] = state.split('_')
data['qrcodeid'] = qrcodeid
2023-12-19 16:03:02 +08:00
2023-12-14 18:43:01 +08:00
externaluserid = data.get('externaluserid')
corpid = data.get('corpid')
QcWechatbizeventAddcontact.objects.update_or_create(
userid=userid,
externaluserid=externaluserid,
corpid=corpid,
defaults={
**data,
'isdelete': QcWechatbizeventAddcontactIsDeleteChoices.NOT_DELETE,
2023-12-15 09:32:22 +08:00
'deletetime': None,
'ctime': datetime.now(),
2023-12-14 18:43:01 +08:00
}
)
2023-12-19 16:03:02 +08:00
qrcodeid = data.get('qrcodeid')
if qrcodeid:
check_qrcode.delay(qrcodeid, userid, corpinfo)
alarm_userid_by_qrcodeid.delay(qrcodeid)
# 打标签备注
tag_remark_contact(qrcodeid, userid, externaluserid, corpinfo)
# 修复客户关系
edit_add_contact(data, corpinfo, *args, **kwargs)
# TODO 请求API 发送消息
2023-12-14 18:43:01 +08:00
2023-12-25 11:06:08 +08:00
def save_add_contact_by_channel(data, corpinfo, *args, **kwargs):
userid = data.get('userid')
externaluserid = data.get('externaluserid')
corpid = data.get('corpid')
QcWechatbizeventAddcontact.objects.update_or_create(
userid=userid,
externaluserid=externaluserid,
corpid=corpid,
defaults={
**data,
'isdelete': QcWechatbizeventAddcontactIsDeleteChoices.NOT_DELETE,
'deletetime': None,
'ctime': datetime.now(),
}
)
qrcodeid = data.get('qrcodeid')
if qrcodeid:
JQRQrcodeCallbackPubSub.publish({
'qrcodeid': qrcodeid,
'userid': userid,
'externaluserid': externaluserid,
'corpinfo': corpinfo,
'handler': f'{qrcode_channel_handler.__module__}.{qrcode_channel_handler.__name__}'
})
# 修复客户关系
edit_add_contact(data, corpinfo, *args, **kwargs)
2023-12-25 14:36:36 +08:00
# 转化外部用户Id
2023-12-27 15:21:02 +08:00
WS.transfer_external_userid_to_vid(corpid, userid, externaluserid)
2023-12-27 15:26:58 +08:00
# 发送新客欢迎
send_new_user_msg(corpid, userid, externaluserid)
2023-12-25 11:06:08 +08:00
2023-12-14 18:43:01 +08:00
@shared_task(name='edit_add_contact', queue='contact')
def edit_add_contact(data, corpinfo, *args, **kwargs):
# 更新 关系表
externaluserid = data.get('externaluserid')
corpid = corpinfo.get('corpid')
appsecret = corpinfo.get('appsecret')
wechat_worker = WechatWorkerUtil(corpid, appsecret)
cursor = ''
while cursor is not None:
success, data = wechat_worker.get_external_contact(externaluserid, cursor=cursor)
if not success:
logger.error(f'获取外部联系人信息失败,{data}')
return
(external_contact, follow_user, cursor) = data
JqrExternalUser.objects.update_or_create(
corpid=corpid,
external_userid=externaluserid,
defaults={
**external_contact,
'addtype': JqrAddTypeChoices.EVENT_CALLBACK,
'deletetype': None,
'dtime': None
}
)
for follow_info in follow_user:
userid = follow_info.get('userid')
follow_info['tags'] = [tag.get('tag_id') for tag in follow_info.get('tags', [])]
JqrExternalFollowUser.objects.update_or_create(
userid=userid,
corpid=corpid,
external_userid=externaluserid,
defaults={
**follow_info,
'addtype': JqrAddTypeChoices.EVENT_CALLBACK,
'deletetype': None,
'dtime': None
}
)
2023-12-25 11:06:08 +08:00
def edit_add_contact_by_channel(data, corpinfo, *args, **kwargs):
# 更新 关系表
externaluserid = data.get('externaluserid')
corpid = corpinfo.get('corpid')
appsecret = corpinfo.get('appsecret')
wechat_worker = WechatWorkerUtil(corpid, appsecret)
cursor = ''
while cursor is not None:
success, data = wechat_worker.get_external_contact(externaluserid, cursor=cursor)
if not success:
logger.error(f'获取外部联系人信息失败,{data}')
return
(external_contact, follow_user, cursor) = data
JqrExternalUser.objects.update_or_create(
corpid=corpid,
external_userid=externaluserid,
defaults={
**external_contact,
'addtype': JqrAddTypeChoices.EVENT_CALLBACK,
'deletetype': None,
'dtime': None
}
)
for follow_info in follow_user:
userid = follow_info.get('userid')
follow_info['tags'] = [tag.get('tag_id') for tag in follow_info.get('tags', [])]
JqrExternalFollowUser.objects.update_or_create(
userid=userid,
corpid=corpid,
external_userid=externaluserid,
defaults={
**follow_info,
'addtype': JqrAddTypeChoices.EVENT_CALLBACK,
'deletetype': None,
'dtime': None
}
)
2023-12-14 18:43:01 +08:00
@shared_task(name='delete_add_contact', queue='contact')
def delete_add_contact(data, corpinfo, *args, **kwargs):
userid = data.get('userid')
externaluserid = data.get('externaluserid')
corpid = data.get('corpid')
agentid = data.get('agentid')
# 更新add_contact 表删除字段
QcWechatbizeventAddcontact.objects.filter(userid=userid, externaluserid=externaluserid, corpid=corpid,
agentid=agentid).update(
isdelete=QcWechatbizeventAddcontactIsDeleteChoices.DELETED,
deletetime=datetime.now()
)
JqrExternalFollowUser.objects.filter(userid=userid, external_userid=externaluserid, corpid=corpid).update(
deletetype=JqrWechatbizuserinfoDeleteTypeChoices.EVENT_CALLBACK,
dtime=datetime.now()
)
2023-12-15 17:42:32 +08:00
2023-12-25 11:06:08 +08:00
def delete_add_contact_by_channel(data, corpinfo, *args, **kwargs):
userid = data.get('userid')
externaluserid = data.get('externaluserid')
corpid = data.get('corpid')
agentid = data.get('agentid')
# 更新add_contact 表删除字段
QcWechatbizeventAddcontact.objects.filter(userid=userid, externaluserid=externaluserid, corpid=corpid,
agentid=agentid).update(
isdelete=QcWechatbizeventAddcontactIsDeleteChoices.DELETED,
deletetime=datetime.now()
)
JqrExternalFollowUser.objects.filter(userid=userid, external_userid=externaluserid, corpid=corpid).update(
deletetype=JqrWechatbizuserinfoDeleteTypeChoices.EVENT_CALLBACK,
dtime=datetime.now()
)
2023-12-15 17:42:32 +08:00
@shared_task(name='check_qrcode', queue='qrcode')
def check_qrcode(qrcodeid, userid, corpinfo):
qrcode = QcQrcodes.objects.get(id=qrcodeid)
users = qrcode.userids
2023-12-19 16:03:02 +08:00
if not users:
return
online_userid_map = {user.get('userid'): user for user in users if
user.get('isonline') == QcQrcodesEditLogOperateTypeChoices.OFFLINE}
2023-12-15 17:42:32 +08:00
corpid = qrcode.corpid
userinfo = QcWechatbizuserinfo.objects.get(corpid=corpid, userid=userid)
today = date.today()
# 账号当前活码的接粉情况
2023-12-20 10:57:06 +08:00
count_info = QcWechatbizeventAddcontact.objects.filter(corpid=corpid,
userid=userid).values(
2023-12-15 17:42:32 +08:00
'corpid',
2023-12-20 10:57:06 +08:00
'userid').aggregate(
2023-12-15 17:42:32 +08:00
total_count=Count('*'),
today_count=Sum(
Case(
When(ctime__date=today, then=Value(1)),
default=Value(0),
)
),
qrcode_count=Sum(
Case(
When(qrcodeid=qrcodeid, then=Value(1)),
default=Value(0),
)
),
qrcode_today_count=Sum(
Case(
When(qrcodeid=qrcodeid, ctime__date=today, then=Value(1)),
default=Value(0),
)
)
2023-12-20 10:57:06 +08:00
)
2023-12-15 17:42:32 +08:00
# 该账号总共加了多少人
2023-12-19 16:03:02 +08:00
total_count = count_info.get('total_count')
2023-12-15 17:42:32 +08:00
# 该账号今天加了多少人
2023-12-19 16:03:02 +08:00
today_count = count_info.get('today_count')
2023-12-15 17:42:32 +08:00
# 该账号当前活码加了多少人
qrcode_count = count_info.get('qrcode_count')
# 该账号今天当前活码加了多少人
qrcode_today_count = count_info.get('qrcode_today_count')
# 该用户所有活码每日新增好友上限
limitaddusercount = userinfo.limitaddusercount
# 该用户总加粉总上限
totallimitaddusercount = userinfo.totallimitaddusercount
# 该用户当前活码上限
2023-12-19 16:03:02 +08:00
user_qrcode_limitcount = online_userid_map.get(userid, {}).get('limitcount')
2023-12-15 17:42:32 +08:00
# 该账号是否需要下线
need_offline = False
2023-12-19 16:03:02 +08:00
warning_text_list = []
# 当前接粉号
current_user = online_userid_map.get(userid, {})
2023-12-15 17:42:32 +08:00
# 该账号今天当前活码加的人数 大于 该用户当前活码上限
2023-12-19 16:03:02 +08:00
if user_qrcode_limitcount is not None and qrcode_today_count >= user_qrcode_limitcount > 0:
warning_text_list.append(
f'账号 {current_user.get("username")}-{current_user.get("alias")}-{userid} 今天 {qrcode.name}活码加的人数: {qrcode_today_count} 大于 该用户当前活码上限: {user_qrcode_limitcount}')
2023-12-15 17:42:32 +08:00
need_offline = True
2023-12-19 16:03:02 +08:00
if total_count is not None and total_count >= totallimitaddusercount > 0:
warning_text_list.append(
f'账号 {current_user.get("username")}-{current_user.get("alias")}-{userid} 总共加了 {total_count} 人 大于 该用户总共加粉上限: {totallimitaddusercount}')
2023-12-15 17:42:32 +08:00
need_offline = True
2023-12-19 16:03:02 +08:00
if today_count is not None and today_count >= limitaddusercount > 0:
warning_text_list.append(
f'账号 {current_user.get("username")}-{current_user.get("alias")}-{userid} 今天加了 {today_count} 人 大于 该用户今天加粉上限: {limitaddusercount}')
2023-12-15 17:42:32 +08:00
need_offline = True
if need_offline:
2023-12-19 16:03:02 +08:00
warning_setting_id = qrcode.warning_setting_id
# 最后一个接粉号不下线
if len(online_userid_map) > 1:
2023-12-15 17:42:32 +08:00
for user in users:
if user.get('userid') == userid:
user['isonline'] = QcQrcodesEditLogOperateTypeChoices.OFFLINE
user['endtime'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
qrcode.save()
# 重新生成活码, 请求企微api更新活码
generate_qrcode_by_qrcode(qrcode, corpinfo, is_update=True)
2023-12-19 16:03:02 +08:00
# 生成活码修改记录
QcQrcodesEditLog.objects.create(
userid=userid,
uid=qrcode.uid,
corpid=qrcode.corpid,
agentid=qrcode.agentid,
qrcodeid=qrcode.pk,
detail={
'totallimit': totallimitaddusercount,
'daliylimit': limitaddusercount,
'qrcodelimit': user_qrcode_limitcount,
'addcount': total_count,
'qrcodeaddcount': qrcode_count,
'qrcode_today_count': qrcode_today_count,
},
type=QcQrcodesEditLogTypeChoices.AUTO,
operatetype=QcQrcodesEditLogOperateTypeChoices.OFFLINE,
)
WarningLog.objects.create(
text=','.join(warning_text_list),
2023-12-15 17:42:32 +08:00
uid=qrcode.uid,
2023-12-19 16:03:02 +08:00
log_info={
'corpid': corpid,
'userid': userid,
2023-12-15 17:42:32 +08:00
},
2023-12-19 16:03:02 +08:00
warning_setting_id=warning_setting_id,
2023-12-15 17:42:32 +08:00
)
2023-12-19 16:03:02 +08:00
@shared_task(name='alarm_userid_by_qrcodeid', queue='qrcode')
def alarm_userid_by_qrcodeid(qrcodeid):
qrcode = QcQrcodes.objects.get(id=qrcodeid)
users = qrcode.userids
if not users:
return
# 活码成员报警检测
checkminutes = qrcode.checkminutes
now = datetime.now()
checkminutes_ago = now - timedelta(minutes=checkminutes)
online_userid_map = {user.get('userid'): user for user in users if
user.get('isonline') == QcQrcodesEditLogOperateTypeChoices.OFFLINE}
need_check_userid_map = {}
for userid, user in online_userid_map.items():
starttime = user.get('starttime')
if starttime is None:
continue
try:
starttime = datetime.fromisoformat(starttime)
if starttime < checkminutes_ago:
need_check_userid_map[userid] = user
except Exception as e:
logger.error(e)
continue
if len(need_check_userid_map.keys()) == 0:
return
corpid = qrcode.corpid
# 当前活码在线接粉号的接粉情况
user_count_info = QcWechatbizeventAddcontact.objects.filter(corpid=corpid,
userid__in=list(need_check_userid_map.keys()),
qrcodeid=qrcodeid,
createtime__gt=checkminutes_ago.timestamp()).values(
'corpid',
'userid').annotate(
qrcode_count=Count('*'),
).values('qrcode_count', 'userid')
need_check_user_count = len(need_check_userid_map.keys())
total_qrcode_count = sum([user_count.get('qrcode_count') for user_count in user_count_info])
average_qrcode_count = total_qrcode_count / need_check_user_count
offlinechecknum = qrcode.offlinechecknum
warning_logs = []
for user_count in user_count_info:
qrcode_count = user_count.get('qrcode_count', 0)
if (qrcode_count - average_qrcode_count) < offlinechecknum:
user = need_check_userid_map.get(user_count.get('userid'))
text = f"账号 {user.get('userid')} {user.get('username')} {user.get('alias')} 在渠道活码 {qrcode.name} 接粉异常"
log_info = {
'corpid': corpid,
'userid': user_count.get('userid'),
}
warning_log = WarningLog(
uid=qrcode.uid,
warning_setting_id=qrcode.warning_setting_id,
log_info=log_info,
text=text,
)
warning_logs.append(warning_log)
WarningLog.objects.bulk_create(warning_logs)
qrcode.lastchecktime = now
qrcode.save()
@shared_task(name='tag_remark_contact', queue='tag_remark_contact')
def tag_remark_contact(qrcodeid, userid, externaluserid, corpinfo):
corpid = corpinfo.get('corpid')
appsecret = corpinfo.get('appsecret')
qrcode = QcQrcodes.objects.get(id=qrcodeid)
remark = qrcode.remark
wechat_worker = WechatWorkerUtil(corpid, appsecret)
if remark:
# '{添加日期} -{昵称}'
if '{添加日期}' in remark:
now = datetime.now().strftime('%Y%m%d')
remark = remark.replace('{添加日期}', now)
if '{昵称}' in remark or '{性别}' in remark:
success, data = wechat_worker.get_external_contact(externaluserid)
if success:
(external_contact, _, _) = data
name = external_contact.get('name')
remark = remark.replace('{昵称}', name)
gender = external_contact.get('gender')
# gender 外部联系人性别 0-未知 1-男性 2-女性
if '{性别}' in remark:
if gender == 1:
remark = remark.replace('{性别}', '')
elif gender == 2:
remark = remark.replace('{性别}', '')
else:
remark = remark.replace('{性别}', '未知')
wechat_worker.externalcontact_remark(userid, externaluserid, remark)
tags = qrcode.tags
if tags:
tag_ids = [tag.get('id') for tag in tags]
wechat_worker.externalcontact_mark_tag(userid, externaluserid, tag_ids)
2023-12-15 17:42:32 +08:00
# 接粉号报警检测任务
@shared_task(name='check_follow_user', queue='check_follow_user')
def check_follow_user():
batch_size = 1000
warning_rules = QcWarningRule.objects.filter(is_on=True, is_delete=False)
count = warning_rules.count()
2023-12-22 14:09:31 +08:00
now = datetime.now()
2023-12-15 17:42:32 +08:00
for i in range(0, count, batch_size):
for warning_rule in warning_rules[i:i + batch_size]:
2023-12-19 16:03:02 +08:00
warning_setting_id = warning_rule.warning_setting_id
2023-12-15 17:42:32 +08:00
users = warning_rule.userids
if users and len(users) > 0:
corpid = users[0]["corpid"]
userid_map = {user.get('userid'): user for user in users}
warning_interval = warning_rule.warning_interval
now = datetime.now()
warning_interval_ago = now - timedelta(minutes=warning_interval)
user_count_info = QcWechatbizeventAddcontact.objects.filter(corpid=corpid,
userid__in=list(userid_map.keys())).values(
'corpid',
'userid').annotate(
warning_interval_count=Sum(
Case(
When(ctime__gte=warning_interval_ago, then=Value(1)),
default=Value(0),
)
),
).values('userid', 'warning_interval_count')
2023-12-19 16:03:02 +08:00
warning_logs = []
2023-12-22 14:09:31 +08:00
warning_setting = WarningSetting.objects.get(id=warning_setting_id)
2023-12-15 17:42:32 +08:00
for user_count in user_count_info:
if user_count.get('warning_interval_count') < 1:
user = userid_map.get(user_count.get('userid'))
2023-12-22 14:09:31 +08:00
text = f"""
报警名称{warning_setting.name}
用户id: {user.get('userid')}
用户昵称{user.get('username')}
用户别名{user.get('alias')}
持续未进粉时间{warning_interval}分钟
检测间隔5分钟
本次检测时间{now.strftime("%Y-%m-%d %H:%M:%S")}
"""
2023-12-15 17:42:32 +08:00
log_info = {
'corpid': corpid,
'userid': user_count.get('userid'),
}
2023-12-19 16:03:02 +08:00
warning_log = WarningLog(
uid=warning_rule.uid,
warning_setting_id=warning_setting_id,
log_info=log_info,
text=text,
)
warning_logs.append(warning_log)
WarningLog.objects.bulk_create(warning_logs)
2023-12-15 17:42:32 +08:00
# 活码上线
@shared_task(name='online_qrcode', queue='qrcode')
def online_qrcode():
2023-12-19 16:03:02 +08:00
logger.info('开始活码上线---')
2023-12-15 17:42:32 +08:00
batch_size = 1000
qrcodes = QcQrcodes.objects.all()
for i in range(0, qrcodes.count(), batch_size):
for qrcode in qrcodes[i:i + batch_size]:
users = qrcode.userids
need_generate_code = False
# 获取下线的用户
userids = []
for user in users:
if user.get('isonline') == QcQrcodesEditLogOperateTypeChoices.OFFLINE:
userids.append(user.get('userid'))
need_generate_code = True
user['isonline'] = QcQrcodesEditLogOperateTypeChoices.ONLINE
user['endtime'] = None
# 上线时间
user['starttime'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
if need_generate_code:
qrcode.save()
corp = QcCorpinfo.objects.get(corpid=qrcode.corpid, agentid=qrcode.agentid)
# 重新生成活码, 请求企微api更新活码
generate_qrcode_by_qrcode(qrcode, corp, is_update=True)
# 生成活码修改记录
for userid in userids:
QcQrcodesEditLog.objects.create(
userid=userid,
uid=qrcode.uid,
corpid=qrcode.corpid,
agentid=qrcode.agentid,
qrcodeid=qrcode.pk,
detail={
'totallimit': 0,
'daliylimit': 0,
'qrcodelimit': 0,
'addcount': 0,
'qrcodeaddcount': 0,
'qrcode_today_count': 0,
},
type=QcQrcodesEditLogTypeChoices.AUTO,
operatetype=QcQrcodesEditLogOperateTypeChoices.ONLINE,
)
2023-12-25 11:06:08 +08:00
def qrcode_channel_handler(qrcodeid, userid, externaluserid, corpinfo):
check_qrcode(qrcodeid, userid, corpinfo)
alarm_userid_by_qrcodeid(qrcodeid)
# 打标签备注
tag_remark_contact(qrcodeid, userid, externaluserid, corpinfo)