import os
from threading import Thread
from typing import Iterator

import gradio as gr
import spaces
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer

# Upper bound and initial value of the "Max new tokens" slider in the UI.
MAX_MAX_NEW_TOKENS = 2048
DEFAULT_MAX_NEW_TOKENS = 1024
# Longest prompt (in tokens) sent to the model; longer prompts are trimmed.
# Can be overridden with the MAX_INPUT_TOKEN_LENGTH environment variable.
MAX_INPUT_TOKEN_LENGTH = int(os.environ.get("MAX_INPUT_TOKEN_LENGTH", "4096"))

# Markdown rendered at the top of the page. A CPU-only notice is appended to
# it later in this file when CUDA is not available.
DESCRIPTION = """\
# Nekochu/Luminia-13B-v3
This Space demonstrates model Nekochu/Luminia-13B-v3 by Nekochu, a Llama 2 model with 13B parameters fine-tuned for SD gen prompt 
"""

# Markdown footer rendered underneath the chat interface.
LICENSE = """
<p/>
---.
"""

# Already-loaded (model, tokenizer) pairs, keyed by model ID.
# NOTE(review): the cache is never evicted, so every distinct model ID stays
# resident for the lifetime of the process.
models_cache = {}


def load_model(model_id: str):
    """Return the ``(model, tokenizer)`` pair for ``model_id``, loading it once.

    On a cache miss the model is loaded 4-bit quantized with
    ``device_map="auto"``, its tokenizer is loaded with
    ``use_default_system_prompt`` switched off, and the pair is stored in
    ``models_cache`` so later calls reuse it.

    Args:
        model_id: Identifier passed to ``from_pretrained`` for both the model
            and the tokenizer.

    Returns:
        A ``(model, tokenizer)`` tuple.
    """
    cached = models_cache.get(model_id)
    if cached is not None:
        return cached

    # Imported locally so this function carries its own dependency.
    from transformers import BitsAndBytesConfig

    # Passing ``load_in_4bit=True`` straight to ``from_pretrained`` is the
    # deprecated spelling; the quantization settings belong in a
    # ``BitsAndBytesConfig`` handed over as ``quantization_config``.
    model = AutoModelForCausalLM.from_pretrained(
        model_id,
        device_map="auto",
        quantization_config=BitsAndBytesConfig(load_in_4bit=True),
    )
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    tokenizer.use_default_system_prompt = False
    models_cache[model_id] = (model, tokenizer)
    return models_cache[model_id]

# With CUDA available, load the default model eagerly at import time;
# otherwise append a notice to the page description instead.
if torch.cuda.is_available():
    model_id = "Nekochu/Luminia-13B-v3"
    model, tokenizer = load_model(model_id)
else:
    DESCRIPTION += "\n<p>Running on CPU 🥶 This demo does not work on CPU.</p>"


