Skip to main content
21nauts
MCPDatabaseSingleStore

Database Integration Patterns with MCP

Learn how to connect databases to MCP servers with practical examples for SingleStore, PostgreSQL, and other data sources. Includes automation patterns and natural language database operations.

January 3, 2025
14 min read
SingleStore Labs

Database Integration Patterns with MCP

Database integration through the Model Context Protocol (MCP) enables AI applications to interact with data using natural language, making complex database operations accessible to both technical and non-technical users. This guide explores various database integration patterns and practical implementations.

Why Database Integration Matters

Traditional database interaction requires:

  • SQL expertise for complex queries
  • Programming knowledge for application integration
  • Manual effort for routine operations
  • Context switching between tools and interfaces

MCP eliminates these barriers by providing:

  • Natural language database queries
  • Automated operations through AI assistance
  • Unified interface for multiple database types
  • Intelligent query optimization and suggestions

SingleStore MCP Server

SingleStore provides a comprehensive MCP server that demonstrates best practices for database integration.

Installation and Setup

Via Smithery (Recommended)

npx -y @smithery/cli install @singlestore-labs/mcp-server-singlestore --client claude
Bash

Via npm

npm install singlestore-mcp-server
Bash

Direct Installation

git clone https://github.com/singlestore-labs/mcp-server-singlestore.git
cd mcp-server-singlestore
pip install -e .
Bash

Configuration

Claude Desktop Configuration

{
  "mcpServers": {
    "singlestore": {
      "command": "singlestore-mcp-client",
      "args": [],
      "env": {
        "SINGLESTORE_CONNECTION_STRING": "mysql://user:password@host:port/database"
      }
    }
  }
}
JSON

Connection String Format

# Basic connection
mysql://username:password@hostname:port/database

# With SSL
mysql://username:password@hostname:port/database?ssl=true

# With connection pool
mysql://username:password@hostname:port/database?pool_size=10
Bash

Capabilities

The SingleStore MCP server provides:

Schema Exploration

  • List databases and tables
  • Describe table structures
  • View indexes and constraints
  • Explore relationships

Query Execution

  • Run SELECT statements
  • Execute INSERT, UPDATE, DELETE operations
  • Call stored procedures
  • Execute complex analytical queries

Data Analysis

  • Aggregate data across tables
  • Generate reports and insights
  • Perform statistical analysis
  • Create data visualizations

Database Management

  • Create and modify tables
  • Manage indexes
  • Handle user permissions
  • Monitor performance

Natural Language Database Operations

Basic Queries

With MCP, you can perform database operations using natural language:

"Show me the top 10 customers by revenue this quarter"

Translates to:

SELECT customer_name, SUM(revenue) as total_revenue
FROM sales
WHERE quarter = QUARTER(NOW())
  AND year = YEAR(NOW())
GROUP BY customer_id, customer_name
ORDER BY total_revenue DESC
LIMIT 10;
SQL

Complex Analytics

"Compare monthly sales trends between product categories for the last year"

Generates:

SELECT
    MONTH(sale_date) as month,
    YEAR(sale_date) as year,
    category,
    SUM(amount) as monthly_sales
FROM sales s
JOIN products p ON s.product_id = p.id
WHERE sale_date >= DATE_SUB(NOW(), INTERVAL 1 YEAR)
GROUP BY YEAR(sale_date), MONTH(sale_date), category
ORDER BY year, month, category;
SQL

Data Modifications

"Update all products in the 'Electronics' category to have a 10% discount"

Executes:

UPDATE products
SET discount_percentage = 10
WHERE category = 'Electronics';
SQL

PostgreSQL Integration

Custom PostgreSQL MCP Server

import asyncpg
from mcp import McpServer
from typing import Any, Dict

class PostgreSQLMCPServer:
    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.server = McpServer("postgresql-server")
        self.setup_tools()

    def setup_tools(self):
        @self.server.tool("execute_query")
        async def execute_query(query: str) -> Dict[str, Any]:
            """Execute a PostgreSQL query"""
            try:
                conn = await asyncpg.connect(self.connection_string)

                if query.strip().upper().startswith('SELECT'):
                    rows = await conn.fetch(query)
                    result = [dict(row) for row in rows]
                    return {
                        "type": "query_result",
                        "data": result,
                        "row_count": len(result)
                    }
                else:
                    result = await conn.execute(query)
                    return {
                        "type": "execution_result",
                        "message": f"Query executed: {result}",
                        "affected_rows": result.split()[-1] if result else "0"
                    }

            except Exception as e:
                return {
                    "type": "error",
                    "message": str(e)
                }
            finally:
                await conn.close()

        @self.server.tool("describe_table")
        async def describe_table(table_name: str) -> Dict[str, Any]:
            """Get table schema information"""
            query = """
                SELECT column_name, data_type, is_nullable, column_default
                FROM information_schema.columns
                WHERE table_name = $1
                ORDER BY ordinal_position;
            """

            conn = await asyncpg.connect(self.connection_string)
            rows = await conn.fetch(query, table_name)
            await conn.close()

            return {
                "table_name": table_name,
                "columns": [dict(row) for row in rows]
            }
