#!/usr/bin/env python3
# -*- coding: utf-8 -*-
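"""Workday admin audit log processing pipeline.

Converts start.xlsx to CSV, pre-filters rows, extracts person names with spaCy
NER, groups rows into events per admin using a 2-second timestamp window, and
asks an LM Studio-hosted model (OpenAI-compatible API) to summarize each event.
Outputs: output.csv, reject.csv, event_logs.txt, and event_logs.md.
"""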
import json
import os
import re
import sys
from collections import defaultdict

import pandas as pd
import requests
import spacy
from openai import OpenAI
from tqdm import tqdm

# =========================
# Configuration
# =========================
INPUT_XLSX = "start.xlsx"
INPUT_CSV = "start.csv"
OUTPUT_CSV = "output.csv"
EVENT_LOG = "event_logs.txt"
EVENT_LOG_MD = "event_logs.md"

# LM Studio configuration
DEFAULT_LM_IP = "100.113.108.121"
LLM_MODEL = "openai/gpt-oss-20b"
LLM_API_KEY = "not-needed"

# Regex helpers (from process.py)
DELIM_SPLIT = re.compile(r"\s*[\/|\-–—]\s*")
KEEP_CHARS = re.compile(r"[^A-Za-zÀ-ÖØ-öø-ÿ' .\-]")
# =========================
# LM Studio reachability
# =========================
def check_lmstudio(ip: str) -> str:
    """
    Ensure LM Studio endpoint is reachable; if not, prompt for IP until it is.
    Returns the validated base URL like "http://<ip>:1234/v1".
    """
    def _ok(url: str) -> bool:
        try:
            r = requests.get(url.rstrip("/") + "/models", timeout=5)
            return r.status_code == 200
        except Exception:
            return False

    base_url = f"http://{ip}:1234/v1"
    if _ok(base_url):
        print(f"LM Studio reachable at {base_url}")
        return base_url

    print(f"Could not reach LM Studio at {base_url}")
    while True:
        new_ip = input("Enter LM Studio IP address (e.g. 192.168.1.221): ").strip()
        if not new_ip:
            print("Aborted: No IP provided.")
            sys.exit(1)
        base_url = f"http://{new_ip}:1234/v1"
        print(f"Retesting {base_url}...")
        if _ok(base_url):
            print(f"LM Studio reachable at {base_url}")
            return base_url
        else:
            print("Still unreachable. Try again or Ctrl+C to exit.")
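# Connectivity note: check_lmstudio() probes GET <base_url>/models on LM Studio's
# OpenAI-compatible server (port 1234) and keeps prompting for a new IP until a
# probe returns HTTP 200; an empty answer aborts the script.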
# =========================
# spaCy model (Transformer)
# =========================
print("Loading spaCy transformer model: en_core_web_trf")
nlp = spacy.load(
    "en_core_web_trf",
    exclude=["parser", "tagger", "attribute_ruler", "lemmatizer", "morphologizer"],
)
print("spaCy model loaded successfully.")
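# Example (illustrative input): clean_person("Dennis Cregan - Security Admin")
# returns "Dennis Cregan"; text after the first /, |, or dash is treated as a
# job code and dropped, then non-name characters are stripped.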
def clean_person(text: str) -> str:
    """Clean extracted name by removing job codes/fragments after dashes/slashes; keep name-ish chars."""
    if not text:
        return ""
    first = DELIM_SPLIT.split(text, maxsplit=1)[0]
    first = KEEP_CHARS.sub("", first).strip()
    return re.sub(r"\s{2,}", " ", first)


def extract_names(text: str) -> str:
    """Extract distinct PERSON names using spaCy Transformer model."""
    if not isinstance(text, str) or not text.strip():
        return ""
    doc = nlp(text)
    names, seen = [], set()
    for ent in doc.ents:
        if ent.label_ == "PERSON":
            cleaned = clean_person(ent.text)
            key = cleaned.lower()
            if cleaned and key not in seen:
                seen.add(key)
                names.append(cleaned)
    return ", ".join(names)


def extract_name_from_by_user(text):
    """Extract name from 'By User' column.

    Example: '1000054 / Dennis Cregan' -> 'Dennis Cregan'
    """
    if pd.isna(text):
        return None
    text = str(text).strip()

    # Pattern: ID / Name
    if '/' in text:
        name = text.split('/', 1)[1].strip()
    else:
        name = text

    # Take only first line
    name = name.split('\n')[0].strip()
    return name if name else None
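# Event grouping example (illustrative timestamps): for one 'By User', rows
# entered at 10:00:00, 10:00:01, and 10:00:05 become EVT-0001, EVT-0001, and
# EVT-0002, because each row within 2 seconds of the previous row joins the
# same event.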
def assign_event_ids(df):
    """Assign Event-ID to rows grouped by 'By User' where timestamps are within 2 seconds."""
    df = df.copy()
    df['Event-ID'] = None

    event_counter = 1

    # Group by 'By User'
    for by_user, group in df.groupby('By User'):
        # Sort by 'Entered On' within the group
        group = group.sort_values('Entered On')
        indices = group.index.tolist()

        if len(indices) == 0:
            continue

        # Assign first row an event ID
        current_event_id = event_counter
        df.loc[indices[0], 'Event-ID'] = f"EVT-{current_event_id:04d}"
        last_timestamp = df.loc[indices[0], 'Entered On']

        for idx in indices[1:]:
            current_timestamp = df.loc[idx, 'Entered On']

            # Check if within 2 seconds of the last timestamp in the same event
            if pd.notna(current_timestamp) and pd.notna(last_timestamp):
                time_diff = abs((current_timestamp - last_timestamp).total_seconds())
                if time_diff <= 2:
                    # Same event
                    df.loc[idx, 'Event-ID'] = f"EVT-{current_event_id:04d}"
                else:
                    # New event
                    event_counter += 1
                    current_event_id = event_counter
                    df.loc[idx, 'Event-ID'] = f"EVT-{current_event_id:04d}"
            else:
                # New event if timestamp is missing
                event_counter += 1
                current_event_id = event_counter
                df.loc[idx, 'Event-ID'] = f"EVT-{current_event_id:04d}"

            last_timestamp = current_timestamp

        event_counter += 1

    return df
def process_column_with_ner(series: pd.Series, desc: str) -> pd.Series:
    """Process a column with spaCy NER and show progress."""
    values = series.fillna("").astype(str).tolist()
    out = []
    for text in tqdm(values, desc=f"NER: {desc}"):
        names = extract_names(text)
        out.append(names if names else None)
    return pd.Series(out, index=series.index, dtype=object)


def insert_after(df: pd.DataFrame, after_col: str, new_col: str, values: pd.Series) -> None:
    """Insert new_col immediately after after_col (drop existing if present)."""
    if new_col in df.columns:
        df.drop(columns=[new_col], inplace=True)
    idx = df.columns.get_loc(after_col) + 1
    df.insert(idx, new_col, values)
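# Payload shape (illustrative): keys are snake_case column names mapped to the
# sorted unique values found in the event's rows, e.g.
# {"by_admin": ["Dennis Cregan"], "entered_on": [...], ..., "row_count": 3}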
def create_event_payload(df: pd.DataFrame) -> str:
    """Create compact JSON payload for a grouped event with all columns."""
    payload = {}

    # Columns to exclude from the payload
    exclude_cols = {"Effective Date"}

    # Include all columns dynamically
    for col in df.columns:
        if col in exclude_cols:
            continue
        # Get unique non-null values for each column
        values = df[col].dropna().unique().tolist()
        # Convert to strings and sort for consistency
        values = sorted([str(v) for v in values])
        # Use snake_case key names
        key = col.lower().replace(' ', '_').replace('-', '_')
        payload[key] = values

    # Add row count
    payload["row_count"] = int(len(df))

    return json.dumps(payload, ensure_ascii=False, indent=2)
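# event_logs.txt layout (one block per admin, one line per event):
# === Dennis Cregan ===
#
# [EVT-0001] - <one-line summary returned by the LLM>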
def generate_event_summaries(df: pd.DataFrame, client: OpenAI) -> None:
    """Generate event summaries using LM Studio and save to event_logs.txt and event_logs.md."""
    if df.empty:
        print("No rows to summarize.")
        return

    # Group by Event-ID
    grouped = df.groupby("Event-ID", dropna=False)
    summaries = []  # list of tuples (by_admin, event_id, entered_on, sentence)

    print(f"Generating summaries for {len(grouped)} events...")
    for event_id, gdf in tqdm(grouped, desc="LLM Summaries"):
        if not event_id or str(event_id).strip() == "":
            continue

        # Get key values
        by_admin = gdf["By Admin"].dropna().unique().tolist()
        by_admin_str = by_admin[0] if by_admin else "Unknown"

        entered_on = gdf["Entered On"].dropna().unique().tolist()
        entered_on_str = str(entered_on[0])[:10] if entered_on else ""

        payload = create_event_payload(gdf)

        prompt = (
            "You are a compliance and information security analyst reviewing Workday admin audit logs. "
            "Analyze the following audit data and describe what happened in plain English. "
            "Use your best judgment to interpret the data and explain the action that was taken. "
            "Be specific about who did what, to whom, and when. "
            "If the 'changed_relationship' value starts with 'Custom Identifier', pay special attention to the value seen for 'added'. "
            "No JSON, no extra commentary.\n\n"
            f"Audit Data:\n{payload}"
        )

        try:
            resp = client.chat.completions.create(
                model=LLM_MODEL,
                messages=[
                    {"role": "system", "content": "You are an expert at interpreting Workday audit logs and explaining what actions were taken in plain English."},
                    {"role": "user", "content": prompt},
                ],
                temperature=0.2,
            )
            one_liner = (resp.choices[0].message.content or "").strip()
        except Exception as e:
            one_liner = f"[LLM ERROR] {e}"

        summaries.append((by_admin_str, event_id, entered_on_str, one_liner))
    # Group summaries by 'By Admin', then order each admin's events by Event-ID
    grouped_summaries: dict[str, list[tuple[str, str, str]]] = defaultdict(list)
    for by_admin, event_id, entered_on, line in summaries:
        grouped_summaries[by_admin].append((entered_on, event_id, line))

    for admin in grouped_summaries:
        grouped_summaries[admin].sort(key=lambda x: x[1] or "")  # Sort by Event-ID ascending

    # Write to event_logs.txt
    with open(EVENT_LOG, "w", encoding="utf-8") as f:
        for admin in sorted(grouped_summaries.keys()):
            f.write(f"=== {admin} ===\n\n")
            for entered_on, event_id, line in grouped_summaries[admin]:
                # Keep all details on a single line (replace newlines with spaces)
                single_line = line.replace("\n", " ").replace("\r", " ")
                f.write(f"[{event_id}] - {single_line}\n\n")

    total_events = sum(len(v) for v in grouped_summaries.values())
    print(f"Wrote {total_events} event summaries to {EVENT_LOG}")

    # Write markdown version to event_logs.md
    with open(EVENT_LOG_MD, "w", encoding="utf-8") as f:
        f.write("# Security Admin Audit Event Log\n\n")
        for admin in sorted(grouped_summaries.keys()):
            f.write(f"## {admin}\n\n")
            for entered_on, event_id, line in grouped_summaries[admin]:
                # Keep all details on a single line
                single_line = line.replace("\n", " ").replace("\r", " ")
                f.write(f"**[{event_id}]** - {single_line}\n\n")
    print(f"Wrote {total_events} event summaries to {EVENT_LOG_MD}")
def main():
    # Check LM Studio connectivity
    print("Checking LM Studio connectivity...")
    llm_base_url = check_lmstudio(DEFAULT_LM_IP)
    client = OpenAI(base_url=llm_base_url, api_key=LLM_API_KEY)

    # Check if CSV files already exist - skip to event log generation
    if os.path.exists(INPUT_CSV) and os.path.exists(OUTPUT_CSV) and os.path.exists("reject.csv"):
        print(f"Skipping CSV processing - {INPUT_CSV}, {OUTPUT_CSV}, and reject.csv already exist.")
        df = pd.read_csv(OUTPUT_CSV)
        # Parse Entered On as datetime for proper handling
        if 'Entered On' in df.columns:
            df['Entered On'] = pd.to_datetime(df['Entered On'], format='mixed', errors='coerce')
        print(f"Loaded {len(df)} rows from {OUTPUT_CSV}")

        # Generate event summaries using LM Studio
        print("\nGenerating event summaries with LM Studio...")
        generate_event_summaries(df, client)
        return

    # Collect rejected rows
    rejected_rows = []
    # Step 1: Convert xlsx to csv
    print(f"Reading {INPUT_XLSX}...")
    df = pd.read_excel(INPUT_XLSX)
    # Ensure 'Entered On' is a datetime so sorting and the 2-second event window
    # work even if the Excel column is read as text
    if 'Entered On' in df.columns:
        df['Entered On'] = pd.to_datetime(df['Entered On'], errors='coerce')
    df.to_csv(INPUT_CSV, index=False)
    print(f"Converted to {INPUT_CSV}")
    # Step 2: Sort by 'By User' ascending, 'Entered On' ascending
    print("Sorting by 'By User' and 'Entered On'...")
    df = df.sort_values(by=['By User', 'Entered On'], ascending=[True, True])

    # Step 3: Pre-filter rows before NER processing
    # Remove rows where 'Instance that Changed' is 'Automatic Complete'
    print("Pre-filtering: Removing rows where 'Instance that Changed' is 'Automatic Complete'...")
    mask = df['Instance that Changed'].fillna('').str.strip().str.lower().eq('automatic complete')
    removed_count = mask.sum()
    df = df[~mask]
    print(f"Removed {removed_count} rows")

    # Remove rows where 'Added' text is found in 'By User'
    print("Pre-filtering: Removing rows where 'Added' value is found in 'By User'...")
    added_vals = df['Added'].fillna('').astype(str).str.strip().str.lower()
    byuser_vals = df['By User'].fillna('').astype(str).str.strip().str.lower()
    mask = pd.Series(
        [(a != "") and (u != "") and (a in u) for a, u in zip(added_vals, byuser_vals)],
        index=df.index,
        dtype="bool",
    )
    rejected = df[mask].copy()
    rejected['Reject Reason'] = "Added value found in By User"
    rejected_rows.append(rejected)
    df = df[~mask].reset_index(drop=True)
    print(f"Removed {len(rejected)} rows")

    # Remove rows where 'Instance that Changed' contains both 'Configures:' and 'Notification'
    print("Pre-filtering: Removing rows where 'Instance that Changed' contains 'Configures:' and 'Notification'...")
    instance_col = df['Instance that Changed'].fillna('').astype(str).str.lower()
    mask = instance_col.str.contains('configures:') & instance_col.str.contains('notification')
    removed_count = mask.sum()
    df = df[~mask].reset_index(drop=True)
    print(f"Removed {removed_count} rows")

    print(f"Rows to process with NER: {len(df)}")

    # Step 4: Extract names into new columns using spaCy NER
    # Insert columns in correct positions
    print("Extracting names with spaCy NER...")

    # "Applied to" after "Instance that Changed"
    applied_to = process_column_with_ner(df['Instance that Changed'], "Instance that Changed")
    insert_after(df, 'Instance that Changed', 'Applied to', applied_to)

    # "By Admin" after "By User"
    by_admin = df['By User'].apply(extract_name_from_by_user)
    insert_after(df, 'By User', 'By Admin', by_admin)

    # "Added to" after "Added"
    added_to = process_column_with_ner(df['Added'], "Added")
    insert_after(df, 'Added', 'Added to', added_to)

    # Step 5: Clean up names - remove 's and remove "By Admin" name from "Applied to" and "Added to"
    print("Cleaning names: removing 's and By Admin matches...")

    def clean_name_column(value, by_admin):
        """Remove 's from names and remove any name matching By Admin."""
        if pd.isna(value) or not value:
            return value

        # Split by comma if multiple names
        names = [n.strip() for n in str(value).split(',')]
        cleaned = []

        by_admin_lower = str(by_admin).strip().lower() if pd.notna(by_admin) else ""

        for name in names:
            # Remove 's from the name
            if name.endswith("'s"):
                name = name[:-2]

            # Skip if name matches By Admin (case-insensitive)
            if name.strip().lower() == by_admin_lower:
                continue

            if name.strip():
                cleaned.append(name.strip())

        return ', '.join(cleaned) if cleaned else None

    df['Applied to'] = df.apply(lambda row: clean_name_column(row['Applied to'], row['By Admin']), axis=1)
    df['Added to'] = df.apply(lambda row: clean_name_column(row['Added to'], row['By Admin']), axis=1)

    # Step 6: Remove rows where 'Added to' or 'Applied to' is 'Automatic Complete' (case-insensitive)
    print("Removing rows where 'Added to' is 'Automatic Complete'...")
    mask = df['Added to'].fillna('').str.strip().str.lower().eq('automatic complete')
    removed_count = mask.sum()
    df = df[~mask]
    print(f"Removed {removed_count} rows")

    print("Removing rows where 'Applied to' is 'Automatic Complete'...")
    mask = df['Applied to'].fillna('').str.strip().str.lower().eq('automatic complete')
    removed_count = mask.sum()
    df = df[~mask]
    print(f"Removed {removed_count} rows")

    # Step 7: Assign Event-IDs
    print("Assigning Event-IDs...")
    df = assign_event_ids(df)

    # Reset index after filtering
    df = df.reset_index(drop=True)

    # Save output
    df.to_csv(OUTPUT_CSV, index=False)
    print(f"Saved to {OUTPUT_CSV}")
    print(f"Total rows: {len(df)}")

    # Save rejected rows to reject.csv
    if rejected_rows:
        all_rejected = pd.concat(rejected_rows, ignore_index=True)
        all_rejected.to_csv("reject.csv", index=False)
        print(f"Saved {len(all_rejected)} rejected rows to reject.csv")

    # Step 8: Generate event summaries using LM Studio
    print("\nGenerating event summaries with LM Studio...")
    generate_event_summaries(df, client)
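# Run from a directory containing start.xlsx; output.csv, reject.csv,
# event_logs.txt, and event_logs.md are written alongside it. If start.csv,
# output.csv, and reject.csv already exist, processing is skipped and the
# script goes straight to LLM summarization of output.csv.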
if __name__ == "__main__":
    main()