#!/usr/bin/env python3
"""Enhanced web document annotation tool with modern UI."""
import hashlib
import html
import json
import os
import uuid
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from random import sample, shuffle

import gradio as gr
from datasets import Dataset, load_dataset
from loguru import logger
# FDC (Free Decimal Correspondence) constants
SCIENCE_CODES = ["50", "51", "54", "57", "58", "59", "61"]
FDC_KEEP = ["61"]  # Medicine
def prefix(dds_code: str) -> str:
    """Return the first two characters of a DDS code.

    An empty (or otherwise falsy) code yields ``""``; a code shorter than two
    characters is returned unchanged.
    """
    if not dds_code:
        return ""
    return dds_code[:2]
def doc_hash(url: str, text: str) -> str:
    """Return the SHA-256 hex digest of ``url`` immediately followed by ``text``.

    The two parts are concatenated without a separator. That format is kept
    as-is because the digest is what gets persisted and compared to decide
    whether a document was already annotated.
    """
    return hashlib.sha256(f"{url}{text}".encode()).hexdigest()
# Document-type labels that disqualify a document from annotation.
_EXCLUDED_DOC_TYPES = frozenset({
    "Promotional/Advertisement",
    "Machine-Generated",
    "Images/Videos/Audio",
    "Truncated",
    "Spam/Ads",
    "Product Page",
    "Content Listing",
})


def _dig(mapping, *keys):
    """Follow ``keys`` through nested dicts; return None if any level is absent or not a dict."""
    node = mapping
    for key in keys:
        if not isinstance(node, dict):
            return None
        node = node.get(key)
    return node


def filterfunc(x: dict) -> bool:
    """Decide whether a dataset row should be offered for annotation.

    A row is kept only when all of the following hold:

    * its ``text`` has at least 100 whitespace-separated words;
    * its primary free-decimal-correspondence code starts with ``"61"``
      (the same prefix listed in ``FDC_KEEP``);
    * none of the primary/secondary labels of ``document_type_v1`` or
      ``document_type_v2`` is in the excluded set.

    Nested taxonomy levels that are missing *or* ``None`` are treated as
    absent instead of raising ``AttributeError``.
    """
    text = x.get("text") or ""
    if len(text.split()) < 100:
        return False

    code = _dig(x, "eai_taxonomy", "free_decimal_correspondence", "primary", "code") or ""
    if code[:2] != "61":
        return False

    for version in ("document_type_v1", "document_type_v2"):
        for level in ("primary", "secondary"):
            label = _dig(x, "eai_taxonomy", version, level, "label")
            if label and label in _EXCLUDED_DOC_TYPES:
                return False
    return True
