yzk_wechat_event/libs/topsdk/top.py

90 lines
3.8 KiB
Python

import logging
from datetime import datetime
from libs.topsdk.client import TopApiClient, TopException
from libs.topsdk.defaultability.defaultability import Defaultability
from libs.topsdk.defaultability.request.alibaba_alsc_union_eleme_promotion_officialactivity_get_request import \
AlibabaAlscUnionElemePromotionOfficialactivityGetRequest
from libs.topsdk.defaultability.request.alibaba_alsc_union_kbcpx_positive_order_get_request import \
AlibabaAlscUnionKbcpxPositiveOrderGetRequest
from libs.topsdk.defaultability.request.alibaba_alsc_union_media_zone_add_request import \
AlibabaAlscUnionMediaZoneAddRequest
from libs.topsdk.defaultability.request.alibaba_alsc_union_media_zone_get_request import \
AlibabaAlscUnionMediaZoneGetRequest
from libs.topsdk.util import convert_basic
loger = logging.getLogger('apps')
class TopUtils(object):
def __init__(self, appkey, secret, gateway_url='http://gw.api.taobao.com/router/rest'):
self.appkey = appkey
self.secret = secret
self.gateway_url = gateway_url
self.client = TopApiClient(appkey=self.appkey, app_sercet=self.secret,
top_gateway_url=self.gateway_url,
verify_ssl=False)
self.ability = Defaultability(client=self.client)
def get_order_info(self, start_date, date_type=4, end_date=None, biz_unit=2, page_size=10, page_number=1, **kwargs):
request = AlibabaAlscUnionKbcpxPositiveOrderGetRequest()
request.date_type = date_type
request.end_date = convert_basic(end_date) or datetime.today().strftime('%Y-%m-%d %H:%M:%S')
request.biz_unit = biz_unit
request.page_size = page_size
request.page_number = page_number
request.start_date = convert_basic(start_date)
if kwargs:
for attr, value in kwargs.items():
setattr(request, attr, value)
try:
response = self.ability.alibaba_alsc_union_kbcpx_positive_order_get(request)
if response.get('biz_error_code') == 0:
return response.get('result'), response.get('total_count')
except TopException as e:
print(e)
loger.error(f'拉取订单失败, {e}')
def get_zone_list(self, page=1, limit=10):
request = AlibabaAlscUnionMediaZoneGetRequest()
request.page = page
request.limit = limit
try:
response = self.ability.alibaba_alsc_union_media_zone_get(request)
if response.get('biz_error_code') == 0:
return response.get('result')
except TopException as e:
print(e)
loger.error(f'获取推广位失败, {e}')
def add_zone(self, zone_name, media_id):
request = AlibabaAlscUnionMediaZoneAddRequest()
request.zone_name = zone_name
request.media_id = media_id
try:
response = self.ability.alibaba_alsc_union_media_zone_add(request)
if response.get('biz_error_code') == 0:
return response.get('result')
except TopException as e:
print(e)
loger.error(f'创建推广位失败, {e}')
def get_official_activity(self, pid, activity_id, sid=None, include_wx_img=True, include_qr_code=True):
request = AlibabaAlscUnionElemePromotionOfficialactivityGetRequest(query_request={
'pid': pid,
'activity_id': activity_id,
'sid': sid,
'include_wx_img': include_wx_img,
'include_qr_code': include_qr_code,
})
try:
response = self.ability.alibaba_alsc_union_eleme_promotion_officialactivity_get(request)
if response.get('result_code') == 0:
return response.get('data')
except TopException as e:
print(e)
loger.error(f'获取官方活动失败, {e}')