본문 바로가기

Programming👩🏻‍💻

[FastAPI] - SQLAlchemy, Postgresql 데이터 베이스 연동 시도 과정

 

튜토리얼 개념으로 코드에 적용하는 것은 또 생소해져버려서..(나만 그런지도)

이것저것 보면서 연동된 과정들을 기록 해보려고 한다. 

 

 

 

1. 데이터베이스 연결 설정

우선 데이터베이스 연결 파일인 database.py를 생성하였다.

 

SQLAlchemy를 db에 연결하기 위해서 엔진,세션, ORM과 맵핑할 모델 기본 클래스를 생성해줘야한다.

 

# database.py
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# 데이터베이스 URL 설정
# format: "postgresql://사용자이름:비밀번호@호스트:포트번호/데이터베이스이름"
DATABASE_URL = f"postgresql://{os.getenv('POSTGRES_USER')}:{os.getenv('POSTGRES_PASSWORD')}@{os.getenv('POSTGRES_HOST')}:{os.getenv('POSTGRES_PORT')}/{os.getenv('POSTGRES_DB')}"

# 데이터베이스 엔진 생성
engine = create_engine(DATABASE_URL, echo=True) # echo=True : SQL 쿼리 출력

# 세션 생성 클래스 설정
# autocommit=False: 수동으로 커밋해야 변경사항이 저장됨
# autoflush=False: 수동으로 flush 해야 데이터가 DB에 반영됨
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# 모델 기본 클래스 생성
Base = declarative_base()

 

 

1.1 데이터베이스 엔진 생성  _ create_engine 

엔진은 DB와 실제 연결을 관리하고 데이터베이스 통신을 담당한다.

 

- 데이터베이스 URL을 통해 연결 설정

- SQL 쿼리 문자열을 DBAPI와 호환되도록 변환하여 데이터베이스로 전달

- Connection Pool(연결 풀)을 관리하여 효율적인 데이터베이스 연결 사용

*Connection Pool은 대략 애플리케이션과 DB 연결을 효율적으로 관리하기 위한 구성요소라고 생각하고 넘어가면 될 것 같다.

 

engine = create_engine(DATABASE_URL, echo=True)

'echo=True'는 디버깅 목적으로 실행하는 모든 쿼리를 출력한다. 개발 환경에서만 켜두는걸 권장한다.

 

1.2 세션 설정 _ SessionLocal

트랜젝션 및 상태 관리 역할로 데이터 CRUD, ORM 객체 상태 추적과 데이터베이스에 반영한다.

엔젠을 사용해서 데이터베이스와 연결한다.

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

 

1.3 ORM(Base)

Python 클래스의 데이터베이스 테이블 맵핑을 해준다.

SQLAIchemy 모델 클래스를 정의할 때 상속 받는다. (아래에 내용 나옴)

Base = declarative_base()

 

 

1.4 DB 연결 확인

이렇게 database 연동을 정의 해두고 잘 되었는지, db test를 해보았다.

# main.py

def get_db():
    db = Session()
    try:
        yield db
    finally:
        db.close()


@app.get("/test-db")
async def test_db_connection(db: Annotated[Session, Depends(get_db)]):
    try:
        result = db.execute(text("SELECT 1"))
        return {"db_status": "connected", "result": result}
    except Exception as e:
        logger.error(f"Database connection failed: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Database connection failed",
        )

 

 

 

이제 아래 처리 흐름을 생각하면서 코드를 빌드업한다.

(틀렸으면 댓글 주시면 됩니다)

 

엔드포인트 실행: /register 호출 → JSON 데이터 전달
⬇️
Pydantic 검증: UserCreate를 통해 데이터 구조화 및 유효성 검사
⬇️

SQLAlchemy 모델 생성: User 객체로 변환 및 데이터 추가
⬇️
데이터베이스 처리: 세션(Session)을 통해 데이터를 데이터베이스에 저장.

 

 

2. Pydantic 검증

Pydantic 모델 주로 데이터 스키마를 정의하는 데 사용된다.

API 데이터에 대한 검증 규칙을 정의하고, 요청과 응답 데이터를 직렬/역직렬화를 한다.

# schema.py

from pydantic import BaseModel, Field

# 유저 공통 데이터 스키마
class UserBase(BaseModel):
    username: str
    disabled: bool = False  # 기본값 설정

    class Config:
        orm_mode = True  # SQLAlchemy 모델과 호환되도록 설정


# 유저 생성 시 요청 스키마
class UserCreate(UserBase):
    password: str
    class Config:
        orm_mode = True  # SQLAlchemy 모델과 호환 설정

