
Implementing Read-Through Caching for Slow Oracle EBS APIs in LangGraph


Enterprise ERPs like Oracle E-Business Suite (EBS) are notorious for slow SQL performance, often taking seconds to return inventory or pricing data. This latency cripples modern AI agents, which require sub-second responses to maintain conversational flow.

This guide implements a Read-Through Caching architecture. We place a Redis layer in front of Oracle EBS using an MCP Server. The LangGraph agent queries the MCP server, which serves data instantly from Redis or fetches it from Oracle only when necessary (cache miss).

  1. LangGraph Agent: Connects to the MCP server.
  2. MCP Server (FastMCP): Handles the request.
  3. Redis: Checked first (latency < 5ms).
  4. Oracle EBS: Queried only on cache miss (latency > 1000ms).
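The numbered flow above can be sketched with an in-memory stand-in: a dict plays the role of Redis, and a stub loader plays the role of the slow Oracle query. This is purely illustrative; the real implementation with Redis and Oracle follows below.

```python
import time
from typing import Any, Callable, Dict, Tuple


class ReadThroughCache:
    """Minimal in-memory sketch of the read-through pattern.

    In this guide the cache is Redis and the loader is an Oracle EBS
    query; here a dict stands in for both so the control flow is clear.
    """

    def __init__(self, ttl_seconds: float = 300.0):
        self.ttl = ttl_seconds
        self._store: Dict[str, Tuple[float, Any]] = {}  # key -> (expiry, value)

    def get(self, key: str, loader: Callable[[], Any]) -> Any:
        entry = self._store.get(key)
        if entry and entry[0] > time.monotonic():
            return entry[1]  # cache HIT: skip the slow backend
        value = loader()  # cache MISS: hit the backend
        self._store[key] = (time.monotonic() + self.ttl, value)
        return value


calls = []
cache = ReadThroughCache(ttl_seconds=300)
lookup = lambda: calls.append(1) or {"qty_on_hand": 150}

first = cache.get("ebs:inv:US-WEST:HP-Laptop-15", lookup)   # miss: loader runs
second = cache.get("ebs:inv:US-WEST:HP-Laptop-15", lookup)  # hit: loader skipped
assert len(calls) == 1 and first == second
```

Note that the loader only runs once: the second lookup within the TTL never touches the backend, which is the entire performance win.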

This server implements the caching logic. It exposes tools that the agent can call.

import os
import json
import logging
from typing import Any, Dict

from fastmcp import FastMCP
import redis
import oracledb

# Initialize FastMCP
mcp = FastMCP("OracleEBS-Redis-Gateway")

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Redis connection
# Use env vars for production; defaults suit Docker
REDIS_HOST = os.getenv("REDIS_HOST", "redis")
cache = redis.Redis(host=REDIS_HOST, port=6379, db=0, decode_responses=True)

# Oracle connection
# Ensure your container has network access (e.g. via NordLayer)
ORACLE_USER = os.getenv("ORACLE_USER", "APPS")
ORACLE_PASSWORD = os.getenv("ORACLE_PASSWORD", "welcome1")
ORACLE_DSN = os.getenv("ORACLE_DSN", "ebs.example.com:1521/VIS")


def get_oracle_conn():
    """Connect to Oracle EBS."""
    return oracledb.connect(
        user=ORACLE_USER,
        password=ORACLE_PASSWORD,
        dsn=ORACLE_DSN,
    )


@mcp.tool()
def get_inventory(sku: str, org_id: str) -> Dict[str, Any]:
    """Get inventory level. Checks Redis first, then Oracle EBS."""
    cache_key = f"ebs:inv:{org_id}:{sku}"

    # 1. READ-THROUGH: check the cache
    cached = cache.get(cache_key)
    if cached:
        logger.info(f"⚡ Cache HIT: {cache_key}")
        return json.loads(cached)

    # 2. CACHE MISS: fetch from Oracle
    logger.info(f"🐢 Cache MISS: {cache_key}. Querying Oracle...")
    data = {}
    try:
        # Simulated Oracle query for demo safety
        # In prod: with get_oracle_conn() as conn: ...
        data = {
            "sku": sku,
            "org_id": org_id,
            "qty_on_hand": 150,
            "status": "Available",
            "source": "Oracle EBS",
        }
    except Exception as e:
        return {"error": str(e)}

    # 3. POPULATE CACHE (TTL 300s)
    cache.setex(cache_key, 300, json.dumps(data))
    return data


@mcp.tool()
def invalidate_cache(sku: str, org_id: str) -> str:
    """Force-clear the cache for an item (e.g. after a new order)."""
    key = f"ebs:inv:{org_id}:{sku}"
    cache.delete(key)
    return f"Cleared: {key}"


if __name__ == "__main__":
    # Must bind to 0.0.0.0 for Docker compatibility
    mcp.run(transport="sse", host="0.0.0.0", port=8000)
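For reference, the simulated block above could be replaced in production by a query against the EBS on-hand tables. The sketch below is a hedged example: the table and column names (`mtl_onhand_quantities_detail`, `mtl_system_items_b`, `segment1`) and the string `org_id` bind are illustrative assumptions — verify them against your EBS schema. `conn_factory` is any callable returning a DB-API connection, such as the `get_oracle_conn` defined above.

```python
# Illustrative EBS on-hand query; table/column names are assumptions.
FETCH_SQL = """
    SELECT SUM(moq.transaction_quantity)
      FROM mtl_onhand_quantities_detail moq
      JOIN mtl_system_items_b msi
        ON msi.inventory_item_id = moq.inventory_item_id
       AND msi.organization_id = moq.organization_id
     WHERE msi.segment1 = :sku
       AND moq.organization_id = :org_id
"""


def fetch_inventory(conn_factory, sku: str, org_id: str) -> dict:
    """Run the slow Oracle query on a cache miss.

    conn_factory is any zero-argument callable returning a DB-API
    connection (e.g. the get_oracle_conn() shown above).
    """
    with conn_factory() as conn:
        with conn.cursor() as cur:
            cur.execute(FETCH_SQL, sku=sku, org_id=org_id)
            row = cur.fetchone()
    qty = row[0] if row and row[0] is not None else 0
    return {"sku": sku, "org_id": org_id, "qty_on_hand": qty, "source": "Oracle EBS"}
```

Because the function only depends on the DB-API interface, you can unit-test the cache-miss branch with a stub connection and never touch a live EBS instance.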

We need oracledb and the server dependencies. Note that python-oracledb's default "thin" mode is pure Python and needs no Oracle Client libraries; libaio1 is only required on Linux if you enable the driver's "thick" mode, so we install it to cover both cases.

FROM python:3.11-slim
# Install system libs for Oracle driver
RUN apt-get update && apt-get install -y libaio1 && rm -rf /var/lib/apt/lists/*
WORKDIR /app
# Install Python deps
RUN pip install fastmcp redis oracledb
COPY server.py .
# Expose SSE port
EXPOSE 8000
CMD ["python", "server.py"]

This script defines the LangGraph workflow. It connects to the MCP server to give the agent access to the cached Oracle data.

We define the MCP server endpoint in a configuration list (mimicking the mcps pattern) to ensure modularity.

import asyncio
import json
from typing import TypedDict

from langgraph.graph import StateGraph, END
from mcp import ClientSession
from mcp.client.sse import sse_client

# Configuration for MCP connectivity
# We list our MCP servers here.
mcps = ["http://localhost:8000/sse"]


class GraphState(TypedDict):
    sku: str
    org: str
    result: dict


async def oracle_lookup_node(state: GraphState):
    """Connects to the Oracle MCP server to fetch data."""
    sku = state["sku"]
    org = state["org"]

    # Select the first configured MCP server
    mcp_url = mcps[0]
    print(f"--- Connecting to AgentRetrofit MCP: {mcp_url} ---")

    # Connect via SSE
    async with sse_client(mcp_url) as (read_stream, write_stream):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            # Call the tool defined in server.py
            result = await session.call_tool(
                "get_inventory",
                arguments={"sku": sku, "org_id": org},
            )
            # The tool result arrives as JSON text; decode it into a dict
            content = json.loads(result.content[0].text)
            return {"result": content}


# Build the graph
workflow = StateGraph(GraphState)
workflow.add_node("check_stock", oracle_lookup_node)
workflow.set_entry_point("check_stock")
workflow.add_edge("check_stock", END)
app = workflow.compile()

# Execute
if __name__ == "__main__":
    input_data = {"sku": "HP-Laptop-15", "org": "US-WEST", "result": {}}
    # Async nodes require the async entry point (ainvoke, not invoke)
    output = asyncio.run(app.ainvoke(input_data))
    print("\n--- Final Agent Output ---")
    print(output["result"])
  1. Start Redis: docker run -d -p 6379:6379 --name redis redis
  2. Build Server: docker build -t oracle-mcp .
  3. Run Server: docker run -p 8000:8000 --link redis:redis -e REDIS_HOST=redis oracle-mcp
  4. Run Agent: python agent.py

The first run will log “🐢 Cache MISS”. A second run within 5 minutes (the 300-second TTL) will log “⚡ Cache HIT”.
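To see this from the Redis side, the first run leaves an entry under the server's key scheme (`ebs:inv:<org_id>:<sku>`), which you can inspect with `redis-cli GET`/`TTL` inside the container. The snippet below reconstructs the key and the demo payload that server.py writes on a miss:

```python
import json

# Key scheme and payload written by get_inventory() on a cache miss
sku, org_id = "HP-Laptop-15", "US-WEST"
cache_key = f"ebs:inv:{org_id}:{sku}"
payload = {"sku": sku, "org_id": org_id, "qty_on_hand": 150,
           "status": "Available", "source": "Oracle EBS"}
stored = json.dumps(payload)  # what cache.setex(cache_key, 300, ...) writes

assert cache_key == "ebs:inv:US-WEST:HP-Laptop-15"
assert json.loads(stored) == payload  # the hit path round-trips losslessly
```

Because the value is plain JSON, the hit path (`json.loads`) returns exactly the dict the miss path produced, so callers cannot tell cached and fresh responses apart except by latency.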


  • Status: ✅ Verified
  • Environment: Python 3.11
  • Auditor: AgentRetrofit CI/CD

Transparency: This page may contain affiliate links.