app.py 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. import os
  2. from datetime import timedelta
  3. from dotenv import load_dotenv
  4. from sqlalchemy import create_engine
  5. from sqlalchemy.orm import sessionmaker, Session
  6. from fastapi import FastAPI, HTTPException, Depends
  7. from werkzeug.security import generate_password_hash, check_password_hash
  8. from models.Base import Base
  9. from models.User import User
  10. from models.Message import Message
  11. from models.Conversation import Conversation
  12. from utils.validators import UserCreate, UserLogin, Token
  13. from utils.auth import create_access_token
  14. ACCESS_TOKEN_EXPIRE_MINUTES = 30 # Token expiration time
  15. load_dotenv()
  16. app = FastAPI()
  17. engine = create_engine(os.environ["DB_URL"])
  18. SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
  19. Base.metadata.create_all(bind=engine)
  20. # Dependency to get the SQLAlchemy session
  21. def get_db():
  22. db = SessionLocal()
  23. try:
  24. yield db
  25. finally:
  26. db.close()
  27. @app.post("/register", response_model=dict)
  28. def register(user: UserCreate, db: Session = Depends(get_db)):
  29. existing_user = db.query(User).filter(User.email == user.email).first()
  30. if existing_user:
  31. raise HTTPException(
  32. status_code=400,
  33. detail="The provided email is already taken.",
  34. )
  35. hashed_password = generate_password_hash(user.password)
  36. new_user = User(
  37. email=user.email,
  38. password_hash=hashed_password,
  39. )
  40. db.add(new_user)
  41. db.commit()
  42. db.refresh(new_user)
  43. return {"message": "User registered successfully."}
  44. @app.post("/login", response_model=Token)
  45. def login(user: UserLogin, db: Session = Depends(get_db)):
  46. # Check if the user exists
  47. db_user = db.query(User).filter(User.email == user.email).first()
  48. if not db_user or not check_password_hash(db_user.password_hash, user.password):
  49. raise HTTPException(status_code=401, detail="Invalid email or password")
  50. # Create a token
  51. access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
  52. access_token = create_access_token(
  53. data={
  54. "email": db_user.email,
  55. "userId": db_user.id,
  56. },
  57. expires_delta=access_token_expires,
  58. )
  59. return {"access_token": access_token, "token_type": "bearer"}
  60. @app.get("/chats")
  61. def chats():
  62. pass
  63. if __name__ == "__main__":
  64. import uvicorn
  65. uvicorn.run(app)