© 2025 anveloper.dev
GitHub·LinkedIn·Contact

목차

  • SSE: LLM 스트리밍의 사실상 표준
  • 배경
  • 전체 아키텍처
  • FastAPI 프로젝트 구조
  • SSE 스트리밍 엔드포인트
  • SSE 이벤트 설계
  • 정상 흐름
  • 재시도 흐름
  • 두 가지 수준의 스트리밍
  • 앱 라이프사이클 관리
  • Dependency Injection
  • 에러 처리
  • API 엔드포인트 요약
  • 정리
포스트 목록으로 돌아가기

FastAPI + LangGraph로 AI 생성 과정 SSE 스트리밍하기

2026-02-12
FastAPI
SSE
LangGraph
Streaming
Python

FastAPI + LangGraph로 AI 생성 과정 SSE 스트리밍하기

SSE: LLM 스트리밍의 사실상 표준

Server-Sent Events(SSE)는 서버에서 클라이언트로 단방향 실시간 데이터를 전송하는 HTTP 기반 프로토콜이다. WebSocket과 달리 별도의 프로토콜 업그레이드 없이 표준 HTTP 위에서 동작하며, 브라우저의 EventSource API로 별도 라이브러리 없이 사용할 수 있다.

항목SSEWebSocket
통신 방향서버 → 클라이언트 (단방향)양방향
프로토콜HTTP/HTTPSws:// / wss:// (프로토콜 업그레이드)
자동 재연결내장 (EventSource 자동 처리)직접 구현 필요
로드밸런서표준 HTTP 인프라 그대로WebSocket 지원 설정 필요
브라우저 지원EventSource 내장WebSocket 내장

2025년 AI 생태계에서 SSE는 LLM 토큰 스트리밍의 사실상 표준이 되었다. OpenAI, Anthropic, Google 등 주요 AI API가 모두 SSE 기반 스트리밍을 제공한다. "사용자가 요청하면, 서버가 토큰을 흘려보낸다"는 단방향 패턴에 SSE가 정확히 맞기 때문이다. WebSocket의 양방향 기능은 LLM 스트리밍에서는 불필요한 복잡성이며, 커넥션 풀링, 재시도, 로드밸런싱 등 운영 부담이 크다.

이 글에서는 FastAPI의 sse-starlette과 LangGraph의 astream을 결합하여, 멀티 에이전트 워크플로우의 진행 상황을 실시간으로 전달하는 SSE 스트리밍 구조를 설계한 과정을 정리한다.

배경

디자인 템플릿 AI 생성 시스템의 워크플로우는 성공 시 ~17-22초, 재시도 포함 시 ~30초 이상 소요된다. 5개 에이전트(Orchestrator → Retriever → Analyzer → Generator → Validator)가 순차적으로 실행되는 구조이기 때문이다.

사용자에게 "로딩 중..." 하나만 보여주면 체감 대기 시간이 길다. 각 에이전트의 진행 상황을 실시간으로 보여주면 훨씬 나은 경험이 된다. WebSocket 대신 **SSE(Server-Sent Events)**를 선택한 이유는, 서버 → 클라이언트 단방향 스트리밍만 필요하고 구현이 단순하기 때문이다.

전체 아키텍처

Backend는 AI 서버의 SSE 스트림을 그대로 릴레이한다. 직접 이벤트를 생성하지 않고, AI 서버의 응답 body를 클라이언트에 투명하게 전달하는 구조다.

FastAPI 프로젝트 구조

src/ai_agent/
├── main.py              # FastAPI 앱 팩토리
├── config.py            # pydantic-settings 환경 설정
├── api/
│   ├── generate.py      # POST /api/generate/stream (SSE)
│   ├── search.py        # GET /api/search/similar
│   └── health.py        # GET /health
├── schemas/
│   └── generate.py      # Pydantic 요청/응답 모델
├── llm/                 # LLM 클라이언트
├── agents/              # 에이전트 구현
├── db/                  # DB 접근 (asyncpg)
└── workflow/            # LangGraph 워크플로우

FastAPI를 선택한 이유:

  • 비동기 네이티브: LLM API, DB 쿼리 등 I/O 작업을 async/await로 병렬 처리
  • AI/ML 생태계: Python 기반으로 LangGraph, Anthropic SDK, OpenAI SDK와 자연스럽게 통합
  • Pydantic 통합: 요청/응답 스키마의 자동 검증과 OpenAPI 문서 생성

SSE 스트리밍 엔드포인트

LangGraph의 astream(stream_mode="updates")과 sse-starlette을 결합한다.

import json
from fastapi import APIRouter
from sse_starlette.sse import EventSourceResponse
 
router = APIRouter()
 
NODE_TO_STAGE = {
    "orchestrate": "orchestrating",
    "retrieve": "retrieving",
    "analyze": "analyzing",
    "generate": "generating",
    "validate": "validating",
}
 
@router.post("/api/generate/stream")
async def generate_stream(request: GenerateRequest, mock: bool = False):
    async def event_generator():
        compiled = build_graph().compile()
        initial_state = {
            "prompt": request.prompt,
            "pod_type": None,
            "retry_count": 0,
        }
 
        async for chunk in compiled.astream(
            initial_state, stream_mode="updates"
        ):
            for node_name, state_update in chunk.items():
                stage = NODE_TO_STAGE.get(node_name)
                if not stage:
                    continue
                yield {
                    "data": json.dumps({
                        "type": "agent_complete",
                        "stage": stage,
                    })
                }
 
        # 최종 결과
        yield {
            "data": json.dumps({
                "type": "result",
                "data": {"xml": state_update.get("generated_xml", "")},
            })
        }
 
    return EventSourceResponse(event_generator())

