본문 바로가기

[Projects]/[AI CHAT BOT]

[AI CHAT BOT] 응답 속도 개선을 위한 스트리밍 설계, Next -> Nest -> Python 실시간 렌더링

728x90

[Python] 멀티프로세싱으로 처리 속도 올리기 (multiprocessing)

이전 글 읽으러 가기!

 

👋 소개

안녕하세요! 대학생 개발자 주이어입니다!

 

저는 현재 진행 중인 프로젝트에서 단순히 GPT API를 호출해 답변을 받는 구조로 챗봇을 제작하고 있었습니다.

하지만 서버 배포 후 테스트 과정에서 무료 vm의 낮은 성능 콜드스타트 등의 문제 때문에 답변을 기다리는 시간이 너무 길어졌고,
이는 사용자 경험을 현저히 떨어뜨리는 문제가 될 것이라고 생각했습니다.

 

저번 TRAIVEL 프로젝트에서도 비슷한 문제가 있었지만 Next -> Express로 한 단계 통신이었기에 큰 문제가 되지 않았습니다.

하지만 이번의 경우 RAG 방식을 사용한 Next -> Nest -> Python으로 두 단계 이상의 통신이었고 그냥 넘어갈 수 없는 문제가 되어 해결해야 겠다고 생각했습니다.

 

그래서 저는 이 문제를 해결하기 위해 GPT 스트림 기능을 사용하고, 프론트엔드에서 실시간으로 렌더링하는 구조로 리팩토링하기로 결정했습니다.


💬 GPT 스트림 설정하기

from openai import AsyncOpenAI

client = AsyncOpenAI(api_key="APIKEY")

가장 먼저 수정해야할 부분은 라이브러리 import 부분과 client 생성 부분입니다.

GPT 스트리밍을 사용하기 위한 비동기 openai 라이브러리를 가져와주고, client를 생성해줍니다.

async def generate_answer(query, context_documents, notice_context_docs):

실제로 답변을 생성하는 함수도 async를 붙여 비동기 함수로 만들어줍니다.

비동기 함수로 만드는 이유는 GPT가 chunk 단위로 답변을 생성할 때마다 async for과 yield로 값을 전달해주기 위해서입니다.

response = await client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role":"user", "content":prompt}],
    stream=True,
)

async for chunk in response:
    token = getattr(chunk.choices[0].delta, "content", "")
    if token:
        yield token

위 코드는 실질적으로 GPT가 스트리밍 방식으로 답변을 생성하게 만들어주는 코드입니다.

먼저 client 모델 설정에서 stream=True로 설정해 답변 생성 방식을 stream으로 해줍니다.

 

그 다음 async for을 이용해 답변을 chunk 단위로 받아오고, 각 chunk에서 토큰을 추출하여 yield로 순차적으로 값을 보내줍니다.

 

이렇게 해주면 이제 GPT는 실시간으로 답변을 생성하게 됩니다.


⚙️ FastAPI 스트리밍 설정하기

GPT가 스트리밍으로 답변을 생성해서 내보내주니, 이를 스트리밍으로 받아줄 서버가 필요하겠죠.

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

FastAPI에서는 스트리밍으로 응답을 받기 위한 StreamingResponse를 지원하고 있어 쉽게 구현할 수 있었습니다.

@server.post("/message")
async def message(data:MessageRequest):
    msg = data.message
    docs, embedded_vectors, notice_docs, notice_vec = load_embeddings()
    related_docs = search_top_k(msg, docs, embedded_vectors, k=7)
    related_notice_docs = search_top_k(msg, notice_docs, notice_vec, k=7)

    async def answer_generator():
        async for token in generate_answer(msg, related_docs, related_notice_docs):
            yield token

    return StreamingResponse(answer_generator(), media_type="text/event-stream")

스트리밍으로 값을 받아오고, 또 스트리밍으로 Nest에 넘겨줄 예정이기 때문에 async def로 생성을 해줍니다.
그 후 RAG와 관련된 임베딩, 유사 문서 검색 부분은 생략하고,

async def answer_generator 함수가 추가적으로 있는 것을 확인할 수 있는데,

GPT가 스트리밍으로 생성하는 답변을 token 단위로 받아오고, 이를 다시 한번 yield로 스트리밍으로 반환해주는 함수입니다.

 

