[SYNAPSE 개발일지 #1] GraphRAG를 위한 AI 서버 아키텍처 설계 - Orchestrator 도입부터 DB 접근 권한과 도메인 소유권까지
[SYNAPSE 개발일지 #1] GraphRAG를 위한 AI 서버 아키텍처 설계 - Orchestrator 도입부터 DB 접근 권한과 도
[SYNAPSE] 여러분은 무슨 목적으로 기록하시나요? 어쩌면 그 기록은 이미 죽었을지도 모릅니다. [SYNAPSE] 여러분은 무슨 목적으로 기록하시나요? 어쩌면 그 기록은 이미 죽었을지도 모릅니다.💀 지
blog.juyear.dev
이전 글 읽으러 가기!
👋 소개 및 회고
안녕하세요. 대학생 개발자 주이어입니다.
저번 글에서 GraphRAG를 위한 전반적인 AI 서버 아키텍처 설계에 대해서 적어보았습니다.
하지만 저번 글에서 해결하지 못한 문제가 하나 있었습니다.
바로 'AI 서버가 어떻게 메인 서버를 호출할 것인가'라는 문제였습니다.
오늘은 이 문제를 해결하기 위해서 어떤 방법을 도입했는지, 도입하는 과정에서 어떤 문제들을 만났는지 정리해보려고 합니다.
- 왜 Redis가 필요했는가
- 왜 Streams를 선택했는가
- 구현하면서 만난 문제들
이 세 가지를 중심으로 작성해보려 합니다.
🤔 왜 Redis가 필요했는가
현재의 문제점
저번 글에서 잠깐 소개드렸었지만, 이번 글에서 정확히 어떤 문제가 있는지 정리하고 넘어가도록 하겠습니다.

위 그림은 저번 글에서 그렸던 최종적인 아키텍처입니다.
현재 아키텍처의 특징은 아래와 같이 정리할 수 있을 것 같습니다.
- Main Server와 AI Server의 분리
- 도메인 소유권 분리를 위해 AI Server에 Read Only DB를 적용
- Main Server에서 비동기로 AI Server를 호출
문제는 AI Server가 Read Only DB라는 점에서 시작합니다.
AI Server는 GraphRAG 파이프라인과 관련된 작업을 수행하고, 이 작업 과정에는 embedding 데이터 저장, link 데이터 저장과 같은 DB에 데이터를 저장하는 과정이 포함되어 있습니다.
이를 해결하기 위해 제가 쉽게 떠올린 방법은 두 가지 정도가 있었습니다.
[1. Write 권한을 주면 되지 않을까?]
이 방법의 경우 가장 쉽게 떠오르는 방법이자 구현도 가장 쉬운 방법이긴 합니다.
하지만 이 방법은 DB 권한을 나눴던 이유 자체를 부정하는 모순적인 방법이긴 합니다.
도메인 소유권을 분리하고, 유지보수와 관리 체계를 위해 권한을 나눈 DB에 그냥 권한을 주는 것은 좋은 방법이 아니라고 생각했습니다.
[2. API로 데이터 저장을 호출하면 되지 않을까?]
이 방법의 경우 세 가지의 문제가 있었습니다.

첫 번째는 위와 같이 API 호출 과정이 증가한다는 것 입니다.
이는 통신 비용이 증가하고 전체적인 파이프라인 처리 시간도 증가할 것이라고 생각했습니다.
두 번째는 AI 서버의 메인 서버 의존도가 높아진다는 것입니다.
지속적으로 메인 서버와 호출을 진행해야만 작업이 진행될 수 있고, 이는 메인 서버에 지나치게 의존할 수 있다는 생각이 들었습니다. 또한 AI 서버가 메인 서버를 의존하는 '역방향 의존성' 문제도 생길 수 있다고 생각했습니다.
세 번째는 구조적 문제였습니다.
이러한 호출 구조는 '순환 호출 구조' 라는 구조적 문제가 있다고 생각하였습니다.
이러한 이유들로 처음에 떠올렸던 두 가지 방법은 적절하지 않다고 판단하였고,
AI서버에서의 데이터 저장이라는 문제는 해결되지 않았습니다.
Message Queue는 무엇인가
최종적으로 이 문제를 해결하기 위해 저는 Redis를 사용한 Message Queue를 도입하였습니다.
하지만 이 부분에 대해서 설명하기 전에 Message Queue에 대해서 소개하려 합니다.

우선 Message Queue 구조에 대해서 간단히 설명하기 위해 위 그림을 준비했습니다.
Message Queue는 시스템(서버) 간 메시지(데이터)를 전달하여 작업을 비동기적으로 처리하기 위해 사용합니다.
Kafka, RabbitMQ 등 Message Queue 종류에 따라 특징이 다르지만 일반적인 구조와 목적은 비슷합니다.
Redis도 Message Queue로 사용할 수 있으며, 비슷한 목적을 가졌습니다.
이러한 Message Queue를 사용하는 이유는 여러 이유가 있지만 가장 큰 이유 중 하나는 '시스템 간 결합도 완화' 때문입니다.

일반적인 API 호출은 호출하는 서버가 호출받는 서버의 영향을 많이 받습니다.
호출받는 서버를 직접 알고 있어야 하며, 서버가 느려지거나 장애가 발생하면 호출하는 서버도 영향을 받을 수 있습니다.

하지만 Message Queue를 사용하면 호출하는 서버가 호출받는 서버를 직접 알 필요가 없습니다.
호출하는 서버는 Message Queue에 Message를 넣으면 되고, 호출받는 서버는 Message Queue에서 Message를 가져가면 됩니다.
만약 지금 어떠한 문제로 호출받는 서버에 장애가 발생하더라도, 메시지는 Queue에 저장되어 있으므로(Redis의 경우 Streams를 사용) 서버가 복구된 뒤 이어서 처리할 수 있고, 호출하는 서버는 Queue에 Message를 전송하기만 하면 됩니다.
즉, API 호출이 상대 서버에게 직접 요청하는 방식이라면, Message Queue는 Queue를 통해 간접적으로 통신하는 방식이라고 볼 수 있습니다.
Redis가 어떻게 해결하는가
이제 다시 SYNAPSE로 돌아와 Redis Message Queue가 어떻게 문제를 해결하는지 정리해보겠습니다.

Redis Message Queue를 사용하면 위와 같이 아키텍처가 바뀌게 됩니다.
(검은색 화살표 - Main Server 이벤트 발행, 빨간색 화살표 - AI Server 이벤트 발행)
(각 서버는 발행과 이벤트 감지를 동시에 한다는 것을 나타냄)
기존 구조에서는 AI 서버가 데이터를 저장하기 위해 메인 서버를 직접 호출해야 했습니다.
이 과정에서 AI 서버가 메인 서버를 직접 의존하게 되었고, 결과적으로 역방향 의존성과 순환 호출 구조가 발생했습니다.
하지만 현재 구조는 AI 서버가 메인 서버를 직접 호출하지 않습니다.
저장해야 하는 데이터를 Redis Message Queue에 이벤트 형태로 발행하면, 메인 서버는 이를 가져와 필요한 저장 작업을 수행합니다.
해당 구조도 AI 서버가 메인 서버에 의존하는 구조라고 생각하실 수 있지만, 위에서 설명했듯 이는 직접적인 의존 구조가 아닙니다.
AI 서버는 단순히 이벤트를 발행할 뿐이고, 메인 서버는 해당 이벤트를 소비할 뿐입니다.
서로는 서로의 서버를 직접 알 필요가 없고, 서로한테 직접 의존하지 않습니다.
이러한 구조는 도메인 소유권과 DB 권한을 분리하면서도 역방향 의존성 문제와 순환 호출 구조 문제를 해결할 수 있습니다.
물론 Redis를 도입한다고 끝나는 것은 아니었습니다.
Redis를 어떻게 사용해야 하는지에 따라 구조가 크게 달라질 수 있었습니다.
💡 왜 Streams를 선택했는가
Pub/Sub 구조
Redis를 사용한 이벤트 발행 방법은 크게 두 가지가 있었습니다.
그 중 첫 번째는 Pub/Sub 구조였습니다.
Pub/Sub 구조는 이벤트를 발행하는 Publisher와 이벤트를 구독하는 Subscriber로 이루어져 있습니다.
해당 구조는 실시간으로 이벤트를 전달한다는 특징과 메시지를 저장하지 않는다는 특징을 가지고 있습니다.

이벤트가 발행되면 Redis는 해당 이벤트를 구독하고 있는 Subscriber에게 Push 방식으로 즉시 전달합니다.
화살표 방향을 보면 호출하는 서버는 Redis를 바라보지만, 호출받는 서버는 Redis가 바라보는 것을 알 수 있습니다.
따라서 Subscriber는 새로운 이벤트가 있는지 직접 확인할 필요가 없으며, 빠른 처리가 가능하다는 장점이 있습니다.
이외에도 Redis 메모리 사용량, 이벤트 저장 공간 비용 등에서 이점이 있습니다.
하지만 Pub/Sub 구조는 메시지를 저장하지 않는다는 단점이 있습니다.
즉, 이벤트가 발행되는 순간 구독 중인 Subscriber가 일시적인 장애로 인해 이벤트를 받을 수 없는 상황이었다면, 해당 이벤트는 Redis에 보관되지 않고 그대로 삭제됩니다.
따라서 서버가 복구되더라도 이전에 발행된 이벤트를 다시 처리할 수 없으며, 반드시 처리되어야 하는 작업에는 적합하지 않은 구조라고 판단했습니다.
Streams 구조

저는 최종적으로 Streams 구조를 선택했습니다.
가장 큰 이유는 Streams 구조는 Log 형식으로 이벤트를 저장하며, 이는 혹시나 서버에 문제가 생겨 이벤트를 처리하지 못하더라도 복구 이후 다시 처리할 수 있다는 것을 의미합니다.
또 다른 차이점은 이벤트를 전달받는 방식입니다.
Pub/Sub 구조는 Redis가 서버에 직접 알려줬다면, Streams는 Redis로부터 이벤트를 읽어오는 구조입니다.
일반적으로는 이벤트가 없을 때는 대기 상태를 유지하다가, 새로운 이벤트가 들어오면 해당 이벤트를 읽어와 처리합니다.
단점으로는 이벤트를 저장하고, Streams 구조를 위한 Consumer Group, ACK 등을 관리해야 하기 때문에 구조가 복잡하고 Redis 메모리 사용량도 증가할 수 있습니다.
하지만 SYNAPSE에서는 노트가 생성되었다면 해당 노트에 대한 분석은 반드시 수행되어야 했습니다.
즉, 실시간성의 빠른 처리보다는 이벤트를 유실하지 않는 것이 더 중요했습니다.
따라서 약간의 구현 복잡도와 추가적인 리소스 사용을 감수하더라도, 신뢰성을 보장할 수 있는 Streams 구조가 더 적합하다고 판단하였습니다.
⚙️ Streams 구현하기
이벤트 발행
SYNAPSE의 경우 [메인 서버 발행, AI 서버 소비] 와 [AI 서버 발행, 메인 서버 소비] 두 가지 경우가 모두 존재하지만, 이번 글의 핵심은 AI 서버에서 어떻게 메인 서버를 호출할지이기 때문에 AI서버 발행 기준으로 정리해보겠습니다.
Streams에서는 이벤트 발행이 소비보다 훨씬 구현하기 쉬웠습니다. 이벤트 발행의 경우 설정의 차이는 있지만, 로직 자체는 Pub과 크게 다르지 않았습니다. 그저 '이벤트를 발행한다'가 전부였습니다.
그렇기 때문에 이벤트 발행을 위해서 해준 것은 '발행 함수 호출'과 '발행 함수 구현' 두 가지 였습니다.
async def note_analyze(self, data: NoteAnalyzeRequest):
embedding = await self.note_service.create_note_embedding(data.content)
await self.redis_service.addSaveEmbeddingTask(data,embedding)
함수 호출의 경우 Orchestrator 작업에 넣어주었으며, embedding 데이터가 생성된 이후에 저장을 위해 호출해주었습니다.
저는 Redis 또한 infrastructure에 있는 하나의 Service로 생각하고 설계하였기 때문에 Orchestrator 안에 넣어주었습니다.
class RedisService:
async def addSaveEmbeddingTask(self, data: NoteAnalyzeRequest, embedding):
try:
note_embedding = NoteAnalyzeResponse(
note_id=data.note_id,
user_id=data.user_id,
embedding=embedding,
metadata=NoteMetadata(
title=data.title,
tags=[data.category_title],
summary_preview="fdsafdsa"
)
)
await client.xadd(
name='save-embedding-stream',
fields={'data':json.dumps(note_embedding.model_dump())},
maxlen=1000,
approximate=True
)
except Exception as e:
print(f"이벤트 발행 실패: {e}")
이벤트 발행 함수의 경우 위처럼 구현을 해주었습니다.
발행의 경우 언급했듯이 그렇게 어려운 구현이 아니었습니다.
발행에 필요한 형태로 데이터를 가공하고, xadd를 통해 Streams 구조로 발행하였습니다.
xadd 설정 값에 대해서 간단히 정리해보겠습니다.
- name
- stream의 이름을 설정하며, 이 이름을 기반으로 이벤트를 소비하게 됩니다.
- fields
- message로 전달할 데이터를 입력합니다.
- 이 때 message의 고유 id가 자동으로 입력됩니다.
- maxlen
- message log가 끊임없이 쌓이는 것을 방지하기 위해 log의 최대 길이를 설정합니다.
- approximate
- maxlen을 기준으로 log를 딱 자를건지 아니면 효율적인 시점을 계산해 한번에 자를건지를 결정합니다.
- True일 경우 maxlen을 넘더라도 바로 삭제하지 않으며, 효율적으로 삭제합니다.
- False일 경우 maxlen을 무조건 지키며, maxlen을 넘는다면 바로 삭제합니다. (성능이 조금 떨어질 수 있음)
이벤트 소비
이벤트 소비의 경우 발행보다는 구현과 로직이 조금 복잡했습니다.
Pub/Sub처럼 단순히 이벤트를 받는 구조가 아니었고, Consumer Group, ACK과 같은 부가적인 로직도 구현해야 했습니다.
또 가장 오래걸렸던 부분은 기존 서버에 영향이 가지 않도록 비동기로 Redis에서 이벤트를 읽도록 하는 부분이었습니다.

코드를 작성하기 전에 우선 Stream 구조를 정리해보도록 하겠습니다.
하나의 Stream은 위와 같은 구조를 가지고 있습니다.
하나의 Stream에는 여러 개의 Consumer Group이 있으며, 각 Consumer Group은 또 여러 개의 Consumer로 이루어질 수 있습니다.
위 그림처럼 하나의 이벤트에 대해서 실행해야 하는 기능이 여러 개라면, Consumer Group을 분리하여 각각 독립적으로 처리할 수 있습니다.
예를 들어 결제 완료 이벤트에 대해서는 포인트 추가, 주문 저장, 결제 메일 발송 등의 작업을 서로 다른 Consumer Group에서 처리하도록 구성하여 비동기적으로 처리할 수 있습니다.
물론 하나의 Stream이 하나의 이벤트를 의미하는 것은 아닙니다.
하나의 Stream에는 여러 종류의 이벤트 Log가 쌓일 수 있고, 이를 event type으로 구분하여 각 Consumer Group이 필요한 이벤트만 처리하도록 구현할 수 있습니다. 하지만 이 방법의 경우 Consumer Group마다 이벤트 분기를 통해 처리한 이벤트를 지정해주어야 합니다.
반대로 이벤트 종류별로 Stream 자체를 분리하는 방법도 존재합니다. 예를 들어 Add Point Stream, Save Order Stream, Send Email Stream처럼 Stream을 나누면 Consumer Group은 별도의 이벤트 분기 없이 해당 Stream만 소비하면 됩니다.
결국 Redis Streams 설계도 다양한 방법으로 할 수 있고, 서비스 요구사항에 맞는 Stream과 Consumer Group설계를 할 수 있습니다.
export class RedisWorker implements OnModuleInit {
private readonly STREAM_NAME = 'save-embedding-stream';
private readonly GROUP_NAME = 'nestjs-embedding-group';
private readonly CONSUMER_NAME = 'worker-1';
constructor(@Inject('REDIS_WORKER') private readonly redis: Redis) {}
onModuleInit() {
void this.ensureGroupExists().then(() => {
void this.startConsumer();
});
}
private async ensureGroupExists() {
try {
await this.redis.xgroup(
'CREATE',
this.STREAM_NAME,
this.GROUP_NAME,
'0',
'MKSTREAM',
);
} catch (e) {
console.error(e);
}
}
위 코드는 Consumer Group을 설정하는 코드입니다.
저의 경우 위에서 말한 방법 중 이벤트에 따라 Stream 자체를 나누는 방법을 선택하였습니다.
우선 별도의 이벤트 분기 없이 구현할 수 있다는 점이 좋았고, 하나의 Stream이 하나의 Consumer Group과 연결되는 것이 직관적이라고 생각했습니다.
코드를 보자면, onModuleInit을 통해 Nest 서버 실행 시 그룹 생성과 Redis Consumer를 실행시키고 있습니다.
즉, 서버가 실행되면 자동으로 Redis Consumer 작동이 시작되며, 비동기 작업으로 실행되기 때문에, 메인 서버의 API 처리에는 영향을 주지 않습니다.
xgroup에 대해서 간단히 정리해보겠습니다.
- 'CREATE': Consumer Group을 생성합니다.
- STREAM_NAME: 이벤트를 읽어올 Stream을 설정합니다.
- GROUP_NAME: 이벤트를 처리할 Consumer Group 이름을 설정합니다.
- '0': Consumer Group의 시작 위치를 Stream의 처음으로 설정합니다.
- 'MKSTREAM': Stream이 없으면 자동으로 생성합니다.
private async startConsumer() {
while (true) {
try {
const results = (await this.redis.xreadgroup(
'GROUP',
this.GROUP_NAME,
this.CONSUMER_NAME,
'BLOCK',
'0',
'STREAMS',
this.STREAM_NAME,
'>',
)) as [string, [string, string[]]] | null;
if (results) {
for (const [, messages] of results) {
for (const [messageId, data] of messages) {
console.log('데이터 처리:', JSON.parse(data[1]));
// note-embedding.service.ts 만들어서 임베딩 노트 저장 호출 구현 해야함
await this.redis.xack(
this.STREAM_NAME,
this.GROUP_NAME,
messageId,
);
}
}
}
} catch (e) {
console.error('Redis 에러 발생', e);
await new Promise((resolve) => setTimeout(resolve, 5000));
}
}
}
다음은 실제로 Consumer Group이 실행되는 함수입니다.
기본적으로 while true를 통해 이벤트를 항상 읽어올 수 있는 상태로 대기합니다.
while true 구조이지만 CPU를 계속 사용하며 반복하는 방식은 아닙니다.
xreadgroup(BLOCK 0)을 호출한 뒤 Redis의 응답을 기다리는 '대기 상태'가 되며, 이벤트가 도착하면 이어서 실행됩니다.
기다리는 동안에는 다른 요청을 처리하므로 메인 서버 성능에 영향을 주지 않습니다.
xreadgroup에 대해서 간단히 정리해보겠습니다.
- 'GROUP': Consumer Group을 사용하여 이벤트를 처리하겠다는 의미입니다.
- GROUP_NAME: 처리할 Consumer Group을 설정합니다.
- CONSUMER_NAME: Consumer Group 내에서 사용할 Consumer 이름을 의미합니다.
- 'BLOCK': 새로운 이벤트가 들어올 때까지 대기하도록 설정합니다.
- '0': 이벤트가 들어올 때까지 무한정 대기합니다.
- 'STREAMS': 읽어올 Stream을 지정합니다.
- 'STREAM_NAME': 이벤트를 읽어올 Stream 이름을 입력합니다.
- '>': 아직 처리되지 않은 새로운 이벤트만 읽도록 설정합니다.
이러한 설정을 하면, 새로운 이벤트가 들어올 때까지 무한정 대기를 하게 되며, 이벤트가 들어올 경우 처리 로직이 실행됩니다.
또한 Stream의 경우 이벤트가 Log 형태로 계속해서 저장되기 때문에, 해당 이벤트가 성공적으로 처리됐다는 사실을 Redis에 알려주어야 하며, 이를 xack을 통해 구현할 수 있습니다.
xack에 대해서 간단히 정리해보겠습니다.
- STREAM_NAME: 이벤트를 발행한 Stream을 설정합니다.
- GROUP_NAME: 이벤트를 처리한 Consumer Group을 설정합니다.
- messageId: 처리한 이벤트의 id를 설정합니다.
위와 같은 설정을 통해 특정 이벤트가 성공적으로 처리되었는지 알려줄 수 있으며, messageId의 경우 이벤트가 생성될 때 자동으로 입력됩니다.
지금까지 Redis Streams를 이용해 이벤트를 발행하고, Consumer Group이 이를 비동기로 소비하는 구조를 구현했습니다.
이를 통해 AI 서버는 메인 서버를 직접 호출하지 않고도 필요한 저장 작업을 요청할 수 있게 되었고, 메인 서버는 Redis에 저장된 이벤트를 순차적으로 처리할 수 있게 되었습니다.
결과적으로 저번 글에서 고민했던 역방향 의존성과 순환 호출 구조를 해결하면서도, 서버 간 결합도를 낮춘 이벤트 기반 아키텍처를 구현할 수 있었습니다.
🔧 구현하면서 만난 문제들
async에서 sync call 문제
우선 기본적으로 이벤트 처리는 비동기로 이루어집니다.
이벤트 처리라고 해서 무조건 비동기 처리인 것은 아니지만, SYNAPSE에서는 비동기를 통해 빠르게 사용자에게 응답하는 것이 중요했기 때문에 비동기로 이벤트를 처리하였습니다.
여기서 발생한 문제가 async에서 sync call로 인한 blocking 문제 였습니다.
async에서 sync call을 하면 문제가 발생할 수 있다는건 알고 있었습니다.
문제는 sync call이 어디서 일어나는지를 전혀 몰랐고, 그렇기 때문에 sync call 문제라는 것 자체도 나중에 깨달았습니다.
class NoteAnalyzeOrchestrator:
def __init__(self, note_service: NoteService, link_service: LinkService, redis_service: RedisService):
self.note_service = note_service
self.link_service = link_service
self.redis_service = redis_service
async def note_analyze(self, data: NoteAnalyzeRequest):
embedding = await self.note_service.create_note_embedding(data.content)
초기에 문제를 발견한 곳은 Orchestrator의 create_note_embedding 부분이었습니다.
Streams 구현이 끝나고 Orchestrator까지 적용하여 테스트를 해보니 정확히 저 부분에서 실행이 되지 않았고, 오류도 발생하지 않았습니다.
class NoteService:
def __init__(self, note_embedding_repository:NoteEmbeddingRepository):
self.note_embedding_repository = note_embedding_repository
async def create_note_embedding(self, content: str):
return await create_embedding(content)
그래서 저는 print를 하나씩 찍어가며 정확히 어느 순간에 멈추는지를 파악했고, 최종적으로 AI를 호출해 노트를 embedding하는 함수에서 멈춘다는 것을 알 수 있었습니다.
from infrastructure.ai.models import get_openai_client, get_embedding_model
async def create_embedding(text):
client = get_openai_client()
embedding_model = get_embedding_model()
response = await client.embeddings.create(
input=text,
model=embedding_model,
timeout=10
)
return response.data[0].embedding
문제는 그 함수의 코드였습니다.
이미 async를 사용한 비동기 함수로 구현되어 있었고, 해당 함수 호출 또한 await를 사용하여 호출하고 있었습니다.
즉, 비동기적으로 특별한 문제가 보이지 않았습니다.
GPT한테 물어봤지만 'API 호출에서 값을 받아오지 못하면서 오류가 발생하는 것일 수 있다'는 모호한 답변만 계속 해주었습니다.
from openai import AsyncOpenAI
def get_openai_client():
return AsyncOpenAI(api_key="sk-...")
그렇게 사투한 끝에 오류를 고칠 수 있었습니다.
OpenAI API 자체에 Async를 제공하는 AsyncOpenAI가 따로 있다는 것을 알았습니다...
그렇게 최종적으로 get_openai_client 함수를 수정했고, async에서의 sync call 문제를 해결하며 정상적으로 Orchestrator가 작동할 수 있었습니다.
Redis Producer와 Consumer 분리 문제
이벤트 소비(Consumer)를 구현한 이후 갑자기 이벤트 발행(Producer)이 정상적으로 동작하지 않는 문제를 만나게 되었습니다.
메인 서버에서는 이벤트 소비 뿐만 아니라 발행도 같이 수행하고 있었는데, Consumer를 구현한 이후부터 Producer에서 문제가 생기는 것을 확인하였습니다.
해당 이유에 대해서 분석해보니, 이벤트 소비를 구현한 부분에서 BLOCK 0으로 설정되어 있던 것이 문제였습니다.
물론 BLOCK 0 자체가 문제는 아니었습니다. BLOCK 0은 새로운 이벤트가 들어올 때까지 연결을 계 유지하며 무한정 대기하는 정상적인 동작이었습니다.

진짜 문제는 Producer와 Consumer가 동일한 Redis Connection을 공유하고 있었다는 점입니다.
Consumer가 BLOCK 0으로 연결을 계속 점유하고 있으니, Connection은 Blocking 상태가 되고, Producer는 동일한 Connection을 사용하여 이벤트 발행을 할 수가 없었던 것입니다.
@Global()
@Module({
providers: [
{
provide: 'REDIS_CLIENT',
useValue: new Redis({
host: process.env.REDIS_HOST || 'localhost',
port: 6379,
keepAlive: 10000,
}),
},
{
provide: 'REDIS_WORKER',
useValue: new Redis({
host: process.env.REDIS_HOST || 'localhost',
port: 6379,
keepAlive: 10000,
}),
},
RedisService,
RedisWorker,
],
exports: ['REDIS_CLIENT', 'REDIS_WORKER', RedisService],
})
export class RedisModule {}
이를 해결하기 위한 방법은 간단했습니다.
Producer와 Consumer가 각각 독립적인 Redis Connection을 사용하도록 분리하는 것이었습니다.
따라서 저는 위 코드와 같이 REDIS_CLIENT(Producer 전용)와 REDIS_WORKER(Consumer 전용) 총 두 개의 Redis Connection을 만들어 각각 배정해주었고, 최종적으로 문제를 해결할 수 있었습니다.
그런데 저는 여기서 궁금한 것이 하나 생겼습니다.
Python의 FastAPI에서도 동일하게 이벤트 발행과 소비를 함께 수행하고 있었는데 왜 문제 없이 잘 동작하는건지 궁금해졌습니다.
이러한 부분에 대해서 찾아보니 이는 라이브러리의 동작 방식 차이 때문이라는 것을 알게 되었습니다.

NestJS에서 사용하고 있던 ioredis 라이브러리는 Connection의 Blocking 문제에 대해서 자동으로 우회하거나 별도의 Connection을 제공하는 동작을 제공하지 않았습니다.
반면 FastAPI에서 사용하고 있던 redis-py 라이브러리는 Blocking 명령을 수행하는 과정에서 별도의 Connection을 사용할 수 있는 구조를 제공하기 때문에 동일한 상황에서도 문제가 발생하지 않았습니다.
즉, FastAPI에서 문제가 없었던 것이 아니라, 라이브러리가 내부적으로 해당 문제에 대해서 우회해주며, 정상적으로 동작했던 것이었습니다.
의존성 주입(DI) 문제
해당 문제는 그렇게 복잡한 문제는 아니었습니다.
API 호출 구조에서 Redis 기반의 비동기 호출 구조로 바뀌며 의존성 주입을 어떻게 해야할지 고민했던 문제입니다.
def get_note_embedding_repository():
return NoteEmbeddingRepository()
def get_note_service(
repository: NoteEmbeddingRepository = Depends(get_note_embedding_repository)
):
return NoteService(repository)
def get_link_repository():
return LinkRepository()
def get_link_service(
repository: LinkRepository = Depends(get_link_repository)
):
return LinkService(repository)
def get_note_analyze_orchestrator(
note_service: NoteService = Depends(get_note_service),
link_service: LinkService = Depends(get_link_service)
):
return NoteAnalyzeOrchestrator(note_service, link_service)
@router.post(
"/analyze",
response_model=NoteAnalyzeResponse,
status_code=status.HTTP_201_CREATED
)
async def analyze_note(
request: NoteAnalyzeRequest,
orchestrator: NoteAnalyzeOrchestrator = Depends(get_note_analyze_orchestrator)
):
return await orchestrator.note_analyze(request)
API 호출 구조일 때는 FastAPI router에서 Depends를 사용해 의존성 주입을 할 수 있었습니다.
하지만 Redis Streams를 도입하면서 구조가 달라졌습니다.
AI 서버는 HTTP 요청을 받는 것이 아니라, Redis Stream에서 이벤트를 읽어 직접 처리하는 구조가 되었습니다.
즉, 코드의 시작점이 @router가 아니라 worker가 된 것입니다.
Depends를 그대로 사용하여 의존성 주입을 하고 싶었지만, FastAPI의 Depends 기능은 FastAPI의 요청/응답 생명주기 안에서만 사용할 수 있는 기능이었습니다.
def get_note_analyze_orchestrator(self):
note_repo = NoteEmbeddingRepository()
note_service = NoteService(note_repo)
link_repo = LinkRepository()
link_service = LinkService(link_repo)
redis_service = RedisService()
return NoteAnalyzeOrchestrator(note_service, link_service, redis_service)
그래서 저는 worker에서 단순하게 필요한 객체를 직접 생성하여 주입하는 방식을 선택하였습니다.
위와 같은 방식은 의존성을 직접 생성해야 한다는 단점은 있지만, Worker는 애플리케이션 실행 시 한 번만 생성되므로 성능상 부담은 거의 없었습니다.
최종적으로 정리하면, HTTP 요청을 처리하는 API 계층은 FastAPI의 Depends를 사용하고, Redis Worker는 생성자를 통해 직접 의존성을 주입하는 방식으로 역할을 분리하였습니다.
😊 마무리
✅ AI 서버에서 메인 서버를 호출하던 문제 해결
✅ Redis Streams 구조 도입
✅ 비동기 처리 도입을 위한 다양한 구조 수정 및 문제 해결
➡️ GraphRAG 전체 파이프라인 구현
[진행 상황 정리]
이번 글에서는 기존 문제를 해결하고 비동기 처리를 적용하기 위해 Redis Streams를 도입했던 과정에 대해서 정리해보았습니다.
단순히 Redis Streams를 적용했다는 관점보다는 왜 적용했고, 그 중에서도 왜 Streams를 선택했는지, 어떤 고민을 했었는지 등 제가 실제로 개발을 하면서 했던 고민들을 정리한다는 관점에서 적어보았습니다.
다음 글에서는 GraphRAG 전체 파이프라인을 구현하고 이에 대해서 정리해보는 글을 적어보려 합니다.
지금까지 읽어주셔서 감사드리며, 다음에 더 좋은 글로 돌아오도록 하겠습니다.
by. 대학생 개발자 주이어
KYT CODING COMMUNITY Discord 서버에 가입하세요!
Discord에서 KYT CODING COMMUNITY 커뮤니티를 확인하세요. 25명과 어울리며 무료 음성 및 텍스트 채팅을 즐기세요.
discord.com
KYT CODING COMMUNITY 가입하기!
'[Main Projects] > [SYNAPSE]' 카테고리의 다른 글
| [SYNAPSE 개발일지 #1] GraphRAG를 위한 AI 서버 아키텍처 설계 - Orchestrator 도입부터 DB 접근 권한과 도메인 소유권까지 (3) | 2026.07.02 |
|---|---|
| [SYNAPSE] 여러분은 무슨 목적으로 기록하시나요? 어쩌면 그 기록은 이미 죽었을지도 모릅니다. (2) | 2026.06.24 |