LLM 구조화된 출력을 실시간 스트리밍하기 (feat. LangChain, SSE)

LLM 구조화된 출력을 실시간 스트리밍하기 (feat. LangChain, SSE)

LLM 기반의 서비스를 개발할 때 가장 중요한 것은 모델을 서비스의 목적에 맞게 적용하는 것입니다. 일관된 형식의 응답 데이터가 필요할 수도 있고, 사용자에게 응답을 실시간으로 보여주어야 할 수도 있죠.

이러한 기능은 대부분의 LLM 모델이 제공하는 구조화된 출력과 응답 스트리밍을 활용해 간단히 적용 가능합니다. 하지만 둘 다 필요한 경우는 어떨까요?

이번 아티클에서는 Riido AI에서 구조화된 출력과 실시간 스트리밍을 결합하여 사용자 경험을 개선한 과정을 공유하고자 합니다.

Riido AI - Before vs. After

배경

Riido AI는 프로젝트의 제목과 개요, 그리고 우리 팀원들의 정보를 입력하면 프로젝트 계획을 자동으로 생성해 주는 기능이에요. 결과물은 프로젝트 > 목표 > 작업 으로 이루어진 Riido의 계층 구조에 맞게 생성되며 각 작업의 마감 기한, 담당자 등 추가 정보도 생성해 줍니다.

Riido AI - 프로젝트 생성

기존 Riido AI의 가장 큰 문제점은 LLM 응답 생성 시간이 오래 걸리는 것이었어요. 프로젝트 하나의 계획을 생성하는데 평균 30초, 길게는 1분까지 소요되었습니다. 이렇게 응답 생성이 오래 걸리는 이유는, 응답 결과 즉 생성해야 하는 토큰 수가 많기 때문입니다.

하지만 아무리 결과물의 퀄리티가 높아도 사용자 입장에서는 10초도 기다리기 힘든 법이에요. 데이터 분석 결과, 로딩 창에서 기다리지 못하고 이탈하는 사용자가 많음을 확인하였습니다. Riido AI 사용에 걸리는 시간이 부담 요소로 작용하여 사용 자체를 기피하는 경우도 있었습니다.

문제 정의

1️⃣ 사용자는 LLM 응답이 완성되기까지 기다리기 힘듦

LLM 응답 생성 속도가 느림으로 인해 사용자 경험이 저하된다는 문제는 인지하였지만, 이에 대한 해결법으로 응답 생성 시간을 단축하는 것을 시도하였을 때는 잘 개선되지 않았어요. 응답 속도에 가장 큰 병목은 생성할 토큰 수인데요, 그렇기에 응답의 길이 즉 퀄리티를 희생하지 않고는 속도 개선이 어려웠습니다.

그래서 저희는 방향을 바꾸어 실시간 스트리밍을 통해 결과물을 바로 확인할 수 있게 하였고, 이를 통해 같은 대기 시간이어도 짧게 느껴지게끔 하여 사용자 경험이 개선되는 것을 목표로 하였습니다.

2️⃣ 구조화된 출력을 사용하는 LLM 응답을 스트리밍할 경우, 스트리밍 도중의 데이터를 해석할 수 없음

기존의 Riido AI 모델은 구조화된 출력을 사용하고 있었습니다. JSON 형태로 응답을 받는다는 뜻인데요, 이는 해당 데이터를 작업 리스트와 동일한 UI로 보여줄 수 있고 실제 작업과 동일한 형식으로 DB에 저장할 수 있게 하기 위해서입니다.

하지만 1️⃣에서 목표로 한 실시간 스트리밍을 구조화된 출력과 함께 적용하면 생기는 문제점이 있어요. 그것은 바로 구조화된 응답 데이터가 스트리밍 도중에는 구조화되어 있지 않다는 것입니다.

예시로, 아래 JSON이 LLM의 최종 응답이라고 가정할게요.

{
  "project": "랜딩 페이지 제작",
  "deadline": "2025-12-31",
  "milestones": ["디자인 확정", "프론트엔드 개발 완료"]
}

이 응답이 생성되는 도중, 즉 스트리밍 도중에는 어떤 모습일까요? 바로 아래와 같이 미완성된 JSON 형태일 것입니다. 이대로는 구조화된 데이터로 사용할 수 없고, 클라이언트에서 해당 데이터를 UI에 적용하여 보여줄 수도 없을 겁니다.

