How to Migrate Data from MongoDB to Supabase

Recently, I found myself in a situation where I needed to migrate data from MongoDB to Supabase (PostgreSQL). Initially, MongoDB was the quickest way to get up and running on a single VPS: it was simple, flexible, and did the job. However, as our project grew and we moved more processes to AWS, it became clear that a relational database would be a better fit for storing our metadata. Since we were already using Supabase for authentication, it made perfect sense to consolidate our data there.

In this tutorial, I’ll walk you through the entire migration process, sharing the approach that worked for me. We’ll cover both full database migration and single-table migration with custom field mapping.

Prerequisites

  • Access to both MongoDB and Supabase databases
  • Python 3.x installed on your system
  • Basic understanding of both MongoDB and PostgreSQL
  • Ubuntu/Linux environment (though the process can be adapted for other operating systems)

Important: Before proceeding with any migration, make sure to back up both your MongoDB and Supabase databases. I’ve covered the backup process in detail in separate tutorials.

Step 1: Gathering Database Credentials

Before we start the migration, we need to get the connection URLs for both databases:

For Supabase:

  • Log in to your Supabase dashboard (https://supabase.com/dashboard/projects)
  • Select your project
  • Click ‘Connect’ in the top bar
  • Find your connection string (preferably the Transaction pooler); it should look something like this:
postgresql://postgres.xxxxxxxxxxxx:[YOUR-PASSWORD]@aws-0-us-east-1.pooler.supabase.com:6543/postgres

For MongoDB Atlas:

  • Log into MongoDB Atlas
  • Navigate to your cluster
  • Click “Connect” then “Connect your application”
  • You’ll get a URL that looks like this:
mongodb+srv://username:password@cluster0.xxxxx.mongodb.net/dbname?retryWrites=true&w=majority

For MongoDB (Self-hosted):

  • SSH into your VPS instance
  • Install the MongoDB Database Tools
  • Set up remote access

All steps above are documented in “How to Create External Backup of Your Self-Hosted MongoDB Database”.

Step 2: Setting Up the Environment

First, let’s install the system packages needed to build psycopg2 on our Ubuntu machine:

sudo apt-get install python3-dev libpq-dev

Next, create and activate a Python virtual environment:

python3 -m venv venv
source venv/bin/activate

Install the required Python packages:

pip install pymongo psycopg2-binary tqdm

Step 3: Setting Up Connection URIs

We’ll use environment variables to store our database credentials securely. Create them like this:

# For Supabase
export SUPABASE_URI='postgresql://postgres:your_password@db.example.supabase.co:5432/postgres'

# For MongoDB
export MONGO_URI='mongodb://username:password@your-mongodb-host:27017/dbname?authSource=admin'
export MONGO_DB='your_database_name'

Note: Replace the placeholder values with your actual database credentials. Also, when using MongoDB connection URLs, special characters in usernames and passwords (like @, !, #, etc.) must be URL-encoded. For example, if your password is complex@123, it should be encoded as complex%40123 in the URL.
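
If you don’t want to encode values by hand, Python’s standard library can do it for you. Here’s a small sketch using urllib.parse.quote_plus (the username and password are placeholders, not real credentials):

from urllib.parse import quote_plus

# Placeholder credentials - replace with your own
username = quote_plus("app_user")
password = quote_plus("complex@123")   # becomes "complex%40123"

mongo_uri = f"mongodb://{username}:{password}@your-mongodb-host:27017/dbname?authSource=admin"
print(mongo_uri)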

Step 4: Ensuring Proper Access

Before running the migration, we need to ensure our server can connect to both databases:

For MongoDB:

  • If you’re using MongoDB Atlas, you’ll need to whitelist your server’s IP address from the dashboard.
  • For self-hosted MongoDB, you’ll need to modify MongoDB config to allow external connections, and configure your firewall to allow incoming connections. You can find detailed instructions in our MongoDB backup tutorial under the “Setting Up Remote Access” section.

For Supabase:

The connection is usually straightforward with Supabase. Just make sure you’re using the correct connection URL. If you encounter connection errors, double-check that you’re not using the direct connection string, as it can cause issues with our migration script.
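
If you want to confirm access before kicking off a long migration, a minimal check along these lines (reusing the environment variables from Step 3) will fail fast if either database is unreachable. The file name check_connections.py is just a suggestion:

# check_connections.py - a quick sanity check, not part of the migration itself
import os
import pymongo
import psycopg2

# Ping MongoDB; raises an exception if the server can't be reached
mongo_client = pymongo.MongoClient(os.getenv('MONGO_URI'), serverSelectionTimeoutMS=5000)
mongo_client.admin.command('ping')
print("MongoDB connection OK")

# Run a trivial query against Supabase (PostgreSQL)
pg_conn = psycopg2.connect(os.getenv('SUPABASE_URI'))
with pg_conn.cursor() as cur:
    cur.execute("SELECT version();")
    print("Supabase connection OK:", cur.fetchone()[0])

pg_conn.close()
mongo_client.close()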

Step 5: The Migration Scripts

I’ve prepared two different approaches for the migration:

  • Full Database Migration: Migrates all collections and their data while maintaining proper type mapping
  • Single Table Migration: Allows for specific table migration with custom field mapping and default values

Full Migration Script

from bson.decimal128 import Decimal128
import pymongo
import psycopg2
from psycopg2.extensions import AsIs
import json
from datetime import datetime
from psycopg2 import sql, extensions, connect, Error
from bson import ObjectId
import os
import logging
from tqdm import tqdm
from typing import List, Dict, Any
import itertools

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('migration.log'),
        logging.StreamHandler()
    ]
)

# Configuration
BATCH_SIZE = 1000  # Adjust based on your data size and memory
MAX_RETRIES = 3

# Map MongoDB types to PostgreSQL types
DATABASE_TYPE_MAPPING = {
    "str": "TEXT",
    "ObjectId": "TEXT",
    "datetime.datetime": "TIMESTAMP WITH TIME ZONE",
    "datetime": "TIMESTAMP WITH TIME ZONE",
    "int": "INT",
    "list": "JSONB",
    "dict": "JSONB",
    "bool": "BOOLEAN",
    "float": "NUMERIC",
    "Decimal128": "NUMERIC",
    "default": "TEXT",
    "NoneType": "TEXT"
}

class CustomEncoder(json.JSONEncoder):
    """Custom JSON encoder to handle MongoDB specific types"""
    def default(self, obj):
        if isinstance(obj, ObjectId):
            return str(obj)
        if isinstance(obj, datetime):
            return obj.isoformat()
        if isinstance(obj, Decimal128):
            return str(obj)
        if isinstance(obj, complex):
            return [obj.real, obj.imag]
        return json.JSONEncoder.default(self, obj)


def prepare_schema(collection, pg_cur, collection_name: str) -> Dict[str, str]:
    """Analyze collection and prepare PostgreSQL schema"""
    logging.info(f"Analyzing schema for {collection_name}")
    schema = {}
    sample_size = min(1000, collection.count_documents({}))
    
    for doc in collection.aggregate([{"$sample": {"size": sample_size}}]):
        for field, value in doc.items():
            if field not in schema:
                if isinstance(value, ObjectId):
                    schema[field] = DATABASE_TYPE_MAPPING["ObjectId"]
                else:
                    schema[field] = DATABASE_TYPE_MAPPING.get(
                        type(value).__name__, 
                        DATABASE_TYPE_MAPPING["default"]
                    )

    # Create table with discovered schema
    columns = [
        sql.SQL("{} {}").format(
            sql.Identifier(field),
            sql.SQL(data_type)
        )
        for field, data_type in schema.items()
    ]
    
    create_table_sql = sql.SQL("CREATE TABLE IF NOT EXISTS {} ({})").format(
        sql.Identifier(collection_name),
        sql.SQL(', ').join(columns)
    )
    
    pg_cur.execute(create_table_sql)
    return schema

def process_value(value: Any) -> Any:
    """Process a value for PostgreSQL insertion"""
    if isinstance(value, (ObjectId, Decimal128)):
        return str(value)
    elif isinstance(value, (list, dict)):
        return json.dumps(value, cls=CustomEncoder)
    return value

def batch_insert(pg_cur, collection_name: str, fields: List[str], batch_data: List[List]):
    """Insert a batch of records"""
    if not batch_data:
        return

    insert_sql = sql.SQL("INSERT INTO {} ({}) VALUES {}").format(
        sql.Identifier(collection_name),
        sql.SQL(', ').join(map(sql.Identifier, fields)),
        sql.SQL(', ').join([
            sql.SQL('({})').format(sql.SQL(', ').join([sql.Placeholder()] * len(fields)))
            for _ in batch_data
        ])
    )
    
    # Flatten the batch data for insertion
    flat_values = list(itertools.chain(*batch_data))
    
    for attempt in range(MAX_RETRIES):
        try:
            pg_cur.execute(insert_sql, flat_values)
            break
        except Error as e:
            if attempt == MAX_RETRIES - 1:
                logging.error(f"Failed to insert batch after {MAX_RETRIES} attempts: {str(e)}")
                raise
            logging.warning(f"Batch insert attempt {attempt + 1} failed: {str(e)}")

def migrate_collection(collection, pg_cur, collection_name: str):
    """Migrate a single collection to PostgreSQL using batch processing"""
    try:
        # Prepare schema and get field names
        schema = prepare_schema(collection, pg_cur, collection_name)
        fields = list(schema.keys())
        
        # Get total documents for progress bar
        total_docs = collection.count_documents({})
        cursor = collection.find()
        
        batch_data = []
        processed = 0
        
        with tqdm(total=total_docs, desc=f"Migrating {collection_name}") as pbar:
            for document in cursor:
                # Process document values
                values = [process_value(document.get(field)) for field in fields]
                batch_data.append(values)
                
                # Insert batch when size reached
                if len(batch_data) >= BATCH_SIZE:
                    batch_insert(pg_cur, collection_name, fields, batch_data)
                    processed += len(batch_data)
                    pbar.update(len(batch_data))
                    batch_data = []
            
            # Insert remaining documents
            if batch_data:
                batch_insert(pg_cur, collection_name, fields, batch_data)
                processed += len(batch_data)
                pbar.update(len(batch_data))
        
        logging.info(f"Successfully migrated {processed} documents from {collection_name}")
        
    except Exception as e:
        logging.error(f"Error migrating collection {collection_name}: {str(e)}")
        raise

def main():
    try:
        # Get connection details
        mongo_url = os.getenv('MONGO_URI')
        supabase_url = os.getenv('SUPABASE_URI')
        mongo_db_manual = os.getenv('MONGO_DB')
        
        if not all([mongo_url, supabase_url]):
            raise EnvironmentError("Missing required environment variables")
        
        # Setup connections
        mongo_client = pymongo.MongoClient(mongo_url)
        pg_conn = connect(supabase_url)
        pg_conn.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        pg_cur = pg_conn.cursor()
        
        # Register Decimal128 adapter
        psycopg2.extensions.register_adapter(Decimal128, lambda val: AsIs(str(val.to_decimal())))
        
        # Get databases to migrate (skip MongoDB's internal databases)
        if mongo_db_manual:
            mongo_db_names = [mongo_db_manual]
        else:
            mongo_db_names = [
                name for name in mongo_client.list_database_names()
                if name not in ("admin", "local", "config")
            ]
        
        # Migrate each database
        for db_name in mongo_db_names:
            logging.info(f"Starting migration of database: {db_name}")
            mongo_db = mongo_client[db_name]
            
            for collection_name in mongo_db.list_collection_names():
                if collection_name.startswith("system."):
                    continue
                
                logging.info(f"Migrating collection: {collection_name}")
                collection = mongo_db[collection_name]
                migrate_collection(collection, pg_cur, collection_name)
                
            logging.info(f"Completed migration of database: {db_name}")
            
    except Exception as e:
        logging.error(f"Migration failed: {str(e)}")
        raise
    finally:
        # Clean up connections
        if 'pg_cur' in locals():
            pg_cur.close()
        if 'pg_conn' in locals():
            pg_conn.close()
        if 'mongo_client' in locals():
            mongo_client.close()

if __name__ == "__main__":
    main()

Single Table Migration with Field Mapping

from bson.decimal128 import Decimal128
import pymongo
import psycopg2
from psycopg2.extensions import AsIs
import json
from datetime import datetime
from psycopg2 import sql, extensions, connect, Error
from bson import ObjectId
import os
import logging
from tqdm import tqdm
from typing import List, Dict, Any
import itertools

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('table_migration.log'),
        logging.StreamHandler()
    ]
)

# Configuration
BATCH_SIZE = 1000  # Adjust based on your data size and memory
MAX_RETRIES = 3
# Mapping MongoDB types to PostgreSQL types
DATABASE_TYPE_MAPPING = {
    "str": "TEXT",
    "ObjectId": "TEXT",
    "datetime.datetime": "TIMESTAMP WITH TIME ZONE",
    "datetime": "TIMESTAMP WITH TIME ZONE",
    "int": "INT",
    "list": "JSONB",
    "dict": "JSONB",
    "bool": "BOOLEAN",
    "float": "NUMERIC",
    "Decimal128": "NUMERIC",
    "default": "TEXT",
    "NoneType": "TEXT"
}

class CustomEncoder(json.JSONEncoder):
    """Custom JSON encoder to handle MongoDB specific types"""
    def default(self, obj):
        if isinstance(obj, ObjectId):
            return str(obj)
        if isinstance(obj, datetime):
            return obj.isoformat()
        if isinstance(obj, Decimal128):
            return str(obj)
        if isinstance(obj, complex):
            return [obj.real, obj.imag]
        return json.JSONEncoder.default(self, obj)

# Configuration for table migration
TABLE_CONFIG = {
    "source_collection": "your_mongo_collection",  # Required
    "target_table": "your_postgres_table",        # Required
    "field_mapping": {                           # Optional
        "mongo_field1": "pg_field1",
        "mongo_field2": "pg_field2",
        "nested.field": "flat_field",
    },
    "default_values": {                          # Optional
        "new_field1": "default_value",
        "new_field2": 0,
        "timestamp_field": "CURRENT_TIMESTAMP"
    },
    "key_fields": [],                            # List of fields to check for duplicates
    "type_mapping": {}                           # Optional - will be auto-detected if empty
}


def get_nested_value(obj: Dict, path: str) -> Any:
    """Get value from nested dictionary using dot notation"""
    for key in path.split('.'):
        if isinstance(obj, dict):
            obj = obj.get(key)
        else:
            return None
    return obj

def analyze_field_types(collection, field_mapping: Dict) -> Dict[str, str]:
    """Analyze and determine field types from sample documents"""
    schema = {}
    sample_size = min(1000, collection.count_documents({}))
    
    for doc in collection.aggregate([{"$sample": {"size": sample_size}}]):
        if field_mapping:
            # Process mapped fields
            for mongo_field, pg_field in field_mapping.items():
                value = get_nested_value(doc, mongo_field)
                if value is not None and pg_field not in schema:
                    if isinstance(value, ObjectId):
                        schema[pg_field] = DATABASE_TYPE_MAPPING["ObjectId"]
                    else:
                        schema[pg_field] = DATABASE_TYPE_MAPPING.get(
                            type(value).__name__, 
                            DATABASE_TYPE_MAPPING["default"]
                        )
        else:
            # Process all fields if no mapping specified
            for field, value in doc.items():
                if field not in schema:
                    if isinstance(value, ObjectId):
                        schema[field] = DATABASE_TYPE_MAPPING["ObjectId"]
                    else:
                        schema[field] = DATABASE_TYPE_MAPPING.get(
                            type(value).__name__, 
                            DATABASE_TYPE_MAPPING["default"]
                        )
    
    return schema

def create_table_schema(pg_cur, table_name: str, schema: Dict[str, str], default_values: Dict[str, Any]):
    """Create PostgreSQL table with defined schema"""
    # Add default value fields to schema if not present
    for field, value in default_values.items():
        if field not in schema:
            if value == "CURRENT_TIMESTAMP":
                schema[field] = "TIMESTAMP WITH TIME ZONE"
            elif isinstance(value, int):
                schema[field] = "INTEGER"
            elif isinstance(value, float):
                schema[field] = "NUMERIC"
            elif isinstance(value, bool):
                schema[field] = "BOOLEAN"
            else:
                schema[field] = "TEXT"

    columns = []
    for field, field_type in schema.items():
        columns.append(sql.SQL("{} {}").format(
            sql.Identifier(field),
            sql.SQL(field_type)
        ))
    
    create_table_sql = sql.SQL("CREATE TABLE IF NOT EXISTS {} ({})").format(
        sql.Identifier(table_name),
        sql.SQL(', ').join(columns)
    )
    
    try:
        pg_cur.execute(create_table_sql)
    except Error as e:
        logging.error(f"Error creating table schema: {str(e)}")
        raise

def process_document(document: Dict, field_mapping: Dict, default_values: Dict) -> Dict:
    """Process MongoDB document according to field mapping and default values"""
    processed = {}
    
    # Process mapped fields
    if field_mapping:
        for mongo_field, pg_field in field_mapping.items():
            value = get_nested_value(document, mongo_field)
            if value is not None:
                processed[pg_field] = process_value(value)
    else:
        # If no mapping specified, process all fields
        for field, value in document.items():
            processed[field] = process_value(value)
    
    # Add default values
    for field, value in default_values.items():
        if field not in processed:
            if value == "CURRENT_TIMESTAMP":
                processed[field] = datetime.now()
            else:
                processed[field] = value
    
    return processed

def process_value(value: Any) -> Any:
    """Process a value for PostgreSQL insertion"""
    if isinstance(value, (ObjectId, Decimal128)):
        return str(value)
    elif isinstance(value, (list, dict)):
        return json.dumps(value, cls=CustomEncoder)
    return value

def check_existing_records(pg_cur, table_name: str, key_fields: List[str], batch_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Check for existing records based on key fields"""
    if not key_fields or not batch_data:
        return batch_data

    # Prepare the WHERE clause for checking duplicates
    conditions = []
    values = []
    for record in batch_data:
        record_conditions = []
        for field in key_fields:
            record_conditions.append(f"{field} = %s")
            values.append(record[field])
        conditions.append(f"({' AND '.join(record_conditions)})")

    query = sql.SQL("SELECT {} FROM {} WHERE {}").format(
        sql.SQL(', ').join(map(sql.Identifier, key_fields)),
        sql.Identifier(table_name),
        sql.SQL(' OR ').join(map(sql.SQL, conditions))
    )

    pg_cur.execute(query, values)
    existing_records = pg_cur.fetchall()

    if not existing_records:
        return batch_data

    # Create a set of existing key combinations
    existing_keys = {tuple(record) for record in existing_records}

    # Filter out records that already exist
    filtered_batch = []
    for record in batch_data:
        record_key = tuple(record[field] for field in key_fields)
        if record_key not in existing_keys:
            filtered_batch.append(record)

    return filtered_batch

def batch_insert(pg_cur, table_name: str, fields: List[str], batch_data: List[Dict[str, Any]], key_fields: List[str] = None):
    """Insert a batch of dict records, skipping duplicates when key_fields is provided"""
    if not batch_data:
        return
        
    # Check for duplicates if key_fields are specified
    if key_fields:
        filtered_batch = check_existing_records(pg_cur, table_name, key_fields, batch_data)
        skipped = len(batch_data) - len(filtered_batch)
        if skipped > 0:
            logging.info(f"Skipped {skipped} duplicate records")
        batch_data = filtered_batch
        if not batch_data:
            return

    insert_sql = sql.SQL("INSERT INTO {} ({}) VALUES {}").format(
        sql.Identifier(table_name),
        sql.SQL(', ').join(map(sql.Identifier, fields)),
        sql.SQL(', ').join([
            sql.SQL('({})').format(sql.SQL(', ').join([sql.Placeholder()] * len(fields)))
            for _ in batch_data
        ])
    )
    
    # Flatten the dict records into a value list ordered by the schema fields
    flat_values = [record.get(field) for record in batch_data for field in fields]
    
    for attempt in range(MAX_RETRIES):
        try:
            pg_cur.execute(insert_sql, flat_values)
            break
        except Error as e:
            if attempt == MAX_RETRIES - 1:
                logging.error(f"Failed to insert batch after {MAX_RETRIES} attempts: {str(e)}")
                raise
            logging.warning(f"Batch insert attempt {attempt + 1} failed: {str(e)}")

def migrate_table(mongo_client, pg_conn, config: Dict):
    """Migrate a single table with custom mapping and duplicate checking"""
    try:
        pg_cur = pg_conn.cursor()
        
        # Validate required config
        if not config.get('source_collection') or not config.get('target_table'):
            raise ValueError("source_collection and target_table are required in TABLE_CONFIG")
        
        # Get MongoDB collection
        mongo_db = mongo_client[os.getenv('MONGO_DB')]
        collection = mongo_db[config['source_collection']]
        
        # Get configuration
        field_mapping = config.get('field_mapping', {})
        default_values = config.get('default_values', {})
        key_fields = config.get('key_fields', [])
        
        # Get schema - either from type_mapping or analyze it
        schema = config.get('type_mapping', {})
        if not schema:
            schema = analyze_field_types(collection, field_mapping)
        
        # Create table with schema
        create_table_schema(pg_cur, config['target_table'], schema, default_values)
        
        # Prepare for batch processing
        batch_data = []
        total_docs = collection.count_documents({})
        cursor = collection.find()
        schema_fields = list(schema.keys())
        
        with tqdm(total=total_docs, desc=f"Migrating {config['source_collection']}") as pbar:
            for document in cursor:
                processed_doc = process_document(document, field_mapping, default_values)
                
                # Create a dictionary for the processed document
                doc_dict = {field: processed_doc.get(field) for field in schema_fields}
                batch_data.append(doc_dict)
                
                # Insert batch when size reached (batch_insert handles duplicate checking and retries)
                if len(batch_data) >= BATCH_SIZE:
                    batch_insert(pg_cur, config['target_table'], schema_fields, batch_data, key_fields)
                    pbar.update(len(batch_data))
                    batch_data = []
            
            # Insert remaining documents
            if batch_data:
                batch_insert(pg_cur, config['target_table'], schema_fields, batch_data, key_fields)
                pbar.update(len(batch_data))
        
        logging.info(f"Migration completed for table {config['target_table']}")
        
    except Exception as e:
        logging.error(f"Error in table migration: {str(e)}")
        raise
    finally:
        if 'pg_cur' in locals():
            pg_cur.close()

def main():
    try:
        # Get connection details
        mongo_url = os.getenv('MONGO_URI')
        supabase_url = os.getenv('SUPABASE_URI')
        mongo_db_name = os.getenv('MONGO_DB')  # used by migrate_table to pick the source database
        
        if not all([mongo_url, supabase_url, mongo_db_name]):
            raise EnvironmentError("Missing required environment variables (MONGO_URI, SUPABASE_URI, MONGO_DB)")
        
        # Setup connections
        mongo_client = pymongo.MongoClient(mongo_url)
        pg_conn = connect(supabase_url)
        pg_conn.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        
        # Register Decimal128 adapter
        psycopg2.extensions.register_adapter(Decimal128, lambda val: AsIs(str(val.to_decimal())))
        
        # Perform migration
        migrate_table(mongo_client, pg_conn, TABLE_CONFIG)
        
        logging.info("Migration completed successfully")
        
    except Exception as e:
        logging.error(f"Migration failed: {str(e)}")
        raise
    finally:
        # Clean up connections
        if 'pg_conn' in locals():
            pg_conn.close()
        if 'mongo_client' in locals():
            mongo_client.close()

if __name__ == "__main__":
    main()

Important Note: Before running the table migration script, make sure to configure the TABLE_CONFIG at the top of the script.

At minimum, you need:

  1. source_collection: The name of your MongoDB collection
  2. target_table: The name of your new PostgreSQL table

Optional settings include:

  • field_mapping: To rename fields or map nested fields (e.g., {"old_name": "new_name"})
  • default_values: Add new columns with default values
  • type_mapping: Leave empty for automatic type detection
  • key_fields: List of fields used to check for duplicates; records are skipped when all specified fields match an existing record (e.g., ["email", "user_id"] to prevent duplicate user records)

Example:

TABLE_CONFIG = {
    "source_collection": "users",
    "target_table": "app_users",
    "field_mapping": {
        "name": "full_name",
        "profile.age": "age"
    }
}
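
And here’s a fuller, hypothetical configuration that also uses default_values and key_fields. The field names are illustrative only, so adjust them to your own schema:

TABLE_CONFIG = {
    "source_collection": "users",
    "target_table": "app_users",
    "field_mapping": {
        "name": "full_name",
        "profile.age": "age",
        "email": "email"
    },
    "default_values": {
        "migrated_at": "CURRENT_TIMESTAMP",  # stored as a timestamp column
        "source_system": "mongodb"           # constant value added to every row
    },
    "key_fields": ["email"]  # skip documents whose email already exists in app_users
}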

Step 6: Running the Migration

Save your chosen script in a file (e.g., migrate.py) and run it:

python migrate.py

Note: For large datasets, the migration might take some time.

Important Considerations

  • Always test the migration with a small subset of data first
  • Verify data integrity after migration (a quick count check is sketched after this list)
  • Keep your MongoDB running until you’ve thoroughly tested the migrated data
  • Consider running the migration during off-peak hours if you’re dealing with a production database
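
For the integrity check, a rough sketch like the one below compares document counts per collection with row counts in the matching PostgreSQL tables. It assumes table names match collection names, which is what the full migration script produces; the file name verify_counts.py is just a suggestion:

# verify_counts.py - compare MongoDB document counts with PostgreSQL row counts
import os
import pymongo
from psycopg2 import connect, sql

mongo_client = pymongo.MongoClient(os.getenv('MONGO_URI'))
mongo_db = mongo_client[os.getenv('MONGO_DB')]
pg_conn = connect(os.getenv('SUPABASE_URI'))

with pg_conn.cursor() as cur:
    for collection_name in mongo_db.list_collection_names():
        if collection_name.startswith("system."):
            continue
        mongo_count = mongo_db[collection_name].count_documents({})
        cur.execute(sql.SQL("SELECT COUNT(*) FROM {}").format(sql.Identifier(collection_name)))
        pg_count = cur.fetchone()[0]
        status = "OK" if mongo_count == pg_count else "MISMATCH"
        print(f"{collection_name}: mongo={mongo_count} postgres={pg_count} [{status}]")

pg_conn.close()
mongo_client.close()

Count comparison won’t catch type conversion issues, so spot-check a few rows by hand as well.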

That’s it! You now have two ways to migrate your data: the full migration for moving everything at once, or the table-by-table approach when you need more control. I personally used the table migration because I needed to reshape data and add new fields during the migration.