import logging import os import re import shelve import shutil import subprocess import tempfile from pathlib import Path import gradio as gr import requests from bs4 import BeautifulSoup from dotenv import load_dotenv # Import functions from your scripts (assuming they are structured appropriately) # It's often better to refactor scripts into functions for easier import try: from huggingface_hub import InferenceClient from src.create_presentation import ( DEFAULT_LLM_MODEL, DEFAULT_PRESENTATION_PROMPT_TEMPLATE, generate_presentation_with_llm, ) from src.create_video import ( cleanup_temp_files, concatenate_clips, convert_pdf_to_images, create_video_clips, find_audio_files, ) from src.transcription_to_audio import VOICE_ID, text_to_speech except ImportError as e: print(f"Error importing script functions: {e}") print("Please ensure scripts are in the 'src' directory and structured correctly.") exit(1) load_dotenv() # --- Configuration & Setup --- logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) HF_API_KEY = os.getenv("HF_API_KEY") LLM_MODEL = os.getenv("LLM_MODEL", DEFAULT_LLM_MODEL) PRESENTATION_PROMPT = os.getenv( "PRESENTATION_PROMPT", DEFAULT_PRESENTATION_PROMPT_TEMPLATE ) CACHE_DIR = ".cache" # For TTS caching URL_CACHE_DIR = ".url_cache" URL_CACHE_FILE = os.path.join(URL_CACHE_DIR, "presentations_cache") TEMPLATE_DIR = os.getenv("TEMPLATE_DIR", "app/template") # Initialize clients (do this once if possible, or manage carefully in functions) try: if HF_API_KEY: hf_client = InferenceClient(token=HF_API_KEY, provider="cohere") else: logger.warning("HF_API_KEY not found. LLM generation will fail.") hf_client = None except Exception as e: logger.error(f"Failed to initialize Hugging Face client: {e}") hf_client = None # --- Helper Functions --- def fetch_webpage_content(url): """Fetches and extracts basic text content from a webpage.""" logger.info(f"Fetching content from: {url}") try: response = requests.get(url, timeout=15) response.raise_for_status() # Raise an exception for bad status codes soup = BeautifulSoup(response.text, "html.parser") # Basic text extraction (can be improved significantly) paragraphs = soup.find_all("p") headings = soup.find_all(["h1", "h2", "h3", "h4", "h5", "h6"]) list_items = soup.find_all("li") content = ( "\n".join([h.get_text() for h in headings]) + "\n\n" + "\n".join([p.get_text() for p in paragraphs]) + "\n\n" + "\n".join(["- " + li.get_text() for li in list_items]) ) # Simple cleanup content = re.sub(r"\s\s+", " ", content).strip() logger.info( f"Successfully fetched and parsed content (length: {len(content)})." ) return content except requests.exceptions.RequestException as e: logger.error(f"Error fetching URL {url}: {e}") return None except Exception as e: logger.error(f"Error parsing URL {url}: {e}") return None def parse_presentation_markdown(markdown_content): """Splits presentation markdown into slides with content and notes.""" slides = [] slide_parts = re.split(r"\n\n---\n\n", markdown_content) for i, part in enumerate(slide_parts): if "???" in part: content, notes = part.split("???", 1) slides.append({"id": i, "content": content.strip(), "notes": notes.strip()}) else: # Handle slides without notes (like title slide maybe) slides.append( { "id": i, "content": part.strip(), "notes": "", # Add empty notes field } ) logger.info(f"Parsed {len(slides)} slides from markdown.") return slides def reconstruct_presentation_markdown(slides_data): """Reconstructs the markdown string from slide data.""" full_md = [] for slide in slides_data: slide_md = slide["content"] if slide[ "notes" ]: # Only add notes separator if notes exist and are not just whitespace slide_md += f"\n\n???\n{slide['notes'].strip()}" full_md.append(slide_md.strip()) # Ensure each slide part is stripped return "\n\n---\n\n".join(full_md) def generate_pdf_from_markdown(markdown_file_path, output_pdf_path): """Generates a PDF from a Markdown file using bs export + decktape.""" logger.info(f"Attempting PDF gen: {markdown_file_path} -> {output_pdf_path}") working_dir = os.path.dirname(markdown_file_path) markdown_filename = os.path.basename(markdown_file_path) html_output_dir_name = "bs_html_output" html_output_dir_abs = os.path.join(working_dir, html_output_dir_name) expected_html_filename = os.path.splitext(markdown_filename)[0] + ".html" generated_html_path_abs = os.path.join(html_output_dir_abs, expected_html_filename) pdf_gen_success = False # ---- Step 1: Generate HTML using bs export ---- try: Path(html_output_dir_abs).mkdir(parents=True, exist_ok=True) export_command = ["bs", "export", markdown_filename, "-o", html_output_dir_name] logger.info(f"Running: {' '.join(export_command)} in CWD: {working_dir}") export_result = subprocess.run( export_command, cwd=working_dir, capture_output=True, text=True, check=True, timeout=60, ) logger.info("Backslide (bs export) OK.") logger.debug(f"bs export stdout:\n{export_result.stdout}") logger.debug(f"bs export stderr:\n{export_result.stderr}") if not os.path.exists(generated_html_path_abs): logger.error(f"Expected HTML not found: {generated_html_path_abs}") try: files_in_dir = os.listdir(html_output_dir_abs) logger.error(f"Files in {html_output_dir_abs}: {files_in_dir}") except FileNotFoundError: logger.error( f"HTML output directory {html_output_dir_abs} not found after bs run." ) raise FileNotFoundError( f"Generated HTML not found: {generated_html_path_abs}" ) except FileNotFoundError: logger.error( "`bs` command not found. Install backslide (`npm install -g backslide`)." ) raise gr.Error("HTML generation tool (backslide/bs) not found.") except subprocess.CalledProcessError as e: logger.error(f"Backslide (bs export) failed (code {e.returncode}).") logger.error(f"bs stderr:\n{e.stderr}") raise gr.Error(f"Backslide HTML failed: {e.stderr[:500]}...") except subprocess.TimeoutExpired: logger.error("Backslide (bs export) timed out.") raise gr.Error("HTML generation timed out (backslide).") except Exception as e: logger.error(f"Unexpected error during bs export: {e}", exc_info=True) raise gr.Error(f"Unexpected error during HTML generation: {e}") # ---- Step 2: Generate PDF from HTML using decktape ---- try: Path(output_pdf_path).parent.mkdir(parents=True, exist_ok=True) html_file_url = Path(generated_html_path_abs).as_uri() decktape_command = [ "decktape", "-p", "1000", "--chrome-arg=--allow-running-insecure-content", "--chrome-arg=--disable-web-security", "--chrome-arg=--no-sandbox", html_file_url, str(output_pdf_path), ] logger.info(f"Running PDF conversion: {' '.join(decktape_command)}") decktape_result = subprocess.run( decktape_command, capture_output=True, text=True, check=True, timeout=120, ) logger.info("Decktape command executed successfully.") logger.debug(f"decktape stdout:\n{decktape_result.stdout}") logger.debug(f"decktape stderr:\n{decktape_result.stderr}") if os.path.exists(output_pdf_path): logger.info(f"PDF generated successfully: {output_pdf_path}") pdf_gen_success = True return output_pdf_path else: logger.error("Decktape command finished but output PDF not found.") raise gr.Error("Decktape finished, but the PDF file was not created.") except FileNotFoundError: logger.error( "`decktape` command not found. Install decktape (`npm install -g decktape`)." ) raise gr.Error("PDF generation tool (decktape) not found.") except subprocess.CalledProcessError as e: logger.error(f"Decktape command failed (code {e.returncode}).") logger.error(f"decktape stderr:\n{e.stderr}") raise gr.Error(f"Decktape PDF failed: {e.stderr[:500]}...") except subprocess.TimeoutExpired: logger.error("Decktape command timed out.") raise gr.Error("PDF generation timed out (decktape).") except Exception as e: logger.error( f"Unexpected error during decktape PDF generation: {e}", exc_info=True ) raise gr.Error(f"Unexpected error during PDF generation: {e}") finally: # --- Cleanup HTML output directory --- if os.path.exists(html_output_dir_abs): try: shutil.rmtree(html_output_dir_abs) logger.info(f"Cleaned up HTML temp dir: {html_output_dir_abs}") except Exception as cleanup_e: logger.warning( f"Could not cleanup HTML dir {html_output_dir_abs}: {cleanup_e}" ) # Log final status if pdf_gen_success: logger.info(f"PDF generation process completed for {output_pdf_path}.") else: logger.error(f"PDF generation process failed for {output_pdf_path}.") # --- Helper Function to Read CSS --- def load_css(css_filename="style.scss"): """Loads CSS content from the template directory.""" css_path = os.path.join(TEMPLATE_DIR, css_filename) try: with open(css_path, "r", encoding="utf-8") as f: return f.read() except FileNotFoundError: logger.warning(f"CSS file not found at {css_path}. No custom styles applied.") return "" # Return empty string instead of None except Exception as e: logger.error(f"Error reading CSS file {css_path}: {e}") return "" # Return empty string on error # --- Gradio Workflow Functions --- def step1_fetch_and_generate_presentation(url, progress=gr.Progress(track_tqdm=True)): """Fetches content, generates presentation markdown, prepares editor, and copies template. Uses caching based on URL.""" if not url: raise gr.Error("Please enter a URL.") logger.info(f"Step 1: Fetching & Generating for {url}") status_update = f"Starting Step 1: Fetching content from {url}..." yield ( gr.update(value=status_update), None, None, [], gr.update(), gr.update(), gr.update(), ) # --- Cache Check --- try: os.makedirs(URL_CACHE_DIR, exist_ok=True) # Ensure cache dir exists with shelve.open(URL_CACHE_FILE) as cache: if url in cache: logger.info(f"Cache hit for URL: {url}") progress(0.5, desc="Loading cached presentation...") cached_data = cache[url] presentation_md = cached_data.get("presentation_md") slides_data = cached_data.get("slides_data") if not presentation_md: logger.warning( f"Cache for {url} missing 'presentation_md'. Regenerating." ) if not slides_data: logger.warning( f"Cache for {url} missing 'slides_data'. Regenerating." ) if presentation_md and slides_data: logger.info( f"Found complete cache entry for {url} with {len(slides_data)} slides." ) temp_dir = tempfile.mkdtemp() md_path = os.path.join(temp_dir, "presentation.md") try: with open(md_path, "w", encoding="utf-8") as f: f.write(presentation_md) logger.info( f"Wrote cached presentation to temp file: {md_path}" ) # --- Copy Template Directory for Cached Item --- template_src_dir = TEMPLATE_DIR template_dest_dir = os.path.join( temp_dir, os.path.basename(TEMPLATE_DIR) ) if os.path.isdir(template_src_dir): try: shutil.copytree(template_src_dir, template_dest_dir) logger.info( f"Copied template dir to {template_dest_dir} (cached)" ) except Exception as copy_e: logger.error( f"Failed to copy template dir for cache: {copy_e}" ) shutil.rmtree(temp_dir) raise gr.Error(f"Failed to prepare template: {copy_e}") else: logger.error( f"Template source dir '{template_src_dir}' not found." ) shutil.rmtree(temp_dir) raise gr.Error( f"Required template '{template_src_dir}' not found." ) progress(0.9, desc="Preparing editor from cache...") status_update = ( "Loaded presentation from cache. Preparing editor..." ) yield ( gr.update(value=status_update), None, None, [], gr.update(), gr.update(), gr.update(), ) logger.info(f"Using cached data for {len(slides_data)} slides.") # Final yield must match outputs list yield ( gr.update(value=status_update), temp_dir, md_path, slides_data, gr.update(visible=True), # editor_column gr.update( visible=True ), # btn_build_slides (Enable PDF button next) gr.update( interactive=False ), # btn_fetch_generate (disable) ) return # End generator here for cache hit except Exception as e: logger.error(f"Error writing cached markdown: {e}") if os.path.exists(temp_dir): shutil.rmtree(temp_dir) # If writing cache fails, raise to trigger full regeneration flow raise gr.Error(f"Failed to write cached markdown: {e}") else: # This case is now covered by the more specific logging above pass # Continue to regeneration # --- Cache Miss or Failed Cache Load --- logger.info(f"Cache miss for URL: {url}. Proceeding with generation.") progress(0.1, desc="Fetching webpage content...") if not hf_client: raise gr.Error("LLM Client not initialized. Check API Key.") status_update = "Fetching webpage content..." yield ( gr.update(value=status_update), None, None, [], gr.update(), gr.update(), gr.update(), ) web_content = fetch_webpage_content(url) if not web_content: raise gr.Error("Failed to fetch or parse content from the URL.") progress(0.3, desc="Generating presentation with LLM...") status_update = "Generating presentation with LLM..." yield ( gr.update(value=status_update), None, None, [], gr.update(), gr.update(), gr.update(), ) try: presentation_md = generate_presentation_with_llm( hf_client, LLM_MODEL, PRESENTATION_PROMPT, web_content, url ) except Exception as e: logger.error(f"Error during LLM call: {e}", exc_info=True) raise gr.Error(f"Failed to generate presentation from LLM: {e}") if not presentation_md: logger.error("LLM generation returned None.") raise gr.Error("LLM generation failed (received None).") # Check for basic structure early, but parsing handles final validation if "---" not in presentation_md: logger.warning( "LLM output missing slide separators ('---'). Parsing might fail." ) if "???" not in presentation_md: logger.warning( "LLM output missing notes separators ('???'). Notes might be empty." ) progress(0.7, desc="Parsing presentation slides...") status_update = "Parsing presentation slides..." yield ( gr.update(value=status_update), None, None, [], gr.update(), gr.update(), gr.update(), ) slides_data = parse_presentation_markdown(presentation_md) if not slides_data: logger.error("Parsing markdown resulted in zero slides.") raise gr.Error("Failed to parse generated presentation markdown.") # Create a temporary directory for this session temp_dir = tempfile.mkdtemp() md_path = os.path.join(temp_dir, "presentation.md") with open(md_path, "w", encoding="utf-8") as f: f.write(presentation_md) logger.info(f"Presentation markdown saved to temp file: {md_path}") # --- Copy Template Directory for New Item --- template_src_dir = TEMPLATE_DIR template_dest_dir = os.path.join(temp_dir, os.path.basename(TEMPLATE_DIR)) if os.path.isdir(template_src_dir): try: shutil.copytree(template_src_dir, template_dest_dir) logger.info(f"Copied template directory to {template_dest_dir}") except Exception as copy_e: logger.error(f"Failed to copy template directory: {copy_e}") shutil.rmtree(temp_dir) raise gr.Error(f"Failed to prepare template: {copy_e}") else: logger.error(f"Template source dir '{template_src_dir}' not found.") shutil.rmtree(temp_dir) raise gr.Error(f"Required template '{template_src_dir}' not found.") # --- Store in Cache --- try: with shelve.open(URL_CACHE_FILE) as cache_write: cache_write[url] = { "presentation_md": presentation_md, "slides_data": slides_data, } logger.info( f"Stored generated presentation in cache for URL: {url}" ) except Exception as e: logger.error(f"Failed to write to cache for URL {url}: {e}") progress(0.9, desc="Preparing editor...") status_update = "Generated presentation. Preparing editor..." yield ( gr.update(value=status_update), None, None, [], gr.update(), gr.update(), gr.update(), ) logger.info(f"Prepared data for {len(slides_data)} slides.") # Final yield must match outputs list yield ( gr.update(value=status_update), temp_dir, md_path, slides_data, gr.update(visible=True), # editor_column gr.update(visible=True), # btn_build_slides (Enable PDF button next) gr.update(interactive=False), # btn_fetch_generate (disable) ) except Exception as e: logger.error(f"Error in step 1 (fetch/generate): {e}", exc_info=True) status_update = f"Error during presentation setup: {e}" yield ( gr.update(value=status_update), None, None, [], gr.update(), gr.update(), gr.update(), ) # Yield a final tuple matching outputs, indicating error state if possible yield ( gr.update(value=status_update), None, None, [], gr.update(visible=False), # Keep editor hidden gr.update(visible=False), # Keep build button hidden gr.update(interactive=True), # Re-enable fetch button ) # Do not re-raise if we want the Gradio app to stay alive # raise gr.Error(f"Error during presentation setup: {e}") def step2_build_slides( state_temp_dir, state_md_path, state_slides_data, *editors, progress=gr.Progress(track_tqdm=True), ): """Renamed from step2_generate_pdf""" if not all([state_temp_dir, state_md_path, state_slides_data]): raise gr.Error("Session state missing.") logger.info("Step 2: Building Slides (PDF + Images)") status_update = "Starting Step 2: Building slides..." yield ( gr.update(value=status_update), None, [], gr.update(), gr.update(), gr.update(), ) num_slides = len(state_slides_data) MAX_SLIDES = 20 all_editors = list(editors) if len(all_editors) != MAX_SLIDES * 2: raise gr.Error(f"Incorrect editor inputs: {len(all_editors)}") edited_contents = all_editors[:MAX_SLIDES][:num_slides] edited_notes_list = all_editors[MAX_SLIDES:][:num_slides] if len(edited_contents) != num_slides or len(edited_notes_list) != num_slides: raise gr.Error("Editor input mismatch.") progress(0.1, desc="Saving edited markdown...") status_update = "Saving edited markdown..." yield ( gr.update(value=status_update), None, [], gr.update(), gr.update(), gr.update(), ) updated_slides = [] for i in range(num_slides): updated_slides.append( {"id": i, "content": edited_contents[i], "notes": edited_notes_list[i]} ) updated_md = reconstruct_presentation_markdown(updated_slides) try: with open(state_md_path, "w", encoding="utf-8") as f: f.write(updated_md) logger.info(f"Saved edited markdown: {state_md_path}") except IOError as e: status_update = f"Failed to save markdown: {e}" yield ( gr.update(value=status_update), None, [], gr.update(), gr.update(), gr.update(), ) # Final yield must match outputs yield ( gr.update(value=status_update), None, [], gr.update(), gr.update(visible=True), gr.update(), ) raise gr.Error(f"Failed to save markdown: {e}") progress(0.3, desc="Generating PDF...") status_update = "Generating PDF..." yield ( gr.update(value=status_update), None, [], gr.update(), gr.update(), gr.update(), ) pdf_output_path = os.path.join(state_temp_dir, "presentation.pdf") try: generated_pdf_path = generate_pdf_from_markdown(state_md_path, pdf_output_path) if not generated_pdf_path: raise gr.Error("PDF generation failed (check logs).") except gr.Error as e: status_update = f"PDF Generation Error: {e}" yield ( gr.update(value=status_update), None, [], gr.update(), gr.update(), gr.update(), ) # Final yield must match outputs yield ( gr.update(value=status_update), None, [], gr.update(), gr.update(visible=True), gr.update(), ) raise e except Exception as e: status_update = f"Unexpected PDF Error: {e}" yield ( gr.update(value=status_update), None, [], gr.update(), gr.update(), gr.update(), ) # Final yield must match outputs yield ( gr.update(value=status_update), None, [], gr.update(), gr.update(visible=True), gr.update(), ) raise gr.Error(f"Unexpected error generating PDF: {e}") progress(0.7, desc="Converting PDF to images...") status_update = "Converting PDF to images..." yield ( gr.update(value=status_update), generated_pdf_path, [], gr.update(), gr.update(), gr.update(), ) pdf_images = [] try: pdf_images = convert_pdf_to_images( generated_pdf_path, dpi=150 ) # Use generated path if not pdf_images: raise gr.Error("PDF to image conversion failed.") logger.info(f"Converted PDF to {len(pdf_images)} images.") if len(pdf_images) != num_slides: warning_msg = f"Warning: PDF page count ({len(pdf_images)}) != slide count ({num_slides}). Images might mismatch." gr.Warning(warning_msg) status_update += f" ({warning_msg})" # Pad or truncate? For now, just return what we have, UI update logic handles MAX_SLIDES except Exception as e: logger.error(f"Error converting PDF to images: {e}", exc_info=True) status_update = f"Failed to convert PDF to images: {e}" yield ( gr.update(value=status_update), generated_pdf_path, [], gr.update(), gr.update(), gr.update(), ) # Final yield must match outputs yield ( gr.update(value=status_update), generated_pdf_path, [], gr.update(), gr.update(visible=True), gr.update(value=generated_pdf_path, visible=True), ) # Proceed without images? Or raise error? Let's raise. raise gr.Error(f"Failed to convert PDF to images: {e}") info_msg = f"Built {len(pdf_images)} slide images. Ready for Step 3." logger.info(info_msg) progress(1.0, desc="Slide build complete.") status_update = f"Step 2 Complete: {info_msg}" yield ( gr.update(value=status_update), generated_pdf_path, pdf_images, # Return the list of image paths gr.update(visible=True), # btn_generate_audio gr.update(visible=False), # btn_build_slides gr.update(value=generated_pdf_path, visible=True), # pdf_download_link ) def step3_generate_audio(*args, progress=gr.Progress(track_tqdm=True)): """Generates audio files for the speaker notes using edited content.""" # Args structure adjustment: # args[0]: state_temp_dir # args[1]: state_md_path # args[2]: original_slides_data # args[3...]: editors state_temp_dir = args[0] state_md_path = args[1] original_slides_data = args[2] # editors = args[3:] # Old slicing num_slides = len(original_slides_data) if num_slides == 0: logger.error("Step 3 (Audio) called with zero slides data.") raise gr.Error("No slide data available. Please start over.") MAX_SLIDES = 20 # Ensure this matches UI definition # --- Adjust indices based on removing status_textbox input --- # Start editors from index 3 now code_editors_start_index = 3 notes_textboxes_start_index = 3 + MAX_SLIDES # Slice the *actual* edited values based on num_slides edited_contents = args[ code_editors_start_index : code_editors_start_index + num_slides ] edited_notes_list = args[ notes_textboxes_start_index : notes_textboxes_start_index + num_slides ] if not state_temp_dir or not state_md_path: raise gr.Error("Session state lost (Audio step). Please start over.") # Check slicing if len(edited_contents) != num_slides or len(edited_notes_list) != num_slides: logger.error( f"Input slicing error (Audio step): Expected {num_slides}, got {len(edited_contents)} contents, {len(edited_notes_list)} notes." ) raise gr.Error( f"Input processing error: Mismatch after slicing ({num_slides} slides)." ) logger.info(f"Processing {num_slides} slides for audio generation.") status_update = "Starting Step 3: Generating audio..." yield (gr.update(value=status_update), None, gr.update(), gr.update()) + ( gr.update(), ) * MAX_SLIDES * 2 audio_dir = os.path.join(state_temp_dir, "audio") os.makedirs(audio_dir, exist_ok=True) # --- Update the presentation.md file AGAIN in case notes changed after PDF --- # This might be redundant if users don't edit notes between PDF and Audio steps, # but ensures the audio matches the *latest* notes displayed. progress(0.1, desc="Saving latest notes...") status_update = "Saving latest notes..." yield (gr.update(value=status_update), None, gr.update(), gr.update()) + ( gr.update(), ) * MAX_SLIDES * 2 updated_slides_data = [] for i in range(num_slides): updated_slides_data.append( { "id": original_slides_data[i]["id"], # Keep original ID "content": edited_contents[i], # Use sliced edited content "notes": edited_notes_list[i], # Use sliced edited notes } ) updated_markdown = reconstruct_presentation_markdown(updated_slides_data) try: with open(state_md_path, "w", encoding="utf-8") as f: f.write(updated_markdown) logger.info(f"Updated presentation markdown before audio gen: {state_md_path}") except IOError as e: warning_msg = f"Warning: Could not save latest notes to markdown file: {e}" gr.Warning(warning_msg) status_update += f" ({warning_msg})" yield (gr.update(value=status_update), None, gr.update(), gr.update()) + ( gr.update(), ) * MAX_SLIDES * 2 # Note: We continue processing audio even if saving markdown fails # If we need to stop and return final state, do it here: # yield (gr.update(value=status_update), state_audio_dir, gr.update(), gr.update(visible=True), *[gr.update()]*N, *[gr.update()]*N) # raise gr.Error(f"Failed to save updated markdown before audio gen: {e}") # Old raise generated_audio_paths = ["" for _ in range(num_slides)] audio_generation_failed = False successful_audio_count = 0 for i in range(num_slides): note_text = edited_notes_list[i] slide_num = i + 1 progress_val = (i + 1) / num_slides * 0.8 + 0.1 progress( progress_val, desc=f"Audio slide {slide_num}/{num_slides}", ) status_update = f"Generating audio for slide {slide_num}/{num_slides}..." yield (gr.update(value=status_update), audio_dir, gr.update(), gr.update()) + ( gr.update(), ) * MAX_SLIDES * 2 output_file_path = Path(audio_dir) / f"{slide_num}.wav" if not note_text or not note_text.strip(): try: # Generate silence subprocess.run( [ "ffmpeg", "-y", "-f", "lavfi", "-i", "anullsrc=r=44100:cl=mono", "-t", "0.1", "-q:a", "9", str(output_file_path), ], check=True, capture_output=True, text=True, ) generated_audio_paths[i] = str(output_file_path) except Exception as e: audio_generation_failed = True logger.error(f"Silence gen failed slide {i + 1}: {e}") continue try: # Generate TTS success = text_to_speech( note_text, output_file_path, voice=VOICE_ID, cache_dir=CACHE_DIR ) if success: generated_audio_paths[i] = str(output_file_path) successful_audio_count += 1 else: audio_generation_failed = True logger.error(f"TTS failed slide {i + 1}") except Exception as e: audio_generation_failed = True logger.error(f"TTS exception slide {i + 1}: {e}", exc_info=True) # --- Prepare outputs for Gradio --- audio_player_updates = [ gr.update(value=p if p else None, visible=bool(p and os.path.exists(p))) for p in generated_audio_paths ] regen_button_updates = [gr.update(visible=True)] * num_slides audio_player_updates.extend( [gr.update(value=None, visible=False)] * (MAX_SLIDES - num_slides) ) regen_button_updates.extend([gr.update(visible=False)] * (MAX_SLIDES - num_slides)) info_msg = f"Generated {successful_audio_count}/{num_slides} audio clips. " if audio_generation_failed: info_msg += "Some audio failed. Review/Regenerate before video." gr.Warning(info_msg) else: info_msg += "Ready for Step 4." logger.info(info_msg) progress(1.0, desc="Audio generation complete.") status_update = f"Step 3 Complete: {info_msg}" # Return tuple including status update + original outputs # Final yield must match outputs list yield ( gr.update(value=status_update), audio_dir, gr.update(visible=True), # btn_generate_video gr.update(visible=False), # btn_generate_audio *audio_player_updates, *regen_button_updates, ) def step4_generate_video( state_temp_dir, state_audio_dir, state_pdf_path, # Use PDF path from state progress=gr.Progress(track_tqdm=True), ): """Generates the final video using PDF images and audio files.""" if not state_temp_dir or not state_audio_dir or not state_pdf_path: raise gr.Error("Session state lost (Video step). Please start over.") if not os.path.exists(state_pdf_path): raise gr.Error(f"PDF file not found: {state_pdf_path}. Cannot generate video.") if not os.path.isdir(state_audio_dir): raise gr.Error( f"Audio directory not found: {state_audio_dir}. Cannot generate video." ) video_output_path = os.path.join(state_temp_dir, "final_presentation.mp4") status_update = "Starting Step 4: Generating video..." yield (gr.update(value=status_update), gr.update(), gr.update()) progress(0.1, desc="Preparing video components...") pdf_images = [] # Initialize to ensure cleanup happens try: # Find audio files (natsorted) audio_files = find_audio_files(state_audio_dir, "*.wav") if not audio_files: warning_msg = f"Warning: No WAV files found in {state_audio_dir}. Video might lack audio." logger.warning(warning_msg) status_update += f" ({warning_msg})" # Decide whether to proceed with silent video or error out # raise gr.Error(f"No audio files found in {state_audio_dir}") # Convert PDF to images progress(0.2, desc="Converting PDF to images...") status_update = "Converting PDF back to images for video..." yield (gr.update(value=status_update), gr.update(), gr.update()) pdf_images = convert_pdf_to_images(state_pdf_path, dpi=150) if not pdf_images: raise gr.Error(f"Failed to convert PDF ({state_pdf_path}) to images.") # Allow video generation even if audio is missing or count mismatch # The create_video_clips function should handle missing audio gracefully (e.g., use image duration) if len(pdf_images) != len(audio_files): warning_msg = f"Warning: Mismatch: {len(pdf_images)} PDF pages vs {len(audio_files)} audio files. Video clips might have incorrect durations or missing audio." logger.warning(warning_msg) status_update += f" ({warning_msg})" # yield status_update # Old yield yield (gr.update(value=status_update), gr.update(), gr.update()) progress(0.5, desc="Creating individual video clips...") status_update = "Creating individual video clips..." # yield status_update # Old yield yield (gr.update(value=status_update), gr.update(), gr.update()) buffer_seconds = 1.0 output_fps = 10 video_clips = create_video_clips( pdf_images, audio_files, buffer_seconds, output_fps ) if not video_clips: raise gr.Error("Failed to create any video clips.") progress(0.8, desc="Concatenating clips...") status_update = "Concatenating clips into final video..." # yield status_update # Old yield yield (gr.update(value=status_update), gr.update(), gr.update()) concatenate_clips(video_clips, video_output_path, output_fps) logger.info(f"Video concatenation complete: {video_output_path}") progress(0.95, desc="Cleaning up temp images...") status_update = "Cleaning up temporary image files..." # yield status_update # Old yield yield (gr.update(value=status_update), gr.update(), gr.update()) cleanup_temp_files(pdf_images) # Pass the list of image paths except Exception as e: if pdf_images: cleanup_temp_files(pdf_images) logger.error(f"Video generation failed: {e}", exc_info=True) status_update = f"Video generation failed: {e}" yield (gr.update(value=status_update), gr.update(), gr.update()) # Final yield must match outputs yield (gr.update(value=status_update), gr.update(), gr.update(visible=True)) raise gr.Error(f"Video generation failed: {e}") info_msg = f"Video generated: {os.path.basename(video_output_path)}" logger.info(info_msg) progress(1.0, desc="Video Complete.") status_update = f"Step 4 Complete: {info_msg}" # Return tuple including status update yield ( gr.update(value=status_update), gr.update(value=video_output_path, visible=True), # video_output gr.update(visible=False), # btn_generate_video ) def cleanup_session(temp_dir): """Removes the temporary directory.""" if temp_dir and isinstance(temp_dir, str) and os.path.exists(temp_dir): try: shutil.rmtree(temp_dir) logger.info(f"Cleaned up temporary directory: {temp_dir}") return "Cleaned up session files." except Exception as e: logger.error(f"Error cleaning up temp directory {temp_dir}: {e}") return f"Error during cleanup: {e}" logger.warning(f"Cleanup called but temp_dir invalid or not found: {temp_dir}") return "No valid temporary directory found to clean." # --- Dependency Check and Installation --- (Added) def check_and_install_npm_dependency(command_name, package_name, install_instructions): """Checks if a command exists, tries to install via npm if not.""" if shutil.which(command_name): logger.info(f"Command '{command_name}' found.") return True else: logger.warning( f"Command '{command_name}' not found. Attempting to install '{package_name}' globally via npm..." ) npm_command = ["npm", "install", "-g", package_name] try: result = subprocess.run( npm_command, check=True, capture_output=True, text=True, timeout=300 ) logger.info(f"Successfully installed '{package_name}'.") logger.debug(f"npm stdout:\n{result.stdout}") logger.debug(f"npm stderr:\n{result.stderr}") # Verify again after install attempt if shutil.which(command_name): logger.info(f"Command '{command_name}' is now available.") return True else: logger.error( f"Installation of '{package_name}' reported success, but command '{command_name}' still not found. Check npm logs and PATH." ) return False except FileNotFoundError: logger.error( f"`npm` command not found. Cannot install '{package_name}'. {install_instructions}" ) return False except subprocess.CalledProcessError as e: logger.error( f"Failed to install '{package_name}' (return code {e.returncode})." ) logger.error(f"npm stdout:\n{e.stdout}") logger.error(f"npm stderr:\n{e.stderr}") logger.error( "Installation might require administrator privileges (e.g., run with 'sudo')." ) return False except subprocess.TimeoutExpired: logger.error(f"Installation of '{package_name}' timed out.") return False except Exception as e: logger.error( f"An unexpected error occurred during '{package_name}' installation: {e}", exc_info=True, ) return False # --- Gradio Interface --- # Load custom CSS custom_css = load_css() with gr.Blocks( theme=gr.themes.Soft(), css=custom_css, title="Webpage to Video" ) as demo: gr.Markdown("# Webpage to Video Presentation Generator") # State variables state_temp_dir = gr.State(None) state_md_path = gr.State(None) state_audio_dir = gr.State(None) state_pdf_path = gr.State(None) state_slides_data = gr.State([]) state_pdf_image_paths = gr.State([]) MAX_SLIDES = 20 # --- Tabbed Interface --- with gr.Tabs(elem_id="tabs") as tabs_widget: # Tab 1: Generate Presentation with gr.TabItem("1. Generate Presentation", id=0): with gr.Row(): with gr.Column(scale=1): gr.Markdown("**Step 1:** Enter URL") input_url = gr.Textbox( label="Webpage URL", value="https://huggingface.co/blog/llm-course", ) btn_fetch_generate = gr.Button( value="1. Fetch & Generate", variant="primary" ) with gr.Column(scale=4): gr.Markdown( "### Instructions\n1. Enter URL & click 'Fetch & Generate'.\n2. Editor appears below.\n3. Go to next tab." ) # Tab 2: Build Slides with gr.TabItem("2. Build Slides", id=1): with gr.Row(): with gr.Column(scale=1): gr.Markdown("**Step 2:** Review/Edit, then build slides.") btn_build_slides = gr.Button( value="2. Build Slides", variant="secondary", visible=False ) pdf_download_link = gr.File( label="Download PDF", visible=False, interactive=False ) with gr.Column(scale=4): gr.Markdown( "### Instructions\n1. Edit content/notes below.\n2. Click 'Build Slides'. Images appear.\n3. Download PDF from sidebar.\n4. Go to next tab." ) # Tab 3: Generate Audio with gr.TabItem("3. Generate Audio", id=2): with gr.Row(): with gr.Column(scale=1): gr.Markdown("**Step 3:** Review/Edit notes, then generate audio.") btn_generate_audio = gr.Button( value="3. Generate Audio", variant="primary", visible=False ) with gr.Column(scale=4): gr.Markdown( "### Instructions\n1. Finalize notes below.\n2. Click 'Generate Audio'.\n3. Regenerate if needed.\n4. Go to next tab." ) # Tab 4: Generate Video with gr.TabItem("4. Create Video", id=3): with gr.Row(): with gr.Column(scale=1): gr.Markdown("**Step 4:** Create the final video.") btn_generate_video = gr.Button( value="4. Create Video", variant="primary", visible=False ) with gr.Column(scale=4): gr.Markdown( "### Instructions\n1. Click 'Create Video'.\n2. Video appears below.".replace( "\n", "\n" ) # Ensure newlines are rendered ) video_output = gr.Video(label="Final Video", visible=False) # --- Status Textbox (Moved BEFORE editor_column) --- with gr.Row(): status_textbox = gr.Textbox( label="Status", value="Enter a URL and click 'Fetch & Generate' to start.", interactive=False, lines=1, max_lines=1, ) # Define the shared editor structure once, AFTER tabs and status box slide_editors_group = [] with gr.Column(visible=False) as editor_column: # Initially hidden gr.Markdown("--- \n## Edit Slides & Notes") gr.Markdown("_(PDF uses content & notes, Audio uses notes only)_") for i in range(MAX_SLIDES): with gr.Accordion(f"Slide {i + 1}", open=(i == 0), visible=False) as acc: with gr.Row(): # Row for Content/Preview/Image with gr.Column(scale=1): code_editor = gr.Code( label="Content (Markdown)", language="markdown", lines=15, interactive=True, visible=False, ) notes_textbox = ( gr.Textbox( # Changed from gr.Code to gr.Textbox label="Script/Notes (for Audio)", lines=8, # language="markdown", # Removed language parameter interactive=True, visible=False, ) ) with gr.Column(scale=1): slide_image = gr.Image( label="Slide Image", visible=False, interactive=False, # height=300, # Removed fixed height ) md_preview = gr.Markdown(visible=False) with gr.Row(): # Row for audio controls audio_player = gr.Audio( label="Generated Audio", visible=False, interactive=False, scale=3, ) regen_button = gr.Button( value="Regen Audio", visible=False, scale=1, size="sm" ) slide_editors_group.append( ( acc, code_editor, md_preview, notes_textbox, audio_player, regen_button, slide_image, ) ) code_editor.change( fn=lambda x: x, inputs=code_editor, outputs=md_preview, show_progress="hidden", ) # --- Component Lists for Updates --- all_editor_components = [comp for group in slide_editors_group for comp in group] all_code_editors = [group[1] for group in slide_editors_group] all_notes_textboxes = [group[3] for group in slide_editors_group] all_audio_players = [group[4] for group in slide_editors_group] all_regen_buttons = [group[5] for group in slide_editors_group] all_slide_images = [group[6] for group in slide_editors_group] # --- Main Button Click Handlers --- (Outputs use locally defined component vars) # Step 1 Click Handler step1_outputs = [ status_textbox, # Added status output state_temp_dir, state_md_path, state_slides_data, editor_column, # Show the editor column btn_build_slides, # Enable the button in Tab 2 btn_fetch_generate, # Disable self ] btn_fetch_generate.click( fn=step1_fetch_and_generate_presentation, inputs=[input_url], outputs=step1_outputs, show_progress="full", ).then( fn=lambda s_data: ( gr.update( value="Editor populated. Proceed to Step 2." ), # Status update first # Then unpack the list/tuple of editor updates *[ upd for i, slide in enumerate(s_data) if i < MAX_SLIDES for upd in [ gr.update( # Get cleaned first line for title, then use in f-string label=f"Slide {i + 1}: {(slide['content'].splitlines()[0] if slide['content'] else '').strip()[:30]}...", visible=True, open=(i == 0), ), # Accordion gr.update(value=slide["content"], visible=True), # Code Editor gr.update(value=slide["content"], visible=True), # MD Preview gr.update(value=slide["notes"], visible=True), # Notes Textbox gr.update(value=None, visible=False), # Audio Player gr.update(visible=False), # Regen Button gr.update(value=None, visible=False), # Slide Image ] ] + [ upd for i in range(len(s_data), MAX_SLIDES) for upd in [gr.update(visible=False)] * 7 ], ), # Correctly construct the output tuple inputs=[state_slides_data], outputs=[status_textbox] + all_editor_components, # Add status_textbox output show_progress="hidden", ).then(lambda: gr.update(selected=1), outputs=tabs_widget) # Switch to Tab 2 # Step 2 Click Handler step2_inputs = ( [state_temp_dir, state_md_path, state_slides_data] + all_code_editors + all_notes_textboxes ) step2_outputs = [ status_textbox, # Added status output state_pdf_path, state_pdf_image_paths, btn_generate_audio, # Enable button in Tab 3 btn_build_slides, # Disable self pdf_download_link, # Update download link in Tab 2 ] btn_build_slides.click( fn=step2_build_slides, inputs=step2_inputs, outputs=step2_outputs, show_progress="full", ).then( fn=lambda image_paths: ( gr.update( value="Slide images updated. Proceed to Step 3." ), # Status update first # Then unpack the list/tuple of image updates *[ gr.update( value=image_paths[i] if i < len(image_paths) else None, visible=(i < len(image_paths)), ) for i in range(MAX_SLIDES) ], ), # Correctly construct the output tuple inputs=[state_pdf_image_paths], outputs=[status_textbox] + all_slide_images, # Add status_textbox output show_progress="hidden", ).then(lambda: gr.update(selected=2), outputs=tabs_widget) # Switch to Tab 3 # Step 3 Click Handler step3_inputs = ( [state_temp_dir, state_md_path, state_slides_data] + all_code_editors + all_notes_textboxes ) step3_outputs = ( [ status_textbox, # Added status output state_audio_dir, btn_generate_video, # Enable button in Tab 4 btn_generate_audio, # Disable self ] + all_audio_players + all_regen_buttons ) btn_generate_audio.click( fn=step3_generate_audio, inputs=step3_inputs, outputs=step3_outputs, show_progress="full", ).then( fn=lambda: ( gr.update(value="Audio generated. Proceed to Step 4."), gr.update(selected=3), ), # Status update is handled by the main function yield now # No inputs needed for this simple status update + tab switch outputs=[status_textbox, tabs_widget], show_progress="hidden", ) # Step 4 Click Handler step4_inputs = [state_temp_dir, state_audio_dir, state_pdf_path] step4_outputs = [ status_textbox, # Added status output video_output, # Update video output in Tab 4 btn_generate_video, # Disable self ] btn_generate_video.click( fn=step4_generate_video, inputs=step4_inputs, outputs=step4_outputs, show_progress="full", ) if __name__ == "__main__": os.makedirs(CACHE_DIR, exist_ok=True) os.makedirs(URL_CACHE_DIR, exist_ok=True) # --- Check/Install Dependencies --- (Added) logger.info("Checking for external dependencies...") backslide_ok = check_and_install_npm_dependency( "bs", "backslide", "Please install Node.js/npm (https://nodejs.org/) and then run 'npm install -g backslide'", ) decktape_ok = check_and_install_npm_dependency( "decktape", "decktape", "Please install Node.js/npm (https://nodejs.org/) and then run 'npm install -g decktape'", ) if not backslide_ok: gr.Warning( "Backslide (bs) command check/install failed. PDF generation might fail. Check logs." ) if not decktape_ok: gr.Warning( "Decktape command check/install failed. PDF generation might fail. Check logs." ) # --- End Dependency Check --- demo.queue().launch(debug=True)