# yzk_wechat_event/apps/msg/pubsub.py
#
# 49 lines
# 1.4 KiB
# Python

import json
import logging
import os
import time
from django_redis import get_redis_connection
from django.conf import settings
from threading import Thread
from apps.msg.models import TbMessage
from yzk_wechat_event.settings.constant import Constant
# Module-level logger; records are emitted under the 'apps' logger name.
logger = logging.getLogger('apps')
class JQRMSGPubSub:
    """Redis-list backed queue that batches messages into ``TbMessage`` rows.

    ``publish`` pushes JSON-encoded payloads onto the left end of the list
    named by ``Constant.JQR_MSG_PUBSUB_CHANNEL``; ``listen`` pops them from
    the right end (so payloads are consumed in publish order), validates
    them and writes them to the database in batches via ``bulk_create``.

    All state lives on the class, so the buffer is shared by every caller.
    NOTE(review): nothing here guards ``msg_list`` against concurrent
    ``listen`` loops -- presumably only one consumer runs per process;
    confirm against the code that starts the listener.
    """

    # Redis connection for the 'pubsub' alias; created once when the class
    # body is executed (i.e. at import time).
    rc = get_redis_connection('pubsub')
    # Unsaved TbMessage instances waiting for the next bulk insert.
    msg_list = []
    # Flush as soon as this many messages are buffered.
    BATCH_SIZE = 100

    @classmethod
    def publish(cls, data):
        """Serialize ``data`` as JSON and enqueue it for the listener.

        Args:
            data: Any JSON-serializable object describing one message.

        Raises:
            TypeError: If ``data`` cannot be encoded by ``json.dumps``.
        """
        cls.rc.lpush(Constant.JQR_MSG_PUBSUB_CHANNEL, json.dumps(data))

    @classmethod
    def _flush(cls):
        """Bulk-insert the buffered messages and reset the buffer.

        Does nothing when the buffer is empty. The buffer is only reset
        after ``bulk_create`` returns, so a database error propagates to
        the caller with the pending messages still in ``msg_list``.
        """
        if not cls.msg_list:
            return
        logger.debug('Flushing %d buffered message(s)', len(cls.msg_list))
        TbMessage.objects.bulk_create(cls.msg_list)
        cls.msg_list = []

    @classmethod
    def listen(cls):
        """Consume queued payloads forever, persisting them in batches.

        The buffer is flushed when it reaches ``BATCH_SIZE`` entries or when
        the queue has been idle for one ``brpop`` timeout. Payloads that are
        not valid JSON or that fail serializer validation are logged and
        skipped, so a single bad message cannot stop the consumer loop.

        This method never returns normally; it is meant to be run in a
        dedicated thread or process.
        """
        # Imported here rather than at module level, as in the original code
        # (presumably to avoid a circular import -- TODO confirm).
        from apps.jqr.serializers import TbMessageModelSerializer

        while True:
            # Block for up to 5 seconds waiting for the next payload.
            item = cls.rc.brpop(Constant.JQR_MSG_PUBSUB_CHANNEL, 5)
            if item is None:
                # Queue is idle: persist whatever has accumulated so far.
                cls._flush()
                # NOTE(review): brpop above already waits up to 5s, so this
                # adds a further idle delay; kept to preserve existing
                # timing -- confirm whether it is still wanted.
                time.sleep(5)
                continue

            # brpop returns a (key, value) pair; the payload is the value.
            raw = item[1]
            try:
                payload = json.loads(raw)
            except ValueError:
                logger.warning('Dropping malformed JSON payload: %r', raw)
                continue

            serializer = TbMessageModelSerializer(data=payload)
            if not serializer.is_valid():
                logger.warning(
                    'Dropping invalid message: %s', serializer.errors
                )
                continue

            cls.msg_list.append(TbMessage(**serializer.validated_data))
            if len(cls.msg_list) >= cls.BATCH_SIZE:
                cls._flush()
# if not settings.DEBUG:
# t = Thread(target=JQRMSGPubSub.listen)
# t.start()