// '이' 까지만 생성된 상태
{
  "project": "랜딩 페이

그래서 저희는 1️⃣ LLM 응답 스트리밍을 구현하되, 2️⃣ 구조화된 출력이 스트리밍 도중에도 그 구조를 유지한 채로 받아올 수 있도록 하는 것을 개발 목표로 잡았습니다.

3️⃣ 기능 요구사항 정의

Riido AI의 구조화된 출력 실시간 스트리밍 기능에 대한 요구사항은 아래와 같이 설정하였습니다.

  1. LLM의 최종 응답은 사전의 정의한 구조의 JSON을 정확히 따라야 한다.
  2. 응답 스트리밍 도중에는 하나의 Riido 작업, 즉 작업 리스트에서 하나의 행이 완성될 때마다 이를 화면에 업데이트하여 보여준다.

개발 과정

구현 과정을 설명하기에 앞서 사용한 기술 스택을 소개할게요.

  • 라이브러리 - LangChain.js zod
  • LLM 모델 - gpt-4o-mini
  • 서버 - Nest.js , 클라이언트 - Next.js

1️⃣ 구조화된 출력 (Structured Output) 적용

LangChain 기반의 모델에서 구조화된 출력을 받는 방법은 여러 가지가 있지만, 그중 개발 과정에서 고려한 방식은 크게 두 가지였습니다.

  1. GPT 모델에 제공되는 JSON 모드를 사용하고, 프롬프트 내에 응답 예시를 포함하여 JSON 구조를 제시하는 방법
  2. 코드로 JSON Schema를 정의하여 모델과 직접 연동하는 Structured Output 기능

최종적으로는 Schema를 정의할 수 있고, 응답이 해당 Schema를 따를 것을 보장해 주는 2. 방식을 채택하였습니다.

그다음에는 필요한 JSON 구조의 설계를 진행하였는데요, 여기서 고려한 점은 (1) 각 작업에 대해 생성해야 하는 각종 추가 정보, 그리고 (2) 스트리밍 시 한 번에 업데이트되는 데이터의 단위는 하나의 작업이라는 점입니다. 완성된 JSON 구조는 zod 라이브러리를 활용하여 코드로 Schema를 정의하였으며, 아래와 같습니다.

import { z } from 'zod';

export const AiComponentSchema = z.object({
  componentType: z
    .enum(['project', 'milestone', 'task'])
    .describe(
      `프로젝트 > 목표 > 작업 으로 이루어진 업무의 계층 구조 단위입니다.`,
    ),
  title: z.string().describe(`작업의 제목입니다.`),
  endDate: z
    .string()
    .describe(
      `작업이 완료되어야 하는 마감 기한입니다. 'YYYY-MM-DD' 형식의 문자열입니다.`,
    ),
  members: z
    .array(z.string())
    .describe(`작업의 담당자입니다. 각 담당자의 이름의 배열입니다.`),
});
export type AiComponentType = z.infer<typeof AiComponentSchema>;

export const AiProjectResponseSchema = z.object({
  components: z.array(AiComponentSchema),
});

Schema를 정의한 후에는 LangChain 모델에 연결하여 구조화된 출력을 하도록 withStructuredOutput 메서드를 사용하였습니다. 아래 예시 코드는 시스템 프롬프트, User 프롬프트, 그리고 ZOD Schema를 각각 입력받아 하나의 응답을 생성하는 모델 클래스이며, createChatCompletion 메서드로 응답을 생성합니다.

import { HumanMessage, SystemMessage } from '@langchain/core/messages';
import { ChatPromptTemplate } from '@langchain/core/prompts';
import { ChatOpenAI } from '@langchain/openai';
import { ZodTypeAny } from 'zod';

export class StructuredOutputModel {
  private readonly chatModel: ChatOpenAI<any>;

  constructor(apiKey: string) {
    this.chatModel = new ChatOpenAI({
      apiKey: apiKey,
      model: 'gpt-4o-mini',
      temperature: 0.7,
    });
  }

  // System prompt, user prompt, 응답의 Schema 를 입력으로 받아 응답 1개 생성
  async getChain(systemPrompt: string, userPrompt: string, schema: ZodTypeAny) {
    const chatPromptTemplate = ChatPromptTemplate.fromMessages([
      new SystemMessage(systemPrompt),
      new HumanMessage(userPrompt),
    ]);

    const structuredOutputModel = this.chatModel.withStructuredOutput(schema);

    const chain = chatPromptTemplate.pipe(structuredOutputModel);

    return chain;
  }

  async createChatCompletion(
    systemPrompt: string,
    userPrompt: string,
    schema: ZodTypeAny,
  ) {
    const chain = await this.getChain(systemPrompt, userPrompt, schema);

    return await chain.invoke(userPrompt);
  }
}

const model = new StructuredOutputModel(API_KEY);
const result = model.createChatCompletion(
  systemPrompt,
  userPrompt,
  AiProjectResponseSchema,
);

2️⃣ LangChain 모델 응답을 Event Stream 으로 받아오기

LangChain 모델 응답을 스트림으로 받아오기 위해 streamEvents 메서드를 사용했습니다. 데이터 형식은 text/event-stream으로 설정하였으며, LangChain의 streamEvents 메서드를 통해 스트림 데이터를 받아올 수 있게 하였습니다.

const model = new StructuredOutputModel(API_KEY);
const chain = await model.getChain(
  systemPrompt,
  userPrompt,
  AiProjectResponseSchema,
);

// streamEvents 사용
const stream = await chain.streamEvents(userPrompt, {
  version: 'v2',
  encoding: 'text/event-stream',
});
const decoder = new TextDecoder();

for await (const chunk of stream) {
  // TextDecoder 를 사용하여 string 형식으로 변환
  const decodedStream = decoder.decode(chunk);

  console.log(decodedStream);
}

3️⃣ 스트리밍 데이터 후처리를 통한 미완성 응답의 구조화

앞서 언급하였듯이, 응답 데이터를 스트리밍으로 받아오는 도중에는 미완성된 JSON 문자열의 형태입니다.

이 문제를 해결하기 위해 각 스트림 이벤트 데이터를 후처리하여 유효한 JSON 구조의 데이터로 변환하는 파싱 과정을 추가했습니다.

{
	"components": [
		{
			"componentType": "project",
			"title": "프로젝트",
			"endDate": "2024-12-31",
			"members": ["김철수"]
		}
	]
}

다시 앞서 정의한 JSON의 구조를 살펴볼게요. 요구사항에 의하면, 우리가 클라이언트에게 스트림 데이터를 보내줘야 하는 시점은 components key에 해당하는 배열의 요소가 하나씩 완성될 때입니다.

그렇다면 현재 출력 중인 응답이 components 배열의 정확히 N 개의 요소가 완성된 시점이라면 아래와 같은 형태일 거에요. 여기까지 생성이 되었다면 이걸 유효한 JSON으로 바꾸려면 2가지만 변경이 되면 됩니다.

  1. 마지막에 오는 , 를 제거
  2. 접미사로 ]} 를 추가하여 닫히지 않은 괄호 닫기
{ 
	"components": [
		{
			"componentType": "project",
			"title": "프로젝트",
			"endDate": "2024-12-31",
			"members": ["김철수"]
		},

위와 같이, 문자열을 약간만 수정하면 미완성된 JSON을 유효한 JSON으로 만들 수 있습니다. 이를 위해 정규식 패턴 매칭을 활용하여 마지막 , 까지의 부분을 잘라내고, }] 등 괄호를 닫아주는 접미사를 추가한 후 JSON 파싱을 시도하여 유효성 검증하는 함수를 작성하였습니다.

