state修改
This commit is contained in:
parent
b2cc43dd0c
commit
1022eee9a1
|
@ -5,6 +5,7 @@ from rest_framework import serializers
|
|||
from apps.jqr.models import JqrHookUser
|
||||
from apps.jqr.tasks import save_add_contact, delete_add_contact, edit_add_contact
|
||||
from apps.msg.models import TbMessage
|
||||
from apps.msg.utils import JQRMSGPubSubUtils
|
||||
from libs.weworkapi.callback.WXBizMsgCrypt3 import WXBizMsgCrypt, Prpcrypt
|
||||
from utils.tools import sha1_encoder, get_attribute, camel_to_snake
|
||||
from utils.base_serializer import BaseSerializer, CurrentIpDefault
|
||||
|
@ -91,10 +92,12 @@ class WechatEncryptSerializer(serializers.Serializer):
|
|||
print('添加客户事件')
|
||||
corp = self.context.get('corp')
|
||||
data.pop('tousername')
|
||||
data['corpid'] = corp.corpid
|
||||
data['agentid'] = corp.agentid
|
||||
data['uid'] = corp.uid
|
||||
print(data)
|
||||
state = data.get('state')
|
||||
if state and state.startswith('mg') and '_' in state:
|
||||
data['corpid'] = corp.corpid
|
||||
data['agentid'] = corp.agentid
|
||||
data['uid'] = corp.uid
|
||||
save_add_contact.delay(data, corp.to_dict(['corpid', 'appsecret']))
|
||||
|
||||
def handle_change_external_contact_edit_external_contact(self, data):
|
||||
|
@ -104,10 +107,12 @@ class WechatEncryptSerializer(serializers.Serializer):
|
|||
print('编辑客户事件')
|
||||
corp = self.context.get('corp')
|
||||
data.pop('tousername')
|
||||
data['corpid'] = corp.corpid
|
||||
data['agentid'] = corp.agentid
|
||||
data['uid'] = corp.uid
|
||||
print(data)
|
||||
state = data.get('state')
|
||||
if state and state.startswith('mg') and '_' in state:
|
||||
data['corpid'] = corp.corpid
|
||||
data['agentid'] = corp.agentid
|
||||
data['uid'] = corp.uid
|
||||
edit_add_contact.delay(data, corp.to_dict(['corpid', 'appsecret']))
|
||||
|
||||
def handle_change_external_contact_add_half_external_contact(self, data):
|
||||
|
@ -117,10 +122,12 @@ class WechatEncryptSerializer(serializers.Serializer):
|
|||
print('外部联系人免验证添加成员事件')
|
||||
corp = self.context.get('corp')
|
||||
data.pop('tousername')
|
||||
data['corpid'] = corp.corpid
|
||||
data['agentid'] = corp.agentid
|
||||
data['uid'] = corp.uid
|
||||
print(data)
|
||||
state = data.get('state')
|
||||
if state and state.startswith('mg') and '_' in state:
|
||||
data['corpid'] = corp.corpid
|
||||
data['agentid'] = corp.agentid
|
||||
data['uid'] = corp.uid
|
||||
save_add_contact.delay(data, corp.to_dict(['corpid', 'appsecret']))
|
||||
|
||||
def handle_change_external_contact_del_follow_user(self, data):
|
||||
|
@ -130,10 +137,12 @@ class WechatEncryptSerializer(serializers.Serializer):
|
|||
print('删除跟进成员事件')
|
||||
corp = self.context.get('corp')
|
||||
data.pop('tousername')
|
||||
data['corpid'] = corp.corpid
|
||||
data['agentid'] = corp.agentid
|
||||
data['uid'] = corp.uid
|
||||
print(data)
|
||||
state = data.get('state')
|
||||
if state and state.startswith('mg') and '_' in state:
|
||||
data['corpid'] = corp.corpid
|
||||
data['agentid'] = corp.agentid
|
||||
data['uid'] = corp.uid
|
||||
delete_add_contact.delay(data, corp.to_dict(['corpid', 'appsecret']))
|
||||
|
||||
def handle_change_external_chat_create(self, data):
|
||||
|
@ -182,6 +191,7 @@ class TbMessageModelSerializer(BaseSerializer):
|
|||
model = TbMessage
|
||||
fields = '__all__'
|
||||
|
||||
# def create(self, validated_data):
|
||||
# return TbMessage(**validated_data)
|
||||
def create(self, validated_data):
|
||||
JQRMSGPubSubUtils.publish(self.context.get('request').data)
|
||||
return TbMessage(**validated_data)
|
||||
|
||||
|
|
|
@ -23,4 +23,4 @@ data = {
|
|||
|
||||
for i in range(10):
|
||||
value_new = json.dumps(data)
|
||||
rc.publish("jqr_msg", value_new)
|
||||
rc.publish("jqr:msg", value_new)
|
||||
|
|
|
@ -5,20 +5,21 @@ import json
|
|||
|
||||
rc = StrictRedis(host='localhost', port=6379, db=0)
|
||||
ps = rc.pubsub()
|
||||
ps.subscribe('jqr_msg')
|
||||
ps.subscribe('jqr:msg')
|
||||
|
||||
msg_list = []
|
||||
|
||||
batch_size = 2
|
||||
while True:
|
||||
for item in ps.listen():
|
||||
if item['type'] == 'message':
|
||||
data = item.get('data')
|
||||
print(data)
|
||||
msg_list.append(data)
|
||||
if len(msg_list) >= 10:
|
||||
if len(msg_list) >= batch_size:
|
||||
try:
|
||||
time.sleep(1)
|
||||
msg_list.clear()
|
||||
msg_list = msg_list[-batch_size:]
|
||||
except Exception as e:
|
||||
print(e)
|
||||
print(len(msg_list))
|
||||
|
|
|
@ -0,0 +1,36 @@
|
|||
import json
|
||||
|
||||
from redis import StrictRedis
|
||||
from django.conf import settings
|
||||
from threading import Thread
|
||||
|
||||
from apps.msg.models import TbMessage
|
||||
from yzk_wechat_event.settings.constant import Constant
|
||||
|
||||
|
||||
class JQRMSGPubSubUtils:
|
||||
rc = StrictRedis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=0)
|
||||
msg_list = []
|
||||
BATCH_SIZE = 10
|
||||
|
||||
@classmethod
|
||||
def publish(cls, data):
|
||||
cls.rc.publish(Constant.JQR_MSG_PUBSUB_CHANNEL, json.dumps(data))
|
||||
|
||||
|
||||
def listen():
|
||||
pubsub = JQRMSGPubSubUtils.rc.pubsub()
|
||||
pubsub.subscribe(Constant.JQR_MSG_PUBSUB_CHANNEL)
|
||||
for msg in pubsub.listen():
|
||||
print(msg)
|
||||
if msg['type'] == 'message':
|
||||
data = msg.get('data')
|
||||
data = json.dumps(str(data))
|
||||
JQRMSGPubSubUtils.msg_list.append(data)
|
||||
if len(JQRMSGPubSubUtils.msg_list) >= JQRMSGPubSubUtils.BATCH_SIZE:
|
||||
TbMessage.objects.bulk_create(JQRMSGPubSubUtils.msg_list[JQRMSGPubSubUtils.BATCH_SIZE:])
|
||||
JQRMSGPubSubUtils.msg_list = JQRMSGPubSubUtils.msg_list[JQRMSGPubSubUtils.BATCH_SIZE:]
|
||||
|
||||
|
||||
# t = Thread(target=listen)
|
||||
# t.start()
|
|
@ -3,6 +3,7 @@ class Constant:
|
|||
LOGIN_SMS = 'login:sms:'
|
||||
WECHAT_WORKER_TOKEN = 'wechat:worker:token:'
|
||||
WECHAT_WORKER_TOKEN_EXPIRES = 6000
|
||||
JQR_MSG_PUBSUB_CHANNEL = 'jqr:msg'
|
||||
ONE = 1
|
||||
TWO = 2
|
||||
THREE = 3
|
||||
|
|
Loading…
Reference in New Issue