
LangGraph agents querying IBM AS/400 DB2 with `ibm_db`


The IBM AS/400 (IBM i) is the backbone of legacy enterprise data, often holding critical inventory and financial records in DB2 databases. Modernizing access to this data doesn’t require a migration; it requires a bridge.

This guide demonstrates how to build a FastMCP server that exposes ibm_db (the native Python driver for IBM i) as agentic tools, and how to connect a LangGraph agent to this server.


We use FastMCP to create a lightweight server. The critical component here is the ibm_db driver, which handles the specific protocol requirements (DRDA) of the AS/400.

Note: We enforce a read-only pattern and strict row limits to prevent the agent from overwhelming the context window or accidentally modifying legacy data.

import os
import json

import ibm_db
from fastmcp import FastMCP

# Initialize FastMCP
mcp = FastMCP("AS400-DB2-Bridge")

def get_connection():
    """Establishes a connection to the IBM i (AS/400) system."""
    # Use environment variables for security
    db_name = os.getenv("DB_NAME", "*LOCAL")
    hostname = os.getenv("DB_HOST", "192.168.1.100")
    user = os.getenv("DB_USER", "QSECOFR")
    password = os.getenv("DB_PASS", "password")
    # Construct the connection string required by ibm_db
    conn_str = (
        f"DATABASE={db_name};"
        f"HOSTNAME={hostname};"
        f"PORT=50000;"
        f"PROTOCOL=TCPIP;"
        f"UID={user};"
        f"PWD={password};"
    )
    try:
        conn = ibm_db.connect(conn_str, "", "")
        return conn
    except Exception as e:
        raise ConnectionError(f"Failed to connect to AS/400: {str(e)}")
@mcp.tool()
def get_table_schema(table_name: str, library: str = "QGPL") -> str:
    """
    Retrieves column definitions for a specific AS/400 table.
    Use this BEFORE querying to understand the cryptic column names (e.g., CUSNAM).

    Args:
        table_name: The file name (e.g., ORDHDR).
        library: The library/schema name (default: QGPL).
    """
    conn = get_connection()
    try:
        # Query the system catalog for column metadata.
        # Parameter markers keep the agent-supplied names out of the SQL text.
        sql = """
            SELECT COLUMN_NAME, DATA_TYPE, LENGTH, COLUMN_TEXT
            FROM QSYS2.SYSCOLUMNS
            WHERE TABLE_NAME = ? AND TABLE_SCHEMA = ?
        """
        stmt = ibm_db.prepare(conn, sql)
        ibm_db.execute(stmt, (table_name.upper(), library.upper()))
        columns = []
        row = ibm_db.fetch_assoc(stmt)
        while row:
            columns.append(row)
            row = ibm_db.fetch_assoc(stmt)
        if not columns:
            return f"No columns found for {library}.{table_name}. Check spelling."
        return json.dumps(columns, indent=2, default=str)
    except Exception as e:
        return f"Error retrieving schema: {str(e)}"
    finally:
        if conn:
            ibm_db.close(conn)
@mcp.tool()
def query_as400(sql_query: str) -> str:
    """
    Executes a read-only SQL query against the AS/400 DB2 database.
    ALWAYS limit results to 50 rows via the query or fetching logic.
    """
    # Rudimentary safety check; a production bridge should also use a
    # read-only database profile.
    forbidden = ["DROP ", "DELETE ", "UPDATE ", "ALTER ", "INSERT "]
    if any(word in sql_query.upper() for word in forbidden):
        return "Error: This tool is strictly read-only."
    conn = get_connection()
    try:
        stmt = ibm_db.exec_immediate(conn, sql_query)
        results = []
        row = ibm_db.fetch_assoc(stmt)
        row_count = 0
        # Hard limit of 50 rows
        while row and row_count < 50:
            results.append(row)
            row_count += 1
            row = ibm_db.fetch_assoc(stmt)
        return json.dumps(results, indent=2, default=str)
    except Exception as e:
        return f"SQL Execution Error: {str(e)}"
    finally:
        if conn:
            ibm_db.close(conn)
if __name__ == "__main__":
    # Server must bind to 0.0.0.0 to be accessible outside Docker
    mcp.run(transport="sse", host="0.0.0.0", port=8000)
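The 50-row cap above is enforced client-side while fetching. It can also be pushed into the SQL itself with Db2's FETCH FIRST clause, so the server never materializes more rows than needed. A minimal sketch of that idea (`enforce_row_limit` is a hypothetical helper, not part of the server above) that `query_as400` could apply before calling `exec_immediate`:

```python
import re

def enforce_row_limit(sql: str, max_rows: int = 50) -> str:
    """Append a Db2 FETCH FIRST clause unless the query already has one."""
    stripped = sql.strip().rstrip(";")
    # Respect an explicit limit the agent already wrote into the query
    if re.search(r"FETCH\s+FIRST\s+\d+\s+ROWS?\s+ONLY", stripped, re.IGNORECASE):
        return stripped
    return f"{stripped} FETCH FIRST {max_rows} ROWS ONLY"

print(enforce_row_limit("SELECT CUSNAM FROM QGPL.CUSTOMERS"))
# → SELECT CUSNAM FROM QGPL.CUSTOMERS FETCH FIRST 50 ROWS ONLY
```

Keeping the fetch-side loop as a second line of defense is still worthwhile, since the clause check is textual, not a full SQL parse.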

The ibm_db driver requires system-level build dependencies (gcc and libxml2) to compile its native extension. We build a Docker container to encapsulate these dependencies.

# Use python slim image
FROM python:3.11-slim

# Install system dependencies for ibm_db
RUN apt-get update && apt-get install -y \
        gcc \
        libxml2 \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app

# Install python dependencies
RUN pip install --no-cache-dir fastmcp ibm_db uvicorn

# Copy application code
COPY server.py .

# Expose the SSE port
EXPOSE 8000

# Run the server
CMD ["python", "server.py"]
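Building and running the image looks roughly like this; the image tag and the host/credential values are placeholders you would substitute for your environment:

```shell
# Build the bridge image (tag name is arbitrary)
docker build -t as400-bridge .

# Run it, publishing the SSE port and passing real connection
# details as environment variables instead of baked-in defaults
docker run -d -p 8000:8000 \
  -e DB_HOST=as400.example.internal \
  -e DB_USER=READONLY \
  -e DB_PASS=secret \
  -e DB_NAME='*LOCAL' \
  as400-bridge
```

Injecting credentials at run time keeps them out of the image layers.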

This LangGraph agent connects to the SSE stream exposed by our Docker container. We define the mcps configuration list to manage our connection endpoints clearly.

import asyncio

from mcp import ClientSession
from mcp.client.sse import sse_client
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from langchain_core.messages import HumanMessage
from langchain_core.tools import StructuredTool

# Configuration: List of MCP servers to connect to.
# We define this explicitly to manage multiple legacy bridges if needed.
mcps = ["http://localhost:8000/sse"]

async def main():
    # Iterate through defined MCP servers (currently just our AS/400 bridge).
    # Note: In a production multi-server setup, you would manage these sessions concurrently.
    url = mcps[0]
    print(f"🔌 Connecting to MCP Server: {url}")
    async with sse_client(url=url) as streams:
        async with ClientSession(streams[0], streams[1]) as session:
            await session.initialize()
            # List tools available on the AS/400 server
            mcp_tool_list = await session.list_tools()
            # Convert MCP tools to LangChain tools
            langchain_tools = []
            for tool_def in mcp_tool_list.tools:
                # The default argument pins the tool name inside the closure;
                # without it, every wrapper would call the last tool in the loop.
                def make_caller(t_name=tool_def.name):
                    async def call_remote(**kwargs):
                        result = await session.call_tool(t_name, arguments=kwargs)
                        return result.content[0].text
                    return call_remote
                langchain_tools.append(
                    StructuredTool.from_function(
                        coroutine=make_caller(),
                        name=tool_def.name,
                        description=tool_def.description or "",
                        # Recent langchain-core accepts a JSON Schema dict here,
                        # so the MCP input schema can be passed through directly.
                        args_schema=tool_def.inputSchema,
                    )
                )
            print(f"✅ Tools Loaded: {[t.name for t in langchain_tools]}")
            # Initialize the LangGraph Agent
            llm = ChatOpenAI(model="gpt-4-turbo", temperature=0)
            agent_executor = create_react_agent(llm, langchain_tools)
            # Define the query
            query = "In library QGPL, inspect the table 'CUSTOMERS'. Find customers in 'NY' state."
            print(f"\n🤖 Querying AS/400: {query}")
            # Run the agent
            async for event in agent_executor.astream(
                {"messages": [HumanMessage(content=query)]}
            ):
                for node, values in event.items():
                    if "messages" in values:
                        last_msg = values["messages"][-1]
                        print(f"[{node}]: {last_msg.content}")

if __name__ == "__main__":
    asyncio.run(main())
  1. mcps Configuration: We define the server URL explicitly (http://localhost:8000/sse). This allows the agent to target the specific Docker container running the AS/400 bridge.
  2. Tool Wrapping: LangGraph needs LangChain-compatible tools. We dynamically wrap the session.call_tool method into tool objects, preserving the name, description, and input schema advertised by the MCP server.
  3. Schema First: The AS/400 uses short, non-descriptive column names (legacy 10-char limit). The agent uses get_table_schema to map “CUSCTY” to “Customer City” before generating SQL.
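Point 3 can be taken a step further: the COLUMN_TEXT metadata returned by get_table_schema can drive SQL with human-readable aliases, which makes the agent's results easier for downstream steps to interpret. A sketch, assuming the JSON shape produced by the server above (`build_readable_select` is a hypothetical helper, not part of the agent code):

```python
import json

def build_readable_select(schema_json: str, table: str, library: str = "QGPL") -> str:
    """Turn QSYS2.SYSCOLUMNS metadata into a SELECT with readable aliases."""
    cols = json.loads(schema_json)
    parts = []
    for c in cols:
        # Fall back to the raw column name when COLUMN_TEXT is empty
        alias = (c.get("COLUMN_TEXT") or c["COLUMN_NAME"]).strip().replace(" ", "_").upper()
        parts.append(f'{c["COLUMN_NAME"]} AS "{alias}"')
    return f"SELECT {', '.join(parts)} FROM {library}.{table}"

schema = json.dumps([
    {"COLUMN_NAME": "CUSNAM", "COLUMN_TEXT": "Customer Name"},
    {"COLUMN_NAME": "CUSCTY", "COLUMN_TEXT": "Customer City"},
])
print(build_readable_select(schema, "CUSTOMERS"))
# → SELECT CUSNAM AS "CUSTOMER_NAME", CUSCTY AS "CUSTOMER_CITY" FROM QGPL.CUSTOMERS
```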

  • Status: ✅ Verified
  • Environment: Python 3.11
  • Auditor: AgentRetrofit CI/CD
