160 lines
5.5 KiB
Python
160 lines
5.5 KiB
Python
#!/home/jevans/audit_reports/okta_system_logs/.venv/bin/python
|
|
import os
|
|
import re
|
|
import sys
|
|
import csv
|
|
import requests
|
|
|
|
# ---------------- .env loading (KEY=VALUE; quotes supported) ----------------
|
|
_ENV_LINE_RE = re.compile(r'^\s*([A-Za-z_][A-Za-z0-9_]*)\s*=\s*(.*)\s*$')
|
|
|
|
def _strip_quotes(val: str) -> str:
|
|
val = val.strip()
|
|
if len(val) >= 2 and (val[0] == val[-1]) and val[0] in ("'", '"'):
|
|
return val[1:-1]
|
|
return val
|
|
|
|
def load_env():
|
|
"""Load KEY=VALUE pairs from .env in script dir or cwd."""
|
|
script_dir = os.path.dirname(os.path.abspath(__file__))
|
|
candidates = [os.path.join(script_dir, ".env"), os.path.join(os.getcwd(), ".env")]
|
|
for path in candidates:
|
|
if os.path.exists(path):
|
|
with open(path, "r", encoding="utf-8") as f:
|
|
for raw in f:
|
|
line = raw.strip()
|
|
if not line or line.startswith("#"):
|
|
continue
|
|
m = _ENV_LINE_RE.match(line)
|
|
if not m:
|
|
continue
|
|
key, val = m.group(1), _strip_quotes(m.group(2))
|
|
if key and key not in os.environ:
|
|
os.environ[key] = val
|
|
|
|
load_env()
|
|
|
|
# ---------------- Config ----------------
|
|
OKTA_DOMAIN = os.getenv("OKTA_DOMAIN", "gallaudet.okta.com")
|
|
API_TOKEN = os.getenv("OKTA_API_TOKEN")
|
|
|
|
if not API_TOKEN:
|
|
sys.stderr.write("ERROR: Missing OKTA_API_TOKEN in .env\n")
|
|
sys.exit(1)
|
|
|
|
BASE_URL = f"https://{OKTA_DOMAIN}"
|
|
USERS_URL = f"{BASE_URL}/api/v1/users"
|
|
|
|
HEADERS = {
|
|
"Authorization": f"SSWS {API_TOKEN}",
|
|
"Accept": "application/json",
|
|
}
|
|
|
|
CSV_FILENAME = "okta_deprovisioned_users.csv"
|
|
|
|
COLUMNS = [
|
|
"firstName","lastName","email","title","organization",
|
|
"wdEmployeeRole","wdJobProfile","status","employeeStatus","wdHireDate",
|
|
"wdTerminated","wdTerminationDate","wdHasAcademicAppointment","wdFutureHire",
|
|
"InstructorKeepActiveTo","wdTerminatedWorkerKeepActiveTo","department",
|
|
"wdJobFamilyGroup","wdEmployeeType","userRole","wdIsWorkerActiveStudent",
|
|
"created","activated","statusChanged","lastLogin","lastUpdated",
|
|
"passwordChanged","ADpwdLastSet","displayName","login","secondEmail"
|
|
]
|
|
|
|
# ---------------- Helpers ----------------
|
|
def get_all_users():
|
|
"""
|
|
Return users whose status is one of STAGED, DEPROVISIONED, PROVISIONED, or RECOVERY.
|
|
Handles Okta Link header pagination.
|
|
"""
|
|
users = []
|
|
url = USERS_URL
|
|
params = {
|
|
"limit": 200,
|
|
'filter': 'status eq "STAGED" or status eq "DEPROVISIONED" or status eq "PROVISIONED" or status eq "RECOVERY"',
|
|
}
|
|
while url:
|
|
if url == USERS_URL:
|
|
resp = requests.get(url, headers=HEADERS, params=params, timeout=15)
|
|
else:
|
|
resp = requests.get(url, headers=HEADERS, timeout=15)
|
|
if resp.status_code != 200:
|
|
print(f"Failed to fetch users: {resp.status_code}")
|
|
print(resp.text)
|
|
break
|
|
|
|
data = resp.json()
|
|
if isinstance(data, list):
|
|
users.extend(data)
|
|
|
|
# Parse pagination link header
|
|
next_link = None
|
|
link_hdr = resp.headers.get("link") or resp.headers.get("Link")
|
|
if link_hdr:
|
|
parts = [p.strip() for p in link_hdr.split(",")]
|
|
for part in parts:
|
|
if 'rel="next"' in part:
|
|
# format: <URL>; rel="next"
|
|
start = part.find("<") + 1
|
|
end = part.find(">")
|
|
if start > 0 and end > start:
|
|
next_link = part[start:end]
|
|
break
|
|
url = next_link
|
|
|
|
return users
|
|
|
|
def get_user_by_login(user_login: str):
|
|
"""
|
|
Return a list with the matching user (or empty list if not found).
|
|
"""
|
|
params = {"filter": f'profile.login eq "{user_login}"'}
|
|
resp = requests.get(USERS_URL, headers=HEADERS, params=params, timeout=15)
|
|
if resp.status_code != 200:
|
|
print(f"Failed to fetch user: {resp.status_code}")
|
|
print(resp.text)
|
|
return []
|
|
users = resp.json()
|
|
if users:
|
|
return users
|
|
print(f"No user found with login: {user_login}")
|
|
return []
|
|
|
|
def format_user(user: dict) -> dict:
|
|
"""
|
|
Flatten user object into the CSV field set.
|
|
Pull top-level lifecycle fields from user,
|
|
everything else from profile.
|
|
"""
|
|
row = {}
|
|
profile = user.get("profile", {}) or {}
|
|
for field in COLUMNS:
|
|
if field in {"status","created","activated","statusChanged","lastLogin","lastUpdated","passwordChanged"}:
|
|
row[field] = user.get(field, "")
|
|
else:
|
|
row[field] = profile.get(field, "")
|
|
return row
|
|
|
|
def save_to_csv(users: list, filename: str):
|
|
with open(filename, "w", newline="", encoding="utf-8") as csvfile:
|
|
writer = csv.DictWriter(csvfile, fieldnames=COLUMNS, extrasaction="ignore")
|
|
writer.writeheader()
|
|
for user in users:
|
|
row = format_user(user)
|
|
# Only write users with a non-empty "title" (as in your original script)
|
|
if row.get("title") and str(row.get("title")).strip():
|
|
writer.writerow(row)
|
|
print(f"User data saved to {filename}")
|
|
|
|
# ---------------- Main ----------------
|
|
if __name__ == "__main__":
|
|
if len(sys.argv) > 1 and sys.argv[1].endswith("@gallaudet.edu"):
|
|
user_login = sys.argv[1]
|
|
users = get_user_by_login(user_login)
|
|
out_file = f"okta_user_{user_login.replace('@','_at_').replace('.','_')}.csv"
|
|
save_to_csv(users, out_file)
|
|
else:
|
|
users = get_all_users()
|
|
save_to_csv(users, CSV_FILENAME)
|