Эх сурвалжийг харах

Add wechatmp_service channel

JS00000 3 жил өмнө
parent
commit
d35d01f980

+ 1 - 1
app.py

@@ -28,7 +28,7 @@ def run():
             # os.environ['WECHATY_PUPPET_SERVICE_ENDPOINT'] = '127.0.0.1:9001'
 
         channel = channel_factory.create_channel(channel_name)
-        if channel_name in ['wx','wxy','wechatmp']:
+        if channel_name in ['wx','wxy','wechatmp','wechatmp_service']:
             PluginManager().load_plugins()
 
         # startup channel

+ 4 - 1
channel/channel_factory.py

@@ -19,5 +19,8 @@ def create_channel(channel_type):
         return TerminalChannel()
     elif channel_type == 'wechatmp':
         from channel.wechatmp.wechatmp_channel import WechatMPChannel
-        return WechatMPChannel()
+        return WechatMPChannel(passive_reply = True)
+    elif channel_type == 'wechatmp_service':
+        from channel.wechatmp.wechatmp_channel import WechatMPChannel
+        return WechatMPChannel(passive_reply = False)
     raise RuntimeError

+ 51 - 0
channel/wechatmp/ServiceAccount.py

@@ -0,0 +1,51 @@
+import web
+import time
+import channel.wechatmp.reply as reply
+import channel.wechatmp.receive as receive
+from config import conf
+from common.log import logger
+from bridge.context import *
+from channel.wechatmp.common import * 
+from channel.wechatmp.wechatmp_channel import WechatMPChannel
+
+# This class is instantiated once per query
+class Query():
+
+    def GET(self):
+        return verify_server(web.input())
+
+    def POST(self):
+        # Make sure to return the instance that first created, @singleton will do that. 
+        channel_instance = WechatMPChannel()
+        try:
+            webData = web.data()
+            # logger.debug("[wechatmp] Receive request:\n" + webData.decode("utf-8"))
+            wechatmp_msg = receive.parse_xml(webData)
+            if wechatmp_msg.msg_type == 'text':
+                from_user = wechatmp_msg.from_user_id
+                message = wechatmp_msg.content.decode("utf-8")
+                message_id = wechatmp_msg.msg_id
+
+                logger.info("[wechatmp] {}:{} Receive post query {} {}: {}".format(web.ctx.env.get('REMOTE_ADDR'), web.ctx.env.get('REMOTE_PORT'), from_user, message_id, message))
+                context = channel_instance._compose_context(ContextType.TEXT, message, isgroup=False, msg=wechatmp_msg)
+                if context:
+                    # set private openai_api_key
+                    # if from_user is not changed in itchat, this can be placed at chat_channel
+                    user_data = conf().get_user_data(from_user)
+                    context['openai_api_key'] = user_data.get('openai_api_key') # None or user openai_api_key
+                    channel_instance.produce(context)
+                # The reply will be sent by channel_instance.send() in another thread
+                return "success"
+
+            elif wechatmp_msg.msg_type == 'event':
+                logger.info("[wechatmp] Event {} from {}".format(wechatmp_msg.Event, wechatmp_msg.from_user_id))
+                content = subscribe_msg()
+                replyMsg = reply.TextMsg(wechatmp_msg.from_user_id, wechatmp_msg.to_user_id, content)
+                return replyMsg.send()
+            else:
+                logger.info("暂且不处理")
+                return "success"
+        except Exception as exc:
+            logger.exception(exc)
+            return exc
+

+ 144 - 0
channel/wechatmp/SubscribeAccount.py

