From 56101f77a887b3b30560a890ae683c716ae84da0 Mon Sep 17 00:00:00 2001 From: AKW <2497744746@qq.com> Date: Thu, 21 Dec 2023 18:07:23 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B6=88=E6=81=AF=E5=8F=91=E5=B8=83=E8=AE=A2?= =?UTF-8?q?=E9=98=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/msg/utils.py | 39 ++++++++++++++++++++++----------------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/apps/msg/utils.py b/apps/msg/utils.py index 1c58ca0..5a9a7a3 100644 --- a/apps/msg/utils.py +++ b/apps/msg/utils.py @@ -1,6 +1,6 @@ import json -from redis import StrictRedis +from redis import Redis from django.conf import settings from threading import Thread @@ -9,28 +9,33 @@ from yzk_wechat_event.settings.constant import Constant class JQRMSGPubSubUtils: - rc = StrictRedis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=0) + rc = Redis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=0) msg_list = [] - BATCH_SIZE = 10 + BATCH_SIZE = 100 @classmethod def publish(cls, data): - cls.rc.publish(Constant.JQR_MSG_PUBSUB_CHANNEL, json.dumps(data)) + cls.rc.lpush(Constant.JQR_MSG_PUBSUB_CHANNEL, json.dumps(data)) def listen(): - pubsub = JQRMSGPubSubUtils.rc.pubsub() - pubsub.subscribe(Constant.JQR_MSG_PUBSUB_CHANNEL) - for msg in pubsub.listen(): - print(msg) - if msg['type'] == 'message': - data = msg.get('data') - data = json.dumps(str(data)) - JQRMSGPubSubUtils.msg_list.append(data) - if len(JQRMSGPubSubUtils.msg_list) >= JQRMSGPubSubUtils.BATCH_SIZE: - TbMessage.objects.bulk_create(JQRMSGPubSubUtils.msg_list[JQRMSGPubSubUtils.BATCH_SIZE:]) - JQRMSGPubSubUtils.msg_list = JQRMSGPubSubUtils.msg_list[JQRMSGPubSubUtils.BATCH_SIZE:] + from apps.jqr.serializers import TbMessageModelSerializer + while True: + data = JQRMSGPubSubUtils.rc.brpop(Constant.JQR_MSG_PUBSUB_CHANNEL, 5) + if data is None: + if len(JQRMSGPubSubUtils.msg_list) > 0: + print(JQRMSGPubSubUtils.msg_list) + TbMessage.objects.bulk_create(JQRMSGPubSubUtils.msg_list) + JQRMSGPubSubUtils.msg_list = [] + continue + data = json.loads(data[1]) + serializer = TbMessageModelSerializer(data=data) + serializer.is_valid(raise_exception=True) + JQRMSGPubSubUtils.msg_list.append(TbMessage(**serializer.validated_data)) + if len(JQRMSGPubSubUtils.msg_list) >= JQRMSGPubSubUtils.BATCH_SIZE: + TbMessage.objects.bulk_create(JQRMSGPubSubUtils.msg_list) + JQRMSGPubSubUtils.msg_list.clear() -# t = Thread(target=listen) -# t.start() +t = Thread(target=listen) +t.start()