#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import os
import sys
from collections import defaultdict
import pandas as pd
import re
import requests
import spacy
from openai import OpenAI
from tqdm import tqdm
# =========================
# Configuration
# =========================
INPUT_XLSX = "start.xlsx"
INPUT_CSV = "start.csv"
OUTPUT_CSV = "output.csv"
EVENT_LOG = "event_logs.txt"
EVENT_LOG_MD = "event_logs.md"
# LM Studio configuration
DEFAULT_LM_IP = "100.113.108.121"
LLM_MODEL = "openai/gpt-oss-20b"
LLM_API_KEY = "not-needed"
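# LM Studio's local server does not validate API keys, but the OpenAI client requires a non-empty value.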
# Regex helpers (from process.py)
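# DELIM_SPLIT breaks values like "Name - Job Code" or "Name / 12345" at the first slash, pipe, or dash;
# KEEP_CHARS strips everything except letters (including accented Latin), apostrophes, spaces, periods, and hyphens.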
DELIM_SPLIT = re.compile(r"\s*[\/|\-–—]\s*")
KEEP_CHARS = re.compile(r"[^A-Za-zÀ-ÖØ-öø-ÿ' .\-]")
# =========================
# LM Studio reachability
# =========================
def check_lmstudio(ip: str) -> str:
"""
Ensure LM Studio endpoint is reachable; if not, prompt for IP until it is.
Returns the validated base URL like "http://<ip>:1234/v1".
"""
def _ok(url: str) -> bool:
try:
r = requests.get(url.rstrip("/") + "/models", timeout=5)
return r.status_code == 200
except Exception:
return False
base_url = f"http://{ip}:1234/v1"
if _ok(base_url):
print(f"LM Studio reachable at {base_url}")
return base_url
print(f"Could not reach LM Studio at {base_url}")
while True:
new_ip = input("Enter LM Studio IP address (e.g. 192.168.1.221): ").strip()
if not new_ip:
print("Aborted: No IP provided.")
sys.exit(1)
base_url = f"http://{new_ip}:1234/v1"
print(f"Retesting {base_url}...")
if _ok(base_url):
print(f"LM Studio reachable at {base_url}")
return base_url
else:
print("Still unreachable. Try again or Ctrl+C to exit.")
# =========================
# spaCy model (Transformer)
# =========================
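# Only the NER component is needed here, so the other pipeline components are excluded to cut load time and memory.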
print("Loading spaCy transformer model: en_core_web_trf")
nlp = spacy.load(
"en_core_web_trf",
exclude=["parser", "tagger", "attribute_ruler", "lemmatizer", "morphologizer"],
)
print("spaCy model loaded successfully.")
def clean_person(text: str) -> str:
"""Clean extracted name by removing job codes/fragments after dashes/slashes; keep name-ish chars."""
if not text:
return ""
first = DELIM_SPLIT.split(text, maxsplit=1)[0]
first = KEEP_CHARS.sub("", first).strip()
return re.sub(r"\s{2,}", " ", first)
def extract_names(text: str) -> str:
"""Extract distinct PERSON names using spaCy Transformer model."""
if not isinstance(text, str) or not text.strip():
return ""
doc = nlp(text)
names, seen = [], set()
for ent in doc.ents:
if ent.label_ == "PERSON":
cleaned = clean_person(ent.text)
key = cleaned.lower()
if cleaned and key not in seen:
seen.add(key)
names.append(cleaned)
return ", ".join(names)
def extract_name_from_by_user(text):
"""Extract name from 'By User' column.
Example: '1000054 / Dennis Cregan' -> 'Dennis Cregan'
"""
if pd.isna(text):
return None
text = str(text).strip()
# Pattern: ID / Name
if '/' in text:
name = text.split('/', 1)[1].strip()
else:
name = text
# Take only first line
name = name.split('\n')[0].strip()
return name if name else None
def assign_event_ids(df):
"""Assign Event-ID to rows grouped by 'By User' where timestamps are within 2 seconds."""
df = df.copy()
df['Event-ID'] = None
event_counter = 1
# Group by 'By User'
for by_user, group in df.groupby('By User'):
# Sort by 'Entered On' within the group
group = group.sort_values('Entered On')
indices = group.index.tolist()
if len(indices) == 0:
continue
# Assign first row an event ID
current_event_id = event_counter
df.loc[indices[0], 'Event-ID'] = f"EVT-{current_event_id:04d}"
last_timestamp = df.loc[indices[0], 'Entered On']
for idx in indices[1:]:
current_timestamp = df.loc[idx, 'Entered On']
# Check if within 2 seconds of the last timestamp in the same event
if pd.notna(current_timestamp) and pd.notna(last_timestamp):
time_diff = abs((current_timestamp - last_timestamp).total_seconds())
if time_diff <= 2:
# Same event
df.loc[idx, 'Event-ID'] = f"EVT-{current_event_id:04d}"
else:
# New event
event_counter += 1
current_event_id = event_counter
df.loc[idx, 'Event-ID'] = f"EVT-{current_event_id:04d}"
else:
# New event if timestamp is missing
event_counter += 1
current_event_id = event_counter
df.loc[idx, 'Event-ID'] = f"EVT-{current_event_id:04d}"
last_timestamp = current_timestamp
event_counter += 1
return df
def process_column_with_ner(series: pd.Series, desc: str) -> pd.Series:
"""Process a column with spaCy NER and show progress."""
values = series.fillna("").astype(str).tolist()
out = []
for text in tqdm(values, desc=f"NER: {desc}"):
names = extract_names(text)
out.append(names if names else None)
return pd.Series(out, index=series.index, dtype=object)
def insert_after(df: pd.DataFrame, after_col: str, new_col: str, values: pd.Series) -> None:
"""Insert new_col immediately after after_col (drop existing if present)."""
if new_col in df.columns:
df.drop(columns=[new_col], inplace=True)
idx = df.columns.get_loc(after_col) + 1
df.insert(idx, new_col, values)
def create_event_payload(df: pd.DataFrame) -> str:
"""Create compact JSON payload for a grouped event with all columns."""
payload = {}
# Columns to exclude from the payload
exclude_cols = {"Effective Date"}
# Include all columns dynamically
for col in df.columns:
if col in exclude_cols:
continue
# Get unique non-null values for each column
values = df[col].dropna().unique().tolist()
# Convert to strings and sort for consistency
values = sorted([str(v) for v in values])
# Use snake_case key names
key = col.lower().replace(' ', '_').replace('-', '_')
payload[key] = values
# Add row count
payload["row_count"] = int(len(df))
return json.dumps(payload, ensure_ascii=False, indent=2)
def generate_event_summaries(df: pd.DataFrame, client: OpenAI) -> None:
"""Generate event summaries using LM Studio and save to event_log.txt."""
if df.empty:
print("No rows to summarize.")
return
# Group by Event-ID
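    # dropna=False keeps rows with a missing Event-ID in a single NaN group; that group is skipped explicitly below.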
grouped = df.groupby("Event-ID", dropna=False)
summaries = [] # list of tuples (by_admin, event_id, entered_on, sentence)
print(f"Generating summaries for {len(grouped)} events...")
for event_id, gdf in tqdm(grouped, desc="LLM Summaries"):
        if pd.isna(event_id) or not str(event_id).strip():
continue
# Get key values
by_admin = gdf["By Admin"].dropna().unique().tolist()
by_admin_str = by_admin[0] if by_admin else "Unknown"
entered_on = gdf["Entered On"].dropna().unique().tolist()
entered_on_str = str(entered_on[0])[:10] if entered_on else ""
payload = create_event_payload(gdf)
prompt = (
"You are a compliance and information security analyst reviewing Workday admin audit logs. "
"Analyze the following audit data and describe what happened in plain English. "
"Use your best judgment to interpret the data and explain the action that was taken. "
"Be specific about who did what, to whom, and when. "
"If the 'changed_relationship' value starts with 'Custom Identifier', pay special attention to the value seen for 'added'."
"No JSON, no extra commentary.\n\n"
f"Audit Data:\n{payload}"
)
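        # One chat completion per event; the low temperature keeps the summary close to the payload contents.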
try:
resp = client.chat.completions.create(
model=LLM_MODEL,
messages=[
{"role": "system", "content": "You are an expert at interpreting Workday audit logs and explaining what actions were taken in plain English."},
{"role": "user", "content": prompt},
],
temperature=0.2,
)
one_liner = (resp.choices[0].message.content or "").strip()
except Exception as e:
one_liner = f"[LLM ERROR] {e}"
summaries.append((by_admin_str, event_id, entered_on_str, one_liner))
    # Group summaries by 'By Admin'; within each admin, events are sorted by Event-ID, which follows assignment (chronological) order
grouped_summaries: dict[str, list[tuple[str, str, str]]] = defaultdict(list)
for by_admin, event_id, entered_on, line in summaries:
grouped_summaries[by_admin].append((entered_on, event_id, line))
for admin in grouped_summaries:
grouped_summaries[admin].sort(key=lambda x: x[1] or "") # Sort by event_id ascending
# Write to event_logs.txt
with open(EVENT_LOG, "w", encoding="utf-8") as f:
for admin in sorted(grouped_summaries.keys()):
f.write(f"=== {admin} ===\n\n")
for entered_on, event_id, line in grouped_summaries[admin]:
# Keep all details on a single line (replace newlines with spaces)
single_line = line.replace("\n", " ").replace("\r", " ")
f.write(f"[{event_id}] - {single_line}\n\n")
total_events = sum(len(v) for v in grouped_summaries.values())
print(f"Wrote {total_events} event summaries to {EVENT_LOG}")
# Write markdown version to event_logs.md
with open(EVENT_LOG_MD, "w", encoding="utf-8") as f:
f.write("# Security Admin Audit Event Log\n\n")
for admin in sorted(grouped_summaries.keys()):
f.write(f"## {admin}\n\n")
for entered_on, event_id, line in grouped_summaries[admin]:
# Keep all details on a single line
single_line = line.replace("\n", " ").replace("\r", " ")
f.write(f"**[{event_id}]** - {single_line}\n\n")
print(f"Wrote {total_events} event summaries to {EVENT_LOG_MD}")
def main():
# Check LM Studio connectivity
print("Checking LM Studio connectivity...")
llm_base_url = check_lmstudio(DEFAULT_LM_IP)
client = OpenAI(base_url=llm_base_url, api_key=LLM_API_KEY)
# Check if CSV files already exist - skip to event log generation
if os.path.exists(INPUT_CSV) and os.path.exists(OUTPUT_CSV) and os.path.exists("reject.csv"):
print(f"Skipping CSV processing - {INPUT_CSV}, {OUTPUT_CSV}, and reject.csv already exist.")
df = pd.read_csv(OUTPUT_CSV)
# Parse Entered On as datetime for proper handling
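        # format='mixed' (pandas >= 2.0) parses per-row timestamp formats; unparseable values become NaT.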
if 'Entered On' in df.columns:
df['Entered On'] = pd.to_datetime(df['Entered On'], format='mixed', errors='coerce')
print(f"Loaded {len(df)} rows from {OUTPUT_CSV}")
# Generate event summaries using LM Studio
print("\nGenerating event summaries with LM Studio...")
generate_event_summaries(df, client)
return
# Collect rejected rows
rejected_rows = []
# Step 1: Convert xlsx to csv
print(f"Reading {INPUT_XLSX}...")
df = pd.read_excel(INPUT_XLSX)
df.to_csv(INPUT_CSV, index=False)
print(f"Converted to {INPUT_CSV}")
# Step 2: Sort by 'By User' ascending, 'Entered On' ascending
print("Sorting by 'By User' and 'Entered On'...")
df = df.sort_values(by=['By User', 'Entered On'], ascending=[True, True])
# Step 3: Pre-filter rows before NER processing
# Remove rows where 'Instance that Changed' is 'Automatic Complete'
print("Pre-filtering: Removing rows where 'Instance that Changed' is 'Automatic Complete'...")
mask = df['Instance that Changed'].fillna('').str.strip().str.lower().eq('automatic complete')
removed_count = mask.sum()
df = df[~mask]
print(f"Removed {removed_count} rows")
# Remove rows where 'Added' text is found in 'By User'
print("Pre-filtering: Removing rows where 'Added' value is found in 'By User'...")
added_vals = df['Added'].fillna('').astype(str).str.strip().str.lower()
byuser_vals = df['By User'].fillna('').astype(str).str.strip().str.lower()
mask = pd.Series(
[(a != "") and (u != "") and (a in u) for a, u in zip(added_vals, byuser_vals)],
index=df.index,
dtype="bool",
)
rejected = df[mask].copy()
rejected['Reject Reason'] = "Added value found in By User"
rejected_rows.append(rejected)
df = df[~mask].reset_index(drop=True)
print(f"Removed {len(rejected)} rows")
# Remove rows where 'Instance that Changed' contains both 'Configures:' and 'Notification'
print("Pre-filtering: Removing rows where 'Instance that Changed' contains 'Configures:' and 'Notification'...")
instance_col = df['Instance that Changed'].fillna('').astype(str).str.lower()
mask = instance_col.str.contains('configures:') & instance_col.str.contains('notification')
removed_count = mask.sum()
df = df[~mask].reset_index(drop=True)
print(f"Removed {removed_count} rows")
print(f"Rows to process with NER: {len(df)}")
# Step 4: Extract names into new columns using spaCy NER
# Insert columns in correct positions
print("Extracting names with spaCy NER...")
# "Applied to" after "Instance that Changed"
applied_to = process_column_with_ner(df['Instance that Changed'], "Instance that Changed")
insert_after(df, 'Instance that Changed', 'Applied to', applied_to)
# "By Admin" after "By User"
by_admin = df['By User'].apply(extract_name_from_by_user)
insert_after(df, 'By User', 'By Admin', by_admin)
# "Added to" after "Added"
added_to = process_column_with_ner(df['Added'], "Added")
insert_after(df, 'Added', 'Added to', added_to)
# Step 5: Clean up names - remove 's and remove "By Admin" name from "Applied to" and "Added to"
print("Cleaning names: removing 's and By Admin matches...")
def clean_name_column(value, by_admin):
"""Remove 's from names and remove any name matching By Admin."""
if pd.isna(value) or not value:
return value
# Split by comma if multiple names
names = [n.strip() for n in str(value).split(',')]
cleaned = []
by_admin_lower = str(by_admin).strip().lower() if pd.notna(by_admin) else ""
for name in names:
# Remove 's from the name
if name.endswith("'s"):
name = name[:-2]
# Skip if name matches By Admin (case-insensitive)
if name.strip().lower() == by_admin_lower:
continue
if name.strip():
cleaned.append(name.strip())
return ', '.join(cleaned) if cleaned else None
df['Applied to'] = df.apply(lambda row: clean_name_column(row['Applied to'], row['By Admin']), axis=1)
df['Added to'] = df.apply(lambda row: clean_name_column(row['Added to'], row['By Admin']), axis=1)
# Step 6: Remove rows where 'Added to' or 'Applied to' is 'Automatic Complete' (case-insensitive)
print("Removing rows where 'Added to' is 'Automatic Complete'...")
mask = df['Added to'].fillna('').str.strip().str.lower().eq('automatic complete')
removed_count = mask.sum()
df = df[~mask]
print(f"Removed {removed_count} rows")
print("Removing rows where 'Applied to' is 'Automatic Complete'...")
mask = df['Applied to'].fillna('').str.strip().str.lower().eq('automatic complete')
removed_count = mask.sum()
df = df[~mask]
print(f"Removed {removed_count} rows")
# Step 7: Assign Event-IDs
print("Assigning Event-IDs...")
df = assign_event_ids(df)
# Reset index after filtering
df = df.reset_index(drop=True)
# Save output
df.to_csv(OUTPUT_CSV, index=False)
print(f"Saved to {OUTPUT_CSV}")
print(f"Total rows: {len(df)}")
# Save rejected rows to reject.csv
if rejected_rows:
all_rejected = pd.concat(rejected_rows, ignore_index=True)
all_rejected.to_csv("reject.csv", index=False)
print(f"Saved {len(all_rejected)} rejected rows to reject.csv")
# Step 8: Generate event summaries using LM Studio
print("\nGenerating event summaries with LM Studio...")
generate_event_summaries(df, client)
if __name__ == "__main__":
main()