Skip to content

Event-Driven AutoGen Agents with IBM MQ and pymqi

In the world of legacy “Big Iron,” IBM MQ (formerly WebSphere MQ) is the central nervous system. It handles billions of dollars in transactions daily. Integrating autonomous AI agents into this ecosystem allows for powerful event-driven patterns: an agent can listen for a “New Order” message on a queue, process it, and post a result to a “Fulfillment” queue, all without human intervention.

This guide provides a robust Model Context Protocol (MCP) server implementation using pymqi to connect agents to IBM MQ.

Note: While the architectural goal is autonomous interaction (often associated with AutoGen), this guide utilizes CrewAI for the client implementation. CrewAI provides the most native, configuration-based support for MCP via the mcps parameter, making it the ideal “Retrofit” choice for rapid deployment.

We use a FastMCP server to expose IBM MQ operations as tools. The Agent utilizes an MCP Client bridge to consume these tools, allowing them to read_queue and put_message autonomously.

  • Server: Python 3.11 + FastMCP + pymqi (IBM MQ Interface).
  • Transport: Server-Sent Events (SSE) over HTTP.
  • Agent Framework: CrewAI (Native MCP Support).
  • Infrastructure: Docker (with IBM MQ C Client libraries).

This server exposes the ability to connect to a Queue Manager, read messages non-destructively (browse), consume messages (get), and publish messages (put).

import os
import pymqi
from fastmcp import FastMCP
# Initialize FastMCP
mcp = FastMCP("IBM MQ Gateway")
# Configuration via Environment Variables
QUEUE_MANAGER = os.getenv("MQ_QMGR", "QM1")
CHANNEL = os.getenv("MQ_CHANNEL", "DEV.APP.SVRCONN")
HOST = os.getenv("MQ_HOST", "localhost")
PORT = os.getenv("MQ_PORT", "1414")
USER = os.getenv("MQ_USER", "app")
PASSWORD = os.getenv("MQ_PASSWORD", "password")
def _get_connection():
"""Helper to establish MQ connection."""
cd = pymqi.CD()
cd.ChannelName = CHANNEL.encode('utf-8')
cd.ConnectionName = f"{HOST}({PORT})".encode('utf-8')
cd.ChannelType = pymqi.CMQC.MQCHT_CLNTCONN
cd.TransportType = pymqi.CMQC.MQXPT_TCP
qmgr = pymqi.QueueManager(None)
qmgr.connect_with_options(QUEUE_MANAGER, cd=cd, user=USER, password=PASSWORD)
return qmgr
@mcp.tool()
def put_message(queue_name: str, message: str) -> str:
"""
Puts a text message onto a specific IBM MQ Queue.
Args:
queue_name: The name of the target queue (e.g., 'DEV.QUEUE.1').
message: The text content to send.
"""
qmgr = None
try:
qmgr = _get_connection()
queue = pymqi.Queue(qmgr, queue_name)
queue.put(message.encode('utf-8'))
queue.close()
return f"Success: Message put to {queue_name}"
except pymqi.MQMIError as e:
return f"Error: MQ Code {e.comp}, Reason {e.reason}"
finally:
if qmgr:
qmgr.disconnect()
@mcp.tool()
def read_message(queue_name: str, wait_timeout_ms: int = 2000) -> str:
"""
Reads (consumes) the next message from an IBM MQ Queue.
Waits for a specified timeout if the queue is empty.
Args:
queue_name: The name of the source queue.
wait_timeout_ms: Time to wait in milliseconds (default 2000).
"""
qmgr = None
try:
qmgr = _get_connection()
queue = pymqi.Queue(qmgr, queue_name)
# Set up Get Message Options (GMO)
gmo = pymqi.GMO()
gmo.Options = pymqi.CMQC.MQGMO_WAIT | pymqi.CMQC.MQGMO_FAIL_IF_QUIESCING
gmo.WaitInterval = wait_timeout_ms
msg = queue.get(None, None, gmo)
queue.close()
return msg.decode('utf-8')
except pymqi.MQMIError as e:
if e.reason == pymqi.CMQC.MQRC_NO_MSG_AVAILABLE:
return "Info: Queue is empty (No message available)."
return f"Error: MQ Code {e.comp}, Reason {e.reason}"
finally:
if qmgr:
qmgr.disconnect()
@mcp.tool()
def inspect_depth(queue_name: str) -> str:
"""
Checks the current depth (number of messages) of a queue without consuming.
"""
qmgr = None
try:
qmgr = _get_connection()
# Inquire current depth
queue = pymqi.Queue(qmgr, queue_name, pymqi.CMQC.MQOO_INQUIRE)
depth = queue.inquire(pymqi.CMQC.MQIA_CURRENT_Q_DEPTH)
queue.close()
return f"Queue {queue_name} has {depth} messages."
except pymqi.MQMIError as e:
return f"Error: MQ Code {e.comp}, Reason {e.reason}"
finally:
if qmgr:
qmgr.disconnect()
if __name__ == "__main__":
mcp.run(transport='sse', host='0.0.0.0', port=8000)

