消息发布订阅

This commit is contained in:
AKW 2023-12-21 18:07:23 +08:00
parent 884255e209
commit 56101f77a8
1 changed files with 22 additions and 17 deletions

View File

@ -1,6 +1,6 @@
import json import json
from redis import StrictRedis from redis import Redis
from django.conf import settings from django.conf import settings
from threading import Thread from threading import Thread
@ -9,28 +9,33 @@ from yzk_wechat_event.settings.constant import Constant
class JQRMSGPubSubUtils: class JQRMSGPubSubUtils:
rc = StrictRedis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=0) rc = Redis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=0)
msg_list = [] msg_list = []
BATCH_SIZE = 10 BATCH_SIZE = 100
@classmethod @classmethod
def publish(cls, data): def publish(cls, data):
cls.rc.publish(Constant.JQR_MSG_PUBSUB_CHANNEL, json.dumps(data)) cls.rc.lpush(Constant.JQR_MSG_PUBSUB_CHANNEL, json.dumps(data))
def listen(): def listen():
pubsub = JQRMSGPubSubUtils.rc.pubsub() from apps.jqr.serializers import TbMessageModelSerializer
pubsub.subscribe(Constant.JQR_MSG_PUBSUB_CHANNEL) while True:
for msg in pubsub.listen(): data = JQRMSGPubSubUtils.rc.brpop(Constant.JQR_MSG_PUBSUB_CHANNEL, 5)
print(msg) if data is None:
if msg['type'] == 'message': if len(JQRMSGPubSubUtils.msg_list) > 0:
data = msg.get('data') print(JQRMSGPubSubUtils.msg_list)
data = json.dumps(str(data)) TbMessage.objects.bulk_create(JQRMSGPubSubUtils.msg_list)
JQRMSGPubSubUtils.msg_list.append(data) JQRMSGPubSubUtils.msg_list = []
if len(JQRMSGPubSubUtils.msg_list) >= JQRMSGPubSubUtils.BATCH_SIZE: continue
TbMessage.objects.bulk_create(JQRMSGPubSubUtils.msg_list[JQRMSGPubSubUtils.BATCH_SIZE:]) data = json.loads(data[1])
JQRMSGPubSubUtils.msg_list = JQRMSGPubSubUtils.msg_list[JQRMSGPubSubUtils.BATCH_SIZE:] serializer = TbMessageModelSerializer(data=data)
serializer.is_valid(raise_exception=True)
JQRMSGPubSubUtils.msg_list.append(TbMessage(**serializer.validated_data))
if len(JQRMSGPubSubUtils.msg_list) >= JQRMSGPubSubUtils.BATCH_SIZE:
TbMessage.objects.bulk_create(JQRMSGPubSubUtils.msg_list)
JQRMSGPubSubUtils.msg_list.clear()
# t = Thread(target=listen) t = Thread(target=listen)
# t.start() t.start()