wechatcomapp_channel.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. # -*- coding=utf-8 -*-
  2. import io
  3. import os
  4. import time
  5. import requests
  6. import web
  7. from wechatpy.enterprise import create_reply, parse_message
  8. from wechatpy.enterprise.crypto import WeChatCrypto
  9. from wechatpy.enterprise.exceptions import InvalidCorpIdException
  10. from wechatpy.exceptions import InvalidSignatureException, WeChatClientException
  11. from bridge.context import Context
  12. from bridge.reply import Reply, ReplyType
  13. from channel.chat_channel import ChatChannel
  14. from channel.wechatcom.wechatcomapp_client import WechatComAppClient
  15. from channel.wechatcom.wechatcomapp_message import WechatComAppMessage
  16. from common.log import logger
  17. from common.singleton import singleton
  18. from common.utils import compress_imgfile, fsize, split_string_by_utf8_length
  19. from config import conf, subscribe_msg
  20. from voice.audio_convert import any_to_amr, split_audio, any_to_mp3
  21. MAX_UTF8_LEN = 2048
  22. @singleton
  23. class WechatComAppChannel(ChatChannel):
  24. NOT_SUPPORT_REPLYTYPE = []
  25. def __init__(self):
  26. super().__init__()
  27. self.corp_id = conf().get("wechatcom_corp_id")
  28. self.secret = conf().get("wechatcomapp_secret")
  29. self.agent_id = conf().get("wechatcomapp_agent_id")
  30. self.token = conf().get("wechatcomapp_token")
  31. self.aes_key = conf().get("wechatcomapp_aes_key")
  32. print(self.corp_id, self.secret, self.agent_id, self.token, self.aes_key)
  33. logger.info(
  34. "[wechatcom] init: corp_id: {}, secret: {}, agent_id: {}, token: {}, aes_key: {}".format(self.corp_id,
  35. self.secret,
  36. self.agent_id,
  37. self.token,
  38. self.aes_key)
  39. )
  40. self.crypto = WeChatCrypto(self.token, self.aes_key, self.corp_id)
  41. self.client = WechatComAppClient(self.corp_id, self.secret)
  42. def startup(self):
  43. # start message listener
  44. urls = ("/wxcomapp", "channel.wechatcom.wechatcomapp_channel.Query")
  45. app = web.application(urls, globals(), autoreload=False)
  46. port = conf().get("wechatcomapp_port", 9898)
  47. web.httpserver.runsimple(app.wsgifunc(), ("0.0.0.0", port))
  48. def send(self, reply: Reply, context: Context):
  49. receiver = context["receiver"]
  50. if reply.type in [ReplyType.TEXT, ReplyType.ERROR, ReplyType.INFO]:
  51. reply_text = reply.content
  52. texts = split_string_by_utf8_length(reply_text, MAX_UTF8_LEN)
  53. if len(texts) > 1:
  54. logger.info("[wechatcom] text too long, split into {} parts".format(len(texts)))
  55. for i, text in enumerate(texts):
  56. self.client.message.send_text(self.agent_id, receiver, text)
  57. if i != len(texts) - 1:
  58. time.sleep(0.5) # 休眠0.5秒,防止发送过快乱序
  59. logger.info("[wechatcom] Do send text to {}: {}".format(receiver, reply_text))
  60. elif reply.type == ReplyType.VOICE:
  61. try:
  62. media_ids = []
  63. file_path = reply.content
  64. amr_file = os.path.splitext(file_path)[0] + ".amr"
  65. any_to_mp3(file_path, amr_file)
  66. duration, files = split_audio(amr_file, 60 * 1000)
  67. if len(files) > 1:
  68. logger.info("[wechatcom] voice too long {}s > 60s , split into {} parts".format(duration / 1000.0,
  69. len(files)))
  70. for path in files:
  71. response = self.client.media.upload("voice", open(path, "rb"))
  72. logger.debug("[wechatcom] upload voice response: {}".format(response))
  73. media_ids.append(response["media_id"])
  74. except WeChatClientException as e:
  75. logger.error("[wechatcom] upload voice failed: {}".format(e))
  76. return
  77. try:
  78. os.remove(file_path)
  79. if amr_file != file_path:
  80. os.remove(amr_file)
  81. except Exception:
  82. pass
  83. for media_id in media_ids:
  84. self.client.message.send_voice(self.agent_id, receiver, media_id)
  85. time.sleep(1)
  86. logger.info("[wechatcom] sendVoice={}, receiver={}".format(reply.content, receiver))
  87. elif reply.type == ReplyType.IMAGE_URL: # 从网络下载图片
  88. img_url = reply.content
  89. pic_res = requests.get(img_url, stream=True)
  90. image_storage = io.BytesIO()
  91. for block in pic_res.iter_content(1024):
  92. image_storage.write(block)
  93. sz = fsize(image_storage)
  94. if sz >= 10 * 1024 * 1024:
  95. logger.info("[wechatcom] image too large, ready to compress, sz={}".format(sz))
  96. image_storage = compress_imgfile(image_storage, 10 * 1024 * 1024 - 1)
  97. logger.info("[wechatcom] image compressed, sz={}".format(fsize(image_storage)))
  98. image_storage.seek(0)
  99. try:
  100. response = self.client.media.upload("image", image_storage)
  101. logger.debug("[wechatcom] upload image response: {}".format(response))
  102. except WeChatClientException as e:
  103. logger.error("[wechatcom] upload image failed: {}".format(e))
  104. return
  105. self.client.message.send_image(self.agent_id, receiver, response["media_id"])
  106. logger.info("[wechatcom] sendImage url={}, receiver={}".format(img_url, receiver))
  107. elif reply.type == ReplyType.IMAGE: # 从文件读取图片
  108. image_storage = reply.content
  109. sz = fsize(image_storage)
  110. if sz >= 10 * 1024 * 1024:
  111. logger.info("[wechatcom] image too large, ready to compress, sz={}".format(sz))
  112. image_storage = compress_imgfile(image_storage, 10 * 1024 * 1024 - 1)
  113. logger.info("[wechatcom] image compressed, sz={}".format(fsize(image_storage)))
  114. image_storage.seek(0)
  115. try:
  116. response = self.client.media.upload("image", image_storage)
  117. logger.debug("[wechatcom] upload image response: {}".format(response))
  118. except WeChatClientException as e:
  119. logger.error("[wechatcom] upload image failed: {}".format(e))
  120. return
  121. self.client.message.send_image(self.agent_id, receiver, response["media_id"])
  122. logger.info("[wechatcom] sendImage, receiver={}".format(receiver))
  123. class Query:
  124. def GET(self):
  125. channel = WechatComAppChannel()
  126. params = web.input()
  127. logger.info("[wechatcom] receive params: {}".format(params))
  128. try:
  129. signature = params.msg_signature
  130. timestamp = params.timestamp
  131. nonce = params.nonce
  132. echostr = params.echostr
  133. echostr = channel.crypto.check_signature(signature, timestamp, nonce, echostr)
  134. except InvalidSignatureException:
  135. raise web.Forbidden()
  136. return echostr
  137. def POST(self):
  138. channel = WechatComAppChannel()
  139. params = web.input()
  140. logger.info("[wechatcom] receive params: {}".format(params))
  141. try:
  142. signature = params.msg_signature
  143. timestamp = params.timestamp
  144. nonce = params.nonce
  145. message = channel.crypto.decrypt_message(web.data(), signature, timestamp, nonce)
  146. except (InvalidSignatureException, InvalidCorpIdException):
  147. raise web.Forbidden()
  148. msg = parse_message(message)
  149. logger.debug("[wechatcom] receive message: {}, msg= {}".format(message, msg))
  150. if msg.type == "event":
  151. if msg.event == "subscribe":
  152. reply_content = subscribe_msg()
  153. if reply_content:
  154. reply = create_reply(reply_content, msg).render()
  155. res = channel.crypto.encrypt_message(reply, nonce, timestamp)
  156. return res
  157. else:
  158. try:
  159. wechatcom_msg = WechatComAppMessage(msg, client=channel.client)
  160. except NotImplementedError as e:
  161. logger.debug("[wechatcom] " + str(e))
  162. return "success"
  163. context = channel._compose_context(
  164. wechatcom_msg.ctype,
  165. wechatcom_msg.content,
  166. isgroup=False,
  167. msg=wechatcom_msg,
  168. )
  169. if context:
  170. channel.produce(context)
  171. return "success"