# insight / pages / 6_🧪_Feb_2025_Dataset_Explorer.py
# Source: dwb2023's Hugging Face Space — "Initial commit for Hugging Face Spaces"
# (commit 3bb5fb5, raw file, 8.21 kB)
import streamlit as st
import pandas as pd
from datasets import load_dataset
import re
from datetime import datetime, date
from io import StringIO
from typing import Optional, Tuple, List, Dict, Any
# Constants
DEFAULT_SAMPLE_SIZE = 1000
DATE_FORMAT = "%Y%m%d"
FULL_DATE_FORMAT = f"{DATE_FORMAT}%H%M%S"
# Load dataset with enhanced caching and validation
@st.cache_data(ttl=3600, show_spinner="Loading dataset...")
def load_data(sample_size: int = DEFAULT_SAMPLE_SIZE) -> pd.DataFrame:
"""
Load and validate dataset with error handling.
Args:
sample_size (int): Number of records to load
Returns:
pd.DataFrame: Loaded and validated dataframe
"""
try:
dataset = load_dataset(
"dwb2023/gdelt-gkg-2025-v2",
data_files={
"train": [
"gdelt_gkg_20250210.parquet",
"gdelt_gkg_20250211.parquet",
]
},
split="train"
)
df = pd.DataFrame(dataset)
# Basic data validation
if df.empty:
st.error("Loaded dataset is empty")
return pd.DataFrame()
if "DATE" not in df.columns:
st.error("Dataset missing required DATE column")
return pd.DataFrame()
return df
except Exception as e:
st.error(f"Error loading dataset: {str(e)}")
st.stop()
return pd.DataFrame()
def initialize_app(df: pd.DataFrame) -> None:
"""Initialize the Streamlit app interface."""
st.title("GDELT GKG 2025 Dataset Explorer")
with st.sidebar:
st.header("Search Criteria")
st.markdown("🔍 Filter dataset using the controls below")
def extract_unique_themes(df: pd.DataFrame, column: str) -> List[str]:
"""
Extract and clean unique themes from semicolon-separated column.
Args:
df (pd.DataFrame): Input dataframe
column (str): Column name containing themes
Returns:
List[str]: Sorted list of unique themes
"""
if df.empty:
return []
return sorted({
theme.split(",")[0].strip()
for themes in df[column].dropna().str.split(";")
for theme in themes if theme.strip()
})
def get_date_range(df: pd.DataFrame, date_col: str) -> Tuple[date, date]:
"""
Get min/max dates from dataset with fallback defaults.
Args:
df (pd.DataFrame): Input dataframe
date_col (str): Column name containing dates
Returns:
Tuple[date, date]: (min_date, max_date) as date objects
"""
try:
# Convert YYYYMMDDHHMMSS string format to datetime using constant
dates = pd.to_datetime(df[date_col], format=FULL_DATE_FORMAT)
return dates.min().date(), dates.max().date()
except Exception as e:
st.warning(f"Date range detection failed: {str(e)}")
return datetime(2025, 2, 10).date(), datetime(2025, 2, 11).date()
def create_filters(df: pd.DataFrame) -> Dict[str, Any]:
"""
Generate sidebar filters and return filter state.
Args:
df (pd.DataFrame): Input dataframe
Returns:
Dict[str, Any]: Dictionary of filter settings
"""
filters = {}
with st.sidebar:
# Theme multi-select
filters["themes"] = st.multiselect(
"V2EnhancedThemes (exact match)",
options=extract_unique_themes(df, "V2EnhancedThemes"),
help="Select exact themes to include (supports multiple selection)"
)
# Text-based filters
text_filters = {
"source_common_name": ("SourceCommonName", "partial name match"),
"document_identifier": ("DocumentIdentifier", "partial identifier match"),
"sharing_image": ("V2.1SharingImage", "partial image URL match")
}
for key, (label, help_text) in text_filters.items():
filters[key] = st.text_input(
f"{label} ({help_text})",
placeholder=f"Enter {help_text}...",
help=f"Case-insensitive {help_text}"
)
# Date range with dataset-based defaults
date_col = "DATE"
min_date, max_date = get_date_range(df, date_col)
filters["date_range"] = st.date_input(
"Date range",
value=(min_date, max_date),
min_value=min_date,
max_value=max_date,
)
# Record limit
filters["record_limit"] = st.number_input(
"Max records to display",
min_value=100,
max_value=5000,
value=1000,
step=100,
help="Limit results for better performance"
)
return filters
def apply_filters(df: pd.DataFrame, filters: Dict[str, Any]) -> pd.DataFrame:
"""
Apply all filters to dataframe using vectorized operations.
Args:
df (pd.DataFrame): Input dataframe to filter
filters (Dict[str, Any]): Dictionary containing filter parameters:
- themes (list): List of themes to match exactly
- source_common_name (str): Partial match for source name
- document_identifier (str): Partial match for document ID
- sharing_image (str): Partial match for image URL
- date_range (tuple): (start_date, end_date) tuple
- record_limit (int): Maximum number of records to return
Returns:
pd.DataFrame: Filtered dataframe
"""
filtered_df = df.copy()
# Theme exact match filter - set regex groups to be non-capturing using (?:) syntax
if filters["themes"]:
pattern = r'(?:^|;)(?:{})(?:$|,|;)'.format('|'.join(map(re.escape, filters["themes"])))
filtered_df = filtered_df[filtered_df["V2EnhancedThemes"].str.contains(pattern, na=False)]
# Text partial match filters
text_columns = {
"source_common_name": "SourceCommonName",
"document_identifier": "DocumentIdentifier",
"sharing_image": "V2.1SharingImage"
}
for filter_key, col_name in text_columns.items():
if value := filters.get(filter_key):
filtered_df = filtered_df[
filtered_df[col_name]
.str.contains(re.escape(value), case=False, na=False)
]
# Date range filter with validation
if len(filters["date_range"]) == 2:
start_date, end_date = filters["date_range"]
# Validate date range
if start_date > end_date:
st.error("Start date must be before end date")
return filtered_df
date_col = "DATE"
try:
# Convert full datetime strings to datetime objects using constant
date_series = pd.to_datetime(filtered_df[date_col], format=FULL_DATE_FORMAT)
# Create timestamps for start/end of day
start_timestamp = pd.Timestamp(start_date).normalize() # Start of day
end_timestamp = pd.Timestamp(end_date) + pd.Timedelta(days=1) - pd.Timedelta(seconds=1) # End of day
filtered_df = filtered_df[
(date_series >= start_timestamp) &
(date_series <= end_timestamp)
]
except Exception as e:
st.error(f"Error applying date filter: {str(e)}")
return filtered_df
# Apply record limit
return filtered_df.head(filters["record_limit"])
def main():
"""Main application entry point."""
df = load_data()
if df.empty:
st.warning("No data available - check data source")
return
initialize_app(df)
filters = create_filters(df)
filtered_df = apply_filters(df, filters)
# Display results
st.subheader(f"Results: {len(filtered_df)} records")
st.dataframe(filtered_df, use_container_width=True)
st.download_button(
label="Download CSV",
data=filtered_df.to_csv(index=False).encode(),
file_name="filtered_results.csv",
mime="text/csv",
help="Download filtered results as CSV"
)
main()