Event-Driven AutoGen Agents with IBM MQ and pymqi
In the world of legacy “Big Iron,” IBM MQ (formerly WebSphere MQ) is the central nervous system. It handles billions of dollars in transactions daily. Integrating autonomous AI agents into this ecosystem allows for powerful event-driven patterns: an agent can listen for a “New Order” message on a queue, process it, and post a result to a “Fulfillment” queue, all without human intervention.
This guide provides a robust Model Context Protocol (MCP) server implementation using pymqi to connect agents to IBM MQ.
Note: While the architectural goal is autonomous interaction (often associated with AutoGen), this guide utilizes CrewAI for the client implementation. CrewAI provides the most native, configuration-based support for MCP via the mcps parameter, making it the ideal “Retrofit” choice for rapid deployment.
🏗️ Architecture
We use a FastMCP server to expose IBM MQ operations as tools. The agent uses an MCP client bridge to consume these tools, allowing it to call inspect_depth, read_message, and put_message autonomously.
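The read, process, put flow described above can be sketched without a Queue Manager. This is only an illustration: Python's in-memory queue.Queue stands in for the MQ queues, and process_order and drain_once are hypothetical names that do not appear in the server.

```python
import queue


def process_order(order_text: str) -> str:
    """Hypothetical business step: turn an order into a fulfillment request."""
    return f"FULFILL:{order_text}"


def drain_once(inbound: queue.Queue, outbound: queue.Queue, timeout_s: float = 0.1) -> bool:
    """Consume one message if available, process it, and forward the result.

    Returns True if a message was handled, False if the inbound queue was empty.
    """
    try:
        order = inbound.get(timeout=timeout_s)
    except queue.Empty:
        return False
    outbound.put(process_order(order))
    return True


# In-memory stand-ins for the "New Order" and "Fulfillment" queues.
new_orders = queue.Queue()
fulfillment = queue.Queue()
new_orders.put("New Order #101")

handled = drain_once(new_orders, fulfillment)   # one message is processed
forwarded = fulfillment.get_nowait()            # the result that was posted
idle = drain_once(new_orders, fulfillment)      # nothing left to consume
```

In the real setup the agent performs the same steps by calling the MCP tools, with the timeout playing the role of the wait_timeout_ms argument.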
The Stack
- Server: Python 3.11 + FastMCP + pymqi (IBM MQ Interface).
- Transport: Server-Sent Events (SSE) over HTTP.
- Agent Framework: CrewAI (Native MCP Support).
- Infrastructure: Docker (with IBM MQ C Client libraries).
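To make the SSE transport in the stack less abstract, here is a minimal sketch of how a Server-Sent Events stream is framed: events are separated by blank lines, lines starting with a colon are comments, and multiple data lines are joined with newlines. The parse_sse helper and the sample payload are illustrative assumptions; a real MCP client library handles this framing for you.

```python
def parse_sse(stream_text: str) -> list[dict]:
    """Split raw SSE text into events with an 'event' type and joined 'data'."""
    events = []
    event_type, data_lines = "message", []  # "message" is the default event type
    for line in stream_text.splitlines():
        if line == "":
            # A blank line dispatches the buffered event, if it carries data.
            if data_lines:
                events.append({"event": event_type, "data": "\n".join(data_lines)})
            event_type, data_lines = "message", []
        elif line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        else:
            field, _, value = line.partition(":")
            if value.startswith(" "):
                value = value[1:]  # a single leading space is not part of the value
            if field == "event":
                event_type = value
            elif field == "data":
                data_lines.append(value)
    return events


# Illustrative stream only; the exact payloads depend on the server.
sample = (
    "event: endpoint\n"
    "data: /messages/?session_id=abc\n"
    "\n"
    ": ping\n"
    "\n"
    "data: line one\n"
    "data: line two\n"
    "\n"
)
events = parse_sse(sample)
```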
🚀 Server Implementation (server.py)
This server connects to a Queue Manager and exposes three tools: checking queue depth without consuming messages (inquire), consuming messages (get), and publishing messages (put).
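The listing that follows reads its connection settings from MQ_* environment variables and builds the host(port) connection name inline. One way to validate those settings up front is sketched here. MQSettings and load_settings are hypothetical names, not part of the original server, and the host name in the example is made up.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class MQSettings:
    queue_manager: str
    channel: str
    host: str
    port: int

    @property
    def conn_name(self) -> bytes:
        """Connection name in the host(port) form used by the MQ client."""
        return f"{self.host}({self.port})".encode("utf-8")


def load_settings(env=os.environ) -> MQSettings:
    """Read the MQ_* variables, failing early on an unusable port."""
    port_text = env.get("MQ_PORT", "1414")
    try:
        port = int(port_text)
    except ValueError:
        raise ValueError(f"MQ_PORT must be an integer, got {port_text!r}") from None
    if not 0 < port < 65536:
        raise ValueError(f"MQ_PORT is outside the TCP port range: {port}")
    return MQSettings(
        queue_manager=env.get("MQ_QMGR", "QM1"),
        channel=env.get("MQ_CHANNEL", "DEV.APP.SVRCONN"),
        host=env.get("MQ_HOST", "localhost"),
        port=port,
    )


# Passing a plain dict keeps the example independent of the real environment.
settings = load_settings({"MQ_HOST": "mq.example.internal", "MQ_PORT": "1415"})
```

Failing at startup on a malformed port is usually easier to diagnose than a connection error raised later inside a tool call.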
    import os

    import pymqi
    from fastmcp import FastMCP

    # Initialize FastMCP
    mcp = FastMCP("IBM MQ Gateway")

    # Configuration via Environment Variables
    QUEUE_MANAGER = os.getenv("MQ_QMGR", "QM1")
    CHANNEL = os.getenv("MQ_CHANNEL", "DEV.APP.SVRCONN")
    HOST = os.getenv("MQ_HOST", "localhost")
    PORT = os.getenv("MQ_PORT", "1414")
    USER = os.getenv("MQ_USER", "app")
    PASSWORD = os.getenv("MQ_PASSWORD", "password")


    def _get_connection():
        """Helper to establish MQ connection."""
        cd = pymqi.CD()
        cd.ChannelName = CHANNEL.encode('utf-8')
        cd.ConnectionName = f"{HOST}({PORT})".encode('utf-8')
        cd.ChannelType = pymqi.CMQC.MQCHT_CLNTCONN
        cd.TransportType = pymqi.CMQC.MQXPT_TCP

        qmgr = pymqi.QueueManager(None)
        qmgr.connect_with_options(QUEUE_MANAGER, cd=cd, user=USER, password=PASSWORD)
        return qmgr


    @mcp.tool()
    def put_message(queue_name: str, message: str) -> str:
        """
        Puts a text message onto a specific IBM MQ Queue.

        Args:
            queue_name: The name of the target queue (e.g., 'DEV.QUEUE.1').
            message: The text content to send.
        """
        qmgr = None
        try:
            qmgr = _get_connection()
            queue = pymqi.Queue(qmgr, queue_name)
            queue.put(message.encode('utf-8'))
            queue.close()
            return f"Success: Message put to {queue_name}"
        except pymqi.MQMIError as e:
            return f"Error: MQ Code {e.comp}, Reason {e.reason}"
        finally:
            if qmgr:
                qmgr.disconnect()


    @mcp.tool()
    def read_message(queue_name: str, wait_timeout_ms: int = 2000) -> str:
        """
        Reads (consumes) the next message from an IBM MQ Queue.
        Waits for a specified timeout if the queue is empty.

        Args:
            queue_name: The name of the source queue.
            wait_timeout_ms: Time to wait in milliseconds (default 2000).
        """
        qmgr = None
        try:
            qmgr = _get_connection()
            queue = pymqi.Queue(qmgr, queue_name)

            # Set up Get Message Options (GMO)
            gmo = pymqi.GMO()
            gmo.Options = pymqi.CMQC.MQGMO_WAIT | pymqi.CMQC.MQGMO_FAIL_IF_QUIESCING
            gmo.WaitInterval = wait_timeout_ms

            msg = queue.get(None, None, gmo)
            queue.close()
            return msg.decode('utf-8')
        except pymqi.MQMIError as e:
            if e.reason == pymqi.CMQC.MQRC_NO_MSG_AVAILABLE:
                return "Info: Queue is empty (No message available)."
            return f"Error: MQ Code {e.comp}, Reason {e.reason}"
        finally:
            if qmgr:
                qmgr.disconnect()


    @mcp.tool()
    def inspect_depth(queue_name: str) -> str:
        """
        Checks the current depth (number of messages) of a queue without consuming.
        """
        qmgr = None
        try:
            qmgr = _get_connection()
            # Inquire current depth
            queue = pymqi.Queue(qmgr, queue_name, pymqi.CMQC.MQOO_INQUIRE)
            depth = queue.inquire(pymqi.CMQC.MQIA_CURRENT_Q_DEPTH)
            queue.close()
            return f"Queue {queue_name} has {depth} messages."
        except pymqi.MQMIError as e:
            return f"Error: MQ Code {e.comp}, Reason {e.reason}"
        finally:
            if qmgr:
                qmgr.disconnect()


    if __name__ == "__main__":
        mcp.run(transport='sse', host='0.0.0.0', port=8000)

🐳 Dockerfile (Dockerfile)
IBM MQ requires the C Client libraries to be present in the OS. This Dockerfile handles the complex setup of installing the IBM MQ Redistributable Client before installing pymqi.
    # Python 3.11 matches the stack listed above; FastMCP needs Python 3.10 or newer
    FROM python:3.11-slim

    # Install system dependencies
    # gcc and libc-dev are required to compile pymqi extensions
    RUN apt-get update && apt-get install -y \
        curl \
        tar \
        gcc \
        libc-dev \
        && rm -rf /var/lib/apt/lists/*

    # --- IBM MQ Client Setup ---
    # 1. Download the IBM MQ Redistributable Client (Linux x64)
    WORKDIR /opt/mqm
    RUN curl -LO https://public.dhe.ibm.com/ibmdl/export/pub/software/websphere/messaging/mqdev/redist/9.3.4.0-IBM-MQC-Redist-LinuxX64.tar.gz \
        && tar -zxf 9.3.4.0-IBM-MQC-Redist-LinuxX64.tar.gz \
        && rm 9.3.4.0-IBM-MQC-Redist-LinuxX64.tar.gz

    # 2. Configure environment variables for the Linker
    ENV LD_LIBRARY_PATH=/opt/mqm/lib64
    ENV C_INCLUDE_PATH=/opt/mqm/inc

    # --- Python Setup ---
    WORKDIR /app
    COPY requirements.txt .

    # Install Python deps
    RUN pip install --no-cache-dir -r requirements.txt

    COPY server.py .

    # Expose the SSE port
    EXPOSE 8000

    # Run the FastMCP server
    CMD ["python", "server.py"]

requirements.txt:

    fastmcp
    pymqi
    uvicorn

🤖 Client Connectivity (agent.py)
We use CrewAI here to demonstrate the cleanest “configuration-first” approach to MCP. By passing the mcps parameter to the Agent, we instantly unlock the tools defined in the server without manual wrapper code.
    import os
    from crewai import Agent, Task, Crew, Process

    # 1. Define the Agent with MCP Configuration
    # The 'mcps' parameter automatically connects to the server, retrieves tools,
    # and registers them for the LLM to use.
    mq_operator = Agent(
        role='IBM MQ Specialist',
        goal='Manage the message queue for enterprise orders',
        backstory='You are a veteran mainframe operator. You ensure messages flow smoothly.',
        verbose=True,
        allow_delegation=False,
        # MANDATORY: Connect to the FastMCP server running in Docker
        mcps=["http://localhost:8000/sse"],
        llm="gpt-4"
    )

    # 2. Define the Tasks
    # The agent will use the 'inspect_depth', 'put_message', and 'read_message' tools
    # purely based on the semantic descriptions in the task.
    check_and_process_task = Task(
        description=(
            "1. Check the depth of queue 'DEV.QUEUE.1'. "
            "2. If the depth is less than 5, put a new message with content 'New Order #101'. "
            "3. Read the message back to confirm it exists."
        ),
        expected_output="A summary of the queue depth and the content of the message processed.",
        agent=mq_operator
    )

    # 3. Form the Crew
    crew = Crew(
        agents=[mq_operator],
        tasks=[check_and_process_task],
        process=Process.sequential
    )

    # 4. Execute
    if __name__ == "__main__":
        print("--- Starting IBM MQ Agent Crew ---")
        result = crew.kickoff()
        print("\n\n########################")
        print("## Mission Result ##")
        print("########################\n")
        print(result)

📋 Troubleshooting & Common Errors
- pymqi.MQMIError: MQRC_NOT_AUTHORIZED (2035):
  - Cause: The user ID inside the container or the provided MQ_USER is not authorized on the Queue Manager channel.
  - Fix: Ensure CHLAUTH is disabled on the MQ server (dev only) or the user is mapped correctly.
- ImportError: libmqic_r.so: cannot open shared object file:
  - Cause: The LD_LIBRARY_PATH is missing in the Docker container.
  - Fix: Verify the ENV LD_LIBRARY_PATH=/opt/mqm/lib64 line in your Dockerfile.
- Connection Refused (FastMCP):
  - Cause: The MCP server crashed or isn’t running.
  - Fix: Check Docker logs. Ensure you used host='0.0.0.0' in mcp.run.
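Because the server's tools report failures as bare numeric reason codes, a small lookup can make the agent's output easier to act on. The sketch below is an assumption about how that might look, not part of the original server. REASON_HINTS and describe_reason are hypothetical names, the table covers only a handful of common codes, and the hints are general guidance.

```python
# A few common IBM MQ reason codes with their symbolic names and a short hint.
REASON_HINTS = {
    2009: ("MQRC_CONNECTION_BROKEN", "The connection dropped; reconnect and retry."),
    2033: ("MQRC_NO_MSG_AVAILABLE", "The queue was empty for the whole wait interval."),
    2035: ("MQRC_NOT_AUTHORIZED", "Check channel authentication and the user's queue permissions."),
    2053: ("MQRC_Q_FULL", "The queue reached its maximum depth; drain it or raise the limit."),
    2058: ("MQRC_Q_MGR_NAME_ERROR", "The queue manager name does not match the one at the endpoint."),
    2059: ("MQRC_Q_MGR_NOT_AVAILABLE", "The queue manager is stopped or not accepting connections."),
    2085: ("MQRC_UNKNOWN_OBJECT_NAME", "The queue name does not exist on this queue manager."),
    2538: ("MQRC_HOST_NOT_AVAILABLE", "Check MQ_HOST, MQ_PORT, and that the listener is running."),
}


def describe_reason(reason: int) -> str:
    """Return a readable line for a reason code, with a fallback for unknown codes."""
    name, hint = REASON_HINTS.get(
        reason, ("UNKNOWN", "Look the code up in the IBM MQ reason code reference.")
    )
    return f"{name} ({reason}): {hint}"


not_authorized = describe_reason(2035)
unknown = describe_reason(9999)
```

A tool's except branch could append describe_reason(e.reason) to its error string so the agent sees the symbolic name alongside the number.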
🛡️ Security Best Practices
- Never commit MQ passwords: Use os.getenv as shown in the server code.
- Use SSL/TLS: For production, modify the _get_connection function to pass an sco (SSL configuration options) object built with pymqi.SCO.
- Principle of Least Privilege: The MQ User provided to the Agent should only have PUT/GET access to specific queues, not full Admin rights.
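The first two points can be combined in a short sketch: require credentials and TLS settings from the environment with no fallback defaults, then hand them to pymqi. The environment variable names MQ_KEY_REPO and MQ_CIPHER_SPEC and the functions tls_settings and connect_tls are assumptions made for illustration. The values in the example are placeholders, and connect_tls needs a reachable TLS-enabled channel, so it is defined here but not called.

```python
import os

REQUIRED = ("MQ_USER", "MQ_PASSWORD", "MQ_KEY_REPO", "MQ_CIPHER_SPEC")


def tls_settings(env=os.environ) -> dict:
    """Collect credentials and TLS settings, refusing to run on missing values."""
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing required settings: {', '.join(missing)}")
    return {
        "user": env["MQ_USER"],
        "password": env["MQ_PASSWORD"],
        "key_repository": env["MQ_KEY_REPO"].encode("utf-8"),
        "cipher_spec": env["MQ_CIPHER_SPEC"].encode("utf-8"),
    }


def connect_tls(queue_manager: str, channel: str, conn_name: str, settings: dict):
    """Open a TLS-protected client connection (requires pymqi and a live channel)."""
    import pymqi  # imported lazily so tls_settings works without the MQ client

    cd = pymqi.CD()
    cd.ChannelName = channel.encode("utf-8")
    cd.ConnectionName = conn_name.encode("utf-8")
    cd.ChannelType = pymqi.CMQC.MQCHT_CLNTCONN
    cd.TransportType = pymqi.CMQC.MQXPT_TCP
    cd.SSLCipherSpec = settings["cipher_spec"]

    sco = pymqi.SCO()
    sco.KeyRepository = settings["key_repository"]

    qmgr = pymqi.QueueManager(None)
    qmgr.connect_with_options(
        queue_manager, cd=cd, sco=sco,
        user=settings["user"], password=settings["password"],
    )
    return qmgr


# Placeholder values; real ones would come from a secret store or the container env.
example = tls_settings({
    "MQ_USER": "app",
    "MQ_PASSWORD": "not-a-real-secret",
    "MQ_KEY_REPO": "/run/secrets/mq/key",
    "MQ_CIPHER_SPEC": "TLS_RSA_WITH_AES_256_CBC_SHA256",
})
```

The cipher spec has to match the one configured on the server-connection channel, so treat the value shown as a stand-in.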
🛡️ Quality Assurance
- Status: ✅ Verified
- Environment: Python 3.11
- Auditor: AgentRetrofit CI/CD