|
import streamlit as st |
|
import pandas as pd |
|
from datasets import load_dataset |
|
import re |
|
from datetime import datetime, date |
|
from io import StringIO |
|
from typing import Optional, Tuple, List, Dict, Any |
|
|
|
|
|
# Default sample size passed to load_data().
DEFAULT_SAMPLE_SIZE = 1000

# Date-only prefix of the GDELT GKG timestamp format (YYYYMMDD).
DATE_FORMAT = "%Y%m%d"

# Full 14-digit format ("%Y%m%d%H%M%S") used to parse the DATE column.
FULL_DATE_FORMAT = f"{DATE_FORMAT}%H%M%S"
|
|
|
|
|
@st.cache_data(ttl=3600, show_spinner="Loading dataset...")
def load_data(sample_size: int = DEFAULT_SAMPLE_SIZE) -> pd.DataFrame:
    """
    Load and validate the GDELT GKG dataset with error handling.

    Results are cached by Streamlit for one hour, keyed by ``sample_size``.

    Args:
        sample_size (int): Maximum number of records to keep. Pass a
            non-positive value (or None) to keep every loaded record.

    Returns:
        pd.DataFrame: Loaded and validated dataframe; empty on validation
        failure.
    """
    try:
        dataset = load_dataset(
            "dwb2023/gdelt-gkg-2025-v2",
            data_files={
                "train": [
                    "gdelt_gkg_20250210.parquet",
                    "gdelt_gkg_20250211.parquet",
                ]
            },
            split="train",
        )
        df = pd.DataFrame(dataset)

        if df.empty:
            st.error("Loaded dataset is empty")
            return pd.DataFrame()

        if "DATE" not in df.columns:
            st.error("Dataset missing required DATE column")
            return pd.DataFrame()

        # Bug fix: sample_size was previously accepted but ignored, so the
        # full dataset was always returned. Honor it now, as documented.
        if sample_size is not None and sample_size > 0:
            df = df.head(sample_size)

        return df

    except Exception as e:
        st.error(f"Error loading dataset: {str(e)}")
        st.stop()  # raises internally; script execution halts here
        return pd.DataFrame()  # defensive fallback for non-Streamlit contexts
|
|
|
def initialize_app(df: pd.DataFrame) -> None:
    """Render the page title and the sidebar search header."""
    st.title("GDELT GKG 2025 Dataset Explorer")

    sidebar = st.sidebar
    sidebar.header("Search Criteria")
    sidebar.markdown("🔍 Filter dataset using the controls below")
|
|
|
def extract_unique_themes(df: pd.DataFrame, column: str) -> List[str]:
    """
    Collect the distinct theme codes from a semicolon-separated column.

    Each non-null cell is split on ";"; for every non-blank entry, only
    the code before the first comma is kept (entries look like
    "THEME,charOffset").

    Args:
        df (pd.DataFrame): Input dataframe
        column (str): Column name containing themes

    Returns:
        List[str]: Sorted list of unique theme codes
    """
    if df.empty:
        return []

    unique_codes = set()
    for cell in df[column].dropna():
        for entry in cell.split(";"):
            if entry.strip():
                unique_codes.add(entry.split(",")[0].strip())
    return sorted(unique_codes)
|
|
|
def get_date_range(df: pd.DataFrame, date_col: str) -> Tuple[date, date]:
    """
    Detect the min/max dates present in the dataset.

    Falls back to the known dataset bounds (2025-02-10 .. 2025-02-11) if
    parsing or aggregation fails (e.g. malformed values or an empty column).

    Args:
        df (pd.DataFrame): Input dataframe
        date_col (str): Column name containing 14-digit timestamp strings

    Returns:
        Tuple[date, date]: (min_date, max_date) as date objects
    """
    try:
        # DATE values are YYYYMMDDHHMMSS strings; min/max stay inside the
        # try so NaT results (empty column) also trigger the fallback.
        parsed = pd.to_datetime(df[date_col], format="%Y%m%d%H%M%S")
        return parsed.min().date(), parsed.max().date()
    except Exception as e:
        st.warning(f"Date range detection failed: {str(e)}")
        return date(2025, 2, 10), date(2025, 2, 11)
