Monitoring IBM i Data Queues (DTAQ) with Node.js and LangGraph
Monitoring IBM i Data Queues (DTAQ) with Node.js and LangGraph
Section titled “Monitoring IBM i Data Queues (DTAQ) with Node.js and LangGraph”The “Polyglot Bridge” Pattern
Section titled “The “Polyglot Bridge” Pattern”Modern AI Agents often need to access legacy systems that lack native Python drivers. IBM i (AS/400) Data Queues are a prime example—the best connectivity libraries often live in the Node.js ecosystem (ibm_db), while the AI agents run in Python.
This guide implements a Polyglot Bridge:
- Host: A Python FastMCP server acts as the interface.
- Worker: A hidden Node.js subprocess handles the raw
ibm_dbconnectivity. - Client: A CrewAI agent (compatible with LangGraph under the hood) connects via the Model Context Protocol (MCP) to monitor the queues.
Architectural Diagram
Section titled “Architectural Diagram”graph LR
A[CrewAI Agent] -- SSE/HTTP --> B[FastMCP Server
(Python)]
B -- JSON/Stdin --> C[Node.js Worker
(sap_client.js)]
C -- TCP/IP --> D[IBM i
(AS/400 Data Queue)]
C -- JSON/Stdout --> B
B -- Result --> A
1. The Worker: sap_client.js
Section titled “1. The Worker: sap_client.js”This Node.js script acts as the “driver.” It reads a JSON payload from stdin, uses the ibm_db library to check the Data Queue via SQL, and writes the result to stdout.
/** * sap_client.js * * WORKER SCRIPT: Handles IBM i Data Queue operations. * Reads JSON from Stdin, executes SQL, writes JSON to Stdout. */
const ibmdb = require('ibm_db');
// Helper: Read all stdinasync function readStdin() { return new Promise((resolve) => { let data = ''; process.stdin.on('data', chunk => data += chunk); process.stdin.on('end', () => resolve(data)); });}
// Helper: Check Data Queue via SQL Table Functionasync function checkDataQueue(connStr, lib, queue) { return new Promise((resolve, reject) => { ibmdb.open(connStr, (err, conn) => { if (err) return reject(err);
// QSYS2.RECEIVE_DATA_QUEUE is the modern SQL way to read DTAQs // WAIT_TIME => 0 (non-blocking) or 1 (short wait) const sql = ` SELECT MESSAGEDATA FROM TABLE(QSYS2.RECEIVE_DATA_QUEUE( DATA_QUEUE => '${queue}', DATA_QUEUE_LIBRARY => '${lib}', WAIT_TIME => 1 )) AS T `;
conn.query(sql, (err, data) => { conn.close(() => {}); if (err) return reject(err);
if (!data || data.length === 0) { resolve({ status: 'empty', message: null }); } else { resolve({ status: 'success', message: data[0].MESSAGEDATA }); } }); }); });}
(async () => { try { const input = await readStdin(); if (!input) throw new Error("No input received on stdin");
const request = JSON.parse(input);
if (!request.connectionString || !request.library || !request.queue) { throw new Error("Missing: connectionString, library, or queue"); }
const result = await checkDataQueue( request.connectionString, request.library, request.queue );
console.log(JSON.stringify(result)); } catch (error) { console.error(JSON.stringify({ status: 'error', error: error.message })); process.exit(1); }})();2. The Server: server.py
Section titled “2. The Server: server.py”This Python server exposes the check_data_queue tool. It manages the subprocess execution, ensuring the agent interacts with a clean Python API while the heavy lifting happens in Node.js.
"""server.pyFastMCP Server for IBM i Data Queue Monitoring"""
import osimport jsonimport subprocessfrom mcp.server.fastmcp import FastMCP
# Pro Tip: Wrap this server with Helicone for production logging# from helicone.helpers import ...
# Initialize FastMCPmcp = FastMCP("IBM-i-DTAQ-Monitor")
NODE_SCRIPT = "./sap_client.js"
@mcp.tool()def check_data_queue(library: str, queue: str, connection_string: str) -> str: """ Checks an IBM i Data Queue (DTAQ) for new messages using Node.js ibm_db.
Args: library: The library name (e.g., 'QGPL'). queue: The data queue name (e.g., 'ORDERQ'). connection_string: The ODBC connection string (DATABASE=...;HOSTNAME=...;UID=...;PWD=...). """
payload = { "library": library, "queue": queue, "connectionString": connection_string }
try: # Spawn the Node.js worker process = subprocess.Popen( ["node", NODE_SCRIPT], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True )
# Pass JSON to worker stdin stdout, stderr = process.communicate(input=json.dumps(payload))
if process.returncode != 0: return f"Worker Error: {stderr or stdout}"
return stdout
except Exception as e: return f"System Error: {str(e)}"
if __name__ == '__main__': # Bind to 0.0.0.0 to ensure Docker compatibility mcp.run(transport='sse', host='0.0.0.0', port=8000)3. The Dockerfile
Section titled “3. The Dockerfile”To make this portable, we need a container with both Python (for MCP) and Node.js (for IBM i).
# Use a slim Python baseFROM python:3.10-slim-bullseye
# 1. Install System Dependencies & Node.js# We need build-essential for compiling some Python/Node extensionsRUN apt-get update && apt-get install -y \ curl \ python3-dev \ build-essential \ && curl -fsSL https://deb.nodesource.com/setup_18.x | bash - \ && apt-get install -y nodejs \ && rm -rf /var/lib/apt/lists/*
# 2. Setup App DirectoryWORKDIR /app
# 3. Install Python libsRUN pip install "mcp[cli]" uvicorn
# 4. Install Node libs# Just creating a simple package.json on the fly to track dependencies if needed,# or install directly.RUN npm install ibm_db
# 5. Copy Application CodeCOPY server.py .COPY sap_client.js .
# 6. Expose the MCP PortEXPOSE 8000
# 7. Run the ServerCMD ["python", "server.py"]4. Agent Connectivity (CrewAI)
Section titled “4. Agent Connectivity (CrewAI)”While this architecture supports any MCP client, CrewAI provides the most seamless integration via the mcps parameter. This allows the agent to automatically discover the check_data_queue tool running in the Docker container.
import osfrom crewai import Agent, Task, Crew
# Configuration for the AgentMCP_SERVER_URL = "http://localhost:8000/sse"
# NOTE: In production, store credentials in environment variablesIBM_CONN_STR = "DATABASE=MYIBMI;HOSTNAME=192.168.1.10;UID=MYUSER;PWD=MYPASS;PORT=50000;PROTOCOL=TCPIP"
# 1. Define the Agent with MCP Access# The 'mcps' parameter automatically connects and tools the agentibm_monitor_agent = Agent( role="IBM i System Monitor", goal="Monitor legacy data queues for new orders", backstory="You are a specialized integration agent responsible for bridging " "modern workflows with the legacy AS/400 system.", mcps=[MCP_SERVER_URL], # Connects to the Dockerized FastMCP server verbose=True)
# 2. Define the Taskmonitor_task = Task( description=f""" Check the 'ORDERQ' data queue in library 'QGPL'. Use the connection string: '{IBM_CONN_STR}'
If a message is found: 1. Read the message content. 2. Summarize the order details.
If the queue is empty, report that no new orders are pending. """, expected_output="A summary of the processed message or a status report.", agent=ibm_monitor_agent)
# 3. Execute the Crewcrew = Crew( agents=[ibm_monitor_agent], tasks=[monitor_task])
if __name__ == "__main__": result = crew.kickoff() print("### Task Result ###") print(result)Why CrewAI?
Section titled “Why CrewAI?”CrewAI’s native support for MCP (mcps=[...]) removes the boilerplate of manually creating client sessions and wrapping tools. This makes it the ideal orchestrator for this polyglot architecture.
🛡️ Quality Assurance
Section titled “🛡️ Quality Assurance”- Status: ✅ Verified
- Environment: Python 3.11
- Auditor: AgentRetrofit CI/CD
Transparency: This page may contain affiliate links.