import gradio as gr
import numpy as np
import pandas as pd
import torch
import matplotlib.pyplot as plt
import json
import time
import os
from functools import partial
import datetime
import plotly.graph_objects as go
from plotly.subplots import make_subplots
# Lazily populated model handles. load_models() assigns these on first use;
# each one is None until then.
tokenizer = ner_pipeline = pos_pipeline = intent_classifier = None
semantic_model = None
stt_model = None  # Speech-to-text model
models_loaded = False  # Flipped to True once load_models() has completed

# Keyword ranking history, kept in memory only for this example.
# A real app would use a proper database instead.
ranking_history = {}
def load_models(progress=gr.Progress()):
    """Load the NLP models into the module-level globals on first call.

    Later calls return immediately once ``models_loaded`` is set. The
    speech-to-text and semantic models are optional: if either fails to load,
    a warning is printed, its global is set to ``None`` and loading continues.

    Args:
        progress: Callable invoked as ``progress(fraction, desc=...)`` before
            each loading step.

    Returns:
        ``True`` when the models are loaded (now or previously); otherwise a
        string of the form ``"Error: <message>"``.
    """
    global tokenizer, ner_pipeline, pos_pipeline, intent_classifier, semantic_model, stt_model, models_loaded

    # Repeat calls are a no-op.
    if models_loaded:
        return True

    try:
        progress(0.1, desc="Loading models...")
        # Smaller models, loaded one after another to reduce memory pressure.
        from transformers import AutoTokenizer, pipeline

        progress(0.2, desc="Loading tokenizer...")
        tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

        progress(0.3, desc="Loading NER model...")
        ner_pipeline = pipeline("ner", model="dslim/bert-base-NER")

        progress(0.4, desc="Loading POS model...")
        from transformers import AutoModelForTokenClassification, BertTokenizerFast

        pos_checkpoint = "vblagoje/bert-english-uncased-finetuned-pos"
        pos_pipeline = pipeline(
            "token-classification",
            model=AutoModelForTokenClassification.from_pretrained(pos_checkpoint),
            tokenizer=BertTokenizerFast.from_pretrained(pos_checkpoint),
        )

        progress(0.6, desc="Loading intent classifier...")
        # Device 0 when CUDA is available, otherwise -1.
        device_index = 0 if torch.cuda.is_available() else -1
        intent_classifier = pipeline(
            "zero-shot-classification",
            model="typeform/distilbert-base-uncased-mnli",  # Smaller than BART
            device=device_index,
        )

        progress(0.7, desc="Loading speech-to-text model...")
        try:
            from transformers import WhisperProcessor, WhisperForConditionalGeneration

            whisper_checkpoint = "openai/whisper-small.en"
            whisper_processor = WhisperProcessor.from_pretrained(whisper_checkpoint)
            whisper_model = WhisperForConditionalGeneration.from_pretrained(whisper_checkpoint)
            # Stored as a (processor, model) pair.
            stt_model = (whisper_processor, whisper_model)
        except Exception as exc:
            print(f"Warning: Could not load speech-to-text model: {exc}")
            stt_model = None  # Lets callers check whether it is available

        progress(0.8, desc="Loading semantic model...")
        try:
            from sentence_transformers import SentenceTransformer

            semantic_model = SentenceTransformer("all-MiniLM-L6-v2")
        except Exception as exc:
            print(f"Warning: Could not load semantic model: {exc}")
            semantic_model = None  # Lets callers check whether it is available

        progress(1.0, desc="Models loaded successfully!")
        models_loaded = True
        return True
    except Exception as exc:
        print(f"Error loading models: {exc}")
        return f"Error: {exc}"
