Browse Source

feature(rate-limit): 新增令牌桶类,用于主动限制调用gpt3.5, dalle接口频率

goldfishh 3 năm trước cách đây
mục cha
commit
3f889ab75f
2 tập tin đã thay đổi với 55 bổ sung1 xóa
  1. 10 1
      bot/chatgpt/chat_gpt_bot.py
  2. 45 0
      common/token_bucket.py

+ 10 - 1
bot/chatgpt/chat_gpt_bot.py

@@ -3,6 +3,7 @@
 from bot.bot import Bot
 from config import conf, load_config
 from common.log import logger
+from common.token_bucket import TokenBucket
 from common.expired_dict import ExpiredDict
 import openai
 import time
@@ -21,6 +22,10 @@ class ChatGPTBot(Bot):
         proxy = conf().get('proxy')
         if proxy:
             openai.proxy = proxy
+        if conf().get('rate_limit_chatgpt'):
+            self.tb4chatgpt = TokenBucket(conf().get('rate_limit_chatgpt', 20))
+        if conf().get('rate_limit_dalle'):
+            self.tb4dalle = TokenBucket(conf().get('rate_limit_dalle', 50))
 
     def reply(self, query, context=None):
         # acquire reply content
@@ -63,6 +68,8 @@ class ChatGPTBot(Bot):
         :return: {}
         '''
         try:
+            if conf().get('rate_limit_chatgpt') and not self.tb4chatgpt.get_token():
+                return {"completion_tokens": 0, "content": "提问太快啦,请休息一下再问我吧"}
             response = openai.ChatCompletion.create(
                 model= conf().get("model") or "gpt-3.5-turbo",  # 对话模型的名称
                 messages=session,
@@ -102,6 +109,8 @@ class ChatGPTBot(Bot):
 
     def create_img(self, query, retry_count=0):
         try:
+            if conf().get('rate_limit_dalle') and not self.tb4dalle.get_token():
+                return "请求太快了,请休息一下再问我吧"
             logger.info("[OPEN_AI] image_query={}".format(query))
             response = openai.Image.create(
                 prompt=query,    #图片描述
@@ -118,7 +127,7 @@ class ChatGPTBot(Bot):
                 logger.warn("[OPEN_AI] ImgCreate RateLimit exceed, 第{}次重试".format(retry_count+1))
                 return self.create_img(query, retry_count+1)
             else:
-                return "提问太快啦,请休息一下再问我吧"
+                return "请求太快啦,请休息一下再问我吧"
         except Exception as e:
             logger.exception(e)
             return None

+ 45 - 0
common/token_bucket.py

@@ -0,0 +1,45 @@
+import threading
+import time
+
+
+class TokenBucket:
+    def __init__(self, tpm, timeout=None):
+        self.capacity = int(tpm)  # 令牌桶容量
+        self.tokens = 0  # 初始令牌数为0
+        self.rate = int(tpm) / 60  # 令牌每秒生成速率
+        self.timeout = timeout  # 等待令牌超时时间
+        self.cond = threading.Condition()  # 条件变量
+        self.is_running = True
+        # 开启令牌生成线程
+        threading.Thread(target=self._generate_tokens).start()
+
+    def _generate_tokens(self):
+        """生成令牌"""
+        while self.is_running:
+            with self.cond:
+                if self.tokens < self.capacity:
+                    self.tokens += 1
+                self.cond.notify()  # 通知获取令牌的线程
+            time.sleep(1 / self.rate)
+
+    def get_token(self):
+        """获取令牌"""
+        with self.cond:
+            while self.tokens <= 0:
+                flag = self.cond.wait(self.timeout)
+                if not flag:  # 超时
+                    return False
+            self.tokens -= 1
+        return True
+
+    def close(self):
+        self.is_running = False
+
+
+if __name__ == "__main__":
+    token_bucket = TokenBucket(20, None)  # 创建一个每分钟生产20个tokens的令牌桶
+    # token_bucket = TokenBucket(20, 0.1)
+    for i in range(3):
+        if token_bucket.get_token():
+            print(f"第{i+1}次请求成功")
+    token_bucket.close()