@@ -0,0 +1,144 @@
+import web
+import time
+import channel.wechatmp.reply as reply
+import channel.wechatmp.receive as receive
+from config import conf
+from common.log import logger
+from bridge.context import *
+from channel.wechatmp.common import * 
+from channel.wechatmp.wechatmp_channel import WechatMPChannel
+
+# This class is instantiated once per query
+class Query():
+
+    def GET(self):
+        return verify_server(web.input())
+
+    def POST(self):
+        # Make sure to return the instance that first created, @singleton will do that. 
+        channel_instance = WechatMPChannel()
+        try:
+            query_time = time.time()
+            webData = web.data()
+            # logger.debug("[wechatmp] Receive request:\n" + webData.decode("utf-8"))
+            wechatmp_msg = receive.parse_xml(webData)
+            if wechatmp_msg.msg_type == 'text':
+                from_user = wechatmp_msg.from_user_id
+                to_user = wechatmp_msg.to_user_id
+                message = wechatmp_msg.content.decode("utf-8")
+                message_id = wechatmp_msg.msg_id
+
+                logger.info("[wechatmp] {}:{} Receive post query {} {}: {}".format(web.ctx.env.get('REMOTE_ADDR'), web.ctx.env.get('REMOTE_PORT'), from_user, message_id, message))
+
+                cache_key = from_user
+                cache = channel_instance.cache_dict.get(cache_key)
+
+                reply_text = ""
+                # New request
+                if cache == None:
+                    # The first query begin, reset the cache
+                    channel_instance.cache_dict[cache_key] = (0, "")
+
+                    context = channel_instance._compose_context(ContextType.TEXT, message, isgroup=False, msg=wechatmp_msg)
+                    if context:
+                        # set private openai_api_key
+                        # if from_user is not changed in itchat, this can be placed at chat_channel
+                        user_data = conf().get_user_data(from_user)
+                        context['openai_api_key'] = user_data.get('openai_api_key') # None or user openai_api_key
+                        channel_instance.produce(context)
+
+
+                    channel_instance.query1[cache_key] = False
+                    channel_instance.query2[cache_key] = False
+                    channel_instance.query3[cache_key] = False
+                # Request again
+                elif cache[0] == 0 and channel_instance.query1.get(cache_key) == True and channel_instance.query2.get(cache_key) == True and channel_instance.query3.get(cache_key) == True:
+                    channel_instance.query1[cache_key] = False  #To improve waiting experience, this can be set to True.
+                    channel_instance.query2[cache_key] = False  #To improve waiting experience, this can be set to True.
+                    channel_instance.query3[cache_key] = False
+                elif cache[0] >= 1:
+                    # Skip the waiting phase
+                    channel_instance.query1[cache_key] = True
+                    channel_instance.query2[cache_key] = True
+                    channel_instance.query3[cache_key] = True
+
+
+                cache = channel_instance.cache_dict.get(cache_key)
+                if channel_instance.query1.get(cache_key) == False:
+                    # The first query from wechat official server
+                    logger.debug("[wechatmp] query1 {}".format(cache_key))
+                    channel_instance.query1[cache_key] = True
+                    cnt = 0
+                    while cache[0] == 0 and cnt < 45:
+                        cnt = cnt + 1
+                        time.sleep(0.1)
+                        cache = channel_instance.cache_dict.get(cache_key)
+                    if cnt == 45:
+                        # waiting for timeout (the POST query will be closed by wechat official server)
+                        time.sleep(5)
+                        # and do nothing
+                        return
+                    else:
+                        pass
+                elif channel_instance.query2.get(cache_key) == False:
+                    # The second query from wechat official server
+                    logger.debug("[wechatmp] query2 {}".format(cache_key))
+                    channel_instance.query2[cache_key] = True
+                    cnt = 0
+                    while cache[0] == 0 and cnt < 45:
+                        cnt = cnt + 1
+                        time.sleep(0.1)
+                        cache = channel_instance.cache_dict.get(cache_key)
+                    if cnt == 45:
+                        # waiting for timeout (the POST query will be closed by wechat official server)
+                        time.sleep(5)
+                        # and do nothing
+                        return
+                    else:
+                        pass
+                elif channel_instance.query3.get(cache_key) == False:
+                    # The third query from wechat official server
+                    logger.debug("[wechatmp] query3 {}".format(cache_key))
+                    channel_instance.query3[cache_key] = True
+                    cnt = 0
+                    while cache[0] == 0 and cnt < 45:
+                        cnt = cnt + 1
+                        time.sleep(0.1)
+                        cache = channel_instance.cache_dict.get(cache_key)
+                    if cnt == 45:
+                        # Have waiting for 3x5 seconds
+                        # return timeout message
+                        reply_text = "【正在响应中,回复任意文字尝试获取回复】"
+                        logger.info("[wechatmp] Three queries has finished For {}: {}".format(from_user, message_id))
+                        replyPost = reply.TextMsg(from_user, to_user, reply_text).send()
+                        return replyPost
+                    else:
+                        pass
+
+                if float(time.time()) - float(query_time) > 4.8:
+                    logger.info("[wechatmp] Timeout for {} {}".format(from_user, message_id))
+                    return
+
+
+                if cache[0] > 1:
+                    reply_text = cache[1][:600] + "\n【未完待续,回复任意文字以继续】" #wechatmp auto_reply length limit
+                    channel_instance.cache_dict[cache_key] = (cache[0] - 1, cache[1][600:])
+                elif cache[0] == 1:
+                    reply_text = cache[1]
+                    channel_instance.cache_dict.pop(cache_key)
+                logger.info("[wechatmp] {}:{} Do send {}".format(web.ctx.env.get('REMOTE_ADDR'), web.ctx.env.get('REMOTE_PORT'), reply_text))
+                replyPost = reply.TextMsg(from_user, to_user, reply_text).send()
+                return replyPost
+
+            elif wechatmp_msg.msg_type == 'event':
+                logger.info("[wechatmp] Event {} from {}".format(wechatmp_msg.Event, wechatmp_msg.from_user_id))
+                content = subscribe_msg()
+                replyMsg = reply.TextMsg(wechatmp_msg.from_user_id, wechatmp_msg.to_user_id, content)
+                return replyMsg.send()
+            else:
+                logger.info("暂且不处理")
+                return "success"
+        except Exception as exc:
+            logger.exception(exc)
+            return exc
+

