meisaicheck-api / auth.py
vumichien's picture
fix authen
ea1dcd3
raw
history blame
3.02 kB
from datetime import datetime, timedelta, timezone
import jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from passlib.context import CryptContext
from config import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_HOURS
from models import TokenData, UserInDB, User
from database import get_users
from typing import Annotated, Optional
from jwt.exceptions import InvalidTokenError
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token", auto_error=True)
# Authentication helper functions
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
return None
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
print(f"Decoding token: {token[:10]}...")
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
print(f"Token payload: {payload}")
username = payload.get("sub")
if username is None:
print("Username not found in token payload")
raise credentials_exception
token_data = TokenData(username=username)
print(f"Token data: {token_data}")
except InvalidTokenError as e:
print(f"Token validation error: {str(e)}")
raise credentials_exception
except Exception as e:
print(f"Unexpected error during token validation: {str(e)}")
raise credentials_exception
users = get_users()
print(f"Available users: {list(users.keys())}")
user = get_user(users, username=token_data.username)
if user is None:
print(f"User not found: {token_data.username}")
raise credentials_exception
print(f"User authenticated: {user.username}")
return user
async def get_current_active_user(
current_user: Annotated[User, Depends(get_current_user)],
):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user