Spaces:
Running
Running
import gradio as gr | |
from huggingface_hub import HfApi, whoami, InferenceClient | |
from config import howManyModelsToUse,num_models,max_images,inference_timeout,MAX_SEED,thePrompt,preSetPrompt,negPreSetPrompt | |
from all_models import models | |
import asyncio | |
import os | |
import logging | |
from fastapi import HTTPException | |
import http.client | |
import json | |
import ssl | |
import pandas as pd | |
import re | |
from datetime import datetime | |
from threading import RLock | |
lock = RLock() | |
HF_TOKEN = os.environ.get("ohgoddamn") if os.environ.get("ohgoddamn") else None # If private or gated models aren't used, ENV setting is unnecessary. | |
token= os.environ.get("ohgoddamn") if os.environ.get("ohgoddamn") else None # If private or gated models aren't used, ENV setting is unnecessary. | |
stop_event = asyncio.Event() | |
default_models = models[:howManyModelsToUse] | |
api = HfApi() | |
user_info = whoami(token=HF_TOKEN) | |
username = user_info["name"] | |
print(f"{username}") | |
print(f"{username}") | |
print(f"{username}") | |
print(f"{username}") | |
from handle_models import load_fn,infer,gen_fn | |
from externalmod import gr_Interface_load, save_image, randomize_seed | |
def extend_choices(choices): | |
return choices[:num_models] + (num_models - len(choices[:num_models])) * ['NA'] | |
def update_imgbox(choices): | |
choices_plus = extend_choices(choices[:num_models]) | |
return [gr.Image(None, label=m, visible=(m!='NA')) for m in choices_plus] | |
def random_choices(): | |
import random | |
random.seed() | |
return random.choices(models, k=num_models) | |
url = "https://api-inference.huggingface.co/models/charliebaby2023/cybrpny" | |
headers = { "Authorization": f"Bearer {token}"} | |
response = requests.get(url, headers=headers) | |
print(response.status_code) | |
print(response.text) | |
load_fn(models,HF_TOKEN) | |
#client = InferenceClient( provider="hf-inference", api_key=HF_TOKEN,) | |
#image = client.text_to_image( "Astronaut riding a horse", model="charliebaby2023/cybrpny",) | |
#print(f"{image}") | |
model_id = "CompVis/stable-diffusion-v1-4-original" | |
endpoint = f"/models/{model_id}" | |
# === CONFIG === | |
host = "api-inference.huggingface.co" | |
#endpoint = "/models/charliebaby2023/cybrpny" | |
#token = HF_TOKEN | |
prompt = "a futuristic city on Mars at sunset" | |
# === REQUEST SETUP === | |
body = json.dumps({ | |
"inputs": prompt | |
}) | |
headers = { | |
"Authorization": f"Bearer {token}", | |
"Content-Type": "application/json", | |
"User-Agent": "PythonRawClient/1.0" | |
} | |
# === CONNECTION === | |
context = ssl.create_default_context() | |
conn = http.client.HTTPSConnection(host, context=context) | |
# === RAW REQUEST === | |
print("🔸 REQUEST LINE:") | |
print(f"POST {endpoint} HTTP/1.1") | |
print(f"Host: {host}") | |
for key, value in headers.items(): | |
print(f"{key}: {value}") | |
print(f"\n{body}\n") | |
# Send request | |
conn.request("POST", endpoint, body=body, headers=headers) | |
# === RAW RESPONSE === | |
response = conn.getresponse() | |
print("🔹 STATUS:", response.status, response.reason) | |
print("🔹 RESPONSE HEADERS:") | |
for hdr in response.getheaders(): | |
print(f"{hdr[0]}: {hdr[1]}") | |
print("\n🔹 RESPONSE BODY (raw):") | |
raw = response.read() | |
try: | |
print(raw.decode("utf-8")[:1000]) # print first 1k chars | |
except UnicodeDecodeError: | |
print("[binary data]") | |
def query_model(model_name,prompt): | |
logs = [] | |
img_out = None | |
host = "api-inference.huggingface.co" | |
endpoint = f"/models/{model_name}" | |
# Prepare request | |
body = json.dumps({"inputs": prompt}) | |
headers = { | |
"Authorization": f"Bearer {token}", | |
"Content-Type": "application/json", | |
"User-Agent": "PythonRawClient/1.0" | |
} | |
# Connect | |
context = ssl.create_default_context() | |
conn = http.client.HTTPSConnection(host, context=context) | |
logs.append(f"📤 POST {endpoint}") | |
logs.append(f"Headers: {headers}") | |
logs.append(f"Body: {body}\n") | |
try: | |
conn.request("POST", endpoint, body=body, headers=headers) | |
response = conn.getresponse() | |
logs.append(f"📥 Status: {response.status} {response.reason}") | |
logs.append("Headers:") | |
for k, v in response.getheaders(): | |
logs.append(f"{k}: {v}") | |
raw = response.read() | |
try: | |
text = raw.decode("utf-8") | |
result = json.loads(text) | |
logs.append("\nBody:\n" + text[:1000]) | |
except: | |
result = raw | |
logs.append("\n⚠️ Binary response.") | |
# === HANDLE RESPONSE === | |
def show(img_bytes): | |
try: | |
img = Image.open(BytesIO(img_bytes)) | |
return img | |
except Exception as e: | |
logs.append(f"❌ Failed to open image: {e}") | |
return None | |
if isinstance(result, dict): | |
if "image" in result: | |
logs.append("🧠 Found base64 image in 'image'") | |
return show(base64.b64decode(result["image"])), "\n".join(logs) | |
elif "url" in result or "image_url" in result: | |
url = result.get("url") or result.get("image_url") | |
logs.append(f"🌐 Found image URL: {url}") | |
r = requests.get(url) | |
return show(r.content), "\n".join(logs) | |
else: | |
logs.append("⚠️ No image found in response.") | |
return None, "\n".join(logs) | |
elif isinstance(result, bytes): | |
logs.append("🧾 Raw image bytes returned.") | |
return show(result), "\n".join(logs) | |
else: | |
logs.append("❌ Unknown response format.") | |
return None, "\n".join(logs) | |
except Exception as e: | |
logs.append(f"💥 Exception: {e}") | |
return None, "\n".join(logs) | |
# === GRADIO UI === | |
def query_model2(model_name, prompt): | |
logs = [] | |
img_out = None | |
try: | |
model = gr.Interface.load(f"models/{model_name}", token=HF_TOKEN) | |
logs.append(f"Prompt: {prompt}") | |
response = model.predict(prompt) | |
logs.append(f"Model response: {response}") | |
def get_image_from_response(response): | |
if isinstance(response, dict): | |
if "image" in response: | |
img_data = base64.b64decode(response["image"]) | |
img = Image.open(BytesIO(img_data)) | |
return img | |
elif "url" in response or "image_url" in response: | |
url = response.get("url") or response.get("image_url") | |
img_data = requests.get(url).content | |
img = Image.open(BytesIO(img_data)) | |
return img | |
elif isinstance(response, bytes): | |
img = Image.open(BytesIO(response)) | |
return img | |
return None | |
img_out = get_image_from_response(response) | |
except Exception as e: | |
logs.append(f"Error: {e}") | |
response = None | |
return img_out, "\n".join(logs) | |
#print(f"Time launched: {hms()}") | |
pattern = r'HTTP/1\.1" (\d{3}) \d+' | |
class ErrorCodeLogHandler(logging.Handler): | |
def __init__(self): | |
super().__init__() | |
self.last_error_code = None # Store the last error code #printed | |
self.model_name_pattern = r'Model\s+(\S+)' # Pattern to extract model name (adjust this regex to your needs) | |
def emit(self, record): | |
log_message = self.format(record) # Get the log message from the record | |
error_code = self.extract_error_code(log_message) # Extract error code | |
model_name = self.extract_model_name(log_message) # Extract model name | |
if error_code and error_code != self.last_error_code: | |
#print(f'Error code: {error_code} | Model: {model_name}') # #print both error code and model name | |
self.last_error_code = error_code # Update the last #printed error code | |
def extract_error_code(self, log_message): | |
match = re.search(pattern, log_message) | |
if match: | |
return match.group(1) # Return the current error code | |
return None # Return None if no match is found | |
def extract_model_name(self, log_message): | |
match = re.search(self.model_name_pattern, log_message) | |
if match: | |
return match.group(1) # Return the model name or identifier | |
return "Unknown model" # Return a default value if no model name is found | |
def debugon(): | |
print(f"DEBUGGING MODE : ON ") | |
logging.basicConfig(level=logging.DEBUG, format='%(message)s') | |
error_handler = ErrorCodeLogHandler() | |
print(f"{error_handler}") | |
logging.getLogger().addHandler(error_handler) | |
def debugoff(): | |
print(f"DEBUGGING MODE : OFF ") | |
logging.basicConfig(level=logging.WARNING, format='%(message)s') | |
error_handler = ErrorCodeLogHandler() | |
print(f"{error_handler}") | |
logging.getLogger().addHandler(error_handler) | |
def handle_debug_mode(selected_option): | |
if selected_option == "debug on": | |
debugon() | |
else: | |
debugoff() | |
def stop_all_tasks(): | |
print("Stopping...") | |
stop_event.set() | |
with gr.Blocks(fill_width=True) as demo: | |
with gr.Tab(label="DEBUG"): | |
with gr.Row(): | |
radio = gr.Radio(["debug on", "debug off"], value="debug off", label=" Debug mode: activated in output log", interactive=True) | |
radio.change(handle_debug_mode, radio, None) | |
with gr.Tab(str(num_models) + ' Models'): | |
with gr.Column(scale=2): | |
with gr.Group(): | |
txt_input = gr.Textbox(label='Your prompt:', value=preSetPrompt, lines=3, autofocus=1) | |
neg_input = gr.Textbox(label='Negative prompt:', value=negPreSetPrompt, lines=1) | |
timeout = gr.Slider(label="Timeout (seconds)", minimum=5, maximum=300, value=120, step=1) | |
with gr.Accordion("Advanced", open=False, visible=True): | |
with gr.Row(): | |
width = gr.Slider(label="Width", info="If 0, the default value is used.", maximum=1216, step=32, value=0) | |
height = gr.Slider(label="Height", info="If 0, the default value is used.", maximum=1216, step=32, value=0) | |
with gr.Row(): | |
steps = gr.Slider(label="Number of inference steps", info="If 0, the default value is used.", maximum=100, step=1, value=0) | |
cfg = gr.Slider(label="Guidance scale", info="If 0, the default value is used.", maximum=30.0, step=0.1, value=0) | |
seed = gr.Slider(label="Seed", info="Randomize Seed if -1.", minimum=-1, maximum=MAX_SEED, step=1, value=-1) | |
seed_rand = gr.Button("Randomize Seed 🎲", size="sm", variant="secondary") | |
seed_rand.click(randomize_seed, None, [seed], queue=False) | |
with gr.Row(): | |
gen_button = gr.Button(f'Generate up to {int(num_models)} images', variant='primary', scale=3, elem_classes=["butt"]) | |
random_button = gr.Button(f'Randomize Models', variant='secondary', scale=1) | |
with gr.Column(scale=1): | |
with gr.Group(): | |
with gr.Row(): | |
output = [gr.Image(label=m, show_download_button=True, elem_classes=["image-monitor"], | |
interactive=False, width=112, height=112, show_share_button=False, format="png", | |
visible=True) for m in default_models] | |
current_models = [gr.Textbox(m, visible=False) for m in default_models] | |
#for m, o in zip(current_models, output): | |
# gen_event = gr.on(triggers=[gen_button.click, txt_input.submit], fn=gen_fn, | |
# inputs=[m, txt_input, neg_input, height, width, steps, cfg, seed], outputs=[o], | |
# concurrency_limit=None, queue=False) | |
for m, o in zip(current_models, output): | |
gen_button.click( fn=gen_fn, inputs=[m, txt_input, neg_input, height, width, steps, cfg, seed, timeout], outputs=[o],queue=False) | |
#concurrency_limit=None, | |
txt_input.submit( fn=gen_fn, inputs=[m, txt_input, neg_input, height, width, steps, cfg, seed, timeout], outputs=[o],queue=False) | |
with gr.Column(scale=4): | |
with gr.Accordion('Model selection'): | |
model_choice = gr.CheckboxGroup(models, label = f'Choose up to {int(num_models)} different models from the {len(models)} available!', value=models, interactive=True) | |
model_choice.change(update_imgbox, model_choice, output) | |
model_choice.change(extend_choices, model_choice, current_models) | |
random_button.click(random_choices, None, model_choice) | |
stop_button = gr.Button("Stop 🛑", variant="stop") | |
stop_button.click( | |
fn=stop_all_tasks, | |
inputs=[], | |
outputs=[] | |
) | |
demo.launch(show_api=True, max_threads=400) | |
''' | |
with gr.Blocks(fill_width=True) as demo: | |
with gr.Row(): | |
gr.Markdown(f"# ({username}) you are logged in") | |
#model_selector = gr.CheckboxGroup(choices=model_ids,value=model_ids, label="your models", interactive=True, ) | |
#output_box = gr.Textbox(lines=10, label="Selected Models") | |
#model_selector.change(fn=handle_model_selection, inputs=model_selector, outputs=output_box) | |
source_selector = gr.CheckboxGroup(choices=source_choices, label="Model Source", value=["Combined"], interactive=True) | |
output = gr.Textbox(label="Selected Model Summary") | |
with gr.Tab(str(num_models) + ' Models'): | |
with gr.Column(scale=2): | |
with gr.Group(): | |
txt_input = gr.Textbox(label='Your prompt:', value=preSetPrompt, lines=3, autofocus=1) | |
with gr.Accordion("Advanced", open=False, visible=True): | |
with gr.Row(): | |
neg_input = gr.Textbox(label='Negative prompt:', value=negPreSetPrompt, lines=1) | |
with gr.Row(): | |
width = gr.Slider(label="Width", info="If 0, the default value is used.", maximum=1216, step=32, value=0) | |
height = gr.Slider(label="Height", info="If 0, the default value is used.", maximum=1216, step=32, value=0) | |
with gr.Row(): | |
steps = gr.Slider(label="Number of inference steps", info="If 0, the default value is used.", maximum=100, step=1, value=0) | |
cfg = gr.Slider(label="Guidance scale", info="If 0, the default value is used.", maximum=30.0, step=0.1, value=0) | |
seed = gr.Slider(label="Seed", info="Randomize Seed if -1.", minimum=-1, maximum=MAX_SEED, step=1, value=-1) | |
seed_rand = gr.Button("Randomize Seed 🎲", size="sm", variant="secondary") | |
seed_rand.click(randomize_seed, None, [seed], queue=False) | |
with gr.Row(): | |
gen_button = gr.Button(f'Generate up to {int(num_models)} images', variant='primary', scale=3, elem_classes=["butt"]) | |
random_button = gr.Button(f'Randomize Models', variant='secondary', scale=1) | |
with gr.Column(scale=1): | |
with gr.Group(): | |
with gr.Row(): | |
output = [gr.Image(label=m, show_download_button=True, elem_classes=["image-monitor"], | |
interactive=False, width=112, height=112, show_share_button=False, format="png", | |
visible=True) for m in default_models] | |
current_models = [gr.Textbox(m, visible=False) for m in default_models] | |
for m, o in zip(current_models, output): | |
gen_event = gr.on(triggers=[gen_button.click, txt_input.submit], fn=gen_fn, | |
inputs=[m, txt_input, neg_input, height, width, steps, cfg, seed], outputs=[o], | |
concurrency_limit=None, queue=False) | |
with gr.Column(scale=4): | |
with gr.Accordion('Model selection'): | |
#model_choice = gr.CheckboxGroup(models, label = f'Choose up to {int(num_models)} different models from the {len(models)} available!', value=default_models, interactive=True) | |
#model_choice.change(update_imgbox, model_choice, output) | |
#model_choice.change(extend_choices, model_choice, current_models) | |
model_choice = gr.CheckboxGroup(choices=combined_models, label="Models", value=combined_models[:20], interactive=True) | |
source_selector.change(update_model_choice, source_selector, model_choice) | |
model_choice.change(handle_model_selection, model_choice, output) | |
model_choice.change(update_imgbox, model_choice, output) | |
model_choice.change(extend_choices, model_choice, current_models) | |
random_button.click(random_choices, None, model_choice) | |
''' | |
''' | |
# --- Step 2: Fetch user's Spaces | |
spaces = list(api.list_spaces(author=username, token=HF_TOKEN)) | |
space_df = pd.DataFrame([{"Space Name": f"<a href='#' data-space='{space.id}'>{space.id.split('/')[-1]}</a>", | |
"Last Modified": space.lastModified,} for space in spaces]) | |
def load_space_files(evt: gr.SelectData): | |
clicked_html = evt.value | |
space_id = clicked_html.split("data-space='")[1].split("'")[0] | |
files = api.list_repo_files(repo_id=space_id, repo_type="space", token=HF_TOKEN) | |
file_df = pd.DataFrame([{ "File": f"<a href='https://huggingface.co/spaces/{username}/{space_id.split('/')[-1]}/edit/main/{file}' target='_blank'>{file}</a>" | |
} for file in files]) | |
return file_df | |
# --- Step 4: Build Gradio interface | |
gr.Markdown(f"# Hugging Face Spaces for `{username}`") | |
with gr.Row(): | |
left_df = gr.Dataframe(value=space_df, label="Your Spaces (click a name)", | |
interactive=False, datatype="str", max_rows=len(space_df), wrap=True ) | |
right_df = gr.Dataframe( value=pd.DataFrame(columns=["File"]), | |
label="Files in Selected Space", interactive=False, wrap=True ) | |
left_df.select(fn=load_space_files, outputs=right_df) | |
''' |