+ 43 - 0
channel/wechatmp/common.py

@@ -0,0 +1,43 @@
+from config import conf
+import hashlib
+import textwrap
+
+
+class WeChatAPIException(Exception):
+    pass
+
+def verify_server(data):
+    try:
+        if len(data) == 0:
+            return "None"
+        signature = data.signature
+        timestamp = data.timestamp
+        nonce = data.nonce
+        echostr = data.echostr
+        token = conf().get('wechatmp_token') #请按照公众平台官网\基本配置中信息填写
+
+        data_list = [token, timestamp, nonce]
+        data_list.sort()
+        sha1 = hashlib.sha1()
+        # map(sha1.update, data_list) #python2
+        sha1.update("".join(data_list).encode('utf-8'))
+        hashcode = sha1.hexdigest()
+        print("handle/GET func: hashcode, signature: ", hashcode, signature)
+        if hashcode == signature:
+            return echostr
+        else:
+            return ""
+    except Exception as Argument:
+        return Argument
+
+def subscribe_msg():
+    msg = textwrap.dedent("""\
+                    感谢您的关注!
+                    这里是ChatGPT,可以自由对话。
+                    资源有限,回复较慢,请勿着急。
+                    支持通用表情输入。
+                    暂时不支持图片输入。
+                    支持图片输出,画字开头的问题将回复图片或链接。
+                    支持角色扮演和文字冒险两种定制模式对话。
+                    输入'#帮助' 查看详细指令。""")
+    return msg

+ 89 - 193
channel/wechatmp/wechatmp_channel.py

@@ -1,19 +1,18 @@
 # -*- coding: utf-8 -*-
+import sys
 import web
-import time
 import math
-import hashlib
-import textwrap
-from channel.chat_channel import ChatChannel
-import channel.wechatmp.reply as reply
-import channel.wechatmp.receive as receive
+import time
+import json
+import requests
+import threading
 from common.singleton import singleton
 from common.log import logger
 from config import conf
 from bridge.reply import *
 from bridge.context import *
-from plugins import *
-import traceback
+from channel.chat_channel import ChatChannel
+from channel.wechatmp.common import * 
 
 # If using SSL, uncomment the following lines, and modify the certificate path.
 # from cheroot.server import HTTPServer
@@ -22,199 +21,96 @@ import traceback
 #         certificate='/ssl/cert.pem',
 #         private_key='/ssl/cert.key')
 