astream(stream_mode="updates")는 각 노드 실행 완료 시마다 {노드이름: 상태업데이트} dict를 yield한다. 이를 SSE 이벤트로 변환하면 된다.

SSE 이벤트 설계

이벤트설명데이터
agent_start에이전트 단계 시작{ stage: AgentStage }
agent_complete에이전트 단계 완료{ stage: AgentStage }
result최종 결과{ data: { xml: string } }
error오류 발생{ message: string }

5단계 스테이지: orchestrating → retrieving → analyzing → generating → validating

정상 흐름

재시도 흐름

Validator가 실패하면 generate → validate 루프가 반복된다.

두 가지 수준의 스트리밍

이 시스템에는 두 가지 수준의 스트리밍이 존재한다.

수준위치방식용도
토큰 스트리밍AI 서버 내부Claude messages.stream()Generator XML 생성 시 토큰 단위 수신
에이전트 스트리밍AI 서버 → 클라이언트LangGraph astream() → SSE에이전트 단계별 진행 상황 표시

현재는 에이전트 단계 스트리밍만 프론트엔드에 전달한다. 토큰 스트리밍은 AI 서버 내부에서만 사용하며, 향후 Generator의 XML 생성 과정을 실시간으로 보여주는 데 활용할 수 있다.

# 토큰 스트리밍 (AI 서버 내부)
async with client.messages.stream(
    model="claude-sonnet-4-5-20250929",
    max_tokens=8192,
    system=system_prompt,
    messages=[{"role": "user", "content": prompt}],
) as stream:
    async for text in stream.text_stream:
        yield text

앱 라이프사이클 관리

DB 풀 초기화/정리를 FastAPI의 lifespan 컨텍스트로 관리한다.

from contextlib import asynccontextmanager
from fastapi import FastAPI
 
@asynccontextmanager
async def lifespan(app: FastAPI):
    try:
        await init_pool()
    except Exception as e:
        logger.warning("DB 연결 실패 (DB 없이 기동): %s", e)
    yield
    await close_pool()
 
app = FastAPI(title="AI Agent", version="0.1.0", lifespan=lifespan)

DB 연결이 실패해도 경고만 출력하고 정상 기동한다. XML 검증(/api/validate)처럼 DB가 필요 없는 엔드포인트는 계속 동작할 수 있기 때문이다.

Dependency Injection

FastAPI의 Depends()로 DB pool, LLM client를 주입한다. 테스트 시 mock으로 교체할 수 있다.

async def get_pool():
    return pool
 
@router.get("/api/search/similar")
async def search_similar(
    q: str,
    top_k: int = 5,
    pool=Depends(get_pool),
):
    embedding = await create_embedding(q)
    results = await search_similar_templates(pool, embedding, top_k=top_k)
    return results

테스트에서 의존성 교체:

app.dependency_overrides[get_pool] = lambda: mock_pool

에러 처리

워크플로우 실행 중 에러 발생 시 SSE error 이벤트로 프론트엔드에 전달한다.

async def event_generator():
    try:
        async for chunk in compiled.astream(initial_state, stream_mode="updates"):
            # ... 정상 처리
            pass
    except Exception as e:
        yield {"data": json.dumps({"type": "error", "message": str(e)})}

전역 에러 핸들러도 설정한다.

@app.exception_handler(Exception)
async def global_exception_handler(request, exc):
    logger.error(f"Unhandled error: {exc}", exc_info=True)
    return JSONResponse(
        status_code=500,
        content={"detail": "내부 서버 오류가 발생했습니다"},
    )

API 엔드포인트 요약

메서드경로설명필요 인프라
GET/health헬스체크없음
POST/api/validateXML 유효성 검증없음
POST/api/analyzeXML 패턴 분석Claude API
GET/api/search/similar유사 템플릿 검색DB + OpenAI
POST/api/generate/stream템플릿 생성 (SSE)모두

외부 인프라 의존도에 따라 엔드포인트를 분리했다. /health와 /api/validate는 외부 의존 없이 동작하므로, 모니터링과 테스트에 유용하다.

정리

FastAPI + LangGraph SSE 스트리밍 구조에서 핵심적인 설계 결정 세 가지:

  1. Backend는 릴레이만 한다: AI 서버의 SSE 스트림을 Backend가 그대로 중계하는 구조로, SSE 이벤트 생성 로직이 AI 서버에만 존재한다. 관심사 분리가 명확하다.

  2. sse-starlette + astream 조합: LangGraph의 astream(stream_mode="updates")이 노드 완료 시마다 yield하는 구조와 sse-starlette의 EventSourceResponse가 자연스럽게 결합된다.

  3. Lifespan 기반 리소스 관리: DB 풀의 초기화/정리를 FastAPI lifespan으로 관리하여, DB 없이도 기동 가능한 유연한 구조를 유지한다.

~20초의 AI 생성 과정을 5단계로 나누어 실시간으로 보여주는 것만으로도, 사용자 체감 대기 시간이 크게 줄어든다.