事件回调修改

This commit is contained in:
AKW 2023-12-25 11:34:35 +08:00
parent c935be72f5
commit 238e00d2fc
4 changed files with 51 additions and 50 deletions

View File

@ -59,6 +59,7 @@ class JQRQrcodeCallbackPubSub:
handler(qrcodeid, userid, externaluserid, corpinfo) handler(qrcodeid, userid, externaluserid, corpinfo)
# if not settings.DEBUG:
t = Thread(target=JQREventCallbackPubSub.event_callback_listener) t = Thread(target=JQREventCallbackPubSub.event_callback_listener)
t.start() t.start()
t2 = Thread(target=JQRQrcodeCallbackPubSub.qrcode_callback_listener) t2 = Thread(target=JQRQrcodeCallbackPubSub.qrcode_callback_listener)

View File

@ -10,7 +10,7 @@ from apps.jqr.pubsub import JQREventCallbackPubSub
from apps.jqr.tasks import save_add_contact, delete_add_contact, edit_add_contact, save_add_contact_by_channel, \ from apps.jqr.tasks import save_add_contact, delete_add_contact, edit_add_contact, save_add_contact_by_channel, \
edit_add_contact_by_channel, delete_add_contact_by_channel edit_add_contact_by_channel, delete_add_contact_by_channel
from apps.msg.models import TbMessage from apps.msg.models import TbMessage
from apps.msg.utils import JQRMSGPubSubUtils from apps.msg.pubsub import JQRMSGPubSub
from apps.qc.choices import QcCorpInfoCallbackStatusChoices from apps.qc.choices import QcCorpInfoCallbackStatusChoices
from libs.weworkapi.callback.WXBizMsgCrypt3 import WXBizMsgCrypt, Prpcrypt from libs.weworkapi.callback.WXBizMsgCrypt3 import WXBizMsgCrypt, Prpcrypt
from utils.tools import sha1_encoder, get_attribute, camel_to_snake from utils.tools import sha1_encoder, get_attribute, camel_to_snake
@ -226,7 +226,7 @@ class TbMessageModelSerializer(BaseSerializer):
fields = '__all__' fields = '__all__'
def create(self, validated_data): def create(self, validated_data):
JQRMSGPubSubUtils.publish({ JQRMSGPubSub.publish({
**self.context.get('request').data, **self.context.get('request').data,
'corpid': validated_data.get('corpid'), 'corpid': validated_data.get('corpid'),
'userid': validated_data.get('userid'), 'userid': validated_data.get('userid'),

48
apps/msg/pubsub.py Normal file
View File

@ -0,0 +1,48 @@
import json
import logging
import os
import time
from django_redis import get_redis_connection
from django.conf import settings
from threading import Thread
from apps.msg.models import TbMessage
from yzk_wechat_event.settings.constant import Constant
logger = logging.getLogger('apps')
class JQRMSGPubSub:
rc = get_redis_connection()
msg_list = []
BATCH_SIZE = 100
@classmethod
def publish(cls, data):
cls.rc.lpush(Constant.JQR_MSG_PUBSUB_CHANNEL, json.dumps(data))
@classmethod
def listen(cls):
from apps.jqr.serializers import TbMessageModelSerializer
while True:
data = cls.rc.brpop(Constant.JQR_MSG_PUBSUB_CHANNEL, 5)
if data is None:
if len(cls.msg_list) > 0:
print(cls.msg_list)
TbMessage.objects.bulk_create(cls.msg_list)
cls.msg_list = []
time.sleep(5)
continue
data = json.loads(data[1])
serializer = TbMessageModelSerializer(data=data)
serializer.is_valid(raise_exception=True)
cls.msg_list.append(TbMessage(**serializer.validated_data))
if len(cls.msg_list) >= cls.BATCH_SIZE:
TbMessage.objects.bulk_create(cls.msg_list)
cls.msg_list.clear()
# if not settings.DEBUG:
t = Thread(target=JQRMSGPubSub.listen)
t.start()

View File

@ -1,48 +0,0 @@
import json
import logging
import os
import time
from django_redis import get_redis_connection
from django.conf import settings
from threading import Thread
from apps.msg.models import TbMessage
from yzk_wechat_event.settings.constant import Constant
logger = logging.getLogger('apps')
class JQRMSGPubSubUtils:
rc = get_redis_connection()
msg_list = []
BATCH_SIZE = 100
@classmethod
def publish(cls, data):
cls.rc.lpush(Constant.JQR_MSG_PUBSUB_CHANNEL, json.dumps(data))
def listen():
from apps.jqr.serializers import TbMessageModelSerializer
while True:
data = JQRMSGPubSubUtils.rc.brpop(Constant.JQR_MSG_PUBSUB_CHANNEL, 5)
if data is None:
if len(JQRMSGPubSubUtils.msg_list) > 0:
print(JQRMSGPubSubUtils.msg_list)
TbMessage.objects.bulk_create(JQRMSGPubSubUtils.msg_list)
JQRMSGPubSubUtils.msg_list = []
time.sleep(5)
continue
data = json.loads(data[1])
serializer = TbMessageModelSerializer(data=data)
serializer.is_valid(raise_exception=True)
JQRMSGPubSubUtils.msg_list.append(TbMessage(**serializer.validated_data))
if len(JQRMSGPubSubUtils.msg_list) >= JQRMSGPubSubUtils.BATCH_SIZE:
TbMessage.objects.bulk_create(JQRMSGPubSubUtils.msg_list)
JQRMSGPubSubUtils.msg_list.clear()
if not settings.DEBUG:
t = Thread(target=listen)
t.start()