-
-# from concurrent.futures import ThreadPoolExecutor
-# thread_pool = ThreadPoolExecutor(max_workers=8)
-
 @singleton
 class WechatMPChannel(ChatChannel):
-    def __init__(self):
+    def __init__(self, passive_reply = True):
         super().__init__()
-        self.cache_dict = dict()
-        self.query1 = dict()
-        self.query2 = dict()
-        self.query3 = dict()
-
+        self.passive_reply = passive_reply
+        if self.passive_reply:
+            self.cache_dict = dict()
+            self.query1 = dict()
+            self.query2 = dict()
+            self.query3 = dict()
+        else:
+            self.app_id = conf().get('wechatmp_app_id')
+            self.app_secret = conf().get('wechatmp_app_secret')
+            self.access_token = None
+            self.access_token_expires_time = 0
+            self.access_token_lock = threading.Lock()
+            self.get_access_token()
 
     def startup(self):
-        urls = (
-            '/wx', 'SubsribeAccountQuery',
-        )
+        if self.passive_reply:
+            urls = ('/wx', 'channel.wechatmp.SubscribeAccount.Query')
+        else:
+            urls = ('/wx', 'channel.wechatmp.ServiceAccount.Query')
         app = web.application(urls, globals())
         app.run()
 
 
-    def send(self, reply: Reply, context: Context):
-        reply_cnt = math.ceil(len(reply.content) / 600)
-        receiver = context["receiver"]
-        self.cache_dict[receiver] = (reply_cnt, reply.content)
-        logger.debug("[send] reply to {} saved to cache: {}".format(receiver, reply))
-
-
-def verify_server():
-    try:
-        data = web.input()
-        if len(data) == 0:
-            return "None"
-        signature = data.signature
-        timestamp = data.timestamp
-        nonce = data.nonce
-        echostr = data.echostr
-        token = conf().get('wechatmp_token') #请按照公众平台官网\基本配置中信息填写
-
-        data_list = [token, timestamp, nonce]
-        data_list.sort()
-        sha1 = hashlib.sha1()
-        # map(sha1.update, data_list) #python2
-        sha1.update("".join(data_list).encode('utf-8'))
-        hashcode = sha1.hexdigest()
-        print("handle/GET func: hashcode, signature: ", hashcode, signature)
-        if hashcode == signature:
-            return echostr
+    def wechatmp_request(self, method, url, **kwargs):
+        r = requests.request(method=method, url=url, **kwargs)
+        r.raise_for_status()
+        r.encoding = "utf-8"
+        ret = r.json()
+        if "errcode" in ret and ret["errcode"] != 0:
+            raise WeChatAPIException("{}".format(ret))
+        return ret
+
+    def get_access_token(self):
+
+        # return the access_token
+        if self.access_token:
+            if self.access_token_expires_time - time.time() > 60:
+                return self.access_token
+
+        # Get new access_token
+        # Do not request access_token in parallel! Only the last obtained is valid.
+        if self.access_token_lock.acquire(blocking=False):
+            # Wait for other threads that have previously obtained access_token to complete the request
+            # This happens every 2 hours, so it doesn't affect the experience very much
+            time.sleep(1)
+            self.access_token = None
+            url="https://api.weixin.qq.com/cgi-bin/token"
+            params={
+                "grant_type": "client_credential",
+                "appid": self.app_id,
+                "secret": self.app_secret
+            }
+            data = self.wechatmp_request(method='get', url=url, params=params)
+            self.access_token = data['access_token']
+            self.access_token_expires_time = int(time.time()) + data['expires_in']
+            logger.info("[wechatmp] access_token: {}".format(self.access_token))
+            self.access_token_lock.release()
         else:
