How to Build a RAG Agent with LangGraph and Needle
Today, let’s explore how to build a Retrieval-Augmented Generation (RAG) agent with state management and a multi-step workflow. The agent:
Uploads documents to a Needle collection.
Waits for the documents to finish indexing.
Searches those documents based on a user’s query.
Responds with an answer using a Large Language Model (LLM).
We’ll build this agent using two tools:
Needle, a “RAG API and Knowledge Threading” platform for document ingestion, chunking, embedding, and semantic search.
LangGraph, a library that lets you create stateful, multi-step LLM workflows (or “agents”) that can maintain state between steps.
By the end of this tutorial, you’ll have a fully functional RAG agent that uses Needle for document retrieval and LangGraph for orchestrating a multi-step process.
Why Needle and LangGraph?
Needle handles the heavy lifting of storing and indexing your documents. Its built-in chunking, embedding, and semantic search features make it easy to build RAG applications without managing complex infrastructure.
LangGraph specializes in creating stateful workflows for LLM-powered agents. It helps you chain together steps, like uploading files, waiting for indexing, searching, and more, while keeping track of important context along the way.
Together, these tools provide a clean, modular approach to building RAG workflows. The accompanying GitHub repository demonstrates everything we’ll cover.
Prerequisites
Before you begin, make sure you have:
Python 3.9+ installed.
A Needle account and API key.
An OpenAI account and API key.
Dependencies installed:
pip install needle-python langgraph langchain langchain-community langchain-openai
A Needle collection already created (or you can create one). You’ll need the COLLECTION_ID for that collection.
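Before running the full script, it can help to confirm that the required settings are present. The sketch below is one possible preflight check; the helper name missing_settings is made up for illustration, and it only looks at environment variables. It does not validate the keys against Needle or OpenAI.

```python
import os
from typing import Iterable, List, Mapping

# The two variables this tutorial reads from the environment.
REQUIRED_VARS = ("NEEDLE_API_KEY", "OPENAI_API_KEY")


def missing_settings(
    required: Iterable[str] = REQUIRED_VARS,
    environ: Mapping[str, str] = os.environ,
) -> List[str]:
    """Return the names of required variables that are unset or blank."""
    return [name for name in required if not environ.get(name, "").strip()]


if __name__ == "__main__":
    missing = missing_settings()
    if missing:
        print("Missing environment variables:", ", ".join(missing))
    else:
        print("All required environment variables are set.")
```

Passing a plain dictionary as environ makes the check easy to try without touching your real shell environment.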
Demo Preview
By the end of this tutorial, your agent will:
Add a file (for example, documentation) to the Needle collection.
Pause until Needle indexes that file.
Perform a semantic search against that document.
Return an answer to a user’s question.
The Code (Step-by-Step)
Below is a complete, annotated example showing how to build and run this workflow. Feel free to copy and adapt it to your own needs.
Step 1: Initialize the Environment
import os
import time
from typing import List, Union, Literal, TypedDict
# LangChain & OpenAI (ChatOpenAI is provided by the langchain-openai package)
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import tool
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
# LangChain Community
from langchain_community.document_loaders import NeedleLoader
from langchain_community.retrievers import NeedleRetriever
# LangGraph
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
from typing_extensions import Annotated
#####################################################
# 1) Set up API Keys and Collection
#####################################################
NEEDLE_API_KEY = os.environ.get("NEEDLE_API_KEY")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
if not NEEDLE_API_KEY or not OPENAI_API_KEY:
    raise ValueError(
        "Required environment variables NEEDLE_API_KEY and OPENAI_API_KEY must be set"
    )
COLLECTION_ID = "clt_01JGVAHK2V00KEJJNR84BS1FKA"  # Replace with your own collection ID
print("✅ API keys validated and collection ID set.")

#####################################################
# 2) Define Our Agent State
#####################################################
class AgentState(TypedDict):
    messages: Annotated[List[Union[HumanMessage, AIMessage, ToolMessage]], add_messages]
    file_added: bool
    indexed: bool

#####################################################
# 3) Initialize Needle Loader and Retriever
#####################################################
needle_loader = NeedleLoader(
    needle_api_key=NEEDLE_API_KEY,
    collection_id=COLLECTION_ID
)
retriever = NeedleRetriever(
    needle_api_key=NEEDLE_API_KEY,
    collection_id=COLLECTION_ID
)

#####################################################
# 4) Define Tools for Needle Operations
#####################################################
@tool
def add_file_to_collection() -> str:
    """
    Add a file to the Needle collection.
    Replace 'docs.needle-ai.com' and the URL with your own file or website.
    """
    print("\n📥 Adding documentation to collection...")
    files = {
        "docs.needle-ai.com": "https://docs.needle-ai.com"
    }
    needle_loader.add_files(files=files)
    print("✅ File added successfully")
    return "File added successfully"

@tool
def search_collection(query: str) -> str:
    """
    Search the Needle collection using a retrieval chain.
    Returns a direct answer from the retrieved context.
    """
    print(f"\n🔍 Searching collection for: '{query}'")
    llm = ChatOpenAI(temperature=0)
    system_prompt = """
    You are an assistant for question-answering tasks.
    Use the following pieces of retrieved context to answer the question.
    If you don't know, say so concisely.\n\n{context}
    """
    prompt = ChatPromptTemplate.from_messages([
        ("system", system_prompt),
        ("human", "{input}")
    ])
    # Create a chain that retrieves documents and then answers the question
    question_answer_chain = create_stuff_documents_chain(llm, prompt)
    rag_chain = create_retrieval_chain(retriever, question_answer_chain)
    print("⚡ Executing RAG chain...")
    response = rag_chain.invoke({"input": query})
    print("✨ Search complete")
    # Return the final answer
    return str(response.get('answer', response))

#####################################################
# 5) Bind Tools to the Model
#####################################################
model = ChatOpenAI(model="gpt-4", temperature=0).bind_tools([
    add_file_to_collection,
    search_collection
])
Explanation
NeedleLoader and NeedleRetriever: These classes handle file ingestion and retrieval for the collection you specify in COLLECTION_ID.
Tools: We create two “tools” that the agent can call:
add_file_to_collection(): loads a file into Needle.
search_collection(query): uses a retrieval chain to answer a question with context from the newly added file.
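Conceptually, a tool is a named function that the model can request by emitting a structured tool call (a name plus arguments). The plain-Python sketch below illustrates only that dispatch idea. The TOOLS registry, the register decorator, and run_tool_call are hypothetical stand-ins, not how LangChain’s @tool decorator or LangGraph’s ToolNode are implemented, and the tool bodies are placeholders.

```python
from typing import Any, Callable, Dict

# Hypothetical registry: tool name -> plain Python function.
TOOLS: Dict[str, Callable[..., str]] = {}


def register(func: Callable[..., str]) -> Callable[..., str]:
    """Record a function under its own name so it can be looked up later."""
    TOOLS[func.__name__] = func
    return func


@register
def add_file_to_collection() -> str:
    # Placeholder body: the real tool uploads a file to Needle.
    return "File added successfully"


@register
def search_collection(query: str) -> str:
    # Placeholder body: the real tool runs a retrieval chain.
    return f"(answer for: {query})"


def run_tool_call(call: Dict[str, Any]) -> str:
    """Execute one tool call of the form {"name": ..., "args": {...}}."""
    name = call["name"]
    if name not in TOOLS:
        return f"Unknown tool: {name}"
    return TOOLS[name](**call.get("args", {}))


print(run_tool_call({"name": "search_collection", "args": {"query": "What is Needle?"}}))
```

The name-plus-args dictionary mirrors the general shape of the entries a chat model places in tool_calls, which is what lets the workflow decide when to hand control to the tools step.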
Step 2: Define the Workflow Logic
A workflow manages the sequence of steps. We’ll use LangGraph to either:
Ask the AI model what to do next,
Wait for indexing,
Or run a tool to add/search files.
#####################################################
# 6) Define Workflow Logic
#####################################################
def should_continue(state: AgentState) -> Literal["tools", "agent", END]:
    messages = state["messages"]
    if not messages:
        # If no messages yet, prompt the AI
        return "agent"
    last_message = messages[-1]
    # If the last message came from a tool
    if isinstance(last_message, ToolMessage):
        # Check if it was "File added successfully"
        if last_message.content == "File added successfully":
            # A routing function cannot update state; call_model records file_added
            print("\n📌 File addition confirmed")
            return "agent"
        print("\n🏁 Search complete, ending workflow")
        return END
    # If the last message was from the AI
    if isinstance(last_message, AIMessage):
        # If a search result is already in the history, the AI has answered and we can end
        if any(
            isinstance(m, ToolMessage) and m.content != "File added successfully"
            for m in messages[:-1]
        ):
            return END
        # If the AI asks to call a tool
        if hasattr(last_message, "tool_calls") and last_message.tool_calls:
            # If the file is added but not indexed, we wait
            if state.get("file_added") and not state.get("indexed"):
                print("\n⏳ Waiting for indexing to complete...")
                return "agent"
            print("\n🛠️ Executing tool calls...")
            return "tools"
    return "agent"

def call_model(state: AgentState):
    messages = state["messages"]
    if not messages:
        print("\n🤖 Starting conversation...")
        return {
            "messages": [
                HumanMessage(content="Let's add the file to our Needle collection and then search for what it says about Needle.")
            ]
        }
    # The add-file tool has just run if its confirmation is the latest message
    last_message = messages[-1]
    file_added = state.get("file_added") or (
        isinstance(last_message, ToolMessage)
        and last_message.content == "File added successfully"
    )
    # Wait for indexing if file is added but not yet indexed
    if file_added and not state.get("indexed"):
        print("\n⏳ Waiting 30 seconds for file indexing...")
        time.sleep(30)
        print("✅ Indexing complete")
        # Return the flags in the update; mutating `state` in place is not persisted
        return {
            "messages": [
                HumanMessage(content="File has been indexed. Now searching for information about Needle...")
            ],
            "file_added": True,
            "indexed": True,
        }
    # Otherwise, let the AI respond
    response = model.invoke(messages)
    return {"messages": [response]}
Explanation
should_continue: Checks the conversation’s latest message to decide whether to:
Call a tool ("tools"),
Have the AI speak again ("agent"),
End the workflow (END).
call_model: Either starts the conversation, waits for indexing, or calls the bound model (model) to generate the next message.
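One detail worth spelling out: a LangGraph node reports changes through the partial update it returns, and the graph merges that update into the state. The sketch below is a simplified plain-Python model of that merge, not LangGraph’s implementation. The merge_update helper is hypothetical, plain strings stand in for message objects, and the real add_messages reducer does more than append (for example, it can replace a message with a matching ID).

```python
from typing import Any, Dict

State = Dict[str, Any]


def merge_update(state: State, update: State) -> State:
    """Return a new state: append to 'messages', overwrite every other key."""
    merged = dict(state)
    for key, value in update.items():
        if key == "messages":
            merged["messages"] = list(state.get("messages", [])) + list(value)
        else:
            merged[key] = value
    return merged


def wait_for_indexing(state: State) -> State:
    """Toy node: returns only the keys it wants to change."""
    return {"messages": ["File has been indexed."], "indexed": True}


state: State = {
    "messages": ["File added successfully"],
    "file_added": True,
    "indexed": False,
}
state = merge_update(state, wait_for_indexing(state))
print(state["indexed"], len(state["messages"]))  # True 2
```

The practical consequence is that flags such as file_added and indexed only change for later steps when a node includes them in its returned dictionary.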
Step 3: Compile and Run the Workflow
Now we’ll tie everything together and execute our multi-step process.
#####################################################
# 7) Compile and Execute the Graph
#####################################################
# Create a graph of states and transitions
workflow = StateGraph(AgentState)
workflow.add_node("agent", call_model)
workflow.add_node("tools", ToolNode([add_file_to_collection, search_collection]))
workflow.add_edge(START, "agent")
workflow.add_conditional_edges("agent", should_continue, {
    "tools": "tools",
    "agent": "agent",
    END: END
})
workflow.add_edge("tools", "agent")
app = workflow.compile()

#####################################################
# 8) Initialize the State and Run
#####################################################
initial_state = {
    "messages": [],
    "file_added": False,
    "indexed": False
}
print("\n🚀 Starting Needle operations...")
print("=" * 50)
for step in app.stream(initial_state, {"recursion_limit": 10}):
    # Print messages from each step
    if "messages" in step.get("agent", {}):
        for msg in step["agent"]["messages"]:
            msg.pretty_print()
    elif "messages" in step.get("tools", {}):
        for msg in step["tools"]["messages"]:
            msg.pretty_print()
print("\n✨ Workflow complete!")
print("=" * 50)
Explanation
StateGraph: Manages transitions between the "agent" node (AI decisions) and the "tools" node (tool execution).
app.stream: Steps through the workflow until we reach the end state.
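Each item yielded by the streaming loop above is a dictionary keyed by the node that just ran, with that node’s update as the value. As a rough sketch, the per-node if/elif could be folded into one helper that works for any node name. The function iter_step_messages is a hypothetical name, and plain strings stand in for message objects so the example runs without any API keys.

```python
from typing import Any, Dict, Iterator, Tuple


def iter_step_messages(step: Dict[str, Any]) -> Iterator[Tuple[str, Any]]:
    """Yield (node_name, message) pairs from one streamed step."""
    for node_name, update in step.items():
        # Skip updates that are not dictionaries (for example, None).
        if not isinstance(update, dict):
            continue
        for message in update.get("messages", []):
            yield node_name, message


# Example steps shaped like the ones the streaming loop receives.
steps = [
    {"agent": {"messages": ["Let's add the file."]}},
    {"tools": {"messages": ["File added successfully"]}},
    {"agent": {"indexed": True}},  # an update with no messages
]
for step in steps:
    for node_name, message in iter_step_messages(step):
        print(f"[{node_name}] {message}")
```

With real message objects you would call message.pretty_print() instead of print, as the original loop does.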
Trying It Out
Set environment variables:
export NEEDLE_API_KEY="your-needle-api-key"
export OPENAI_API_KEY="your-openai-api-key"
Run the script:
python rag_workflow.py
Observe:
The script prompts, “Let’s add the file to our Needle collection…”
It calls add_file_to_collection.
Waits 30 seconds to allow indexing.
Searches for content about Needle.
Ends the workflow once the model has responded to the search result.
How It Works (In a Nutshell)
Initialize: You specify your API keys, collection ID, and create a loader/retriever.
Add a File: The agent calls add_file_to_collection.
Indexing Delay: We wait 30 seconds to let Needle index the file.
Search: The agent queries the newly added file for the user’s question.
Multi-Step: LangGraph tracks these steps so each one executes at the right time.
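The fixed 30-second delay is the simplest option, but it can be too short for large files and needlessly long for small ones. One alternative, sketched below under stated assumptions, is to poll a readiness check with a timeout. The wait_until helper and the is_ready callable are hypothetical: this tutorial does not show a Needle call for checking indexing status, so you would need to supply your own check. The demo uses a fake check that succeeds on its third call.

```python
import time
from typing import Callable


def wait_until(
    is_ready: Callable[[], bool],
    timeout: float = 120.0,
    interval: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> bool:
    """Poll is_ready() until it returns True or `timeout` seconds pass."""
    deadline = clock() + timeout
    while True:
        if is_ready():
            return True
        remaining = deadline - clock()
        if remaining <= 0:
            return False
        # Never sleep past the deadline.
        sleep(min(interval, remaining))


# Demo with a fake check that reports "ready" on the third call.
calls = {"n": 0}


def fake_is_indexed() -> bool:
    calls["n"] += 1
    return calls["n"] >= 3


print(wait_until(fake_is_indexed, timeout=1.0, interval=0.01))
```

Returning False on timeout, rather than raising, lets the calling node decide whether to retry, continue anyway, or end the workflow with a message to the user.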
Key Takeaways
Minimal Dependencies:
Needle: For document ingestion and retrieval.
LangGraph: For orchestrating a stateful workflow.
OpenAI: For LLM responses.
Stateful Workflows: LangGraph helps you split your process into small, manageable steps.
Beginner-Friendly: The code waits for indexing automatically, so you don’t have to remember to do it manually.
Easily Extensible: You can add more steps (like transformations or additional data checks) without altering the rest of the workflow.
Conclusion
Combining Needle and LangGraph allows you to build RAG agents that gracefully handle multi-step processes, such as uploading documents, waiting for indexing, and searching. This example is just a starting point: customize the wait time, build more elaborate logic, or add new tools to your workflow.
Next Steps
Check out the Needle documentation for more details on advanced indexing options.
Explore LangGraph on GitHub for building more sophisticated multi-step LLM workflows.
Browse LangChain Community integrations for more ways to enhance your agent.
Happy building!