# infnapitoggle / app.py
import gradio as gr
from huggingface_hub import HfApi, whoami, InferenceClient
from config import howManyModelsToUse, num_models, max_images, inference_timeout, MAX_SEED, thePrompt, preSetPrompt, negPreSetPrompt
from all_models import models
import asyncio
import os
import logging
import http.client
import json
import ssl
import base64
import requests
import pandas as pd
import re
from io import BytesIO
from datetime import datetime
from threading import RLock
from PIL import Image  # used by query_model / query_model2 to decode returned images
lock = RLock()
# If private or gated models aren't used, setting this ENV variable is unnecessary.
HF_TOKEN = os.environ.get("ohgoddamn")  # os.environ.get already returns None when the variable is unset
token = HF_TOKEN
stop_event = asyncio.Event()
default_models = models[:howManyModelsToUse]
api = HfApi()
user_info = whoami(token=HF_TOKEN)
username = user_info["name"]
print(f"{username}")
print(f"{username}")
print(f"{username}")
print(f"{username}")
from handle_models import load_fn,infer,gen_fn
from externalmod import gr_Interface_load, save_image, randomize_seed
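# UI helpers: pad the selected-model list out to num_models (with 'NA' placeholders)
# and rebuild the per-model image boxes so hidden slots stay in sync.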
def extend_choices(choices):
return choices[:num_models] + (num_models - len(choices[:num_models])) * ['NA']
def update_imgbox(choices):
choices_plus = extend_choices(choices[:num_models])
return [gr.Image(None, label=m, visible=(m!='NA')) for m in choices_plus]
def random_choices():
import random
random.seed()
return random.choices(models, k=num_models)
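# Quick connectivity check at startup: GET one model endpoint and print the HTTP status.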
url = "https://api-inference.huggingface.co/models/charliebaby2023/cybrpny"
headers = { "Authorization": f"Bearer {token}"}
response = requests.get(url, headers=headers)
print(response.status_code)
print(response.text)
load_fn(models, HF_TOKEN)
#client = InferenceClient( provider="hf-inference", api_key=HF_TOKEN,)
#image = client.text_to_image( "Astronaut riding a horse", model="charliebaby2023/cybrpny",)
#print(f"{image}")
# === CONFIG ===
model_id = "CompVis/stable-diffusion-v1-4-original"
endpoint = f"/models/{model_id}"
host = "api-inference.huggingface.co"
#endpoint = "/models/charliebaby2023/cybrpny"
#token = HF_TOKEN
prompt = "a futuristic city on Mars at sunset"
# === REQUEST SETUP ===
body = json.dumps({
"inputs": prompt
})
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
"User-Agent": "PythonRawClient/1.0"
}
# === CONNECTION ===
context = ssl.create_default_context()
conn = http.client.HTTPSConnection(host, context=context)
# === RAW REQUEST ===
print("🔸 REQUEST LINE:")
print(f"POST {endpoint} HTTP/1.1")
print(f"Host: {host}")
for key, value in headers.items():
print(f"{key}: {value}")
print(f"\n{body}\n")
# Send request
conn.request("POST", endpoint, body=body, headers=headers)
# === RAW RESPONSE ===
response = conn.getresponse()
print("🔹 STATUS:", response.status, response.reason)
print("🔹 RESPONSE HEADERS:")
for hdr in response.getheaders():
print(f"{hdr[0]}: {hdr[1]}")
print("\n🔹 RESPONSE BODY (raw):")
raw = response.read()
try:
print(raw.decode("utf-8")[:1000]) # print first 1k chars
except UnicodeDecodeError:
print("[binary data]")
def query_model(model_name, prompt):
    logs = []
host = "api-inference.huggingface.co"
endpoint = f"/models/{model_name}"
# Prepare request
body = json.dumps({"inputs": prompt})
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
"User-Agent": "PythonRawClient/1.0"
}
# Connect
context = ssl.create_default_context()
conn = http.client.HTTPSConnection(host, context=context)
logs.append(f"📤 POST {endpoint}")
logs.append(f"Headers: {headers}")
logs.append(f"Body: {body}\n")
try:
conn.request("POST", endpoint, body=body, headers=headers)
response = conn.getresponse()
logs.append(f"📥 Status: {response.status} {response.reason}")
logs.append("Headers:")
for k, v in response.getheaders():
logs.append(f"{k}: {v}")
raw = response.read()
        try:
            text = raw.decode("utf-8")
            result = json.loads(text)
            logs.append("\nBody:\n" + text[:1000])
        except (UnicodeDecodeError, json.JSONDecodeError):
            result = raw
            logs.append("\n⚠️ Binary response.")
# === HANDLE RESPONSE ===
def show(img_bytes):
try:
img = Image.open(BytesIO(img_bytes))
return img
except Exception as e:
logs.append(f"❌ Failed to open image: {e}")
return None
if isinstance(result, dict):
if "image" in result:
logs.append("🧠 Found base64 image in 'image'")
return show(base64.b64decode(result["image"])), "\n".join(logs)
elif "url" in result or "image_url" in result:
url = result.get("url") or result.get("image_url")
logs.append(f"🌐 Found image URL: {url}")
r = requests.get(url)
return show(r.content), "\n".join(logs)
else:
logs.append("⚠️ No image found in response.")
return None, "\n".join(logs)
elif isinstance(result, bytes):
logs.append("🧾 Raw image bytes returned.")
return show(result), "\n".join(logs)
else:
logs.append("❌ Unknown response format.")
return None, "\n".join(logs)
except Exception as e:
logs.append(f"💥 Exception: {e}")
return None, "\n".join(logs)
# query_model2: fallback client that loads the model as a Gradio interface and calls predict().
def query_model2(model_name, prompt):
logs = []
img_out = None
try:
model = gr.Interface.load(f"models/{model_name}", token=HF_TOKEN)
logs.append(f"Prompt: {prompt}")
response = model.predict(prompt)
logs.append(f"Model response: {response}")
def get_image_from_response(response):
if isinstance(response, dict):
if "image" in response:
img_data = base64.b64decode(response["image"])
img = Image.open(BytesIO(img_data))
return img
elif "url" in response or "image_url" in response:
url = response.get("url") or response.get("image_url")
img_data = requests.get(url).content
img = Image.open(BytesIO(img_data))
return img
elif isinstance(response, bytes):
img = Image.open(BytesIO(response))
return img
return None
img_out = get_image_from_response(response)
except Exception as e:
logs.append(f"Error: {e}")
response = None
return img_out, "\n".join(logs)
#print(f"Time launched: {hms()}")
pattern = r'HTTP/1\.1" (\d{3}) \d+'
class ErrorCodeLogHandler(logging.Handler):
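    """Logging handler that reports each new HTTP status code (and model name) seen in log records."""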
def __init__(self):
super().__init__()
        self.last_error_code = None  # Store the last error code printed
        self.model_name_pattern = r'Model\s+(\S+)'  # Pattern to extract the model name (adjust this regex as needed)
def emit(self, record):
log_message = self.format(record) # Get the log message from the record
error_code = self.extract_error_code(log_message) # Extract error code
model_name = self.extract_model_name(log_message) # Extract model name
        if error_code and error_code != self.last_error_code:
            print(f'Error code: {error_code} | Model: {model_name}')  # print both error code and model name
            self.last_error_code = error_code  # Update the last printed error code
def extract_error_code(self, log_message):
match = re.search(pattern, log_message)
if match:
return match.group(1) # Return the current error code
return None # Return None if no match is found
def extract_model_name(self, log_message):
match = re.search(self.model_name_pattern, log_message)
if match:
return match.group(1) # Return the model name or identifier
return "Unknown model" # Return a default value if no model name is found
error_handler = ErrorCodeLogHandler()

def debugon():
    print("DEBUGGING MODE : ON")
    # logging.basicConfig is a no-op once the root logger is configured, so set the level explicitly.
    logging.basicConfig(format='%(message)s')
    logging.getLogger().setLevel(logging.DEBUG)
    if error_handler not in logging.getLogger().handlers:
        logging.getLogger().addHandler(error_handler)

def debugoff():
    print("DEBUGGING MODE : OFF")
    logging.getLogger().setLevel(logging.WARNING)
def handle_debug_mode(selected_option):
if selected_option == "debug on":
debugon()
else:
debugoff()
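# Signal cooperating tasks to stop (assumes the generation coroutines check stop_event).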
def stop_all_tasks():
print("Stopping...")
stop_event.set()
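# === GRADIO UI ===
# Two tabs: a debug-mode toggle and the multi-model generation grid.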
with gr.Blocks(fill_width=True) as demo:
with gr.Tab(label="DEBUG"):
with gr.Row():
radio = gr.Radio(["debug on", "debug off"], value="debug off", label=" Debug mode: activated in output log", interactive=True)
radio.change(handle_debug_mode, radio, None)
with gr.Tab(str(num_models) + ' Models'):
with gr.Column(scale=2):
with gr.Group():
txt_input = gr.Textbox(label='Your prompt:', value=preSetPrompt, lines=3, autofocus=1)
neg_input = gr.Textbox(label='Negative prompt:', value=negPreSetPrompt, lines=1)
timeout = gr.Slider(label="Timeout (seconds)", minimum=5, maximum=300, value=120, step=1)
with gr.Accordion("Advanced", open=False, visible=True):
with gr.Row():
width = gr.Slider(label="Width", info="If 0, the default value is used.", maximum=1216, step=32, value=0)
height = gr.Slider(label="Height", info="If 0, the default value is used.", maximum=1216, step=32, value=0)
with gr.Row():
steps = gr.Slider(label="Number of inference steps", info="If 0, the default value is used.", maximum=100, step=1, value=0)
cfg = gr.Slider(label="Guidance scale", info="If 0, the default value is used.", maximum=30.0, step=0.1, value=0)
seed = gr.Slider(label="Seed", info="Randomize Seed if -1.", minimum=-1, maximum=MAX_SEED, step=1, value=-1)
seed_rand = gr.Button("Randomize Seed 🎲", size="sm", variant="secondary")
seed_rand.click(randomize_seed, None, [seed], queue=False)
with gr.Row():
gen_button = gr.Button(f'Generate up to {int(num_models)} images', variant='primary', scale=3, elem_classes=["butt"])
random_button = gr.Button(f'Randomize Models', variant='secondary', scale=1)
with gr.Column(scale=1):
with gr.Group():
with gr.Row():
output = [gr.Image(label=m, show_download_button=True, elem_classes=["image-monitor"],
interactive=False, width=112, height=112, show_share_button=False, format="png",
visible=True) for m in default_models]
current_models = [gr.Textbox(m, visible=False) for m in default_models]
#for m, o in zip(current_models, output):
# gen_event = gr.on(triggers=[gen_button.click, txt_input.submit], fn=gen_fn,
# inputs=[m, txt_input, neg_input, height, width, steps, cfg, seed], outputs=[o],
# concurrency_limit=None, queue=False)
for m, o in zip(current_models, output):
gen_button.click( fn=gen_fn, inputs=[m, txt_input, neg_input, height, width, steps, cfg, seed, timeout], outputs=[o],queue=False)
txt_input.submit( fn=gen_fn, inputs=[m, txt_input, neg_input, height, width, steps, cfg, seed, timeout], outputs=[o],queue=False)
with gr.Column(scale=4):
with gr.Accordion('Model selection'):
model_choice = gr.CheckboxGroup(models, label = f'Choose up to {int(num_models)} different models from the {len(models)} available!', value=models, interactive=True)
model_choice.change(update_imgbox, model_choice, output)
model_choice.change(extend_choices, model_choice, current_models)
random_button.click(random_choices, None, model_choice)
stop_button = gr.Button("Stop 🛑", variant="stop")
stop_button.click(
fn=stop_all_tasks,
inputs=[],
outputs=[]
)
demo.launch(show_api=True, max_threads=400)
'''
with gr.Blocks(fill_width=True) as demo:
with gr.Row():
gr.Markdown(f"# ({username}) you are logged in")
#model_selector = gr.CheckboxGroup(choices=model_ids,value=model_ids, label="your models", interactive=True, )
#output_box = gr.Textbox(lines=10, label="Selected Models")
#model_selector.change(fn=handle_model_selection, inputs=model_selector, outputs=output_box)
source_selector = gr.CheckboxGroup(choices=source_choices, label="Model Source", value=["Combined"], interactive=True)
output = gr.Textbox(label="Selected Model Summary")
with gr.Tab(str(num_models) + ' Models'):
with gr.Column(scale=2):
with gr.Group():
txt_input = gr.Textbox(label='Your prompt:', value=preSetPrompt, lines=3, autofocus=1)
with gr.Accordion("Advanced", open=False, visible=True):
with gr.Row():
neg_input = gr.Textbox(label='Negative prompt:', value=negPreSetPrompt, lines=1)
with gr.Row():
width = gr.Slider(label="Width", info="If 0, the default value is used.", maximum=1216, step=32, value=0)
height = gr.Slider(label="Height", info="If 0, the default value is used.", maximum=1216, step=32, value=0)
with gr.Row():
steps = gr.Slider(label="Number of inference steps", info="If 0, the default value is used.", maximum=100, step=1, value=0)
cfg = gr.Slider(label="Guidance scale", info="If 0, the default value is used.", maximum=30.0, step=0.1, value=0)
seed = gr.Slider(label="Seed", info="Randomize Seed if -1.", minimum=-1, maximum=MAX_SEED, step=1, value=-1)
seed_rand = gr.Button("Randomize Seed 🎲", size="sm", variant="secondary")
seed_rand.click(randomize_seed, None, [seed], queue=False)
with gr.Row():
gen_button = gr.Button(f'Generate up to {int(num_models)} images', variant='primary', scale=3, elem_classes=["butt"])
random_button = gr.Button(f'Randomize Models', variant='secondary', scale=1)
with gr.Column(scale=1):
with gr.Group():
with gr.Row():
output = [gr.Image(label=m, show_download_button=True, elem_classes=["image-monitor"],
interactive=False, width=112, height=112, show_share_button=False, format="png",
visible=True) for m in default_models]
current_models = [gr.Textbox(m, visible=False) for m in default_models]
for m, o in zip(current_models, output):
gen_event = gr.on(triggers=[gen_button.click, txt_input.submit], fn=gen_fn,
inputs=[m, txt_input, neg_input, height, width, steps, cfg, seed], outputs=[o],
concurrency_limit=None, queue=False)
with gr.Column(scale=4):
with gr.Accordion('Model selection'):
#model_choice = gr.CheckboxGroup(models, label = f'Choose up to {int(num_models)} different models from the {len(models)} available!', value=default_models, interactive=True)
#model_choice.change(update_imgbox, model_choice, output)
#model_choice.change(extend_choices, model_choice, current_models)
model_choice = gr.CheckboxGroup(choices=combined_models, label="Models", value=combined_models[:20], interactive=True)
source_selector.change(update_model_choice, source_selector, model_choice)
model_choice.change(handle_model_selection, model_choice, output)
model_choice.change(update_imgbox, model_choice, output)
model_choice.change(extend_choices, model_choice, current_models)
random_button.click(random_choices, None, model_choice)
'''
'''
# --- Step 2: Fetch user's Spaces
spaces = list(api.list_spaces(author=username, token=HF_TOKEN))
space_df = pd.DataFrame([{"Space Name": f"<a href='#' data-space='{space.id}'>{space.id.split('/')[-1]}</a>",
"Last Modified": space.lastModified,} for space in spaces])
def load_space_files(evt: gr.SelectData):
clicked_html = evt.value
space_id = clicked_html.split("data-space='")[1].split("'")[0]
files = api.list_repo_files(repo_id=space_id, repo_type="space", token=HF_TOKEN)
file_df = pd.DataFrame([{ "File": f"<a href='https://huggingface.co/spaces/{username}/{space_id.split('/')[-1]}/edit/main/{file}' target='_blank'>{file}</a>"
} for file in files])
return file_df
# --- Step 4: Build Gradio interface
gr.Markdown(f"# Hugging Face Spaces for `{username}`")
with gr.Row():
left_df = gr.Dataframe(value=space_df, label="Your Spaces (click a name)",
interactive=False, datatype="str", max_rows=len(space_df), wrap=True )
right_df = gr.Dataframe( value=pd.DataFrame(columns=["File"]),
label="Files in Selected Space", interactive=False, wrap=True )
left_df.select(fn=load_space_files, outputs=right_df)
'''