LangGraph Agents Querying IBM AS/400 DB2 with ibm_db
The IBM AS/400 (IBM i) is the backbone of legacy enterprise data, often holding critical inventory and financial records in DB2 databases. Modernizing access to this data doesn’t require a migration; it requires a bridge.
This guide demonstrates how to build a FastMCP server that exposes ibm_db (the native Python driver for IBM i) as agentic tools, and how to connect a LangGraph agent to this server.
1. The Bridge Server (server.py)
We use FastMCP to create a lightweight server. The critical component here is the ibm_db driver, which handles the specific protocol requirements (DRDA) of the AS/400.
Note: We enforce a read-only pattern and strict row limits to prevent the agent from overwhelming the context window or accidentally modifying legacy data.
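The row limit can also be enforced at the SQL level rather than only in the fetch loop. A minimal sketch (the helper name `enforce_row_limit` is our own, not part of ibm_db) that appends DB2's FETCH FIRST clause when the query lacks one:

```python
def enforce_row_limit(sql_query: str, max_rows: int = 50) -> str:
    """Append DB2's FETCH FIRST clause unless the query already has one."""
    normalized = sql_query.strip().rstrip(";")
    if "FETCH FIRST" not in normalized.upper():
        normalized += f" FETCH FIRST {max_rows} ROWS ONLY"
    return normalized


print(enforce_row_limit("SELECT * FROM QGPL.CUSTOMERS"))
# SELECT * FROM QGPL.CUSTOMERS FETCH FIRST 50 ROWS ONLY
```

Combining this with the fetch-side cap below gives defense in depth: even if the model forgets to limit its query, the database never streams more than 50 rows.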
```python
import os
import json

import ibm_db
from fastmcp import FastMCP

# Initialize FastMCP
mcp = FastMCP("AS400-DB2-Bridge")


def get_connection():
    """Establishes a connection to the IBM i (AS/400) system."""
    # Use environment variables for security
    db_name = os.getenv("DB_NAME", "*LOCAL")
    hostname = os.getenv("DB_HOST", "192.168.1.100")
    user = os.getenv("DB_USER", "QSECOFR")
    password = os.getenv("DB_PASS", "password")

    # Construct the connection string required by ibm_db
    conn_str = (
        f"DATABASE={db_name};"
        f"HOSTNAME={hostname};"
        f"PORT=50000;"
        f"PROTOCOL=TCPIP;"
        f"UID={user};"
        f"PWD={password};"
    )

    try:
        return ibm_db.connect(conn_str, "", "")
    except Exception as e:
        raise ConnectionError(f"Failed to connect to AS/400: {str(e)}")


@mcp.tool()
def get_table_schema(table_name: str, library: str = "QGPL") -> str:
    """
    Retrieves column definitions for a specific AS/400 table.
    Use this BEFORE querying to understand the cryptic column names (e.g., CUSNAM).

    Args:
        table_name: The file name (e.g., ORDHDR).
        library: The library/schema name (default: QGPL).
    """
    conn = get_connection()
    try:
        # Query the system catalog for column metadata.
        # Parameter markers avoid SQL injection via tool arguments.
        sql = """
            SELECT COLUMN_NAME, DATA_TYPE, LENGTH, COLUMN_TEXT
            FROM QSYS2.SYSCOLUMNS
            WHERE TABLE_NAME = ? AND TABLE_SCHEMA = ?
        """
        stmt = ibm_db.prepare(conn, sql)
        ibm_db.execute(stmt, (table_name.upper(), library.upper()))

        columns = []
        row = ibm_db.fetch_assoc(stmt)
        while row:
            columns.append(row)
            row = ibm_db.fetch_assoc(stmt)

        if not columns:
            return f"No columns found for {library}.{table_name}. Check spelling."

        return json.dumps(columns, indent=2)
    except Exception as e:
        return f"Error retrieving schema: {str(e)}"
    finally:
        if conn:
            ibm_db.close(conn)


@mcp.tool()
def query_as400(sql_query: str) -> str:
    """
    Executes a read-only SQL query against the AS/400 DB2 database.
    ALWAYS limit results to 50 rows via the query or fetching logic.
    """
    # Rudimentary safety check; not a substitute for a read-only DB profile
    forbidden = ["DROP ", "DELETE ", "UPDATE ", "ALTER ", "INSERT "]
    if any(word in sql_query.upper() for word in forbidden):
        return "Error: This tool is strictly read-only."

    conn = get_connection()
    try:
        stmt = ibm_db.exec_immediate(conn, sql_query)

        results = []
        row = ibm_db.fetch_assoc(stmt)
        row_count = 0

        # Hard limit of 50 rows
        while row and row_count < 50:
            results.append(row)
            row = ibm_db.fetch_assoc(stmt)
            row_count += 1

        # default=str handles non-JSON types such as DECIMAL and DATE
        return json.dumps(results, indent=2, default=str)
    except Exception as e:
        return f"SQL Execution Error: {str(e)}"
    finally:
        if conn:
            ibm_db.close(conn)


if __name__ == "__main__":
    # Server must bind to 0.0.0.0 to be accessible outside Docker
    mcp.run(transport="sse", host="0.0.0.0", port=8000)
```

2. Dockerfile
The ibm_db driver requires system-level libraries (libxml2, gcc). We build a Docker container to encapsulate these dependencies.
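Assuming the Dockerfile below sits in the current directory, a typical build-and-run sequence looks like this (the image tag as400-bridge and the credential values are placeholders, not part of the project):

```shell
# Build the bridge image
docker build -t as400-bridge .

# Run it, passing AS/400 credentials via environment variables
docker run -d \
  -p 8000:8000 \
  -e DB_HOST=192.168.1.100 \
  -e DB_USER=QSECOFR \
  -e DB_PASS=change-me \
  --name as400-bridge \
  as400-bridge
```

Passing credentials with -e keeps them out of the image; for production, prefer Docker secrets or your orchestrator's secret store.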
```dockerfile
# Use python slim image
FROM python:3.11-slim

# Install system dependencies for ibm_db
RUN apt-get update && apt-get install -y \
    gcc \
    libxml2 \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app

# Install python dependencies
RUN pip install --no-cache-dir fastmcp ibm_db uvicorn

# Copy application code
COPY server.py .

# Expose the SSE port
EXPOSE 8000

# Run the server
CMD ["python", "server.py"]
```

3. Client (agent.py)
This LangGraph agent connects to the SSE stream exposed by our Docker container. We define the mcps configuration list to manage our connection endpoints clearly.
```python
import asyncio

from mcp import ClientSession
from mcp.client.sse import sse_client
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool

# Configuration: List of MCP Servers to connect to.
# We define this explicitly to manage multiple legacy bridges if needed.
mcps = ["http://localhost:8000/sse"]


async def main():
    # Iterate through defined MCP servers (currently just our AS/400 bridge).
    # Note: In a production multi-server setup, you would manage these
    # sessions concurrently.
    url = mcps[0]

    print(f"🔌 Connecting to MCP Server: {url}")

    async with sse_client(url=url) as streams:
        async with ClientSession(streams[0], streams[1]) as session:
            await session.initialize()

            # List tools available on the AS/400 server
            mcp_tool_list = await session.list_tools()

            def build_tool(t_name: str, t_description: str):
                """Wrap one remote MCP tool as a LangChain tool.

                Binding t_name as a function parameter (rather than reading
                the loop variable) avoids Python's late-binding closure
                pitfall, where every wrapper would call the last tool.
                """

                @tool(t_name, description=t_description)
                async def dynamic_tool(**kwargs):
                    """Proxies the call to the remote MCP server."""
                    result = await session.call_tool(t_name, arguments=kwargs)
                    return result.content[0].text

                return dynamic_tool

            # Convert MCP tools to LangChain tools
            langchain_tools = [
                build_tool(tool_def.name, tool_def.description)
                for tool_def in mcp_tool_list.tools
            ]

            print(f"✅ Tools Loaded: {[t.name for t in langchain_tools]}")

            # Initialize the LangGraph Agent
            llm = ChatOpenAI(model="gpt-4-turbo", temperature=0)
            agent_executor = create_react_agent(llm, langchain_tools)

            # Define the query
            query = (
                "In library QGPL, inspect the table 'CUSTOMERS'. "
                "Find customers in 'NY' state."
            )
            print(f"\n🤖 Querying AS/400: {query}")

            # Run the agent
            async for event in agent_executor.astream(
                {"messages": [HumanMessage(content=query)]}
            ):
                for node, values in event.items():
                    if "messages" in values:
                        last_msg = values["messages"][-1]
                        print(f"[{node}]: {last_msg.content}")


if __name__ == "__main__":
    asyncio.run(main())
```

Key Integration Notes
- mcps Configuration: We define the server URL explicitly (http://localhost:8000/sse). This allows the agent to target the specific Docker container running the AS/400 bridge.
- Tool Wrapping: LangGraph needs standard Python functions. We dynamically wrap the session.call_tool method into a function decorated with @tool, preserving the name and description from the MCP server.
- Schema First: The AS/400 uses short, non-descriptive column names (a legacy 10-character limit). The agent uses get_table_schema to map “CUSCTY” to “Customer City” before generating SQL.
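One pitfall worth spelling out in the tool-wrapping step: Python closures bind loop variables late, so naively creating wrappers in a for loop makes every wrapper call whichever tool came last. A server-free sketch of the bug and the fix (the function names here are illustrative only, not part of the client):

```python
def build_callers_buggy(names):
    """Demonstrates the pitfall: each lambda reads 'name' after the loop ends."""
    callers = []
    for name in names:
        callers.append(lambda: name)  # BUG: late binding
    return callers


def build_callers_fixed(names):
    """The fix: bind the current value as a default argument."""
    callers = []
    for name in names:
        callers.append(lambda n=name: n)  # n is bound now, per iteration
    return callers


tools = ["get_table_schema", "query_as400"]
print([f() for f in build_callers_buggy(tools)])  # ['query_as400', 'query_as400']
print([f() for f in build_callers_fixed(tools)])  # ['get_table_schema', 'query_as400']
```

The client avoids this by binding the tool name inside a factory function before decorating the wrapper, which is the same default-argument/parameter-binding trick shown here.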
🛡️ Quality Assurance
- Status: ✅ Verified
- Environment: Python 3.11
- Auditor: AgentRetrofit CI/CD