Recently, I found myself in a situation where I needed to migrate data from MongoDB to Supabase (PostgreSQL). Initially, MongoDB was the quickest way to get up and running on a single VPS: it was simple, flexible, and did the job. However, as our project grew and we moved more processes to AWS, it became clear that a relational database would be more suitable for storing our metadata. Since we were already using Supabase for authentication, it made perfect sense to consolidate our data there.
In this tutorial, I’ll walk you through the entire migration process, sharing the approach that worked for me. We’ll cover both full database migration and single-table migration with custom field mapping.
Prerequisites
- Access to both MongoDB and Supabase databases
- Python 3.x installed on your system
- Basic understanding of both MongoDB and PostgreSQL
- Ubuntu/Linux environment (though the process can be adapted to other operating systems)
Important: Before proceeding with any migration, make sure to back up both your MongoDB and Supabase databases. I’ve covered the backup process in detail in separate tutorials.
Step 1: Gathering Database Credentials
Before we start the migration, we need to get the connection URLs for both databases:
For Supabase:
- Log in to your Supabase dashboard (https://supabase.com/dashboard/projects)
- Select your project
- Click ‘Connect’ in the top bar
- Find your connection string (preferably the Transaction pooler one); it should look something like this:
postgresql://postgres.xxxxxxxxxxxx:[YOUR-PASSWORD]@aws-0-us-east-1.pooler.supabase.com:6543/postgres
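If you have the psql client installed, you can quickly confirm the string works before going any further (substitute your actual password first):
psql 'postgresql://postgres.xxxxxxxxxxxx:[YOUR-PASSWORD]@aws-0-us-east-1.pooler.supabase.com:6543/postgres' -c 'SELECT 1;'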
For MongoDB Atlas:
- Log into MongoDB Atlas
- Navigate to your cluster
- Click “Connect” then “Connect your application”
- You’ll get a URL that looks like this:
mongodb+srv://username:password@cluster0.xxxxx.mongodb.net/dbname?retryWrites=true&w=majority
For MongoDB (Self-hosted):
- SSH into your VPS instance
- Install MongoDB Database Tools
- Set up remote access
All steps above are documented in “How to Create External Backup of Your Self-Hosted MongoDB Database”.
Step 2: Setting Up the Environment
First, let’s install the necessary dependencies on our Ubuntu machine:
sudo apt-get install python3-dev libpq-dev
Next, create and activate a Python virtual environment:
python3 -m venv venv
source venv/bin/activate
Install the required Python packages:
pip install pymongo psycopg2-binary tqdm
Step 3: Setting Up Connection URIs
We’ll use environment variables to store our database credentials securely. Create them like this:
# For Supabase
export SUPABASE_URI='postgresql://postgres.xxxxxxxxxxxx:your_password@aws-0-us-east-1.pooler.supabase.com:6543/postgres'
# For MongoDB
export MONGO_URI='mongodb://username:password@your-mongodb-host:27017/dbname?authSource=admin'
export MONGO_DB='your_database_name'
Note: Replace the placeholder values with your actual database credentials. Also, when using MongoDB connection URLs, special characters in usernames and passwords (like @, !, #, etc.) must be URL-encoded. For example, if your password is complex@123, it should be encoded as complex%40123 in the URL.
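If you’d rather not encode the password by hand, Python’s standard library can do it for you. Here’s a tiny sketch using urllib.parse.quote_plus (the password below is just a placeholder):
from urllib.parse import quote_plus

password = "complex@123"  # placeholder - replace with your real password
print(quote_plus(password))  # prints: complex%40123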
Step 4: Ensuring Proper Access
Before running the migration, we need to ensure our server can connect to both databases:
For MongoDB:
- If you’re using MongoDB Atlas, you’ll need to whitelist your server’s IP address from the dashboard.
- For self-hosted MongoDB, you’ll need to modify the MongoDB config to allow external connections and configure your firewall to allow incoming connections; the relevant changes typically look like the sketch below. You can find detailed instructions in our MongoDB backup tutorial under the “Setting Up Remote Access” section.
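For reference, the changes usually boil down to something like the following. Treat it as a rough sketch (your config path, bind address, and firewall tooling may differ) and follow the backup tutorial for the full walkthrough:
# /etc/mongod.conf (excerpt) - listen on external interfaces, not just localhost
net:
  port: 27017
  bindIp: 0.0.0.0   # or a comma-separated list of specific addresses

# Open the MongoDB port in UFW, ideally restricted to your migration server's IP (placeholder)
sudo ufw allow from <your-migration-server-ip> to any port 27017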
For Supabase:
The connection is usually straightforward with Supabase. Just make sure you’re using the correct connection URL. If you encounter connection errors, double-check that you’re not using the direct connection string, as it can cause issues with our migration script.
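Before running anything heavy, I recommend a quick sanity check of both connections. Here’s a minimal sketch that assumes the environment variables from Step 3 are already exported:
import os
import pymongo
import psycopg2

# Ping MongoDB - raises an exception if the server is unreachable
mongo_client = pymongo.MongoClient(os.getenv('MONGO_URI'), serverSelectionTimeoutMS=5000)
mongo_client.admin.command('ping')
print("MongoDB connection OK")

# Run a trivial query against Supabase (PostgreSQL)
pg_conn = psycopg2.connect(os.getenv('SUPABASE_URI'))
with pg_conn.cursor() as cur:
    cur.execute("SELECT 1")
print("Supabase connection OK")

pg_conn.close()
mongo_client.close()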
Step 5: The Migration Scripts
I’ve prepared two different approaches for the migration:
- Full Database Migration: Migrates all collections and their data while maintaining proper type mapping
- Single Table Migration: Allows for specific table migration with custom field mapping and default values
Full Migration Script
from bson.decimal128 import Decimal128
import pymongo
import psycopg2
from psycopg2.extensions import AsIs
import json
from datetime import datetime
from psycopg2 import sql, extensions, connect, Error
from bson import ObjectId
import os
import logging
from tqdm import tqdm
from typing import List, Dict, Any
import itertools
# Set up logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('migration.log'),
logging.StreamHandler()
]
)
# Configuration
BATCH_SIZE = 1000 # Adjust based on your data size and memory
MAX_RETRIES = 3
# Map MongoDB types to PostgreSQL types
DATABASE_TYPE_MAPPING = {
"str": "TEXT",
"ObjectId": "TEXT",
"datetime.datetime": "TIMESTAMP WITH TIME ZONE",
"datetime": "TIMESTAMP WITH TIME ZONE",
"int": "INT",
"list": "JSONB",
"dict": "JSONB",
"bool": "BOOLEAN",
"float": "NUMERIC",
"Decimal128": "NUMERIC",
"default": "TEXT",
"NoneType": "TEXT"
}
class CustomEncoder(json.JSONEncoder):
"""Custom JSON encoder to handle MongoDB specific types"""
def default(self, obj):
if isinstance(obj, ObjectId):
return str(obj)
if isinstance(obj, datetime):
return obj.isoformat()
if isinstance(obj, Decimal128):
return str(obj)
if isinstance(obj, complex):
return [obj.real, obj.imag]
return json.JSONEncoder.default(self, obj)
def prepare_schema(collection, pg_cur, collection_name: str) -> Dict[str, str]:
"""Analyze collection and prepare PostgreSQL schema"""
logging.info(f"Analyzing schema for {collection_name}")
schema = {}
sample_size = min(1000, collection.count_documents({}))
for doc in collection.aggregate([{"$sample": {"size": sample_size}}]):
for field, value in doc.items():
if field not in schema:
if isinstance(value, ObjectId):
schema[field] = DATABASE_TYPE_MAPPING["ObjectId"]
else:
schema[field] = DATABASE_TYPE_MAPPING.get(
type(value).__name__,
DATABASE_TYPE_MAPPING["default"]
)
# Create table with discovered schema
columns = [
sql.SQL("{} {}").format(
sql.Identifier(field),
sql.SQL(data_type)
)
for field, data_type in schema.items()
]
create_table_sql = sql.SQL("CREATE TABLE IF NOT EXISTS {} ({})").format(
sql.Identifier(collection_name),
sql.SQL(', ').join(columns)
)
pg_cur.execute(create_table_sql)
return schema
def process_value(value: Any) -> Any:
"""Process a value for PostgreSQL insertion"""
if isinstance(value, (ObjectId, Decimal128)):
return str(value)
elif isinstance(value, (list, dict)):
return json.dumps(value, cls=CustomEncoder)
return value
def batch_insert(pg_cur, collection_name: str, fields: List[str], batch_data: List[List]):
"""Insert a batch of records"""
if not batch_data:
return
insert_sql = sql.SQL("INSERT INTO {} ({}) VALUES {}").format(
sql.Identifier(collection_name),
sql.SQL(', ').join(map(sql.Identifier, fields)),
sql.SQL(', ').join([
sql.SQL('({})').format(sql.SQL(', ').join([sql.Placeholder()] * len(fields)))
for _ in batch_data
])
)
# Flatten the batch data for insertion
flat_values = list(itertools.chain(*batch_data))
for attempt in range(MAX_RETRIES):
try:
pg_cur.execute(insert_sql, flat_values)
break
except Error as e:
if attempt == MAX_RETRIES - 1:
logging.error(f"Failed to insert batch after {MAX_RETRIES} attempts: {str(e)}")
raise
logging.warning(f"Batch insert attempt {attempt + 1} failed: {str(e)}")
def migrate_collection(collection, pg_cur, collection_name: str):
"""Migrate a single collection to PostgreSQL using batch processing"""
try:
# Prepare schema and get field names
schema = prepare_schema(collection, pg_cur, collection_name)
fields = list(schema.keys())
# Get total documents for progress bar
total_docs = collection.count_documents({})
cursor = collection.find()
batch_data = []
processed = 0
with tqdm(total=total_docs, desc=f"Migrating {collection_name}") as pbar:
for document in cursor:
# Process document values
values = [process_value(document.get(field)) for field in fields]
batch_data.append(values)
# Insert batch when size reached
if len(batch_data) >= BATCH_SIZE:
batch_insert(pg_cur, collection_name, fields, batch_data)
processed += len(batch_data)
pbar.update(len(batch_data))
batch_data = []
# Insert remaining documents
if batch_data:
batch_insert(pg_cur, collection_name, fields, batch_data)
pbar.update(len(batch_data))
logging.info(f"Successfully migrated {processed} documents from {collection_name}")
except Exception as e:
logging.error(f"Error migrating collection {collection_name}: {str(e)}")
raise
def main():
try:
# Get connection details
mongo_url = os.getenv('MONGO_URI')
supabase_url = os.getenv('SUPABASE_URI')
mongo_db_manual = os.getenv('MONGO_DB')
if not all([mongo_url, supabase_url]):
raise EnvironmentError("Missing required environment variables")
# Setup connections
mongo_client = pymongo.MongoClient(mongo_url)
pg_conn = connect(supabase_url)
pg_conn.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
pg_cur = pg_conn.cursor()
# Register Decimal128 adapter
psycopg2.extensions.register_adapter(Decimal128, lambda val: AsIs(str(val.to_decimal())))
# Get databases to migrate
mongo_db_names = [mongo_db_manual] if mongo_db_manual else mongo_client.list_database_names()
# Migrate each database
for db_name in mongo_db_names:
logging.info(f"Starting migration of database: {db_name}")
mongo_db = mongo_client[db_name]
for collection_name in mongo_db.list_collection_names():
if collection_name.startswith("system."):
continue
logging.info(f"Migrating collection: {collection_name}")
collection = mongo_db[collection_name]
migrate_collection(collection, pg_cur, collection_name)
logging.info(f"Completed migration of database: {db_name}")
except Exception as e:
logging.error(f"Migration failed: {str(e)}")
raise
finally:
# Clean up connections
if 'pg_cur' in locals():
pg_cur.close()
if 'pg_conn' in locals():
pg_conn.close()
if 'mongo_client' in locals():
mongo_client.close()
if __name__ == "__main__":
main()
Single Table Migration with Field Mapping
from bson.decimal128 import Decimal128
import pymongo
import psycopg2
from psycopg2.extensions import AsIs
import json
from datetime import datetime
from psycopg2 import sql, extensions, connect, Error
from bson import ObjectId
import os
import logging
from tqdm import tqdm
from typing import List, Dict, Any
import itertools
# Set up logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('table_migration.log'),
logging.StreamHandler()
]
)
# Configuration
BATCH_SIZE = 1000 # Adjust based on your data size and memory
MAX_RETRIES = 3
# Mapping MongoDB types to PostgreSQL types
DATABASE_TYPE_MAPPING = {
"str": "TEXT",
"ObjectId": "TEXT",
"datetime.datetime": "TIMESTAMP WITH TIME ZONE",
"datetime": "TIMESTAMP WITH TIME ZONE",
"int": "INT",
"list": "JSONB",
"dict": "JSONB",
"bool": "BOOLEAN",
"float": "NUMERIC",
"Decimal128": "NUMERIC",
"default": "TEXT",
"NoneType": "TEXT"
}
class CustomEncoder(json.JSONEncoder):
"""Custom JSON encoder to handle MongoDB specific types"""
def default(self, obj):
if isinstance(obj, ObjectId):
return str(obj)
if isinstance(obj, datetime):
return obj.isoformat()
if isinstance(obj, Decimal128):
return str(obj)
if isinstance(obj, complex):
return [obj.real, obj.imag]
return json.JSONEncoder.default(self, obj)
# Configuration for table migration
TABLE_CONFIG = {
"source_collection": "your_mongo_collection", # Required
"target_table": "your_postgres_table", # Required
"field_mapping": { # Optional
"mongo_field1": "pg_field1",
"mongo_field2": "pg_field2",
"nested.field": "flat_field",
},
"default_values": { # Optional
"new_field1": "default_value",
"new_field2": 0,
"timestamp_field": "CURRENT_TIMESTAMP"
},
"key_fields": [], # List of fields to check for duplicates
"type_mapping": {} # Optional - will be auto-detected if empty
}
def get_nested_value(obj: Dict, path: str) -> Any:
"""Get value from nested dictionary using dot notation"""
for key in path.split('.'):
if isinstance(obj, dict):
obj = obj.get(key)
else:
return None
return obj
def analyze_field_types(collection, field_mapping: Dict) -> Dict[str, str]:
"""Analyze and determine field types from sample documents"""
schema = {}
sample_size = min(1000, collection.count_documents({}))
# Create reverse mapping for field names
reverse_mapping = {v: k for k, v in field_mapping.items()} if field_mapping else {}
for doc in collection.aggregate([{"$sample": {"size": sample_size}}]):
if field_mapping:
# Process mapped fields
for mongo_field, pg_field in field_mapping.items():
value = get_nested_value(doc, mongo_field)
if value is not None and pg_field not in schema:
if isinstance(value, ObjectId):
schema[pg_field] = DATABASE_TYPE_MAPPING["ObjectId"]
else:
schema[pg_field] = DATABASE_TYPE_MAPPING.get(
type(value).__name__,
DATABASE_TYPE_MAPPING["default"]
)
else:
# Process all fields if no mapping specified
for field, value in doc.items():
if field not in schema:
if isinstance(value, ObjectId):
schema[field] = DATABASE_TYPE_MAPPING["ObjectId"]
else:
schema[field] = DATABASE_TYPE_MAPPING.get(
type(value).__name__,
DATABASE_TYPE_MAPPING["default"]
)
return schema
def create_table_schema(pg_cur, table_name: str, schema: Dict[str, str], default_values: Dict[str, Any]):
"""Create PostgreSQL table with defined schema"""
# Add default value fields to schema if not present
for field, value in default_values.items():
if field not in schema:
if value == "CURRENT_TIMESTAMP":
schema[field] = "TIMESTAMP WITH TIME ZONE"
elif isinstance(value, int):
schema[field] = "INTEGER"
elif isinstance(value, float):
schema[field] = "NUMERIC"
elif isinstance(value, bool):
schema[field] = "BOOLEAN"
else:
schema[field] = "TEXT"
columns = []
for field, field_type in schema.items():
columns.append(sql.SQL("{} {}").format(
sql.Identifier(field),
sql.SQL(field_type)
))
create_table_sql = sql.SQL("CREATE TABLE IF NOT EXISTS {} ({})").format(
sql.Identifier(table_name),
sql.SQL(', ').join(columns)
)
try:
pg_cur.execute(create_table_sql)
except Error as e:
logging.error(f"Error creating table schema: {str(e)}")
raise
def process_document(document: Dict, field_mapping: Dict, default_values: Dict) -> Dict:
"""Process MongoDB document according to field mapping and default values"""
processed = {}
# Process mapped fields
if field_mapping:
for mongo_field, pg_field in field_mapping.items():
value = get_nested_value(document, mongo_field)
if value is not None:
processed[pg_field] = process_value(value)
else:
# If no mapping specified, process all fields
for field, value in document.items():
processed[field] = process_value(value)
# Add default values
for field, value in default_values.items():
if field not in processed:
if value == "CURRENT_TIMESTAMP":
processed[field] = datetime.now()
else:
processed[field] = value
return processed
def process_value(value: Any) -> Any:
"""Process a value for PostgreSQL insertion"""
if isinstance(value, (ObjectId, Decimal128)):
return str(value)
elif isinstance(value, (list, dict)):
return json.dumps(value, cls=CustomEncoder)
return value
def check_existing_records(pg_cur, table_name: str, key_fields: List[str], batch_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Check for existing records based on key fields"""
if not key_fields or not batch_data:
return batch_data
# Prepare the WHERE clause for checking duplicates
conditions = []
values = []
for record in batch_data:
record_conditions = []
for field in key_fields:
record_conditions.append(f"{field} = %s")
values.append(record[field])
conditions.append(f"({' AND '.join(record_conditions)})")
query = sql.SQL("SELECT {} FROM {} WHERE {}").format(
sql.SQL(', ').join(map(sql.Identifier, key_fields)),
sql.Identifier(table_name),
sql.SQL(' OR ').join(map(sql.SQL, conditions))
)
pg_cur.execute(query, values)
existing_records = pg_cur.fetchall()
if not existing_records:
return batch_data
# Create a set of existing key combinations
existing_keys = {tuple(record) for record in existing_records}
# Filter out records that already exist
filtered_batch = []
for record in batch_data:
record_key = tuple(record[field] for field in key_fields)
if record_key not in existing_keys:
filtered_batch.append(record)
return filtered_batch
def batch_insert(pg_cur, table_name: str, fields: List[str], batch_data: List[Dict[str, Any]], key_fields: List[str] = None):
"""Insert a batch of records"""
if not batch_data:
return
# Check for duplicates if key_fields are specified
if key_fields:
batch_data = check_existing_records(pg_cur, table_name, key_fields, batch_data)
if not batch_data:
return
insert_sql = sql.SQL("INSERT INTO {} ({}) VALUES {}").format(
sql.Identifier(table_name),
sql.SQL(', ').join(map(sql.Identifier, fields)),
sql.SQL(', ').join([
sql.SQL('({})').format(sql.SQL(', ').join([sql.Placeholder()] * len(fields)))
for _ in batch_data
])
)
# Flatten dict records into a list of values ordered by the fields list
flat_values = [record.get(field) for record in batch_data for field in fields]
for attempt in range(MAX_RETRIES):
try:
pg_cur.execute(insert_sql, flat_values)
break
except Error as e:
if attempt == MAX_RETRIES - 1:
logging.error(f"Failed to insert batch after {MAX_RETRIES} attempts: {str(e)}")
raise
logging.warning(f"Batch insert attempt {attempt + 1} failed: {str(e)}")
def migrate_table(mongo_client, pg_conn, config: Dict):
"""Migrate a single table with custom mapping and duplicate checking"""
try:
pg_cur = pg_conn.cursor()
# Validate required config
if not config.get('source_collection') or not config.get('target_table'):
raise ValueError("source_collection and target_table are required in TABLE_CONFIG")
# Get MongoDB collection
mongo_db = mongo_client[os.getenv('MONGO_DB')]
collection = mongo_db[config['source_collection']]
# Get configuration
field_mapping = config.get('field_mapping', {})
default_values = config.get('default_values', {})
key_fields = config.get('key_fields', [])
# Get schema - either from type_mapping or analyze it
schema = config.get('type_mapping', {})
if not schema:
schema = analyze_field_types(collection, field_mapping)
# Create table with schema
create_table_schema(pg_cur, config['target_table'], schema, default_values)
# Prepare for batch processing
batch_data = []
total_docs = collection.count_documents({})
cursor = collection.find()
schema_fields = list(schema.keys())
with tqdm(total=total_docs, desc=f"Migrating {config['source_collection']}") as pbar:
for document in cursor:
processed_doc = process_document(document, field_mapping, default_values)
# Create a dictionary for the processed document
doc_dict = {field: processed_doc.get(field) for field in schema_fields}
batch_data.append(doc_dict)
if len(batch_data) >= BATCH_SIZE:
# Batch insert with duplicate checking
filtered_batch = check_existing_records(pg_cur, config['target_table'], key_fields, batch_data) if key_fields else batch_data
if filtered_batch:
values = [[doc[field] for field in schema_fields] for doc in filtered_batch]
if values:
insert_sql = sql.SQL("INSERT INTO {} ({}) VALUES {}").format(
sql.Identifier(config['target_table']),
sql.SQL(', ').join(map(sql.Identifier, schema_fields)),
sql.SQL(', ').join([
sql.SQL('({})').format(sql.SQL(', ').join([sql.Placeholder()] * len(schema_fields)))
for _ in values
])
)
flat_values = list(itertools.chain(*values))
for attempt in range(MAX_RETRIES):
try:
pg_cur.execute(insert_sql, flat_values)
break
except Error as e:
if attempt == MAX_RETRIES - 1:
logging.error(f"Failed to insert batch after {MAX_RETRIES} attempts: {str(e)}")
raise
logging.warning(f"Batch insert attempt {attempt + 1} failed: {str(e)}")
skipped = len(batch_data) - (len(filtered_batch) if filtered_batch else 0)
if skipped > 0:
logging.info(f"Skipped {skipped} duplicate records")
pbar.update(len(batch_data))
batch_data = []
# Insert remaining documents
if batch_data:
filtered_batch = check_existing_records(pg_cur, config['target_table'], key_fields, batch_data) if key_fields else batch_data
if filtered_batch:
values = [[doc[field] for field in schema_fields] for doc in filtered_batch]
if values:
insert_sql = sql.SQL("INSERT INTO {} ({}) VALUES {}").format(
sql.Identifier(config['target_table']),
sql.SQL(', ').join(map(sql.Identifier, schema_fields)),
sql.SQL(', ').join([
sql.SQL('({})').format(sql.SQL(', ').join([sql.Placeholder()] * len(schema_fields)))
for _ in values
])
)
flat_values = list(itertools.chain(*values))
pg_cur.execute(insert_sql, flat_values)
skipped = len(batch_data) - (len(filtered_batch) if filtered_batch else 0)
if skipped > 0:
logging.info(f"Skipped {skipped} duplicate records")
pbar.update(len(batch_data))
logging.info(f"Migration completed for table {config['target_table']}")
except Exception as e:
logging.error(f"Error in table migration: {str(e)}")
raise
finally:
if 'pg_cur' in locals():
pg_cur.close()
def main():
try:
# Get connection details
mongo_url = os.getenv('MONGO_URI')
supabase_url = os.getenv('SUPABASE_URI')
if not all([mongo_url, supabase_url, os.getenv('MONGO_DB')]):
raise EnvironmentError("Missing required environment variables (MONGO_URI, SUPABASE_URI, MONGO_DB)")
# Setup connections
mongo_client = pymongo.MongoClient(mongo_url)
pg_conn = connect(supabase_url)
pg_conn.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
# Register Decimal128 adapter
psycopg2.extensions.register_adapter(Decimal128, lambda val: AsIs(str(val.to_decimal())))
# Perform migration
migrate_table(mongo_client, pg_conn, TABLE_CONFIG)
logging.info("Migration completed successfully")
except Exception as e:
logging.error(f"Migration failed: {str(e)}")
raise
finally:
# Clean up connections
if 'pg_conn' in locals():
pg_conn.close()
if 'mongo_client' in locals():
mongo_client.close()
if __name__ == "__main__":
main()
Important Note: Before running the table migration script, make sure to configure the TABLE_CONFIG at the top of the script.
At minimum, you need:
- source_collection: The name of your MongoDB collection
- target_table: The name of your new PostgreSQL table
Optional settings include:
- field_mapping: To rename fields or map nested fields (e.g., {"old_name": "new_name"})
- default_values: Add new columns with default values
- type_mapping: Leave empty for automatic type detection
- key_fields: List of fields used to check for duplicates; records will be skipped if all specified fields match an existing record (e.g., ["email", "user_id"] to prevent duplicate user records)
Example:
TABLE_CONFIG = {
"source_collection": "users",
"target_table": "app_users",
"field_mapping": {
"name": "full_name",
"profile.age": "age"
}
}
Step 6: Running the Migration
Save your chosen script in a file (e.g., migrate.py) and run it:
python migrate.py
Note: For large datasets, the migration might take some time.
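If you’re running the script on a remote server over SSH, it can also help to detach it from your session so a dropped connection doesn’t interrupt the migration, for example:
nohup python migrate.py > migration_output.log 2>&1 &
tail -f migration_output.log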
Important Considerations
- Always test the migration with a small subset of data first
- Verify data integrity after migration (a simple row-count check is sketched below)
- Keep your MongoDB running until you’ve thoroughly tested the migrated data
- Consider running the migration during off-peak hours if you’re dealing with a production database
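For the integrity check mentioned above, comparing document counts with row counts catches the most obvious problems. Here’s a minimal sketch, assuming the same environment variables as the migration scripts and that table names match collection names (as in the full migration):
import os
import pymongo
import psycopg2
from psycopg2 import sql

mongo_client = pymongo.MongoClient(os.getenv('MONGO_URI'))
mongo_db = mongo_client[os.getenv('MONGO_DB')]
pg_conn = psycopg2.connect(os.getenv('SUPABASE_URI'))

with pg_conn.cursor() as cur:
    for name in mongo_db.list_collection_names():
        if name.startswith("system."):
            continue
        mongo_count = mongo_db[name].count_documents({})
        cur.execute(sql.SQL("SELECT COUNT(*) FROM {}").format(sql.Identifier(name)))
        pg_count = cur.fetchone()[0]
        status = "OK" if mongo_count == pg_count else "MISMATCH"
        print(f"{name}: mongo={mongo_count}, postgres={pg_count} [{status}]")

pg_conn.close()
mongo_client.close()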
That’s it! You now have two ways to migrate your data: the full migration for moving everything at once, or the table-by-table approach when you need more control. I personally used the table migration because I needed to reshape data and add new fields during the migration.