"""Redis list-backed queue whose entries are buffered and bulk-inserted as ``TbMessage`` rows."""
import json
import logging
import os
from threading import Thread

from django.conf import settings
from redis import Redis

from apps.msg.models import TbMessage
from yzk_wechat_event.settings.constant import Constant

logger = logging.getLogger('apps')


class JQRMSGPubSubUtils:
    """Producer side and shared state of the ``JQR_MSG_PUBSUB_CHANNEL`` queue.

    ``publish`` pushes JSON payloads onto a Redis list; ``listen`` (module
    level) pops them, turns them into unsaved ``TbMessage`` instances and
    stores them in ``msg_list`` until they are written with ``bulk_create``.
    """

    # Shared client, created at import time from the Django settings.
    rc = Redis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=0)
    # Unsaved TbMessage instances waiting for the next bulk insert.
    msg_list = []
    # Buffer length at which a bulk insert is forced.
    BATCH_SIZE = 100

    @classmethod
    def publish(cls, data):
        """Serialize ``data`` as JSON and push it onto the head of the queue.

        Args:
            data: Any object accepted by ``json.dumps``.
        """
        cls.rc.lpush(Constant.JQR_MSG_PUBSUB_CHANNEL, json.dumps(data))

    @classmethod
    def _flush(cls):
        """Bulk-insert the buffered rows and empty the buffer; no-op if empty."""
        if not cls.msg_list:
            return
        logger.debug('Bulk-inserting %d buffered message(s)', len(cls.msg_list))
        TbMessage.objects.bulk_create(cls.msg_list)
        # Cleared in place, and only after a successful insert, so the list
        # object bound to ``msg_list`` stays the same.
        cls.msg_list.clear()


def listen():
    """Consume the queue forever, writing messages to the database in batches.

    Entries are popped from the tail of the list (``publish`` pushes to the
    head, so they are handled in publish order). The buffer is flushed when
    it reaches ``BATCH_SIZE`` or when no entry arrived within the 5 second
    blocking timeout. Entries that are not valid JSON or that fail serializer
    validation are logged and skipped so one bad payload cannot stop the
    consumer. Redis and database errors are not handled here and propagate.
    """
    api = os.environ.get('api')
    logger.info(f'监听消息队列-----{api}')
    # Imported here rather than at module level, as in the original code
    # (presumably to avoid a circular import -- TODO confirm).
    from apps.jqr.serializers import TbMessageModelSerializer

    while True:
        item = JQRMSGPubSubUtils.rc.brpop(Constant.JQR_MSG_PUBSUB_CHANNEL, 5)
        if item is None:
            # Timed out with nothing queued: write whatever is buffered.
            JQRMSGPubSubUtils._flush()
            continue

        # brpop returns a (list_name, value) pair; index 1 is the payload.
        raw = item[1]
        try:
            payload = json.loads(raw)
        except ValueError:
            # Covers json.JSONDecodeError and undecodable bytes.
            logger.exception('Skipping queue entry that is not valid JSON: %r', raw)
            continue

        serializer = TbMessageModelSerializer(data=payload)
        if not serializer.is_valid():
            logger.error(
                'Skipping queue entry that failed validation: %s',
                serializer.errors,
            )
            continue

        JQRMSGPubSubUtils.msg_list.append(TbMessage(**serializer.validated_data))
        if len(JQRMSGPubSubUtils.msg_list) >= JQRMSGPubSubUtils.BATCH_SIZE:
            JQRMSGPubSubUtils._flush()


# The consumer is started as an import side effect whenever DEBUG is off.
# NOTE(review): the thread is non-daemon and never returns, so it keeps the
# interpreter alive -- confirm this is intended for every importing process.
if not settings.DEBUG:
    t = Thread(target=listen)
    t.start()