diff --git a/apps/msg/utils.py b/apps/msg/utils.py index 1c58ca0..5a9a7a3 100644 --- a/apps/msg/utils.py +++ b/apps/msg/utils.py @@ -1,6 +1,6 @@ import json -from redis import StrictRedis +from redis import Redis from django.conf import settings from threading import Thread @@ -9,28 +9,33 @@ from yzk_wechat_event.settings.constant import Constant class JQRMSGPubSubUtils: - rc = StrictRedis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=0) + rc = Redis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=0) msg_list = [] - BATCH_SIZE = 10 + BATCH_SIZE = 100 @classmethod def publish(cls, data): - cls.rc.publish(Constant.JQR_MSG_PUBSUB_CHANNEL, json.dumps(data)) + cls.rc.lpush(Constant.JQR_MSG_PUBSUB_CHANNEL, json.dumps(data)) def listen(): - pubsub = JQRMSGPubSubUtils.rc.pubsub() - pubsub.subscribe(Constant.JQR_MSG_PUBSUB_CHANNEL) - for msg in pubsub.listen(): - print(msg) - if msg['type'] == 'message': - data = msg.get('data') - data = json.dumps(str(data)) - JQRMSGPubSubUtils.msg_list.append(data) - if len(JQRMSGPubSubUtils.msg_list) >= JQRMSGPubSubUtils.BATCH_SIZE: - TbMessage.objects.bulk_create(JQRMSGPubSubUtils.msg_list[JQRMSGPubSubUtils.BATCH_SIZE:]) - JQRMSGPubSubUtils.msg_list = JQRMSGPubSubUtils.msg_list[JQRMSGPubSubUtils.BATCH_SIZE:] + from apps.jqr.serializers import TbMessageModelSerializer + while True: + data = JQRMSGPubSubUtils.rc.brpop(Constant.JQR_MSG_PUBSUB_CHANNEL, 5) + if data is None: + if len(JQRMSGPubSubUtils.msg_list) > 0: + print(JQRMSGPubSubUtils.msg_list) + TbMessage.objects.bulk_create(JQRMSGPubSubUtils.msg_list) + JQRMSGPubSubUtils.msg_list = [] + continue + data = json.loads(data[1]) + serializer = TbMessageModelSerializer(data=data) + serializer.is_valid(raise_exception=True) + JQRMSGPubSubUtils.msg_list.append(TbMessage(**serializer.validated_data)) + if len(JQRMSGPubSubUtils.msg_list) >= JQRMSGPubSubUtils.BATCH_SIZE: + TbMessage.objects.bulk_create(JQRMSGPubSubUtils.msg_list) + JQRMSGPubSubUtils.msg_list.clear() -# t = Thread(target=listen) -# t.start() +t = Thread(target=listen) +t.start()