AI 채팅, streamedQuery로 우아하게 구현하기

Tech
2025-03-30

배경

이제는 ChatGPT나 Claude 같은 AI 채팅 서비스가 주변에서 흔하게 쓰입니다. 여행 계획을 세우거나 논문을 요약하고 코드를 작성하는 등 다양한 방식으로 우리 생활과 업무 방식에 영향을 미치고 있습니다.

AI 채팅 서비스들의 공통적인 특징은 실시간으로 누군가가 타이핑하는 것처럼 글자가 하나씩 나타나는 인터페이스입니다. 자연스러운 타이핑 효과는 단순한 시각적 효과를 넘어 사용자가 AI와 실제로 대화하는 듯한 느낌을 주는 중요한 요소입니다. 프론트엔드 개발자로서 구현 방식에 호기심을 가지게 되었는데, 생각보다 까다로운 기술적 과제들이 있었습니다.

이러한 인터페이스를 구현하기 위해서는 서버에서 데이터를 점진적으로 받아와 화면에 순차적으로 표시해야 합니다. 스트리밍 데이터를 처리하고, 상태를 관리하고, 에러 처리까지 고려하다 보면 코드가 금방 복잡해질 것이라는 생각이 들었습니다. 여러 접근 방식을 탐색하던 중, 스트리밍 데이터 처리의 복잡성을 놀라울 정도로 단순화하는 streamedQuery를 발견했습니다.

streamedQuery란?

streamedQuery는 TanStack Query v5.69.0에 추가된 helper function으로, 데이터가 조금씩 나눠서 오는 상황을 쉽게 처리할 수 있게 도와줍니다. 특히 AI 채팅 서비스처럼 답변이 한 번에 오지 않고 글자 단위로 천천히 오는 경우에 유용합니다.

일반적인 API 요청은 모든 데이터가 준비될 때까지 기다렸다가 한 번에 응답합니다. 하지만 AI 채팅처럼 긴 응답을 생성하는 경우라면 사용자가 오래 기다려야 하는 문제가 발생하지 않을까요? streamedQuery를 사용하면 첫 번째 데이터 청크가 도착하자마자 화면에 표시하고, 이후 데이터가 도착하는 대로 자연스럽게 화면을 업데이트할 수 있습니다. 복잡한 상태 관리가 별도로 필요하지 않다는 장점이 있습니다.

streamedQuery 사용 예시

import { useQuery, streamedQuery } from '@tanstack/react-query';
 
const SimpleChatbot = () => {
  const [prompt, setPrompt] = useState("");
  
  // AI 응답을 스트리밍으로 가져오기
  const { data, isPending, isFetching } = useQuery({
    queryKey: ['ai-chat', prompt],
    queryFn: streamedQuery({
      // 서버에서 청크 단위로 데이터 가져오기
      queryFn: async () => {
        const response = await fetch('/api/chat', {
          method: 'POST',
          body: JSON.stringify({ prompt }),
        });
        
        // 스트림을 AsyncIterator로 변환
        const reader = response.body.getReader();
        const decoder = new TextDecoder();
        
        return {
          async *[Symbol.asyncIterator]() {
            while (true) {
              const { done, value } = await reader.read();
              if (done) break;
              yield decoder.decode(value);
            }
          }
        };
      }
    }),
    enabled: !!prompt, // 프롬프트가 있을 때만 쿼리 실행
  });
  
  return (
    <div>
      {isPending ? (
        <div>응답 준비 중</div>
      ) : (
        <div className="ai-message">
          {/* 모든 청크를 결합해 표시 */}
          {data?.join('')}
          
          {/* 더 받아오는 중이면 로딩 표시 */}
          {isFetching && <span>...</span>}
        </div>
      )}
      
      <MessageInput onSend={setPrompt} />
    </div>
  );
};

위 코드에서 눈여겨볼 몇 가지 핵심 요소들이 있습니다.

  • useQuery와 헬퍼 함수 streamedQuery를 함께 사용해 스트리밍 데이터를 쉽게 관리합니다.

    • useQuery는 데이터(data), 로딩 상태(isPending, isFetching)를 자동으로 관리합니다.
    • streamedQuery는 데이터 청크를 자동으로 배열에 누적해 data에 저장합니다.
    • 복잡한 로직을 작성할 필요 없이 data?.join('')으로 간단하게 모든 응답 청크를 결합할 수 있습니다.
    • isPending은 첫 데이터가 도착하기 전까지 true이고, isFetching은 모든 데이터가 수신될 때까지 true입니다.
  • [Symbol.asyncIterator]() 메서드는 스트리밍 데이터를 처리하는 핵심 메커니즘입니다.

    • response.body.getReader()로 얻은 ReadableStreamDefaultReader를 통해 데이터를 청크 단위로 읽어옵니다.
    • async * 문법은 이 함수가 비동기 제너레이터임을 나타냅니다.
    • yield를 통해 각 데이터 청크를 순차적으로 반환합니다.
    • streamedQuery는 내부적으로 이 AsyncIterator를 순회하면서 각 청크를 누적 배열에 추가합니다.

비동기 데이터 순회의 필요성

