
Streaming Large COBOL Datasets to LangGraph using Python Generators


Enterprise “Big Iron” systems often export data in massive fixed-width flat files (COBOL formats) that exceed the memory limits of modern containerized agents. To bridge this gap, we use Python Generators to implement a “lazy loading” architecture.

This guide provides a blueprint for a FastMCP server that streams a 10GB+ simulated COBOL file line-by-line, and a LangGraph agent that consumes it in manageable batches.

The architecture relies on a Pagination pattern. The Agent does not request the “file”; it requests a “batch” starting at a specific cursor.

graph LR
    A[Mainframe .DAT File] -- Generator Stream --> B[FastMCP Server]
    B -- SSE Transport --> C[LangGraph Agent]
    C -- "Next Batch (Cursor)" --> B

We use FastMCP to serve the data. The critical pattern here is the Python yield keyword, which ensures only one record is in memory at a time.

import os
import json
from typing import Dict, Generator

from fastmcp import FastMCP

# Initialize FastMCP with SSE enabled
mcp = FastMCP("CobolStreamer")

# --- 1. LEGACY DATA SIMULATION ---
MOCK_FILE_PATH = "legacy_data.dat"

def generate_mock_data():
    """Creates a dummy fixed-width file to simulate a Mainframe export."""
    if not os.path.exists(MOCK_FILE_PATH):
        print("Generating mock mainframe data...")
        with open(MOCK_FILE_PATH, "w") as f:
            # Layout: ID(5) | DATE(10) | PAYLOAD(20) | CODE(3)
            # Total record: 39 bytes (incl. newline)
            for i in range(1000):
                f.write(f"{i:05d}2025-01-01PAYLOAD-{i:<12}ACT\n")

generate_mock_data()

# --- 2. GENERATOR LOGIC ---
def cobol_generator(file_path: str, skip: int, limit: int) -> Generator[Dict, None, None]:
    """
    Reads the file line-by-line.
    'skip': number of lines to fast-forward past.
    'limit': number of lines to yield.
    """
    with open(file_path, "r") as f:
        # Efficiently skip lines without materializing them
        for _ in range(skip):
            next(f, None)
        # Read the batch
        for _ in range(limit):
            line = f.readline()
            if not line:
                break
            # Parse fixed-width fields by byte offset
            yield {
                "id": line[0:5].strip(),
                "date": line[5:15].strip(),
                "payload": line[15:35].strip(),
                "status": line[35:38].strip(),
            }

# --- 3. MCP TOOL ---
@mcp.tool()
def read_cobol_page(cursor: int = 0, page_size: int = 50) -> str:
    """
    Reads a specific page of the COBOL dataset.
    Returns a JSON string with the data and the next cursor.
    """
    records = list(cobol_generator(MOCK_FILE_PATH, cursor, page_size))
    # Determine pagination state: a short page means end-of-file
    count = len(records)
    next_cursor = cursor + count if count == page_size else None
    return json.dumps({
        "data": records,
        "metadata": {
            "current_cursor": cursor,
            "next_cursor": next_cursor,
            "eof": next_cursor is None,
        }
    })

if __name__ == "__main__":
    # MANDATORY: Bind to 0.0.0.0 for Docker compatibility
    mcp.run(transport='sse', host='0.0.0.0', port=8000)
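Fixed-width parsing fails silently when offsets drift, so it is worth sanity-checking the slices against one record constructed exactly as `generate_mock_data` writes it (the record below uses i = 7 as an arbitrary sample):

```python
# One record exactly as generate_mock_data() writes it (i = 7)
line = f"{7:05d}2025-01-01PAYLOAD-{7:<12}ACT\n"

assert len(line) == 39  # 38 data bytes + newline

# The same slices used by cobol_generator
record = {
    "id": line[0:5].strip(),         # cols 0-4:   "00007"
    "date": line[5:15].strip(),      # cols 5-14:  "2025-01-01"
    "payload": line[15:35].strip(),  # cols 15-34: "PAYLOAD-7" (space-padded)
    "status": line[35:38].strip(),   # cols 35-37: "ACT"
}
print(record)
```

With a real COBOL copybook, these offsets would come from the copybook's field definitions rather than being hand-counted.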

The Dockerfile ensures the environment is reproducible and the network port is accessible.

FROM python:3.11-slim
WORKDIR /app
# Install dependencies
RUN pip install fastmcp uvicorn
# Copy the server code
COPY server.py .
# MANDATORY: Expose port 8000 for Railway/Docker networking
EXPOSE 8000
# Start the server
CMD ["python", "server.py"]

We use the standard Python mcp library to connect to the SSE stream: the sse_client helper from mcp.client.sse opens the connection, and ClientSession drives the protocol over it.

Configuration Pattern: We define our MCP endpoints explicitly at the top level for clarity and ease of configuration.

import asyncio
import json
import operator
from typing import Annotated, List, TypedDict

from langgraph.graph import StateGraph, END
from mcp import ClientSession
from mcp.client.sse import sse_client

# --- CONFIGURATION ---
# Define the list of MCP servers (SSE endpoints)
MCPS = ["http://localhost:8000/sse"]

# --- STATE DEFINITION ---
class AgentState(TypedDict):
    cursor: int
    total_processed: int
    logs: Annotated[List[str], operator.add]  # reducer: append, don't overwrite
    is_done: bool

# --- NODES ---
async def fetch_page_node(state: AgentState):
    """
    Connects to the MCP server and requests the next page of data.
    """
    server_url = MCPS[0]  # Use the first configured server
    print(f"Connecting to MCP Server: {server_url} at cursor {state['cursor']}")
    # sse_client handles the HTTP plumbing and yields a (read, write) stream pair
    async with sse_client(server_url) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # Call the tool
            result = await session.call_tool(
                "read_cobol_page",
                arguments={"cursor": state["cursor"], "page_size": 100},
            )
    # Parse the tool's returned JSON string
    response_data = json.loads(result.content[0].text)
    records = response_data["data"]
    meta = response_data["metadata"]
    # Simulate "processing"
    log_entry = f"Processed {len(records)} records. Cursor moved to {meta['next_cursor']}"
    return {
        "cursor": meta["next_cursor"] if meta["next_cursor"] is not None else state["cursor"],
        "total_processed": state["total_processed"] + len(records),
        "logs": [log_entry],
        "is_done": meta["eof"],
    }

def router(state: AgentState):
    """Decides if the stream is finished."""
    if state["is_done"]:
        return "end"
    return "continue"

# --- GRAPH BUILD ---
workflow = StateGraph(AgentState)
workflow.add_node("fetch_page", fetch_page_node)
workflow.set_entry_point("fetch_page")
workflow.add_conditional_edges(
    "fetch_page",
    router,
    {
        "continue": "fetch_page",
        "end": END,
    }
)
app = workflow.compile()

# --- EXECUTION ---
async def run_agent():
    print("Starting COBOL Stream Agent...")
    initial_state = {
        "cursor": 0,
        "total_processed": 0,
        "logs": [],
        "is_done": False,
    }
    # stream_mode="updates" yields {node_name: state_update} per step
    async for step in app.astream(initial_state, stream_mode="updates"):
        for node, output in step.items():
            print(f"[{node}] {output['logs'][-1]}")
    print("Stream complete.")

if __name__ == "__main__":
    # Ensure server.py is running in a separate container/terminal
    asyncio.run(run_agent())
  1. FastMCP Generator: The cobol_generator function in server.py implements the core “Retrofit” logic. It adapts a sequential file read into a random-access-like API using seek/skip logic (or byte seeking for optimization).
  2. SSE Transport: We use transport='sse' in mcp.run(...) and an SSE client transport on the agent side. This is more robust for Docker networking than stdio, which would require the agent to spawn the server as a local subprocess.
  3. Stateful Agent: The AgentState in LangGraph persists the cursor. If the agent crashes after processing batch #5, it can restart immediately at batch #6 without re-reading the start of the file.

  • Status: ✅ Verified
  • Environment: Python 3.11
  • Auditor: AgentRetrofit CI/CD