-            return ""
-    except Exception as Argument:
-        return Argument
-
-
-# This class is instantiated once per query
-class SubsribeAccountQuery():
-
-    def GET(self):
-        return verify_server()
-
-    def POST(self):
-        channel_instance = WechatMPChannel()
-        try:
-            query_time = time.time()
-            webData = web.data()
-            # logger.debug("[wechatmp] Receive request:\n" + webData.decode("utf-8"))
-            wechat_msg = receive.parse_xml(webData)
-            if wechat_msg.msg_type == 'text':
-                from_user = wechat_msg.from_user_id
-                to_user = wechat_msg.to_user_id
-                message = wechat_msg.content.decode("utf-8")
-                message_id = wechat_msg.msg_id
-
-                logger.info("[wechatmp] {}:{} Receive post query {} {}: {}".format(web.ctx.env.get('REMOTE_ADDR'), web.ctx.env.get('REMOTE_PORT'), from_user, message_id, message))
-
-                cache_key = from_user
-                cache = channel_instance.cache_dict.get(cache_key)
-
-                reply_text = ""
-                # New request
-                if cache == None:
-                    # The first query begin, reset the cache
-                    channel_instance.cache_dict[cache_key] = (0, "")
-
-                    context = channel_instance._compose_context(ContextType.TEXT, message, isgroup=False, msg=wechat_msg)
-                    if context:
-                        # set private openai_api_key
-                        # if from_user is not changed in itchat, this can be placed at chat_channel
-                        user_data = conf().get_user_data(from_user)
-                        context['openai_api_key'] = user_data.get('openai_api_key') # None or user openai_api_key
-                        channel_instance.produce(context)
-
-
-                    channel_instance.query1[cache_key] = False
-                    channel_instance.query2[cache_key] = False
-                    channel_instance.query3[cache_key] = False
-                # Request again
-                elif cache[0] == 0 and channel_instance.query1.get(cache_key) == True and channel_instance.query2.get(cache_key) == True and channel_instance.query3.get(cache_key) == True:
-                    channel_instance.query1[cache_key] = False  #To improve waiting experience, this can be set to True.
-                    channel_instance.query2[cache_key] = False  #To improve waiting experience, this can be set to True.
-                    channel_instance.query3[cache_key] = False
-                elif cache[0] >= 1:
-                    # Skip the waiting phase
-                    channel_instance.query1[cache_key] = True
-                    channel_instance.query2[cache_key] = True
-                    channel_instance.query3[cache_key] = True
-
-
-                cache = channel_instance.cache_dict.get(cache_key)
-                if channel_instance.query1.get(cache_key) == False:
-                    # The first query from wechat official server
-                    logger.debug("[wechatmp] query1 {}".format(cache_key))
-                    channel_instance.query1[cache_key] = True
-                    cnt = 0
-                    while cache[0] == 0 and cnt < 45:
-                        cnt = cnt + 1
-                        time.sleep(0.1)
-                        cache = channel_instance.cache_dict.get(cache_key)
-                    if cnt == 45:
-                        # waiting for timeout (the POST query will be closed by wechat official server)
-                        time.sleep(5)
-                        # and do nothing
-                        return
-                    else:
-                        pass
-                elif channel_instance.query2.get(cache_key) == False:
-                    # The second query from wechat official server
-                    logger.debug("[wechatmp] query2 {}".format(cache_key))
-                    channel_instance.query2[cache_key] = True
-                    cnt = 0
-                    while cache[0] == 0 and cnt < 45:
-                        cnt = cnt + 1
-                        time.sleep(0.1)
-                        cache = channel_instance.cache_dict.get(cache_key)
-                    if cnt == 45:
-                        # waiting for timeout (the POST query will be closed by wechat official server)
-                        time.sleep(5)
-                        # and do nothing
-                        return
-                    else:
-                        pass
-                elif channel_instance.query3.get(cache_key) == False:
-                    # The third query from wechat official server
-                    logger.debug("[wechatmp] query3 {}".format(cache_key))
-                    channel_instance.query3[cache_key] = True
-                    cnt = 0
-                    while cache[0] == 0 and cnt < 45:
-                        cnt = cnt + 1
-                        time.sleep(0.1)
-                        cache = channel_instance.cache_dict.get(cache_key)
-                    if cnt == 45:
-                        # Have waiting for 3x5 seconds
-                        # return timeout message
-                        reply_text = "【正在响应中,回复任意文字尝试获取回复】"
-                        logger.info("[wechatmp] Three queries has finished For {}: {}".format(from_user, message_id))
-                        replyPost = reply.TextMsg(from_user, to_user, reply_text).send()
-                        return replyPost
-                    else:
-                        pass
-
-                if float(time.time()) - float(query_time) > 4.8:
-                    logger.info("[wechatmp] Timeout for {} {}".format(from_user, message_id))
-                    return
-
-
-                if cache[0] > 1:
-                    reply_text = cache[1][:600] + "\n【未完待续,回复任意文字以继续】" #wechatmp auto_reply length limit
-                    channel_instance.cache_dict[cache_key] = (cache[0] - 1, cache[1][600:])
-                elif cache[0] == 1:
-                    reply_text = cache[1]
-                    channel_instance.cache_dict.pop(cache_key)
-                logger.info("[wechatmp] {}:{} Do send {}".format(web.ctx.env.get('REMOTE_ADDR'), web.ctx.env.get('REMOTE_PORT'), reply_text))
-                replyPost = reply.TextMsg(from_user, to_user, reply_text).send()
-                return replyPost
-
-            elif wechat_msg.msg_type == 'event':
-                logger.info("[wechatmp] Event {} from {}".format(wechat_msg.Event, wechat_msg.from_user_id))
-                content = textwrap.dedent("""\
-                    感谢您的关注!
-                    这里是ChatGPT,可以自由对话。
-                    资源有限,回复较慢,请勿着急。
-                    支持通用表情输入。
-                    暂时不支持图片输入。
-                    支持图片输出,画字开头的问题将回复图片链接。
-                    支持角色扮演和文字冒险两种定制模式对话。
-                    输入'#帮助' 查看详细指令。""")
-                replyMsg = reply.TextMsg(wechat_msg.from_user_id, wechat_msg.to_user_id, content)
-                return replyMsg.send()
-            else:
-                logger.info("暂且不处理")
-                return "success"
-        except Exception as exc:
-            logger.exception(exc)
-            return exc
+            # Wait for token update
+            while self.access_token_lock.locked():
+                time.sleep(0.1)
+        return self.access_token
 