BaseModel은 Pydantic에서 제공하는 기본 클래스로, 데이터 검증과 직렬화/역직렬화 기능을 제공한다

즉, BaseModel을 상속받은 모든 클래스는 입력 데이터 검증, 데이터 구조화, 타입 변환을 해준다.

 

3. 데이터 베이스 처리를 위한 SQLAlchemy 모델 생성

sqlalchemy model을 생성하여 데이터 베이스의 테이블 구조와 맵핑한다.

# models.py

from sqlalchemy import Column, Integer, String, Boolean
from database import Base

class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String, unique=True, index=True, nullable=False)
    password = Column(String, nullable=False)  # 해싱된 비밀번호
    disabled = Column(Boolean, default=False)

 

 

4. 회원가입 기능

service 계층에 유저 회원가입 기능을 정의하였다.

# services.py

class UserService:
    def __init__(self):
        self.pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


    def hash_password(self, password: str) -> str:
        """
        비밀번호를 해싱하여 반환
        """
        return self.pwd_context.hash(password)

    def verify_password(self, plain_password: str, hashed_password: str) -> bool:
        """
        입력된 비밀번호가 저장된 해시와 일치하는지 확인
        """
        return self.pwd_context.verify(plain_password, hashed_password)

    def register_user(self, user: UserCreate, db: Session):
        existing_user = db.query(User).filter(User.username == user.username).first()
        if existing_user:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Username already registered",
            )

        # 비밀번호 해싱처리
        hashed_password = self.hash_password(user.password)

        # User 객체 생성 데이터베이스 추가
        new_user = User(username=user.username, password=hashed_password)
        db.add(new_user)
        db.commit()
        db.refresh(new_user)

        return {"message": f"User {new_user.username} registered"}

 

 

4.1 SQLAIchemy

existing_user에 등장하는 SQLAIchemy 쿼리 문법을 간단하게 짚고 가면 좋을 것 같다.

existing_user = db.query(User).filter(User.username == user.username).first()

 

db.query(User)

의미: User 테이블(모델)에서 데이터를 조회하겠다는 의미

dbSession 객체로 데이터베이스와의 연결

UserSQLAlchemy로 정의된 테이블 모델

 

 .filter(User.username == user.username)

의미: User 테이블의 username 필드가 특정 값(user.username)과 동일한 레코드를 필터링(검색)합니다.

SQL로 쿼리로는 'SELECT * FROM users WHERE username = '입력된_값' 와 같다고 보면 된다

 

.first()

의미: 조회된 결과 중 하나만 반환. 여러 개의 결과를 반환하려면 .all()

결과가 없으면 None을 반환

SQL로 쿼리로는 'SELECT * FROM users WHERE username = '입력된_값' LIMIT 1;'

 

SQLAlchemy 문법 SQL 문법
db.query(User).filter(User.username == "john") SELECT * FROM users WHERE username = 'john';
db.query(User).filter(User.age > 18).all() SELECT * FROM users WHERE age > 18;
db.query(User).filter(User.id == 1).first() SELECT * FROM users WHERE id = 1 LIMIT 1;

 

5. API endpoint 정의

"/register" 경로로 회원가입 기능을 실행한다.

코드가 왜이렇게 더러워보이지.. 에흐.

from typing import Annotated

from fastapi import FastAPI, HTTPException, Depends, status

from services import UserService
from models import User
from database import Session
from schema import UserCreate
from database import init_db

# main.py

from sqlalchemy import text

import logging


init_db()
app = FastAPI()
user_service = UserService()

logging.basicConfig(
    level=logging.INFO,  # 로그 레벨: DEBUG, INFO, WARNING, ERROR, CRITICAL 중 선택
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)


@app.post(
    "/register",
    status_code=status.HTTP_201_CREATED,
    responses={
        201: {"description": "User created"},
    },
)
async def register_user(user: UserCreate, db: Session = Depends(get_db)):
    return user_service.register_user(user, db)

 

 

아직 부족한게 많긴한데.. 그래도 계속하다보면 좋아지겠지

'Programming👩🏻‍💻' 카테고리의 다른 글

[SQLAlchemy] - 개념정리 및 입문하기  (0) 2024.12.15
Rate Limiter 이해하기  (0) 2024.11.22
[Swagger] drf-spectacular로 REST API 문서 자동 생성  (1) 2024.10.11
React 벼락치기  (1) 2024.09.22
MAVEN  (0) 2021.12.03