FastAPI + LangGraph로 AI 생성 과정 SSE 스트리밍하기
SSE: LLM 스트리밍의 사실상 표준
Server-Sent Events(SSE)는 서버에서 클라이언트로 단방향 실시간 데이터를 전송하는 HTTP 기반 프로토콜이다. WebSocket과 달리 별도의 프로토콜 업그레이드 없이 표준 HTTP 위에서 동작하며, 브라우저의 EventSource API로 별도 라이브러리 없이 사용할 수 있다.
| 항목 | SSE | WebSocket |
|---|---|---|
| 통신 방향 | 서버 → 클라이언트 (단방향) | 양방향 |
| 프로토콜 | HTTP/HTTPS | ws:// / wss:// (프로토콜 업그레이드) |
| 자동 재연결 | 내장 (EventSource 자동 처리) | 직접 구현 필요 |
| 로드밸런서 | 표준 HTTP 인프라 그대로 | WebSocket 지원 설정 필요 |
| 브라우저 지원 | EventSource 내장 | WebSocket 내장 |
2025년 AI 생태계에서 SSE는 LLM 토큰 스트리밍의 사실상 표준이 되었다. OpenAI, Anthropic, Google 등 주요 AI API가 모두 SSE 기반 스트리밍을 제공한다. "사용자가 요청하면, 서버가 토큰을 흘려보낸다"는 단방향 패턴에 SSE가 정확히 맞기 때문이다. WebSocket의 양방향 기능은 LLM 스트리밍에서는 불필요한 복잡성이며, 커넥션 풀링, 재시도, 로드밸런싱 등 운영 부담이 크다.
이 글에서는 FastAPI의 sse-starlette과 LangGraph의 astream을 결합하여, 멀티 에이전트 워크플로우의 진행 상황을 실시간으로 전달하는 SSE 스트리밍 구조를 설계한 과정을 정리한다.
배경
디자인 템플릿 AI 생성 시스템의 워크플로우는 성공 시 ~17-22초, 재시도 포함 시 ~30초 이상 소요된다. 5개 에이전트(Orchestrator → Retriever → Analyzer → Generator → Validator)가 순차적으로 실행되는 구조이기 때문이다.
사용자에게 "로딩 중..." 하나만 보여주면 체감 대기 시간이 길다. 각 에이전트의 진행 상황을 실시간으로 보여주면 훨씬 나은 경험이 된다. WebSocket 대신 **SSE(Server-Sent Events)**를 선택한 이유는, 서버 → 클라이언트 단방향 스트리밍만 필요하고 구현이 단순하기 때문이다.
전체 아키텍처
Backend는 AI 서버의 SSE 스트림을 그대로 릴레이한다. 직접 이벤트를 생성하지 않고, AI 서버의 응답 body를 클라이언트에 투명하게 전달하는 구조다.
FastAPI 프로젝트 구조
src/ai_agent/
├── main.py # FastAPI 앱 팩토리
├── config.py # pydantic-settings 환경 설정
├── api/
│ ├── generate.py # POST /api/generate/stream (SSE)
│ ├── search.py # GET /api/search/similar
│ └── health.py # GET /health
├── schemas/
│ └── generate.py # Pydantic 요청/응답 모델
├── llm/ # LLM 클라이언트
├── agents/ # 에이전트 구현
├── db/ # DB 접근 (asyncpg)
└── workflow/ # LangGraph 워크플로우FastAPI를 선택한 이유:
- 비동기 네이티브: LLM API, DB 쿼리 등 I/O 작업을 async/await로 병렬 처리
- AI/ML 생태계: Python 기반으로 LangGraph, Anthropic SDK, OpenAI SDK와 자연스럽게 통합
- Pydantic 통합: 요청/응답 스키마의 자동 검증과 OpenAPI 문서 생성
SSE 스트리밍 엔드포인트
LangGraph의 astream(stream_mode="updates")과 sse-starlette을 결합한다.
import json
from fastapi import APIRouter
from sse_starlette.sse import EventSourceResponse
router = APIRouter()
NODE_TO_STAGE = {
"orchestrate": "orchestrating",
"retrieve": "retrieving",
"analyze": "analyzing",
"generate": "generating",
"validate": "validating",
}
@router.post("/api/generate/stream")
async def generate_stream(request: GenerateRequest, mock: bool = False):
async def event_generator():
compiled = build_graph().compile()
initial_state = {
"prompt": request.prompt,
"pod_type": None,
"retry_count": 0,
}
async for chunk in compiled.astream(
initial_state, stream_mode="updates"
):
for node_name, state_update in chunk.items():
stage = NODE_TO_STAGE.get(node_name)
if not stage:
continue
yield {
"data": json.dumps({
"type": "agent_complete",
"stage": stage,
})
}
# 최종 결과
yield {
"data": json.dumps({
"type": "result",
"data": {"xml": state_update.get("generated_xml", "")},
})
}
return EventSourceResponse(event_generator())astream(stream_mode="updates")는 각 노드 실행 완료 시마다 {노드이름: 상태업데이트} dict를 yield한다. 이를 SSE 이벤트로 변환하면 된다.
SSE 이벤트 설계
| 이벤트 | 설명 | 데이터 |
|---|---|---|
agent_start | 에이전트 단계 시작 | { stage: AgentStage } |
agent_complete | 에이전트 단계 완료 | { stage: AgentStage } |
result | 최종 결과 | { data: { xml: string } } |
error | 오류 발생 | { message: string } |
5단계 스테이지: orchestrating → retrieving → analyzing → generating → validating
정상 흐름
재시도 흐름
Validator가 실패하면 generate → validate 루프가 반복된다.
두 가지 수준의 스트리밍
이 시스템에는 두 가지 수준의 스트리밍이 존재한다.
| 수준 | 위치 | 방식 | 용도 |
|---|---|---|---|
| 토큰 스트리밍 | AI 서버 내부 | Claude messages.stream() | Generator XML 생성 시 토큰 단위 수신 |
| 에이전트 스트리밍 | AI 서버 → 클라이언트 | LangGraph astream() → SSE | 에이전트 단계별 진행 상황 표시 |
현재는 에이전트 단계 스트리밍만 프론트엔드에 전달한다. 토큰 스트리밍은 AI 서버 내부에서만 사용하며, 향후 Generator의 XML 생성 과정을 실시간으로 보여주는 데 활용할 수 있다.
# 토큰 스트리밍 (AI 서버 내부)
async with client.messages.stream(
model="claude-sonnet-4-5-20250929",
max_tokens=8192,
system=system_prompt,
messages=[{"role": "user", "content": prompt}],
) as stream:
async for text in stream.text_stream:
yield text앱 라이프사이클 관리
DB 풀 초기화/정리를 FastAPI의 lifespan 컨텍스트로 관리한다.
from contextlib import asynccontextmanager
from fastapi import FastAPI
@asynccontextmanager
async def lifespan(app: FastAPI):
try:
await init_pool()
except Exception as e:
logger.warning("DB 연결 실패 (DB 없이 기동): %s", e)
yield
await close_pool()
app = FastAPI(title="AI Agent", version="0.1.0", lifespan=lifespan)DB 연결이 실패해도 경고만 출력하고 정상 기동한다. XML 검증(/api/validate)처럼 DB가 필요 없는 엔드포인트는 계속 동작할 수 있기 때문이다.
Dependency Injection
FastAPI의 Depends()로 DB pool, LLM client를 주입한다. 테스트 시 mock으로 교체할 수 있다.
async def get_pool():
return pool
@router.get("/api/search/similar")
async def search_similar(
q: str,
top_k: int = 5,
pool=Depends(get_pool),
):
embedding = await create_embedding(q)
results = await search_similar_templates(pool, embedding, top_k=top_k)
return results테스트에서 의존성 교체:
app.dependency_overrides[get_pool] = lambda: mock_pool에러 처리
워크플로우 실행 중 에러 발생 시 SSE error 이벤트로 프론트엔드에 전달한다.
async def event_generator():
try:
async for chunk in compiled.astream(initial_state, stream_mode="updates"):
# ... 정상 처리
pass
except Exception as e:
yield {"data": json.dumps({"type": "error", "message": str(e)})}전역 에러 핸들러도 설정한다.
@app.exception_handler(Exception)
async def global_exception_handler(request, exc):
logger.error(f"Unhandled error: {exc}", exc_info=True)
return JSONResponse(
status_code=500,
content={"detail": "내부 서버 오류가 발생했습니다"},
)API 엔드포인트 요약
| 메서드 | 경로 | 설명 | 필요 인프라 |
|---|---|---|---|
| GET | /health | 헬스체크 | 없음 |
| POST | /api/validate | XML 유효성 검증 | 없음 |
| POST | /api/analyze | XML 패턴 분석 | Claude API |
| GET | /api/search/similar | 유사 템플릿 검색 | DB + OpenAI |
| POST | /api/generate/stream | 템플릿 생성 (SSE) | 모두 |
외부 인프라 의존도에 따라 엔드포인트를 분리했다. /health와 /api/validate는 외부 의존 없이 동작하므로, 모니터링과 테스트에 유용하다.
정리
FastAPI + LangGraph SSE 스트리밍 구조에서 핵심적인 설계 결정 세 가지:
-
Backend는 릴레이만 한다: AI 서버의 SSE 스트림을 Backend가 그대로 중계하는 구조로, SSE 이벤트 생성 로직이 AI 서버에만 존재한다. 관심사 분리가 명확하다.
-
sse-starlette + astream 조합: LangGraph의
astream(stream_mode="updates")이 노드 완료 시마다 yield하는 구조와sse-starlette의EventSourceResponse가 자연스럽게 결합된다. -
Lifespan 기반 리소스 관리: DB 풀의 초기화/정리를 FastAPI lifespan으로 관리하여, DB 없이도 기동 가능한 유연한 구조를 유지한다.
~20초의 AI 생성 과정을 5단계로 나누어 실시간으로 보여주는 것만으로도, 사용자 체감 대기 시간이 크게 줄어든다.