"""Synchronisation helpers built on top of ``WechatWorkerUtil``.

The functions here pull departments' users, external group chats, external
contacts and group messages through a ``WechatWorkerUtil`` client and store
them in the ``Qc*`` / ``Jqr*`` Django models.
"""
import concurrent.futures
from datetime import datetime, timedelta

from django.core.exceptions import FieldDoesNotExist
from django.db import transaction

from apps.jqr.models import JqrWechatbizuserinfo, JqrExternalQun, JqrExternalUser, JqrExternalFollowUser
from apps.qc.choices import QcCorpInfoBusinessTypeChoices
from apps.qc.models import QcWechatbizuserinfo, QcQrcodes, QcCorpinfo
from libs.wechat import WechatWorkerUtil


def _pick_model_fields(model, payload):
    """Return the items of *payload* whose key names a field on *model*.

    Keys that ``model._meta.get_field`` does not know are dropped, so the
    result can be passed as ``defaults`` to ``update_or_create``.
    """
    picked = {}
    for key, value in payload.items():
        try:
            model._meta.get_field(key)
        except FieldDoesNotExist:
            continue
        picked[key] = value
    return picked


def get_department_map(department_list):
    """Map every department id to its ancestor path, outermost ancestor first.

    Args:
        department_list: iterable of dicts carrying ``id`` and ``parentid``.
            A ``parentid`` of ``0`` (or a missing/``None`` one) marks a root.

    Returns:
        dict: ``{department_id: [ancestor_id, ..., department_id]}``. When a
        parent id is referenced but not present in *department_list*, that id
        is still included as the first element of the path and the walk stops
        there.

    The input dicts are never modified, and a parent cycle ends the walk
    instead of looping forever.
    """
    # Index once; setdefault keeps the first department for a duplicated id,
    # matching a first-match linear search.
    by_id = {}
    for dep in department_list:
        by_id.setdefault(dep.get('id'), dep)

    department_map = {}
    for department in department_list:
        current_id = department.get('id')
        chain = [current_id]  # built leaf -> root, reversed at the end
        visited = {current_id}
        node = department
        while node is not None:
            parent_id = node.get('parentid')
            if parent_id is None or parent_id == 0:
                break  # reached a root
            if parent_id in visited:
                break  # cyclic parent reference
            chain.append(parent_id)
            visited.add(parent_id)
            # None when the parent is not in the list: the walk ends here.
            node = by_id.get(parent_id)
        chain.reverse()
        department_map[current_id] = chain
    return department_map


def sync_contact(department_id, department, uid, corpid, wechat_worker, business_type):
    """Fetch the users of one department and upsert them into the user model.

    Args:
        department_id: id passed to ``get_department_user_detail``.
        department: value stored in the ``department`` column of every user.
        uid: unused; kept so existing callers keep working.
        corpid: corp id stored on, and used to look up, each record.
        wechat_worker: client exposing ``get_department_user_detail``.
        business_type: selects the target model (QC or JQR); any other value
            makes the function return without writing anything.

    Returns:
        None. On a failed fetch the error payload is printed.
    """
    success, data = wechat_worker.get_department_user_detail(department_id)
    if not success:
        print(data)
        return

    if business_type == QcCorpInfoBusinessTypeChoices.QC:
        model = QcWechatbizuserinfo
    elif business_type == QcCorpInfoBusinessTypeChoices.JQR:
        model = JqrWechatbizuserinfo
    else:
        return

    for user_data in data:
        defaults = {
            'corpid': corpid,
            'maindepartment': user_data.get('main_department'),
            'username': user_data.get('name'),
        }
        # Payload keys that match a model field are copied over as-is.
        defaults.update(_pick_model_fields(model, user_data))
        # Set last so the caller-supplied value wins over the payload's own.
        defaults['department'] = department
        # Update the record matching (corpid, userid), or create it.
        model.objects.update_or_create(
            corpid=corpid,
            userid=user_data.get('userid'),
            defaults=defaults,
        )


def sync_external_contact(external_userid, wechat_worker):
    """Return the external-contact payload for *external_userid*.

    Prints the id, calls ``wechat_worker.get_external_contact`` and returns
    its data on success; returns ``None`` on failure.
    """
    print(external_userid)
    success, data = wechat_worker.get_external_contact(external_userid)
    if success:
        return data
    return None