Python

Configuration

{
  "mcpServers": {
    "postgresql": {
      "command": "python",
      "args": ["postgresql_mcp_server.py"],
      "env": {
        "DATABASE_URL": "postgresql://user:password@localhost:5432/database"
      }
    }
  }
}
JSON

MongoDB Integration

MongoDB MCP Server

from pymongo import MongoClient
from mcp import McpServer
import json

class MongoMCPServer:
    def __init__(self, connection_string: str):
        self.client = MongoClient(connection_string)
        self.server = McpServer("mongodb-server")
        self.setup_tools()

    def setup_tools(self):
        @self.server.tool("find_documents")
        async def find_documents(
            database: str,
            collection: str,
            query: str = "{}",
            limit: int = 10
        ) -> Dict[str, Any]:
            """Find documents in MongoDB collection"""
            try:
                db = self.client[database]
                coll = db[collection]

                query_dict = json.loads(query) if query != "{}" else {}
                cursor = coll.find(query_dict).limit(limit)

                documents = []
                for doc in cursor:
                    # Convert ObjectId to string for JSON serialization
                    doc['_id'] = str(doc['_id'])
                    documents.append(doc)

                return {
                    "database": database,
                    "collection": collection,
                    "documents": documents,
                    "count": len(documents)
                }

            except Exception as e:
                return {"error": str(e)}

        @self.server.tool("aggregate")
        async def aggregate(
            database: str,
            collection: str,
            pipeline: str
        ) -> Dict[str, Any]:
            """Execute aggregation pipeline"""
            try:
                db = self.client[database]
                coll = db[collection]

                pipeline_list = json.loads(pipeline)
                result = list(coll.aggregate(pipeline_list))

                # Convert ObjectIds to strings
                for doc in result:
                    if '_id' in doc:
                        doc['_id'] = str(doc['_id'])

                return {
                    "database": database,
                    "collection": collection,
                    "result": result
                }

            except Exception as e:
                return {"error": str(e)}
Python

Redis Integration

Redis MCP Server for Caching

import redis
from mcp import McpServer

class RedisMCPServer:
    def __init__(self, host: str = 'localhost', port: int = 6379, db: int = 0):
        self.redis_client = redis.Redis(host=host, port=port, db=db, decode_responses=True)
        self.server = McpServer("redis-server")
        self.setup_tools()

    def setup_tools(self):
        @self.server.tool("get_value")
        async def get_value(key: str) -> Dict[str, Any]:
            """Get value from Redis"""
            try:
                value = self.redis_client.get(key)
                return {
                    "key": key,
                    "value": value,
                    "exists": value is not None
                }
            except Exception as e:
                return {"error": str(e)}

        @self.server.tool("set_value")
        async def set_value(key: str, value: str, ttl: int = None) -> Dict[str, Any]:
            """Set value in Redis with optional TTL"""
            try:
                if ttl:
                    result = self.redis_client.setex(key, ttl, value)
                else:
                    result = self.redis_client.set(key, value)

                return {
                    "key": key,
                    "value": value,
                    "success": bool(result),
                    "ttl": ttl
                }
            except Exception as e:
                return {"error": str(e)}

        @self.server.tool("list_keys")
        async def list_keys(pattern: str = "*") -> Dict[str, Any]:
            """List keys matching pattern"""
            try:
                keys = self.redis_client.keys(pattern)
                return {
                    "pattern": pattern,
                    "keys": keys,
                    "count": len(keys)
                }
            except Exception as e:
                return {"error": str(e)}
Python

Advanced Database Patterns

Multi-Database Queries

Combine data from multiple database sources:

class MultiDatabaseMCPServer:
    def __init__(self):
        self.postgres_conn = "postgresql://..."
        self.mongo_client = MongoClient("mongodb://...")
        self.redis_client = redis.Redis()
        self.server = McpServer("multi-db-server")
        self.setup_tools()

    def setup_tools(self):
        @self.server.tool("cross_database_analysis")
        async def cross_database_analysis(user_id: str) -> Dict[str, Any]:
            """Analyze user data across multiple databases"""
            # Get user profile from PostgreSQL
            pg_conn = await asyncpg.connect(self.postgres_conn)
            user_profile = await pg_conn.fetchrow(
                "SELECT * FROM users WHERE id = $1", user_id
            )

            # Get user activities from MongoDB
            db = self.mongo_client.analytics
            activities = list(db.user_activities.find({"user_id": user_id}))

            # Get cached preferences from Redis
            preferences = self.redis_client.hgetall(f"user_prefs:{user_id}")

            return {
                "user_id": user_id,
                "profile": dict(user_profile) if user_profile else None,
                "activities": activities,
                "preferences": preferences,
                "analysis": self.analyze_user_data(user_profile, activities, preferences)
            }