def speech_to_text(audio_path):
    """Convert speech to text using the loaded speech-to-text model.

    Args:
        audio_path: Path of the audio file, handed to ``librosa.load``.

    Returns:
        The transcription string. If the speech-to-text model is not loaded,
        or if processing fails, a human-readable message string is returned
        instead of raising.
    """
    if stt_model is None:
        return "Speech-to-text model not loaded. Please try text input instead."
    try:
        import librosa

        # One constant for both loading and feature extraction, so the rate
        # declared to the processor always matches the loaded audio.
        sample_rate = 16000

        # stt_model is the (processor, model) pair stored by load_models().
        processor, model = stt_model
        audio, _ = librosa.load(audio_path, sr=sample_rate)

        # Audio -> input features -> generated token ids -> text.
        input_features = processor(
            audio, sampling_rate=sample_rate, return_tensors="pt"
        ).input_features
        predicted_ids = model.generate(input_features)
        return processor.batch_decode(predicted_ids, skip_special_tokens=True)[0]
    except Exception as e:
        print(f"Error in speech_to_text: {str(e)}")
        return f"Error processing speech: {str(e)}"
def handle_voice_input(audio):
    """Turn a voice recording into text.

    Args:
        audio: The recording to transcribe, forwarded unchanged to
            ``speech_to_text``; ``None`` means nothing was recorded.

    Returns:
        The text produced by ``speech_to_text``, or a message string when
        there is no audio or the conversion raises.
    """
    # Guard clause: nothing recorded.
    if audio is None:
        return "No audio detected. Please try again."

    try:
        return speech_to_text(audio)
    except Exception as exc:
        print(f"Error in handle_voice_input: {exc}")
        return f"Error: {exc}"
def simulate_google_serp(keyword, num_results=10):
    """Simulate Google SERP results for a keyword.

    No search API is called: every field is fabricated from ``keyword`` and
    the result position, so the same keyword always produces the same data.

    Args:
        keyword: Search phrase to generate results for.
        num_results: Number of results to generate (positions 1..num_results).

    Returns:
        A list of dicts with the keys ``position``, ``title``, ``url``,
        ``domain``, ``snippet``, ``ctr_estimate`` and
        ``impressions_estimate``. An empty list is returned if anything fails.
    """
    try:
        # In a real implementation, this would call the Google API.
        # Use a private generator seeded from the keyword instead of reseeding
        # NumPy's global RNG, which would silently affect every other user of
        # np.random. RandomState yields the same stream np.random.seed() did.
        rng = np.random.RandomState(sum(ord(c) for c in keyword))

        domains = [
            "example.com", "wikipedia.org", "medium.com", "github.com",
            "stackoverflow.com", "amazon.com", "youtube.com", "reddit.com",
            "linkedin.com", "twitter.com", "facebook.com", "instagram.com",
        ]
        # Loop-invariant pieces of the title and URL.
        heading = keyword.title()
        slug = keyword.replace(" ", "-")

        serp_results = []
        for i in range(1, num_results + 1):
            domain = domains[i % len(domains)]
            site_name = domain.split(".")[0].title()
            serp_results.append({
                "position": i,
                "title": f"{heading} - {site_name} Resource #{i}",
                "url": f"https://www.{domain}/{slug}-resource-{i}",
                "domain": domain,
                "snippet": f"This is a simulated SERP result for '{keyword}'. Result #{i} would provide relevant information about this topic.",
                # Click-through rate decays geometrically with position.
                "ctr_estimate": round(0.3 * (0.85 ** (i - 1)), 4),
                "impressions_estimate": int(rng.randint(1000, 10000)),
            })
        return serp_results
    except Exception as e:
        print(f"Error in simulate_google_serp: {str(e)}")
        return []