이처럼 코드의 핵심 부분은 AsyncIterator를 활용한 비동기 데이터 순회입니다. 그렇다면 왜 이런 방식이 특히 필요할까요?

  • AI 모델은 응답을 토큰 단위로 점진적으로 생성하고, 서버는 각 토큰이 생성될 때마다 즉시 클라이언트로 전송합니다. 이렇게 하면 전체 응답 완성 전에도 사용자가 내용을 부분적으로 볼 수 있습니다.

  • 사용자 경험을 크게 향상할 수 있습니다. 동기식 처리는 전체 응답 완성까지 대기해야 하지만, 비동기식은 첫 토큰이 생성되는 즉시 UI에 표시합니다. 총 응답 시간은 같아도 비동기 방식이 첫 데이터를 훨씬 빨리 사용자에게 보여줍니다.

  • 네트워크 효율성 또한 향상됩니다. HTTP 연결을 한 번 열어 데이터를 점진적으로 전송함으로써 여러 번의 요청/응답 사이클이 필요 없어집니다. 이는 latency를 줄이고 서버와 클라이언트 간 효율적인 통신을 가능하게 합니다.

  • 대용량 데이터셋을 다룰 때는 메모리 효율성이 좋아집니다. 전체 데이터를 메모리에 로드하지 않고 필요한 만큼만 처리할 수 있어 제한된 리소스를 가진 모바일에서 특히 효과적입니다.

streamedQuery 등장 전후 비교

as-is

streamedQuery가 없다면 개발자가 직접 스트리밍 데이터를 처리하는 로직을 구현해야 합니다.

const ChatComponent = () => {
  // 여러 상태 관리
  const [messages, setMessages] = useState([]);
  const [isLoading, setIsLoading] = useState(false);
  const [isStreaming, setIsStreaming] = useState(false);
  const [error, setError] = useState(null);
  
  // 스트림 처리를 위한 ref
  const abortControllerRef = useRef(null);
  
  // 스트리밍 요청 함수
  const fetchStreamingResponse = async (prompt) => {
    try {
      setIsLoading(true);
      
      // 이전 요청 취소
      if (abortControllerRef.current) {
        abortControllerRef.current.abort();
      }
      
      // 새 AbortController 생성
      const abortController = new AbortController();
      abortControllerRef.current = abortController;
      
      const response = await fetch('/api/chat', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ prompt }),
        signal: abortController.signal
      });
      
      if (!response.ok) throw new Error('API 요청 실패');
      
      setIsLoading(false);
      setIsStreaming(true);
      
      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let partialResponse = '';
      
      // 스트림 처리
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        
        const chunk = decoder.decode(value, { stream: true });
        partialResponse += chunk;
        
        // 상태 업데이트
        setMessages(prev => {
          const newMessages = [...prev];
          // 마지막 메시지 업데이트 또는 새 메시지 추가
          if (newMessages.length > 0 && newMessages[newMessages.length - 1].type === 'assistant') {
            newMessages[newMessages.length - 1].content = partialResponse;
          } else {
            newMessages.push({
              type: 'assistant',
              content: partialResponse
            });
          }
          return newMessages;
        });
      }
    } catch (err) {
      if (err.name !== 'AbortError') {
        setError(err.message);
      }
    } finally {
      setIsStreaming(false);
      abortControllerRef.current = null;
    }
  };
  
  // 컴포넌트 정리
  useEffect(() => {
    return () => {
      if (abortControllerRef.current) {
        abortControllerRef.current.abort();
      }
    };
  }, []);
  
  // UI 렌더링 코드 생략
};

to-be

streamedQuery를 사용하면 복잡한 코드가 크게 간소화됩니다.

const ChatComponent = () => {
  const [prompt, setPrompt] = useState("");
  const [messages, setMessages] = useState([]);
  
  // streamedQuery 사용
  const { data, isPending, isFetching, error } = useQuery({
    queryKey: ['chat', prompt],
    queryFn: streamedQuery({
      queryFn: async ({ signal }) => {
        const response = await fetch('/api/chat', {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({ prompt }),
          signal // 취소 처리를 위한 신호 전달
          // TanStack Query가 내부적으로 생성한 AbortController에서 제공되는 AbortSignal
        });
        
        if (!response.ok) throw new Error('API 요청 실패');
        
        const reader = response.body.getReader();
        const decoder = new TextDecoder();
        
        // AsyncIterator 반환
        return {
          async *[Symbol.asyncIterator]() {
            while (true) {
              const { done, value } = await reader.read();
              if (done) break;
              yield decoder.decode(value, { stream: true });
            }
          }
        };
      }
    }),
    enabled: !!prompt,
    onSettled: () => {
      if (!isFetching && data) {
        // 스트림 완료 시 메시지 목록에 추가
        setMessages(prev => [...prev, {
          type: 'assistant',
          content: data.join('')
        }]);
        setPrompt(""); // 입력 초기화
      }
    }
  });
  
  // 메시지 전송 처리
  const handleSendMessage = (userInput) => {
    // 사용자 메시지 추가
    setMessages(prev => [...prev, {
      type: 'user',
      content: userInput
    }]);
    // 프롬프트 설정하여 쿼리 트리거
    setPrompt(userInput);
  };
  
  // UI 렌더링 코드 생략
};

전후 비교 요약

as-isto-be
데이터 청크 누적partialResponse += chunk로 수동 누적자동 누적, data?.join('')로 모든 청크 결합
취소 처리AbortController로 관리signal만 전달
생명주기useEffect에서 정리 로직 구현자동 정리(별도 코드 불필요)

마치며

복잡한 데이터 스트리밍을 쉽게 구현할 수 있도록 돕는 streamedQuery는 AI와 실시간 데이터 처리의 중요성이 커지는 시점에 딱 맞춰 등장한 것 같습니다. 이제 개발자는 실시간 데이터 스트리밍의 복잡성에 압도되지 않고, 사용자에게 탁월한 경험을 제공하는 데 집중할 수 있습니다. AI 채팅 기능을 개발할 때 스트리밍 데이터 처리가 필요하다면, streamedQuery를 활용하는 것을 추천합니다.

참고 자료