How to Safely Migrate Your S3 Files to a New Cloud Provider
Learn how to safely migrate your S3 files to a new cloud provider using Python, boto3, and progress tracking for large buckets.
In my last post, I showed how to migrate your PostgreSQL database safely. But for many applications, databases are only part of the story. If you're using S3 (or any S3-compatible object storage), moving your files is just as important—and sometimes trickier, especially when you have hundreds or thousands of files.
Why S3 Migration Needs Care
When copying files from one bucket to another, you can't just blindly copy everything. You want to ensure:
- No duplicates: Avoid overwriting files that already exist in the target bucket.
- Integrity: Make sure file sizes match to prevent incomplete uploads.
- Progress visibility: For large buckets, it's helpful to see which files have been copied.
The Python script below takes care of all of these automatically. It first validates the source and destination credentials. If they're valid, it scans the source bucket file by file. For each file, it checks whether an object with the same key already exists in the destination bucket and whether it has the same size. If either check fails, the file is copied to the destination.
Monitoring the Migration
To give feedback during the process, the script uses tqdm to display a progress bar. This makes it easy to monitor large migrations and see how many files were scanned versus how many were actually copied.
Python Script
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError, EndpointConnectionError
import time
from tqdm import tqdm
import sys
# ---------- SOURCE (AWS-style) ----------
SOURCE_ACCESS_KEY = ""
SOURCE_SECRET_KEY = ""
SOURCE_REGION = ""
SOURCE_BUCKET = ""

source_s3 = boto3.client(
    "s3",
    aws_access_key_id=SOURCE_ACCESS_KEY,
    aws_secret_access_key=SOURCE_SECRET_KEY,
    region_name=SOURCE_REGION
)

try:
    source_s3.head_bucket(Bucket=SOURCE_BUCKET)
    print("✅ Credentials are valid and bucket is accessible")
except ClientError as e:
    print("❌ Credentials are invalid for source")
    print(e)
    sys.exit()
# ---------- DESTINATION (S3-compatible) ----------
DEST_ENDPOINT_URL = ""
DEST_ACCESS_KEY = ""
DEST_SECRET_KEY = ""
DEST_BUCKET = ""
dest_s3 = boto3.client(
    "s3",
    aws_access_key_id=DEST_ACCESS_KEY,
    aws_secret_access_key=DEST_SECRET_KEY,
    endpoint_url=DEST_ENDPOINT_URL,
    config=Config(s3={"addressing_style": "path"})
)

try:
    dest_s3.list_buckets()
    print("✅ Credentials are valid for destination")
except ClientError as e:
    print("❌ Credentials are invalid for destination")
    print(e)
    sys.exit()
def safe_head_object(bucket, key, retries=3, delay=2):
    """
    Heads an object in the destination, retrying transient errors such as
    5xx responses or connection failures.
    Returns the head data if the object exists, None if it is missing
    (or if all retries fail), and raises on any other client error.
    """
    for attempt in range(1, retries + 1):
        try:
            return dest_s3.head_object(Bucket=bucket, Key=key)
        except ClientError as e:
            code = e.response["Error"]["Code"]
            if code in ("404", "NoSuchKey"):
                return None
            elif code in ("500", "502", "503", "504"):  # transient server errors
                print(f"⚠️ Transient error {code} on {key}, attempt {attempt}/{retries}")
                time.sleep(delay)
            else:
                raise
        except EndpointConnectionError:
            print(f"⚠️ Connection error on {key}, attempt {attempt}/{retries}")
            time.sleep(delay)

    # After exhausting the retries, give up on the head check and treat the object as missing
    print(f"⚠️ Skipping {key} after {retries} failed attempts")
    return None
def object_exists_and_same_size(key, size):
    """
    Check if object exists in destination and has same size
    """
    head = safe_head_object(DEST_BUCKET, key)
    if not head:
        return False
    return head.get("ContentLength", 0) == size
def mirror_bucket_with_progress():
    continuation_token = None
    scanned = 0
    copied = 0

    print("🔍 Scanning source bucket and copying missing files...")

    with tqdm(
        desc="Scanning & copying",
        unit="file",
        dynamic_ncols=True
    ) as bar:
        while True:
            # List the source bucket page by page (up to 1,000 keys per call)
            list_kwargs = {"Bucket": SOURCE_BUCKET}
            if continuation_token:
                list_kwargs["ContinuationToken"] = continuation_token

            response = source_s3.list_objects_v2(**list_kwargs)
            if "Contents" not in response:
                break

            for obj in response["Contents"]:
                key = obj["Key"]
                size = obj["Size"]
                scanned += 1

                # Copy only if the object is missing or has a different size
                if not object_exists_and_same_size(key, size):
                    source_obj = source_s3.get_object(
                        Bucket=SOURCE_BUCKET,
                        Key=key
                    )
                    dest_s3.upload_fileobj(
                        Fileobj=source_obj["Body"],
                        Bucket=DEST_BUCKET,
                        Key=key
                    )
                    copied += 1

                bar.update(1)
                bar.set_postfix(
                    scanned=scanned,
                    copied=copied
                )

            if response.get("IsTruncated"):
                continuation_token = response["NextContinuationToken"]
            else:
                break

    if copied == 0:
        print("✅ Everything already in sync!")
    else:
        print(f"✅ Mirror complete! Copied {copied} files.")

mirror_bucket_with_progress()
Things to Keep in Mind
- Time and bandwidth: Depending on your bucket size, this script may run for hours. If you have millions of objects, consider running it on a server with good network connectivity.
- Credentials security: Never hardcode your credentials in production scripts. Use environment variables or a secrets manager (see the sketch after this list).
- Error handling: The script currently stops on major credential errors, but you could enhance it to retry failed uploads automatically, as sketched below.
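On the credentials point, here is a minimal sketch of what reading the keys from environment variables could look like. The variable names are just examples, not something the script requires:

import os
import boto3

# Read keys from the environment instead of hardcoding them in the script.
# The variable names below are only examples.
SOURCE_ACCESS_KEY = os.environ["SOURCE_ACCESS_KEY"]
SOURCE_SECRET_KEY = os.environ["SOURCE_SECRET_KEY"]
SOURCE_REGION = os.environ.get("SOURCE_REGION", "us-east-1")

source_s3 = boto3.client(
    "s3",
    aws_access_key_id=SOURCE_ACCESS_KEY,
    aws_secret_access_key=SOURCE_SECRET_KEY,
    region_name=SOURCE_REGION
)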
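On the error-handling point, one possible approach (a sketch only, reusing the source_s3/dest_s3 clients and bucket names from the script above) is to wrap each copy in a small retry loop, the same way safe_head_object already retries head requests:

import time
from botocore.exceptions import ClientError, EndpointConnectionError

def copy_with_retries(key, retries=3, delay=2):
    """Sketch: retry a single copy a few times before giving up."""
    for attempt in range(1, retries + 1):
        try:
            # Re-fetch the object on every attempt: the streaming body
            # from get_object can only be read once.
            source_obj = source_s3.get_object(Bucket=SOURCE_BUCKET, Key=key)
            dest_s3.upload_fileobj(
                Fileobj=source_obj["Body"],
                Bucket=DEST_BUCKET,
                Key=key
            )
            return True
        except (ClientError, EndpointConnectionError) as e:
            print(f"⚠️ Upload of {key} failed (attempt {attempt}/{retries}): {e}")
            time.sleep(delay)
    print(f"❌ Giving up on {key} after {retries} attempts")
    return False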
Once the migration finishes, you'll have an exact mirror of your S3 bucket in your new provider's environment, ready for your application to use.
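If you want extra confidence before pointing your application at the new bucket, a quick verification pass can compare keys and sizes on both sides. Here's a minimal sketch, assuming the same clients and bucket names as in the script above:

def bucket_inventory(client, bucket):
    """Sketch: return a {key: size} map for every object in the bucket."""
    inventory = {}
    paginator = client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket):
        for obj in page.get("Contents", []):
            inventory[obj["Key"]] = obj["Size"]
    return inventory

source_inv = bucket_inventory(source_s3, SOURCE_BUCKET)
dest_inv = bucket_inventory(dest_s3, DEST_BUCKET)

missing = [k for k, size in source_inv.items() if dest_inv.get(k) != size]
if missing:
    print(f"⚠️ {len(missing)} objects are missing or differ in size")
else:
    print(f"✅ All {len(source_inv)} objects match by key and size")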
✅ All files are in sync! Your new S3 bucket is ready.