즉, Python 서버의 역할은 GPT의 스트리밍 데이터를 중간에서 받아 스트리밍으로 Nest에 넘겨주는 역할을 수행합니다.

 

잠깐! 왜 이런 구조를 선택했을까요?

Next -> Python으로 바로 가는 구조로 만들었다면 훨씬 더 간단하고 쉽게 만들었을 텐데 왜 Nest를 한번 더 거치는 구조를 사용했을까요? 

사실 저는 이 부분에 대해서 많은 고민을 했었는데요.

그럼에도 Next -> Nest -> Python 구조를 사용한 이유는 2가지 정도가 있는 것 같습니다.

  • API 통일성 유지
    /message API를 제외한 기존의 API들은 이미 모두 Nest에서 처리중이었기 때문에 Nest에서 모든 API 요청을 관리하는 것이유지보수에도 좋고 일관성을 유지하는데 좋을 것 같다고 생각했습니다.
  • 서버 역할 분리
    RAG 방식으로 답변을 생성하는 과정은 메모리를 많이 사용하며 오래 걸리는 무거운 작업입니다.
    이 때 Nest와 Python을 합쳐서 사용한다면 전체 서비스에 부하가 올 수 있다고 생각했고,
    서버에 배포를 할 경우에도 Nest는 무료 서버, Python만 유료 서버(메모리를 많이 사용하므로)에 배포함으로써 서버 비용 면에서도 효율적으로 관리할 수 있을 거라고 생각했습니다.

위와 같은 이유로 저는 Nest -> Python 구조를 선택하게 되었습니다.


⚙️ Nest 서버 구현하기

import { Controller, Post, Req, Res } from '@nestjs/common';
import { Request, Response } from 'express';
import { firstValueFrom } from 'rxjs';
import { HttpService } from '@nestjs/axios';

먼저 스트리밍 서버를 구현하기 위한 모듈들을 import 해줍니다.

firstValueFrom은 Observable 스트림에서 첫 번째 값을 Promise형태로 가져오도록 도와줍니다.

HttpService는 Observable을 반환하는데 await는 Promise형태에서만 사용이 가능하기 때문에 firstValueFrom을 사용합니다.

HttpService는 Nest에서 HTTP 요청을 보낼 때 사용하는 모듈이며, 내부적으로 Axios를 사용합니다.

@Controller('message')
export class MessageController {
  constructor(private readonly httpService: HttpService) {}

/message 경로로 컨트롤러를 만들어주고, HttpService 모듈을 생성자에 의존성 주입해줍니다.

  @Post()
  async proxyMessage(@Req() req: Request, @Res() res: Response) {

Post 요청을 생성해주고, ReqRes로 요청과 응답을 처리해줍니다.

const pythonUrl = 'https://railway_python_server/message';

const pythonResponse$ = this.httpService.post(pythonUrl, req.body, {
  responseType: 'stream',
  headers: { 'Content-Type': 'application/json' },
});

railway에 배포한 python 서버로 url을 설정해주고,

responseType: 'stream' 옵션을 통해 응답을 스트리밍 형태로 받도록 설정해줍니다.

const pythonResponse = await firstValueFrom(pythonResponse$);

res.status(pythonResponse.status);
for (const [key, value] of Object.entries(pythonResponse.headers)) {
  res.setHeader(key, value as string);
}

위에서 언급했던 firstValueFrom으로 Promise형로 변환하여 응답을 가져옵니다.

그 다음 python에서 보내주는 응답 상태와 헤더를 Nest 응답으로 복사합니다.

pythonResponse.data.pipe(res);

마지막으로 python에서 받아온 스트리밍 응답을 그대로 클라이언트에 pipe로 실시간으로 전송해줍니다.

이 부분이 스트리밍 데이터를 중간에 끊기지 않고 전달되도록 처리하는 핵심 부분입니다.

pipe()는 Node.js의 스트림에서 자주 사용되는 기능으로, 한 스트림의 출력을 다른 스트림의 입력으로 연결할 때 사용됩니다.

즉, 위 코드에서는 python의 스트림을 -> Next 클라이언트로 연결해주는 중간 다리 역할을 하고 있는 것입니다.

(여기서 res는 Node.js의 WriteableStream으로 Next 클라이언트를 의미합니다.)

 

이렇게 해주면 Python의 스트림 응답을 끊기지 않고 바로 Next 클라이언트로 보낼 수 있게 됩니다.


🧭 Next API Handler 설정하기

Next API Handler는 Next App Router에서 지원하는 기능으로 API 요청을 중앙화하고, 한 곳에서 관리할 수 있게 도와주는 기능입니다. 또한 클라이언트에 백엔드 주소가 직접적으로 노출되지 않기 때문에 보안적으로도 사용되는 기능입니다.

import { NextRequest, NextResponse } from "next/server";

export async function POST(request: NextRequest) {
  const body = await request.json();
  const message = body.message;

  const backendUrl = `${process.env.NEXT_PUBLIC_BACKEND_URL}/message`;

  const res = await fetch(backendUrl, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
    },
    body: JSON.stringify({ message }),
  });