class DocLoader: | |
__slots__ = ("docs", "index", "processed", "_dataset") | |
def __init__(self, processed: set[str]): | |
self.processed = processed | |
self.index = 0 | |
self.docs = [] | |
self._dataset = {} | |
self._load() | |
def _load(self): | |
ds = load_dataset("sumuks/essential-web-v1.0-sample-100M", split="train") | |
logger.info(f"Loaded {len(ds)} documents") | |
ds = ds.filter(filterfunc) | |
logger.info(f"Filtered to {len(ds)} documents") | |
# Build dataset lookup and collect unprocessed docs | |
unprocessed = [] | |
for idx, doc in enumerate(ds): | |
doc_key = doc.get("id", idx) | |
doc_with_key = dict(doc) | |
doc_with_key["_dataset_key"] = doc_key | |
self._dataset[doc_key] = doc_with_key | |
# Check if already processed | |
url = doc.get("metadata", {}).get("url", doc.get("url", "")) | |
h = doc_hash(url, doc.get("text", "")) | |
if h not in self.processed: | |
unprocessed.append(doc_with_key) | |
logger.info(f"Found {len(unprocessed)} unprocessed documents") | |
# Randomize the order for this session | |
shuffle(unprocessed) | |
self.docs = unprocessed | |
logger.info(f"Loaded {len(self.docs)} documents for this session") | |
def next(self) -> dict | None: | |
if self.index < len(self.docs): | |
doc = self.docs[self.index] | |
self.index += 1 | |
return doc | |
return None | |
def get_by_id(self, doc_id: str | int) -> dict | None: | |
result = self._dataset.get(doc_id) | |
if result is None and isinstance(doc_id, str) and doc_id.isdigit(): | |
result = self._dataset.get(int(doc_id)) | |
elif result is None and isinstance(doc_id, int): | |
result = self._dataset.get(str(doc_id)) | |
return result | |
def remaining(self) -> int: | |
return max(0, len(self.docs) - self.index) | |
class AnnotationStore: | |
path: Path | |
session_id: str = field(default_factory=lambda: str(uuid.uuid4())) | |
buffer: list[dict] = field(default_factory=list) | |
threshold: int = 25 | |
processed: set[str] = field(default_factory=set) | |
annotations: list[dict] = field(default_factory=list) | |
session_stats: dict = field(default_factory=lambda: { | |
"total": 0, | |
"selected": 0, | |
"discarded": 0, | |
"start_time": datetime.now(timezone.utc), | |
"decisions": [] | |
}) | |
def __post_init__(self): | |
self.path.parent.mkdir(parents=True, exist_ok=True) | |
if self.path.exists(): | |
for line in self.path.read_text().splitlines(): | |
if rec := self._parse_line(line): | |
self.processed.add(rec["hash"]) | |
self.annotations.append(rec) | |
logger.info(f"Loaded {len(self.processed)} existing annotations") | |
def _parse_line(self, line: str) -> dict | None: | |
try: | |
return json.loads(line) | |
except: | |
return None | |
def add(self, doc_hash: str, decision: str, doc_id: str | int): | |
if doc_hash in self.processed: | |
logger.warning(f"Attempted to add already processed document: {doc_hash}") | |
return | |
rec = { | |
"hash": doc_hash, | |
"decision": decision, | |
"session": self.session_id, | |
"id": doc_id, | |
"timestamp": datetime.now(timezone.utc).isoformat(), | |
} | |
self.path.open("a").write(json.dumps(rec) + "\n") | |
self.processed.add(doc_hash) | |
self.buffer.append(rec) | |
self.annotations.append(rec) | |
self.session_stats["total"] += 1 | |
if decision == "selected": | |
self.session_stats["selected"] += 1 | |
elif decision == "discarded": | |
self.session_stats["discarded"] += 1 | |
self.session_stats["decisions"].append((datetime.now(timezone.utc), decision)) | |
if len(self.buffer) >= self.threshold: | |
self.flush() | |
def flush(self): | |
if not self.buffer or not (token := os.getenv("HF_TOKEN")): | |
self.buffer.clear() | |
return | |
try: | |
Dataset.from_list(self.buffer).push_to_hub( | |
"yourbench/essential-web-annotations", | |
token=token | |
) | |
logger.info(f"Pushed {len(self.buffer)} annotations") | |
self.buffer.clear() | |
except Exception as e: | |
logger.error(f"Push failed: {e}") | |
def get_rate(self) -> float: | |
if not self.session_stats["decisions"]: | |
return 0.0 | |
elapsed = (datetime.now(timezone.utc) - self.session_stats["start_time"]).total_seconds() | |
return (self.session_stats["total"] / elapsed * 3600) if elapsed > 0 else 0.0 | |
def get_filtered(self, decision: str | None = None) -> list[dict]: | |
if decision is None or decision == "all": | |
return self.annotations | |
return [a for a in self.annotations if a.get("decision") == decision] | |
# Number of decisions after which the annotate tab switches to its "done" state.
SESSION_LIMIT = 50

# Module-level application state. Creating the loader loads and filters the
# dataset, so this runs at import time.
store = AnnotationStore(Path("data/annotations.jsonl"))
loader = DocLoader(store.processed)
current = loader.next()

# Viewer state
viewer_state = {
    "annotations": [],
    "index": 0,
    "filter": "all",
}
def format_stats() -> str:
    """Render the session statistics cards as an HTML fragment.

    Shows total, selected and discarded counts, the hourly annotation rate,
    and the number of documents still queued in the loader.
    """
    stats = store.session_stats
    rate = store.get_rate()
    # `remaining` is a method: it has to be called, because applying the ","
    # format spec to the bound method itself raises TypeError.
    remaining = loader.remaining()
    return f"""
    <div class="stats-container">
        <div class="stat-item">
            <div class="stat-value">{stats['total']}</div>
            <div class="stat-label">Total Annotated</div>
        </div>
        <div class="stat-item">
            <div class="stat-value">{stats['selected']}</div>
            <div class="stat-label">Selected</div>
        </div>
        <div class="stat-item">
            <div class="stat-value">{stats['discarded']}</div>
            <div class="stat-label">Discarded</div>
        </div>
        <div class="stat-item">
            <div class="stat-value">{rate:.0f}/hr</div>
            <div class="stat-label">Annotation Rate</div>
        </div>
        <div class="stat-item">
            <div class="stat-value">{remaining:,}</div>
            <div class="stat-label">Remaining Docs</div>
        </div>
    </div>
    """