def sync_contact_qun_by_corp(corp):
    """Sync every external group chat of *corp* into its group model.

    Pages through ``get_external_groupchat_list`` and fans the chats of each
    page out to a thread pool running ``sync_contact_qun``.

    Returns:
        ``None`` when the corp's business type has no group model (only JQR
        has one), ``(False, data)`` as soon as a page request fails, and
        ``(True, True)`` after the last page.
    """
    if corp.business_type == QcCorpInfoBusinessTypeChoices.JQR:
        model = JqrExternalQun
    else:
        return None

    wechat_worker = WechatWorkerUtil(corp.corpid, corp.appsecret)
    batch_size = 100
    cursor = ''
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        while cursor is not None:
            success, data = wechat_worker.get_external_groupchat_list(cursor=cursor)
            if not success:
                return False, data
            chats, cursor = data
            # Submit in bounded batches and wait for each before continuing.
            for start in range(0, len(chats), batch_size):
                batch_tasks = [
                    executor.submit(sync_contact_qun, chat, wechat_worker, corp, model)
                    for chat in chats[start:start + batch_size]
                ]
                concurrent.futures.wait(batch_tasks)
                # Surface worker failures instead of losing them in the future.
                for task in batch_tasks:
                    error = task.exception()
                    if error is not None:
                        print(error)
    return True, True


def sync_contact_qun(chat, wechat_worker, corp, model):
    """Fetch one group chat's detail and upsert it into *model*.

    Args:
        chat: dict carrying ``chat_id``.
        wechat_worker: client exposing ``get_external_groupchat``.
        corp: object whose ``corpid`` is stored on the record.
        model: Django model receiving the record.

    Returns:
        ``(False, data)`` when the detail request fails, otherwise ``None``.
    """
    corpid = corp.corpid
    chat_id = chat.get('chat_id')
    success, data = wechat_worker.get_external_groupchat(chat_id)
    if not success:
        return False, data

    defaults = {'corpid': corpid}
    defaults.update(_pick_model_fields(model, data))
    # Update the record matching (corpid, chat_id), or create it.
    model.objects.update_or_create(
        corpid=corpid,
        chat_id=chat_id,
        defaults=defaults,
    )


def sync_external_user_by_corp(corp):
    """Sync the external contacts of every follow user of *corp*.

    Follow users are queried 100 at a time; each page of results is handed to
    a thread pool running ``sync_external_user``. Returns ``None``.
    """
    wechat_worker = WechatWorkerUtil(corp.corpid, corp.appsecret)
    success, data = wechat_worker.get_externalcontact_follow_user_list()
    if not success:
        return

    # The context manager waits for all submitted work before returning.
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        for start in range(0, len(data), 100):
            user_batch = data[start:start + 100]
            cursor = ''
            while cursor is not None:
                try:
                    success, external_contact_list, cursor = (
                        wechat_worker.get_external_contact_batch_by_user(
                            user_batch, cursor=cursor))
                except Exception as e:
                    # Best effort: give up on this batch, continue with the next.
                    print(e)
                    break
                if not success:
                    # NOTE(review): assumes a failed page should end this batch;
                    # retrying with an unchanged cursor would never terminate.
                    print(external_contact_list)
                    break
                executor.submit(sync_external_user, external_contact_list, corp)


def sync_external_user(external_contact_list, corp):
    """Upsert external contacts and their follow relations.

    Args:
        external_contact_list: iterable of dicts with ``external_contact`` and
            ``follow_info`` sub-dicts. Entries missing either one are printed
            and skipped. The input dicts are not modified.
        corp: object whose ``corpid`` is stored on both records.
    """
    corpid = corp.corpid
    for entry in external_contact_list:
        external_contact = entry.get('external_contact')
        follow_info = entry.get('follow_info')
        if not external_contact or not follow_info:
            print(entry)
            continue

        external_userid = external_contact.get('external_userid')
        userid = follow_info.get('userid')

        # Work on copies so the caller's payload stays untouched.
        contact_defaults = dict(external_contact, corpid=corpid)
        follow_defaults = dict(follow_info, corpid=corpid)
        # The payload's ``tag_id`` is stored under ``tags``.
        follow_defaults['tags'] = follow_defaults.pop('tag_id', None)

        # Both rows are written together or not at all.
        with transaction.atomic():
            JqrExternalUser.objects.update_or_create(
                corpid=corpid,
                external_userid=external_userid,
                defaults=contact_defaults,
            )
            JqrExternalFollowUser.objects.update_or_create(
                userid=userid,
                corpid=corpid,
                external_userid=external_userid,
                defaults=follow_defaults,
            )


