python搭建handsame时光机


一、简介

之前用handsame主题时也是采用自建微信服务号的方式来直接给站点发送时光机信息,由于之前使用的后台采用的是php语言编写,现在想加一个查询每日基金收益情况的功能,对没学过php语法的我来说甚是困难,本着方便自己的原则,于是采用python改写之前的程序,然后增加一个可以查询每日基金估值收入的功能。

二、环境要求

  • linux系统(本人使用centos系统)
  • python3
  • nginx反代配置(宝塔面板即可)

三、前提准备

  1. 安装python3和pip,此处省略安装过程。
  2. 第三方库要求

    # requestsment.txt文件如下,自行使用pip进行安装
    
    Pillow
    requests
    colorama
    prettytable
    pymysql
    wechatpy
    xmltodict
    flask
    sqlalchemy
    cryptography
  3. 微信公众号服务号

自行完成开发者设置,获取

  1. 公网服务器一台
  2. 备案域名一个

四、核心代码

  1. 数据库连接类

    #!/usr/bin/python3
    # -*- coding: utf-8 -*-
    ## author:zbl
    import threading
    
    import pymysql
    import json
    import os
    
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker, scoped_session
    
    
    class DBConnect(object):
        _instance_lock = threading.Lock()
        def __init__(cls):
            pass
    
        #单例
        def __new__(cls, *args, **kwargs):
            if not hasattr(cls, '_instance'):
                with DBConnect._instance_lock:
                    if not hasattr(cls, '_instance'):
                        cls.__creatSession(cls)
                        cls._instance = object.__new__(cls, *args, **kwargs)
    
            return cls._instance
    
        def __creatSession(cls):
            """
                创建数据库Session
            """
            root_dir = os.path.abspath('.')
            # root_dir = os.path.dirname(os.path.abspath('.'))
            f = open(root_dir + "/conf/Config.json", 'r', encoding='utf8')
            dbInfo = json.load(f)
            str = 'mysql+pymysql://{0}:{1}@{2}/{3}'.format(dbInfo['username'], dbInfo['passwd'], dbInfo['host'],
                                                           dbInfo['datebase'])
    
            cls._engine = create_engine(str)
            # self._db = pymysql.connect(host = dbInfo['host'], port = dbInfo['port'], user = dbInfo['username'], passwd = dbInfo['passwd'], database = dbInfo['datebase'])
            cls._DBSession = scoped_session(sessionmaker(bind=cls._engine))
            f.close()
    
    
        # 获取session
        def getSession(cls):
            session = cls._DBSession
            return session()
    
    
        # 获取engine
        def getEngine(cls):
            return cls._engine
  2. 配置文件

    {
      "host": "数据库连接地址",
      "port": "端口号(请去除双引号)",
      "username": "用户名",
      "passwd": "密码",
      "datebase": "数据库名",
      "appid": "微信公众号appid",
      "secret": "微信公众号secret"
    }
  3. 各model类

    #!/usr/bin/python3
    # -*- coding: utf-8 -*-
    ## author:zbl
    from sqlalchemy import Column, text
    from sqlalchemy.dialects.mysql import INTEGER, TEXT, VARCHAR
    from sqlalchemy.ext.declarative import declarative_base
    
    Base = declarative_base()
    metadata = Base.metadata
    
    
    class Cross(Base):
        __tablename__ = 'cross'
    
        id = Column(INTEGER(11), primary_key=True)
        openid = Column(VARCHAR(255), nullable=False, server_default=text("''"))
        url = Column(VARCHAR(255))
        timecode = Column(VARCHAR(255))
        cid = Column(INTEGER(11))
        mid = Column(INTEGER(11))
        msg_type = Column(VARCHAR(16))
        content = Column(TEXT)
    
    
    #!/usr/bin/python3
    # -*- coding: utf-8 -*-
    ## author:zbl
    
    from sqlalchemy import Column, DECIMAL, String
    from sqlalchemy.dialects.mysql import INTEGER
    from sqlalchemy.ext.declarative import declarative_base
    
    Base = declarative_base()
    metadata = Base.metadata
    
    
    class Fund(Base):
        __tablename__ = 'fund'
        __table_args__ = {'comment': '基金信息表'}
    
        id = Column(INTEGER(11), primary_key=True, comment='主键')
        code = Column(String(8), comment='基金代码')
        name = Column(String(50), comment='基金名称')
        share = Column(DECIMAL(20, 3), comment='份额')
    
    
        def __repr__(self):
            return '%s(%r)' % (self.__class__.__name__, self.id)
    
    
    #!/usr/bin/python3
    # -*- coding: utf-8 -*-
    ## author:zbl
    
    from sqlalchemy import Column
    from sqlalchemy.dialects.mysql import INTEGER
    from sqlalchemy.ext.declarative import declarative_base
    
    Base = declarative_base()
    metadata = Base.metadata
    
    
    class RelaWxFund(Base):
        __tablename__ = 'rela_wx_fund'
        __table_args__ = {'comment': '微信-基金关联表'}
    
        id = Column(INTEGER(11), primary_key=True, comment='主键')
        wx_info_id = Column(INTEGER(11), comment='微信信息表主键id')
        fund_id = Column(INTEGER(11), comment='基金表主键id')
    
    
    #!/usr/bin/python3
    # -*- coding: utf-8 -*-
    ## author:zbl
    
    from sqlalchemy import Column, String
    from sqlalchemy.dialects.mysql import INTEGER
    from sqlalchemy.ext.declarative import declarative_base
    
    Base = declarative_base()
    metadata = Base.metadata
    
    
    class WxInfo(Base):
        __tablename__ = 'wx_info'
        __table_args__ = {'comment': '微信信息表'}
    
        id = Column(INTEGER(11), primary_key=True, comment='主键')
        wx_id = Column(String(50), comment='微信id')
        wx_name = Column(String(50), comment='微信名')
        is_del = Column(String(1), comment="是否删除")
  4. handsame时光机操作类

    • 绑定界面接口类

      import json
      
      from flask import render_template, request
      
      from model.Cross import Cross
      from utils.tool import getJiJinInfo
      from . import pageManage
      from repository.CrossRepository import CrossRepository
      
      
      @pageManage.route("/bind",methods = ["POST", "GET"])
      def bind():
          data = request.get_data()
          try:
              cross = Cross()
      
              cross.url = request.form.get("url")
              cross.openid = request.form.get("openid")
              cross.timecode = request.form.get("timecode")
              cross.cid = request.form.get("cid")
              cross.mid = request.form.get("mid")
      
      
              if CrossRepository.getCrossListByOpenId(cross.openid):
                  # 更新
                  result = CrossRepository.updateCross(cross)
                  return "2"
              else:
                  # 添加
                  result = CrossRepository.addCross(cross)
                  return "1"
          except Exception as e:
              print("Exception:", e)
              return "参数错误!"
      
      
      @pageManage.route("/getBind/<openid>",methods = ["POST", "GET"])
      def getBind(openid):
      
          cross = CrossRepository.getCrossListByOpenId(openid)
      
          return render_template('bind.html',cross = cross, openid = openid)
    • 时光机工具类(注意此处需要自行修改您的时光机网址信息)

      import json
      import urllib
      from hashlib import md5
      
      import requests
      
      from repository.CrossRepository import CrossRepository
      
      myrequests = requests.Session()
      
      
      def request_port(url,post_data):
          #判空
          if url or post_data:
              return False
          o = ""
      
          for k,v in post_data.items():
              o += k + "=" + urllib.parse.urlencode(v) + "&"
      
          post_data = o[0:-1]
      
      
          response = myrequests.post(url=url,data=post_data)
      
          if response:
              result = response.text
              return result
      
          return False
      
      
      def push(content, cross):
          """
              推送数据
          """
          data = {
              "cid": cross.cid,
              "mid": cross.mid,
              "content": content,
              "action": "send_talk",
              "time_code": md5(str(cross.timecode).encode("utf-8")).hexdigest(),
              "msg_type": cross.msg_type,
              "token": "weixin"
          }
      
      
          response = myrequests.post(url=cross.url, data = data)
      
      
          if response:
              result = response.text
              return result
      
          return False
      
      def judgeStatus(status):
          if status == "1":
              return "biubiubiu~发送成功";
          elif status == "-1":
              return "请求参数错误";
          elif status == "-2":
              return "信息缺失";
          elif status == "-3":
              return "身份验证失败";
          else:
              return status
      
      
      def updateData(openid, buffer, type, msg_type, content):
          """
              更新数据列
          """
          if type == 'mixed_talk' or type == 'mixed_post':
              content = buffer + msg_type + "->" + content + "@"
          else:
              CrossRepository.updateCrossMsgTypeByOpenId(openid, type)
      
          CrossRepository.updateCrossContentByOpenId(openid, content)
      
      
      
      def dataProcessing(xml_dict):
          # MsgType是消息类型 这里是提取消息类型
          msgType = xml_dict.get("MsgType")
          content = xml_dict.get("Content")
      
      
          if content == "绑定":
              cross = CrossRepository.getCrossListByOpenId(xml_dict.get("FromUserName"))
              if cross:
                  return "<a href='您的网址/混淆信息/getBind/" + cross.openid + "'>您已绑定,点击查看或修改</a>";
              else:
                  return "<a href='您的网址/混淆信息/getBind/" + xml_dict.get("FromUserName") + "'>点击绑定</a>";
      
          elif content == "解绑" or content ==  "解除绑定":
              # 删除绑定信息
      
              result = CrossRepository.deleteCrossByOpenId(xml_dict.get("FromUserName"))
              if result:
                  return "已经解除绑定"
              else:
                  return "操作失败,未知错误"
          elif content == "帮助":
              return """1.发送 绑定 进行绑定或修改绑定信息
      2.向时光机发送消息
      支持文字、图片、地理位置、链接四种消息类型。
      
      其他消息类型等后续开发,暂不支持(如果发送了,会提示不支持该类型的,如语音消息)。
      
      如果发送的是图片会自动将图片存放到typecho 的 usr/uploads/time 目录下。
      
      支持发送私密说说。只需要在发送内容前加入#即可。 举例发送:#这是私密的说说,仅发送者可见。
      
      连续发送多条信息
      发送【开始】,开始一轮连续发送
      发送【结束】,结束当前轮的发送
      
      3.发送文章
      输入【发文章】,开始文章发送,支持多条消息,支持多条消息图文混合
      输入【结束】,结束文章发送
      
      4.其他操作
      发送 博客收到你的博客地址的链接
      发送 发博客收到发博文的字的链接
      发送 解除绑定 或 解绑 可删除掉你的绑定信息
      发送 帮助 查看帮助信息
      
      5.<a href=\'https://auth.ihewro.com/user/docs/#/wechat\'>图文教程</a>"""
          else:
              #查询是否存在记录
              cross = CrossRepository.getCrossListByOpenId(xml_dict.get("FromUserName"))
              if cross:
                  if content == "文章":
                      return '<a href=\''+ cross.url +'/admin/write-post.php\'>发布文章</a>';
                  elif content == "博客":
                      return '<a href=\'' + cross.url + '\'>打开博客</a>';
                  else:
                      if content == "发文章":
                          CrossRepository.updateCrossMsgTypeByOpenId(cross.openid,"mixed_post")
                          return "开启博文构造模式,请继续发送消息,下面的消息最后将组成一篇完整的文章发送到博客,发送『结束』结束本次发送,发送『取消』取消本次发送~";
                      elif content == "取消":
                          CrossRepository.updateCrossMsgTypeByOpenId(cross.openid,'')
                          return "已取消发送";
                      elif content == "开始":
                          CrossRepository.updateCrossMsgTypeByOpenId(cross.openid,"mixed_talk")
                          return "当前处于混合消息模式,请继续,发送『结束』结束本次发送,发送『取消』取消本次发送~";
                      elif content == "结束":
      
                          str = cross.content
                          if not str:
                              CrossRepository.updateCrossMsgTypeByOpenId(cross.openid, '')
                              return "已结束,本次操作未发送任何信息~";
                          strList = str.split('@')
      
                          con = []
                          for i in strList:
                              con.append(i.split('->'))
      
                          result = []
                          for j in con:
                              if j[0] != '':
                                  result.append({
                                      'type': j[0],
                                      'content': j[1]
                                  })
      
                          content = {
                              'results': result
                          }
                          #提交
                          status = push(json.dumps(content),cross)
      
                          #重置
                          CrossRepository.updateCrossMsgTypeByOpenId(cross.openid, '')
      
                          #判断是否成功
                          return judgeStatus(status)
                      else:
                          buffer = cross.content
                          type = cross.msg_type
                          if not type:
                              type = msgType
      
                          if msgType == "location":
                              content = xml_dict.get("Location_X") + "#" + xml_dict.get("Location_Y") \
                                        + "#" + xml_dict.get("Label") + "#http://restapi.amap.com/v3/staticmap?location=" \
                                        + xml_dict.get("Location_Y") + "," + xml_dict.get("Location_X") \
                                        + "&zoom=10&size=750*300&markers=mid,,A:" + xml_dict.get("Location_Y") \
                                        + "," + xml_dict.get("Location_X") + "&key=2a5048a9ad453654e037b6a68abd13c4"
      
      
      
                              updateData(cross.openid, buffer, type, "location", content)
      
                          elif msgType == "image":
                              content = xml_dict.get("PicUrl")
                              updateData(cross.openid, buffer, type, "image", content)
      
                          elif msgType == "link":
                              content = xml_dict.get("Title") + "#" + xml_dict.get("Description") + "#" + xml_dict.get("Url")
                              updateData(cross.openid, buffer, type, "link", content)
      
                          elif msgType == "text":
                              if xml_dict.get("Content")[0] == "#":
                                  content = "[secret]" + xml_dict.get("Content")[1:] + "[/secret]"
                              else:
                                  content = xml_dict.get("Content")
                              updateData(cross.openid, buffer, type, "text", content)
      
                          else:
                              return "不支持的消息类型";
      
                          # 再次查询
                          cross = CrossRepository.getCrossListByOpenId(xml_dict.get("FromUserName"))
                          type = cross.msg_type
      
                          if type == "mixed_post" or type == "mixed_talk":
                              # 重置
                              # CrossRepository.updateCrossMsgTypeByOpenId(cross.openid, '')
                              return "请继续,发送『结束』结束本次发送,发送『取消』取消本次发送~";
                          else:
                              status = push(cross.content, cross)
                              # 重置
                              # CrossRepository.updateCrossMsgTypeByOpenId(cross.openid, '')
                              return judgeStatus(status)
      
      
              else:
                  return "<a href='您的网址/混淆信息/getBind/" + xml_dict.get("FromUserName") + "'>您还未绑定,点击绑定</a>";
      
              return "未知错误!"
  5. weChatClient操作图片工具类

    import json
    import logging
    import os
    import threading
    
    from wechatpy import WeChatClient
    
    
    class MyWeChatClient(object):
        """
        自用操作微信API
        """
        _instance_lock = threading.Lock()
        def __init__(self):
            pass
    
        def __new__(cls, *args, **kwargs):
            if not hasattr(cls, '_instance'):
                with MyWeChatClient._instance_lock:
                    if not hasattr(cls, '_instance'):
                        # root_dir = os.path.dirname(os.path.abspath('.'))
                        f = open("./conf/Config.json", 'r', encoding='utf8')
                        info = json.load(f)
                        appid = info["appid"]
                        secret = info["secret"]
                        f.close()
                        cls._myWeChatClient = WeChatClient(appid, secret)
                        MyWeChatClient._instance = super().__new__(cls)
    
                return MyWeChatClient._instance
    
        def getWeChatClient(cls):
            """
            获取连接
            :return: WeChatClient
            """
            return cls._myWeChatClient
    
    
    wx = MyWeChatClient()
    
    
    def uploadImage(filePath):
        """
        上传图片
        :param filePath:
        :return:
        """
        try:
            # 上传文件
            result = wx.getWeChatClient().media.upload("image",open(filePath,'rb'))
            # 返回media_id值
            return result['media_id']
        except Exception as e:
            print(e)
            return False
    
    def downloadImage(mediaId):
        """
        下载图片
        :param mediaId:
        :return:
        """
        try:
            result = wx.getWeChatClient().media.download(mediaId)
    
        except:
            return False
    
    
    def sendImage(media_id,wxId,url=None):
        """
            发送图片
        :return:
        """
        try:
            # if url:
            #     result = wx.getWeChatClient().message.send
            # else:
            result = wx.getWeChatClient().message.send_image(wxId,media_id)
    
            return result
    
        except Exception as e:
            logging.error(e)
            return False
    
    def sendMessage(content,wxId,url=None):
        """
            发送信息
        :return:
        """
        try:
            # if url:
            #     result = wx.getWeChatClient().message.send
            # else:
            result = wx.getWeChatClient().message.send_text(wxId,content)
    
            return result
    
        except Exception as e:
            logging.error(e)
            return False
  6. app.py

    #!/usr/bin/python3
    # -*- coding: utf-8 -*-
    ## author:zbl
    import logging
    
    from flask import Flask, request, abort, render_template
    import hashlib
    import xmltodict
    import time,os
    from wechatpy.replies import ImageReply
    from wechatpy import parse_message
    
    # 微信的token令牌
    from db.DBConnect import DBConnect
    from model.WxInfo import WxInfo
    from repository.CrossRepository import CrossRepository
    from repository.WxInfoRepository import WxInfoRepository
    from utils.MyWeChatpy.MyWeChatClient import uploadImage, sendImage, sendMessage
    from utils.handsome import dataProcessing
    from utils.readTxt import GetData
    from utils.tool import getJiJinInfo
    
    WECHAT_TOKEN = '自行填写微信公众号开发者Token'
    app = Flask(__name__)
    
    @app.route("/wx", methods=["POST","GET"])
    def wechat():
        """验证服务器地址的有效性"""
        # 开发者提交信息后,微信服务器将发送GET请求到填写的服务器地址URL上,GET请求携带四个参数:
        # signature:微信加密, signature结合了开发者填写的token参数和请求中的timestamp参数 nonce参数
        # timestamp:时间戳(chuo这是拼音)
        # nonce: 随机数
        # echostr: 随机字符串
        # 接收微信服务器发送参数
        signature = request.args.get("signature")
        timestamp = request.args.get("timestamp")
        nonce = request.args.get("nonce")
    
        # 校验参数
        # 校验流程:
        # 将token、timestamp、nonce三个参数进行字典序排序
        # 将三个参数字符串拼接成一个字符串进行sha1加密
        # 开发者获得加密后的字符串可与signature对比,标识该请求来源于微信
        if not all([signature, timestamp, nonce]):
            # 抛出400错误
            abort(400)
    
        # 按照微信的流程计算签名
        li = [WECHAT_TOKEN, timestamp, nonce]
        # 排序
        li.sort()
        # 拼接字符串
        tmp_str = "".join(li)
        tmp_str = tmp_str.encode('utf-8')
    
        # 进行sha1加密, 得到正确的签名值
        sign = hashlib.sha1(tmp_str).hexdigest()
    
        # 将自己计算的签名值, 与请求的签名参数进行对比, 如果相同, 则证明请求来自微信
        if signature != sign:
            # 代表请求不是来自微信
            # 弹出报错信息, 身份有问题
            abort(403)
        else:
            # 表示是微信发送的请求
            if request.method == "GET":
                # 表示第一次接入微信服务器的验证
                echostr = request.args.get("echostr")
                # 校验echostr
                if not echostr:
                    abort(400)
                return echostr
    
            elif request.method == "POST":
                # 表示微信服务器转发消息过来
                # 拿去xml的请求数据
                xml_str = request.data
    
                # 当xml_str为空时
                if not xml_str:
                    abort(400)
    
                # 对xml字符串进行解析成字典
                xml_dict = xmltodict.parse(xml_str)
    
                xml_dict = xml_dict.get("xml")
                # print(xml_dict)
                # MsgType是消息类型 这里是提取消息类型
                msg_type = xml_dict.get("MsgType")
                # logging.info(xml_dict)
    
                # 标志位
                flag = False
    
                if xml_dict.get("Content") == "openid":
                    resp_dict = {
                        "xml": {
                            "ToUserName": xml_dict.get("FromUserName"),
                            "FromUserName": xml_dict.get("ToUserName"),
                            "CreateTime": int(time.time()),
                            "MsgType": "text",
                            "Content": xml_dict.get("FromUserName")
                        }
                    }
                    return resp_dict
    
                elif xml_dict.get("Content") == "基金":
                    # 读取值返回
                    result = getJiJinInfo(xml_dict.get("FromUserName"))
    
                    if result:
    
                        response = uploadImage(
                            os.path.abspath(".") + "/images/" + xml_dict.get("FromUserName") + "jjInfo.jpg")
    
                        if response:
                            msg = parse_message(xml_str)
                            reply = ImageReply(media_id=response, message=msg)
                            # print(reply)
                            # print(reply.render())
                            xml = reply.render()
                            return xml
    
    
                        else:
                            resp_dict = {
                                "xml": {
                                    "ToUserName": xml_dict.get("FromUserName"),
                                    "FromUserName": xml_dict.get("ToUserName"),
                                    "CreateTime": int(time.time()),
                                    "MsgType": "text",
                                    "Content": "上传出错!"
                                }
                            }
    
                else:
                    result = dataProcessing(xml_dict)
                    resp_dict = {
                        "xml": {
                            "ToUserName": xml_dict.get("FromUserName"),
                            "FromUserName": xml_dict.get("ToUserName"),
                            "CreateTime": int(time.time()),
                            "MsgType": "text",
                            "Content": result
                        }
                    }
                resp_xml_str = xmltodict.unparse(resp_dict)
                # 返回消息数据给微信服务器
                return resp_xml_str
    @app.route("/pushTask",methods = ["POST", "GET"])
    def pushTask():
        """
            主动推送当日基金涨跌情况给用户
        """
        token = request.args.get("token")
        if token == "right":
            #获取目前用户
    
            wxInfoList = WxInfoRepository.getWxInfoList()
            msg = ""
            if wxInfoList:
                for i in wxInfoList:
                    # 读取值返回
                    result = getJiJinInfo(i[0])
                    # xml_str = """<xml>
                    #                     <ToUserName><![CDATA[gh_a6026c9fc172]]></ToUserName>\n
                    #                     <FromUserName><![CDATA[{0}]]></FromUserName>\n
                    #                     <CreateTime>{1}</CreateTime>\n
                    #                     <MsgType><![CDATA[text]]></MsgType>\n
                    #                     <Content><![CDATA[jj]]></Content>\n
                    #                     <MsgId>23055705217771921</MsgId>\n
                    #                 </xml>""".format(i[0], int(time.time()))
                    # msg = parse_message(xml_str)
                    if result:
                        # 上传临时素材,基金信息图片
                        response = uploadImage(
                            os.path.abspath(".") + "/images/" + i[0] + "jjInfo.jpg")
    
                        if response:
                            # 主动发送图片
                            sendImage(response,i[0])
                            msg += "获取成功:" + i[0]
                        else:
                            # 发送消息,提示失败
                            sendMessage("获取基金信息失败!", i[0])
                            msg += "获取基金信息失败!" + i[0]
                return msg
            return "查询失败!"
        else:
            return "非法访问!"

五、改进点

  1. 采用mysql存储时光机动态内容信息
  2. 基金项目修改原项目文件存储的方式,使用mysql数据库存储基金信息。后续可能会采用sqlite数据库文件进行存储,节省资源
  3. python使用flask框架,采用sqlalchemy进行数据库操作,首次用到flask中蓝图模块,对后台接口地址进行分类、分文件编写
  4. 采用数据仓库服务概念,利用单例模式创建数据库连接,减少程序资源占用率,提高效率
  5. 采用nginx反向代理,外部通过制定域名访问80端口由nginx转发至内部程序监听端口实现程序的安全性
  6. 后续优化点:在前端界面实现对基金份额增删查改操作,目前后端代码已基本完成,前端代码暂未编写,后续在考虑是集成至微信小程序中还是在公众号新建菜单的方式进行实现

六、演示

绑定界面

发送动态

查看帮助

时光机界面

查看基金涨跌

七、参考项目

php版:handsame时光机

python版:python获取基金涨跌情况

最后修改:2021 年 01 月 28 日 02 : 58 PM
如果觉得我的文章对你有用,请随意赞赏