def format_progress() -> tuple[str, float]:
    """Render the session progress bar.

    Returns:
        A pair ``(html, fraction)`` where ``fraction`` is the share of
        ``SESSION_LIMIT`` completed this session, capped at 1.0.
    """
    session_completed = store.session_stats["total"]
    session_total = SESSION_LIMIT
    # Cap at 100% so the bar can never be drawn wider than its track.
    progress = min(1.0, session_completed / session_total) if session_total > 0 else 0.0
    percentage = progress * 100
    return (
        f"""
    <div class="progress-container">
        <div class="progress-header">
            <span class="progress-title">Session Progress</span>
            <span class="progress-numbers">{session_completed:,} / {session_total:,}</span>
        </div>
        <div class="progress-bar-bg">
            <div class="progress-bar-fill" style="width: {percentage:.1f}%"></div>
        </div>
        <div class="progress-percentage">{percentage:.1f}% Complete</div>
    </div>
    """,
        progress,
    )
def format_document_info(doc: dict, annotation: dict | None = None) -> str: | |
if not doc: | |
return "" | |
meta = doc.get("metadata", {}) | |
url = meta.get("url", doc.get("url", "")) | |
domain = url.split('/')[2] if url and '/' in url else "Unknown" | |
cat = doc.get("eai_taxonomy", {}).get("document_type_v2", {}).get("primary", {}).get("label", "Uncategorized") | |
word_count = len(doc.get("text", "").split()) | |
annotation_info = "" | |
if annotation: | |
timestamp = datetime.fromisoformat(annotation["timestamp"].replace("Z", "+00:00")) | |
decision_color = "#667eea" if annotation["decision"] == "selected" else "#f5576c" | |
annotation_info = f""" | |
<div class="annotation-info" style="border-left: 4px solid {decision_color};"> | |
<span class="annotation-decision" style="color: {decision_color};"> | |
{"β " if annotation["decision"] == "selected" else "β"} {annotation["decision"].title()} | |
</span> | |
<span class="annotation-time">π {timestamp.strftime("%Y-%m-%d %H:%M:%S")}</span> | |
</div> | |
""" | |
return f""" | |
<div class="doc-info"> | |
{annotation_info} | |
<div class="doc-meta"> | |
<span class="doc-domain">π {domain}</span> | |
<span class="doc-category">π·οΈ {cat}</span> | |
<span class="doc-words">π {word_count:,} words</span> | |
</div> | |
<a href="{url}" target="_blank" class="doc-url">{url}</a> | |
</div> | |
""" | |
def choose(decision: str):
    """Record ``decision`` for the current document and advance to the next one.

    Returns the 7-tuple of component values expected by the annotate tab
    (document info, text, two button updates, stats HTML, progress HTML,
    progress fraction). Returns ``done_state()`` when there is no current
    document, the session limit has been reached, or the loader is exhausted.
    """
    global current
    if not current:
        return done_state()

    # "metadata" may be present but None; guard before reading the URL.
    url = (current.get("metadata") or {}).get("url", current.get("url", ""))
    h = doc_hash(url, current.get("text", ""))
    doc_id = current.get("_dataset_key", current.get("id", ""))
    store.add(h, decision, doc_id)

    if store.session_stats["total"] >= SESSION_LIMIT:
        return done_state()

    current = loader.next()
    if not current:
        return done_state()

    progress_html, progress_num = format_progress()
    return (
        format_document_info(current),
        current.get("text", ""),
        gr.update(interactive=True),
        gr.update(interactive=True),
        format_stats(),
        progress_html,
        progress_num,
    )