+    def send(self, reply: Reply, context: Context):
+        if self.passive_reply:
+            receiver = context["receiver"]
+            reply_text = reply.content
+            reply_cnt = math.ceil(len(reply_text) / 600)
+            self.cache_dict[receiver] = (reply_cnt, reply_text)
+            logger.debug("[send] reply to {} saved to cache: {}".format(receiver, reply_text))
+        else:
+            receiver = context["receiver"]
+            reply_text = reply.content
+            url="https://api.weixin.qq.com/cgi-bin/message/custom/send"
+            params = {
+                "access_token": self.get_access_token()
+            }
+            json_data = {
+                "touser": receiver,
+                "msgtype": "text",
+                "text": {"content": reply_text}
+            }
+            self.wechatmp_request(method='post', url=url, params=params, data=json.dumps(json_data, ensure_ascii=False).encode('utf8'))
+            logger.info("[send] Do send to {}: {}".format(receiver, reply_text))
+        return
+
+# Last import to avoid circular import
+import channel.wechatmp.SubscribeAccount
+import channel.wechatmp.ServiceAccount

+ 4 - 2
config.py

@@ -79,13 +79,15 @@ available_setting = {
     "wechaty_puppet_service_token": "",  # wechaty的token
 
     # wechatmp的配置
-    "wechatmp_token": "",  # 微信公众平台的Token
+    "wechatmp_token": "",       # 微信公众平台的Token
+    "wechatmp_app_id": "",      # 微信公众平台的appID
+    "wechatmp_app_secret": "",  # 微信公众平台的appsecret
 
     # chatgpt指令自定义触发词
     "clear_memory_commands": ['#清除记忆'],  # 重置会话指令,必须以#开头
 
     # channel配置
-    "channel_type": "wx", # 通道类型,支持:{wx,wxy,terminal,wechatmp}
+    "channel_type": "wx", # 通道类型,支持:{wx,wxy,terminal,wechatmp,wechatmp_service}
 
     "debug": False,  # 是否开启debug模式,开启后会打印更多日志