|
|
|
def create_filters(df: pd.DataFrame) -> Dict[str, Any]:
    """
    Build the sidebar filter widgets and collect their current values.

    Args:
        df (pd.DataFrame): Input dataframe

    Returns:
        Dict[str, Any]: Filter settings keyed by: themes,
        source_common_name, document_identifier, sharing_image,
        date_range, record_limit
    """
    selections: Dict[str, Any] = {}

    with st.sidebar:
        # Exact-match theme selector, options derived from the data itself.
        selections["themes"] = st.multiselect(
            "V2EnhancedThemes (exact match)",
            options=extract_unique_themes(df, "V2EnhancedThemes"),
            help="Select exact themes to include (supports multiple selection)",
        )

        # Case-insensitive partial-match text inputs.
        for key, label, hint in (
            ("source_common_name", "SourceCommonName", "partial name match"),
            ("document_identifier", "DocumentIdentifier", "partial identifier match"),
            ("sharing_image", "V2.1SharingImage", "partial image URL match"),
        ):
            selections[key] = st.text_input(
                f"{label} ({hint})",
                placeholder=f"Enter {hint}...",
                help=f"Case-insensitive {hint}",
            )

        # Date picker bounded by the dates actually present in the data.
        min_date, max_date = get_date_range(df, "DATE")
        selections["date_range"] = st.date_input(
            "Date range",
            value=(min_date, max_date),
            min_value=min_date,
            max_value=max_date,
        )

        selections["record_limit"] = st.number_input(
            "Max records to display",
            min_value=100,
            max_value=5000,
            value=1000,
            step=100,
            help="Limit results for better performance",
        )

    return selections
|
|
|
def apply_filters(df: pd.DataFrame, filters: Dict[str, Any]) -> pd.DataFrame:
    """
    Apply all filters to the dataframe using vectorized operations.

    Args:
        df (pd.DataFrame): Input dataframe to filter
        filters (Dict[str, Any]): Filter parameters:
            - themes (list): Theme codes to match exactly
            - source_common_name (str): Partial match for source name
            - document_identifier (str): Partial match for document ID
            - sharing_image (str): Partial match for image URL
            - date_range (tuple): (start_date, end_date) tuple
            - record_limit (int): Maximum number of records to return

    Returns:
        pd.DataFrame: Filtered dataframe (truncated to record_limit, except
        when a date error aborts filtering early)
    """
    result = df.copy()

    # Exact theme match: a theme entry is "CODE,offset", so a selected code
    # must be preceded by start-of-string/";" and followed by ","/";"/end.
    selected = filters["themes"]
    if selected:
        alternation = "|".join(map(re.escape, selected))
        pattern = r'(?:^|;)(?:{})(?:$|,|;)'.format(alternation)
        result = result[result["V2EnhancedThemes"].str.contains(pattern, na=False)]

    # Case-insensitive substring filters; empty inputs are skipped.
    for key, column in (
        ("source_common_name", "SourceCommonName"),
        ("document_identifier", "DocumentIdentifier"),
        ("sharing_image", "V2.1SharingImage"),
    ):
        query = filters.get(key)
        if query:
            mask = result[column].str.contains(re.escape(query), case=False, na=False)
            result = result[mask]

    # Date filter only applies once both endpoints are picked.
    picked = filters["date_range"]
    if len(picked) == 2:
        start_date, end_date = picked

        if start_date > end_date:
            st.error("Start date must be before end date")
            return result

        try:
            # DATE holds 14-digit YYYYMMDDHHMMSS strings.
            timestamps = pd.to_datetime(result["DATE"], format="%Y%m%d%H%M%S")

            # Inclusive bounds: midnight of start day .. 23:59:59 of end day.
            lower = pd.Timestamp(start_date).normalize()
            upper = pd.Timestamp(end_date) + pd.Timedelta(days=1) - pd.Timedelta(seconds=1)

            result = result[(timestamps >= lower) & (timestamps <= upper)]
        except Exception as e:
            st.error(f"Error applying date filter: {str(e)}")
            return result

    return result.head(filters["record_limit"])
|
|
|
def main() -> None:
    """
    Main application entry point.

    Loads the dataset, renders the filter sidebar, applies the filters,
    and displays the results with a CSV download option.
    """
    df = load_data()
    if df.empty:
        st.warning("No data available - check data source")
        return

    initialize_app(df)
    filters = create_filters(df)
    filtered_df = apply_filters(df, filters)

    st.subheader(f"Results: {len(filtered_df)} records")
    st.dataframe(filtered_df, use_container_width=True)

    st.download_button(
        label="Download CSV",
        data=filtered_df.to_csv(index=False).encode("utf-8"),
        file_name="filtered_results.csv",
        mime="text/csv",
        help="Download filtered results as CSV",
    )


# Bug fix: main() previously ran unconditionally at import time. Streamlit
# executes scripts with __name__ == "__main__", so guarding keeps app
# behavior identical while making the module safely importable.
if __name__ == "__main__":
    main()
|
|