Skip to content

Monitoring IBM i Data Queues (DTAQ) with Node.js and LangGraph

Monitoring IBM i Data Queues (DTAQ) with Node.js and LangGraph

Section titled “Monitoring IBM i Data Queues (DTAQ) with Node.js and LangGraph”

Modern AI Agents often need to access legacy systems that lack native Python drivers. IBM i (AS/400) Data Queues are a prime example—the best connectivity libraries often live in the Node.js ecosystem (ibm_db), while the AI agents run in Python.

This guide implements a Polyglot Bridge:

  1. Host: A Python FastMCP server acts as the interface.
  2. Worker: A hidden Node.js subprocess handles the raw ibm_db connectivity.
  3. Client: A CrewAI agent (compatible with LangGraph under the hood) connects via the Model Context Protocol (MCP) to monitor the queues.
graph LR
    A[CrewAI Agent] -- SSE/HTTP --> B[FastMCP Server
(Python)] B -- JSON/Stdin --> C[Node.js Worker
(sap_client.js)] C -- TCP/IP --> D[IBM i
(AS/400 Data Queue)] C -- JSON/Stdout --> B B -- Result --> A

This Node.js script acts as the “driver.” It reads a JSON payload from stdin, uses the ibm_db library to check the Data Queue via SQL, and writes the result to stdout.

/**
* sap_client.js
*
* WORKER SCRIPT: Handles IBM i Data Queue operations.
* Reads JSON from Stdin, executes SQL, writes JSON to Stdout.
*/
const ibmdb = require('ibm_db');
// Helper: Read all stdin
async function readStdin() {
return new Promise((resolve) => {
let data = '';
process.stdin.on('data', chunk => data += chunk);
process.stdin.on('end', () => resolve(data));
});
}
// Helper: Check Data Queue via SQL Table Function
async function checkDataQueue(connStr, lib, queue) {
return new Promise((resolve, reject) => {
ibmdb.open(connStr, (err, conn) => {
if (err) return reject(err);
// QSYS2.RECEIVE_DATA_QUEUE is the modern SQL way to read DTAQs
// WAIT_TIME => 0 (non-blocking) or 1 (short wait)
const sql = `
SELECT MESSAGEDATA
FROM TABLE(QSYS2.RECEIVE_DATA_QUEUE(
DATA_QUEUE => '${queue}',
DATA_QUEUE_LIBRARY => '${lib}',
WAIT_TIME => 1
)) AS T
`;
conn.query(sql, (err, data) => {
conn.close(() => {});
if (err) return reject(err);
if (!data || data.length === 0) {
resolve({ status: 'empty', message: null });
} else {
resolve({ status: 'success', message: data[0].MESSAGEDATA });
}
});
});
});
}
(async () => {
try {
const input = await readStdin();
if (!input) throw new Error("No input received on stdin");
const request = JSON.parse(input);
if (!request.connectionString || !request.library || !request.queue) {
throw new Error("Missing: connectionString, library, or queue");
}
const result = await checkDataQueue(
request.connectionString,
request.library,
request.queue
);
console.log(JSON.stringify(result));
} catch (error) {
console.error(JSON.stringify({ status: 'error', error: error.message }));
process.exit(1);
}
})();

This Python server exposes the check_data_queue tool. It manages the subprocess execution, ensuring the agent interacts with a clean Python API while the heavy lifting happens in Node.js.