def done_state():
    """Return the component values shown when annotation cannot continue.

    The message distinguishes reaching ``SESSION_LIMIT`` from running out of
    documents. Both decision buttons are disabled and the hidden progress
    value is set to 1.0.
    """
    progress_html, _ = format_progress()
    if store.session_stats["total"] >= SESSION_LIMIT:
        message = "π Session Complete!"
        description = f"Great job! You've completed your session of {SESSION_LIMIT} documents."
    else:
        message = "π All documents annotated!"
        description = "Great job! You've completed all available documents."
    return (
        f"<div class='done-message'>{message}</div>",
        description,
        gr.update(interactive=False),
        gr.update(interactive=False),
        format_stats(),
        progress_html,
        1.0,
    )
def update_viewer_filter(filter_value: str):
    """Apply a new decision filter to the viewer and show its first annotation."""
    viewer_state["filter"] = filter_value
    viewer_state["index"] = 0
    viewer_state["annotations"] = store.get_filtered(filter_value)
    logger.info(f"Filter: {filter_value}, Found {len(viewer_state['annotations'])} annotations")
    return update_viewer_display()
def navigate_viewer(direction: int):
    """Move the viewer index by ``direction`` (wrapping around) and re-render."""
    if not viewer_state["annotations"]:
        return update_viewer_display()
    # Modulo keeps the index inside the list for both +1 and -1 steps.
    viewer_state["index"] = (viewer_state["index"] + direction) % len(viewer_state["annotations"])
    return update_viewer_display()
def update_viewer_display():
    """Build the viewer tab's component values for the current viewer state.

    Returns a 5-tuple: info HTML, document text, counter HTML, and updates for
    the previous/next buttons (disabled at the first/last position). When the
    annotated document is not in the loaded dataset, the raw annotation record
    is shown in place of the text.
    """
    annotations = viewer_state["annotations"]
    if not annotations:
        return (
            "<div class='viewer-empty'>No annotations to display</div>",
            "",
            "<div class='viewer-counter'>0 / 0</div>",
            gr.update(interactive=False),
            gr.update(interactive=False),
        )

    idx = viewer_state["index"]
    total = len(annotations)
    annotation = annotations[idx]
    # Keep the same wrapper as the counter's initial value so the
    # `.viewer-counter` style still applies after an update.
    counter = f"<div class='viewer-counter'>{idx + 1} / {total}</div>"
    prev_update = gr.update(interactive=idx > 0)
    next_update = gr.update(interactive=idx < total - 1)

    # Records loaded from disk may lack an "id"; treat that as "not found".
    doc_id = annotation.get("id")
    doc = loader.get_by_id(doc_id)
    if not doc:
        logger.warning(f"Document not found for ID: {doc_id} (type: {type(doc_id)})")
        return (
            "<div class='viewer-error'>Document not found in dataset</div>",
            f"Annotation details: {json.dumps(annotation, indent=2)}",
            counter,
            prev_update,
            next_update,
        )

    return (
        format_document_info(doc, annotation),
        doc.get("text", ""),
        counter,
        prev_update,
        next_update,
    )
