Files
pre-repos/okta_search_logs/full_okta_users.py
2026-01-26 16:49:09 -05:00

373 lines
13 KiB
Python

#!/usr/bin/env python3
"""
Merged Okta users export:
- Includes all profile columns from okta_users.py
- Includes lifecycle/login/password-expiry (ADpwdLastSet-based)
- Includes MFA enrollment + factors (unless --skip-mfa)
- Loads OKTA_DOMAIN, OKTA_API_TOKEN, OKTA_APP_ID from .env
Examples:
# All users (any status):
python3 full_okta_users.py --all --out okta_all_users.csv
# Only ACTIVE users:
python3 full_okta_users.py --all --only-active --out okta_active_users.csv
# Single user:
python3 full_okta_users.py --user jared.evans
# Skip MFA (faster for large runs):
python3 full_okta_users.py --all --skip-mfa
"""
import os
import re
import sys
import csv
import math
import time
import argparse
from datetime import datetime, timedelta, timezone
import requests
# ---------------- .env loading ----------------
# Matches one KEY=VALUE line: group(1) is an identifier-style key,
# group(2) the raw value (possibly quoted; quotes stripped by _strip_quotes).
_ENV_LINE_RE = re.compile(r'^\s*([A-Za-z_][A-Za-z0-9_]*)\s*=\s*(.*)\s*$')
def _strip_quotes(val: str) -> str:
val = val.strip()
if len(val) >= 2 and (val[0] == val[-1]) and val[0] in ("'", '"'):
return val[1:-1]
return val
def load_env():
    """Load KEY=VALUE pairs from .env in script dir or cwd."""
    here = os.path.dirname(os.path.abspath(__file__))
    for path in (os.path.join(here, ".env"), os.path.join(os.getcwd(), ".env")):
        if not os.path.exists(path):
            continue
        with open(path, "r", encoding="utf-8") as fh:
            for raw in fh:
                stripped = raw.strip()
                # Skip blanks and comment lines.
                if not stripped or stripped.startswith("#"):
                    continue
                match = _ENV_LINE_RE.match(stripped)
                if not match:
                    continue
                key = match.group(1)
                # Real environment variables win over .env entries.
                if key and key not in os.environ:
                    os.environ[key] = _strip_quotes(match.group(2))
# Populate os.environ from .env before reading the configuration below.
load_env()
# ---------------- Configuration ----------------
OKTA_DOMAIN = os.getenv("OKTA_DOMAIN", "gallaudet.okta.com")
API_TOKEN = os.getenv("OKTA_API_TOKEN")  # required; absence is enforced by require_token()
OKTA_APP_ID = os.getenv("OKTA_APP_ID", "")  # optional
BASE_URL = f"https://{OKTA_DOMAIN}"
USERS_URL = f"{BASE_URL}/api/v1/users"
DEFAULT_TIMEOUT = 15 # seconds
FIXED_MAX_AGE_DAYS = 180  # hard-coded password max age used for expiry estimates
# ---------------- HTTP session ----------------
# Shared session for all API calls; the SSWS Authorization header is only
# attached when a token is configured.
SESSION = requests.Session()
if API_TOKEN:
    SESSION.headers.update({
        "Authorization": f"SSWS {API_TOKEN}",
        "Accept": "application/json",
    })
else:
    SESSION.headers.update({"Accept": "application/json"})
def require_token():
    """Exit with a setup hint when OKTA_API_TOKEN is not configured."""
    if API_TOKEN:
        return
    message = (
        "ERROR: Missing OKTA_API_TOKEN. Create a .env with:\n"
        " OKTA_DOMAIN=\"gallaudet.okta.com\"\n"
        " OKTA_API_TOKEN=\"xxxxxxxx\"\n"
        " OKTA_APP_ID=\"0oa...\" # optional\n"
    )
    sys.stderr.write(message)
    sys.exit(1)
# ---------------- HTTP helpers ----------------
def retry_get(url, params=None, max_tries=5):
    """GET with exponential backoff on 429/5xx responses.

    Returns the successful Response; raises requests.HTTPError after the
    final retry (or immediately on any other error status).
    """
    params = dict(params or {})
    delay = 0.5
    for attempt in range(1, max_tries + 1):
        resp = SESSION.get(url, params=params, timeout=DEFAULT_TIMEOUT)
        if resp.status_code not in (429, 500, 502, 503, 504):
            resp.raise_for_status()
            return resp
        remaining = resp.headers.get("X-Rate-Limit-Remaining")
        reset = resp.headers.get("X-Rate-Limit-Reset")
        sys.stderr.write(f"[backoff] {resp.status_code} remaining={remaining} reset={reset} try={attempt}\n")
        if attempt == max_tries:
            # Out of retries: surface the HTTP error to the caller.
            resp.raise_for_status()
        time.sleep(delay)
        delay *= 1.7
    raise RuntimeError("Unreachable")