def generate_qrcode_by_qrcode(instance: QcQrcodes, corp, is_update=False):
    """Create or update the contact-way behind a QR-code record.

    Sets ``instance.state`` to ``mg_<uid>_<pk>``, then calls
    ``add_contact_way`` (or ``update_contact_way`` when *is_update* is true)
    with the ``userid`` of every entry in ``instance.userids`` flagged
    ``isonline``. After a successful add, ``instance.configid`` and
    ``instance.qrcodeurl`` are set from the response. The instance is not
    saved here.

    Args:
        instance: the QR-code record.
        corp: a ``QcCorpinfo`` or a dict with ``corpid`` / ``appsecret``.
        is_update: update the existing contact-way instead of adding one.

    Returns:
        ``None`` when ``instance.userids`` is empty, otherwise the
        ``(success, data)`` pair returned by the client.
    """
    instance.state = f'mg_{instance.uid}_{instance.pk}'
    userids = instance.userids
    if not userids:
        return None
    user = [entry.get('userid') for entry in userids if entry.get('isonline')]

    corpid = None
    appsecret = None
    if isinstance(corp, QcCorpinfo):
        corpid = corp.corpid
        appsecret = corp.appsecret
    elif isinstance(corp, dict):
        corpid = corp.get('corpid')
        appsecret = corp.get('appsecret')

    wechat_worker = WechatWorkerUtil(corpid, appsecret)
    if is_update:
        return wechat_worker.update_contact_way(
            config_id=instance.configid,
            skip_verify=instance.skipverify,
            user=user,
            state=instance.state,
        )

    success, data = wechat_worker.add_contact_way(
        user=user, skip_verify=instance.skipverify, state=instance.state)
    print(success, data)
    if success:
        instance.configid, instance.qrcodeurl = data
    return success, data


def sync_group_msg_by_corp(corp: QcCorpinfo):
    """Page through the last 24 hours of group messages of *corp*.

    Each successfully fetched page is passed to
    ``sync_groupmsg_task_by_corp``. A failed page request is printed and ends
    the sync. Returns ``None``.
    """
    wechat_worker = WechatWorkerUtil(corp.corpid, corp.appsecret)
    end_time = datetime.now()
    start_time = end_time - timedelta(days=1)
    # The client is given whole-second timestamps as strings.
    start_ts = str(int(start_time.timestamp()))
    end_ts = str(int(end_time.timestamp()))

    cursor = ''
    while cursor is not None:
        success, data = wechat_worker.get_groupmsg_list_v2(
            start_ts, end_ts, 'single',
            creator=None, filter_type=1, limit=100, cursor=cursor)
        if not success:
            # Check before unpacking: a failure payload need not be a
            # (list, cursor) pair, and the cursor would never advance.
            print(data)
            return
        group_msg_list, cursor = data
        sync_groupmsg_task_by_corp(corp, group_msg_list)


def sync_groupmsg_task_by_corp(corp: QcCorpinfo, group_msg_list):
    """Print every group message in *group_msg_list* (placeholder).

    TODO: page through ``wechat_worker.get_groupmsg_task`` for each message.
    """
    # Kept for the TODO above; not used yet.
    wechat_worker = WechatWorkerUtil(corp.corpid, corp.appsecret)
    for group_msg in group_msg_list:
        print(group_msg)


def get_query_by_corpkey(corpkey):
    """Split a ``"<corpid>-<agentid>"`` key into a filter dict.

    Returns:
        ``{'corpid': ..., 'agentid': ...}``, or ``{}`` when *corpkey* is not a
        string made of exactly two ``-``-separated parts.
    """
    try:
        corpid, agentid = corpkey.split('-')
    except (AttributeError, TypeError, ValueError):
        return {}
    return {
        'corpid': corpid,
        'agentid': agentid,
    }