def update_ranking_history(keyword, serp_results):
    """Record a timestamped snapshot of a keyword's SERP results.

    Appends the first five entries of ``serp_results`` to the module-level
    ``ranking_history`` under ``keyword`` and keeps at most the ten most
    recent snapshots for that keyword.

    Args:
        keyword: Keyword the snapshot belongs to.
        serp_results: Sequence of result dicts; only the first five are kept.

    Returns:
        ``True`` on success, ``False`` if an exception was raised.
    """
    max_snapshots = 10
    top_n = 5
    try:
        now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        # Create the keyword's list on first sight, then append the snapshot.
        snapshots = ranking_history.setdefault(keyword, [])
        snapshots.append({
            "timestamp": now,
            "results": serp_results[:top_n],
        })

        # Trim to the most recent entries.
        if len(snapshots) > max_snapshots:
            ranking_history[keyword] = snapshots[-max_snapshots:]
        return True
    except Exception as exc:
        print(f"Error in update_ranking_history: {exc}")
        return False
def get_semantic_similarity(token, comparison_terms):
    """Calculate semantic similarity between a token and comparison terms.

    Args:
        token: The text to compare.
        comparison_terms: Sequence of terms to score against ``token``.

    Returns:
        A list of ``(term, similarity)`` tuples sorted by similarity, highest
        first. If anything fails (for example the semantic model is
        unavailable), every term is returned with a placeholder score of 0.5.
        An empty or missing ``comparison_terms`` yields an empty list.
    """
    # Nothing to compare against: skip the model entirely.
    if comparison_terms is None or len(comparison_terms) == 0:
        return []
    try:
        from sklearn.metrics.pairwise import cosine_similarity

        token_embedding = semantic_model.encode([token])[0]
        comparison_embeddings = semantic_model.encode(comparison_terms)

        # One batched call scores the token against every term at once,
        # instead of invoking cosine_similarity once per term.
        scores = cosine_similarity([token_embedding], comparison_embeddings)[0]
        similarities = [
            (term, float(score)) for term, score in zip(comparison_terms, scores)
        ]
        return sorted(similarities, key=lambda pair: pair[1], reverse=True)
    except Exception as e:
        print(f"Error in semantic similarity: {str(e)}")
        # Return dummy data on error
        return [(term, 0.5) for term in comparison_terms]
# Fallback colour for token types that have no entry below.
_DEFAULT_TOKEN_COLOR = "#E5E5E5"  # Light gray

# Background colour (hex) for each known token type.
_TOKEN_TYPE_COLORS = {
    "prefix": "#D8BFD8",  # Light purple
    "suffix": "#AEDAA4",  # Light green
    "stem": "#A4C2F4",  # Light blue
    "compound_first": "#FFCC80",  # Light orange
    "compound_second": "#FFCC80",  # Light orange
    "word": "#E5E5E5",  # Light gray
}


def get_token_colors(token_type):
    """Return the hex colour for a token type, light gray if it is unknown."""
    return _TOKEN_TYPE_COLORS.get(token_type, _DEFAULT_TOKEN_COLOR)
def simulate_historical_data(token):
    """Generate simulated historical usage data for a token.

    The data is synthetic and deterministic per token:

    * tokens longer than 8 characters get a fixed "recent growth" curve;
    * tokens starting with "un", "re", "de" or "pre" get a fixed older curve;
    * all other tokens get a rising baseline plus seeded noise, clamped to
      the range 5..95.

    Args:
        token: The token text.

    Returns:
        A list of ``(era, value)`` tuples, one per era.
    """
    eras = ["1900s", "1950s", "1980s", "2000s", "2010s", "Present"]

    if len(token) > 8:
        # Possibly a technical term - recent growth
        values = [10, 20, 30, 60, 85, 95]
    elif token.startswith(("un", "re", "de", "pre")):
        # Prefix words tend to be older
        values = [45, 50, 60, 70, 75, 80]
    else:
        # A character-code sum is used rather than hash(), which is
        # randomised per process for strings.
        checksum = sum(ord(c) for c in token)
        base = 50 + (checksum % 30)
        # Private generator: reseeding NumPy's global RNG here would disturb
        # every other user of np.random. RandomState reproduces the same
        # values the global np.random.seed() + normal() sequence produced.
        rng = np.random.RandomState(checksum)
        noise = rng.normal(0, 5, len(eras))
        # float() keeps the output plain Python numbers instead of a mix of
        # int (when clamped) and numpy scalars.
        values = [
            float(max(5, min(95, base + i * 5 + n))) for i, n in enumerate(noise)
        ]
    return list(zip(eras, values))