def get_with_pagination(url, params=None):
    """Yield items from an Okta list endpoint, following Link 'next' headers."""
    params = dict(params or {})
    while True:
        resp = retry_get(url, params=params)
        payload = resp.json()
        if isinstance(payload, list):
            yield from payload
        else:
            yield payload
        next_url = resp.links.get("next", {}).get("url")
        if not next_url:
            return
        # The next link already encodes the query string; drop our params.
        url, params = next_url, {}
# ---------------- Date/time helpers ----------------
def iso_to_dt(iso_str):
    """Parse an ISO-8601 timestamp into an aware UTC datetime.

    Accepts the trailing 'Z' (Zulu) suffix, which datetime.fromisoformat
    rejects before Python 3.11. Returns None for empty or unparseable input.
    """
    if not iso_str:
        return None
    try:
        s = str(iso_str)
        if s.endswith("Z"):
            # Splice only the suffix: a blanket str.replace would also mangle
            # any other 'Z' appearing earlier in a malformed string.
            s = s[:-1] + "+00:00"
        return datetime.fromisoformat(s).astimezone(timezone.utc)
    except Exception:
        return None
def fmt_utc(dt_utc):
    """Render a datetime as an ISO-8601 string; '' when no datetime is given."""
    if not dt_utc:
        return ""
    return dt_utc.isoformat()
def days_between(a_dt, b_dt):
    """Whole days from a_dt to b_dt (floored); '' if either endpoint is missing."""
    if a_dt and b_dt:
        elapsed_seconds = (b_dt - a_dt).total_seconds()
        return math.floor(elapsed_seconds / 86400.0)
    return ""
# Windows FILETIME counts 100-nanosecond ticks from this epoch.
FILETIME_EPOCH = datetime(1601, 1, 1, tzinfo=timezone.utc)
def parse_adpwdlastset(value):
    """Best-effort parse of ADpwdLastSet into an aware UTC datetime.

    Accepts ISO-8601 text, Windows FILETIME ticks, or Unix epoch seconds /
    milliseconds (integer or float text). Returns None when nothing matches.
    """
    if value is None or value == "":
        return None
    parsed = iso_to_dt(value)
    if parsed:
        return parsed
    try:
        text = str(value).strip()
        if text.isdigit():
            number = int(text)
            if number > 10_000_000_000_000:
                # FILETIME: 100-ns ticks since 1601-01-01.
                return FILETIME_EPOCH + timedelta(seconds=number / 10_000_000)
            if number > 10_000_000_000:
                # Milliseconds since the Unix epoch.
                return datetime.fromtimestamp(number / 1000.0, tz=timezone.utc)
            # Seconds since the Unix epoch.
            return datetime.fromtimestamp(number, tz=timezone.utc)
    except Exception:
        pass
    try:
        as_float = float(str(value).strip())
        if as_float > 0:
            if as_float > 10_000_000_000:
                return datetime.fromtimestamp(as_float / 1000.0, tz=timezone.utc)
            return datetime.fromtimestamp(as_float, tz=timezone.utc)
    except Exception:
        pass
    return None
def derive_password_expired_flag(user_obj):
    """True when any of the Okta signals marks the user's password as expired."""
    if user_obj.get("passwordExpired") is True:
        return True
    if (user_obj.get("status") or "").upper() == "PASSWORD_EXPIRED":
        return True
    password = (user_obj.get("credentials") or {}).get("password") or {}
    return (password.get("status") or "").upper() == "EXPIRED"
# ---------------- Fetchers ----------------
def get_all_users(only_active=False):
    """Fetch every user via pagination; optionally keep only ACTIVE ones."""
    fetched = list(get_with_pagination(USERS_URL, params={"limit": 200}))
    if not only_active:
        return fetched
    return [u for u in fetched if (u.get("status") or "").upper() == "ACTIVE"]
def get_user_by_login(user_login):
    """Look up users by exact profile.login; always returns a list."""
    resp = retry_get(USERS_URL, params={"filter": f'profile.login eq "{user_login}"'})
    payload = resp.json()
    if isinstance(payload, list):
        return payload
    return [payload] if payload else []
