233 lines
8.1 KiB
Python
233 lines
8.1 KiB
Python
import concurrent.futures
|
|
from datetime import datetime, timedelta
|
|
|
|
from django.db import transaction
|
|
|
|
from apps.jqr.models import JqrWechatbizuserinfo, JqrExternalQun, JqrExternalUser, JqrExternalFollowUser
|
|
from apps.qc.choices import QcCorpInfoBusinessTypeChoices
|
|
from apps.qc.models import QcWechatbizuserinfo, QcQrcodes, QcCorpinfo
|
|
from libs.wechat import WechatWorkerUtil
|
|
|
|
|
|
def get_department_map(department_list):
    """Map every department id to its chain of ids from the top down to itself.

    Each entry of ``department_list`` is a dict read through its ``'id'`` and
    ``'parentid'`` keys; a ``'parentid'`` of ``0`` marks a top-level
    department.

    The walk up the tree also stops when:

    * the parent id is not present in ``department_list`` -- the dangling
      parent id is still kept as the first element of the path;
    * a parent id was already visited for this department (a cycle in the
      input), so malformed data cannot cause an endless loop.

    The input dicts are never modified, so the result does not depend on the
    order of ``department_list``.

    :param department_list: iterable of department dicts.
    :return: ``{department_id: [top_id, ..., department_id]}``.
    """
    # Index the departments once instead of rescanning the list for every
    # ancestor. ``setdefault`` keeps the first entry for a duplicated id,
    # which is what a linear first-match search would return.
    by_id = {}
    for department in department_list:
        by_id.setdefault(department.get('id'), department)

    department_map = {}
    for department in department_list:
        current_id = department.get('id')
        path = [current_id]
        seen = {current_id}
        node = department
        while node is not None:
            parent_id = node.get('parentid')
            if parent_id == 0 or parent_id in seen:
                break
            path.insert(0, parent_id)
            seen.add(parent_id)
            # ``None`` here means the parent is unknown, which ends the walk.
            node = by_id.get(parent_id)
        department_map[current_id] = path
    return department_map
|
|
|
|
|
|
def sync_contact(department_id, department, uid, corpid, wechat_worker, business_type):
    """Fetch the members of one department and upsert them into the member table.

    :param department_id: id passed to ``wechat_worker.get_department_user_detail``.
    :param department: value stored in the ``department`` column of every
        member written by this call.
    :param uid: not used by this function.
    :param corpid: corp id stored on, and used to look up, every row.
    :param wechat_worker: client exposing ``get_department_user_detail``,
        which returns a ``(success, data)`` pair.
    :param business_type: selects the target model; anything other than the
        QC or JQR choice makes the call a no-op.
    :return: always ``None``.
    """
    success, data = wechat_worker.get_department_user_detail(department_id)
    if not success:
        # On failure ``data`` is whatever the client handed back; it is only
        # echoed to stdout.
        print(data)
        return

    if business_type == QcCorpInfoBusinessTypeChoices.QC:
        model = QcWechatbizuserinfo
    elif business_type == QcCorpInfoBusinessTypeChoices.JQR:
        model = JqrWechatbizuserinfo
    else:
        return

    for member in data:
        defaults = {
            'corpid': corpid,
            'maindepartment': member.get('main_department'),
            'username': member.get('name'),
        }
        # Copy over every payload key that is also a field of the model;
        # keys the model does not know are skipped.
        for key, value in member.items():
            try:
                model._meta.get_field(key)
            except Exception:
                continue
            defaults[key] = value
        # Set last so the caller-supplied value wins over the payload's own
        # ``department`` entry.
        defaults['department'] = department

        # Upsert keyed on (corpid, userid); ``defaults`` holds the values
        # written both when updating and when creating the row.
        model.objects.update_or_create(
            corpid=corpid,
            userid=member.get('userid'),
            defaults=defaults,
        )
|
|
|
|
|
|
def sync_external_contact(external_userid, wechat_worker):
    """Look up one external contact through ``wechat_worker``.

    The id is echoed to stdout before the lookup.

    :param external_userid: id passed to ``wechat_worker.get_external_contact``.
    :param wechat_worker: client whose ``get_external_contact`` returns a
        ``(success, data)`` pair.
    :return: ``data`` when the lookup succeeded, otherwise ``None``.
    """
    print(external_userid)
    ok, payload = wechat_worker.get_external_contact(external_userid)
    return payload if ok else None
