diff --git a/apps/jqr/serializers.py b/apps/jqr/serializers.py index bf4c594..e2457dd 100644 --- a/apps/jqr/serializers.py +++ b/apps/jqr/serializers.py @@ -5,6 +5,7 @@ from rest_framework import serializers from apps.jqr.models import JqrHookUser from apps.jqr.tasks import save_add_contact, delete_add_contact, edit_add_contact from apps.msg.models import TbMessage +from apps.msg.utils import JQRMSGPubSubUtils from libs.weworkapi.callback.WXBizMsgCrypt3 import WXBizMsgCrypt, Prpcrypt from utils.tools import sha1_encoder, get_attribute, camel_to_snake from utils.base_serializer import BaseSerializer, CurrentIpDefault @@ -91,10 +92,12 @@ class WechatEncryptSerializer(serializers.Serializer): print('添加客户事件') corp = self.context.get('corp') data.pop('tousername') - data['corpid'] = corp.corpid - data['agentid'] = corp.agentid - data['uid'] = corp.uid print(data) + state = data.get('state') + if state and state.startswith('mg') and '_' in state: + data['corpid'] = corp.corpid + data['agentid'] = corp.agentid + data['uid'] = corp.uid save_add_contact.delay(data, corp.to_dict(['corpid', 'appsecret'])) def handle_change_external_contact_edit_external_contact(self, data): @@ -104,10 +107,12 @@ class WechatEncryptSerializer(serializers.Serializer): print('编辑客户事件') corp = self.context.get('corp') data.pop('tousername') - data['corpid'] = corp.corpid - data['agentid'] = corp.agentid - data['uid'] = corp.uid print(data) + state = data.get('state') + if state and state.startswith('mg') and '_' in state: + data['corpid'] = corp.corpid + data['agentid'] = corp.agentid + data['uid'] = corp.uid edit_add_contact.delay(data, corp.to_dict(['corpid', 'appsecret'])) def handle_change_external_contact_add_half_external_contact(self, data): @@ -117,10 +122,12 @@ class WechatEncryptSerializer(serializers.Serializer): print('外部联系人免验证添加成员事件') corp = self.context.get('corp') data.pop('tousername') - data['corpid'] = corp.corpid - data['agentid'] = corp.agentid - data['uid'] = corp.uid print(data) + state = data.get('state') + if state and state.startswith('mg') and '_' in state: + data['corpid'] = corp.corpid + data['agentid'] = corp.agentid + data['uid'] = corp.uid save_add_contact.delay(data, corp.to_dict(['corpid', 'appsecret'])) def handle_change_external_contact_del_follow_user(self, data): @@ -130,10 +137,12 @@ class WechatEncryptSerializer(serializers.Serializer): print('删除跟进成员事件') corp = self.context.get('corp') data.pop('tousername') - data['corpid'] = corp.corpid - data['agentid'] = corp.agentid - data['uid'] = corp.uid print(data) + state = data.get('state') + if state and state.startswith('mg') and '_' in state: + data['corpid'] = corp.corpid + data['agentid'] = corp.agentid + data['uid'] = corp.uid delete_add_contact.delay(data, corp.to_dict(['corpid', 'appsecret'])) def handle_change_external_chat_create(self, data): @@ -182,6 +191,7 @@ class TbMessageModelSerializer(BaseSerializer): model = TbMessage fields = '__all__' - # def create(self, validated_data): - # return TbMessage(**validated_data) + def create(self, validated_data): + JQRMSGPubSubUtils.publish(self.context.get('request').data) + return TbMessage(**validated_data) diff --git a/apps/msg/pub.py b/apps/msg/pub.py index 36d8354..d7c0d70 100644 --- a/apps/msg/pub.py +++ b/apps/msg/pub.py @@ -23,4 +23,4 @@ data = { for i in range(10): value_new = json.dumps(data) - rc.publish("jqr_msg", value_new) + rc.publish("jqr:msg", value_new) diff --git a/apps/msg/sub.py b/apps/msg/sub.py index 072a557..a197a92 100644 --- a/apps/msg/sub.py +++ b/apps/msg/sub.py @@ -5,20 +5,21 @@ import json rc = StrictRedis(host='localhost', port=6379, db=0) ps = rc.pubsub() -ps.subscribe('jqr_msg') +ps.subscribe('jqr:msg') msg_list = [] +batch_size = 2 while True: for item in ps.listen(): if item['type'] == 'message': data = item.get('data') print(data) msg_list.append(data) - if len(msg_list) >= 10: + if len(msg_list) >= batch_size: try: time.sleep(1) - msg_list.clear() + msg_list = msg_list[-batch_size:] except Exception as e: print(e) print(len(msg_list)) diff --git a/apps/msg/utils.py b/apps/msg/utils.py new file mode 100644 index 0000000..1c58ca0 --- /dev/null +++ b/apps/msg/utils.py @@ -0,0 +1,36 @@ +import json + +from redis import StrictRedis +from django.conf import settings +from threading import Thread + +from apps.msg.models import TbMessage +from yzk_wechat_event.settings.constant import Constant + + +class JQRMSGPubSubUtils: + rc = StrictRedis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=0) + msg_list = [] + BATCH_SIZE = 10 + + @classmethod + def publish(cls, data): + cls.rc.publish(Constant.JQR_MSG_PUBSUB_CHANNEL, json.dumps(data)) + + +def listen(): + pubsub = JQRMSGPubSubUtils.rc.pubsub() + pubsub.subscribe(Constant.JQR_MSG_PUBSUB_CHANNEL) + for msg in pubsub.listen(): + print(msg) + if msg['type'] == 'message': + data = msg.get('data') + data = json.dumps(str(data)) + JQRMSGPubSubUtils.msg_list.append(data) + if len(JQRMSGPubSubUtils.msg_list) >= JQRMSGPubSubUtils.BATCH_SIZE: + TbMessage.objects.bulk_create(JQRMSGPubSubUtils.msg_list[JQRMSGPubSubUtils.BATCH_SIZE:]) + JQRMSGPubSubUtils.msg_list = JQRMSGPubSubUtils.msg_list[JQRMSGPubSubUtils.BATCH_SIZE:] + + +# t = Thread(target=listen) +# t.start() diff --git a/yzk_wechat_event/settings/constant.py b/yzk_wechat_event/settings/constant.py index 687b283..960b5f2 100644 --- a/yzk_wechat_event/settings/constant.py +++ b/yzk_wechat_event/settings/constant.py @@ -3,6 +3,7 @@ class Constant: LOGIN_SMS = 'login:sms:' WECHAT_WORKER_TOKEN = 'wechat:worker:token:' WECHAT_WORKER_TOKEN_EXPIRES = 6000 + JQR_MSG_PUBSUB_CHANNEL = 'jqr:msg' ONE = 1 TWO = 2 THREE = 3