06. RAG and Knowledge Base¶
Why This Chapter?¶
A regular agent only knows what it was trained on (up to the cut-off date). It won't know your local instructions like "How to restart Phoenix server according to protocol #5" unless you provide them at runtime.
RAG (Retrieval Augmented Generation) is a "cheat sheet lookup" pattern. The agent first pulls relevant text from a knowledge base, then acts.
Without RAG, an agent can't reliably use your documentation, protocols, and knowledge base. With RAG, it can: fetch what's relevant and act according to your instructions.
Real-World Case Study¶
Situation: A user writes: "Restart Phoenix server according to protocol"
Problem: The agent doesn't know the Phoenix server restart protocol. It may perform a standard restart that doesn't match your procedures.
Solution: With RAG, the agent looks up the protocol in the knowledge base before acting. It finds the document "Phoenix server restart protocol: 1. Turn off load balancer 2. Restart server 3. Turn on load balancer" and follows the steps.
Theory in Simple Terms¶
How Does RAG Work?¶
- Agent receives a request from the user
- Agent searches for information in the knowledge base via a search tool
- Knowledge base returns relevant documents
- Agent uses the information to perform the action
How Does RAG Work? — Magic vs Reality¶
Magic:
Agent "knows" it needs to search the knowledge base and finds the needed information on its own
Reality:
Complete RAG Protocol¶
Step 1: User Request
messages := []openai.ChatCompletionMessage{
{Role: "system", Content: "You are a DevOps assistant. Always search knowledge base before actions."},
{Role: "user", Content: "Restart Phoenix server according to protocol"},
}
Step 2: Model Receives Search Tool Description
tools := []openai.Tool{
{
Function: &openai.FunctionDefinition{
Name: "search_knowledge_base",
Description: "Search the knowledge base for documentation, protocols, and procedures. Use this BEFORE performing actions that require specific procedures.",
Parameters: json.RawMessage(`{
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query"}
},
"required": ["query"]
}`),
},
},
{
Function: &openai.FunctionDefinition{
Name: "restart_server",
Description: "Restart a server",
Parameters: json.RawMessage(`{
"type": "object",
"properties": {
"hostname": {"type": "string"}
},
"required": ["hostname"]
}`),
},
},
}
Step 3: Model Generates Tool Call for Search
resp1, _ := client.CreateChatCompletion(ctx, openai.ChatCompletionRequest{
Model: "gpt-4o-mini",
Messages: messages,
Tools: tools,
})
msg1 := resp1.Choices[0].Message
// msg1.ToolCalls = [{
// Function: {
// Name: "search_knowledge_base",
// Arguments: "{\"query\": \"Phoenix restart protocol\"}"
// }
// }]
Why did the model generate a tool_call for search?
- System Prompt says: "Always search knowledge base before actions"
- Tool Description says: "Use this BEFORE performing actions"
- The model identifies the word "protocol" in the request and links it to the search tool
Step 4: Runtime (Your Code) Executes Search
Note: Runtime is the agent code you write in Go. See Chapter 00: Preface for definition.
func searchKnowledgeBase(query string) string {
// Simple keyword search (in production - vector search)
knowledgeBase := map[string]string{
"protocols.txt": "Phoenix server restart protocol:\n1. Turn off load balancer\n2. Restart server\n3. Turn on load balancer",
}
for filename, content := range knowledgeBase {
if strings.Contains(strings.ToLower(content), strings.ToLower(query)) {
return fmt.Sprintf("File: %s\nContent: %s", filename, content)
}
}
return "No documents found"
}
result1 := searchKnowledgeBase("Phoenix restart protocol")
// result1 = "File: protocols.txt\nContent: Phoenix server restart protocol:\n1. Turn off load balancer..."
Step 5: Search Result Added to Context
messages = append(messages, openai.ChatCompletionMessage{
Role: "tool",
Content: result1, // All found documentation!
ToolCallID: msg1.ToolCalls[0].ID,
})
// Now messages contains:
// [system, user, assistant(tool_call: search_kb), tool("File: protocols.txt\nContent: ...")]
Step 6: Model Receives Documentation and Acts
// Send updated history (with documentation!) to model
resp2, _ := client.CreateChatCompletion(ctx, openai.ChatCompletionRequest{
Model: "gpt-4o-mini",
Messages: messages, // Model receives found documentation!
Tools: tools,
})
msg2 := resp2.Choices[0].Message
// Model receives in context:
// "Phoenix server restart protocol:\n1. Turn off load balancer..."
// Model generates tool calls according to protocol:
// msg2.ToolCalls = [
// {Function: {Name: "restart_server", Arguments: "{\"hostname\": \"phoenix\"}"}},
// // Or first turn off load balancer, then server
// ]
Why this isn't magic:
- Model doesn't "know" the protocol — it receives it in context after search
- Search is a regular tool — same as ping or restart_service
- Search result is added to messages[] — the model receives it as a new message
- Model generates actions based on context — it processes the documentation and follows it
Takeaway: RAG is not magic "knowledge", but a mechanism for adding relevant information to the model's context through a regular tool call.
Simple RAG vs Vector Search¶
In this lab, we implement simple RAG (keyword search). In production, vector search (Semantic Search) is used, which searches by meaning, not by words.
Simple RAG (Lab 07): the keyword search shown in Step 4 above (strings.Contains over the knowledge base).
Vector search (production):
// 1. Documents are split into chunks and converted to vectors (embeddings)
chunks := []Chunk{
{ID: "chunk_1", Text: "Phoenix restart protocol...", Embedding: [1536]float32{...}},
{ID: "chunk_2", Text: "Step 2: Turn off load balancer...", Embedding: [1536]float32{...}},
}
// 2. User query is also converted to vector
queryEmbedding := embedQuery("Phoenix restart protocol") // [1536]float32{...}
// 3. Search for similar vectors by cosine distance
similarDocs := vectorDB.Search(queryEmbedding, 3) // topK = 3
// Returns 3 most similar chunks by meaning (not by words!)
// 4. Result is added to model context the same way as in simple RAG
result := formatChunks(similarDocs) // "Chunk 1: ...\nChunk 2: ...\nChunk 3: ..."
messages = append(messages, openai.ChatCompletionMessage{
Role: "tool",
Content: result,
})
Why vector search is better:
- Searches by meaning, not by words
- Will find "restart Phoenix" even if document says "Phoenix server restart"
- Works with synonyms and different phrasings
Chunking¶
Documents are split into chunks (pieces) for efficient search.
Example:
Document: "Phoenix server restart protocol..."
Chunk 1: "Phoenix server restart protocol: step 1..."
Chunk 2: "Step 2: Turn off load balancer..."
Chunk 3: "Step 3: Restart server..."
Advanced RAG Techniques¶
Basic RAG works like this: user query → search → retrieved documents → response. In practice, this isn't enough. The query can be vague, search can be imprecise, and results can be irrelevant.
Let's look at techniques that solve these problems.
RAG Evolution¶
Basic RAG Advanced RAG Agentic RAG
┌──────────┐ ┌──────────────┐ ┌──────────────────┐
│ Query │ │ Query │ │ Agent decides: │
│ ↓ │ │ ↓ │ │ - Search or not │
│ Search │ │ Transform │ │ - Where to search│
│ ↓ │ │ ↓ │ │ - Is it enough │
│ Retrieve │ │ Route │ │ - Search more │
│ ↓ │ │ ↓ │ │ ↓ │
│ Generate │ │ Hybrid Search│ │ Iterative │
│ │ │ ↓ │ │ search loop │
│ │ │ Rerank │ │ │
│ │ │ ↓ │ │ │
│ │ │ Generate │ │ │
└──────────┘ └──────────────┘ └──────────────────┘
Query Transformation¶
Problem: A user writes "server is down". Searching for this query won't find the document "Server failure recovery procedure".
Solution: Transform the query before searching — rephrase, expand, or break it into sub-queries.
Technique 1: Query Rewriting
// Rewrite the query before search for better retrieval
func rewriteQuery(originalQuery string, client *openai.Client) (string, error) {
resp, err := client.CreateChatCompletion(ctx, openai.ChatCompletionRequest{
Model: "gpt-4o-mini", // Cheap model — simple task
Messages: []openai.ChatCompletionMessage{
{
Role: openai.ChatMessageRoleSystem,
Content: `Rewrite the user query to be more specific for document search.
Return ONLY the rewritten query, nothing else.
Examples:
- "server is down" → "server failure recovery procedure"
- "DB is slow" → "PostgreSQL performance diagnostics high latency"`,
},
{Role: openai.ChatMessageRoleUser, Content: originalQuery},
},
Temperature: 0,
})
if err != nil {
return originalQuery, err // Fall back to original
}
return resp.Choices[0].Message.Content, nil
}
Technique 2: Sub-Query Decomposition
A complex query is broken into several simpler ones. Results are merged.
// "Compare nginx config on prod and staging" breaks into:
// 1. "nginx configuration production"
// 2. "nginx configuration staging"
subQueries := decomposeQuery(userQuery)
var allResults []SearchResult
for _, sq := range subQueries {
results := searchKnowledgeBase(sq)
allResults = append(allResults, results...)
}
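Merged sub-query results often contain duplicates, since several sub-queries can hit the same chunk. A small dedup helper for the merge step (a sketch; the SearchResult shape here is a local stand-in for whatever your retrieval returns):

```go
package main

import "fmt"

// SearchResult is a minimal stand-in for a retrieval hit.
type SearchResult struct {
	ChunkID string
	Text    string
}

// dedupResults keeps the first occurrence of each ChunkID,
// preserving the original order of the merged list.
func dedupResults(results []SearchResult) []SearchResult {
	seen := make(map[string]bool, len(results))
	out := make([]SearchResult, 0, len(results))
	for _, r := range results {
		if seen[r.ChunkID] {
			continue
		}
		seen[r.ChunkID] = true
		out = append(out, r)
	}
	return out
}

func main() {
	merged := []SearchResult{
		{ChunkID: "c1", Text: "nginx configuration production"},
		{ChunkID: "c2", Text: "nginx configuration staging"},
		{ChunkID: "c1", Text: "nginx configuration production"}, // hit by both sub-queries
	}
	fmt.Println(len(dedupResults(merged))) // 2
}
```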
Technique 3: HyDE (Hypothetical Document Embeddings)
Instead of searching by the query, ask the model to generate a hypothetical answer. Then search for documents similar to that answer.
// Step 1: Model generates a hypothetical document
hypothetical := generateHypotheticalAnswer(query)
// query: "how to set up SSL"
// hypothetical: "To set up SSL on nginx: 1. Obtain a certificate... 2. Add to config..."
// Step 2: Search for documents similar to the hypothetical answer
embedding := embedText(hypothetical)
results := vectorDB.Search(embedding, 5) // topK = 5
// Finds real documents about SSL setup, even if written differently
Why this works: the embedding of a hypothetical answer lies closer to the embeddings of real answer documents than the embedding of a short query does.
Routing (Query Routing)¶
Problem: You have multiple data sources: a wiki, SQL database, monitoring API. The query "server response time for the last hour" won't be found in the wiki — you need the metrics database.
Solution: Classify the query and route it to the right source.
type QueryRoute struct {
Source string // "wiki", "sql", "metrics_api", "vector_db"
Query string // Original or transformed query
Reason string // Why this source
}
func routeQuery(query string, client *openai.Client) (QueryRoute, error) {
resp, err := client.CreateChatCompletion(ctx, openai.ChatCompletionRequest{
Model: "gpt-4o-mini",
Messages: []openai.ChatCompletionMessage{
{
Role: openai.ChatMessageRoleSystem,
Content: `Classify the query and route to the correct data source.
Available sources:
- "wiki": documentation, procedures, runbooks
- "sql": structured data, tables, historical records
- "metrics_api": real-time metrics, monitoring data
- "vector_db": semantic search in knowledge base
Return JSON: {"source": "...", "query": "...", "reason": "..."}`,
},
{Role: openai.ChatMessageRoleUser, Content: query},
},
Temperature: 0,
})
if err != nil {
return QueryRoute{Source: "vector_db", Query: query}, err
}
var route QueryRoute
json.Unmarshal([]byte(resp.Choices[0].Message.Content), &route)
return route, nil
}
// Usage:
route, _ := routeQuery("server response time for the last hour")
// route.Source = "metrics_api"
// route.Reason = "query about real-time metrics"
Hybrid Search¶
Problem: Vector search finds meaning well but struggles with exact terms. Keyword search is the opposite. The query "error ORA-12154" needs both keyword search for "ORA-12154" and semantic search for "Oracle connection error".
Solution: Combine both approaches and merge results.
type SearchResult struct {
ChunkID string
Text string
Score float64
}
func hybridSearch(query string, topK int) []SearchResult {
// 1. Keyword search (BM25)
keywordResults := bm25Search(query, topK)
// 2. Vector search (Semantic)
embedding := embedQuery(query)
vectorResults := vectorDB.Search(embedding, topK)
// 3. Reciprocal Rank Fusion (RRF) — merge results
return reciprocalRankFusion(topK, keywordResults, vectorResults)
}
// RRF: combines ranks from different lists
func reciprocalRankFusion(topK int, lists ...[]SearchResult) []SearchResult {
scores := make(map[string]float64)    // chunkID → combined score
byID := make(map[string]SearchResult) // chunkID → result
k := 60.0                             // RRF constant (standard value)
for _, list := range lists {
for rank, result := range list {
scores[result.ChunkID] += 1.0 / (k + float64(rank+1))
byID[result.ChunkID] = result
}
}
// Sort by combined score (descending) and return top-K
merged := make([]SearchResult, 0, len(scores))
for id, score := range scores {
r := byID[id]
r.Score = score
merged = append(merged, r)
}
sort.Slice(merged, func(i, j int) bool { return merged[i].Score > merged[j].Score })
if len(merged) > topK {
merged = merged[:topK]
}
return merged
}
When each approach works best:
| Query Type | Keyword | Vector | Hybrid |
|---|---|---|---|
| "ORA-12154" | Excellent | Poor | Excellent |
| "can't connect to database" | Poor | Excellent | Excellent |
| "ORA-12154 won't connect" | Medium | Medium | Excellent |
Reranking¶
Problem: Search returned 20 results. Not all are equally relevant. Passing all 20 into context wastes tokens.
Solution: After initial retrieval, rerank results using a more accurate (but slower) model.
func rerankResults(query string, results []SearchResult, topK int) []SearchResult {
// Use LLM to score relevance of each result
type scoredResult struct {
Result SearchResult
Score  float64
}
var scored []scoredResult
for _, r := range results {
score := scoreRelevance(query, r.Text) // cross-encoder or LLM
scored = append(scored, scoredResult{Result: r, Score: score})
}
// Sort by score (descending)
sort.Slice(scored, func(i, j int) bool {
return scored[i].Score > scored[j].Score
})
// Return top-K
result := make([]SearchResult, 0, topK)
for i := 0; i < topK && i < len(scored); i++ {
result = append(result, scored[i].Result)
}
return result
}
Two-stage pipeline: Fast retrieval (BM25/vector, top-100) → Precise reranking (cross-encoder, top-5).
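The rerank sketch above calls a scoreRelevance helper without defining it. In production this would be a cross-encoder or an LLM call; a toy lexical stand-in (term-overlap ratio, useful only for wiring up and testing the pipeline) could look like:

```go
package main

import (
	"fmt"
	"strings"
)

// scoreRelevance is a toy stand-in for a real cross-encoder or LLM scorer:
// it returns the fraction of query terms that appear in the document.
func scoreRelevance(query, doc string) float64 {
	terms := strings.Fields(strings.ToLower(query))
	if len(terms) == 0 {
		return 0
	}
	docLower := strings.ToLower(doc)
	hits := 0
	for _, t := range terms {
		if strings.Contains(docLower, t) {
			hits++
		}
	}
	return float64(hits) / float64(len(terms))
}

func main() {
	q := "phoenix restart protocol"
	fmt.Println(scoreRelevance(q, "Phoenix server restart protocol: step 1")) // all terms match
	fmt.Println(scoreRelevance(q, "PostgreSQL backup procedure"))             // no terms match
}
```

The point of the two-stage design is that the first stage is cheap and broad while the second is expensive and precise; swapping this toy scorer for a cross-encoder changes only the body of scoreRelevance.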
Self-RAG (Self-Assessment of Retrieval Quality)¶
Problem: The agent found documents, but they may be irrelevant, outdated, or incomplete. Basic RAG doesn't notice this.
Solution: The model assesses retrieved document quality and decides: answer, refine the query, or search more.
type RetrievalAssessment struct {
IsRelevant bool `json:"is_relevant"` // Are documents relevant to the query?
IsSufficient bool `json:"is_sufficient"` // Is there enough information to answer?
Action string `json:"action"` // "answer", "refine_query", "search_more"
RefinedQuery string `json:"refined_query,omitempty"` // Refined query (if needed)
}
func assessRetrieval(query string, docs []SearchResult, client *openai.Client) (RetrievalAssessment, error) {
docsText := formatDocs(docs)
resp, err := client.CreateChatCompletion(ctx, openai.ChatCompletionRequest{
Model: "gpt-4o-mini",
Messages: []openai.ChatCompletionMessage{
{
Role: openai.ChatMessageRoleSystem,
Content: `Assess if the retrieved documents are relevant and sufficient to answer the query.
Return JSON: {"is_relevant": bool, "is_sufficient": bool, "action": "answer"|"refine_query"|"search_more", "refined_query": "..."}`,
},
{
Role: openai.ChatMessageRoleUser,
Content: fmt.Sprintf("Query: %s\n\nRetrieved documents:\n%s", query, docsText),
},
},
Temperature: 0,
})
if err != nil {
return RetrievalAssessment{IsRelevant: true, IsSufficient: true, Action: "answer"}, err
}
var assessment RetrievalAssessment
json.Unmarshal([]byte(resp.Choices[0].Message.Content), &assessment)
return assessment, nil
}
Self-RAG in a loop:
func selfRAG(query string, maxAttempts int, client *openai.Client) ([]SearchResult, error) {
currentQuery := query
for attempt := 0; attempt < maxAttempts; attempt++ {
docs := hybridSearch(currentQuery, 10)
assessment, _ := assessRetrieval(currentQuery, docs, client)
switch assessment.Action {
case "answer":
return docs, nil // Documents are sufficient — answer
case "refine_query":
currentQuery = assessment.RefinedQuery // Refine the query
case "search_more":
// Expand search (more top-K or different source)
moreDocs := hybridSearch(currentQuery, 20)
docs = append(docs, moreDocs...)
return docs, nil
}
}
return nil, fmt.Errorf("could not find sufficient documents after %d attempts", maxAttempts)
}
Agentic RAG (RAG as an Agent Tool)¶
Problem: Self-RAG assesses retrieval quality but can't make complex decisions: which source to use, whether to combine information from multiple documents, or solve multi-hop tasks (where the answer requires a chain of searches).
Solution: RAG is embedded into the Agent Loop (a.k.a. ReAct Loop — see Chapter 04). The agent decides when to search, where to search, and whether it has enough information.
// Agentic RAG: RAG is simply agent tools
tools := []openai.Tool{
// Tool 1: Search documentation
{
Function: &openai.FunctionDefinition{
Name: "search_docs",
Description: "Search documentation and runbooks. Use when you need procedures or technical details.",
Parameters: searchParamsSchema,
},
},
// Tool 2: SQL query to metrics database
{
Function: &openai.FunctionDefinition{
Name: "query_metrics",
Description: "Query metrics database. Use when you need historical data or statistics.",
Parameters: sqlParamsSchema,
},
},
// Tool 3: Search similar incidents
{
Function: &openai.FunctionDefinition{
Name: "search_incidents",
Description: "Search past incidents for similar issues and their resolutions.",
Parameters: searchParamsSchema,
},
},
}
// Agent decides on its own:
// Iteration 1: search_docs("nginx 502 error troubleshooting")
// Iteration 2: query_metrics("SELECT avg(latency) FROM requests WHERE status=502 AND time > now()-1h")
// Iteration 3: search_incidents("nginx 502 upstream timeout")
// Iteration 4: Final answer combining information from all sources
Multi-hop RAG (search chains):
Query: "Why did the payments service go down yesterday?"
Step 1: search_incidents("payments service outage yesterday")
→ "Incident INC-4521: payments went down due to DB timeout"
Step 2: search_docs("payments database connection configuration")
→ "Payments uses PostgreSQL on db-prod-03, connection pool = 20"
Step 3: query_metrics("SELECT connections FROM pg_stat WHERE time = yesterday")
→ "Peak connections: 150 (with limit of 100)"
Step 4: Final answer: "Payments went down due to connection pool exhaustion.
Peak was 150 connections with a limit of 100. Recommendation: increase pool to 200."
Differences between approaches:
| Approach | Who makes decisions | Where's the logic |
|---|---|---|
| Basic RAG | Rigid pipeline | Code (hardcoded) |
| Advanced RAG | Pipeline with branching | Code + config |
| Self-RAG | Model assesses quality | Model (assessment) |
| Agentic RAG | Agent controls everything | Agent Loop |
RAG for Action Space (Tool Retrieval)¶
So far we've discussed RAG for documents (protocols, instructions). But RAG is also needed for action space — when an agent has a potentially infinite number of tools.
The Problem: "Infinite" Tools¶
Situation: An agent needs to work with Linux commands for troubleshooting. Linux has thousands of commands (grep, awk, sed, jq, sort, uniq, head, tail, etc.), and they can be combined into pipelines.
Problem:
- Can't pass all commands in tools[] — that's thousands of tokens
- Model performs worse with large lists (more hallucinations)
- Latency increases (more tokens = slower)
- No security control (which commands are dangerous?)
Naive solution (doesn't work):
// BAD: One universal tool for everything
tools := []openai.Tool{
{
Function: &openai.FunctionDefinition{
Name: "run_shell",
Description: "Execute any shell command",
Parameters: json.RawMessage(`{
"type": "object",
"properties": {
"command": {"type": "string"}
}
}`),
},
},
}
Why this is bad:
- No validation (could execute rm -rf /)
- No audit trail (unclear which commands were used)
- No control (model can call anything)
Solution: Tool RAG (Action-Space Retrieval)¶
Idea: Store a tool catalog and retrieve only relevant tools before planning.
How it works:
1. Tool catalog stores metadata for each tool:
   - Name and description
   - Tags/categories (e.g., "text-processing", "network", "filesystem")
   - Parameters and their types
   - Risk level (safe/moderate/dangerous)
   - Usage examples
2. Before planning, agent searches for relevant tools:
   - Based on user query ("find errors in logs")
   - Retrieves top-k tools (e.g., grep, tail, jq)
   - Adds only their schemas to tools[]
3. For pipelines, use a two-level contract:
   - JSON DSL describes the pipeline plan (steps, stdin/stdout, expectations)
   - Runtime maps the DSL to tool calls, or executes it via a single execute_pipeline tool
Example: Tool RAG for Linux Commands¶
Step 1: Tool Catalog
type ToolDefinition struct {
Name string
Description string
Tags []string // "text-processing", "filtering", "sorting"
RiskLevel string // "safe", "moderate", "dangerous"
Schema json.RawMessage
}
var toolCatalog = []ToolDefinition{
{
Name: "grep",
Description: "Search for patterns in text. Use for filtering lines matching a pattern.",
Tags: []string{"text-processing", "filtering", "search"},
RiskLevel: "safe",
Schema: json.RawMessage(`{
"type": "object",
"properties": {
"pattern": {"type": "string"},
"input": {"type": "string"}
}
}`),
},
{
Name: "sort",
Description: "Sort lines of text. Use for ordering output.",
Tags: []string{"text-processing", "sorting"},
RiskLevel: "safe",
Schema: json.RawMessage(`{
"type": "object",
"properties": {
"input": {"type": "string"}
}
}`),
},
{
Name: "head",
Description: "Show first N lines. Use for limiting output.",
Tags: []string{"text-processing", "filtering"},
RiskLevel: "safe",
Schema: json.RawMessage(`{
"type": "object",
"properties": {
"lines": {"type": "number"},
"input": {"type": "string"}
}
}`),
},
// ... hundreds more tools
}
Step 2: Search for Relevant Tools
For tool search, you can use two approaches: simple keyword search (for learning) and vector search (for production).
Simple search (Lab 13):
func searchToolCatalog(query string, topK int) []ToolDefinition {
// Simple search by description and tags
var results []ToolDefinition
queryLower := strings.ToLower(query)
for _, tool := range toolCatalog {
// Search by description
if strings.Contains(strings.ToLower(tool.Description), queryLower) {
results = append(results, tool)
continue
}
// Search by tags
for _, tag := range tool.Tags {
if strings.Contains(strings.ToLower(tag), queryLower) {
results = append(results, tool)
break
}
}
}
// Return top-k
if len(results) > topK {
return results[:topK]
}
return results
}
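The filter above returns matching tools in catalog order. A slightly better variant ranks by how many query terms match the description or tags, so the most relevant tools come first (a sketch reusing a trimmed-down ToolDefinition shape):

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// ToolDefinition mirrors the catalog entry shape used above (trimmed down).
type ToolDefinition struct {
	Name        string
	Description string
	Tags        []string
}

// rankTools scores each tool by the number of query terms found in its
// description or tags, and returns the topK best-scoring tools.
func rankTools(query string, catalog []ToolDefinition, topK int) []ToolDefinition {
	terms := strings.Fields(strings.ToLower(query))
	type scoredTool struct {
		tool  ToolDefinition
		score int
	}
	var ranked []scoredTool
	for _, t := range catalog {
		haystack := strings.ToLower(t.Description + " " + strings.Join(t.Tags, " "))
		score := 0
		for _, term := range terms {
			if strings.Contains(haystack, term) {
				score++
			}
		}
		if score > 0 {
			ranked = append(ranked, scoredTool{t, score})
		}
	}
	// Stable sort keeps catalog order among equally scored tools
	sort.SliceStable(ranked, func(i, j int) bool { return ranked[i].score > ranked[j].score })
	out := make([]ToolDefinition, 0, topK)
	for i := 0; i < topK && i < len(ranked); i++ {
		out = append(out, ranked[i].tool)
	}
	return out
}

func main() {
	catalog := []ToolDefinition{
		{Name: "grep", Description: "Search for patterns in text", Tags: []string{"filtering", "search"}},
		{Name: "sort", Description: "Sort lines of text", Tags: []string{"sorting"}},
	}
	for _, t := range rankTools("search text patterns", catalog, 5) {
		fmt.Println(t.Name) // grep first: it matches more query terms
	}
}
```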
Vector search (production):
// 1. Tools are converted to vectors (embeddings)
toolEmbeddings := []ToolEmbedding{
{
Tool: toolCatalog[0], // grep
Embedding: embedText("Search for patterns in text. Use for filtering lines matching a pattern."), // [1536]float32{...}
},
{
Tool: toolCatalog[1], // sort
Embedding: embedText("Sort lines of text. Use for ordering output."), // [1536]float32{...}
},
// ... all tools
}
// 2. User query is also converted to vector
queryEmbedding := embedQuery("find errors in logs") // [1536]float32{...}
// 3. Search for similar vectors by cosine distance
similarTools := vectorDB.Search(queryEmbedding, 5) // topK = 5
// Returns 5 most similar tools by meaning (not by words!)
// 4. Result is used the same way as in simple search
relevantTools := extractTools(similarTools) // [grep, tail, jq, ...]
Why vector search is better for tools:
- Searches by meaning, not by words
- Will find grep even if the query is "filter lines by pattern" (without the word "grep")
- Works with synonyms and different phrasings
- Especially important for large catalogs (1000+ tools)
Example usage:
userQuery := "find errors in logs"
relevantTools := searchToolCatalog("error log filter", 5)
// Returns: [grep, tail, jq, ...] - only relevant ones!
Step 3: Add Only Relevant Tools to Context
// Instead of passing all 1000+ tools
relevantTools := searchToolCatalog(userQuery, 5)
// Convert to OpenAI format
tools := make([]openai.Tool, 0, len(relevantTools))
for _, toolDef := range relevantTools {
tools = append(tools, openai.Tool{
Type: openai.ToolTypeFunction,
Function: &openai.FunctionDefinition{
Name: toolDef.Name,
Description: toolDef.Description,
Parameters: toolDef.Schema,
},
})
}
// Now tools contains only 5 relevant tools instead of 1000+
Pipelines: JSON DSL + Runtime¶
For complex tasks (e.g., "find top 10 errors in logs"), the agent must build pipelines from multiple commands.
Approach 1: JSON DSL Pipeline
Agent generates a formalized pipeline plan:
type PipelineStep struct {
Tool string `json:"tool"`
Args map[string]interface{} `json:"args"`
Input string `json:"input,omitempty"` // stdin from previous step
Output string `json:"output,omitempty"` // expected format
}
type Pipeline struct {
Steps []PipelineStep `json:"steps"`
ExpectedOutput string `json:"expected_output"`
RiskLevel string `json:"risk_level"` // "safe", "moderate", "dangerous"
}
// Example: Agent generates this JSON
pipelineJSON := `{
"steps": [
{
"tool": "grep",
"args": {"pattern": "ERROR"},
"input": "logs.txt"
},
{
"tool": "sort",
"args": {},
"input": "{{step_0.output}}"
},
{
"tool": "head",
"args": {"lines": 10},
"input": "{{step_1.output}}"
}
],
"expected_output": "Top 10 error lines, sorted",
"risk_level": "safe"
}`
Approach 2: Runtime Executes Pipeline
func executePipeline(pipelineJSON string, inputData string) (string, error) {
var pipeline Pipeline
if err := json.Unmarshal([]byte(pipelineJSON), &pipeline); err != nil {
return "", err
}
// Validation: check risk
if pipeline.RiskLevel == "dangerous" {
return "", fmt.Errorf("dangerous pipeline requires confirmation")
}
// Execute steps sequentially
currentInput := inputData
for i, step := range pipeline.Steps {
// Substitute result from previous step
if strings.Contains(step.Input, "{{step_") {
step.Input = currentInput
}
// Execute step (in reality - call corresponding tool)
result, err := executeToolStep(step.Tool, step.Args, step.Input)
if err != nil {
return "", fmt.Errorf("step %d failed: %v", i, err)
}
currentInput = result
}
return currentInput, nil
}
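The {{step_N.output}} handling in executePipeline above is simplified: it substitutes the whole input whenever any placeholder appears. A more explicit resolver replaces each reference with the stored output of the named step (a sketch; resolveStepRefs is a hypothetical helper, not part of a library):

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

var stepRef = regexp.MustCompile(`\{\{step_(\d+)\.output\}\}`)

// resolveStepRefs replaces each {{step_N.output}} placeholder with the
// stored output of step N. References to steps that haven't run yet
// (index out of range) are left untouched.
func resolveStepRefs(input string, outputs []string) string {
	return stepRef.ReplaceAllStringFunc(input, func(m string) string {
		idx, _ := strconv.Atoi(stepRef.FindStringSubmatch(m)[1])
		if idx >= 0 && idx < len(outputs) {
			return outputs[idx]
		}
		return m
	})
}

func main() {
	outputs := []string{"ERROR line 1\nERROR line 2"} // output of step_0
	fmt.Println(resolveStepRefs("{{step_0.output}}", outputs))
}
```

With this helper, the loop in executePipeline would call resolveStepRefs(step.Input, outputsSoFar) instead of blindly overwriting step.Input.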
Approach 3: execute_pipeline Tool
Agent calls a single tool with pipeline JSON:
tools := []openai.Tool{
{
Function: &openai.FunctionDefinition{
Name: "execute_pipeline",
Description: "Execute a pipeline of tools. Provide pipeline JSON with steps, expected output, and risk level.",
Parameters: json.RawMessage(`{
"type": "object",
"properties": {
"pipeline": {"type": "string", "description": "JSON pipeline definition"},
"input_data": {"type": "string", "description": "Input data (e.g., log file content)"}
},
"required": ["pipeline", "input_data"]
}`),
},
},
}
// Agent generates tool call:
// execute_pipeline({
// "pipeline": "{\"steps\":[...], \"risk_level\": \"safe\"}",
// "input_data": "log content here"
// })
Practical Patterns¶
Tool Discovery via Tool Servers:
In production, tools are often provided via Tool Servers. The catalog can be retrieved dynamically:
// Tool Server provides ListTools()
toolServer := connectToToolServer("http://localhost:8080")
allTools, _ := toolServer.ListTools()
// Filter by task
relevantTools := filterToolsByQuery(allTools, userQuery, 5) // topK = 5
Validation and Security:
func validatePipeline(pipeline Pipeline) error {
// Check risk
if pipeline.RiskLevel == "dangerous" {
return fmt.Errorf("dangerous pipeline requires human approval")
}
// Check tool allowlist
allowedTools := map[string]bool{
"grep": true, "sort": true, "head": true,
// rm, dd and other dangerous ones - NOT in allowlist
}
for _, step := range pipeline.Steps {
if !allowedTools[step.Tool] {
return fmt.Errorf("tool %s not allowed", step.Tool)
}
}
return nil
}
Observability:
// Log selected tools and reasons
log.Printf("Tool retrieval: query=%s, selected=%v, reason=%s",
userQuery,
[]string{"grep", "sort", "head"},
"matched tags: text-processing, filtering")
// Store pipeline JSON for audit
auditLog.StorePipeline(userID, pipelineJSON, result)
Common Errors¶
Error 1: Agent Doesn't Search Knowledge Base¶
Symptom: Agent performs actions without searching the knowledge base, using only general knowledge.
Cause: The System Prompt doesn't instruct the agent to search the knowledge base, or the search tool description isn't clear enough.
Solution:
// GOOD: System Prompt requires search
systemPrompt := `... Always search knowledge base before performing actions that require specific procedures.`
// GOOD: Clear tool description
Description: "Search the knowledge base for documentation, protocols, and procedures. Use this BEFORE performing actions that require specific procedures."
Error 2: Poor Search Query¶
Symptom: Agent doesn't find needed information in the knowledge base.
Cause: Search query is too general or doesn't contain keywords from the document.
Solution:
// BAD: Too general query
query := "server"
// GOOD: Specific query with keywords
query := "Phoenix server restart protocol"
Error 3: Chunks Too Large¶
Symptom: Search returns documents that are too large and don't fit in context.
Cause: Chunk size is too large (larger than context window).
Solution: Split documents into smaller chunks (e.g., a few hundred tokens each, see the Chunking section above) so that several relevant chunks fit in context alongside the conversation.
Error 4: Passing All Tools to Context¶
Symptom: Agent receives a list of 1000+ tools, model performs worse, latency increases.
Cause: All tools are passed in tools[] without filtering.
Solution:
// BAD: All tools
tools := getAllTools() // 1000+ tools
// GOOD: Only relevant ones
userQuery := "find errors in logs"
relevantTools := searchToolCatalog(userQuery, 5) // Only 5 relevant tools
tools := convertToOpenAITools(relevantTools)
Error 5: Universal run_shell Without Control¶
Symptom: Agent uses a single run_shell(command) tool for all commands. No validation, no audit trail.
Cause: Simplifying architecture at the expense of security.
Solution:
// BAD: Universal shell
tools := []openai.Tool{{
Function: &openai.FunctionDefinition{
Name: "run_shell",
Description: "Execute any shell command",
},
}}
// GOOD: Specific tools + pipeline DSL
tools := []openai.Tool{
{Function: &openai.FunctionDefinition{Name: "grep", ...}},
{Function: &openai.FunctionDefinition{Name: "sort", ...}},
{Function: &openai.FunctionDefinition{Name: "execute_pipeline", ...}},
}
// Pipeline JSON is validated before execution
if err := validatePipeline(pipeline); err != nil {
return err
}
Error 6: No Pipeline Validation¶
Symptom: Agent generates a pipeline with dangerous commands (rm -rf, dd), which execute without checks.
Cause: No risk level check and allowlist validation before execution.
Solution:
// GOOD: Validate before execution
func executePipeline(pipelineJSON string) error {
var pipeline Pipeline
json.Unmarshal([]byte(pipelineJSON), &pipeline)
// Check risk
if pipeline.RiskLevel == "dangerous" {
return fmt.Errorf("dangerous pipeline requires confirmation")
}
// Check allowlist
allowedTools := map[string]bool{"grep": true, "sort": true}
for _, step := range pipeline.Steps {
if !allowedTools[step.Tool] {
return fmt.Errorf("tool %s not allowed", step.Tool)
}
}
// Execute only after validation
return runValidatedPipeline(pipeline)
}
Mini-Exercises¶
Exercise 1: Implement Simple Search¶
Implement a simple keyword search function:
func searchKnowledgeBase(query string) string {
// Simple keyword search
// Return relevant documents
}
Expected result:
- Function finds documents containing keywords from query
- Function returns first N relevant documents
Exercise 2: Implement Chunking¶
Implement a function to split a document into chunks:
func chunkDocument(text string, chunkSize int) []string {
// Split document into chunks of chunkSize tokens
// Return list of chunks
}
Expected result:
- Function splits document into chunks of specified size
- Chunks don't overlap (or overlap minimally)
Exercise 3: Implement Tool Search¶
Implement a function to search for relevant tools in the catalog:
func searchToolCatalog(query string, catalog []ToolDefinition, topK int) []ToolDefinition {
// Search by description and tags
// Return top-k most relevant tools
}
Expected result:
- Function finds tools relevant to the query
- Returns no more than topK tools
- Considers tool descriptions and tags
Advanced: Implement vector search for tools (similar to vector search for documents above). This is especially useful for large catalogs (1000+ tools).
Exercise 4: Pipeline Validation¶
Implement a function to validate a pipeline before execution:
func validatePipeline(pipeline Pipeline, allowedTools map[string]bool) error {
// Check risk level
// Check tool allowlist
// Return error if pipeline is unsafe
}
Expected result:
- Function returns error for dangerous pipelines
- Function returns error if disallowed tools are used
- Function returns nil for safe pipelines
Completion Criteria / Checklist¶
Completed:
- Agent searches knowledge base before performing actions
- Search queries are specific and contain keywords
- Documents are split into appropriately sized chunks
- Search tool has clear description
- System Prompt instructs agent to use knowledge base
- For large tool spaces, tool retrieval is used (only relevant tools in context)
- Pipelines are validated before execution (risk level, allowlist)
- Dangerous operations require confirmation
Not completed:
- Agent doesn't search knowledge base (uses only general knowledge)
- Search queries are too general (doesn't find needed information)
- Chunks are too large (don't fit in context)
- All tools are passed to context without filtering (1000+ tools)
- Universal run_shell is used without security controls
- Pipelines execute without validation
Production Notes¶
When using RAG in production, consider:
- Document versioning: Track document versions and update dates (updated_at). This helps understand which version was used in a response.
- Freshness (currency): Filter outdated documents (e.g., older than 30 days) before adding them to context.
- Grounding: Require agent to reference found documents in response. This reduces hallucinations and increases trust.
More on production readiness: Chapter 19: Observability, Chapter 23: Evals in CI/CD.
Connection with Other Chapters¶
- Tools: How search tool integrates into agent, see Chapter 03: Tools. The problem of large tool lists is solved via tool retrieval (see "RAG for Action Space" section above).
- Autonomy: How RAG works in the agent loop, see Chapter 04: Autonomy
- Tool Servers: How to retrieve tool catalogs dynamically via tool servers, see Chapter 18: Tool Protocols
- MCP Resources: MCP provides a standard mechanism for data access (Resources) — Model Context Protocol
What's Next?¶
After studying RAG, proceed to:
- 07. Multi-Agent Systems — how to create a team of agents