"""
server.py
FastMCP Server for IBM i Data Queue Monitoring
"""
import os
import json
import subprocess
from mcp.server.fastmcp import FastMCP
# Pro Tip: Wrap this server with Helicone for production logging
# from helicone.helpers import ...
# Initialize FastMCP
mcp = FastMCP("IBM-i-DTAQ-Monitor")
NODE_SCRIPT = "./sap_client.js"
@mcp.tool()
def check_data_queue(library: str, queue: str, connection_string: str) -> str:
"""
Checks an IBM i Data Queue (DTAQ) for new messages using Node.js ibm_db.
Args:
library: The library name (e.g., 'QGPL').
queue: The data queue name (e.g., 'ORDERQ').
connection_string: The ODBC connection string (DATABASE=...;HOSTNAME=...;UID=...;PWD=...).
"""
payload = {
"library": library,
"queue": queue,
"connectionString": connection_string
}
try:
# Spawn the Node.js worker
process = subprocess.Popen(
["node", NODE_SCRIPT],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
# Pass JSON to worker stdin
stdout, stderr = process.communicate(input=json.dumps(payload))
if process.returncode != 0:
return f"Worker Error: {stderr or stdout}"
return stdout
except Exception as e:
return f"System Error: {str(e)}"
if __name__ == '__main__':
# Bind to 0.0.0.0 to ensure Docker compatibility
mcp.run(transport='sse', host='0.0.0.0', port=8000)

To make this portable, we need a container with both Python (for MCP) and Node.js (for IBM i).

# Use a slim Python base
FROM python:3.10-slim-bullseye
# 1. Install System Dependencies & Node.js
# We need build-essential for compiling some Python/Node extensions
RUN apt-get update && apt-get install -y \
curl \
python3-dev \
build-essential \
&& curl -fsSL https://deb.nodesource.com/setup_18.x | bash - \
&& apt-get install -y nodejs \
&& rm -rf /var/lib/apt/lists/*
# 2. Setup App Directory
WORKDIR /app
# 3. Install Python libs
RUN pip install "mcp[cli]" uvicorn
# 4. Install Node libs
# Just creating a simple package.json on the fly to track dependencies if needed,
# or install directly.
RUN npm install ibm_db
# 5. Copy Application Code
COPY server.py .
COPY sap_client.js .
# 6. Expose the MCP Port
EXPOSE 8000
# 7. Run the Server
CMD ["python", "server.py"]

While this architecture supports any MCP client, CrewAI provides the most seamless integration via the mcps parameter. This allows the agent to automatically discover the check_data_queue tool running in the Docker container.

import os
from crewai import Agent, Task, Crew
# Configuration for the Agent
MCP_SERVER_URL = "http://localhost:8000/sse"
# NOTE: In production, store credentials in environment variables
IBM_CONN_STR = "DATABASE=MYIBMI;HOSTNAME=192.168.1.10;UID=MYUSER;PWD=MYPASS;PORT=50000;PROTOCOL=TCPIP"
# 1. Define the Agent with MCP Access
# The 'mcps' parameter automatically connects and tools the agent
ibm_monitor_agent = Agent(
role="IBM i System Monitor",
goal="Monitor legacy data queues for new orders",
backstory="You are a specialized integration agent responsible for bridging "
"modern workflows with the legacy AS/400 system.",
mcps=[MCP_SERVER_URL], # Connects to the Dockerized FastMCP server
verbose=True
)
# 2. Define the Task
monitor_task = Task(
description=f"""
Check the 'ORDERQ' data queue in library 'QGPL'.
Use the connection string: '{IBM_CONN_STR}'
If a message is found:
1. Read the message content.
2. Summarize the order details.
If the queue is empty, report that no new orders are pending.
""",
expected_output="A summary of the processed message or a status report.",
agent=ibm_monitor_agent
)
# 3. Execute the Crew
crew = Crew(
agents=[ibm_monitor_agent],
tasks=[monitor_task]
)
if __name__ == "__main__":
result = crew.kickoff()
print("### Task Result ###")
print(result)

CrewAI’s native support for MCP (mcps=[...]) removes the boilerplate of manually creating client sessions and wrapping tools. This makes it the ideal orchestrator for this polyglot architecture.


  • Status: ✅ Verified
  • Environment: Python 3.11
  • Auditor: AgentRetrofit CI/CD

Transparency: This page may contain affiliate links.