const safeParseJSON = (input: string) => {
  try {
    return JSON.parse(input);
  } catch (e) {
    return null;
  }
};

const parseStreamResponse = (response: string) => {
  try {
    const suffixes = [`]}`, `}`, ``];

    // 정규식을 통해 마지막 , 까지 제거
    const strippedResponse = response.replace(/,[^,]*$/g, '');

    // 가능한 모든 접미사 조합에 대해 JSON.parse 시도
    for (const suffix of suffixes) {
      const dataStripped = safeParseJSON(`${strippedResponse}${suffix}`);
      const data = safeParseJSON(`${response}${suffix}`);

      if (dataStripped && dataStripped.components) return dataStripped;
      if (data && data.components) return data;
    }

    return null;
  } catch (e) {
    return null;
  }
};

스트리밍 도중 LLM이 토큰을 생성할 때마다 파싱을 시도하고, 성공적으로 파싱 된 데이터만 클라이언트로 전송하도록 처리했습니다.

4️⃣ SSE(Server-Sent Events)를 통해 클라이언트에게 스트림 데이터 전송

후처리한 스트림 데이터를 클라이언트로 전송하는 데에는 SSE를 사용했습니다.

SSE는 서버에서 클라이언트로 실시간 이벤트를 스트리밍 하는 기술입니다. 하나의 HTTP 연결을 유지한 상태로 데이터 전송이 된다는 점에서 Polling, WebSocket에 비해 효율적이고, 클라이언트로 단방향 데이터 스트리밍을 하는데 이상적인 방식이에요.

