yzk_wechat_event/apps/msg/utils.py

46 lines
1.4 KiB
Python
Raw Normal View History

2023-12-21 17:17:07 +08:00
import json
2023-12-22 14:15:34 +08:00
import logging
2023-12-21 17:17:07 +08:00
2023-12-21 18:07:23 +08:00
from redis import Redis
2023-12-21 17:17:07 +08:00
from django.conf import settings
from threading import Thread
from apps.msg.models import TbMessage
from yzk_wechat_event.settings.constant import Constant
2023-12-22 14:15:34 +08:00
# Module-level logger. It is bound to the fixed logger name 'apps' rather
# than to this module's __name__.
logger = logging.getLogger('apps')
2023-12-21 17:17:07 +08:00
class JQRMSGPubSubUtils:
    """Redis list used as a message queue, plus the shared state for its consumer.

    ``publish`` pushes JSON payloads onto the Redis list named by
    ``Constant.JQR_MSG_PUBSUB_CHANNEL``. The module-level ``listen``
    function pops them from the other end and stores the resulting model
    instances in ``msg_list`` until they are written to the database.
    """

    # Redis client shared by producer and consumer; host and port come from
    # Django settings, database index is fixed at 0.
    rc = Redis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=0)

    # Class-level buffer of TbMessage instances that listen() has built but
    # not yet saved. It is shared state, not per-instance.
    msg_list = []

    # Buffer length at which listen() performs a bulk insert.
    BATCH_SIZE = 100

    @classmethod
    def publish(cls, data):
        """Encode *data* as JSON and LPUSH it onto the queue list.

        Args:
            data: Any JSON-serializable object describing one message.

        Raises:
            TypeError: If *data* cannot be serialized by ``json.dumps``.
        """
        payload = json.dumps(data)
        cls.rc.lpush(Constant.JQR_MSG_PUBSUB_CHANNEL, payload)
2023-12-21 17:17:07 +08:00
def _flush_pending_messages():
    """Bulk-insert every buffered ``TbMessage`` and empty the shared buffer."""
    pending = JQRMSGPubSubUtils.msg_list
    if not pending:
        return
    logger.debug('Flushing %d buffered message(s)', len(pending))
    TbMessage.objects.bulk_create(pending)
    # Cleared only after the insert returns; if bulk_create raises, the
    # buffer is left intact and the exception propagates to the caller.
    pending.clear()


def listen():
    """Consume the Redis message queue forever, saving messages in batches.

    Each iteration blocks on ``BRPOP`` of ``Constant.JQR_MSG_PUBSUB_CHANNEL``
    with a 5-second timeout. A popped payload is JSON-decoded, validated with
    ``TbMessageModelSerializer`` and appended to
    ``JQRMSGPubSubUtils.msg_list`` as an unsaved ``TbMessage``. The buffer is
    written with ``bulk_create`` when it reaches
    ``JQRMSGPubSubUtils.BATCH_SIZE``, or when a pop times out while the
    buffer is non-empty.

    A payload that is not valid JSON, or that fails serializer validation,
    is logged and skipped so that one bad message cannot terminate the
    listener and strand the messages already buffered. Redis and database
    errors are not handled here and still propagate.

    This function never returns normally.
    """
    logger.info('监听消息队列-----')
    # Imported inside the function rather than at module level — presumably
    # to avoid a circular import with apps.jqr; confirm before moving it.
    from apps.jqr.serializers import TbMessageModelSerializer

    while True:
        popped = JQRMSGPubSubUtils.rc.brpop(Constant.JQR_MSG_PUBSUB_CHANNEL, 5)
        if popped is None:
            # Timed out with nothing queued: persist any partial batch so
            # messages do not sit in memory until the batch fills up.
            _flush_pending_messages()
            continue

        # brpop yields a (list name, value) pair; the payload is the value.
        raw = popped[1]
        try:
            data = json.loads(raw)
        except ValueError:
            # Covers json.JSONDecodeError and undecodable byte payloads.
            logger.exception(
                'Discarding queue payload that is not valid JSON: %r', raw)
            continue

        serializer = TbMessageModelSerializer(data=data)
        if not serializer.is_valid():
            logger.error(
                'Discarding invalid message %r: %s', data, serializer.errors)
            continue

        JQRMSGPubSubUtils.msg_list.append(
            TbMessage(**serializer.validated_data))
        if len(JQRMSGPubSubUtils.msg_list) >= JQRMSGPubSubUtils.BATCH_SIZE:
            _flush_pending_messages()
# Start the queue listener as a side effect of importing this module.
# NOTE(review): no daemon flag is set and listen() loops forever, so this
# thread may keep the interpreter alive at shutdown — confirm this is intended.
t = Thread(target=listen)
t.start()