@spaces.GPU(duration=120)
def generate(
    message: str,
    chat_history: list[tuple[str, str]],
    system_prompt: str,
    model_id: str = "Nekochu/Luminia-13B-v3",
    max_new_tokens: int = 1024,
    temperature: float = 0.6,
    top_p: float = 0.9,
    top_k: int = 50,
    repetition_penalty: float = 1.2,
) -> Iterator[str]:
    """Stream the model's reply to ``message`` as progressively longer strings.

    The system prompt (if non-empty), the previous turns and the new message
    are turned into a chat-template prompt. Generation runs on a background
    thread and the text produced so far is yielded every time the streamer
    delivers a new chunk.

    Args:
        message: The new user message.
        chat_history: Earlier turns as ``(user, assistant)`` pairs.
        system_prompt: Optional system message; skipped when empty.
        model_id: Model to load (or fetch from the cache) via ``load_model``.
        max_new_tokens: Maximum number of tokens to generate.
        temperature: Sampling temperature.
        top_p: Nucleus-sampling probability mass.
        top_k: Number of highest-probability tokens considered per step.
        repetition_penalty: Penalty applied to repeated tokens.

    Yields:
        The accumulated reply text after each streamed chunk.
    """
    model, tokenizer = load_model(model_id)

    conversation = []
    if system_prompt:
        conversation.append({"role": "system", "content": system_prompt})
    for user, assistant in chat_history:
        conversation.append({"role": "user", "content": user})
        conversation.append({"role": "assistant", "content": assistant})
    conversation.append({"role": "user", "content": message})

    # The model ID is user-selectable, so the chat template is not known in
    # advance. add_generation_prompt=True asks the template to open the
    # assistant turn, so the model answers instead of continuing the user
    # message; templates without such a header are unaffected by the flag.
    input_ids = tokenizer.apply_chat_template(
        conversation,
        add_generation_prompt=True,
        return_tensors="pt",
    )
    if input_ids.shape[1] > MAX_INPUT_TOKEN_LENGTH:
        # Keep only the most recent tokens when the prompt is too long.
        input_ids = input_ids[:, -MAX_INPUT_TOKEN_LENGTH:]
        gr.Warning(f"Trimmed input from conversation as it was longer than {MAX_INPUT_TOKEN_LENGTH} tokens.")
    input_ids = input_ids.to(model.device)

    streamer = TextIteratorStreamer(tokenizer, timeout=10.0, skip_prompt=True, skip_special_tokens=True)
    generate_kwargs = {
        "input_ids": input_ids,
        "streamer": streamer,
        "max_new_tokens": max_new_tokens,
        "do_sample": True,
        "top_p": top_p,
        "top_k": top_k,
        "temperature": temperature,
        "num_beams": 1,
        "repetition_penalty": repetition_penalty,
    }
    # model.generate blocks until generation is finished, so it runs on a
    # worker thread while this generator drains the streamer.
    t = Thread(target=model.generate, kwargs=generate_kwargs)
    t.start()

    outputs = []
    for text in streamer:
        outputs.append(text)
        yield "".join(outputs)


# Chat UI wired to ``generate``. The additional inputs are listed in the same
# order as that function's parameters after (message, chat_history):
# system prompt, model ID, then the five sampling controls.
chat_interface = gr.ChatInterface(
    fn=generate,
    additional_inputs=[
        gr.Textbox(label="System prompt", lines=6),
        gr.Textbox(
            label="Model ID",
            value="Nekochu/Luminia-13B-v3",
            placeholder="Enter a model ID here, e.g. Nekochu/Llama-2-13B-German-ORPO",
        ),
        # Sampling sliders, described as (label, minimum, maximum, step, initial value).
        *(
            gr.Slider(label=label, minimum=low, maximum=high, step=step, value=initial)
            for label, low, high, step, initial in (
                ("Max new tokens", 1, MAX_MAX_NEW_TOKENS, 1, DEFAULT_MAX_NEW_TOKENS),
                ("Temperature", 0.1, 4.0, 0.1, 0.6),
                ("Top-p (nucleus sampling)", 0.05, 1.0, 0.05, 0.9),
                ("Top-k", 1, 1000, 1, 50),
                ("Repetition penalty", 1.0, 2.0, 0.05, 1.2),
            )
        ),
    ],
    stop_btn=None,
    examples=[
        ["### Instruction: Create stable diffusion metadata based on the given english description. Luminia ### Input: favorites and popular SFW ### Response:"],
        ["### Instruction: Provide tips on stable diffusion to optimize low token prompts and enhance quality include prompt example. ### Response:"],
    ],
)

# Page layout: description, a row holding the duplicate button and the GGUF
# link, the chat interface, then the footer.
with gr.Blocks(css="style.css") as demo:
    gr.Markdown(DESCRIPTION)
    with gr.Row():
        gr.DuplicateButton(value="GPU Ver", elem_id="duplicate-button")
        gr.HTML(
            '<a href="https://huggingface.co/spaces/Nekochu/Luminia-13B_SD_Prompt/tree/Luminia-13B-v3-GGUF"'
            ' style="margin:0 0 0 8px; padding:2px 8px; border:1px solid; border-radius:4px;'
            ' text-decoration:none; font-size:0.9em;">'
            "or clone only the GGUF branch for free CPU Ver</a>"
        )
    chat_interface.render()
    gr.Markdown(LICENSE)

if __name__ == "__main__":
    # Enable the request queue (max_size=20), then start the app.
    demo.queue(max_size=20)
    demo.launch()