  return new NextResponse(res.body, {
    headers: {
      "Content-Type": "text/plain",
    },
  });
}

위 코드는 Nest서버의 message 경로로 요청을 보내는 API Handler 코드입니다.

여기서 특별한 부분은 밑에 있는 return 부분입니다.

그냥 응답을 반환하지 않고 새로운 Response 객체를 생성하여 반환 해주는 것을 알 수 있습니다.

이렇게 하는 이유는 Nest에서 스트리밍으로 응답이 끝날 때까지 값을 계속 보내주기 때문에 이에 맞춰 새로운 Response 객체를 생성하여 반환해주는 것입니다.

결과적으로, Next에서 스트리밍 응답을 처리할 수 있게 되는 것입니다.


🖥️ Next 실시간 렌더링 처리하기

try {
    const res = await fetch(`api/message`, {
      signal: controllerRef.current.signal,
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ message: input }),
    });

    if (!res.body) return;

    const reader = res.body.getReader();
    const decoder = new TextDecoder();

getReader()는 스트림 데이터를 읽을 수 있도록 도와주는 기능을 합니다.

이 리더를 통해 .read()를 호출해 스트림을 읽을 수 있게 됩니다.

TextDeocder()는 스트림에서 읽은 데이터를 사람이 읽을 수 있는 문자열로 바꿔주는 기능을 합니다.

while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  const chunk = decoder.decode(value);
  setAnswers((prev) => {
    const newAnswers = [...prev];
    newAnswers[newAnswers.length - 1] += chunk;
    return newAnswers;
  });
}

.read()는 done과 value 값을 반환하는데,

done은 스트리밍이 진행 중인지를 나타내는 boolean 값이고 value는 스트림 데이터를 가지고 있습니다.

done이 true라면 스트리밍이 끝났다는 뜻이므로 반복문을 종료해줍니다.

 

끝나지 않았다면 decoder로 값을 읽어 저장해줍니다.

그리고 실시간 렌더링을 하기 위해서는 값이 바뀔 때 마다 렌더링해야 하므로, useState를 이용해줍니다.

 

그럼 최종적으로 클라이언트에서 실시간으로 값을 받아와 렌더링을 해주는 구조가 완성됩니다.


👋 실제 화면

 

stream 실행 영상

원하는 결과가 잘 나오는 것을 확인할 수 있습니다.


😊  마무리

이번 과정을 통해 단순한 GPT API 호출 구조에서 벗어나 실시간 스트리밍 처리를 할 수 있게 되었고,

Next -> Nest -> Python과 같은 다단계 서버 구조를 구축할 수 있게 되었습니다.

 

이번 경험은 단순히 기술적인 해결뿐만 아니라 "어떻게 사용자 경험을 향상시킬 수 있을까?"의 고민에 대해 해결해보는 경험이었던 것 같습니다.

 

혹시 저와 비슷한 문제를 겪고 있다면, 스트리밍 구조를 도입해보는 것을 추천해드립니다.

 

저와 같이 이런 분야를 공부하고 프로젝트를 제작하고 싶다면 아래 링크로 들어와주세요!

https://discord.gg/8Hh8WgM4zp

 

KYT CODING COMMUNITY Discord 서버에 가입하세요!

Discord에서 KYT CODING COMMUNITY 커뮤니티를 확인하세요. 21명과 어울리며 무료 음성 및 텍스트 채팅을 즐기세요.

discord.com

KYT CODING COMMUNITY 가입하기! 

728x90