Transport Overview

The transport module (src/transport/) implements the communication layer between MCP clients and the server. It handles HTTP requests, streams results over Server-Sent Events (SSE), and manages the JSON-RPC 2.0 protocol used by MCP.

Module Structure

src/transport/
├── http.ts    # HTTP server transport implementation
└── sse.ts     # Server-Sent Events streaming support

Architecture

┌─────────────────────────────────────────┐
│           MCP Clients                   │
│    (Claude, IDEs, Custom Tools)         │
└─────────────────┬───────────────────────┘
                  │
        ┌─────────┴─────────┐
        │                   │
        ▼                   ▼
┌───────────────┐   ┌───────────────┐
│     HTTP      │   │      SSE      │
│   Requests    │   │   Streaming   │
└───────┬───────┘   └───────┬───────┘
        │                   │
        └─────────┬─────────┘
                  │
                  ▼
┌─────────────────────────────────────────┐
│        HttpServerTransport              │
│  • Request routing                      │
│  • Session management                   │
│  • Protocol handling                    │
└─────────────────┬───────────────────────┘
                  │
                  ▼
┌─────────────────────────────────────────┐
│          MCP Server Core                │
│  • Method dispatching                   │
│  • Response generation                  │
└─────────────────────────────────────────┘

Transport Components

HTTP Transport (http.ts)

The HTTP transport handles:

  • JSON-RPC 2.0 request/response cycles
  • Session management for stateful operations
  • Request routing to appropriate handlers
  • Error handling and response formatting

Key Features:

  • Stateless HTTP endpoints
  • Session tracking via headers
  • Request validation
  • Response streaming support

SSE Transport (sse.ts)

Server-Sent Events provide:

  • Real-time server-to-client communication
  • Long-running operation updates
  • Progress notifications
  • Streaming responses

Key Features:

  • Keep-alive connections
  • Event-based messaging
  • Automatic reconnection support
  • Multiple client handling

Protocol Flow

Standard Request/Response

Client                    Server
  │                         │
  ├──── HTTP POST ─────────>│
  │   JSON-RPC Request      │
  │                         │
  │<──── HTTP Response ─────┤
  │   JSON-RPC Response     │
  │                         │

Streaming with SSE

Client                    Server
  │                         │
  ├──── HTTP POST ─────────>│
  │   Accept: text/event-stream
  │                         │
  │<──── SSE Stream ────────┤
  │   event: data           │
  │   data: {...}           │
  │                         │
  │   event: data           │
  │   data: {...}           │
  │                         │
  │   event: done           │
  │                         │
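One way a server could choose between the two flows is to inspect the Accept header of the POST. The helper below is a minimal sketch under that assumption; `wantsEventStream` is a hypothetical name, not something defined in src/transport/, and quality values (`q=...`) are ignored for brevity.

```typescript
// Hypothetical helper: returns true when the client explicitly lists
// text/event-stream in its Accept header. Wildcards such as */* are
// deliberately NOT treated as a request for streaming.
function wantsEventStream(acceptHeader: string | undefined): boolean {
    if (!acceptHeader) return false;
    return acceptHeader
        .split(',')
        // Drop parameters such as ";q=0.9" and normalise case.
        .map((part) => (part.split(';')[0] ?? '').trim().toLowerCase())
        .some((mediaType) => mediaType === 'text/event-stream');
}
```

A handler could call this once per request and fall back to a plain JSON-RPC response when it returns false.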

Request Handling

Request Structure

interface TransportRequest {
    jsonrpc: "2.0";
    method: string;
    params?: any;
    id?: string | number;
}

Response Structure

interface TransportResponse {
    jsonrpc: "2.0";
    result?: any;
    error?: {
        code: number;
        message: string;
        data?: any;
    };
    id?: string | number;
}
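Because a parsed JSON body is untyped at runtime, the TransportRequest shape has to be checked by hand before dispatch. The type guard below is a sketch of such a check. `JsonRpcRequest` mirrors TransportRequest with `params` narrowed to `unknown`, and `isJsonRpcRequest` is a hypothetical name. JSON-RPC 2.0 also permits a null id, which this sketch rejects to stay in line with the interface above.

```typescript
interface JsonRpcRequest {
    jsonrpc: '2.0';
    method: string;
    params?: unknown;
    id?: string | number;
}

// Hypothetical type guard for values produced by JSON.parse.
function isJsonRpcRequest(value: unknown): value is JsonRpcRequest {
    if (typeof value !== 'object' || value === null || Array.isArray(value)) {
        return false;
    }
    const candidate = value as Record<string, unknown>;
    if (candidate['jsonrpc'] !== '2.0') return false;
    if (typeof candidate['method'] !== 'string') return false;

    // params, when present, must be structured (an object or an array).
    const params = candidate['params'];
    if (params !== undefined && (typeof params !== 'object' || params === null)) {
        return false;
    }

    // id, when present, must be a string or a number.
    // A request without an id is a notification and expects no response.
    const id = candidate['id'];
    if (id !== undefined && typeof id !== 'string' && typeof id !== 'number') {
        return false;
    }
    return true;
}
```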

Session Management

The transport layer maintains session state for:

  • Multi-step operations
  • Client identification
  • Request correlation
  • Progress tracking

Session Flow

// 1. Client initiates session
POST / 
X-Session-ID: unique-session-id

// 2. Server maintains session state
sessions.set(sessionId, {
    client: clientInfo,
    state: {},
    created: Date.now()
});

// 3. Subsequent requests use same session
POST /
X-Session-ID: unique-session-id
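Sessions created by plain HTTP requests have no connection-close event to trigger cleanup, so some form of expiry is usually needed. The class below is a minimal sketch of a store with idle expiry. `SessionStore`, `touch`, and `sweep` are hypothetical names, and the TTL policy is an assumption rather than documented behaviour of this module. The clock is injectable so the logic can be tested without waiting.

```typescript
interface Session<T> {
    state: T;
    created: number;
    lastSeen: number;
}

// Hypothetical in-memory session store with idle-time expiry.
class SessionStore<T> {
    private readonly sessions = new Map<string, Session<T>>();
    private readonly ttlMs: number;
    private readonly now: () => number;

    constructor(ttlMs: number, now: () => number = () => Date.now()) {
        this.ttlMs = ttlMs;
        this.now = now;
    }

    // Returns the live session for this id, or creates a fresh one
    // when none exists or the previous one has been idle too long.
    touch(sessionId: string, initialState: () => T): Session<T> {
        const current = this.now();
        const existing = this.sessions.get(sessionId);
        if (existing && current - existing.lastSeen <= this.ttlMs) {
            existing.lastSeen = current;
            return existing;
        }
        const fresh: Session<T> = { state: initialState(), created: current, lastSeen: current };
        this.sessions.set(sessionId, fresh);
        return fresh;
    }

    // Removes every session idle for longer than the TTL; returns the count removed.
    sweep(): number {
        const current = this.now();
        let removed = 0;
        this.sessions.forEach((session, id) => {
            if (current - session.lastSeen > this.ttlMs) {
                this.sessions.delete(id);
                removed++;
            }
        });
        return removed;
    }

    get size(): number {
        return this.sessions.size;
    }
}
```

A server would typically call `touch` with the X-Session-ID value on each request and run `sweep` on a timer.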

Error Handling

JSON-RPC Error Codes

Code     Meaning            Description
-32700   Parse error        Invalid JSON
-32600   Invalid request    Invalid JSON-RPC
-32601   Method not found   Unknown method
-32602   Invalid params     Invalid parameters
-32603   Internal error     Server error

Error Response Format

{
    "jsonrpc": "2.0",
    "error": {
        "code": -32603,
        "message": "Internal server error",
        "data": {
            "details": "Additional error information"
        }
    },
    "id": "request-id"
}
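The error format above can be produced from thrown exceptions in one place. The following is a sketch, not the module's actual code: `JsonRpcError` is assumed to carry a code and optional data, and `toErrorResponse` is a hypothetical helper. Unexpected exceptions are mapped to -32603 with a generic message so that internal details are not leaked to clients.

```typescript
const JSON_RPC_CODES = {
    parseError: -32700,
    invalidRequest: -32600,
    methodNotFound: -32601,
    invalidParams: -32602,
    internalError: -32603,
} as const;

// Assumed shape of the error class; the real one may differ.
class JsonRpcError extends Error {
    readonly code: number;
    readonly data?: unknown;

    constructor(code: number, message: string, data?: unknown) {
        super(message);
        this.name = 'JsonRpcError';
        this.code = code;
        this.data = data;
    }
}

// Hypothetical helper: converts any thrown value into a JSON-RPC error response.
// Pass null as the id when the request id could not be determined.
function toErrorResponse(err: unknown, id: string | number | null) {
    const error: { code: number; message: string; data?: unknown } =
        err instanceof JsonRpcError
            ? { code: err.code, message: err.message }
            : { code: JSON_RPC_CODES.internalError, message: 'Internal error' };
    if (err instanceof JsonRpcError && err.data !== undefined) {
        error.data = err.data;
    }
    return { jsonrpc: '2.0' as const, error, id };
}
```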

Streaming Support

SSE Message Format

event: message
data: {"type": "progress", "value": 50}

event: result
data: {"type": "complete", "result": {...}}

event: error
data: {"type": "error", "error": "Failed"}

Streaming Implementation

// Initialize SSE
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.flushHeaders(); // send the headers now so the client sees the stream open

// Send one event; the blank line (\n\n) terminates it
function sendEvent(event: string, data: unknown) {
    res.write(`event: ${event}\n`);
    res.write(`data: ${JSON.stringify(data)}\n\n`);
}

// Keep connection alive: lines starting with ':' are SSE comments,
// which clients ignore
const keepAlive = setInterval(() => {
    res.write(':keep-alive\n\n');
}, 30000);
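The `sendEvent` function above is safe for JSON payloads because `JSON.stringify` never emits raw newlines. For arbitrary strings, each line of the payload needs its own `data:` prefix. The pure formatter below is a sketch that handles this case; `formatSseEvent` and the `SseEvent` shape are hypothetical names, and the optional `id` field is included because browsers send it back as Last-Event-ID when they reconnect.

```typescript
interface SseEvent {
    event?: string;
    id?: string;
    data: string | number | boolean | object | null;
}

// Hypothetical formatter: serialises one event into SSE wire format.
function formatSseEvent({ event, id, data }: SseEvent): string {
    // Strings are sent as-is; everything else is JSON-encoded.
    const payload = typeof data === 'string' ? data : JSON.stringify(data);
    const lines: string[] = [];
    if (event !== undefined) lines.push(`event: ${event}`);
    if (id !== undefined) lines.push(`id: ${id}`);
    // A multi-line payload becomes several data: lines, which the
    // client joins back together with "\n".
    for (const line of payload.split(/\r\n|\r|\n/)) {
        lines.push(`data: ${line}`);
    }
    // The trailing blank line marks the end of the event.
    return lines.join('\n') + '\n\n';
}
```

With this helper, a write becomes `res.write(formatSseEvent({ event: 'message', data }))`.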

Security Features

Request Validation

  • JSON schema validation
  • Method whitelist
  • Parameter sanitization
  • Size limits
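As an illustration of how a size limit and a method whitelist might be combined, the sketch below checks a raw request body before it reaches the dispatcher. `checkRequest`, `Verdict`, and the limit parameter are hypothetical. The limit counts string characters for simplicity, whereas a real server would more likely cap bytes while reading the body.

```typescript
type Verdict =
    | { ok: true }
    | { ok: false; code: number; message: string };

// Hypothetical pre-dispatch check: size limit, JSON parse, method whitelist.
function checkRequest(
    rawBody: string,
    allowedMethods: ReadonlySet<string>,
    maxChars: number,
): Verdict {
    // Reject oversized bodies before spending time on parsing.
    if (rawBody.length > maxChars) {
        return { ok: false, code: -32600, message: 'Request too large' };
    }

    let parsed: unknown;
    try {
        parsed = JSON.parse(rawBody);
    } catch {
        return { ok: false, code: -32700, message: 'Parse error' };
    }

    const method =
        typeof parsed === 'object' && parsed !== null
            ? (parsed as Record<string, unknown>)['method']
            : undefined;
    if (typeof method !== 'string') {
        return { ok: false, code: -32600, message: 'Invalid request' };
    }
    // Only methods on the whitelist are passed on.
    if (!allowedMethods.has(method)) {
        return { ok: false, code: -32601, message: 'Method not found' };
    }
    return { ok: true };
}
```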

Authentication

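// Optional authentication via headers: a request without an Authorization
// header is let through, and only a request with an invalid token is rejected.
// Reject a missing header as well if every request must be authenticated.
const token = req.headers['authorization'];
if (token && !validateToken(token)) {
    throw new Error('Unauthorized');
}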
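The snippet above hands the raw header value to `validateToken`. If clients send the token with the common `Bearer` scheme (an assumption, since the scheme is not stated here), the prefix has to be stripped first. A minimal sketch, with `extractBearerToken` as a hypothetical helper name:

```typescript
// Hypothetical helper: returns the token from "Bearer <token>", or null
// when the header is missing, uses another scheme, or has no token.
function extractBearerToken(header: string | undefined): string | null {
    if (!header) return null;
    // The scheme name is case-insensitive; the token itself may not contain spaces.
    const match = /^Bearer\s+(\S+)$/i.exec(header.trim());
    return match?.[1] ?? null;
}
```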

CORS Support

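// CORS headers for browser clients.
// With ALLOWED_ORIGINS unset, every origin is allowed ('*'); set it in production.
app.use(cors({
    origin: process.env.ALLOWED_ORIGINS?.split(',') || '*',
    methods: ['GET', 'POST'],
    // Authorization is listed so that the optional token header passes preflight
    allowedHeaders: ['Content-Type', 'Authorization', 'X-Session-ID'],
}));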

Performance Optimization

Connection Pooling

  • Reuse HTTP connections
  • Efficient session management
  • Resource cleanup

Request Batching

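// Batch multiple operations (an "id" is needed to get a response back;
// a JSON-RPC request without one is a notification)
{
    "jsonrpc": "2.0",
    "method": "batch",
    "params": {
        "requests": [
            { "method": "tools/list" },
            { "method": "resources/list" }
        ]
    },
    "id": "batch-1"
}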
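JSON-RPC 2.0 itself also defines a standard batch form: a JSON array of request objects, answered by an array of responses that omits entries for notifications. Whether this transport accepts that form in addition to the "batch" method shown above is not stated here, so the sketch below only illustrates the standard semantics. `dispatchBatch` and `Handler` are hypothetical names.

```typescript
interface RpcRequest {
    jsonrpc: '2.0';
    method: string;
    params?: unknown;
    id?: string | number;
}

interface RpcResponse {
    jsonrpc: '2.0';
    result?: unknown;
    error?: { code: number; message: string };
    id: string | number;
}

type Handler = (request: RpcRequest) => Promise<unknown>;

// Hypothetical dispatcher for a standard JSON-RPC 2.0 batch (array form).
async function dispatchBatch(batch: RpcRequest[], handle: Handler): Promise<RpcResponse[]> {
    const settled = await Promise.all(
        batch.map(async (request): Promise<RpcResponse | null> => {
            try {
                const result = await handle(request);
                // Requests without an id are notifications: no response entry.
                if (request.id === undefined) return null;
                return { jsonrpc: '2.0', result: result ?? null, id: request.id };
            } catch {
                if (request.id === undefined) return null;
                // One failing request must not fail the whole batch.
                return {
                    jsonrpc: '2.0',
                    error: { code: -32603, message: 'Internal error' },
                    id: request.id,
                };
            }
        }),
    );
    return settled.filter((entry): entry is RpcResponse => entry !== null);
}
```

The requests run concurrently here; a server whose methods depend on each other's side effects would need to run them in sequence instead.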

Compression

// Enable compression for responses
app.use(compression({
    threshold: 1024, // Only compress responses > 1KB
}));

Client Integration

HTTP Client Example

const response = await fetch('http://server/mcp', {
    method: 'POST',
    headers: {
        'Content-Type': 'application/json',
        'X-Session-ID': sessionId,
    },
    body: JSON.stringify({
        jsonrpc: '2.0',
        method: 'tools/call',
        params: { name: 'scan', arguments: {} },
        id: 'req-1'
    }),
});

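// fetch rejects only on network failure, so check the HTTP status explicitly
if (!response.ok) {
    throw new Error(`HTTP ${response.status} ${response.statusText}`);
}

const result = await response.json();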

SSE Client Example

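const eventSource = new EventSource('/mcp?session=' + encodeURIComponent(sessionId));

// onmessage only fires for events named "message" (or sent without an event name)
eventSource.onmessage = (event) => {
    const data = JSON.parse(event.data);
    console.log('Received:', data);
};

// Named events such as "result" need their own listener
eventSource.addEventListener('result', (event: MessageEvent) => {
    console.log('Result:', JSON.parse(event.data));
});

eventSource.onerror = (error) => {
    console.error('SSE error:', error);
};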
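`EventSource` always issues GET requests and cannot set custom headers, so a client that follows the POST-based streaming flow has to read the response body from `fetch` and split it into events itself. The parser below is a simplified sketch of that step. `parseSseFrames` and `ParsedEvent` are hypothetical names, and the `retry:` field is ignored.

```typescript
interface ParsedEvent {
    event: string;
    data: string;
    id?: string;
}

// Hypothetical parser: turns a block of SSE text into a list of events.
// The input must end on an event boundary (a blank line).
function parseSseFrames(text: string): ParsedEvent[] {
    const events: ParsedEvent[] = [];
    for (const frame of text.split(/\r?\n\r?\n/)) {
        let event = 'message'; // default type when no "event:" field is present
        let id: string | undefined;
        const dataLines: string[] = [];

        for (const line of frame.split(/\r?\n/)) {
            // Lines starting with ':' are comments, e.g. keep-alive pings.
            if (line === '' || line.startsWith(':')) continue;
            const colon = line.indexOf(':');
            const field = colon === -1 ? line : line.slice(0, colon);
            let value = colon === -1 ? '' : line.slice(colon + 1);
            if (value.startsWith(' ')) value = value.slice(1); // one leading space is stripped

            if (field === 'event') event = value;
            else if (field === 'data') dataLines.push(value);
            else if (field === 'id') id = value;
        }

        // Frames without a data field are skipped, as EventSource does.
        if (dataLines.length === 0) continue;
        const parsed: ParsedEvent = { event, data: dataLines.join('\n') };
        if (id !== undefined) parsed.id = id;
        events.push(parsed);
    }
    return events;
}
```

When reading from a network stream, chunks can end in the middle of an event, so a caller would buffer incoming text and only pass complete frames (everything up to the last blank line) to the parser.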

Monitoring and Debugging

Request Logging

// Log all requests
transport.on('request', (req) => {
    logger.debug('Incoming request', {
        method: req.method,
        params: req.params,
        session: req.sessionId,
    });
});

Performance Metrics

// Track response times
const start = Date.now();
const response = await handleRequest(request);
const duration = Date.now() - start;

metrics.histogram('transport.request.duration', duration, {
    method: request.method,
});

Debug Mode

// Enable detailed logging
if (process.env.DEBUG_TRANSPORT) {
    transport.setDebugMode(true);
}

Error Recovery

Automatic Retry

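// Client-side retry logic with exponential backoff (1 s, 2 s, 4 s, ...)
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

async function requestWithRetry(request: TransportRequest, maxRetries = 3) {
    for (let i = 0; i < maxRetries; i++) {
        try {
            return await sendRequest(request);
        } catch (error) {
            // Out of attempts: surface the last error to the caller
            if (i === maxRetries - 1) throw error;
            await sleep(Math.pow(2, i) * 1000);
        }
    }
}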
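When many clients fail at the same moment, fixed exponential delays make them all retry in lockstep. A common refinement, not something this module is documented to use, is to cap the delay and add random jitter. The sketch below shows the "full jitter" variant; `backoffDelayMs` and its default values are assumptions, and the random source is injectable so the result can be tested.

```typescript
// Hypothetical helper: delay before retry number `attempt` (0-based).
// The exponential ceiling is capped at capMs, and the actual delay is
// drawn uniformly from [0, ceiling).
function backoffDelayMs(
    attempt: number,
    baseMs = 1000,
    capMs = 30000,
    random: () => number = Math.random,
): number {
    const ceiling = Math.min(capMs, baseMs * Math.pow(2, attempt));
    return Math.floor(random() * ceiling);
}
```

In the retry loop, `sleep(backoffDelayMs(i))` would take the place of the fixed `Math.pow(2, i) * 1000` delay.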

Connection Recovery

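// SSE reconnect. EventSource already retries dropped connections on its own,
// so only rebuild it once the browser has given up (readyState is CLOSED).
eventSource.onerror = () => {
    if (eventSource.readyState !== EventSource.CLOSED) return;
    setTimeout(() => {
        eventSource = new EventSource(url);
        setupEventHandlers(eventSource);
    }, 5000);
};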

Testing

Unit Testing

describe('HttpServerTransport', () => {
    it('should handle valid requests', async () => {
        const transport = new HttpServerTransport({ server });
        const response = await transport.handleRequest({
            jsonrpc: '2.0',
            method: 'test',
            id: '1'
        });
        
        expect(response.jsonrpc).toBe('2.0');
        expect(response.id).toBe('1');
    });
});

Integration Testing

it('should handle SSE streaming', (done) => {
    const client = new EventSource('/test');
    const messages = [];
    
    client.onmessage = (event) => {
        messages.push(JSON.parse(event.data));
        if (messages.length === 3) {
            expect(messages).toHaveLength(3);
            client.close();
            done();
        }
    };
});

Best Practices

1. Use Appropriate Transport

// Short operations - HTTP
const result = await httpRequest('tools/call', params);

// Long operations - SSE
const stream = new EventSource('/stream');
stream.onmessage = handleProgress;

2. Handle Disconnections

// Graceful SSE cleanup
req.on('close', () => {
    clearInterval(keepAlive);
    sessions.delete(sessionId);
});

3. Validate Input

// Always validate requests
if (!isValidJsonRpc(request)) {
    throw new JsonRpcError(-32600, 'Invalid request');
}

4. Implement Timeouts

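// Prevent hanging requests
const timeout = setTimeout(() => {
    // Only answer if the handler has not started responding
    if (!res.headersSent) res.status(408).json({ error: 'Request timeout' });
}, 30000);
// Cancel the timer when the response finishes or the client disconnects
res.on('close', () => clearTimeout(timeout));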
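The same idea can be applied to the handler promise itself, independent of the HTTP framework. The wrapper below is a minimal sketch; `withTimeout` is a hypothetical helper. It rejects when the deadline passes but does not cancel the underlying work, which would need something like an AbortSignal passed into the handler.

```typescript
// Hypothetical helper: settles with the result of `work`, or rejects with
// an error named "TimeoutError" if `work` takes longer than `ms`.
function withTimeout<T>(work: Promise<T>, ms: number): Promise<T> {
    return new Promise<T>((resolve, reject) => {
        const timer = setTimeout(() => {
            const error = new Error(`Timed out after ${ms} ms`);
            error.name = 'TimeoutError';
            reject(error);
        }, ms);

        work.then(
            (value) => {
                clearTimeout(timer); // always release the timer
                resolve(value);
            },
            (error) => {
                clearTimeout(timer);
                reject(error);
            },
        );
    });
}
```

A caller can catch the error by name and translate it into whatever response the transport uses for timeouts.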