tts_labeling / utils /sentry_integration.py
vargha's picture
sentry intergration
86cf81a
# utils/sentry_integration.py
import os
import sentry_sdk
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
from sentry_sdk.integrations.logging import LoggingIntegration
from sentry_sdk.integrations.threading import ThreadingIntegration
from utils.logger import Logger
log = Logger()
def initialize_sentry():
"""
Initialize Sentry for error tracking and performance monitoring
"""
sentry_dsn = os.environ.get("SENTRY_DSN")
if not sentry_dsn:
log.info("Sentry DSN not configured, skipping Sentry initialization")
return False
# try:
# Environment configuration
environment = os.environ.get("SENTRY_ENVIRONMENT", "development")
traces_sample_rate = float(os.environ.get("SENTRY_TRACES_SAMPLE_RATE", "0.1"))
profiles_sample_rate = float(os.environ.get("SENTRY_PROFILES_SAMPLE_RATE", "0.1"))
# Logging integration - capture INFO and above
logging_integration = LoggingIntegration(
level=None, # Don't capture logs below this level
event_level=None # Send logs as events above this level
)
# SQLAlchemy integration for database monitoring
sqlalchemy_integration = SqlalchemyIntegration()
# Threading integration for multi-threaded apps
threading_integration = ThreadingIntegration(propagate_hub=True)
sentry_sdk.init(
dsn=sentry_dsn,
environment=environment,
traces_sample_rate=traces_sample_rate,
profiles_sample_rate=profiles_sample_rate,
integrations=[
logging_integration,
sqlalchemy_integration,
threading_integration,
],
# Additional configuration
send_default_pii=True, # Don't send personally identifiable information
attach_stacktrace=True, # Attach stack traces to messages
before_send=before_send_filter, # Custom filter function
release=get_app_version(), # App version for release tracking
)
log.info(f"Sentry initialized successfully for environment: {environment}")
return True
# except Exception as e:
# log.error(f"Failed to initialize Sentry: {e}")
# return False
def before_send_filter(event, hint):
"""
Filter function to modify or drop events before sending to Sentry
"""
# Don't send events for certain error types
if 'exc_info' in hint:
exc_type, exc_value, tb = hint['exc_info']
# Skip common/expected errors
if isinstance(exc_value, (KeyboardInterrupt, SystemExit)):
return None
# Skip database connection timeouts in development
if "connect_timeout" in str(exc_value) and os.environ.get("SENTRY_ENVIRONMENT") == "development":
return None
# Add custom tags
event.setdefault('tags', {})
event['tags']['component'] = 'tts_labeling'
# Add user context if available (without PII)
if 'user' not in event:
event['user'] = {
'id': 'anonymous', # Don't use real user IDs
}
return event
def get_app_version():
"""
Get the application version for release tracking
"""
# try:
# Try to get version from git
import subprocess
result = subprocess.run(
['git', 'rev-parse', '--short', 'HEAD'],
capture_output=True,
text=True,
cwd=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
if result.returncode == 0:
return result.stdout.strip()
# except:
# pass
# Fallback to a default version
return "unknown"
def capture_custom_event(message, level="info", extra=None, tags=None):
"""
Capture custom events to Sentry with structured data
Args:
message (str): Event message
level (str): Event level (debug, info, warning, error, fatal)
extra (dict): Additional context data
tags (dict): Tags for filtering/grouping
"""
# try:
with sentry_sdk.configure_scope() as scope:
if extra:
for key, value in extra.items():
scope.set_extra(key, value)
if tags:
for key, value in tags.items():
scope.set_tag(key, value)
sentry_sdk.capture_message(message, level=level)
# except Exception as e:
# log.error(f"Failed to capture custom Sentry event: {e}")
def capture_annotation_event(action, user_id=None, annotation_id=None, tts_id=None, success=True):
"""
Capture annotation-specific events for analytics
Args:
action (str): Action performed (create, update, delete, review_approve, review_reject)
user_id (int): User ID (anonymized)
annotation_id (int): Annotation ID
tts_id (int): TTS data ID
success (bool): Whether the action was successful
"""
# try:
tags = {
'action_type': action,
'success': str(success),
'component': 'annotation'
}
extra = {}
if annotation_id:
extra['annotation_id'] = annotation_id
if tts_id:
extra['tts_id'] = tts_id
# Anonymize user ID for privacy
if user_id:
extra['user_hash'] = str(hash(str(user_id))) # Simple hash for privacy
message = f"Annotation {action}: {'success' if success else 'failed'}"
capture_custom_event(
message=message,
level="info" if success else "warning",
extra=extra,
tags=tags
)
# except Exception as e:
# log.error(f"Failed to capture annotation event: {e}")
def capture_database_performance(operation, duration, table=None, success=True):
"""
Capture database performance metrics
Args:
operation (str): Database operation (select, insert, update, delete)
duration (float): Operation duration in seconds
table (str): Table name
success (bool): Whether operation was successful
"""
# try:
tags = {
'db_operation': operation,
'success': str(success),
'component': 'database'
}
if table:
tags['table'] = table
extra = {
'duration_seconds': duration,
'performance_category': 'slow' if duration > 2.0 else 'normal'
}
level = "warning" if duration > 5.0 else "info"
message = f"Database {operation} took {duration:.2f}s"
capture_custom_event(
message=message,
level=level,
extra=extra,
tags=tags
)
# except Exception as e:
# log.error(f"Failed to capture database performance event: {e}")
def capture_user_activity(activity, user_id=None, session_duration=None, items_processed=None):
"""
Capture user activity metrics
Args:
activity (str): Activity type (login, logout, annotation_session)
user_id (int): User ID (will be anonymized)
session_duration (float): Session duration in seconds
items_processed (int): Number of items processed
"""
# try:
tags = {
'activity_type': activity,
'component': 'user_activity'
}
extra = {}
if session_duration:
extra['session_duration_seconds'] = session_duration
if items_processed:
extra['items_processed'] = items_processed
# Anonymize user ID
if user_id:
extra['user_hash'] = str(hash(str(user_id)))
message = f"User {activity}"
capture_custom_event(
message=message,
level="info",
extra=extra,
tags=tags
)
# except Exception as e:
# log.error(f"Failed to capture user activity event: {e}")
# Context manager for capturing performance
class SentryPerformanceMonitor:
"""
Context manager for monitoring operation performance
"""
def __init__(self, operation_name, tags=None):
self.operation_name = operation_name
self.tags = tags or {}
self.start_time = None
def __enter__(self):
import time
self.start_time = time.time()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
import time
duration = time.time() - self.start_time
success = exc_type is None
# Update tags
self.tags.update({
'operation': self.operation_name,
'success': str(success)
})
extra = {
'duration_seconds': duration,
'operation_name': self.operation_name
}
level = "error" if not success else ("warning" if duration > 5.0 else "info")
message = f"Operation '{self.operation_name}' completed in {duration:.2f}s"
capture_custom_event(
message=message,
level=level,
extra=extra,
tags=self.tags
)
# Don't suppress exceptions
return False