Files
pre-repos/okta_search_logs/deactivated_users.py
2026-01-26 16:49:09 -05:00

160 lines
5.5 KiB
Python

#!/home/jevans/audit_reports/okta_system_logs/.venv/bin/python
import os
import re
import sys
import csv
import requests
# ---------------- .env loading (KEY=VALUE; quotes supported) ----------------
# Matches a single `KEY = VALUE` line. Group 1 is the key (a letter or
# underscore followed by letters, digits or underscores); group 2 is the rest
# of the line after the `=` and any whitespace that follows it, captured
# verbatim (surrounding quotes are still present in the capture).
_ENV_LINE_RE = re.compile(r'^\s*([A-Za-z_][A-Za-z0-9_]*)\s*=\s*(.*)\s*$')
def _strip_quotes(val: str) -> str:
val = val.strip()
if len(val) >= 2 and (val[0] == val[-1]) and val[0] in ("'", '"'):
return val[1:-1]
return val
def load_env():
    """Load KEY=VALUE pairs from .env in script dir or cwd.

    Both locations are read when both are present, the script directory
    first. Blank lines, lines starting with ``#`` and lines that do not look
    like ``KEY=VALUE`` are skipped. A key that already exists in
    ``os.environ`` is never overwritten, so variables set in the real
    environment win over the files, and the script-directory file wins over
    the one in the current working directory.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    candidates = [os.path.join(script_dir, ".env"), os.path.join(os.getcwd(), ".env")]
    for path in candidates:
        # isfile rather than exists: a *directory* called ".env" (a common
        # virtualenv name) must be skipped, not passed to open(), which would
        # raise IsADirectoryError while the module is still loading.
        if not os.path.isfile(path):
            continue
        with open(path, "r", encoding="utf-8") as f:
            for raw in f:
                line = raw.strip()
                if not line or line.startswith("#"):
                    continue
                m = _ENV_LINE_RE.match(line)
                if not m:
                    continue
                key, val = m.group(1), _strip_quotes(m.group(2))
                # First definition wins; existing environment is left alone.
                os.environ.setdefault(key, val)
# Populate os.environ from any .env file before the settings below are read.
load_env()
# ---------------- Config ----------------
# Okta host name; falls back to the default when OKTA_DOMAIN is not set.
OKTA_DOMAIN = os.environ.get("OKTA_DOMAIN", "gallaudet.okta.com")
BASE_URL = f"https://{OKTA_DOMAIN}"
USERS_URL = f"{BASE_URL}/api/v1/users"

# The API token is mandatory: without it the script stops with exit status 1.
API_TOKEN = os.environ.get("OKTA_API_TOKEN")
if not API_TOKEN:
    sys.stderr.write("ERROR: Missing OKTA_API_TOKEN in .env\n")
    raise SystemExit(1)

# Headers sent with every request to the users endpoint.
HEADERS = {
    "Authorization": f"SSWS {API_TOKEN}",
    "Accept": "application/json",
}
# Output file used when exporting the full user list.
CSV_FILENAME = "okta_deprovisioned_users.csv"

# CSV header, in output order. One entry per line so additions and removals
# show up as single-line diffs.
COLUMNS = [
    "firstName",
    "lastName",
    "email",
    "title",
    "organization",
    "wdEmployeeRole",
    "wdJobProfile",
    "status",
    "employeeStatus",
    "wdHireDate",
    "wdTerminated",
    "wdTerminationDate",
    "wdHasAcademicAppointment",
    "wdFutureHire",
    "InstructorKeepActiveTo",
    "wdTerminatedWorkerKeepActiveTo",
    "department",
    "wdJobFamilyGroup",
    "wdEmployeeType",
    "userRole",
    "wdIsWorkerActiveStudent",
    "created",
    "activated",
    "statusChanged",
    "lastLogin",
    "lastUpdated",
    "passwordChanged",
    "ADpwdLastSet",
    "displayName",
    "login",
    "secondEmail",
]
# ---------------- Helpers ----------------
def get_all_users():
    """
    Return users whose status is one of STAGED, DEPROVISIONED, PROVISIONED, or RECOVERY.

    Handles Okta Link header pagination: pages are fetched until the response
    carries no rel="next" link. If a page comes back with a non-200 status,
    the status code and body are printed and the users collected so far are
    returned (the result may therefore be partial).
    """
    users = []
    url = USERS_URL
    # Query parameters belong to the first request only; each rel="next" URL
    # is requested exactly as the server supplied it.
    params = {
        "limit": 200,
        'filter': 'status eq "STAGED" or status eq "DEPROVISIONED" or status eq "PROVISIONED" or status eq "RECOVERY"',
    }
    while url:
        resp = requests.get(url, headers=HEADERS, params=params, timeout=15)
        params = None
        if resp.status_code != 200:
            print(f"Failed to fetch users: {resp.status_code}")
            print(resp.text)
            break
        data = resp.json()
        if isinstance(data, list):
            users.extend(data)
        # requests already parses the Link header(s) into a dict keyed by
        # rel, so there is no need to split the raw header by hand. A missing
        # "next" entry yields None and ends the loop.
        url = resp.links.get("next", {}).get("url")
    return users
def get_user_by_login(user_login: str):
    """
    Query the users endpoint for entries whose profile.login equals user_login.

    Returns the decoded JSON response when it is non-empty. Returns an empty
    list (after printing a message) when the request does not answer with
    status 200 or when nothing matched.
    """
    response = requests.get(
        USERS_URL,
        headers=HEADERS,
        params={"filter": f'profile.login eq "{user_login}"'},
        timeout=15,
    )
    if response.status_code == 200:
        matches = response.json()
        if not matches:
            print(f"No user found with login: {user_login}")
            return []
        return matches
    print(f"Failed to fetch user: {response.status_code}")
    print(response.text)
    return []
def format_user(user: dict) -> dict:
    """
    Build one CSV row (a dict keyed by every name in COLUMNS) from a user object.

    The lifecycle fields listed below are read from the top level of the
    user object; every other column is read from its "profile" mapping.
    A missing key becomes an empty string.
    """
    top_level_fields = {"status","created","activated","statusChanged","lastLogin","lastUpdated","passwordChanged"}
    # A missing or null profile is treated as an empty mapping.
    profile = user.get("profile", {}) or {}
    return {
        name: (user if name in top_level_fields else profile).get(name, "")
        for name in COLUMNS
    }
def save_to_csv(users: list, filename: str):
    """
    Write the given users to filename as CSV with COLUMNS as the header.

    Each user is flattened with format_user; rows whose "title" is missing,
    empty or whitespace-only are left out. The header is always written,
    and a confirmation line is printed once the file has been closed.
    """
    with open(filename, "w", newline="", encoding="utf-8") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=COLUMNS, extrasaction="ignore")
        writer.writeheader()
        for record in map(format_user, users):
            title = record.get("title")
            # Skip users without a usable title.
            if not title or not str(title).strip():
                continue
            writer.writerow(record)
    print(f"User data saved to {filename}")
# ---------------- Main ----------------
if __name__ == "__main__":
    # Single-user mode is chosen only when the first argument ends with
    # "@gallaudet.edu"; anything else (or no argument) exports all users.
    cli_args = sys.argv[1:]
    single_user_mode = bool(cli_args) and cli_args[0].endswith("@gallaudet.edu")
    if not single_user_mode:
        users = get_all_users()
        save_to_csv(users, CSV_FILENAME)
    else:
        user_login = cli_args[0]
        users = get_user_by_login(user_login)
        # File name derived from the login, with "@" and "." replaced.
        out_file = f"okta_user_{user_login.replace('@','_at_').replace('.','_')}.csv"
        save_to_csv(users, out_file)