Bulk Email Validation Pipeline

This tutorial walks through building a production-ready email validation pipeline that processes thousands of addresses, filters out disposable and role accounts, and scores deliverability — all using Toolkit API endpoints.

What you'll build

A pipeline that:

1. Validates email addresses in batches of up to 50
2. Filters out disposable domains and role accounts
3. Scores deliverability and flags risky addresses
4. Exports results as CSV for your CRM or mailing list

Prerequisites

pip install httpx

Step 1: Batch validation

Process email addresses in batches of 50 (the max per API call):

import httpx
import csv
from typing import Iterator

# Placeholder credential: replace "YOUR_KEY" with your own API key. It is sent
# in the X-API-Key header of each request.
API_KEY = "YOUR_KEY"
# Common prefix that each endpoint path (e.g. /email/validate-batch) is appended to.
BASE_URL = "https://email.toolkitapi.io/v1"

def chunk_list(items: list, chunk_size: int = 50) -> Iterator[list]:
    """Yield consecutive slices of ``items``, each at most ``chunk_size`` long.

    The last slice is shorter when ``len(items)`` is not a multiple of
    ``chunk_size``; an empty list yields nothing.
    """
    starts = range(0, len(items), chunk_size)
    yield from (items[start:start + chunk_size] for start in starts)

def validate_batch(emails: list[str]) -> list[dict]:
    """Validate up to 50 emails in one API call.

    Args:
        emails: Addresses to validate in a single request.

    Returns:
        The ``"results"`` list from the JSON response. An empty ``emails``
        list returns ``[]`` without making a request.

    Raises:
        httpx.HTTPStatusError: If the API answers with a 4xx/5xx status.
    """
    # Nothing to validate: skip the network round trip entirely.
    if not emails:
        return []

    r = httpx.post(
        f"{BASE_URL}/email/validate-batch",
        headers={"X-API-Key": API_KEY},
        json={"emails": emails},
    )
    r.raise_for_status()
    return r.json()["results"]

# Load emails from the first column of a CSV or text file.
# newline="" lets the csv module handle line endings itself, as its
# documentation recommends for file objects passed to csv.reader.
with open("mailing_list.csv", newline="") as f:
    reader = csv.reader(f)
    # Trim stray whitespace and skip rows whose first cell is blank, so empty
    # strings are never sent for validation.
    emails = [row[0].strip() for row in reader if row and row[0].strip()]

# Validate in batches of 50 and accumulate the per-address results.
results = []
for batch in chunk_list(emails, 50):
    batch_results = validate_batch(batch)
    results.extend(batch_results)
    print(f"Validated {len(results)}/{len(emails)} emails")

print(f"Done! {len(results)} emails validated.")
// Placeholder credential: replace "YOUR_KEY" with your own API key.
const API_KEY = "YOUR_KEY";
// Common prefix that each endpoint path is appended to.
const BASE_URL = "https://email.toolkitapi.io/v1";

/**
 * Validate a batch of emails in one API call.
 *
 * @param {string[]} emails - Addresses to validate in a single request.
 * @returns {Promise<Array<object>>} The `results` array from the JSON response.
 * @throws {Error} If the API responds with a non-2xx status.
 */
async function validateBatch(emails) {
  const r = await fetch(`${BASE_URL}/email/validate-batch`, {
    method: "POST",
    headers: { "X-API-Key": API_KEY, "Content-Type": "application/json" },
    body: JSON.stringify({ emails }),
  });
  // fetch() only rejects on network failure; surface HTTP errors explicitly
  // instead of silently returning `undefined` from an error payload.
  if (!r.ok) {
    throw new Error(`validate-batch failed: HTTP ${r.status} ${r.statusText}`);
  }
  const data = await r.json();
  return data.results;
}

// Process in chunks of 50 (the max per API call)
const BATCH_SIZE = 50;
// Placeholder addresses; replace with your own list.
const emails = ["alice@example.com", "bob@example.com", /* ... */];
const results = [];

for (let i = 0; i < emails.length; i += BATCH_SIZE) {
  const batch = emails.slice(i, i + BATCH_SIZE);
  const batchResults = await validateBatch(batch);
  results.push(...batchResults);
  // Progress indicator: results validated so far out of the total.
  console.log(`Validated ${results.length}/${emails.length}`);
}

Step 2: Classify results

Categorize each email based on validation results:

def classify_email(result: dict) -> str:
    """Map one validation result to a single category label.

    Checks run in a fixed order and the first match wins: failed syntax,
    missing MX, disposable, role account, free provider, and finally the
    ``deliverability`` field. Missing keys count as ``False``; a missing or
    unrecognised ``deliverability`` value yields ``"undeliverable"``.
    """
    # Signals that must be truthy for the address to be considered at all.
    required = (
        ("syntax_valid", "invalid_syntax"),
        ("mx_found", "no_mx"),
    )
    for key, label in required:
        if not result.get(key, False):
            return label

    # Signals that, when truthy, label the address regardless of deliverability.
    labelled_flags = (
        ("is_disposable", "disposable"),
        ("is_role", "role_account"),
        ("is_free", "free_provider"),
    )
    for key, label in labelled_flags:
        if result.get(key, False):
            return label

    verdict = result.get("deliverability", "unknown")
    for label in ("deliverable", "risky"):
        if verdict == label:
            return label
    return "undeliverable"

# Sort every result into the clean list or the flagged list
clean = []
flags = []

