Database Integration Patterns with MCP
Database integration through the Model Context Protocol (MCP) enables AI applications to interact with data using natural language, making complex database operations accessible to both technical and non-technical users. This guide explores various database integration patterns and practical implementations.
Why Database Integration Matters
Traditional database interaction requires:
- SQL expertise for complex queries
- Programming knowledge for application integration
- Manual effort for routine operations
- Context switching between tools and interfaces
MCP eliminates these barriers by providing:
- Natural language database queries
- Automated operations through AI assistance
- Unified interface for multiple database types
- Intelligent query optimization and suggestions
SingleStore MCP Server
SingleStore provides a comprehensive MCP server that demonstrates best practices for database integration.
Installation and Setup
Via Smithery (Recommended)
npx -y @smithery/cli install @singlestore-labs/mcp-server-singlestore --client claude
Via npm
npm install singlestore-mcp-server
Direct Installation
git clone https://github.com/singlestore-labs/mcp-server-singlestore.git
cd mcp-server-singlestore
pip install -e .
Configuration
Claude Desktop Configuration
{
  "mcpServers": {
    "singlestore": {
      "command": "singlestore-mcp-client",
      "args": [],
      "env": {
        "SINGLESTORE_CONNECTION_STRING": "mysql://user:password@host:port/database"
      }
    }
  }
}
Connection String Format
# Basic connection
mysql://username:password@hostname:port/database
# With SSL
mysql://username:password@hostname:port/database?ssl=true
# With connection pool
mysql://username:password@hostname:port/database?pool_size=10
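The connection-string variants above can also be assembled programmatically. A minimal sketch (the `build_connection_string` helper is ours, not part of the SingleStore server) that URL-encodes credentials so special characters in passwords don't break the URL:

```python
from urllib.parse import quote_plus

def build_connection_string(user: str, password: str, host: str,
                            port: int, database: str, **options) -> str:
    """Build a MySQL-style connection string, URL-encoding the credentials."""
    creds = f"{quote_plus(user)}:{quote_plus(password)}"
    url = f"mysql://{creds}@{host}:{port}/{database}"
    if options:
        # Append query-string options such as ssl=true or pool_size=10
        query = "&".join(f"{k}={v}" for k, v in options.items())
        url = f"{url}?{query}"
    return url
```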
Capabilities
The SingleStore MCP server provides:
Schema Exploration
- List databases and tables
- Describe table structures
- View indexes and constraints
- Explore relationships
Query Execution
- Run SELECT statements
- Execute INSERT, UPDATE, DELETE operations
- Call stored procedures
- Execute complex analytical queries
Data Analysis
- Aggregate data across tables
- Generate reports and insights
- Perform statistical analysis
- Create data visualizations
Database Management
- Create and modify tables
- Manage indexes
- Handle user permissions
- Monitor performance
Natural Language Database Operations
Basic Queries
With MCP, you can perform database operations using natural language:
"Show me the top 10 customers by revenue this quarter"
This might translate to SQL like:
SELECT customer_name, SUM(revenue) as total_revenue
FROM sales
WHERE quarter = QUARTER(NOW())
AND year = YEAR(NOW())
GROUP BY customer_id, customer_name
ORDER BY total_revenue DESC
LIMIT 10;
Complex Analytics
"Compare monthly sales trends between product categories for the last year"
This might generate:
SELECT
  MONTH(sale_date) AS month,
  YEAR(sale_date) AS year,
  category,
  SUM(amount) AS monthly_sales
FROM sales s
JOIN products p ON s.product_id = p.id
WHERE sale_date >= DATE_SUB(NOW(), INTERVAL 1 YEAR)
GROUP BY YEAR(sale_date), MONTH(sale_date), category
ORDER BY year, month, category;
Data Modifications
"Update all products in the 'Electronics' category to have a 10% discount"
This might execute:
UPDATE products
SET discount_percentage = 10
WHERE category = 'Electronics';
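A word of caution on generated data modifications: values extracted from natural-language requests should be bound as parameters, not spliced into the SQL string. A hedged sketch (the helper name is ours, not part of the SingleStore server) of what that looks like:

```python
def build_discount_update(category: str, discount: float):
    """Return a parameterized UPDATE statement plus its bind values,
    so user-supplied strings never get interpolated into the SQL text."""
    sql = ("UPDATE products SET discount_percentage = %s "
           "WHERE category = %s")
    return sql, (discount, category)
```

The driver then substitutes the values safely, e.g. `cursor.execute(*build_discount_update("Electronics", 10))`.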
PostgreSQL Integration
Custom PostgreSQL MCP Server
import asyncpg
from mcp import McpServer
from typing import Any, Dict

class PostgreSQLMCPServer:
    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.server = McpServer("postgresql-server")
        self.setup_tools()

    def setup_tools(self):
        @self.server.tool("execute_query")
        async def execute_query(query: str) -> Dict[str, Any]:
            """Execute a PostgreSQL query"""
            conn = None
            try:
                conn = await asyncpg.connect(self.connection_string)
                if query.strip().upper().startswith('SELECT'):
                    rows = await conn.fetch(query)
                    result = [dict(row) for row in rows]
                    return {
                        "type": "query_result",
                        "data": result,
                        "row_count": len(result)
                    }
                else:
                    result = await conn.execute(query)
                    return {
                        "type": "execution_result",
                        "message": f"Query executed: {result}",
                        "affected_rows": result.split()[-1] if result else "0"
                    }
            except Exception as e:
                return {
                    "type": "error",
                    "message": str(e)
                }
            finally:
                # conn may be None if the connect itself failed
                if conn is not None:
                    await conn.close()

        @self.server.tool("describe_table")
        async def describe_table(table_name: str) -> Dict[str, Any]:
            """Get table schema information"""
            query = """
                SELECT column_name, data_type, is_nullable, column_default
                FROM information_schema.columns
                WHERE table_name = $1
                ORDER BY ordinal_position;
            """
            conn = await asyncpg.connect(self.connection_string)
            try:
                rows = await conn.fetch(query, table_name)
            finally:
                await conn.close()
            return {
                "table_name": table_name,
                "columns": [dict(row) for row in rows]
            }
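One weak spot in the server above is the `startswith('SELECT')` check, which misclassifies CTEs (`WITH ... SELECT`) and queries that begin with comments. A slightly more robust classifier, still a heuristic rather than a full SQL parser:

```python
import re

def is_read_only(query: str) -> bool:
    """Classify a SQL statement as read-only (returns rows) or not.
    Strips leading comments and treats WITH ... SELECT as read-only."""
    # Remove any leading -- line comments or /* block comments */
    stripped = re.sub(r"^\s*(--[^\n]*\n|/\*.*?\*/\s*)*", "", query,
                      flags=re.DOTALL).lstrip()
    first = stripped.split(None, 1)[0].upper() if stripped else ""
    return first in ("SELECT", "WITH", "SHOW", "EXPLAIN")
```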
Configuration
{
  "mcpServers": {
    "postgresql": {
      "command": "python",
      "args": ["postgresql_mcp_server.py"],
      "env": {
        "DATABASE_URL": "postgresql://user:password@localhost:5432/database"
      }
    }
  }
}
MongoDB Integration
MongoDB MCP Server
from pymongo import MongoClient
from mcp import McpServer
from typing import Any, Dict
import json

class MongoMCPServer:
    def __init__(self, connection_string: str):
        self.client = MongoClient(connection_string)
        self.server = McpServer("mongodb-server")
        self.setup_tools()

    def setup_tools(self):
        @self.server.tool("find_documents")
        async def find_documents(
            database: str,
            collection: str,
            query: str = "{}",
            limit: int = 10
        ) -> Dict[str, Any]:
            """Find documents in a MongoDB collection"""
            try:
                db = self.client[database]
                coll = db[collection]
                query_dict = json.loads(query) if query != "{}" else {}
                cursor = coll.find(query_dict).limit(limit)
                documents = []
                for doc in cursor:
                    # Convert ObjectId to string for JSON serialization
                    doc['_id'] = str(doc['_id'])
                    documents.append(doc)
                return {
                    "database": database,
                    "collection": collection,
                    "documents": documents,
                    "count": len(documents)
                }
            except Exception as e:
                return {"error": str(e)}

        @self.server.tool("aggregate")
        async def aggregate(
            database: str,
            collection: str,
            pipeline: str
        ) -> Dict[str, Any]:
            """Execute an aggregation pipeline"""
            try:
                db = self.client[database]
                coll = db[collection]
                pipeline_list = json.loads(pipeline)
                result = list(coll.aggregate(pipeline_list))
                # Convert ObjectIds to strings
                for doc in result:
                    if '_id' in doc:
                        doc['_id'] = str(doc['_id'])
                return {
                    "database": database,
                    "collection": collection,
                    "result": result
                }
            except Exception as e:
                return {"error": str(e)}
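Because the `aggregate` tool takes its pipeline as a JSON string, callers need to serialize pipelines before invoking it. An illustrative sketch (the field names `sale_date` and `amount` are assumptions about your data, not part of the server):

```python
import json

def monthly_totals_pipeline(amount_field: str = "amount") -> str:
    """Build a JSON aggregation pipeline (the `aggregate` tool expects a
    string) that groups documents by month and sums an amount field."""
    pipeline = [
        {"$group": {
            "_id": {"$month": "$sale_date"},   # group key: month number
            "total": {"$sum": f"${amount_field}"},
        }},
        {"$sort": {"_id": 1}},                 # chronological order
    ]
    return json.dumps(pipeline)
```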
Redis Integration
Redis MCP Server for Caching
import redis
from mcp import McpServer
from typing import Any, Dict, Optional

class RedisMCPServer:
    def __init__(self, host: str = 'localhost', port: int = 6379, db: int = 0):
        self.redis_client = redis.Redis(host=host, port=port, db=db, decode_responses=True)
        self.server = McpServer("redis-server")
        self.setup_tools()

    def setup_tools(self):
        @self.server.tool("get_value")
        async def get_value(key: str) -> Dict[str, Any]:
            """Get a value from Redis"""
            try:
                value = self.redis_client.get(key)
                return {
                    "key": key,
                    "value": value,
                    "exists": value is not None
                }
            except Exception as e:
                return {"error": str(e)}

        @self.server.tool("set_value")
        async def set_value(key: str, value: str, ttl: Optional[int] = None) -> Dict[str, Any]:
            """Set a value in Redis with an optional TTL"""
            try:
                if ttl:
                    result = self.redis_client.setex(key, ttl, value)
                else:
                    result = self.redis_client.set(key, value)
                return {
                    "key": key,
                    "value": value,
                    "success": bool(result),
                    "ttl": ttl
                }
            except Exception as e:
                return {"error": str(e)}

        @self.server.tool("list_keys")
        async def list_keys(pattern: str = "*") -> Dict[str, Any]:
            """List keys matching a pattern"""
            try:
                keys = self.redis_client.keys(pattern)
                return {
                    "pattern": pattern,
                    "keys": keys,
                    "count": len(keys)
                }
            except Exception as e:
                return {"error": str(e)}
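A common way to use a Redis server like this is the cache-aside pattern: check the cache first, and fall back to the source of truth on a miss. A minimal sketch that works against anything exposing Redis-style `get`/`setex`:

```python
import json

def cached_fetch(cache, key: str, loader, ttl: int = 300):
    """Cache-aside read: return the cached value if present; otherwise
    call `loader`, store its result under `key` with a TTL, and return it.
    Values are stored as JSON strings."""
    hit = cache.get(key)
    if hit is not None:
        return json.loads(hit)
    value = loader()
    cache.setex(key, ttl, json.dumps(value))
    return value
```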
Advanced Database Patterns
Multi-Database Queries
Combine data from multiple database sources:
class MultiDatabaseMCPServer:
    def __init__(self):
        self.postgres_conn = "postgresql://..."
        self.mongo_client = MongoClient("mongodb://...")
        self.redis_client = redis.Redis()
        self.server = McpServer("multi-db-server")
        self.setup_tools()

    def setup_tools(self):
        @self.server.tool("cross_database_analysis")
        async def cross_database_analysis(user_id: str) -> Dict[str, Any]:
            """Analyze user data across multiple databases"""
            # Get the user profile from PostgreSQL
            pg_conn = await asyncpg.connect(self.postgres_conn)
            try:
                user_profile = await pg_conn.fetchrow(
                    "SELECT * FROM users WHERE id = $1", user_id
                )
            finally:
                await pg_conn.close()

            # Get user activities from MongoDB
            db = self.mongo_client.analytics
            activities = list(db.user_activities.find({"user_id": user_id}))

            # Get cached preferences from Redis
            preferences = self.redis_client.hgetall(f"user_prefs:{user_id}")

            return {
                "user_id": user_id,
                "profile": dict(user_profile) if user_profile else None,
                "activities": activities,
                "preferences": preferences,
                "analysis": self.analyze_user_data(user_profile, activities, preferences)
            }
Real-time Data Streams
Integrate with streaming data:
@self.server.tool("stream_analysis")
async def stream_analysis(metric: str, time_window: str = "1h") -> Dict[str, Any]:
    """Analyze streaming metrics"""
    # Connect to the streaming data source
    stream_data = await self.get_stream_data(metric, time_window)

    # Store aggregated results in the database
    await self.store_analysis(metric, stream_data)

    # Cache results in Redis
    self.redis_client.setex(
        f"analysis:{metric}:{time_window}",
        300,  # 5 minute TTL
        json.dumps(stream_data)
    )
    return stream_data
Automated Data Pipelines
Create intelligent data processing pipelines:
@self.server.tool("intelligent_etl")
async def intelligent_etl(source_table: str, target_table: str) -> Dict[str, Any]:
    """AI-powered ETL process"""
    # Analyze the source schema
    source_schema = await self.analyze_schema(source_table)

    # Suggest optimal transformations
    transformations = await self.suggest_transformations(source_schema)

    # Execute the ETL with the AI-optimized query
    result = await self.execute_etl(source_table, target_table, transformations)

    return {
        "source": source_table,
        "target": target_table,
        "transformations": transformations,
        "result": result
    }
Security and Performance
Connection Management
class SecureDatabaseMCP:
    def __init__(self):
        self.connection_pool = None
        self.setup_security()

    def setup_security(self):
        # Implement connection pooling
        self.connection_pool = create_pool(
            min_connections=1,
            max_connections=10,
            timeout=30
        )
        # Add query validation
        self.query_validator = SQLQueryValidator()
        # Set up audit logging
        self.audit_logger = AuditLogger()

    async def execute_secure_query(self, query: str, user_id: str):
        # Validate the query for security
        if not self.query_validator.is_safe(query):
            raise SecurityError("Query contains prohibited operations")

        # Log the query execution
        self.audit_logger.log_query(user_id, query)

        # Execute with a connection from the pool
        async with self.connection_pool.acquire() as conn:
            return await conn.fetch(query)
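The `SQLQueryValidator` above is referenced but not defined. One naive way to implement `is_safe` is a keyword blocklist; a real deployment should parse the SQL and apply role-based rules rather than keyword matching, but as a sketch:

```python
import re

class SQLQueryValidator:
    """Naive safety check: reject statements containing destructive
    keywords. Keyword matching is a coarse heuristic, not real security."""
    DENIED = ("DROP", "TRUNCATE", "GRANT", "REVOKE", "ALTER")

    def is_safe(self, query: str) -> bool:
        # Tokenize into bare words and check against the denylist
        tokens = set(re.findall(r"[A-Za-z_]+", query.upper()))
        return tokens.isdisjoint(self.DENIED)
```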
Performance Optimization
@self.server.tool("optimized_query")
async def optimized_query(query: str) -> Dict[str, Any]:
    """Execute a query with automatic optimization"""
    # Analyze query performance
    explain_result = await self.explain_query(query)

    # Rewrite the query if the planner's cost estimate exceeds the threshold
    if explain_result.cost > threshold:
        optimized = await self.optimize_query(query)
        return await self.execute_query(optimized)

    return await self.execute_query(query)
Best Practices
Error Handling
try:
    result = await database_operation()
except DatabaseConnectionError:
    return {"error": "Database connection failed", "retry": True}
except QuerySyntaxError as e:
    return {"error": f"Invalid query: {e}", "suggestion": "Check SQL syntax"}
except PermissionError:
    return {"error": "Insufficient permissions", "required_role": "analyst"}
Data Validation
def validate_query_parameters(params: Dict) -> bool:
    """Validate input parameters"""
    required_fields = ['database', 'table']
    for field in required_fields:
        if field not in params:
            raise ValueError(f"Missing required field: {field}")

    # Sanitize inputs
    params['table'] = sanitize_identifier(params['table'])
    return True
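`sanitize_identifier` is left undefined above. For identifiers (table and column names), rejecting anything outside a strict character set is safer than trying to escape; one possible sketch:

```python
import re

def sanitize_identifier(name: str) -> str:
    """Allow only letters, digits, and underscores in table/column names;
    raise on anything else rather than attempting to escape it."""
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", name):
        raise ValueError(f"Invalid identifier: {name!r}")
    return name
```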
Monitoring and Logging
import logging

class DatabaseMCPMonitor:
    def __init__(self):
        self.logger = logging.getLogger("database-mcp")
        self.metrics = {}

    def log_operation(self, operation: str, duration: float, success: bool):
        self.logger.info(f"Operation: {operation}, Duration: {duration}s, Success: {success}")

        # Update per-operation metrics
        if operation not in self.metrics:
            self.metrics[operation] = {"count": 0, "total_duration": 0, "errors": 0}

        self.metrics[operation]["count"] += 1
        self.metrics[operation]["total_duration"] += duration
        if not success:
            self.metrics[operation]["errors"] += 1
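The per-operation counters collected by `log_operation` can be condensed into averages and error rates for reporting. A small helper sketch over that metrics shape:

```python
def summarize_metrics(metrics: dict) -> dict:
    """Condense {op: {count, total_duration, errors}} counters into
    average duration and error rate per operation."""
    summary = {}
    for op, m in metrics.items():
        count = m["count"] or 1  # guard against division by zero
        summary[op] = {
            "avg_duration": m["total_duration"] / count,
            "error_rate": m["errors"] / count,
        }
    return summary
```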
Conclusion
Database integration through MCP transforms how we interact with data, making complex database operations accessible through natural language while maintaining security and performance. Whether you're working with SQL databases like PostgreSQL and SingleStore, NoSQL solutions like MongoDB, or caching layers like Redis, MCP provides a unified interface that enhances productivity and democratizes data access.
The patterns and examples in this guide provide a foundation for building robust database integrations that leverage the full power of AI-assisted data operations.
Ready to implement database integrations? Check out our server development guides and security best practices for production deployments.