hadadrjt commited on
Commit
4f6f363
·
1 Parent(s): 55378c8

ai: Implementing parallel requests with a first-success strategy.

Browse files
Files changed (1) hide show
  1. jarvis.py +28 -13
jarvis.py CHANGED
@@ -55,8 +55,6 @@ META_TAGS = os.getenv("META_TAGS")
55
 
56
  ALLOWED_EXTENSIONS = json.loads(os.getenv("ALLOWED_EXTENSIONS", "[]"))
57
 
58
- ACTIVE_CANDIDATE = None
59
-
60
  class SessionWithID(requests.Session):
61
  def __init__(sess):
62
  super().__init__()
@@ -222,13 +220,19 @@ async def fetch_response_async(host, key, model, msgs, cfg, sid):
222
  marked_item(key, LINUX_SERVER_PROVIDER_KEYS_MARKED, LINUX_SERVER_PROVIDER_KEYS_ATTEMPTS)
223
  return None
224
 
 
 
 
 
225
  async def chat_with_model_async(history, user_input, model_display, sess, custom_prompt):
226
  ensure_stop_event(sess)
227
  if not get_available_items(LINUX_SERVER_PROVIDER_KEYS, LINUX_SERVER_PROVIDER_KEYS_MARKED) or not get_available_items(LINUX_SERVER_HOSTS, LINUX_SERVER_HOSTS_ATTEMPTS):
228
  return RESPONSES["RESPONSE_3"]
229
- if not hasattr(sess, "session_id"):
230
  sess.session_id = str(uuid.uuid4())
231
  sess.stop_event = asyncio.Event()
 
 
232
  model_key = get_model_key(model_display)
233
  cfg = MODEL_CONFIG.get(model_key, DEFAULT_CONFIG)
234
  msgs = [{"role": "user", "content": u} for u, _ in history] + [{"role": "assistant", "content": a} for _, a in history if a]
@@ -238,21 +242,32 @@ async def chat_with_model_async(history, user_input, model_display, sess, custom
238
  prompt = custom_prompt or SYSTEM_PROMPT_MAPPING.get(model_key, SYSTEM_PROMPT_DEFAULT)
239
  msgs.insert(0, {"role": "system", "content": prompt})
240
  msgs.append({"role": "user", "content": user_input})
241
- global ACTIVE_CANDIDATE
242
- if ACTIVE_CANDIDATE:
243
- res = await fetch_response_async(ACTIVE_CANDIDATE[0], ACTIVE_CANDIDATE[1], model_key, msgs, cfg, sess.session_id)
244
  if res:
245
  return res
246
- ACTIVE_CANDIDATE = None
247
  keys = get_available_items(LINUX_SERVER_PROVIDER_KEYS, LINUX_SERVER_PROVIDER_KEYS_MARKED)
248
  hosts = get_available_items(LINUX_SERVER_HOSTS, LINUX_SERVER_HOSTS_ATTEMPTS)
249
  cands = [(h, k) for h in hosts for k in keys]
250
  random.shuffle(cands)
251
- for h, k in cands:
252
- res = await fetch_response_async(h, k, model_key, msgs, cfg, sess.session_id)
253
- if res:
254
- ACTIVE_CANDIDATE = (h, k)
255
- return res
 
 
 
 
 
 
 
 
 
 
 
 
256
  return RESPONSES["RESPONSE_2"]
257
 
258
  async def respond_async(multi, history, model_display, sess, custom_prompt):
@@ -287,7 +302,7 @@ async def respond_async(multi, history, model_display, sess, custom_prompt):
287
  buffer.clear()
288
  last_update = current_time
289
  yield history, gr.update(interactive=False, submit_btn=False, stop_btn=True), sess
290
- await asyncio.sleep(0.016)
291
  if buffer:
292
  history[-1][1] += "".join(buffer)
293
  yield history, gr.update(interactive=False, submit_btn=False, stop_btn=True), sess
 
55
 
56
  ALLOWED_EXTENSIONS = json.loads(os.getenv("ALLOWED_EXTENSIONS", "[]"))
57
 
 
 
58
  class SessionWithID(requests.Session):
59
  def __init__(sess):
60
  super().__init__()
 
220
  marked_item(key, LINUX_SERVER_PROVIDER_KEYS_MARKED, LINUX_SERVER_PROVIDER_KEYS_ATTEMPTS)
221
  return None
222
 
223
+ async def candidate_task(h, k, model, msgs, cfg, sid):
224
+ r = await fetch_response_async(h, k, model, msgs, cfg, sid)
225
+ return r, (h, k)
226
+
227
  async def chat_with_model_async(history, user_input, model_display, sess, custom_prompt):
228
  ensure_stop_event(sess)
229
  if not get_available_items(LINUX_SERVER_PROVIDER_KEYS, LINUX_SERVER_PROVIDER_KEYS_MARKED) or not get_available_items(LINUX_SERVER_HOSTS, LINUX_SERVER_HOSTS_ATTEMPTS):
230
  return RESPONSES["RESPONSE_3"]
231
+ if not hasattr(sess, "session_id") or not sess.session_id:
232
  sess.session_id = str(uuid.uuid4())
233
  sess.stop_event = asyncio.Event()
234
+ if not hasattr(sess, "active_candidate"):
235
+ sess.active_candidate = None
236
  model_key = get_model_key(model_display)
237
  cfg = MODEL_CONFIG.get(model_key, DEFAULT_CONFIG)
238
  msgs = [{"role": "user", "content": u} for u, _ in history] + [{"role": "assistant", "content": a} for _, a in history if a]
 
242
  prompt = custom_prompt or SYSTEM_PROMPT_MAPPING.get(model_key, SYSTEM_PROMPT_DEFAULT)
243
  msgs.insert(0, {"role": "system", "content": prompt})
244
  msgs.append({"role": "user", "content": user_input})
245
+ if sess.active_candidate:
246
+ res = await fetch_response_async(sess.active_candidate[0], sess.active_candidate[1], model_key, msgs, cfg, sess.session_id)
 
247
  if res:
248
  return res
249
+ sess.active_candidate = None
250
  keys = get_available_items(LINUX_SERVER_PROVIDER_KEYS, LINUX_SERVER_PROVIDER_KEYS_MARKED)
251
  hosts = get_available_items(LINUX_SERVER_HOSTS, LINUX_SERVER_HOSTS_ATTEMPTS)
252
  cands = [(h, k) for h in hosts for k in keys]
253
  random.shuffle(cands)
254
+ tasks = [asyncio.create_task(candidate_task(h, k, model_key, msgs, cfg, sess.session_id)) for h, k in cands]
255
+ first_success = None
256
+ candidate = None
257
+ for task in asyncio.as_completed(tasks):
258
+ try:
259
+ res, cand = await task
260
+ if res is not None:
261
+ first_success = res
262
+ candidate = cand
263
+ break
264
+ except:
265
+ continue
266
+ for t in tasks:
267
+ t.cancel()
268
+ if first_success:
269
+ sess.active_candidate = candidate
270
+ return first_success
271
  return RESPONSES["RESPONSE_2"]
272
 
273
  async def respond_async(multi, history, model_display, sess, custom_prompt):
 
302
  buffer.clear()
303
  last_update = current_time
304
  yield history, gr.update(interactive=False, submit_btn=False, stop_btn=True), sess
305
+ await asyncio.sleep(0.020)
306
  if buffer:
307
  history[-1][1] += "".join(buffer)
308
  yield history, gr.update(interactive=False, submit_btn=False, stop_btn=True), sess