# Bulk Email Validation Pipeline
This tutorial walks through building a production-ready email validation pipeline that processes thousands of addresses, filters out disposable and role accounts, and scores deliverability — all using Toolkit API endpoints.
## What you'll build
A pipeline that:

1. Validates email addresses in batches of up to 50
2. Filters out disposable domains and role accounts
3. Scores deliverability and flags risky addresses
4. Exports results as CSV for your CRM or mailing list
## Prerequisites
pip install httpx
## Step 1: Batch validation
Process email addresses in batches of 50 (the max per API call):
import httpx
import csv
from typing import Iterator
# Key sent in the X-API-Key header of every request below; replace the
# placeholder with your own key.
API_KEY = "YOUR_KEY"
# Prefix that every endpoint path in this tutorial is appended to.
BASE_URL = "https://email.toolkitapi.io/v1"
def chunk_list(items: list, chunk_size: int = 50) -> Iterator[list]:
    """Split a list into consecutive chunks of at most ``chunk_size`` items.

    Every chunk has exactly ``chunk_size`` items except possibly the last,
    which holds the remainder. An empty ``items`` yields nothing.

    Args:
        items: The list to split. It is sliced, not modified.
        chunk_size: Maximum number of items per chunk; must be >= 1.

    Yields:
        Successive slices of ``items`` in their original order.

    Raises:
        ValueError: If ``chunk_size`` is less than 1. Because this is a
            generator, the error surfaces when iteration starts.
    """
    # A negative step would make range() empty and silently drop every item,
    # so reject non-positive sizes explicitly instead.
    if chunk_size < 1:
        raise ValueError(
            f"chunk_size must be a positive integer, got {chunk_size!r}"
        )
    for start in range(0, len(items), chunk_size):
        yield items[start:start + chunk_size]
def validate_batch(emails: list[str]) -> list[dict]:
    """Validate up to 50 emails in one API call.

    Args:
        emails: Addresses to validate in a single request.

    Returns:
        The ``"results"`` list from the JSON response, or an empty list when
        ``emails`` is empty (no request is sent in that case).

    Raises:
        httpx.HTTPStatusError: If the API responds with an error status.
        KeyError: If the response JSON has no ``"results"`` key.
    """
    # Nothing to validate: skip the network round trip entirely.
    if not emails:
        return []
    r = httpx.post(
        f"{BASE_URL}/email/validate-batch",
        headers={"X-API-Key": API_KEY},
        json={"emails": emails},
    )
    r.raise_for_status()
    return r.json()["results"]
# Load emails from a CSV or text file (first column of every non-empty row).
# newline="" is the mode the csv module expects for file objects, and
# "utf-8-sig" reads plain UTF-8 while dropping a leading byte-order mark that
# would otherwise be glued onto the first address.
with open("mailing_list.csv", newline="", encoding="utf-8-sig") as f:
    reader = csv.reader(f)
    # Strip surrounding whitespace and skip rows whose first cell is blank so
    # no empty strings are sent to the API.
    emails = [row[0].strip() for row in reader if row and row[0].strip()]

results = []
for batch in chunk_list(emails, 50):
    batch_results = validate_batch(batch)
    results.extend(batch_results)
    # Progress report after each batch.
    print(f"Validated {len(results)}/{len(emails)} emails")

print(f"Done! {len(results)} emails validated.")
// Key sent in the X-API-Key header; replace the placeholder with your own key.
const API_KEY = "YOUR_KEY";
const BASE_URL = "https://email.toolkitapi.io/v1";

/**
 * Validate up to 50 emails in one API call.
 *
 * @param {string[]} emails - Addresses to validate in a single request.
 * @returns {Promise<object[]>} The `results` array from the JSON response.
 * @throws {Error} If the API responds with a non-2xx status.
 */
async function validateBatch(emails) {
  const r = await fetch(`${BASE_URL}/email/validate-batch`, {
    method: "POST",
    headers: { "X-API-Key": API_KEY, "Content-Type": "application/json" },
    body: JSON.stringify({ emails }),
  });
  // fetch() only rejects on network failure, so HTTP error statuses must be
  // checked by hand; otherwise an error payload would be read as results.
  if (!r.ok) {
    throw new Error(`validate-batch request failed: ${r.status} ${r.statusText}`);
  }
  const data = await r.json();
  return data.results;
}
// Process in chunks of 50 (the maximum the batch endpoint accepts per call)
const BATCH_SIZE = 50;
// Placeholder addresses; replace with your own list.
const emails = ["alice@example.com", "bob@example.com", /* ... */];
const results = [];
for (let i = 0; i < emails.length; i += BATCH_SIZE) {
  const batch = emails.slice(i, i + BATCH_SIZE);
  const batchResults = await validateBatch(batch);
  results.push(...batchResults);
  // Progress report after each batch.
  console.log(`Validated ${results.length}/${emails.length}`);
}
## Step 2: Classify results
Categorize each email based on validation results:
def classify_email(result: dict) -> str:
    """Map one validation result to a single category label.

    Signals are checked in a fixed priority order and the first one that
    fires decides the label: invalid syntax, missing MX, disposable domain,
    role account, free provider. A missing flag counts as ``False``. If none
    fires, the ``"deliverability"`` field decides: ``"deliverable"`` and
    ``"risky"`` pass through, and anything else (including a missing value)
    becomes ``"undeliverable"``.
    """
    # (result key, truthiness that triggers the label, label), in priority order.
    blocking_signals = (
        ("syntax_valid", False, "invalid_syntax"),
        ("mx_found", False, "no_mx"),
        ("is_disposable", True, "disposable"),
        ("is_role", True, "role_account"),
        ("is_free", True, "free_provider"),
    )
    for key, triggers_when, label in blocking_signals:
        if bool(result.get(key, False)) is triggers_when:
            return label

    verdict = result.get("deliverability", "unknown")
    for passthrough in ("deliverable", "risky"):
        if verdict == passthrough:
            return passthrough
    return "undeliverable"
