#!/usr/bin/env python3
"""
Report: Users NOT enrolled in MFA

Lists Okta users, fetches each user's factors, and writes the users that
have no factor to a CSV file. Users whose factor lookup failed are written
with ``mfa_enrolled`` set to ``Unknown`` rather than being reported as
unenrolled.

Examples:
  # Active users only (default):
  python3 mfa_not_enrolled.py --out no_mfa_active.csv

  # All statuses except DEPROVISIONED:
  python3 mfa_not_enrolled.py --all-statuses --out no_mfa_all.csv

  # Include DEPROVISIONED as well:
  python3 mfa_not_enrolled.py --all-statuses --include-deprovisioned --out no_mfa_all_incl_deprov.csv
"""

import os
import re
import sys
import csv
import time
import math
import argparse
from datetime import datetime, timezone, timedelta

import requests

# ---------------- .env loading (KEY=VALUE; quotes ok) ----------------
_ENV_LINE_RE = re.compile(r'^\s*([A-Za-z_][A-Za-z0-9_]*)\s*=\s*(.*)\s*$')


def _strip_quotes(v: str) -> str:
    """Return *v* stripped of whitespace and of one pair of matching quotes."""
    v = v.strip()
    if len(v) >= 2 and v[0] == v[-1] and v[0] in ("'", '"'):
        return v[1:-1]
    return v