|
|
|
|
|
|
def sync_contact_qun_by_corp(corp):
    """Sync every external group chat of ``corp`` into the group-chat table.

    The chat list is read page by page with
    ``get_external_groupchat_list``; every chat of a page is handed to
    ``sync_contact_qun`` on a thread pool, in batches of 100 that are each
    awaited before the next batch is submitted.

    :param corp: object exposing ``corpid``, ``appsecret`` and
        ``business_type``.
    :return: ``None`` when the business type has no group-chat model (only
        the JQR type has one), ``(False, data)`` when fetching a page
        failed, ``(True, True)`` once every page was processed.
    """
    wechat_worker = WechatWorkerUtil(corp.corpid, corp.appsecret)

    # Only the JQR business type is backed by a group-chat model; every
    # other type (QC included) is skipped.
    if corp.business_type == QcCorpInfoBusinessTypeChoices.JQR:
        model = JqrExternalQun
    else:
        return

    batch_size = 100
    cursor = ''
    # One pool for the whole run instead of a new pool per page.
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        # NOTE(review): pagination ends when the client returns ``None`` as
        # the next cursor -- confirm against WechatWorkerUtil.
        while cursor is not None:
            success, data = wechat_worker.get_external_groupchat_list(cursor=cursor)
            if not success:
                return False, data
            chats, cursor = data
            for start in range(0, len(chats), batch_size):
                batch = [
                    executor.submit(sync_contact_qun, chat, wechat_worker, corp, model)
                    for chat in chats[start:start + batch_size]
                ]
                concurrent.futures.wait(batch)
                # ``wait`` never re-raises; without this check an exception
                # raised inside a task would vanish without a trace.
                for future in batch:
                    error = future.exception()
                    if error is not None:
                        print(error)
    return True, True
|
|
|
|
|
|
def sync_contact_qun(chat, wechat_worker, corp, model):
    """Fetch one external group chat and upsert it through ``model``.

    :param chat: dict from the chat list; only its ``'chat_id'`` is read.
    :param wechat_worker: client whose ``get_external_groupchat`` returns a
        ``(success, data)`` pair.
    :param corp: object exposing ``corpid``.
    :param model: model class the chat is written to.
    :return: ``(False, data)`` when the fetch failed, otherwise ``None``.
    """
    corpid = corp.corpid
    chat_id = chat.get('chat_id')

    success, data = wechat_worker.get_external_groupchat(chat_id)
    if not success:
        return False, data

    chat_data_dict = {'corpid': corpid}
    # Copy over every payload key that is also a field of the model; keys
    # the model does not know are skipped.
    for key, value in data.items():
        try:
            model._meta.get_field(key)
        except Exception:
            continue
        chat_data_dict[key] = value

    # Upsert keyed on (corpid, chat_id); ``defaults`` holds the values
    # written both when updating and when creating the row.
    model.objects.update_or_create(
        corpid=corpid,
        chat_id=chat_id,
        defaults=chat_data_dict,
    )
|
|
|
|
|
|
def sync_external_user_by_corp(corp):
    """Sync the external contacts of every follow-user of ``corp``.

    The follow-user list is processed in slices of 100. Each slice is paged
    through ``get_external_contact_batch_by_user``, and every successfully
    fetched page is handed to ``sync_external_user`` on a thread pool.

    :param corp: object exposing ``corpid`` and ``appsecret``.
    :return: always ``None``; nothing is done when the follow-user list
        cannot be fetched.
    """
    wechat_worker = WechatWorkerUtil(corp.corpid, corp.appsecret)
    success, data = wechat_worker.get_externalcontact_follow_user_list()
    if not success:
        return

    futures = []
    # The ``with`` block shuts the pool down (waiting for queued tasks) even
    # if the loop below raises.
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        for start in range(0, len(data), 100):
            cursor = ''
            # NOTE(review): pagination ends when the client returns ``None``
            # as the next cursor, also after an unsuccessful page -- confirm
            # against WechatWorkerUtil.
            while cursor is not None:
                try:
                    success, external_contact_list, cursor = (
                        wechat_worker.get_external_contact_batch_by_user(
                            data[start:start + 100], cursor=cursor))
                    if success:
                        futures.append(
                            executor.submit(sync_external_user, external_contact_list, corp))
                except Exception as e:
                    # Best effort: report and move on to the next slice.
                    print(e)
                    break

    # All tasks are finished here. Report task failures, which would
    # otherwise stay hidden inside the never-inspected futures.
    for future in futures:
        error = future.exception()
        if error is not None:
            print(error)
|
|
|
|
|
|
def sync_external_user(external_contact_list, corp):
    """Upsert a batch of external contacts together with their follow records.

    Every entry is a dict with an ``'external_contact'`` dict and a
    ``'follow_info'`` dict. Both inner dicts are modified in place:
    ``'corpid'`` is added to each, and the follow info's ``'tag_id'`` entry
    is moved to ``'tags'`` (``None`` when absent). The resulting dicts are
    used as the ``defaults`` of the two upserts, which run in one
    transaction per entry.

    :param external_contact_list: iterable of entries as described above.
    :param corp: object exposing ``corpid``.
    :return: always ``None``.
    """
    corpid = corp.corpid
    for entry in external_contact_list:
        contact = entry.get('external_contact')
        contact['corpid'] = corpid
        external_userid = contact.get('external_userid')

        follow = entry.get('follow_info')
        follow['tags'] = follow.pop('tag_id', None)
        follow['corpid'] = corpid

        # Contact and follow relation are written atomically.
        with transaction.atomic():
            JqrExternalUser.objects.update_or_create(
                corpid=corpid,
                external_userid=external_userid,
                defaults=contact,
            )
            JqrExternalFollowUser.objects.update_or_create(
                userid=follow.get('userid'),
                corpid=corpid,
                external_userid=external_userid,
                defaults=follow,
            )
