yzk_wechat_event/apps/jqr/tasks.py

574 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from celery import shared_task
from datetime import datetime, date, timedelta
import logging
from django.db.models import Count, Sum, When, Case, Value
from apps.jqr.choices import JqrWechatbizuserinfoDeleteTypeChoices, JqrAddTypeChoices
from apps.jqr.pubsub import JQRQrcodeCallbackPubSub
from apps.jqr.utils import send_new_user_msg
from apps.jqr.ws import WS
from apps.qc.choices import QcWechatbizeventAddcontactIsDeleteChoices, QcQrcodesEditLogOperateTypeChoices, \
QcQrcodesEditLogTypeChoices
from apps.qc.models import QcWechatbizeventAddcontact, QcQrcodes, QcWechatbizuserinfo, QcQrcodesEditLog, QcCorpinfo
from apps.jqr.models import JqrExternalFollowUser, JqrExternalUser, JqrHookUser, JqrExternalQun
from apps.qc.utils import generate_qrcode_by_qrcode
from apps.warning.models import QcWarningRule, WarningLog, WarningSetting
from libs.wechat import WechatWorkerUtil
from utils.redis_lock import SimpleLock
logger = logging.getLogger('apps')
@shared_task(name='save_add_contact', queue='contact')
def save_add_contact(data, corpinfo, *args, **kwargs):
state = data.get('state')
userid = data.get('userid')
if state and state.startswith('mg') and '_' in state:
[_, _, qrcodeid] = state.split('_')
data['qrcodeid'] = qrcodeid
externaluserid = data.get('externaluserid')
corpid = data.get('corpid')
QcWechatbizeventAddcontact.objects.update_or_create(
userid=userid,
externaluserid=externaluserid,
corpid=corpid,
defaults={
**data,
'isdelete': QcWechatbizeventAddcontactIsDeleteChoices.NOT_DELETE,
'deletetime': None,
'ctime': datetime.now(),
}
)
qrcodeid = data.get('qrcodeid')
if qrcodeid:
check_qrcode.delay(qrcodeid, userid, corpinfo)
alarm_userid_by_qrcodeid.delay(qrcodeid)
# 打标签备注
tag_remark_contact(qrcodeid, userid, externaluserid, corpinfo)
# 修复客户关系
edit_add_contact(data, corpinfo, *args, **kwargs)
# TODO 请求API 发送消息
def save_add_contact_by_channel(data, corpinfo, *args, **kwargs):
userid = data.get('userid')
externaluserid = data.get('externaluserid')
corpid = data.get('corpid')
QcWechatbizeventAddcontact.objects.update_or_create(
userid=userid,
externaluserid=externaluserid,
corpid=corpid,
defaults={
**data,
'isdelete': QcWechatbizeventAddcontactIsDeleteChoices.NOT_DELETE,
'deletetime': None,
'ctime': datetime.now(),
}
)
qrcodeid = data.get('qrcodeid')
if qrcodeid:
JQRQrcodeCallbackPubSub.publish({
'qrcodeid': qrcodeid,
'userid': userid,
'externaluserid': externaluserid,
'corpinfo': corpinfo,
'handler': f'{qrcode_channel_handler.__module__}.{qrcode_channel_handler.__name__}'
})
# 修复客户关系
edit_add_contact(data, corpinfo, *args, **kwargs)
# # 转化外部用户Id
# WS.transfer_external_userid_to_vid(corpid, userid, externaluserid)
# 发送新客欢迎
hook_user = JqrHookUser.objects.filter(corpid=corpid, userid=userid).first()
if hook_user is not None:
# utime 是否在 9 分钟内
now = datetime.now()
nine_minute_ago = now - timedelta(minutes=9)
uid = hook_user.uid
createtime = data.get('createtime')
rc = SimpleLock(f'{corpid}:{userid}:{externaluserid}:{createtime}')
success = rc.try_lock(60 * 10)
if hook_user.utime > nine_minute_ago and hook_user.new_user and success:
# 发送消息
send_new_user_msg(corpid, userid, external_userid=externaluserid, uid=uid)
@shared_task(name='edit_add_contact', queue='contact')
def edit_add_contact(data, corpinfo, *args, **kwargs):
# 更新 关系表
externaluserid = data.get('externaluserid')
corpid = corpinfo.get('corpid')
appsecret = corpinfo.get('appsecret')
wechat_worker = WechatWorkerUtil(corpid, appsecret)
cursor = ''
while cursor is not None:
success, data = wechat_worker.get_external_contact(externaluserid, cursor=cursor)
if not success:
logger.error(f'获取外部联系人信息失败,{data}')
return
(external_contact, follow_user, cursor) = data
JqrExternalUser.objects.update_or_create(
corpid=corpid,
external_userid=externaluserid,
defaults={
**external_contact,
'addtype': JqrAddTypeChoices.EVENT_CALLBACK,
}
)
for follow_info in follow_user:
userid = follow_info.get('userid')
follow_info['tags'] = [tag.get('tag_id') for tag in follow_info.get('tags', [])]
JqrExternalFollowUser.objects.update_or_create(
userid=userid,
corpid=corpid,
external_userid=externaluserid,
defaults={
**follow_info,
'addtype': JqrAddTypeChoices.EVENT_CALLBACK,
'deletetype': 0,
'dtime': None
}
)
def edit_add_contact_by_channel(data, corpinfo, *args, **kwargs):
# 更新 关系表
externaluserid = data.get('externaluserid')
corpid = corpinfo.get('corpid')
appsecret = corpinfo.get('appsecret')
wechat_worker = WechatWorkerUtil(corpid, appsecret)
cursor = ''
while cursor is not None:
success, data = wechat_worker.get_external_contact(externaluserid, cursor=cursor)
if not success:
logger.error(f'获取外部联系人信息失败,{data}')
return
(external_contact, follow_user, cursor) = data
JqrExternalUser.objects.update_or_create(
corpid=corpid,
external_userid=externaluserid,
defaults={
**external_contact,
'addtype': JqrAddTypeChoices.EVENT_CALLBACK,
}
)
for follow_info in follow_user:
userid = follow_info.get('userid')
follow_info['tags'] = [tag.get('tag_id') for tag in follow_info.get('tags', [])]
JqrExternalFollowUser.objects.update_or_create(
userid=userid,
corpid=corpid,
external_userid=externaluserid,
defaults={
**follow_info,
'addtype': JqrAddTypeChoices.EVENT_CALLBACK,
'deletetype': 0,
'dtime': None
}
)
@shared_task(name='delete_add_contact', queue='contact')
def delete_add_contact(data, corpinfo, *args, **kwargs):
userid = data.get('userid')
externaluserid = data.get('externaluserid')
corpid = data.get('corpid')
agentid = data.get('agentid')
# 更新add_contact 表删除字段
QcWechatbizeventAddcontact.objects.filter(userid=userid, externaluserid=externaluserid, corpid=corpid,
agentid=agentid).update(
isdelete=QcWechatbizeventAddcontactIsDeleteChoices.DELETED,
deletetime=datetime.now()
)
JqrExternalFollowUser.objects.filter(userid=userid, external_userid=externaluserid, corpid=corpid).update(
deletetype=JqrWechatbizuserinfoDeleteTypeChoices.EVENT_CALLBACK,
dtime=datetime.now()
)
def delete_add_contact_by_channel(data, corpinfo, *args, **kwargs):
userid = data.get('userid')
externaluserid = data.get('externaluserid')
corpid = data.get('corpid')
agentid = data.get('agentid')
# 更新add_contact 表删除字段
QcWechatbizeventAddcontact.objects.filter(userid=userid, externaluserid=externaluserid, corpid=corpid,
agentid=agentid).update(
isdelete=QcWechatbizeventAddcontactIsDeleteChoices.DELETED,
deletetime=datetime.now()
)
JqrExternalFollowUser.objects.filter(userid=userid, external_userid=externaluserid, corpid=corpid).update(
deletetype=JqrWechatbizuserinfoDeleteTypeChoices.EVENT_CALLBACK,
dtime=datetime.now()
)
@shared_task(name='check_qrcode', queue='qrcode')
def check_qrcode(qrcodeid, userid, corpinfo):
qrcode = QcQrcodes.objects.get(id=qrcodeid)
users = qrcode.userids
if not users:
return
online_userid_map = {user.get('userid'): user for user in users if
user.get('isonline') == QcQrcodesEditLogOperateTypeChoices.OFFLINE}
corpid = qrcode.corpid
userinfo = QcWechatbizuserinfo.objects.get(corpid=corpid, userid=userid)
today = date.today()
# 账号当前活码的接粉情况
count_info = QcWechatbizeventAddcontact.objects.filter(corpid=corpid,
userid=userid).values(
'corpid',
'userid').aggregate(
total_count=Count('*'),
today_count=Sum(
Case(
When(ctime__date=today, then=Value(1)),
default=Value(0),
)
),
qrcode_count=Sum(
Case(
When(qrcodeid=qrcodeid, then=Value(1)),
default=Value(0),
)
),
qrcode_today_count=Sum(
Case(
When(qrcodeid=qrcodeid, ctime__date=today, then=Value(1)),
default=Value(0),
)
)
)
# 该账号总共加了多少人
total_count = count_info.get('total_count')
# 该账号今天加了多少人
today_count = count_info.get('today_count')
# 该账号当前活码加了多少人
qrcode_count = count_info.get('qrcode_count')
# 该账号今天当前活码加了多少人
qrcode_today_count = count_info.get('qrcode_today_count')
# 该用户所有活码每日新增好友上限
limitaddusercount = userinfo.limitaddusercount
# 该用户总加粉总上限
totallimitaddusercount = userinfo.totallimitaddusercount
# 该用户当前活码上限
user_qrcode_limitcount = online_userid_map.get(userid, {}).get('limitcount')
# 该账号是否需要下线
need_offline = False
warning_text_list = []
# 当前接粉号
current_user = online_userid_map.get(userid, {})
# 该账号今天当前活码加的人数 大于 该用户当前活码上限
if user_qrcode_limitcount is not None and qrcode_today_count >= user_qrcode_limitcount > 0:
warning_text_list.append(
f'账号 {current_user.get("username")}-{current_user.get("alias")}-{userid} 今天 {qrcode.name}活码加的人数: {qrcode_today_count} 大于 该用户当前活码上限: {user_qrcode_limitcount}')
need_offline = True
if total_count is not None and total_count >= totallimitaddusercount > 0:
warning_text_list.append(
f'账号 {current_user.get("username")}-{current_user.get("alias")}-{userid} 总共加了 {total_count} 人 大于 该用户总共加粉上限: {totallimitaddusercount}')
need_offline = True
if today_count is not None and today_count >= limitaddusercount > 0:
warning_text_list.append(
f'账号 {current_user.get("username")}-{current_user.get("alias")}-{userid} 今天加了 {today_count} 人 大于 该用户今天加粉上限: {limitaddusercount}')
need_offline = True
if need_offline:
warning_setting_id = qrcode.warning_setting_id
# 最后一个接粉号不下线
if len(online_userid_map) > 1:
for user in users:
if user.get('userid') == userid:
user['isonline'] = QcQrcodesEditLogOperateTypeChoices.OFFLINE
user['endtime'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
qrcode.save()
# 重新生成活码, 请求企微api更新活码
generate_qrcode_by_qrcode(qrcode, corpinfo, is_update=True)
# 生成活码修改记录
QcQrcodesEditLog.objects.create(
userid=userid,
uid=qrcode.uid,
corpid=qrcode.corpid,
agentid=qrcode.agentid,
qrcodeid=qrcode.pk,
detail={
'totallimit': totallimitaddusercount,
'daliylimit': limitaddusercount,
'qrcodelimit': user_qrcode_limitcount,
'addcount': total_count,
'qrcodeaddcount': qrcode_count,
'qrcode_today_count': qrcode_today_count,
},
type=QcQrcodesEditLogTypeChoices.AUTO,
operatetype=QcQrcodesEditLogOperateTypeChoices.OFFLINE,
)
WarningLog.objects.create(
text=','.join(warning_text_list),
uid=qrcode.uid,
log_info={
'corpid': corpid,
'userid': userid,
},
warning_setting_id=warning_setting_id,
)
@shared_task(name='alarm_userid_by_qrcodeid', queue='qrcode')
def alarm_userid_by_qrcodeid(qrcodeid):
qrcode = QcQrcodes.objects.get(id=qrcodeid)
users = qrcode.userids
if not users:
return
# 活码成员报警检测
checkminutes = qrcode.checkminutes
now = datetime.now()
checkminutes_ago = now - timedelta(minutes=checkminutes)
online_userid_map = {user.get('userid'): user for user in users if
user.get('isonline') == QcQrcodesEditLogOperateTypeChoices.OFFLINE}
need_check_userid_map = {}
for userid, user in online_userid_map.items():
starttime = user.get('starttime')
if starttime is None:
continue
try:
starttime = datetime.fromisoformat(starttime)
if starttime < checkminutes_ago:
need_check_userid_map[userid] = user
except Exception as e:
logger.error(e)
continue
if len(need_check_userid_map.keys()) == 0:
return
corpid = qrcode.corpid
# 当前活码在线接粉号的接粉情况
user_count_info = QcWechatbizeventAddcontact.objects.filter(corpid=corpid,
userid__in=list(need_check_userid_map.keys()),
qrcodeid=qrcodeid,
createtime__gt=checkminutes_ago.timestamp()).values(
'corpid',
'userid').annotate(
qrcode_count=Count('*'),
).values('qrcode_count', 'userid')
need_check_user_count = len(need_check_userid_map.keys())
total_qrcode_count = sum([user_count.get('qrcode_count') for user_count in user_count_info])
average_qrcode_count = total_qrcode_count / need_check_user_count
offlinechecknum = qrcode.offlinechecknum
warning_logs = []
for user_count in user_count_info:
qrcode_count = user_count.get('qrcode_count', 0)
if (qrcode_count - average_qrcode_count) < offlinechecknum:
user = need_check_userid_map.get(user_count.get('userid'))
text = f"账号 {user.get('userid')} {user.get('username')} {user.get('alias')} 在渠道活码 {qrcode.name} 接粉异常"
log_info = {
'corpid': corpid,
'userid': user_count.get('userid'),
}
warning_log = WarningLog(
uid=qrcode.uid,
warning_setting_id=qrcode.warning_setting_id,
log_info=log_info,
text=text,
)
warning_logs.append(warning_log)
WarningLog.objects.bulk_create(warning_logs)
qrcode.lastchecktime = now
qrcode.save()
@shared_task(name='tag_remark_contact', queue='tag_remark_contact')
def tag_remark_contact(qrcodeid, userid, externaluserid, corpinfo):
corpid = corpinfo.get('corpid')
appsecret = corpinfo.get('appsecret')
qrcode = QcQrcodes.objects.get(id=qrcodeid)
remark = qrcode.remark
wechat_worker = WechatWorkerUtil(corpid, appsecret)
if remark:
# '{添加日期} -{昵称}'
if '{添加日期}' in remark:
now = datetime.now().strftime('%Y%m%d')
remark = remark.replace('{添加日期}', now)
if '{昵称}' in remark or '{性别}' in remark:
success, data = wechat_worker.get_external_contact(externaluserid)
if success:
(external_contact, _, _) = data
name = external_contact.get('name')
remark = remark.replace('{昵称}', name)
gender = external_contact.get('gender')
# gender 外部联系人性别 0-未知 1-男性 2-女性
if '{性别}' in remark:
if gender == 1:
remark = remark.replace('{性别}', '')
elif gender == 2:
remark = remark.replace('{性别}', '')
else:
remark = remark.replace('{性别}', '未知')
wechat_worker.externalcontact_remark(userid, externaluserid, remark)
tags = qrcode.tags
if tags:
tag_ids = [tag.get('id') for tag in tags]
wechat_worker.externalcontact_mark_tag(userid, externaluserid, tag_ids)
# 接粉号报警检测任务
@shared_task(name='check_follow_user', queue='check_follow_user')
def check_follow_user():
batch_size = 1000
warning_rules = QcWarningRule.objects.filter(is_on=True, is_delete=False)
count = warning_rules.count()
now = datetime.now()
for i in range(0, count, batch_size):
for warning_rule in warning_rules[i:i + batch_size]:
warning_setting_id = warning_rule.warning_setting_id
users = warning_rule.userids
if users and len(users) > 0:
corpid = users[0]["corpid"]
userid_map = {user.get('userid'): user for user in users}
warning_interval = warning_rule.warning_interval
now = datetime.now()
warning_interval_ago = now - timedelta(minutes=warning_interval)
user_count_info = QcWechatbizeventAddcontact.objects.filter(corpid=corpid,
userid__in=list(userid_map.keys())).values(
'corpid',
'userid').annotate(
warning_interval_count=Sum(
Case(
When(ctime__gte=warning_interval_ago, then=Value(1)),
default=Value(0),
)
),
).values('userid', 'warning_interval_count')
warning_logs = []
warning_setting = WarningSetting.objects.get(id=warning_setting_id)
for user_count in user_count_info:
if user_count.get('warning_interval_count') < 1:
user = userid_map.get(user_count.get('userid'))
text = f"""
报警名称:{warning_setting.name}
用户id: {user.get('userid')}
用户昵称:{user.get('username')}
用户别名:{user.get('alias')}
持续未进粉时间:{warning_interval}分钟
检测间隔5分钟
本次检测时间:{now.strftime("%Y-%m-%d %H:%M:%S")}
"""
log_info = {
'corpid': corpid,
'userid': user_count.get('userid'),
}
warning_log = WarningLog(
uid=warning_rule.uid,
warning_setting_id=warning_setting_id,
log_info=log_info,
text=text,
)
warning_logs.append(warning_log)
WarningLog.objects.bulk_create(warning_logs)
# 活码上线
@shared_task(name='online_qrcode', queue='qrcode')
def online_qrcode():
logger.info('开始活码上线---')
batch_size = 1000
qrcodes = QcQrcodes.objects.all()
for i in range(0, qrcodes.count(), batch_size):
for qrcode in qrcodes[i:i + batch_size]:
users = qrcode.userids
need_generate_code = False
# 获取下线的用户
userids = []
for user in users:
if user.get('isonline') == QcQrcodesEditLogOperateTypeChoices.OFFLINE:
userids.append(user.get('userid'))
need_generate_code = True
user['isonline'] = QcQrcodesEditLogOperateTypeChoices.ONLINE
user['endtime'] = None
# 上线时间
user['starttime'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
if need_generate_code:
qrcode.save()
corp = QcCorpinfo.objects.get(corpid=qrcode.corpid, agentid=qrcode.agentid)
# 重新生成活码, 请求企微api更新活码
generate_qrcode_by_qrcode(qrcode, corp, is_update=True)
# 生成活码修改记录
for userid in userids:
QcQrcodesEditLog.objects.create(
userid=userid,
uid=qrcode.uid,
corpid=qrcode.corpid,
agentid=qrcode.agentid,
qrcodeid=qrcode.pk,
detail={
'totallimit': 0,
'daliylimit': 0,
'qrcodelimit': 0,
'addcount': 0,
'qrcodeaddcount': 0,
'qrcode_today_count': 0,
},
type=QcQrcodesEditLogTypeChoices.AUTO,
operatetype=QcQrcodesEditLogOperateTypeChoices.ONLINE,
)
def qrcode_channel_handler(qrcodeid, userid, externaluserid, corpinfo):
check_qrcode(qrcodeid, userid, corpinfo)
alarm_userid_by_qrcodeid(qrcodeid)
# 打标签备注
tag_remark_contact(qrcodeid, userid, externaluserid, corpinfo)
def add_qun_by_channel(data, corpinfo, *args, **kwargs):
chat_id = data.get('chatid')
corpid = corpinfo.get('corpid')
appsecret = corpinfo.get('appsecret')
wechat_worker = WechatWorkerUtil(corpid, appsecret)
success, data = wechat_worker.get_external_groupchat(chat_id)
if not success:
return
chat_data_dict = {
'corpid': corpid,
'addtype': JqrAddTypeChoices.EVENT_CALLBACK,
}
for key in data.keys():
try:
JqrExternalQun._meta.get_field(key)
chat_data_dict[key] = data.get(key)
except Exception:
pass
JqrExternalQun.objects.update_or_create(
corpid=corpid,
chat_id=chat_id,
defaults=chat_data_dict # 如果创建新记录,使用这些默认值
)
def update_qun_by_channel(data, corpinfo, *args, **kwargs):
update_detail = data.get('updatedetail')
chat_id = data.get('chatid')
corpid = corpinfo.get('corpid')
qun = JqrExternalQun.objects.filter(chat_id=chat_id, corpid=corpid).first()
print(f'新增群成员,{chat_id}')
print(f'qun ---> {qun}')
if qun is None:
return
userid = qun.owner
if update_detail == 'add_member':
uid = None
hook_user = JqrHookUser.objects.filter(corpid=corpid, userid=userid).first()
if hook_user is not None:
uid = hook_user.uid
send_new_user_msg(corpid, userid, chat_id=chat_id, uid=uid)
def del_qun_by_channel(data, corpinfo, *args, **kwargs):
chat_id = data.get('chatid')
corpid = corpinfo.get('corpid')
logger.info(f'删除群聊,{chat_id}')
JqrExternalQun.objects.filter(chat_id=chat_id, corpid=corpid).delete()