def build() -> gr.Blocks:
    """Assemble the Gradio app: an "Annotate" tab and a "Viewer" tab.

    The Annotate tab shows the current document with Select/Discard buttons
    (also reachable through the ``1``/``2`` keys via the script injected into
    the page head). The Viewer tab browses stored annotations with a decision
    filter and previous/next navigation.

    NOTE(review): several UI strings contain glyph sequences such as "π" and
    "β" that look like mis-decoded emoji; they are kept exactly as found.
    """
    css = """
    .stats-container {
        display: flex;
        gap: 15px;
        margin: 10px 0;
        flex-wrap: nowrap;
        justify-content: space-between;
    }
    .stat-item {
        background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
        border-radius: 12px;
        padding: 15px;
        flex: 1;
        min-width: 100px;
        text-align: center;
        box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
        transition: transform 0.2s;
    }
    .stat-item:hover {
        transform: translateY(-2px);
    }
    .stat-value {
        font-size: 24px;
        font-weight: bold;
        color: white;
        margin-bottom: 3px;
    }
    .stat-label {
        font-size: 12px;
        color: rgba(255, 255, 255, 0.9);
    }
    .progress-container {
        background: #f8f9fa;
        border-radius: 12px;
        padding: 15px;
        margin: 10px 0;
    }
    .progress-header {
        display: flex;
        justify-content: space-between;
        margin-bottom: 10px;
        font-weight: 600;
    }
    .progress-bar-bg {
        background: #e9ecef;
        height: 20px;
        border-radius: 10px;
        overflow: hidden;
        margin-bottom: 10px;
    }
    .progress-bar-fill {
        background: linear-gradient(90deg, #667eea 0%, #764ba2 100%);
        height: 100%;
        transition: width 0.3s ease;
    }
    .progress-percentage {
        text-align: center;
        color: #6c757d;
        font-size: 14px;
    }
    .doc-info {
        background: #f8f9fa;
        border-radius: 12px;
        padding: 15px;
        margin-bottom: 10px;
    }
    .doc-meta {
        display: flex;
        gap: 20px;
        margin-bottom: 10px;
        flex-wrap: wrap;
    }
    .doc-meta span {
        font-size: 14px;
        color: #495057;
    }
    .doc-url {
        font-size: 14px;
        color: #667eea;
        text-decoration: none;
        word-break: break-all;
    }
    .doc-url:hover {
        text-decoration: underline;
    }
    .done-message {
        font-size: 32px;
        text-align: center;
        padding: 40px;
        background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
        color: white;
        border-radius: 12px;
        font-weight: bold;
    }
    .annotation-info {
        display: flex;
        justify-content: space-between;
        margin-bottom: 10px;
        padding-left: 10px;
    }
    .annotation-decision {
        font-weight: 600;
    }
    .annotation-time {
        color: #6c757d;
        font-size: 12px;
    }
    .viewer-empty, .viewer-error {
        text-align: center;
        padding: 40px;
        color: #6c757d;
        font-size: 18px;
    }
    .viewer-nav {
        display: flex;
        justify-content: center;
        align-items: center;
        gap: 20px;
        margin: 10px 0;
    }
    .viewer-counter {
        font-weight: 600;
        color: #495057;
    }
    #select {
        background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
        border: none;
        font-size: 18px;
        padding: 12px 24px;
    }
    #discard {
        background: linear-gradient(135deg, #f093fb 0%, #f5576c 100%);
        border: none;
        font-size: 18px;
        padding: 12px 24px;
    }
    .dark .stat-item {
        background: linear-gradient(135deg, #434343 0%, #000000 100%);
    }
    .dark .progress-container, .dark .doc-info {
        background: #1a1a1a;
    }
    .dark .progress-bar-bg {
        background: #2a2a2a;
    }
    @keyframes pulse {
        0% { transform: scale(1); }
        50% { transform: scale(1.05); }
        100% { transform: scale(1); }
    }
    """

    # Injected into the page <head>: "1"/"2" click the Select/Discard buttons
    # on key release (ignored while focus is in a form control), and the
    # buttons are scaled down while the key is held.
    shortcut_js = """
    <script>
    function handleKeyboardShortcuts(e) {
        var target = e.target || e.srcElement;
        switch (target.tagName.toLowerCase()) {
            case "input":
            case "textarea":
            case "select":
            case "button":
                return;
            default:
                if (e.code === "Digit1" || e.key === "1") {
                    var selectBtn = document.getElementById("select");
                    if (selectBtn && !selectBtn.disabled) {
                        selectBtn.click();
                        e.preventDefault();
                    }
                }
                else if (e.code === "Digit2" || e.key === "2") {
                    var discardBtn = document.getElementById("discard");
                    if (discardBtn && !discardBtn.disabled) {
                        discardBtn.click();
                        e.preventDefault();
                    }
                }
        }
    }
    document.addEventListener('keyup', handleKeyboardShortcuts, false);
    document.addEventListener('keydown', function(e) {
        if ((e.code === "Digit1" || e.key === "1") && document.getElementById("select") && !document.getElementById("select").disabled) {
            document.getElementById("select").style.transform = "scale(0.95)";
        }
        if ((e.code === "Digit2" || e.key === "2") && document.getElementById("discard") && !document.getElementById("discard").disabled) {
            document.getElementById("discard").style.transform = "scale(0.95)";
        }
    });
    document.addEventListener('keyup', function(e) {
        if (e.code === "Digit1" || e.key === "1") {
            var btn = document.getElementById("select");
            if (btn) btn.style.transform = "scale(1)";
        }
        if (e.code === "Digit2" || e.key === "2") {
            var btn = document.getElementById("discard");
            if (btn) btn.style.transform = "scale(1)";
        }
    });
    </script>
    """

    with gr.Blocks(
        title="Essential Web Annotation",
        theme=gr.themes.Default(),
        css=css,
        head=shortcut_js,
    ) as demo:
        gr.Markdown("# π Essential Web Annotation Tool")

        with gr.Tabs():
            with gr.Tab("Annotate"):
                # The Markdown body is kept flush-left so no line can be read
                # as an indented code block.
                gr.Markdown("""
## π Document Quality Assessment
Your task is to evaluate documents for **high-quality, valuable content** that provides generalizable information.
### β **Select High-Quality Documents:**
Examples include:
- **Technical blogs** with detailed explanations
- **Scientific papers** and research articles
- **Information-rich discussions** with insights
- **Educational content** with actionable knowledge
- **Professional documentation** and guides
### β **Discard Low-Quality Documents:**
- Content with minimal informational value
### π― **Quick Assessment Tips:**
- High-quality documents are usually immediately recognizable to a human.
- Use the **Viewer** tab to browse examples of selected documents
- Trust your judgment on content value and depth
### β¨οΈ **Keyboard Shortcuts:**
| Key | Action |
|-----|--------|
| **`1`** | β Select document |
| **`2`** | β Discard document |
""")

                progress_html, progress_num = format_progress()
                progress_display = gr.HTML(progress_html)
                stats_display = gr.HTML(format_stats())

                if current:
                    doc_info_html = format_document_info(current)
                    text_val = current.get("text", "")
                else:
                    doc_info_html = "<div class='doc-info'>No documents loaded.</div>"
                    text_val = ""
                doc_info = gr.HTML(doc_info_html)

                with gr.Column(variant="panel"):
                    text_display = gr.Textbox(
                        text_val,
                        label="π Document Content",
                        lines=20,
                        interactive=False,
                        show_copy_button=True,
                    )

                with gr.Row():
                    btn_sel = gr.Button(
                        "β Select (1)",
                        elem_id="select",
                        variant="primary",
                        interactive=bool(current),
                        size="lg",
                    )
                    btn_dis = gr.Button(
                        "β Discard (2)",
                        elem_id="discard",
                        variant="stop",
                        interactive=bool(current),
                        size="lg",
                    )

                # Hidden component that receives the numeric progress fraction.
                progress_bar = gr.Number(value=progress_num, visible=False)

                outputs = [doc_info, text_display, btn_sel, btn_dis, stats_display, progress_display, progress_bar]
                btn_sel.click(lambda: choose("selected"), outputs=outputs)
                btn_dis.click(lambda: choose("discarded"), outputs=outputs)

            with gr.Tab("Viewer"):
                gr.Markdown("### π Browse Annotated Documents")

                with gr.Row():
                    filter_dropdown = gr.Radio(
                        choices=["all", "selected", "discarded"],
                        value="all",
                        label="Filter",
                        interactive=True,
                    )

                viewer_info = gr.HTML()

                with gr.Column(variant="panel"):
                    viewer_text = gr.Textbox(
                        label="π Document Content",
                        lines=20,
                        interactive=False,
                        show_copy_button=True,
                    )

                with gr.Row():
                    prev_btn = gr.Button("β Previous", size="lg")
                    viewer_counter = gr.HTML("<div class='viewer-counter'>0 / 0</div>")
                    next_btn = gr.Button("Next β", size="lg")

                viewer_outputs = [viewer_info, viewer_text, viewer_counter, prev_btn, next_btn]
                filter_dropdown.change(
                    update_viewer_filter,
                    inputs=[filter_dropdown],
                    outputs=viewer_outputs,
                )
                prev_btn.click(
                    lambda: navigate_viewer(-1),
                    outputs=viewer_outputs,
                )
                next_btn.click(
                    lambda: navigate_viewer(1),
                    outputs=viewer_outputs,
                )
                # Populate the viewer with the unfiltered list on page load.
                demo.load(
                    lambda: update_viewer_filter("all"),
                    outputs=viewer_outputs,
                )

        # NOTE(review): confirm that a <script> placed inside gr.HTML is
        # actually executed by the installed Gradio version; if it is not,
        # this pulse-animation observer never runs and belongs in `head`.
        gr.HTML("""
        <script>
        const observer = new MutationObserver(() => {
            document.querySelectorAll('.stat-item').forEach(item => {
                item.style.animation = 'pulse 0.3s ease-out';
            });
        });
        observer.observe(document.body, { childList: true, subtree: true });
        </script>
        """)

    return demo
# Build the UI and start the Gradio server when run as a script.
if __name__ == "__main__":
    build().launch()