很久没写过文章了,今天来写点什么吧。
最近有客户有需求实时提醒,之前用的是server酱,现在想自己搞一个。
通过微信订阅号自己提醒。
微信公众号发送消息好像是有限的,只能通过模板消息来发送。
这时候就需要搞清楚相应的接口:

获取access_token,和两个发送消息的api url
接下来只要构造好消息体,然后发送消息就可以了。
因为是要写一个模块,我们抽象出来一个client用作获取access和发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
   | import httpx from .loggings import logger from .consts import *
  class Client:     def __init__(self, appid, appsecret):         self.appid = appid         self.appsecret = appsecret         self.access = self.get_access_token()
      def _request(self, method, url, params=None, data=None, timeout=None):         if params is None:             params = {}
          if method == 'GET':             result = httpx.get(url, params=params)         elif method == 'POST':             result =  httpx.post(url, json=data, params=params, timeout=timeout)         result.raise_for_status()
          if result.json().get('errcode'):             logger.error(f"[-] 访问出错:{result.json()}")             return None         return result.json()
      def get_access_token(self, appid=None, appsecret=None):         if appid is None:             appid = self.appid         if appsecret is None:             appsecret = self.appsecret
          data = {             "grant_type": "client_credential",             "appid": self.appid,             "secret": self.appsecret,             "force_refresh": False         }
          result = self._request('POST', GET_ACCESS_TOKEN, data=data)         return result['access_token']
   | 
 
这样client的_request处理api返回错误,只要失败就返回空,然后日志打印出来错误
这样的好处是,后续逻辑中可以判断:
1 2 3 4 5
   | rep = client._request(...) if rep:      else:     
   | 
 
然后再搞个发送模版消息的类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
   | from ..client import Client from ..loggings import logger from ..consts import *
  class SendMsgTemplate(Client):     def __init__(self, appid, appsecret):         super().__init__(appid, appsecret)
      def send_msg_template(self, to_user, template_id, content, url=""):         send_data = {            "touser":to_user,            "template_id":template_id,            "data":content,            "url":url                    }         res = self._request("POST", SEND_TEXT_MSG_TEMPLATE, data=send_data, params={"ACCESS_TOKEN": self.access})         if res:             logger.info("发送消息模板成功")             return res         else:             logger.error("发送消息模板失败")
   | 
 
这里要注意的是content格式为:
1 2 3 4 5 6 7 8
   | content = {     "first": {         "value": "first msg",     },     "keyword1": {         "value": "keyword1 msg",     } }
  | 
 
之后调用发送就行了:
1 2 3
   | from wechatMsgPushSDK.tools.send import SendMsg send = SendMsg('appid', 'appsecret') send.send_msg('openid', '给你男朋友买饭了, 宝, 别生气了')
   | 
 
详细代码在GitHub:
https://github.com/ximutech/wechatMsgPushSDK
不成熟还未开源,后续会完善,需要公众号回复1获取源码
再来看看loggings模块
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
   | import logging import os
  def setup_logger(log_file_name='./logs/app.log'):
           if not os.path.exists('logs'):         os.mkdir('logs')
           logger = logging.getLogger('my_logger')     logger.setLevel(logging.DEBUG)
           log_file_path = os.path.join(os.getcwd(), log_file_name)     file_handler = logging.FileHandler(log_file_path)     file_handler.setLevel(logging.DEBUG)
           formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')     file_handler.setFormatter(formatter)
           logger.addHandler(file_handler)
      return logger
 
  logger = setup_logger()
   | 
 
果真chatGPT是个呆瓜,以前错误处理问chatGPT他给的处理方法及其复杂,因为我想捕获错误行到文件。
后来发现logger就有这个功能,只要设置一个格式化就行了。
Formatter这个类的解析中有格式化参数。错误行时lineno,错误路径是pathname。