# Classify every result once, then split into deliverable entries and
# flagged entries (the latter keep only the address and the reason).
labelled = [(entry, classify_email(entry)) for entry in results]
clean = [entry for entry, label in labelled if label == "deliverable"]
flags = [
    {"email": entry["email"], "reason": label}
    for entry, label in labelled
    if label != "deliverable"
]

print(f"Clean (deliverable): {len(clean)}")
print(f"Flagged: {len(flags)}")
# Show only the first ten flagged addresses as a preview.
for flag in flags[:10]:
    print(f" ✗ {flag['email']} — {flag['reason']}")
/**
 * Map one validation result to a single category label.
 *
 * Signals are checked in priority order and the first that fires wins;
 * otherwise the `deliverability` field decides, with anything other than
 * "deliverable" or "risky" reported as "undeliverable".
 */
function classifyEmail(result) {
  // [predicate, label] pairs in priority order.
  const blockingSignals = [
    [(r) => !r.syntax_valid, "invalid_syntax"],
    [(r) => !r.mx_found, "no_mx"],
    [(r) => r.is_disposable, "disposable"],
    [(r) => r.is_role, "role_account"],
    [(r) => r.is_free, "free_provider"],
  ];
  for (const [fires, label] of blockingSignals) {
    if (fires(result)) {
      return label;
    }
  }

  switch (result.deliverability) {
    case "deliverable":
      return "deliverable";
    case "risky":
      return "risky";
    default:
      return "undeliverable";
  }
}
// Split results into deliverable entries and flagged entries; flagged ones
// keep only the address and the reason.
const clean = [];
const flagged = [];
results.forEach((entry) => {
  const label = classifyEmail(entry);
  if (label !== "deliverable") {
    flagged.push({ email: entry.email, reason: label });
    return;
  }
  clean.push(entry);
});
console.log(`Clean: ${clean.length}, Flagged: ${flagged.length}`);
## Step 3: Additional checks
### Detect catch-all domains
For addresses classified as "risky", check whether their domain is a catch-all server:
def check_catch_all(domain: str) -> bool:
    """Ask the catch-all endpoint whether ``domain`` is a catch-all server.

    Returns the ``is_catch_all`` field of the JSON response, or ``False``
    when the field is absent. An error status from the API raises via
    ``raise_for_status()``.
    """
    endpoint = f"{BASE_URL}/email/catch-all"
    response = httpx.get(
        endpoint,
        params={"domain": domain},
        headers={"X-API-Key": API_KEY},
    )
    response.raise_for_status()
    payload = response.json()
    return payload.get("is_catch_all", False)
### Check spam score on content
If you're validating form submissions, also check for spam:
def check_spam(subject: str, body: str) -> dict:
    """Submit a subject and body to the spam-score endpoint.

    Returns the decoded JSON response unchanged. An error status from the
    API raises via ``raise_for_status()``.
    """
    endpoint = f"{BASE_URL}/email/spam-score"
    content = {"subject": subject, "body": body}
    response = httpx.post(
        endpoint,
        json=content,
        headers={"X-API-Key": API_KEY},
    )
    response.raise_for_status()
    return response.json()
## Step 4: Export results
Write every result, together with its classification, to CSV:
import csv

# Single source of truth for the output path, so the file that is written
# and the message that is printed cannot drift apart.
output_path = "validated_emails.csv"
output_fields = ["email", "deliverability", "confidence", "is_disposable",
                 "is_free", "is_role", "mx_records", "classification"]

# Explicit UTF-8 keeps non-ASCII addresses writable regardless of the
# platform's default encoding; newline="" lets the csv module control line
# endings itself.
with open(output_path, "w", newline="", encoding="utf-8") as f:
    # restval fills fields missing from a result with an empty cell, and
    # extrasaction="ignore" drops keys that are not in output_fields.
    writer = csv.DictWriter(f, fieldnames=output_fields, restval="",
                            extrasaction="ignore")
    writer.writeheader()
    for r in results:
        # The classification is stored on the result dict itself, so it
        # stays available after the export.
        r["classification"] = classify_email(r)
        writer.writerow(r)

print(f"Exported to {output_path}")
import fs from "fs";
const header = "email,deliverability,confidence,is_disposable,is_free,is_role,classification\n";
const rows = results.map(r => {
const c = classifyEmail(r);
return `${r.email},${r.deliverability},${r.confidence},${r.is_disposable},${r.is_free},${r.is_role},${c}`;
}).join("\n");
fs.writeFileSync("validated_emails.csv", header + rows);
console.log("Exported to validated_emails.csv");
## Complete pipeline summary
| Step | Endpoint | Purpose |
|---|---|---|
| 1 | `POST /v1/email/validate-batch` | Validate up to 50 emails at once |
| 2 | (client-side) | Classify by deliverability, disposable, role |
| 3 | `GET /v1/email/catch-all` | Detect catch-all domains for risky results |
| 3 | `POST /v1/email/spam-score` | Score content for spam signals |
| 4 | (client-side) | Export classified results to CSV |
## Taking it further
- Real-time validation — Use `GET /v1/email/validate` for signup forms and inline validation.
- Provider intelligence — Use `GET /v1/email/provider` to identify the email provider from MX records.
- Header forensics — Use `POST /v1/email/headers` to parse full email headers for spoofing detection.
- Rate limiting — Add `time.sleep(0.5)` between batches if you hit rate limits on large lists.