很久没写过文章了,今天来写点什么吧。
最近有客户有需求实时提醒,之前用的是server酱,现在想自己搞一个。
通过微信订阅号自己提醒。
微信公众号发送消息好像是有限的,只能通过模板消息来发送。
这时候就需要搞清楚相应的接口:
获取access_token,和两个发送消息的api url
接下来只要构造好消息体,然后发送消息就可以了。
因为是要写一个模块,我们抽象出来一个client用作获取access和发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| import httpx from .loggings import logger from .consts import *
class Client: def __init__(self, appid, appsecret): self.appid = appid self.appsecret = appsecret self.access = self.get_access_token()
def _request(self, method, url, params=None, data=None, timeout=None): if params is None: params = {}
if method == 'GET': result = httpx.get(url, params=params) elif method == 'POST': result = httpx.post(url, json=data, params=params, timeout=timeout) result.raise_for_status()
if result.json().get('errcode'): logger.error(f"[-] 访问出错:{result.json()}") return None return result.json()
def get_access_token(self, appid=None, appsecret=None): if appid is None: appid = self.appid if appsecret is None: appsecret = self.appsecret
data = { "grant_type": "client_credential", "appid": self.appid, "secret": self.appsecret, "force_refresh": False }
result = self._request('POST', GET_ACCESS_TOKEN, data=data) return result['access_token']
|
这样client的_request处理api返回错误,只要失败就返回空,然后日志打印出来错误
这样的好处是,后续逻辑中可以判断:
1 2 3 4 5
| rep = client._request(...) if rep: else:
|
然后再搞个发送模版消息的类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| from ..client import Client from ..loggings import logger from ..consts import *
class SendMsgTemplate(Client): def __init__(self, appid, appsecret): super().__init__(appid, appsecret)
def send_msg_template(self, to_user, template_id, content, url=""): send_data = { "touser":to_user, "template_id":template_id, "data":content, "url":url } res = self._request("POST", SEND_TEXT_MSG_TEMPLATE, data=send_data, params={"ACCESS_TOKEN": self.access}) if res: logger.info("发送消息模板成功") return res else: logger.error("发送消息模板失败")
|
这里要注意的是content格式为:
1 2 3 4 5 6 7 8
| content = { "first": { "value": "first msg", }, "keyword1": { "value": "keyword1 msg", } }
|
之后调用发送就行了:
1 2 3
| from wechatMsgPushSDK.tools.send import SendMsg send = SendMsg('appid', 'appsecret') send.send_msg('openid', '给你男朋友买饭了, 宝, 别生气了')
|
详细代码在GitHub:
https://github.com/ximutech/wechatMsgPushSDK
不成熟还未开源,后续会完善,需要公众号回复1获取源码
再来看看loggings模块
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| import logging import os
def setup_logger(log_file_name='./logs/app.log'):
if not os.path.exists('logs'): os.mkdir('logs')
logger = logging.getLogger('my_logger') logger.setLevel(logging.DEBUG)
log_file_path = os.path.join(os.getcwd(), log_file_name) file_handler = logging.FileHandler(log_file_path) file_handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
return logger
logger = setup_logger()
|
果真chatGPT是个呆瓜,以前错误处理问chatGPT他给的处理方法及其复杂,因为我想捕获错误行到文件。
后来发现logger就有这个功能,只要设置一个格式化就行了。
Formatter这个类的解析中有格式化参数。错误行时lineno,错误路径是pathname。