yzk_wechat_event/apps/msg/utils.py

46 lines
1.4 KiB
Python

import json
import logging
import os
from redis import Redis
from django.conf import settings
from threading import Thread
from apps.msg.models import TbMessage
from yzk_wechat_event.settings.constant import Constant
# Module-level logger, obtained under the logger name 'apps'.
logger = logging.getLogger('apps')
class JQRMSGPubSubUtils:
    """Shared state and producer side of the Redis-list message queue.

    The class is used as a namespace: it holds one Redis client, the
    in-memory buffer of messages waiting to be written to the database and
    the batch size that ``listen`` uses to decide when to write them.
    """

    # Redis client created once, when the class body is executed (db 0).
    rc = Redis(
        host=settings.REDIS_HOST,
        port=settings.REDIS_PORT,
        db=0,
    )

    # Buffer of ``TbMessage`` instances that have not been persisted yet.
    msg_list = []

    # Buffer length at which the consumer performs a bulk insert.
    BATCH_SIZE = 100

    @classmethod
    def publish(cls, data):
        """Serialise ``data`` to JSON and push it onto the head of the queue.

        The queue is the Redis list named by
        ``Constant.JQR_MSG_PUBSUB_CHANNEL``. Nothing is returned.
        """
        channel = Constant.JQR_MSG_PUBSUB_CHANNEL
        payload = json.dumps(data)
        cls.rc.lpush(channel, payload)
def _flush_pending_messages():
    """Bulk-insert the buffered ``TbMessage`` objects and empty the buffer."""
    pending = JQRMSGPubSubUtils.msg_list
    if not pending:
        return
    logger.debug("Persisting %d buffered messages", len(pending))
    TbMessage.objects.bulk_create(pending)
    # Emptied only after the insert succeeded; if bulk_create raises, the
    # rows stay in the buffer.
    pending.clear()


def listen():
    """Consume queued messages from Redis forever and store them in batches.

    Each iteration blocks for up to 5 seconds on the Redis list named by
    ``Constant.JQR_MSG_PUBSUB_CHANNEL``:

    * on timeout, whatever is buffered is written to the database;
    * otherwise the payload is JSON-decoded, validated with
      ``TbMessageModelSerializer`` and appended to
      ``JQRMSGPubSubUtils.msg_list``; once the buffer holds
      ``JQRMSGPubSubUtils.BATCH_SIZE`` items it is written out.

    A payload that is not valid JSON, or that fails serializer validation,
    is logged and discarded so that one bad message cannot stop the
    consumer. Errors raised by Redis or by the database are not caught here
    and still terminate the loop.

    This function never returns.
    """
    # Imported inside the function rather than at module level
    # (presumably to avoid a circular import -- TODO confirm).
    from apps.jqr.serializers import TbMessageModelSerializer

    while True:
        item = JQRMSGPubSubUtils.rc.brpop(Constant.JQR_MSG_PUBSUB_CHANNEL, 5)
        if item is None:
            # Timed out with nothing new: persist what has accumulated so
            # that a partial batch does not wait indefinitely.
            _flush_pending_messages()
            continue

        # brpop yields a (list name, value) pair; the value is the JSON text.
        raw = item[1]
        try:
            payload = json.loads(raw)
        except ValueError:
            # Covers JSONDecodeError and UnicodeDecodeError.
            logger.exception("Discarding message that is not valid JSON: %r", raw)
            continue

        serializer = TbMessageModelSerializer(data=payload)
        if not serializer.is_valid():
            logger.error(
                "Discarding message that failed validation: %r (errors: %s)",
                payload,
                serializer.errors,
            )
            continue

        JQRMSGPubSubUtils.msg_list.append(TbMessage(**serializer.validated_data))
        if len(JQRMSGPubSubUtils.msg_list) >= JQRMSGPubSubUtils.BATCH_SIZE:
            _flush_pending_messages()
# Start the queue consumer as soon as this module is imported.
# listen() never returns, so the thread is marked as a daemon: a non-daemon
# thread would keep the interpreter alive forever after the main thread ends.
# NOTE: messages still held in JQRMSGPubSubUtils.msg_list when the process
# exits are not written to the database.
t = Thread(target=listen, daemon=True)
t.start()