def get_user_factors(user_id):
    """Return the user's enrolled MFA factors; [] for non-list payloads."""
    resp = retry_get(f"{USERS_URL}/{user_id}/factors", params={"limit": 200})
    payload = resp.json()
    if isinstance(payload, list):
        return payload
    return []
def normalize_login(user_arg: str) -> str:
    """Expand a bare username into a full @gallaudet.edu login; pass emails through."""
    return user_arg if "@" in user_arg else f"{user_arg}@gallaudet.edu"
# ---------------- Column set ----------------
# Columns sourced from the user profile, except the lifecycle timestamps
# (status/created/activated/...) which build_row reads from the top-level
# user object, and usernameWithoutDomain which is derived from login.
PROFILE_COLUMNS_FROM_FIRST = [
    "firstName","lastName","email","title","organization",
    "wdEmployeeRole","wdJobProfile","status","employeeStatus","wdHireDate",
    "wdTerminated","wdTerminationDate","wdHasAcademicAppointment","wdFutureHire",
    "InstructorKeepActiveTo","wdTerminatedWorkerKeepActiveTo","department",
    "wdJobFamilyGroup","wdEmployeeType","userRole","wdIsWorkerActiveStudent",
    "created","activated","statusChanged","lastLogin","lastUpdated",
    "passwordChanged","ADpwdLastSet","displayName","login",
    "usernameWithoutDomain","secondEmail"
]
# Full CSV header: profile columns first, then derived lifecycle, password
# expiry, sign-in, and MFA columns computed in build_row.
ALL_COLUMNS = PROFILE_COLUMNS_FROM_FIRST + [
    "status_upper","is_locked_out",
    "created_utc","activated_utc","lastUpdated_utc","statusChanged_utc",
    "deprovisioned_date","lastLockedOut",
    "division","manager","costCenter","userType",
    "ADpwdLastSet_raw","ADpwdLastSet_utc","ADpwdLastSet_days_since",
    "password_last_set_utc","days_since_password_last_set",
    "maxAgeDays_hardcoded","estimated_expire_utc","days_until_expiry",
    "last_successful_signin_utc","days_since_last_successful_signin",
    "passwordExpired","id",
    "mfa_enrolled","mfa_factors"
]
# ---------------- Row builder ----------------
def build_row(user, now_utc, include_mfa: bool):
    """Flatten one Okta user object into a CSV row dict keyed by ALL_COLUMNS.

    user: raw user object from the Okta /users API.
    now_utc: aware "now" used for every day-count calculation.
    include_mfa: when True, makes one extra /factors API call per user.
    """
    profile = user.get("profile") or {}
    status = (user.get("status") or "")
    status_u = status.upper()
    row = {}
    # Profile-driven columns; a handful of lifecycle timestamps live on the
    # top-level user object rather than inside the profile.
    for k in PROFILE_COLUMNS_FROM_FIRST:
        if k in ("status","created","activated","statusChanged","lastLogin","lastUpdated","passwordChanged"):
            row[k] = user.get(k, "")
        elif k == "usernameWithoutDomain":
            login = profile.get("login","")
            row[k] = login.split("@")[0] if "@" in login else login
        else:
            row[k] = profile.get(k, "")
    row["id"] = user.get("id","")
    row["status_upper"] = status_u
    # Normalized UTC copies of the lifecycle timestamps.
    created_dt = iso_to_dt(user.get("created"))
    activated_dt = iso_to_dt(user.get("activated"))
    last_updated_dt = iso_to_dt(user.get("lastUpdated"))
    status_changed_dt = iso_to_dt(user.get("statusChanged"))
    last_login_dt = iso_to_dt(user.get("lastLogin"))
    row["created_utc"] = fmt_utc(created_dt)
    row["activated_utc"] = fmt_utc(activated_dt)
    row["lastUpdated_utc"] = fmt_utc(last_updated_dt)
    row["statusChanged_utc"] = fmt_utc(status_changed_dt)
    row["is_locked_out"] = "Yes" if status_u == "LOCKED_OUT" else "No"
    # statusChanged doubles as the lockout/deprovision moment while the user
    # is currently in that state; blank otherwise.
    row["lastLockedOut"] = row["statusChanged_utc"] if status_u == "LOCKED_OUT" else ""
    row["deprovisioned_date"] = row["statusChanged_utc"] if status_u == "DEPROVISIONED" else ""
    row["division"] = profile.get("division","")
    row["manager"] = profile.get("manager","")
    row["costCenter"] = profile.get("costCenter","")
    row["userType"] = profile.get("userType","")
    # Password age: ADpwdLastSet may arrive as ISO text, FILETIME ticks, or
    # epoch seconds/milliseconds -- parse_adpwdlastset handles all of these.
    ad_raw = profile.get("ADpwdLastSet", "")
    ad_dt = parse_adpwdlastset(ad_raw)
    row["ADpwdLastSet_raw"] = ad_raw if ad_raw is not None else ""
    row["ADpwdLastSet"] = ad_raw if ad_raw is not None else ""
    row["ADpwdLastSet_utc"] = fmt_utc(ad_dt)
    row["ADpwdLastSet_days_since"] = days_between(ad_dt, now_utc)
    last_set_dt = ad_dt
    row["password_last_set_utc"] = fmt_utc(last_set_dt)
    row["days_since_password_last_set"] = days_between(last_set_dt, now_utc)
    # Estimated expiry assumes the fixed max age (FIXED_MAX_AGE_DAYS = 180).
    if last_set_dt:
        expire_dt = last_set_dt + timedelta(days=FIXED_MAX_AGE_DAYS)
        row["maxAgeDays_hardcoded"] = FIXED_MAX_AGE_DAYS
        row["estimated_expire_utc"] = fmt_utc(expire_dt)
        row["days_until_expiry"] = days_between(now_utc, expire_dt)
    else:
        row["maxAgeDays_hardcoded"] = ""
        row["estimated_expire_utc"] = ""
        row["days_until_expiry"] = ""
    row["last_successful_signin_utc"] = fmt_utc(last_login_dt)
    row["days_since_last_successful_signin"] = days_between(last_login_dt, now_utc)
    row["passwordExpired"] = "True" if derive_password_expired_flag(user) else "False"
    if include_mfa:
        mfa_factors = []
        try:
            factors = get_user_factors(row["id"])
            for f in factors:
                # Prefer factorType; fall back to provider when it is absent.
                ftype = (f or {}).get("factorType") or (f or {}).get("provider")
                if ftype:
                    mfa_factors.append(str(ftype).lower())
            mfa_factors = sorted(set(mfa_factors))
        except requests.HTTPError as e:
            # Best-effort: a failed /factors call leaves the MFA columns "No"/empty.
            sys.stderr.write(f"Warning: factors fetch failed for {row['id']}: {e}\n")
        row["mfa_enrolled"] = "Yes" if mfa_factors else "No"
        row["mfa_factors"] = ",".join(mfa_factors)
    else:
        row["mfa_enrolled"] = ""
        row["mfa_factors"] = ""
    return row
