import json
import logging
import os
import time
from threading import Thread

from django.conf import settings
from django_redis import get_redis_connection

from apps.msg.models import TbMessage
from yzk_wechat_event.settings.constant import Constant

logger = logging.getLogger('apps')


class JQRMSGPubSub:
    """Redis-list-backed queue that batches messages into ``TbMessage`` rows.

    Producers call :meth:`publish` to push a JSON-encoded payload onto the
    Redis list named by ``Constant.JQR_MSG_PUBSUB_CHANNEL``. A consumer runs
    :meth:`listen`, which pops payloads, validates them and writes them to
    the database in batches via ``bulk_create``.
    """

    # Redis connection shared by all class methods; created at import time.
    rc = get_redis_connection()
    # Validated, not-yet-persisted TbMessage instances.
    msg_list = []
    # Number of buffered messages that triggers an immediate bulk insert.
    BATCH_SIZE = 100

    @classmethod
    def publish(cls, data):
        """Push ``data`` (JSON-serialisable) onto the message queue."""
        cls.rc.lpush(Constant.JQR_MSG_PUBSUB_CHANNEL, json.dumps(data))

    @classmethod
    def _flush(cls):
        """Bulk-insert the buffered messages and empty the buffer.

        Does nothing when the buffer is empty. If ``bulk_create`` raises,
        the buffer is left untouched and the exception propagates.
        """
        if not cls.msg_list:
            return
        logger.info('Persisting %d buffered messages', len(cls.msg_list))
        TbMessage.objects.bulk_create(cls.msg_list)
        cls.msg_list.clear()

    @classmethod
    def listen(cls):
        """Consume the queue forever, persisting messages in batches.

        Each popped payload is JSON-decoded and validated with
        ``TbMessageModelSerializer``. Payloads that cannot be decoded or
        fail validation are logged and skipped so that one bad message
        does not stop the consumer. The buffer is written to the database
        when it reaches ``BATCH_SIZE`` or when the queue has been idle for
        one ``brpop`` timeout.

        This method never returns; run it in a dedicated thread or process.
        """
        # Imported lazily, as in the original code (presumably to avoid a
        # circular import at module load time -- confirm before moving).
        from apps.jqr.serializers import TbMessageModelSerializer

        while True:
            item = cls.rc.brpop(Constant.JQR_MSG_PUBSUB_CHANNEL, 5)
            if item is None:
                # Queue idle for the brpop timeout: persist what we have.
                cls._flush()
                # NOTE(review): brpop already blocks for up to 5 seconds, so
                # this extra sleep adds latency -- confirm it is intentional.
                time.sleep(5)
                continue

            # brpop returns a (key, value) pair; the payload is the value.
            raw = item[1]
            try:
                payload = json.loads(raw)
            except ValueError:
                logger.exception('Dropping undecodable message: %r', raw)
                continue

            serializer = TbMessageModelSerializer(data=payload)
            if not serializer.is_valid():
                logger.error(
                    'Dropping invalid message %r: %s',
                    payload,
                    serializer.errors,
                )
                continue

            cls.msg_list.append(TbMessage(**serializer.validated_data))
            if len(cls.msg_list) >= cls.BATCH_SIZE:
                cls._flush()


# Disabled bootstrap: start the listener in a background thread outside DEBUG.
# if not settings.DEBUG:
#     t = Thread(target=JQRMSGPubSub.listen)
#     t.start()