데이터 형식은 text/event-stream으로 설정하였고, 각 스트림 이벤트의 데이터 구조는 LangChain에서 제공하는 것과 동일하되 on_chain_stream 이벤트 타입으로 지정하여 전송하였습니다.

// 서버 : text/event-stream 형태로 SSE 전송

res.setHeader('Connection', 'keep-alive');
res.setHeader('Content-Encoding', 'none');
res.setHeader('Cache-Control', 'no-cache, no-transform');
res.setHeader('Content-Type', 'text/event-stream; charset=utf-8');
res.flushHeaders();

let streamedResponse: string = '';
let componentsCount = 0;

for await (const chunk of stream) {
  const decodedStream = decoder.decode(chunk);
  const splitDecodedStream = decodedStream.split('\\n');
  const data = JSON.parse(splitDecodedStream[1].replace('data: ', ''));

  if (data?.event === 'on_chat_model_stream') {
    // 현재까지 스트림된 토큰 합치기
    streamedResponse = `${streamedResponse}${data.data.chunk.kwargs.content}`;

    const parsedResponse = parseStreamResponse(streamedResponse);

    if (parsedResponse?.components?.length) {
      // 스트림 데이터가 파싱되고 배열 길이가 증가한 경우 SSE 전송
      if (parsedStreamedResponse.components.length > componentsCount) {
        const onChainStreamData: any = {
          event: 'on_chain_stream',
          data: {
            chunk: {
              components: parsedStreamedResponse.components,
            },
          },
        };

        res.write(
          `event: data\\ndata: ${JSON.stringify(onChainStreamData)}\\n\\n`,
        );

        componentsCount = parsedStreamedResponse.components.length;
      }
    }
  }
}

클라이언트 측에서는 @microsoft/fetch-event-source 라이브러리를 이용해 데이터를 수신하며, 전송된 이벤트 타입에 따라 화면을 실시간으로 업데이트하는 핸들러를 작성했습니다.

3️⃣의 스트림 데이터 파싱 과정을 클라이언트에서 할 수도 있지만, 서버에서 처리함으로써 불필요한 SSE 이벤트 전송을 줄이고 클라이언트 성능 부하를 낮추었습니다.

// 클라이언트 : SSE 수신 및 이벤트 타입에 따라 처리
'use client';

import { fetchEventSource } from '@microsoft/fetch-event-source';

const fetchAiStreamApi = async () => {
  try {
    await fetchEventSource(API_URL, {
      method: 'POST',
      // SSE 이벤트 타입에 따라 처리
      onmessage: (message) => {
        if (message.event !== 'data') return;

        const eventSourceMessage = JSON.parse(message.data);

        if (eventSourceMessage.event === 'on_chain_stream') {
          // 스트림 데이터 업데이트됨
          const newComponents = eventSourceMessage?.data?.chunk?.components;

          console.log(newComponents);
        }
      },
      onerror: (error) => {
        console.error('SSE Error:', error);
      },
    });
  } catch (error) {
    console.error('Fetch Event Source Error:', error);
  }
};

5️⃣ 구현 결과

위 과정을 통해 구조화된 출력에 사용할 Schema를 정의하고 모델에 연동시켰으며, LangChain의 스트림 기능을 활용하여 이벤트 형태로 응답을 받아오게 하였습니다. 그리고 스트리밍 도중의 응답을 텍스트 후처리를 통해 구조화된 JSON으로 변환하여 클라이언트가 해석할 수 있는 형태로 만들었습니다.

JSON 변환이 성공한 시점에만 SSE로 해당 데이터를 클라이언트에 전송하고, 클라이언트는 이를 수신하여 화면에 보여주는 플로우를 구축하였습니다.

적용 결과

새로워진 Riido AI - 프로젝트 계획을 실시간으로

실시간 스트리밍과 구조화된 출력을 적용한 결과, Riido AI는 이제 프로젝트 계획 생성 시 작업이 생성되는 대로 즉시 화면에 보여줄 수 있게 되었습니다.

스트리밍 기능의 사용자 경험 개선을 평가하기 위한 지표로 저희는 2가지를 설정하였습니다.

  1. 응답 생성 시간 (Response Generation Time) - LLM 응답 전체가 완성되기까지 걸리는 시간이에요. 사용자가 최종 결과물을 확인하고 후속 액션을 취할 수 있을 때까지 걸리는 시간이기도 합니다.
  2. 스트리밍 지연 시간 (Stream Latency) - 생성 중인 LLM 응답이 화면에 업데이트될 때까지 걸리는 평균 시간 간격입니다. 스트리밍이 적용된 Riido AI에서는 하나의 Riido 작업이 생성되어 화면에 보일 때까지 걸리는 시간을 의미합니다. 스트리밍 적용 전의 Riido AI에서는 전체 응답이 생성된 후에야 화면에 반영되기 때문에, 응답 생성 시간 = 스트리밍 지연 시간으로 볼 수 있습니다.

