373 lines
13 KiB
Python
373 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Merged Okta users export:
- Includes all profile columns from okta_users.py
- Includes lifecycle/login/password-expiry (ADpwdLastSet-based)
- Includes MFA enrollment + factors (unless --skip-mfa)
- Loads OKTA_DOMAIN, OKTA_API_TOKEN, OKTA_APP_ID from .env

Examples:
  # All users (any status):
  python3 full_okta_users.py --all --out okta_all_users.csv

  # Only ACTIVE users:
  python3 full_okta_users.py --all --only-active --out okta_active_users.csv

  # Single user:
  python3 full_okta_users.py --user jared.evans

  # Skip MFA (faster for large runs):
  python3 full_okta_users.py --all --skip-mfa
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import sys
|
|
import csv
|
|
import math
|
|
import time
|
|
import argparse
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
import requests
|
|
|
|
# ---------------- .env loading ----------------
|
|
# Matches one `KEY=VALUE` line. Group 1 is the key (a letter or underscore
# followed by letters, digits, or underscores); group 2 is the remainder of
# the line after the `=` and any whitespace that follows it.
_ENV_LINE_RE = re.compile(r'^\s*([A-Za-z_][A-Za-z0-9_]*)\s*=\s*(.*)\s*$')
|
|
|
|
def _strip_quotes(val: str) -> str:
|
|
val = val.strip()
|
|
if len(val) >= 2 and (val[0] == val[-1]) and val[0] in ("'", '"'):
|
|
return val[1:-1]
|
|
return val
|
|
|
|
def _drop_inline_comment(raw_value: str) -> str:
    """Remove a trailing ``# comment`` from a raw .env value.

    A quoted value is cut right after its closing quote when only whitespace
    or a comment follows. An unquoted value is cut at the first ``#`` that is
    preceded by whitespace, so a ``#`` embedded in a token is kept.
    """
    text = raw_value.strip()
    if text[:1] in ("'", '"'):
        closing = text.find(text[0], 1)
        if closing != -1:
            remainder = text[closing + 1:].strip()
            if not remainder or remainder.startswith("#"):
                return text[:closing + 1]
        # Unterminated or oddly quoted: leave it for the caller unchanged.
        return text
    comment = re.search(r"\s#", text)
    return text[:comment.start()] if comment else text


