# openalex_mapper/openalex_utils.py
import ast
import json
from urllib.parse import urlparse, parse_qs

import numpy as np
import pandas as pd
from pyalex import Works, Authors, Institutions
def openalex_url_to_pyalex_query(url):
"""
Convert an OpenAlex search URL to a pyalex query.
Args:
url (str): The OpenAlex search URL.
Returns:
tuple: (Works object, dict of parameters)
"""
parsed_url = urlparse(url)
query_params = parse_qs(parsed_url.query)
# Initialize the Works object
query = Works()
# Handle filters
if 'filter' in query_params:
filters = query_params['filter'][0].split(',')
for f in filters:
if ':' in f:
key, value = f.split(':', 1)
if key == 'default.search':
query = query.search(value)
else:
query = query.filter(**{key: value})
# Handle sort - Fixed to properly handle field:direction format
if 'sort' in query_params:
sort_params = query_params['sort'][0].split(',')
for s in sort_params:
            if ':' in s:  # Handle field:direction format
                field, direction = s.split(':', 1)
                query = query.sort(**{field: direction})
elif s.startswith('-'): # Handle -field format
query = query.sort(**{s[1:]: 'desc'})
else: # Handle field format
query = query.sort(**{s: 'asc'})
# Handle other parameters
params = {}
for key in ['page', 'per-page', 'sample', 'seed']:
if key in query_params:
params[key] = query_params[key][0]
return query, params
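
# Usage sketch for openalex_url_to_pyalex_query (the URL below is a made-up
# illustration, not one taken from this repository):
#
#     url = ("https://openalex.org/works"
#            "?filter=default.search:kuramoto%20model,publication_year:2020"
#            "&sort=cited_by_count:desc&per-page=25")
#     query, params = openalex_url_to_pyalex_query(url)
#     # `query` is a pyalex Works query with the search, filter, and sort
#     # applied; `params` holds paging options such as {'per-page': '25'}.
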
def invert_abstract(inv_index):
"""Reconstruct abstract from OpenAlex' inverted-index.
Handles dicts, JSON / repr strings, or missing values gracefully.
"""
# Try to coerce a string into a Python object first
if isinstance(inv_index, str):
try:
inv_index = json.loads(inv_index) # double-quoted JSON
except Exception:
try:
inv_index = ast.literal_eval(inv_index) # single-quoted repr
except Exception:
inv_index = None
if isinstance(inv_index, dict):
l_inv = [(w, p) for w, pos in inv_index.items() for p in pos]
return " ".join(w for w, _ in sorted(l_inv, key=lambda x: x[1]))
else:
return " "
def get_pub(x):
    """Extract the publication (source) name from a record's primary_location."""
    try:
        source = x['source']['display_name']
        if source not in ['parsed_publication', 'Deleted Journal']:
            return source
        else:
            return ' '
    except (KeyError, TypeError):
        # Missing or malformed location data; fall back to a blank placeholder.
        return ' '
def get_field(x):
    """Extract the academic subfield from a record's primary_topic."""
    try:
        field = x['primary_topic']['subfield']['display_name']
        if field is not None:
            return field
        else:
            return np.nan
    except (KeyError, TypeError):
        # Missing or malformed topic data; NaN lets pandas treat it as missing.
        return np.nan
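
# Hedged examples for the two helpers above, using invented record fragments
# shaped like the OpenAlex schema:
#
#     get_pub({"source": {"display_name": "Nature"}})   # -> "Nature"
#     get_pub(None)                                     # -> " "
#     get_field({"primary_topic": {"subfield": {"display_name": "Astrophysics"}}})
#     # -> "Astrophysics"; get_field({}) -> nan
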
def process_records_to_df(records):
"""
Convert OpenAlex records to a pandas DataFrame with processed fields.
Can handle either raw OpenAlex records or an existing DataFrame.
Args:
records (list or pd.DataFrame): List of OpenAlex record dictionaries or existing DataFrame
Returns:
pandas.DataFrame: Processed DataFrame with abstracts, publications, and titles
"""
# If records is already a DataFrame, use it directly
if isinstance(records, pd.DataFrame):
records_df = records.copy()
# Only process abstract_inverted_index and primary_location if they exist
if 'abstract_inverted_index' in records_df.columns:
records_df['abstract'] = [invert_abstract(t) for t in records_df['abstract_inverted_index']]
if 'primary_location' in records_df.columns:
records_df['parsed_publication'] = [get_pub(x) for x in records_df['primary_location']]
            records_df['parsed_publication'] = records_df['parsed_publication'].fillna(' ')  # fill missing values only when the column exists
else:
# Process raw records as before
records_df = pd.DataFrame(records)
records_df['abstract'] = [invert_abstract(t) for t in records_df['abstract_inverted_index']]
records_df['parsed_publication'] = [get_pub(x) for x in records_df['primary_location']]
records_df['parsed_publication'] = records_df['parsed_publication'].fillna(' ')
# Fill missing values and deduplicate
records_df['abstract'] = records_df['abstract'].fillna(' ')
records_df['title'] = records_df['title'].fillna(' ')
records_df = records_df.drop_duplicates(subset=['id']).reset_index(drop=True)
return records_df
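
# Usage sketch for process_records_to_df; the record below is a made-up,
# minimal subset of the OpenAlex Works schema:
#
#     records = [{
#         "id": "https://openalex.org/W1",
#         "title": "A toy paper",
#         "abstract_inverted_index": {"toy": [0], "abstract": [1]},
#         "primary_location": {"source": {"display_name": "Toy Journal"}},
#     }]
#     df = process_records_to_df(records)
#     df[["title", "abstract", "parsed_publication"]]
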
def openalex_url_to_filename(url):
"""
Convert an OpenAlex URL to a filename-safe string with timestamp.
Args:
url (str): The OpenAlex search URL
Returns:
str: A filename-safe string with timestamp (without extension)
"""
from datetime import datetime
import re
# First parse the URL into query and params
parsed_url = urlparse(url)
query_params = parse_qs(parsed_url.query)
# Create parts of the filename
parts = []
# Handle filters
if 'filter' in query_params:
filters = query_params['filter'][0].split(',')
for f in filters:
if ':' in f:
key, value = f.split(':', 1)
# Replace dots with underscores and clean the value
key = key.replace('.', '_')
                # Strip characters that are not filename-safe
                clean_value = re.sub(r'[^\w\s-]', '', value)
# Replace multiple spaces with single space and strip
clean_value = ' '.join(clean_value.split())
# Replace spaces with underscores for filename
clean_value = clean_value.replace(' ', '_')
if key == 'default_search':
parts.append(f"search_{clean_value}")
else:
parts.append(f"{key}_{clean_value}")
# Handle sort parameters
if 'sort' in query_params:
sort_params = query_params['sort'][0].split(',')
for s in sort_params:
if s.startswith('-'):
parts.append(f"sort_{s[1:].replace('.', '_')}_desc")
else:
parts.append(f"sort_{s.replace('.', '_')}_asc")
# Add timestamp
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
# Combine all parts
filename = '__'.join(parts) if parts else 'openalex_query'
filename = f"{filename}__{timestamp}"
# Ensure filename is not too long (max 255 chars is common filesystem limit)
if len(filename) > 255:
filename = filename[:251] # leave room for potential extension
return filename
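
# Illustrative call (hypothetical URL; the timestamp suffix will differ):
#
#     openalex_url_to_filename(
#         "https://openalex.org/works?filter=default.search:kuramoto%20model"
#     )
#     # -> "search_kuramoto_model__20240101_120000"
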
def get_records_from_dois(doi_list, block_size=50):
"""
Download OpenAlex records for a list of DOIs in blocks.
Args:
doi_list (list): List of DOIs (strings)
block_size (int): Number of DOIs to fetch per request (default 50)
Returns:
pd.DataFrame: DataFrame of OpenAlex records
"""
    from tqdm import tqdm
all_records = []
for i in tqdm(range(0, len(doi_list), block_size)):
sublist = doi_list[i:i+block_size]
doi_str = "|".join(sublist)
try:
record_list = Works().filter(doi=doi_str).get(per_page=block_size)
all_records.extend(record_list)
except Exception as e:
print(f"Error fetching DOIs {sublist}: {e}")
return pd.DataFrame(all_records)
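
# Usage sketch (the DOIs are placeholders, and each block triggers one live
# OpenAlex API request):
#
#     dois = ["10.1000/example.1", "10.1000/example.2"]
#     df = get_records_from_dois(dois, block_size=50)
#     df["id"].head()
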
def openalex_url_to_readable_name(url):
"""
Convert an OpenAlex URL to a short, human-readable query description.
Args:
url (str): The OpenAlex search URL
Returns:
str: A short, human-readable description of the query
Examples:
- "Search: 'Kuramoto Model'"
- "Search: 'quantum physics', 2020-2023"
- "Cites: Popper (1959)"
- "From: University of Pittsburgh, 1999-2020"
- "By: Einstein, A., 1905-1955"
"""
    # Parse the URL
    parsed_url = urlparse(url)
    query_params = parse_qs(parsed_url.query)
# Initialize description parts
parts = []
year_range = None
# Handle filters
if 'filter' in query_params:
filters = query_params['filter'][0].split(',')
for f in filters:
if ':' not in f:
continue
key, value = f.split(':', 1)
try:
if key == 'default.search':
# Clean up search term (remove quotes if present)
search_term = value.strip('"\'')
parts.append(f"Search: '{search_term}'")
elif key == 'publication_year':
# Handle year ranges or single years
if '-' in value:
start_year, end_year = value.split('-')
year_range = f"{start_year}-{end_year}"
else:
year_range = value
elif key == 'cites':
# Look up the cited work to get author and year
work_id = value
try:
cited_work = Works()[work_id]
if cited_work:
# Get first author's last name
author_name = "Unknown"
year = "Unknown"
if cited_work.get('authorships') and len(cited_work['authorships']) > 0:
first_author = cited_work['authorships'][0]['author']
if first_author.get('display_name'):
# Extract last name (assuming "First Last" format)
name_parts = first_author['display_name'].split()
author_name = name_parts[-1] if name_parts else first_author['display_name']
if cited_work.get('publication_year'):
year = str(cited_work['publication_year'])
parts.append(f"Cites: {author_name} ({year})")
else:
parts.append(f"Cites: Work {work_id}")
except Exception as e:
print(f"Could not fetch cited work {work_id}: {e}")
parts.append(f"Cites: Work {work_id}")
elif key == 'authorships.institutions.lineage':
# Look up institution name
inst_id = value
try:
institution = Institutions()[inst_id]
if institution and institution.get('display_name'):
parts.append(f"From: {institution['display_name']}")
else:
parts.append(f"From: Institution {inst_id}")
except Exception as e:
print(f"Could not fetch institution {inst_id}: {e}")
parts.append(f"From: Institution {inst_id}")
elif key == 'authorships.author.id':
# Look up author name
author_id = value
try:
author = Authors()[author_id]
if author and author.get('display_name'):
parts.append(f"By: {author['display_name']}")
else:
parts.append(f"By: Author {author_id}")
except Exception as e:
print(f"Could not fetch author {author_id}: {e}")
parts.append(f"By: Author {author_id}")
elif key == 'type':
# Handle work types
type_mapping = {
'article': 'Articles',
'book': 'Books',
'book-chapter': 'Book Chapters',
'dissertation': 'Dissertations',
'preprint': 'Preprints'
}
work_type = type_mapping.get(value, value.replace('-', ' ').title())
parts.append(f"Type: {work_type}")
                elif key == 'host_venue.id':
                    # Venue name lookup is not implemented here; fall back to
                    # the raw ID rather than making another API call.
                    parts.append(f"In: Venue {value}")
elif key.startswith('concepts.id'):
# Handle concept filters - these are topic/concept IDs
concept_id = value
parts.append(f"Topic: {concept_id}") # Could be enhanced with concept lookup
else:
# Generic handling for other filters
clean_key = key.replace('_', ' ').replace('.', ' ').title()
clean_value = value.replace('_', ' ')
parts.append(f"{clean_key}: {clean_value}")
except Exception as e:
print(f"Error processing filter {f}: {e}")
continue
# Combine parts into final description
if not parts:
description = "OpenAlex Query"
else:
description = ", ".join(parts)
# Add year range if present
if year_range:
if parts:
description += f", {year_range}"
else:
description = f"Works from {year_range}"
# Limit length to keep it readable
if len(description) > 100:
description = description[:97] + "..."
return description
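
# Hedged example (hypothetical query; filters such as `cites` or author IDs
# would trigger live OpenAlex lookups, which a plain search does not):
#
#     openalex_url_to_readable_name(
#         "https://openalex.org/works"
#         "?filter=default.search:quantum%20physics,publication_year:2020-2023"
#     )
#     # -> "Search: 'quantum physics', 2020-2023"
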