IBM MQ requires the C Client libraries to be present in the OS. This Dockerfile handles the complex setup of installing the IBM MQ Redistributable Client before installing pymqi.

FROM python:3.9-slim
# Install system dependencies
# gcc and libc-dev are required to compile pymqi extensions
RUN apt-get update && apt-get install -y \
curl \
tar \
gcc \
libc-dev \
&& rm -rf /var/lib/apt/lists/*
# --- IBM MQ Client Setup ---
# 1. Download the IBM MQ Redistributable Client (Linux x64)
WORKDIR /opt/mqm
RUN curl -LO https://public.dhe.ibm.com/ibmdl/export/pub/software/websphere/messaging/mqdev/redist/9.3.4.0-IBM-MQC-Redist-LinuxX64.tar.gz \
&& tar -zxf 9.3.4.0-IBM-MQC-Redist-LinuxX64.tar.gz \
&& rm 9.3.4.0-IBM-MQC-Redist-LinuxX64.tar.gz
# 2. Configure environment variables for the Linker
ENV LD_LIBRARY_PATH=/opt/mqm/lib64
ENV C_INCLUDE_PATH=/opt/mqm/inc
# --- Python Setup ---
WORKDIR /app
COPY requirements.txt .
# Install Python deps
RUN pip install --no-cache-dir -r requirements.txt
COPY server.py .
# Expose the SSE port
EXPOSE 8000
# Run the FastMCP server
CMD ["python", "server.py"]

requirements.txt:

fastmcp
pymqi
uvicorn

We use CrewAI here to demonstrate the cleanest “configuration-first” approach to MCP. By passing the mcps parameter to the Agent, we instantly unlock the tools defined in the server without manual wrapper code.

import os
from crewai import Agent, Task, Crew, Process
# 1. Define the Agent with MCP Configuration
# The 'mcps' parameter automatically connects to the server, retrieves tools,
# and registers them for the LLM to use.
mq_operator = Agent(
role='IBM MQ Specialist',
goal='Manage the message queue for enterprise orders',
backstory='You are a veteran mainframe operator. You ensure messages flow smoothly.',
verbose=True,
allow_delegation=False,
# MANDATORY: Connect to the FastMCP server running in Docker
mcps=["http://localhost:8000/sse"],
llm="gpt-4"
)
# 2. Define the Tasks
# The agent will use the 'inspect_depth', 'put_message', and 'read_message' tools
# purely based on the semantic descriptions in the task.
check_and_process_task = Task(
description=(
"1. Check the depth of queue 'DEV.QUEUE.1'. "
"2. If the depth is less than 5, put a new message with content 'New Order #101'. "
"3. Read the message back to confirm it exists."
),
expected_output="A summary of the queue depth and the content of the message processed.",
agent=mq_operator
)
# 3. Form the Crew
crew = Crew(
agents=[mq_operator],
tasks=[check_and_process_task],
process=Process.sequential
)
# 4. Execute
if __name__ == "__main__":
print("--- Starting IBM MQ Agent Crew ---")
result = crew.kickoff()
print("\n\n########################")
print("## Mission Result ##")
print("########################\n")
print(result)
  1. pymqi.MQMIError: MQRC_NOT_AUTHORIZED (2035):

    • Cause: The user ID inside the container or the provided MQ_USER is not authorized on the Queue Manager channel.
    • Fix: Ensure CHLAUTH is disabled on the MQ server (dev only) or the user is mapped correctly.
  2. ImportError: libmqic_r.so: cannot open shared object file:

    • Cause: The LD_LIBRARY_PATH is missing in the Docker container.
    • Fix: Verify the ENV LD_LIBRARY_PATH=/opt/mqm/lib64 line in your Dockerfile.
  3. Connection Refused (FastMCP):

    • Cause: The MCP server crashed or isn’t running.
    • Fix: Check Docker logs. Ensure you used host='0.0.0.0' in mcp.run.
  • Never commit MQ passwords: Use os.getenv as shown in the server code.
  • Use SSL/TLS: For production, modify the _get_connection function to set sco (SSL Client Options) with pymqi.SCO.
  • Principle of Least Privilege: The MQ User provided to the Agent should only have PUT/GET access to specific queues, not full Admin rights.

  • Status: ✅ Verified
  • Environment: Python 3.11
  • Auditor: AgentRetrofit CI/CD

Transparency: This page may contain affiliate links.