def load_env():
    """Load KEY=VALUE pairs from ``.env`` files into ``os.environ``.

    Looks next to this script first, then in the current working directory.
    Blank lines, ``#`` comment lines and lines that do not look like
    ``KEY=VALUE`` are skipped. A key that is already present in the
    environment (including one set by an earlier ``.env`` file) is never
    overwritten.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    candidates = (
        os.path.join(script_dir, ".env"),
        os.path.join(os.getcwd(), ".env"),
    )
    for path in candidates:
        if not os.path.exists(path):
            continue
        with open(path, "r", encoding="utf-8") as f:
            for raw in f:
                line = raw.strip()
                if not line or line.startswith("#"):
                    continue
                m = _ENV_LINE_RE.match(line)
                if not m:
                    continue
                # setdefault keeps any value that is already set.
                os.environ.setdefault(m.group(1), _strip_quotes(m.group(2)))


load_env()

# ---------------- Config ----------------
OKTA_DOMAIN = os.getenv("OKTA_DOMAIN", "gallaudet.okta.com")
API_TOKEN = os.getenv("OKTA_API_TOKEN")
if not API_TOKEN:
    sys.stderr.write("ERROR: Missing OKTA_API_TOKEN in .env\n")
    sys.exit(1)

BASE_URL = f"https://{OKTA_DOMAIN}"
USERS_URL = f"{BASE_URL}/api/v1/users"
DEFAULT_TIMEOUT = 15

# Status codes that are retried by retry_get().
RETRY_STATUSES = (429, 500, 502, 503, 504)
# Upper bound (seconds) on a single wait for a rate-limit window to reset.
MAX_RATE_LIMIT_WAIT = 60.0

# ---------------- HTTP session with backoff ----------------
SESSION = requests.Session()
SESSION.headers.update({"Authorization": f"SSWS {API_TOKEN}", "Accept": "application/json"})


def _rate_limit_wait(response, fallback):
    """Return how long to sleep after a 429 response, in seconds.

    Uses the ``X-Rate-Limit-Reset`` header (epoch seconds) when it can be
    parsed, never waiting less than *fallback* nor more than
    ``MAX_RATE_LIMIT_WAIT``. Falls back to *fallback* when the header is
    missing or malformed.
    """
    reset = response.headers.get("X-Rate-Limit-Reset")
    try:
        # +1s so the request lands just after the window has rolled over.
        wait = float(reset) - time.time() + 1.0
    except (TypeError, ValueError):
        return fallback
    return min(max(wait, fallback), MAX_RATE_LIMIT_WAIT)


def retry_get(url, params=None, max_tries=5):
    """GET *url* through the shared session, retrying transient failures.

    Responses with a status in ``RETRY_STATUSES`` are retried up to
    *max_tries* times with an exponentially growing delay. For 429 the wait
    is extended until the rate-limit window advertised by the server resets.

    Returns the successful ``requests.Response``.
    Raises ``requests.HTTPError`` for a non-retryable error status, or when
    the last attempt still returns a retryable status.
    """
    params = dict(params or {})
    delay = 0.5
    for attempt in range(1, max_tries + 1):
        r = SESSION.get(url, params=params, timeout=DEFAULT_TIMEOUT)
        if r.status_code not in RETRY_STATUSES:
            r.raise_for_status()
            return r

        rem = r.headers.get("X-Rate-Limit-Remaining")
        reset = r.headers.get("X-Rate-Limit-Reset")
        sys.stderr.write(f"[backoff] {r.status_code} remaining={rem} reset={reset} try={attempt}\n")
        if attempt == max_tries:
            # Out of attempts: surface the error status to the caller.
            r.raise_for_status()

        wait = _rate_limit_wait(r, delay) if r.status_code == 429 else delay
        time.sleep(wait)
        delay *= 1.7
    raise RuntimeError("unreachable")


def get_with_pagination(url, params=None):
    """Yield every item from *url*, following ``rel="next"`` Link headers.

    A list response yields its elements one by one; any other JSON body is
    yielded as a single item. *params* are sent with the first request only,
    because the "next" URL is used as returned by the server.
    """
    params = dict(params or {})
    while True:
        r = retry_get(url, params=params)
        data = r.json()
        if isinstance(data, list):
            yield from data
        else:
            yield data
        nxt = r.links.get("next", {}).get("url")
        if not nxt:
            break
        url, params = nxt, {}


# ---------------- Helpers ----------------
def iso_to_dt(s):
    """Parse an ISO-8601 timestamp into a UTC ``datetime``.

    A trailing ``Z`` is accepted as UTC. Returns ``None`` for empty input or
    for a value that cannot be parsed.
    """
    if not s:
        return None
    try:
        if s.endswith("Z"):
            # Only the trailing designator is rewritten, not every "Z".
            s = s[:-1] + "+00:00"
        return datetime.fromisoformat(s).astimezone(timezone.utc)
    except (AttributeError, TypeError, ValueError, OverflowError, OSError):
        return None


def fmt_utc(dt):
    """Return *dt* in ISO format, or an empty string when *dt* is falsy."""
    return dt.isoformat() if dt else ""


def get_user_factors(user_id: str):
    """Return the list of factors for *user_id* (empty if the body is not a list).

    Raises ``requests.HTTPError`` (via ``retry_get``) when the request fails.
    """
    url = f"{USERS_URL}/{user_id}/factors"
    r = retry_get(url, params={"limit": 200})
    data = r.json()
    return data if isinstance(data, list) else []


def list_users(active_only: bool, include_deprov: bool, all_statuses: bool):
    """
    Build an Okta filter for statuses and return the matching users as a list:
      - default: ACTIVE only
      - --all-statuses: everything except DEPROVISIONED (unless --include-deprovisioned)
    """
    if active_only and not all_statuses:
        filter_str = 'status eq "ACTIVE"'
    else:
        # Okta supports OR filters. SUSPENDED is listed explicitly so that
        # "all statuses" really covers every non-deprovisioned state.
        statuses = [
            '"ACTIVE"',
            '"STAGED"',
            '"PROVISIONED"',
            '"RECOVERY"',
            '"PASSWORD_EXPIRED"',
            '"LOCKED_OUT"',
            '"SUSPENDED"',
        ]
        if include_deprov:
            statuses.append('"DEPROVISIONED"')
        filter_str = " or ".join(f"status eq {s}" for s in statuses)

    params = {"limit": 200, "filter": filter_str}
    return list(get_with_pagination(USERS_URL, params=params))


def build_row(user, factors_list):
    """Build one CSV row (a dict) for *user* given its *factors_list*.

    ``mfa_enrolled`` is "Yes" when at least one factor carries a
    ``factorType`` or ``provider`` value, otherwise "No". ``mfa_factors`` is
    the sorted, de-duplicated, lower-cased list of those values joined by
    commas.
    """
    profile = user.get("profile") or {}
    status = user.get("status") or ""
    created = iso_to_dt(user.get("created"))
    last_login = iso_to_dt(user.get("lastLogin"))
    last_updated = iso_to_dt(user.get("lastUpdated"))

    # normalize factor names
    # NOTE(review): every returned factor counts as enrolled regardless of
    # its "status" field - confirm whether not-yet-activated factors should.
    names = set()
    for f in factors_list:
        f = f or {}
        ftype = f.get("factorType") or f.get("provider")
        if ftype:
            names.add(str(ftype).lower())
    ftypes = sorted(names)

    return {
        "id": user.get("id", ""),
        "login": profile.get("login", ""),
        "email": profile.get("email", ""),
        "status": status,
        "created_utc": fmt_utc(created),
        "lastLogin_utc": fmt_utc(last_login),
        "lastUpdated_utc": fmt_utc(last_updated),
        "mfa_enrolled": "Yes" if ftypes else "No",
        "mfa_factors": ",".join(ftypes),
        "firstName": profile.get("firstName", ""),
        "lastName": profile.get("lastName", ""),
        "department": profile.get("department", ""),
        "title": profile.get("title", ""),
    }


# ---------------- Main ----------------
def main():
    """Parse the command line, collect users without MFA and write the CSV.

    Users whose factors could not be fetched are still written, but with
    ``mfa_enrolled`` set to "Unknown" so that a failed lookup is never
    reported as a confirmed missing enrollment.
    """
    ap = argparse.ArgumentParser(description="List accounts NOT enrolled in MFA.")
    ap.add_argument("--out", default="okta_users_without_mfa.csv", help="Output CSV file")
    ap.add_argument("--all-statuses", action="store_true",
                    help="Include all statuses (default is ACTIVE only).")
    ap.add_argument("--include-deprovisioned", action="store_true",
                    help="When --all-statuses is used, also include DEPROVISIONED.")
    args = ap.parse_args()

    active_only = not args.all_statuses
    users = list_users(active_only=active_only,
                       include_deprov=args.include_deprovisioned,
                       all_statuses=args.all_statuses)

    rows = []
    unknown = 0
    for idx, u in enumerate(users, 1):
        try:
            factors = get_user_factors(u.get("id", ""))
        except requests.RequestException as e:
            # Covers HTTP errors as well as timeouts / connection failures,
            # so one bad lookup does not abort the whole report.
            sys.stderr.write(f"Warning: factors fetch failed for {u.get('id')}: {e}\n")
            row = build_row(u, [])
            row["mfa_enrolled"] = "Unknown"
            rows.append(row)
            unknown += 1
        else:
            row = build_row(u, factors)
            if row["mfa_enrolled"] == "No":
                rows.append(row)
        if idx % 200 == 0:
            print(f"Processed {idx} users...")

    fieldnames = ["id", "login", "email", "status", "created_utc", "lastLogin_utc", "lastUpdated_utc",
                  "mfa_enrolled", "mfa_factors", "firstName", "lastName", "department", "title"]

    with open(args.out, "w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=fieldnames)
        w.writeheader()
        w.writerows(rows)

    if unknown:
        sys.stderr.write(
            f"Warning: {unknown} user(s) could not be checked; "
            "they are listed with mfa_enrolled=Unknown\n"
        )
    print(f"Done. Found {len(rows) - unknown} user(s) without MFA. Wrote {args.out}")


if __name__ == "__main__":
    main()