Python

Real-time Data Streams

Integrate with streaming data:

@self.server.tool("stream_analysis")
async def stream_analysis(metric: str, time_window: str = "1h") -> Dict[str, Any]:
    """Analyze streaming metrics"""
    # Connect to streaming data source
    stream_data = await self.get_stream_data(metric, time_window)

    # Store aggregated results in database
    await self.store_analysis(metric, stream_data)

    # Cache results in Redis
    self.redis_client.setex(
        f"analysis:{metric}:{time_window}",
        300,  # 5 minute TTL
        json.dumps(stream_data)
    )

    return stream_data
Python

Automated Data Pipelines

Create intelligent data processing pipelines:

@self.server.tool("intelligent_etl")
async def intelligent_etl(source_table: str, target_table: str) -> Dict[str, Any]:
    """AI-powered ETL process"""
    # Analyze source schema
    source_schema = await self.analyze_schema(source_table)

    # Suggest optimal transformations
    transformations = await self.suggest_transformations(source_schema)

    # Execute ETL with AI-optimized query
    result = await self.execute_etl(source_table, target_table, transformations)

    return {
        "source": source_table,
        "target": target_table,
        "transformations": transformations,
        "result": result
    }
Python

Security and Performance

Connection Management

class SecureDatabaseMCP:
    def __init__(self):
        self.connection_pool = None
        self.setup_security()

    def setup_security(self):
        # Implement connection pooling
        self.connection_pool = create_pool(
            min_connections=1,
            max_connections=10,
            timeout=30
        )

        # Add query validation
        self.query_validator = SQLQueryValidator()

        # Setup audit logging
        self.audit_logger = AuditLogger()

    async def execute_secure_query(self, query: str, user_id: str):
        # Validate query for security
        if not self.query_validator.is_safe(query):
            raise SecurityError("Query contains prohibited operations")

        # Log query execution
        self.audit_logger.log_query(user_id, query)

        # Execute with connection from pool
        async with self.connection_pool.acquire() as conn:
            return await conn.fetch(query)
Python

Performance Optimization

@self.server.tool("optimized_query")
async def optimized_query(query: str) -> Dict[str, Any]:
    """Execute query with automatic optimization"""
    # Analyze query performance
    explain_result = await self.explain_query(query)

    # Suggest optimizations if needed
    if explain_result.cost > threshold:
        optimized_query = await self.optimize_query(query)
        return await self.execute_query(optimized_query)

    return await self.execute_query(query)
Python

Best Practices

Error Handling

try:
    result = await database_operation()
except DatabaseConnectionError:
    return {"error": "Database connection failed", "retry": True}
except QuerySyntaxError as e:
    return {"error": f"Invalid query: {e}", "suggestion": "Check SQL syntax"}
except PermissionError:
    return {"error": "Insufficient permissions", "required_role": "analyst"}
Python

Data Validation

def validate_query_parameters(params: Dict) -> bool:
    """Validate input parameters"""
    required_fields = ['database', 'table']

    for field in required_fields:
        if field not in params:
            raise ValueError(f"Missing required field: {field}")

    # Sanitize inputs
    params['table'] = sanitize_identifier(params['table'])
    return True
Python

Monitoring and Logging

import logging
from datetime import datetime

class DatabaseMCPMonitor:
    def __init__(self):
        self.logger = logging.getLogger("database-mcp")
        self.metrics = {}

    def log_operation(self, operation: str, duration: float, success: bool):
        self.logger.info(f"Operation: {operation}, Duration: {duration}s, Success: {success}")

        # Update metrics
        if operation not in self.metrics:
            self.metrics[operation] = {"count": 0, "total_duration": 0, "errors": 0}

        self.metrics[operation]["count"] += 1
        self.metrics[operation]["total_duration"] += duration

        if not success:
            self.metrics[operation]["errors"] += 1
Python

Conclusion

Database integration through MCP transforms how we interact with data, making complex database operations accessible through natural language while maintaining security and performance. Whether you're working with SQL databases like PostgreSQL and SingleStore, NoSQL solutions like MongoDB, or caching layers like Redis, MCP provides a unified interface that enhances productivity and democratizes data access.

The patterns and examples in this guide provide a foundation for building robust database integrations that leverage the full power of AI-assisted data operations.


Ready to implement database integrations? Check out our server development guides and security best practices for production deployments.