diff --git a/apps/jqr/pubsub.py b/apps/jqr/pubsub.py index 749366f..767cc40 100644 --- a/apps/jqr/pubsub.py +++ b/apps/jqr/pubsub.py @@ -59,6 +59,7 @@ class JQRQrcodeCallbackPubSub: handler(qrcodeid, userid, externaluserid, corpinfo) +# if not settings.DEBUG: t = Thread(target=JQREventCallbackPubSub.event_callback_listener) t.start() t2 = Thread(target=JQRQrcodeCallbackPubSub.qrcode_callback_listener) diff --git a/apps/jqr/serializers.py b/apps/jqr/serializers.py index 8ccbcf5..0d6ab75 100644 --- a/apps/jqr/serializers.py +++ b/apps/jqr/serializers.py @@ -10,7 +10,7 @@ from apps.jqr.pubsub import JQREventCallbackPubSub from apps.jqr.tasks import save_add_contact, delete_add_contact, edit_add_contact, save_add_contact_by_channel, \ edit_add_contact_by_channel, delete_add_contact_by_channel from apps.msg.models import TbMessage -from apps.msg.utils import JQRMSGPubSubUtils +from apps.msg.pubsub import JQRMSGPubSub from apps.qc.choices import QcCorpInfoCallbackStatusChoices from libs.weworkapi.callback.WXBizMsgCrypt3 import WXBizMsgCrypt, Prpcrypt from utils.tools import sha1_encoder, get_attribute, camel_to_snake @@ -226,7 +226,7 @@ class TbMessageModelSerializer(BaseSerializer): fields = '__all__' def create(self, validated_data): - JQRMSGPubSubUtils.publish({ + JQRMSGPubSub.publish({ **self.context.get('request').data, 'corpid': validated_data.get('corpid'), 'userid': validated_data.get('userid'), diff --git a/apps/msg/pubsub.py b/apps/msg/pubsub.py new file mode 100644 index 0000000..9173cc3 --- /dev/null +++ b/apps/msg/pubsub.py @@ -0,0 +1,48 @@ +import json +import logging +import os +import time + +from django_redis import get_redis_connection +from django.conf import settings +from threading import Thread + +from apps.msg.models import TbMessage +from yzk_wechat_event.settings.constant import Constant + +logger = logging.getLogger('apps') + + +class JQRMSGPubSub: + rc = get_redis_connection() + msg_list = [] + BATCH_SIZE = 100 + + @classmethod + def publish(cls, data): + cls.rc.lpush(Constant.JQR_MSG_PUBSUB_CHANNEL, json.dumps(data)) + + @classmethod + def listen(cls): + from apps.jqr.serializers import TbMessageModelSerializer + while True: + data = cls.rc.brpop(Constant.JQR_MSG_PUBSUB_CHANNEL, 5) + if data is None: + if len(cls.msg_list) > 0: + print(cls.msg_list) + TbMessage.objects.bulk_create(cls.msg_list) + cls.msg_list = [] + time.sleep(5) + continue + data = json.loads(data[1]) + serializer = TbMessageModelSerializer(data=data) + serializer.is_valid(raise_exception=True) + cls.msg_list.append(TbMessage(**serializer.validated_data)) + if len(cls.msg_list) >= cls.BATCH_SIZE: + TbMessage.objects.bulk_create(cls.msg_list) + cls.msg_list.clear() + + +# if not settings.DEBUG: +t = Thread(target=JQRMSGPubSub.listen) +t.start() diff --git a/apps/msg/utils.py b/apps/msg/utils.py deleted file mode 100644 index 0c117e1..0000000 --- a/apps/msg/utils.py +++ /dev/null @@ -1,48 +0,0 @@ -import json -import logging -import os -import time - -from django_redis import get_redis_connection -from django.conf import settings -from threading import Thread - -from apps.msg.models import TbMessage -from yzk_wechat_event.settings.constant import Constant - -logger = logging.getLogger('apps') - - -class JQRMSGPubSubUtils: - rc = get_redis_connection() - msg_list = [] - BATCH_SIZE = 100 - - @classmethod - def publish(cls, data): - cls.rc.lpush(Constant.JQR_MSG_PUBSUB_CHANNEL, json.dumps(data)) - - -def listen(): - from apps.jqr.serializers import TbMessageModelSerializer - while True: - data = JQRMSGPubSubUtils.rc.brpop(Constant.JQR_MSG_PUBSUB_CHANNEL, 5) - if data is None: - if len(JQRMSGPubSubUtils.msg_list) > 0: - print(JQRMSGPubSubUtils.msg_list) - TbMessage.objects.bulk_create(JQRMSGPubSubUtils.msg_list) - JQRMSGPubSubUtils.msg_list = [] - time.sleep(5) - continue - data = json.loads(data[1]) - serializer = TbMessageModelSerializer(data=data) - serializer.is_valid(raise_exception=True) - JQRMSGPubSubUtils.msg_list.append(TbMessage(**serializer.validated_data)) - if len(JQRMSGPubSubUtils.msg_list) >= JQRMSGPubSubUtils.BATCH_SIZE: - TbMessage.objects.bulk_create(JQRMSGPubSubUtils.msg_list) - JQRMSGPubSubUtils.msg_list.clear() - - -if not settings.DEBUG: - t = Thread(target=listen) - t.start()