for r in results:
    category = classify_email(r)
    if category != "deliverable":
        # Anything other than "deliverable" is recorded with its reason.
        flags.append({"email": r["email"], "reason": category})
        continue
    clean.append(r)

print(f"Clean (deliverable): {len(clean)}")
print(f"Flagged: {len(flags)}")
# Preview at most the first ten flagged addresses.
for f in flags[:10]:
    print(f"  ✗ {f['email']} — {f['reason']}")
/**
 * Map one validation result to a single category label.
 *
 * Checks run in a fixed order and the first match wins: failed syntax,
 * missing MX, disposable, role account, free provider, then the
 * `deliverability` field (anything other than "deliverable" or "risky"
 * yields "undeliverable").
 */
function classifyEmail(result) {
  // Signals that must be truthy for the address to be considered at all.
  const required = [
    ["syntax_valid", "invalid_syntax"],
    ["mx_found", "no_mx"],
  ];
  for (const [field, label] of required) {
    if (!result[field]) {
      return label;
    }
  }

  // Signals that, when truthy, label the address regardless of deliverability.
  const labelledFlags = [
    ["is_disposable", "disposable"],
    ["is_role", "role_account"],
    ["is_free", "free_provider"],
  ];
  for (const [field, label] of labelledFlags) {
    if (result[field]) {
      return label;
    }
  }

  switch (result.deliverability) {
    case "deliverable":
      return "deliverable";
    case "risky":
      return "risky";
    default:
      return "undeliverable";
  }
}

// Sort every result into the clean list or the flagged list.
const clean = [];
const flagged = [];

results.forEach((entry) => {
  const verdict = classifyEmail(entry);
  if (verdict !== "deliverable") {
    // Anything other than "deliverable" is recorded with its reason.
    flagged.push({ email: entry.email, reason: verdict });
    return;
  }
  clean.push(entry);
});

console.log(`Clean: ${clean.length}, Flagged: ${flagged.length}`);

Step 3: Additional checks

Detect catch-all domains

For addresses classified as "risky", check whether their domain is a catch-all server:

def check_catch_all(domain: str) -> bool:
    """Report whether the API marks ``domain`` as a catch-all.

    Returns the ``is_catch_all`` field of the JSON response, or ``False``
    when that field is absent. Error responses are raised by
    ``raise_for_status``.
    """
    response = httpx.get(
        f"{BASE_URL}/email/catch-all",
        headers={"X-API-Key": API_KEY},
        params={"domain": domain},
    )
    response.raise_for_status()
    payload = response.json()
    return payload.get("is_catch_all", False)

Check spam score on content

If you're validating form submissions, also check for spam:

def check_spam(subject: str, body: str) -> dict:
    """Submit a subject and body to the spam-score endpoint.

    Returns the decoded JSON response unchanged. Error responses are raised
    by ``raise_for_status``.
    """
    message = {"subject": subject, "body": body}
    response = httpx.post(
        f"{BASE_URL}/email/spam-score",
        headers={"X-API-Key": API_KEY},
        json=message,
    )
    response.raise_for_status()
    return response.json()

Step 4: Export results

Write every validated result, along with its classification, to CSV:

import csv

# Columns written to the CSV, in order. Keys missing from a result are written
# as empty cells.
output_fields = ["email", "deliverability", "confidence", "is_disposable",
                 "is_free", "is_role", "mx_records", "classification"]

# newline="" lets the csv module control line endings; the explicit UTF-8
# encoding keeps non-ASCII addresses from failing on platforms whose default
# encoding cannot represent them.
with open("validated_emails.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=output_fields)
    writer.writeheader()
    for r in results:
        # The classification is stored on the result dict itself, so it is
        # also available to any code that runs after the export.
        r["classification"] = classify_email(r)
        writer.writerow({k: r.get(k, "") for k in output_fields})

print("Exported to validated_emails.csv")
import fs from "fs";

// Format one value as a CSV cell: missing values become empty cells, and any
// cell containing a comma, double quote or line break is wrapped in quotes
// with embedded quotes doubled (RFC 4180).
const csvCell = (value) => {
  const text = value === undefined || value === null ? "" : String(value);
  return /[",\r\n]/.test(text) ? `"${text.replace(/"/g, '""')}"` : text;
};

const header = "email,deliverability,confidence,is_disposable,is_free,is_role,classification\n";
const rows = results.map(r => {
  const c = classifyEmail(r);
  // Cell order must match the header columns above.
  return [r.email, r.deliverability, r.confidence, r.is_disposable, r.is_free, r.is_role, c]
    .map(csvCell)
    .join(",");
}).join("\n");

fs.writeFileSync("validated_emails.csv", header + rows);
console.log("Exported to validated_emails.csv");

Complete pipeline summary

| Step | Endpoint | Purpose |
|------|----------|---------|
| 1 | POST /v1/email/validate-batch | Validate up to 50 emails at once |
| 2 | (client-side) | Classify by deliverability, disposable, role |
| 3 | GET /v1/email/catch-all | Detect catch-all domains for risky results |
| 3 | POST /v1/email/spam-score | Score content for spam signals |
| 4 | (client-side) | Export results to CSV |

Taking it further

  • Real-time validation — Use GET /v1/email/validate for signup forms and inline validation.
  • Provider intelligence — Use GET /v1/email/provider to identify the email provider from MX records.
  • Header forensics — Use POST /v1/email/headers to parse full email headers for spoofing detection.
  • Rate limiting — Add time.sleep(0.5) between batches if you hit rate limits on large lists.

Browse all email endpoints → Browse validation endpoint →