def generate_origin_data(token):
    """Generate simulated origin/etymology data for a token.

    The origin is picked deterministically: the sum of the token's character
    codes, modulo the number of candidate origins, selects the entry.

    Args:
        token: The token text.

    Returns:
        A dict with the keys ``era``, ``language`` and ``note``.
    """
    # (era, language) candidates; order matters because it drives selection.
    candidates = (
        ("Ancient", "Latin"),
        ("Ancient", "Greek"),
        ("Medieval", "Old English"),
        ("16th century", "French"),
        ("18th century", "Germanic"),
        ("19th century", "Anglo-Saxon"),
        ("20th century", "Modern English"),
    )
    era, language = candidates[sum(map(ord, token)) % len(candidates)]
    return {
        "era": era,
        "language": language,
        "note": f"First appeared in {era} texts derived from {language}.",
    }
def analyze_token_types(tokens):
    """Identify token types (prefix, suffix, compound, etc.).

    Each token is lower-cased and classified by the first rule that matches:

    1. ``"prefix"`` - starts with a known prefix and is more than two
       characters longer than it;
    2. ``"suffix"`` - ends with a known suffix and is more than two
       characters longer than it;
    3. ``"compound_first"`` - longer than 8 characters (a simplified
       stand-in for real compound detection);
    4. ``"word"`` - everything else.

    Args:
        tokens: Iterable of token strings.

    Returns:
        A list of ``{"text": <lower-cased token>, "type": <token type>}``
        dicts, in input order.
    """
    known_prefixes = ("un", "re", "de", "pre", "post", "anti", "pro", "inter", "sub", "super")
    known_suffixes = ("ing", "ed", "ly", "ment", "tion", "able", "ible", "ness", "ful", "less")

    def classify(word):
        # The length margin means the affix must leave a stem of 3+ chars.
        if any(word.startswith(p) and len(word) > len(p) + 2 for p in known_prefixes):
            return "prefix"
        if any(word.endswith(s) and len(word) > len(s) + 2 for s in known_suffixes):
            return "suffix"
        return "compound_first" if len(word) > 8 else "word"

    lowered = (token.lower() for token in tokens)
    return [{"text": word, "type": classify(word)} for word in lowered]
def plot_historical_data(historical_data):
    """Draw a bar chart of historical usage data, with error handling.

    Args:
        historical_data: Sequence of items whose element 0 is the era label
            and whose element 1 is the usage value.

    Returns:
        The ``plt`` (pyplot) module, with the newly created figure current.
        If drawing fails, a figure showing the error message is created
        instead. This function does not close the figures it creates.
    """
    try:
        labels = [point[0] for point in historical_data]
        heights = [point[1] for point in historical_data]

        plt.figure(figsize=(8, 3))
        plt.bar(labels, heights, color="skyblue")
        plt.title("Historical Usage")
        plt.xlabel("Era")
        plt.ylabel("Usage Level")
        plt.ylim(0, 100)
        plt.xticks(rotation=45)
        plt.tight_layout()
    except Exception as exc:
        print(f"Error in plot_historical_data: {exc}")
        # Replace the chart with a plain figure carrying the error text.
        plt.figure(figsize=(8, 3))
        plt.text(
            0.5,
            0.5,
            f"Error creating plot: {exc}",
            horizontalalignment="center",
            verticalalignment="center",
        )
        plt.axis("off")
    return plt