|
|
|
|
|
|
def generate_qrcode_by_qrcode(instance: QcQrcodes, corp, is_update=False):
    """Create or update the "contact way" behind a QR code record.

    ``instance.state`` is always set to ``mg_<uid>_<pk>``. On a successful
    create, ``instance.configid`` and ``instance.qrcodeurl`` are set from the
    response. The instance is not saved here.

    :param instance: QR code record; ``uid``, ``pk``, ``userids``,
        ``skipverify`` and (for updates) ``configid`` are read.
    :param corp: either a ``QcCorpinfo`` or a dict with ``'corpid'`` and
        ``'appsecret'``; for any other type both credentials stay ``None``.
    :param is_update: update the existing contact way instead of adding one.
    :return: ``None`` when ``instance.userids`` is empty, otherwise the
        ``(success, data)`` pair of the client call.
    """
    instance.state = f'mg_{instance.uid}_{instance.pk}'

    members = instance.userids
    if not members:
        return

    # Only members flagged as online take part in the contact way.
    user = [member.get('userid') for member in members if member.get('isonline')]

    corpid = appsecret = None
    if isinstance(corp, QcCorpinfo):
        corpid, appsecret = corp.corpid, corp.appsecret
    if isinstance(corp, dict):
        corpid, appsecret = corp.get('corpid'), corp.get('appsecret')

    wechat_worker = WechatWorkerUtil(corpid, appsecret)

    if is_update:
        success, data = wechat_worker.update_contact_way(
            config_id=instance.configid,
            skip_verify=instance.skipverify,
            user=user,
            state=instance.state,
        )
        return success, data

    success, data = wechat_worker.add_contact_way(
        user=user, skip_verify=instance.skipverify, state=instance.state)
    print(success, data)
    if success:
        instance.configid, instance.qrcodeurl = data
    return success, data
|
|
|
|
|
|
def sync_group_msg_by_corp(corp: QcCorpinfo):
    """Walk the group messages of the last 24 hours for ``corp``.

    Pages are read through ``get_groupmsg_list_v2`` and each page is handed
    to ``sync_groupmsg_task_by_corp``. When a page cannot be fetched, the
    payload is printed and the walk stops.

    :param corp: corp record exposing ``corpid`` and ``appsecret``.
    :return: always ``None``.
    """
    wechat_worker = WechatWorkerUtil(corp.corpid, corp.appsecret)

    # Sample the clock once so the window is exactly one day wide, and build
    # the whole-second timestamp strings once rather than on every page.
    end_time = datetime.now()
    start_time = end_time - timedelta(days=1)
    start_ts = str(int(start_time.timestamp()))
    end_ts = str(int(end_time.timestamp()))

    cursor = ''
    while cursor is not None:
        success, payload = wechat_worker.get_groupmsg_list_v2(
            start_ts,
            end_ts,
            'single',
            creator=None,
            filter_type=1, limit=100, cursor=cursor)
        if not success:
            # Check ``success`` before unpacking, as the group-chat sync in
            # this module does.
            # NOTE(review): assumes a failed call does not return a
            # (data, cursor) pair -- confirm against WechatWorkerUtil.
            print(payload)
            return
        data, cursor = payload
        sync_groupmsg_task_by_corp(corp, data)
|
|
|
|
|
|
def sync_groupmsg_task_by_corp(corp: QcCorpinfo, group_msg_list):
    """Placeholder for the per-message task sync: currently only prints each message.

    A client is constructed and the corp id is read, but neither is used yet.

    :param corp: corp record exposing ``corpid`` and ``appsecret``.
    :param group_msg_list: iterable of group messages.
    :return: always ``None``.
    """
    wechat_worker = WechatWorkerUtil(corp.corpid, corp.appsecret)
    corpid = corp.corpid
    for message in group_msg_list:
        print(message)
        # TODO: page through the tasks of each message, along the lines of:
        # cursor = ''
        # while cursor is not None:
        #     success, (data, cursor) = wechat_worker.get_groupmsg_task(cursor=cursor)
|
|
|
|
|
|
def get_query_by_corpkey(corpkey):
    """Split a ``"<corpid>-<agentid>"`` key into a query dict.

    :param corpkey: string holding exactly one ``'-'`` separator.
    :return: ``{'corpid': ..., 'agentid': ...}``, or ``{}`` when ``corpkey``
        is not a string or does not split into exactly two parts.
    """
    try:
        corpid, agentid = corpkey.split('-')
    except (AttributeError, TypeError, ValueError):
        # AttributeError: no ``split`` (e.g. ``None``); TypeError: ``split``
        # rejects a str separator (e.g. bytes); ValueError: not exactly two
        # parts.
        return {}
    return {
        'corpid': corpid,
        'agentid': agentid,
    }
|