# ---------------- Main ----------------
def main():
    """CLI entry point: parse arguments, fetch users, write the merged CSV."""
    parser = argparse.ArgumentParser(
        description="Merged Okta users export (profiles + lifecycle/login + ADpwdLastSet-based expiry + MFA)."
    )
    target = parser.add_mutually_exclusive_group(required=True)
    target.add_argument("--user", help="Username (e.g., 'jared.evans') or full email")
    target.add_argument("--all", action="store_true", help="Export all users")
    parser.add_argument("--only-active", action="store_true",
                        help="With --all, include only ACTIVE users")
    parser.add_argument("--skip-mfa", action="store_true",
                        help="Skip calling /factors for each user (faster for large exports)")
    parser.add_argument("--out", default="okta_users_merged.csv",
                        help="Output CSV path (default: okta_users_merged.csv)")
    args = parser.parse_args()
    require_token()

    if args.user:
        login = normalize_login(args.user)
        print(f"Fetching user {login}...")
        users = get_user_by_login(login)
        if not users:
            print(f"No user found with login: {login}")
            sys.exit(1)
    else:
        print(f"Fetching users (only_active={args.only_active})...")
        users = get_all_users(only_active=args.only_active)
    print(f"Found {len(users)} users.")

    now_utc = datetime.now(timezone.utc)
    include_mfa = not args.skip_mfa
    rows = []
    for count, user in enumerate(users, start=1):
        rows.append(build_row(user, now_utc, include_mfa=include_mfa))
        # Progress ticker for bulk runs only.
        if count % 200 == 0 and not args.user:
            print(f"Processed {count}/{len(users)} users...")

    with open(args.out, "w", newline="", encoding="utf-8") as out_file:
        writer = csv.DictWriter(out_file, fieldnames=ALL_COLUMNS)
        writer.writeheader()
        writer.writerows(rows)
    print(f"Done. Wrote {args.out}")
# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()