스트리밍 적용 이후의 해당 지표 값의 변화는 아래와 같아요.

지표 AS-IS (s) TO-BE (스트리밍 적용) (s)
응답 생성 시간 35.4 10.0
스트리밍 지연 시간 35.4 0.8

두 가지 지표에 대해 모두 큰 개선이 있었습니다.

응답 생성 시간은 약 1/3 이하로 크게 감소하였습니다. 이 부분에서의 개선은 실시간 스트리밍으로 얻은 것이 아니라, 스트리밍 구현을 위해 기존 응답의 JSON Schema를 업데이트하며 더욱 간소화하였기 때문입니다. 예시로, 기존에는 프로젝트 > 목표 > 작업 의 계층 구조 데이터를 nested object로 하였는데, 이번 업데이트를 통해 nested 구조를 제거하고 1차원 배열 형태로 변경하여 데이터 구조 간소화를 통한 속도 개선이 있었습니다.

스트리밍 지연 시간의 경우 당연히 스트리밍 적용을 통해 개선된 항목이며, 이제는 사용자가 최초 결과물을 확인하기까지 0.8초 가량밖에 걸리지 않게 되었습니다. 이는 저희가 의도한 대로 전체 응답 생성을 기다리는 시간을 더 빠르고 지루하지 않게 느껴질 수 있도록 해주었습니다.

저희 스위그팀 내에서도 이번 업데이트 후 팀원들이 더 편하게 그리고 더 자주 Riido AI 기능을 사용하게 되었어요.

Riido AI - 프로젝트 생성 (스트리밍 적용)

스트리밍 도입 후 Riido AI 사용량 증가

아직 초기임에도 불구하고 이번 실시간 스트리밍 업데이트 후 Riido AI 기능의 사용량이 증가함을 확인하였는데요, 업데이트 전 1개월과 업데이트 후를 비교하였을 때 일별 Riido AI 사용 횟수는 무려 39% 증가하였습니다.

“AI 기능이 전보다 훨씬 빨라져서 사용하기 편해졌어요.”라는 사용자 피드백도 받게 되었답니다.

향후 계획

더욱 디테일한 부분까지 생성해주는 Riido AI

이번 업데이트는 기존 Riido AI 기능의 사용성을 개선하는 작업이었어요. 하지만 Riido AI의 여정은 이제 시작입니다.

프로젝트의 전체 구조뿐만 아니라 작업의 상세 정보들을 더 구체적으로 채워주는 기능이 빠른 시일 내로 개발된 예정인데요, 작업을 성공적으로 완성하기 위해 필요한 기획서와 유용한 아티클을 첨부해 주는 부분입니다.

프로젝트 개요만 입력하는 작업 리스트를 생성하는 AI, 이미 생성된 작업의 상세 내용을 작성해 주는 AI 등 앞으로의 Riido AI는 관심사 별로 기능을 분리하고 각자의 목적에 맞는 최고의 결과물 을 제공하도록 개선할 것입니다.

고객 맞춤형 추천을 제공하는 Riido AI

기존의 Riido AI는 무(無)에서 유(有)를 창조하는 것이었습니다. 사용자가 입력한 데이터 기반으로 프롬프트를 생성하였죠.

하지만 앞으로의 Riido AI는 유에서 유를 창조하는 방향으로 나아가고자 해요. 프로젝트 계획, 리스크 관리 등의 기능은 해당 워크스페이스에 쌓인 방대한 히스토리 데이터를 활용해 RAG 기반의 개인화된 추천 기능으로 개발할 예정입니다.

Riido AI는 Riido AI Agent가 되어, 우리 팀만을 위한 최고의 IT 프로젝트 관리자로 만들 것입니다. 앞으로도 Riido AI에 많은 관심 부탁드립니다!

마치며

Riido는 계획, 실행, 운영을 모두 아우르는 최고의 IT 프로젝트 관리 툴이 되기 위해 AI를 적극 활용하고 있습니다.

Riido AI를 직접 사용해 보고 싶다면 아래 링크를 통해 확인해 보세요!

Riido Software Engineer 최우진

Riido 뤼이도 - 가장 쉬운 개발 프로젝트 관리 툴
Riido는 툴 학습 난이도를 대폭 낮춰 지금부터 바로 개발 프로세스를 체계적으로 관리할 수 있게 합니다.