yzk_wechat_event/apps/qc/utils.py

233 lines
8.1 KiB
Python

import concurrent.futures
from datetime import datetime, timedelta
from django.db import transaction
from apps.jqr.models import JqrWechatbizuserinfo, JqrExternalQun, JqrExternalUser, JqrExternalFollowUser
from apps.qc.choices import QcCorpInfoBusinessTypeChoices
from apps.qc.models import QcWechatbizuserinfo, QcQrcodes, QcCorpinfo
from libs.wechat import WechatWorkerUtil
def get_department_map(department_list):
department_map = {}
for department in department_list:
current_id = department.get('id')
path = [current_id]
while department.get('parentid') != 0:
parent_id = department.get('parentid')
path.insert(0, parent_id)
try:
department = next(
dep for dep in department_list if dep.get('id') == parent_id)
except StopIteration:
department['parentid'] = 0
department_map[current_id] = path
return department_map
def sync_contact(department_id, department, uid, corpid, wechat_worker, business_type):
success, data = wechat_worker.get_department_user_detail(department_id)
if not success:
print(data)
return
model = None
if business_type == QcCorpInfoBusinessTypeChoices.QC:
model = QcWechatbizuserinfo
elif business_type == QcCorpInfoBusinessTypeChoices.JQR:
model = JqrWechatbizuserinfo
if model is None:
return
for user_data in data:
userid = user_data.get('userid')
user_data_dict = {
'corpid': corpid,
'maindepartment': user_data.get('main_department'),
'username': user_data.get('name'),
}
for key in user_data.keys():
try:
model._meta.get_field(key)
user_data_dict[key] = user_data.get(key)
except Exception:
pass
user_data_dict['department'] = department
# 使用 update_or_create 方法,根据指定的条件更新或创建记录
model.objects.update_or_create(
corpid=corpid,
userid=userid,
defaults=user_data_dict # 如果创建新记录,使用这些默认值
)
def sync_external_contact(external_userid, wechat_worker):
print(external_userid)
success, data = wechat_worker.get_external_contact(external_userid)
if success:
return data
def sync_contact_qun_by_corp(corp):
wechat_worker = WechatWorkerUtil(corp.corpid, corp.appsecret)
cursor = ''
model = None
business_type = corp.business_type
if business_type == QcCorpInfoBusinessTypeChoices.QC:
model = None
elif business_type == QcCorpInfoBusinessTypeChoices.JQR:
model = JqrExternalQun
if model is None:
return
batch_size = 100
while cursor is not None:
success, data = wechat_worker.get_external_groupchat_list(cursor=cursor)
if not success:
return False, data
chats, cursor = data
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
total_tasks = len(chats)
for i in range(0, total_tasks, batch_size):
batch_tasks = [executor.submit(sync_contact_qun, chats[j], wechat_worker, corp, model) for j in
range(i, min(i + batch_size, total_tasks))]
concurrent.futures.wait(batch_tasks)
return True, True
def sync_contact_qun(chat, wechat_worker, corp, model):
corpid = corp.corpid
chat_id = chat.get('chat_id')
status = chat.get('status')
success, data = wechat_worker.get_external_groupchat(chat_id)
if not success:
return False, data
chat_data_dict = {
'corpid': corpid
}
for key in data.keys():
try:
model._meta.get_field(key)
chat_data_dict[key] = data.get(key)
except Exception:
pass
model.objects.update_or_create(
corpid=corpid,
chat_id=chat_id,
defaults=chat_data_dict # 如果创建新记录,使用这些默认值
)
def sync_external_user_by_corp(corp):
wechat_worker = WechatWorkerUtil(corp.corpid, corp.appsecret)
success, data = wechat_worker.get_externalcontact_follow_user_list()
if not success:
return
executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)
for i in range(0, len(data), 100):
cursor = ''
while cursor is not None:
try:
success, external_contact_list, cursor = wechat_worker.get_external_contact_batch_by_user(
data[i:i + 100], cursor=cursor)
if success:
executor.submit(sync_external_user, external_contact_list, corp)
except Exception as e:
print(e)
break
executor.shutdown()
def sync_external_user(external_contact_list, corp):
corpid = corp.corpid
for e in external_contact_list:
external_contact = e.get('external_contact')
external_contact['corpid'] = corpid
external_userid = external_contact.get('external_userid')
follow_info = e.get('follow_info')
follow_info['tags'] = follow_info.pop('tag_id', None)
follow_info['corpid'] = corpid
userid = follow_info.get('userid')
with transaction.atomic():
JqrExternalUser.objects.update_or_create(
corpid=corpid,
external_userid=external_userid,
defaults=external_contact
)
JqrExternalFollowUser.objects.update_or_create(
userid=userid,
corpid=corpid,
external_userid=external_userid,
defaults=follow_info
)
def generate_qrcode_by_qrcode(instance: QcQrcodes, corp, is_update=False):
instance.state = f'mg_{instance.uid}_{instance.pk}'
userids = instance.userids
if not userids:
return
user = [userid.get('userid')
for userid in userids if userid.get('isonline')]
corpid = None
appsecret = None
if isinstance(corp, QcCorpinfo):
corpid = corp.corpid
appsecret = corp.appsecret
if isinstance(corp, dict):
corpid = corp.get('corpid')
appsecret = corp.get('appsecret')
wechat_worker = WechatWorkerUtil(corpid, appsecret)
if not is_update:
success, data = wechat_worker.add_contact_way(
user=user, skip_verify=instance.skipverify, state=instance.state)
print(success, data)
if success:
instance.configid, instance.qrcodeurl = data
return success, data
else:
success, data = wechat_worker.update_contact_way(
config_id=instance.configid,
skip_verify=instance.skipverify,
user=user,
state=instance.state,
)
return success, data
def sync_group_msg_by_corp(corp: QcCorpinfo):
wechat_worker = WechatWorkerUtil(corp.corpid, corp.appsecret)
end_time = datetime.now()
start_time = datetime.now() - timedelta(days=1)
cursor = ''
while cursor is not None:
success, (data, cursor) = wechat_worker.get_groupmsg_list_v2(start_time.timestamp().__str__().split('.')[0],
end_time.timestamp().__str__().split('.')[0],
'single',
creator=None,
filter_type=1, limit=100, cursor=cursor)
if success:
sync_groupmsg_task_by_corp(corp, data)
def sync_groupmsg_task_by_corp(corp: QcCorpinfo, group_msg_list):
wechat_worker = WechatWorkerUtil(corp.corpid, corp.appsecret)
corpid = corp.corpid
for group_msg in group_msg_list:
print(group_msg)
# cursor = ''
# while cursor is not None:
# success, (data, cursor) = wechat_worker.get_groupmsg_task(cursor=cursor)
def get_query_by_corpkey(corpkey):
try:
corpid, agentid = corpkey.split('-')
return {
'corpid': corpid,
'agentid': agentid,
}
except Exception as e:
return {}