def create_evolution_chart(data, forecast_months=6, growth_scenario="Moderate"):
    """Build a single-axis plotly line chart of keyword evolution data.

    Search volume is plotted as-is. Competition score and intent clarity are
    multiplied by ``max(searchVolume) / 100`` so they are visible on the same
    axis.

    Args:
        data: Sequence of dicts with the keys ``month``, ``searchVolume``,
            ``competitionScore`` and ``intentClarity``.
        forecast_months: Accepted but not used by this function.
        growth_scenario: Label inserted into the chart title.

    Returns:
        A plotly figure. If building the chart raises, a small placeholder
        figure titled "Fallback Chart (Error occurred)" is returned instead.
    """
    try:
        import plotly.graph_objects as go

        # A basic figure without subplots.
        fig = go.Figure()

        months = [row["month"] for row in data]
        volumes = [row["searchVolume"] for row in data]

        def add_line(y_values, label, line_style):
            # Every series shares the month axis and marker style.
            fig.add_trace(
                go.Scatter(
                    x=months,
                    y=y_values,
                    name=label,
                    line=line_style,
                    mode="lines+markers",
                )
            )

        add_line(volumes, "Search Volume", dict(color="#8884d8", width=3))

        # Factor that scales the other metrics up to the search-volume axis.
        scale_factor = max(volumes) / 100

        add_line(
            [row["competitionScore"] * scale_factor for row in data],
            "Competition Score",
            dict(color="#82ca9d", width=2, dash="dot"),
        )
        add_line(
            [row["intentClarity"] * scale_factor for row in data],
            "Intent Clarity",
            dict(color="#ffc658", width=2, dash="dash"),
        )

        fig.update_layout(
            title=f"Keyword Evolution Forecast ({growth_scenario} Growth)",
            xaxis_title="Month",
            yaxis_title="Value",
            legend=dict(orientation="h", y=1.1),
            height=500,
        )
        return fig
    except Exception as exc:
        print(f"Error in chart creation: {exc}")
        # Fall back to an even simpler chart.
        fallback = go.Figure(data=go.Scatter(x=[1, 2, 3], y=[4, 1, 2]))
        fallback.update_layout(title="Fallback Chart (Error occurred)")
        return fallback
def create_ranking_history_chart(keyword_history):
"""Create a chart showing keyword ranking history over time"""
try:
if not keyword_history or len(keyword_history) < 2:
# Not enough data for a meaningful chart
fig = go.Figure()
fig.update_layout(
title="Insufficient Ranking Data",
annotations=[{
"text": "Need at least 2 data points for ranking history",
"showarrow": False,
"font": {"size": 16},
"xref": "paper",
"yref": "paper",
"x": 0.5,
"y": 0.5
}]
)
return fig
# Create a figure
fig = go.Figure()
# Extract timestamps and convert to datetime objects
timestamps = [entry["timestamp"] for entry in keyword_history]
dates = [datetime.datetime.strptime(ts, "%Y-%m-%d %H:%M:%S") for ts in timestamps]
# Get unique domains from all results
all_domains = set()
for entry in keyword_history:
for result in entry["results"]:
all_domains.add(result["domain"])
# Colors for different domains
domain_colors = {}
color_palette = [
"#1f77b4", "#ff7f0e", "#2ca02c", "#d62728", "#9467bd",
"#8c564b", "#e377c2", "#7f7f7f", "#bcbd22", "#17becf"
]
for i, domain in enumerate(all_domains):
domain_colors[domain] = color_palette[i % len(color_palette)]
# Track domains and their positions over time
domain_tracking = {domain: {"x": [], "y": [], "text": []} for domain in all_domains}
for i, entry in enumerate(keyword_history):
for result in entry["results"]:
domain = result["domain"]
position = result["position"]
title = result["title"]
domain_tracking[domain]["x"].append(dates[i])
domain_tracking[domain]["y"].append(position)
domain_tracking[domain]["text"].append(title)
# Add traces for each domain
for domain, data in domain_tracking.items():
if len(data["x"]) > 0: # Only add domains that have data
fig.add_trace(
go.Scatter(
x=data["x"],
y=data["y"],
mode="lines+markers",
name=domain,
line=dict(color=domain_colors[domain]),
hovertemplate="%{text}
Position: %{y}
Date: %{x}