def load_env():
    """Load KEY=VALUE pairs from .env in script dir or cwd.

    The script directory's .env is read first, then the current working
    directory's. Variables already present in ``os.environ`` are never
    overwritten, so the real environment wins over both files and the script
    directory's file wins over the cwd's.

    Blank lines, ``#`` comment lines, and lines that do not match
    ``_ENV_LINE_RE`` are skipped. Trailing inline comments are removed before
    the value is unquoted.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    candidates = [os.path.join(script_dir, ".env"), os.path.join(os.getcwd(), ".env")]
    visited = set()
    for path in candidates:
        resolved = os.path.realpath(path)
        # Skip the second candidate when it is the same file, and skip
        # anything that is not a regular file (e.g. a directory named .env).
        if resolved in visited or not os.path.isfile(path):
            continue
        visited.add(resolved)
        # utf-8-sig tolerates a BOM, which would otherwise corrupt the first key.
        with open(path, "r", encoding="utf-8-sig") as f:
            for raw in f:
                line = raw.strip()
                if not line or line.startswith("#"):
                    continue
                m = _ENV_LINE_RE.match(line)
                if not m:
                    continue
                key = m.group(1)
                val = _strip_quotes(_drop_inline_comment(m.group(2)))
                if key and key not in os.environ:
                    os.environ[key] = val
|
|
|
|
# Populate os.environ from .env at import time, before the configuration
# constants that follow read their values with os.getenv.
load_env()
|
|
|
|
# ---------------- Configuration ----------------
|
|
# Okta org host name. The value is normalised so that a full URL
# ("https://example.okta.com/") or a stray trailing slash does not produce a
# malformed BASE_URL; an empty value falls back to the default host.
_DEFAULT_OKTA_DOMAIN = "gallaudet.okta.com"
OKTA_DOMAIN = (
    os.getenv("OKTA_DOMAIN", _DEFAULT_OKTA_DOMAIN)
    .strip()
    .split("://", 1)[-1]   # drop an optional scheme
    .split("/", 1)[0]      # drop any path or trailing slash
    or _DEFAULT_OKTA_DOMAIN
)
# API token sent in the SSWS Authorization header; None when not configured.
API_TOKEN = os.getenv("OKTA_API_TOKEN")
# Optional application id read from the environment (empty string when unset).
OKTA_APP_ID = os.getenv("OKTA_APP_ID", "")

BASE_URL = f"https://{OKTA_DOMAIN}"
USERS_URL = f"{BASE_URL}/api/v1/users"

DEFAULT_TIMEOUT = 15  # seconds, applied to every HTTP request
FIXED_MAX_AGE_DAYS = 180  # password lifetime used to estimate expiry dates
|
|
|
|
# ---------------- HTTP session ----------------
|
|
# One shared Session for every request. The SSWS Authorization header is
# attached only when a token is configured; JSON is always requested.
SESSION = requests.Session()
SESSION.headers.update(
    {
        "Authorization": f"SSWS {API_TOKEN}",
        "Accept": "application/json",
    }
    if API_TOKEN
    else {"Accept": "application/json"}
)
|
|
|
|
def require_token():
    """Abort the program when no Okta API token is configured.

    Does nothing when ``API_TOKEN`` is set. Otherwise writes setup
    instructions to stderr and exits with status 1.
    """
    if API_TOKEN:
        return
    instructions = (
        "ERROR: Missing OKTA_API_TOKEN. Create a .env with:\n",
        " OKTA_DOMAIN=\"gallaudet.okta.com\"\n",
        " OKTA_API_TOKEN=\"xxxxxxxx\"\n",
        " OKTA_APP_ID=\"0oa...\" # optional\n",
    )
    sys.stderr.write("".join(instructions))
    sys.exit(1)
|
|
|
|
# ---------------- HTTP helpers ----------------
|
|
# Upper bound on a single rate-limit wait, so a bad header or clock skew
# cannot stall the export indefinitely.
_MAX_RATE_LIMIT_WAIT = 60.0


def _seconds_until_reset(reset_header):
    """Return seconds until the epoch time in X-Rate-Limit-Reset.

    The result is clamped to ``[0, _MAX_RATE_LIMIT_WAIT]`` and is 0 when the
    header is missing or not a number.
    """
    try:
        remaining = float(reset_header) - time.time()
    except (TypeError, ValueError):
        return 0.0
    # One extra second absorbs small differences between the two clocks.
    return min(max(remaining + 1.0, 0.0), _MAX_RATE_LIMIT_WAIT)


def retry_get(url, params=None, max_tries=5):
    """GET ``url`` through the shared session, retrying transient failures.

    Responses with status 429, 500, 502, 503, or 504, as well as connection
    errors and timeouts, are retried with a growing delay (0.5 s, multiplied
    by 1.7 after each attempt). On 429 the wait is extended to the time given
    by ``X-Rate-Limit-Reset``, so retries are not spent inside the same
    rate-limit window.

    Args:
        url: Absolute URL to request.
        params: Optional query parameters; the mapping is copied, not mutated.
        max_tries: Total number of attempts; must be at least 1.

    Returns:
        The successful ``requests.Response``.

    Raises:
        requests.HTTPError: For a non-retryable error status, or a retryable
            one that persists through the last attempt.
        requests.ConnectionError, requests.Timeout: When the last attempt
            fails at the network level.
        RuntimeError: When ``max_tries`` is less than 1.
    """
    params = dict(params or {})
    delay = 0.5
    for attempt in range(1, max_tries + 1):
        is_last = attempt == max_tries
        try:
            resp = SESSION.get(url, params=params, timeout=DEFAULT_TIMEOUT)
        except (requests.ConnectionError, requests.Timeout) as exc:
            if is_last:
                raise
            sys.stderr.write(f"[backoff] {type(exc).__name__} try={attempt}\n")
            time.sleep(delay)
            delay *= 1.7
            continue

        if resp.status_code not in (429, 500, 502, 503, 504):
            resp.raise_for_status()
            return resp

        rem = resp.headers.get("X-Rate-Limit-Remaining")
        reset = resp.headers.get("X-Rate-Limit-Reset")
        sys.stderr.write(f"[backoff] {resp.status_code} remaining={rem} reset={reset} try={attempt}\n")
        if is_last:
            resp.raise_for_status()

        wait = delay
        if resp.status_code == 429:
            wait = max(wait, _seconds_until_reset(reset))
        time.sleep(wait)
        delay *= 1.7

    # Only reachable when the loop body never ran.
    raise RuntimeError("retry_get: max_tries must be at least 1")
|
|
|
|
def get_with_pagination(url, params=None):
    """Yield every item from a paginated endpoint, following ``next`` links.

    Args:
        url: URL of the first page.
        params: Query parameters for the first page only; follow-up pages use
            the URL from the response's ``next`` link with no extra params.

    Yields:
        Each element of a JSON array response, or the decoded body itself
        when a page is not an array.
    """
    page_url = url
    query = dict(params or {})
    while True:
        response = retry_get(page_url, params=query)
        payload = response.json()
        if isinstance(payload, list):
            yield from payload
        else:
            yield payload

        following = response.links.get("next", {}).get("url")
        if not following:
            return
        # The next link already carries its own query string.
        page_url = following
        query = {}
|
|
|
|
# ---------------- Date/time helpers ----------------
|
|
def iso_to_dt(iso_str):
    """Parse an ISO-8601 timestamp into a timezone-aware UTC datetime.

    A trailing ``Z`` is accepted as UTC. Timestamps that carry an offset are
    converted to UTC. Timestamps with no offset are taken to be UTC already,
    so the result does not depend on the local time zone of the machine
    running the script.

    Args:
        iso_str: Timestamp text (any other truthy value is converted with
            ``str``), or a falsy value.

    Returns:
        An aware ``datetime`` in UTC, or ``None`` when the input is empty or
        cannot be parsed.
    """
    if not iso_str:
        return None
    text = str(iso_str).strip()
    # Only the suffix is a zone designator; rewrite it to a numeric offset
    # that datetime.fromisoformat accepts on every supported Python version.
    if text[-1:] in ("Z", "z"):
        text = text[:-1] + "+00:00"
    try:
        parsed = datetime.fromisoformat(text)
        if parsed.tzinfo is None:
            return parsed.replace(tzinfo=timezone.utc)
        return parsed.astimezone(timezone.utc)
    except (ValueError, OverflowError):
        return None
|
|
|
|
def fmt_utc(dt_utc):
    """Return ``dt_utc.isoformat()``, or an empty string for a falsy value."""
    if not dt_utc:
        return ""
    return dt_utc.isoformat()
|
|
|
|
def days_between(a_dt, b_dt):
    """Return the whole days from ``a_dt`` to ``b_dt``, rounded down.

    Args:
        a_dt: Start datetime, or a falsy value.
        b_dt: End datetime, or a falsy value.

    Returns:
        ``floor((b_dt - a_dt) / 1 day)`` as an int (negative when ``b_dt`` is
        earlier), or an empty string when either argument is falsy.
    """
    if a_dt and b_dt:
        elapsed = b_dt - a_dt
        return math.floor(elapsed.total_seconds() / 86400.0)
    return ""
|
|
|
|
# Zero point for FILETIME-style values (1601-01-01 00:00 UTC); offsets
# parsed from ADpwdLastSet are added to it.
FILETIME_EPOCH = datetime(1601, 1, 1, tzinfo=timezone.utc)
|
|
|
|
def _epoch_number_to_dt(number):
    """Convert an epoch-style number to an aware UTC datetime by magnitude."""
    if number > 10_000_000_000_000:  # FILETIME: 10,000,000 ticks per second since 1601
        return FILETIME_EPOCH + timedelta(seconds=number / 10_000_000)
    if number > 10_000_000_000:  # ms epoch
        return datetime.fromtimestamp(number / 1000.0, tz=timezone.utc)
    return datetime.fromtimestamp(number, tz=timezone.utc)  # sec epoch


def parse_adpwdlastset(value):
    """Parse an ADpwdLastSet profile value into an aware UTC datetime.

    Accepted forms, tried in this order:
      1. An ISO-8601 timestamp (via ``iso_to_dt``).
      2. An all-digit string or int, classified by magnitude as FILETIME
         (> 1e13), milliseconds since 1970 (> 1e10), or seconds since 1970.
      3. Any other positive number (e.g. ``"1.7e9"`` or a float), classified
         with the same thresholds.

    Args:
        value: Raw profile value; may be ``None``, a string, or a number.

    Returns:
        The parsed datetime, or ``None`` when the value is empty, not a
        positive number, or outside the representable date range.
    """
    if value is None or value == "":
        return None

    parsed = iso_to_dt(value)
    if parsed:
        return parsed

    text = str(value).strip()
    try:
        if text.isdigit():
            number = int(text)
        else:
            number = float(text)
            # Rejects zero, negatives and NaN in one comparison.
            if not number > 0:
                return None
        return _epoch_number_to_dt(number)
    except (ValueError, OverflowError, OSError):
        # Not numeric, or numeric but beyond what datetime can represent.
        return None
|
|
|
|
def derive_password_expired_flag(user_obj):
    """Report whether a user object indicates an expired password.

    Any one of three signals is sufficient, checked in this order:
      * ``passwordExpired`` is exactly ``True``;
      * ``status`` equals ``PASSWORD_EXPIRED`` (case-insensitive);
      * ``credentials.password.status`` equals ``EXPIRED`` (case-insensitive).

    Args:
        user_obj: User dict; missing or null fields are treated as absent.

    Returns:
        ``True`` when any signal is present, otherwise ``False``.
    """
    return (
        user_obj.get("passwordExpired") is True
        or (user_obj.get("status") or "").upper() == "PASSWORD_EXPIRED"
        or (
            ((user_obj.get("credentials") or {}).get("password") or {}).get("status")
            or ""
        ).upper() == "EXPIRED"
    )
|
|
|
|
# ---------------- Fetchers ----------------
|
|
def get_all_users(only_active=False):
    """Fetch users from the org, following pagination.

    Okta's list-users call leaves out DEPROVISIONED users unless they are
    requested with an explicit status filter, so the unfiltered listing is
    followed by a second, filtered pass to make "all users" cover every
    status.

    Args:
        only_active: When true, return only users whose status is ACTIVE.
            The filter is applied server-side so other users are never
            downloaded.

    Returns:
        A list of user dicts.
    """
    if only_active:
        active = get_with_pagination(
            USERS_URL, params={"limit": 200, "filter": 'status eq "ACTIVE"'}
        )
        # The client-side check is kept as a cheap safety net.
        return [u for u in active if (u.get("status") or "").upper() == "ACTIVE"]

    users = list(get_with_pagination(USERS_URL, params={"limit": 200}))
    seen_ids = {u.get("id") for u in users}
    deprovisioned = get_with_pagination(
        USERS_URL, params={"limit": 200, "filter": 'status eq "DEPROVISIONED"'}
    )
    for u in deprovisioned:
        # Guard against duplicates should the first pass already include them.
        if u.get("id") not in seen_ids:
            seen_ids.add(u.get("id"))
            users.append(u)
    return users
|
|
|
|
def get_user_by_login(user_login):
    """Look up users whose ``profile.login`` equals ``user_login``.

    Args:
        user_login: Full login to match.

    Returns:
        The decoded response when it is a list; otherwise a one-element list
        holding the decoded body, or an empty list when the body is falsy.
    """
    response = retry_get(USERS_URL, params={"filter": f'profile.login eq "{user_login}"'})
    payload = response.json()
    if isinstance(payload, list):
        return payload
    if payload:
        return [payload]
    return []
|
|
|
|
def get_user_factors(user_id):
    """Fetch the factors listed for one user.

    Args:
        user_id: Okta user id placed in the request path.

    Returns:
        The decoded response when it is a list, otherwise an empty list.
    """
    response = retry_get(f"{USERS_URL}/{user_id}/factors", params={"limit": 200})
    payload = response.json()
    if not isinstance(payload, list):
        return []
    return payload
|
|
|
|
def normalize_login(user_arg: str, domain: str = "gallaudet.edu") -> str:
    """Turn a bare username into a full login.

    Args:
        user_arg: Username (``jared.evans``) or full login containing ``@``.
            Surrounding whitespace is ignored.
        domain: Domain appended to a bare username.

    Returns:
        ``user_arg`` unchanged (apart from trimming) when it already contains
        ``@``; otherwise ``"<user_arg>@<domain>"``.
    """
    login = user_arg.strip()
    if "@" in login:
        return login
    return f"{login}@{domain}"
|
|
|
|
# ---------------- Column set ----------------
|
|
# Leading CSV columns, in output order.
PROFILE_COLUMNS_FROM_FIRST = [
    "firstName",
    "lastName",
    "email",
    "title",
    "organization",
    "wdEmployeeRole",
    "wdJobProfile",
    "status",
    "employeeStatus",
    "wdHireDate",
    "wdTerminated",
    "wdTerminationDate",
    "wdHasAcademicAppointment",
    "wdFutureHire",
    "InstructorKeepActiveTo",
    "wdTerminatedWorkerKeepActiveTo",
    "department",
    "wdJobFamilyGroup",
    "wdEmployeeType",
    "userRole",
    "wdIsWorkerActiveStudent",
    "created",
    "activated",
    "statusChanged",
    "lastLogin",
    "lastUpdated",
    "passwordChanged",
    "ADpwdLastSet",
    "displayName",
    "login",
    "usernameWithoutDomain",
    "secondEmail",
]

# Full CSV header: the profile columns followed by the derived columns.
ALL_COLUMNS = [
    *PROFILE_COLUMNS_FROM_FIRST,
    # status and lifecycle
    "status_upper",
    "is_locked_out",
    "created_utc",
    "activated_utc",
    "lastUpdated_utc",
    "statusChanged_utc",
    "deprovisioned_date",
    "lastLockedOut",
    # additional profile attributes
    "division",
    "manager",
    "costCenter",
    "userType",
    # password age and estimated expiry
    "ADpwdLastSet_raw",
    "ADpwdLastSet_utc",
    "ADpwdLastSet_days_since",
    "password_last_set_utc",
    "days_since_password_last_set",
    "maxAgeDays_hardcoded",
    "estimated_expire_utc",
    "days_until_expiry",
    # sign-in
    "last_successful_signin_utc",
    "days_since_last_successful_signin",
    "passwordExpired",
    "id",
    # MFA
    "mfa_enrolled",
    "mfa_factors",
]
|
|
|
|
# ---------------- Row builder ----------------
|
|
# Columns of PROFILE_COLUMNS_FROM_FIRST that are read from the user object
# itself rather than from user["profile"].
_TOP_LEVEL_USER_FIELDS = frozenset((
    "status", "created", "activated", "statusChanged",
    "lastLogin", "lastUpdated", "passwordChanged",
))


def _profile_fields(user, profile):
    """Return the PROFILE_COLUMNS_FROM_FIRST columns for one user."""
    fields = {}
    for column in PROFILE_COLUMNS_FROM_FIRST:
        if column in _TOP_LEVEL_USER_FIELDS:
            fields[column] = user.get(column, "")
        elif column == "usernameWithoutDomain":
            # A null login is treated as empty instead of raising.
            login = profile.get("login") or ""
            fields[column] = login.split("@")[0]
        else:
            fields[column] = profile.get(column, "")
    return fields


def _lifecycle_fields(user, status_u):
    """Return the status and lifecycle-timestamp columns for one user."""
    status_changed = fmt_utc(iso_to_dt(user.get("statusChanged")))
    locked = status_u == "LOCKED_OUT"
    return {
        "status_upper": status_u,
        "created_utc": fmt_utc(iso_to_dt(user.get("created"))),
        "activated_utc": fmt_utc(iso_to_dt(user.get("activated"))),
        "lastUpdated_utc": fmt_utc(iso_to_dt(user.get("lastUpdated"))),
        "statusChanged_utc": status_changed,
        "is_locked_out": "Yes" if locked else "No",
        # The status-change time doubles as the lock-out / deprovision time
        # while the user is currently in that status.
        "lastLockedOut": status_changed if locked else "",
        "deprovisioned_date": status_changed if status_u == "DEPROVISIONED" else "",
    }


def _password_fields(profile, now_utc):
    """Return the ADpwdLastSet-based password age and expiry columns."""
    ad_raw = profile.get("ADpwdLastSet", "")
    ad_raw_out = "" if ad_raw is None else ad_raw
    last_set_dt = parse_adpwdlastset(ad_raw)
    fields = {
        "ADpwdLastSet_raw": ad_raw_out,
        "ADpwdLastSet": ad_raw_out,
        "ADpwdLastSet_utc": fmt_utc(last_set_dt),
        "ADpwdLastSet_days_since": days_between(last_set_dt, now_utc),
        "password_last_set_utc": fmt_utc(last_set_dt),
        "days_since_password_last_set": days_between(last_set_dt, now_utc),
        "maxAgeDays_hardcoded": "",
        "estimated_expire_utc": "",
        "days_until_expiry": "",
    }
    if last_set_dt:
        expire_dt = last_set_dt + timedelta(days=FIXED_MAX_AGE_DAYS)
        fields["maxAgeDays_hardcoded"] = FIXED_MAX_AGE_DAYS
        fields["estimated_expire_utc"] = fmt_utc(expire_dt)
        fields["days_until_expiry"] = days_between(now_utc, expire_dt)
    return fields


def _mfa_fields(user_id):
    """Return the MFA columns for one user, fetching the user's factors.

    A failed fetch is reported as ``mfa_enrolled = "Unknown"`` so it cannot
    be mistaken for a user who has no factors.
    """
    try:
        factors = get_user_factors(user_id)
    except requests.RequestException as e:
        # Covers HTTP errors as well as timeouts and connection failures, so
        # one bad lookup does not abort a long export.
        sys.stderr.write(f"Warning: factors fetch failed for {user_id}: {e}\n")
        return {"mfa_enrolled": "Unknown", "mfa_factors": ""}

    kinds = set()
    for factor in factors:
        factor = factor or {}
        ftype = factor.get("factorType") or factor.get("provider")
        if ftype:
            kinds.add(str(ftype).lower())
    ordered = sorted(kinds)
    return {
        "mfa_enrolled": "Yes" if ordered else "No",
        "mfa_factors": ",".join(ordered),
    }


def build_row(user, now_utc, include_mfa: bool):
    """Build one CSV row (a dict keyed by ``ALL_COLUMNS``) for a user.

    Args:
        user: User dict as returned by the users endpoint.
        now_utc: Aware UTC datetime used as "now" for all day counts, so
            every row of one export shares the same reference time.
        include_mfa: When true, fetch the user's factors (one extra HTTP
            request) and fill the MFA columns; when false, leave them blank.

    Returns:
        A dict with a value for every name in ``ALL_COLUMNS``. The MFA
        columns hold "Yes"/"No" plus a comma-separated factor list,
        "Unknown" with an empty list when the factors request failed, or
        empty strings when ``include_mfa`` is false.
    """
    profile = user.get("profile") or {}
    status_u = (user.get("status") or "").upper()
    last_login_dt = iso_to_dt(user.get("lastLogin"))

    row = _profile_fields(user, profile)
    row["id"] = user.get("id", "")
    row.update(_lifecycle_fields(user, status_u))

    for column in ("division", "manager", "costCenter", "userType"):
        row[column] = profile.get(column, "")

    # Overrides the raw ADpwdLastSet copied by _profile_fields so that a
    # null value is written as an empty string.
    row.update(_password_fields(profile, now_utc))

    row["last_successful_signin_utc"] = fmt_utc(last_login_dt)
    row["days_since_last_successful_signin"] = days_between(last_login_dt, now_utc)
    row["passwordExpired"] = "True" if derive_password_expired_flag(user) else "False"

    if include_mfa:
        row.update(_mfa_fields(row["id"]))
    else:
        row["mfa_enrolled"] = ""
        row["mfa_factors"] = ""

    return row
|
|
|
|
# ---------------- Main ----------------
|
|
def _build_arg_parser():
    """Create the command-line parser for the export script."""
    parser = argparse.ArgumentParser(
        description="Merged Okta users export (profiles + lifecycle/login + ADpwdLastSet-based expiry + MFA)."
    )

    # Exactly one of --user / --all must be given.
    target = parser.add_mutually_exclusive_group(required=True)
    target.add_argument("--user", help="Username (e.g., 'jared.evans') or full email")
    target.add_argument("--all", action="store_true", help="Export all users")

    parser.add_argument(
        "--only-active",
        action="store_true",
        help="With --all, include only ACTIVE users",
    )
    parser.add_argument(
        "--skip-mfa",
        action="store_true",
        help="Skip calling /factors for each user (faster for large exports)",
    )
    parser.add_argument(
        "--out",
        default="okta_users_merged.csv",
        help="Output CSV path (default: okta_users_merged.csv)",
    )
    return parser


def main():
    """Parse arguments, fetch the requested users, and write the CSV.

    Exits with status 1 when the API token is missing or when ``--user``
    matches no user. All rows are built before the output file is opened.
    """
    args = _build_arg_parser().parse_args()

    require_token()

    single_user = bool(args.user)
    if single_user:
        login = normalize_login(args.user)
        print(f"Fetching user {login}...")
        users = get_user_by_login(login)
        if not users:
            print(f"No user found with login: {login}")
            sys.exit(1)
    else:
        print(f"Fetching users (only_active={args.only_active})...")
        users = get_all_users(only_active=args.only_active)
        print(f"Found {len(users)} users.")

    # One shared reference time keeps every day count in the export consistent.
    now_utc = datetime.now(timezone.utc)
    include_mfa = not args.skip_mfa
    total = len(users)

    rows = []
    for position, user in enumerate(users, start=1):
        rows.append(build_row(user, now_utc, include_mfa=include_mfa))
        if not single_user and position % 200 == 0:
            print(f"Processed {position}/{total} users...")

    with open(args.out, "w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=ALL_COLUMNS)
        writer.writeheader()
        writer.writerows(rows)

    print(f"Done. Wrote {args.out}")
|
|
|
|
# Run the export only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|
|