#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import sys
import re
import json
import logging
from collections import Counter, defaultdict
from typing import List, Dict, Any, Optional

import requests
import pandas as pd
import spacy
from openai import OpenAI
from tqdm import tqdm

# =========================
# Configuration
# =========================
# DEFAULT_LM_IP = "192.168.1.221"  # default LM Studio host (without /v1)
# DEFAULT_LM_IP = "10.81.209.99"   # default LM Studio host (without /v1)
DEFAULT_LM_IP = "100.113.108.121"  # default LM Studio host (without /v1)
LLM_MODEL = "openai/gpt-oss-20b"
LLM_API_KEY = "not-needed"  # LM Studio typically doesn't require an API key

INPUT_CSV = "test.csv"
OUTPUT_CSV = "test_with_names.csv"
EVENT_LOG = "event_log.txt"
EVENT_LOG_MD = "event_log.md"  # markdown version for easier reading
FINAL_SNAPSHOT = "final.csv"   # snapshot right before LM Studio summarization

# Columns to process
SOURCE_COL_1 = "Instance that Changed"
TARGET_COL_1 = "Applied to"

SOURCE_COL_2 = "Added"
TARGET_COL_2 = "Added Applied to"

ENTERED_COL = "Entered On"
ENTERED_MMDD_COL = "Entered On (MM/DD)"

# Values to ignore entirely (case-insensitive)
AUTO_STRINGS = {"automatic complete"}

def is_auto(val) -> bool:
    return isinstance(val, str) and val.strip().lower() in AUTO_STRINGS

# Regex helpers
DELIM_SPLIT = re.compile(r"\s*[\/|\-–—]\s*")
KEEP_CHARS = re.compile(r"[^A-Za-zÀ-ÖØ-öø-ÿ' .\-]")

def clean_person(text: str) -> str:
    """Clean extracted name by removing job codes/fragments after dashes/slashes; keep name-ish chars."""
    if not text:
        return ""
    first = DELIM_SPLIT.split(text, maxsplit=1)[0]
    first = KEEP_CHARS.sub("", first).strip()
    return re.sub(r"\s{2,}", " ", first)
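
# Rough illustration (hypothetical inputs) of what clean_person is meant to do:
#   "Jane Doe - 12345 / HR Partner"  ->  "Jane Doe"
#   "Automatic Complete"             ->  "Automatic Complete"   (auto rows are filtered later via is_auto)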

# =========================
# LM Studio reachability
# =========================
def check_lmstudio(ip: str) -> str:
    """
    Ensure LM Studio endpoint is reachable; if not, prompt for IP until it is.
    Returns the validated base URL like "http://<ip>:1234/v1".
    """
    def _ok(url: str) -> bool:
        try:
            r = requests.get(url.rstrip("/") + "/models", timeout=5)
            return r.status_code == 200
        except Exception:
            return False

    base_url = f"http://{ip}:1234/v1"
    if _ok(base_url):
        print(f"✅ LM Studio reachable at {base_url}")
        return base_url

    print(f"❌ Could not reach LM Studio at {base_url}")
    while True:
        new_ip = input("Enter LM Studio IP address (e.g. 192.168.1.221): ").strip()
        if not new_ip:
            print("Aborted: No IP provided.")
            sys.exit(1)
        base_url = f"http://{new_ip}:1234/v1"
        print(f"🔍 Retesting {base_url}...")
        if _ok(base_url):
            print(f"✅ LM Studio reachable at {base_url}")
            return base_url
        else:
            print("❌ Still unreachable. Try again or Ctrl+C to exit.")

# Perform reachability check BEFORE any processing
LLM_BASE_URL = check_lmstudio(DEFAULT_LM_IP)
client = OpenAI(base_url=LLM_BASE_URL, api_key=LLM_API_KEY)
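
# Note: the probe in check_lmstudio assumes LM Studio's default local server setup
# (port 1234, OpenAI-compatible /v1 routes such as GET /v1/models). Adjust it if the
# server is configured differently.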

# =========================
# spaCy model (Transformer)
# =========================
print("🔍 Loading spaCy transformer model: en_core_web_trf")
nlp = spacy.load(
    "en_core_web_trf",
    exclude=["parser", "tagger", "attribute_ruler", "lemmatizer", "morphologizer"],
)
print("✅ spaCy model loaded successfully.")
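# Assumes en_core_web_trf is already installed (typically via
# `python -m spacy download en_core_web_trf`); spacy.load raises an OSError otherwise.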

def extract_names(text: str) -> str:
    """Extract distinct PERSON names using spaCy Transformer model."""
    if not isinstance(text, str) or not text.strip():
        return ""
    doc = nlp(text)
    names, seen = [], set()
    for ent in doc.ents:
        if ent.label_ == "PERSON":
            cleaned = clean_person(ent.text)
            key = cleaned.lower()
            if cleaned and key not in seen:
                seen.add(key)
                names.append(cleaned)
    return ", ".join(names)
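# Rough illustration (hypothetical text): extract_names("Role added for Jane Doe by John Smith")
# would be expected to return "Jane Doe, John Smith", depending on what the NER model tags as PERSON.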

def insert_after(df: pd.DataFrame, after_col: str, new_col: str, values: pd.Series) -> None:
    """Insert new_col immediately after after_col (drop existing if present)."""
    if new_col in df.columns:
        df.drop(columns=[new_col], inplace=True)
    idx = df.columns.get_loc(after_col) + 1
    df.insert(idx, new_col, values)

def dataframe_to_compact_event(df: pd.DataFrame) -> str:
    """Compact JSON payload for a grouped event (keeps unique values per column)."""
    def uniq(col):
        return sorted(df[col].dropna().unique().tolist()) if col in df else []

    payload = {
        "applied_to": uniq(TARGET_COL_1),
        "by_user": uniq("By User"),
        "in_transaction": uniq("In Transaction"),
        "entered_on": uniq(ENTERED_COL),
        "dates_mmdd": uniq(ENTERED_MMDD_COL),
        "instances": uniq(SOURCE_COL_1),
        "added": uniq(SOURCE_COL_2),
        "row_count": int(len(df)),
    }
    return json.dumps(payload, ensure_ascii=False, indent=2)
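# Shape of the returned JSON (values here are hypothetical):
#   {"applied_to": ["Jane Doe"], "by_user": ["John Smith (12345)"], "in_transaction": [...],
#    "entered_on": [...], "dates_mmdd": ["09/14"], "instances": [...], "added": [...], "row_count": 2}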

# =========================
# Main flow
# =========================

# If processed CSV already exists, skip straight to summarization
if os.path.exists(OUTPUT_CSV):
    print(f"⚡ Skipping CSV processing — {OUTPUT_CSV} already exists.")
    df = pd.read_csv(OUTPUT_CSV)
    # Ensure MM/DD exists (for old CSVs)
    if ENTERED_MMDD_COL not in df.columns and ENTERED_COL in df.columns:
        ts = pd.to_datetime(df[ENTERED_COL], errors="coerce")
        df[ENTERED_MMDD_COL] = ts.dt.strftime("%m/%d").fillna("")
else:
    print("⚙️ Processing CSV to extract names and generate output...")

    # Load CSV
    df = pd.read_csv(INPUT_CSV)

    # Derive Entered On (MM/DD)
    if ENTERED_COL in df.columns:
        try:
            ts = pd.to_datetime(df[ENTERED_COL], format="mixed", errors="coerce")
        except TypeError:
            ts = pd.to_datetime(df[ENTERED_COL], errors="coerce")
        df[ENTERED_MMDD_COL] = ts.dt.strftime("%m/%d").fillna("")
    else:
        df[ENTERED_MMDD_COL] = ""
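    # Note: format="mixed" is only available in pandas >= 2.0; the except branch is a fallback
    # for environments where that argument is not accepted.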

    # Live progress counters for names across both columns
    name_counter = Counter()

    def _process_series_with_progress(series: pd.Series, desc: str) -> pd.Series:
        """Iterate with progress, update name_counter, and return extracted names Series."""
        values = series.fillna("").astype(str).tolist()
        out = []
        total = len(values)
        if total == 0:
            return pd.Series([], dtype=object)
        step = max(10, total // 20)  # refresh roughly every 5% of rows (no more often than every 10 rows)
        pbar = tqdm(values, desc=f"NER: {desc}", leave=True)
        for i, text in enumerate(pbar, start=1):
            names = extract_names(text)
            # Update running totals (ignore "Automatic Complete")
            for n in [x.strip() for x in names.split(",") if x.strip()]:
                if n.lower() not in AUTO_STRINGS:
                    name_counter[n] += 1
            out.append(names)
            # Periodic status refresh
            if i % step == 0 or i == total:
                top = ", ".join(f"{n}:{c}" for n, c in name_counter.most_common(3))
                pbar.set_postfix_str(f"unique={len(name_counter)} top=[{top}]")
        return pd.Series(out, index=series.index, dtype=object)
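    # Design note: extract_names calls nlp() one row at a time; for large files, batching the
    # texts through nlp.pipe(...) would likely be faster, at the cost of reworking this progress loop.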

    # =========================
    # Requested processing order
    # 1) Process "Added" FIRST -> fill BOTH "Added Applied to" and "Applied to"
    # 2) Then process "Instance that Changed" ONLY where "Applied to" is still empty -> fill BOTH
    # =========================

    # Ensure target columns exist and are positioned
    if SOURCE_COL_1 in df.columns:
        if TARGET_COL_1 not in df.columns:
            insert_after(df, SOURCE_COL_1, TARGET_COL_1, pd.Series([""] * len(df), index=df.index))
    else:
        if TARGET_COL_1 not in df.columns:
            df[TARGET_COL_1] = ""

    if SOURCE_COL_2 in df.columns:
        if TARGET_COL_2 not in df.columns:
            insert_after(df, SOURCE_COL_2, TARGET_COL_2, pd.Series([""] * len(df), index=df.index))
    else:
        if TARGET_COL_2 not in df.columns:
            df[TARGET_COL_2] = ""

    # ---- 1) Added -> fill BOTH "Added Applied to" and "Applied to"
    if SOURCE_COL_2 in df.columns:
        added_names = _process_series_with_progress(df[SOURCE_COL_2], f"{SOURCE_COL_2} (ALL)")
        df[TARGET_COL_2] = added_names
        df[TARGET_COL_1] = added_names
    else:
        df[TARGET_COL_2] = df.get(TARGET_COL_2, "")
        df[TARGET_COL_1] = df.get(TARGET_COL_1, "")

    # ---- 2) Instance that Changed -> only where "Applied to" still empty; fill BOTH
    if SOURCE_COL_1 in df.columns:
        mask_empty_applied = df[TARGET_COL_1].fillna("").str.strip() == ""
        if mask_empty_applied.any():
            inst_subset = df.loc[mask_empty_applied, SOURCE_COL_1]
            inst_names = _process_series_with_progress(inst_subset, f"{SOURCE_COL_1} (only where Applied to empty)")
            df.loc[mask_empty_applied, TARGET_COL_1] = inst_names
            df.loc[mask_empty_applied, TARGET_COL_2] = inst_names

    # --- Remove any rows that are purely "Automatic Complete" in key fields ---
    for col in [SOURCE_COL_1, SOURCE_COL_2, "In Transaction"]:
        if col in df.columns:
            df = df[~df[col].apply(is_auto)]

    # --- Keep only selected columns (incl. MM/DD) ---
    keep_cols = [
        SOURCE_COL_1,
        TARGET_COL_1,
        "In Transaction",
        SOURCE_COL_2,
        TARGET_COL_2,
        "By User",
        ENTERED_COL,
        ENTERED_MMDD_COL,
    ]
    df = df[[c for c in keep_cols if c in df.columns]]

    # --- Filter rows: keep where Applied to == Added Applied to (case-insensitive) ---
    if TARGET_COL_1 in df.columns and TARGET_COL_2 in df.columns:
        df = df[
            df[TARGET_COL_1].fillna("").str.strip().str.lower()
            == df[TARGET_COL_2].fillna("").str.strip().str.lower()
        ]

    # --- Drop duplicates & save overall result ---
    df = df.drop_duplicates().reset_index(drop=True)
    df.to_csv(OUTPUT_CSV, index=False)
    print(f"✅ Saved {len(df)} unique matching rows to {OUTPUT_CSV}")

# =========================
# RULE APPLIED BEFORE SUMMARIZATION (covers both branches):
# Ignore rows where the 'Added Applied to' value appears inside 'By User'.
# Built as a strictly boolean, row-wise mask so "~" cannot raise a TypeError.
# =========================
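# Hypothetical example: a row whose 'Added Applied to' is "jane doe" while 'By User' reads
# "Jane Doe (12345)" (an apparent self-change) would be dropped by the mask below.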
if TARGET_COL_2 in df.columns and "By User" in df.columns and not df.empty:
    # Normalize to lowercase strings
    names = df[TARGET_COL_2].fillna("").astype(str).str.strip().str.lower()
    byuser = df["By User"].fillna("").astype(str).str.strip().str.lower()

    # Build a boolean mask: both non-empty AND 'names' is a substring of 'byuser'
    contains_flags = pd.Series(
        [(n != "") and (u != "") and (n in u) for n, u in zip(names, byuser)],
        index=df.index,
        dtype="bool",
    )

    before = len(df)
    df = df[~contains_flags].reset_index(drop=True)
    removed = before - len(df)
    if removed:
        print(f"🚫 Ignored {removed} rows where 'Added Applied to' matched text in 'By User'.")

# --- Snapshot rows right before LM Studio processing ---
df.to_csv(FINAL_SNAPSHOT, index=False)
print(f"📦 Wrote {len(df)} rows to {FINAL_SNAPSHOT} (pre-LM Studio snapshot)")

# =========================
# LM Studio event summary generation (group by By User, then date asc)
# =========================
if not df.empty:
    grouped = df.groupby([TARGET_COL_1, "By User", ENTERED_COL], dropna=False)
    summaries = []  # list of tuples (by_user, mmdd, sentence)
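    # Note: dropna=False keeps groups whose key columns contain NaN, so rows with a missing
    # 'By User' or 'Entered On' are still summarized rather than silently dropped.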

    for keys, gdf in grouped:
        applied_to, by_user, entered_on = keys
        if not applied_to or str(applied_to).strip() == "":
            continue

        mmdd_vals = gdf[ENTERED_MMDD_COL].dropna().astype(str)
        mmdd = next((v for v in mmdd_vals if v.strip()), "")

        payload = dataframe_to_compact_event(gdf)

        prompt = (
            "You are a compliance and information security analyst. "
            "Given the following grouped audit data, produce ONE clear and concise sentence summarizing the event. "
            "Include: (1) who performed the action (By User, include name and ID if available), "
            "(2) who the change applied to (Applied to), "
            "(3) the full list of role names that were assigned or added (from 'Instance that Changed' and 'Added'), "
            "and (4) the date of the event. "
            "Always mention the specific role titles exactly as shown in the data. "
            "If multiple roles were assigned, list them all in a natural phrase like "
            "'assigned the A, B, and C roles'. "
            "Do not include raw JSON, extra commentary, or line breaks. Return only one sentence.\n\n"
            f"Audit Data (JSON):\n{payload}"
        )

        try:
            resp = client.chat.completions.create(
                model=LLM_MODEL,
                messages=[
                    {"role": "system", "content": "You write terse, clear compliance summaries."},
                    {"role": "user", "content": prompt},
                ],
                temperature=0.2,
            )
            one_liner = (resp.choices[0].message.content or "").strip()
        except Exception as e:
            one_liner = f"[LLM ERROR] {e}"

        summaries.append((by_user or "Unknown User", mmdd, one_liner))

    # Group by By User, sort each user's entries by mm/dd asc, write file (OVERWRITE)
    grouped_summaries: Dict[str, list[tuple[str, str]]] = defaultdict(list)
    for by_user, mmdd, line in summaries:
        grouped_summaries[by_user].append((mmdd, line))

    for user in grouped_summaries:
        grouped_summaries[user].sort(key=lambda x: x[0] or "")

    with open(EVENT_LOG, "w", encoding="utf-8") as f:
        for user in sorted(grouped_summaries.keys()):
            f.write(f"=== {user} ===\n")
            for mmdd, line in grouped_summaries[user]:
                prefix = f"{mmdd} - " if mmdd else ""
                f.write(f"{prefix}{line}\n")
            f.write("\n")

    total_events = sum(len(v) for v in grouped_summaries.values())
    print(f"📝 Overwrote {EVENT_LOG} with {total_events} grouped event summaries")
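    # Resulting layout of event_log.txt (entries here are hypothetical):
    #   === John Smith (12345) ===
    #   09/14 - John Smith (12345) assigned the Payroll Partner role to Jane Doe on 09/14.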

    # Write markdown version for easier reading
    with open(EVENT_LOG_MD, "w", encoding="utf-8") as f:
        f.write("# Security Admin Audit Event Log\n\n")
        for user in sorted(grouped_summaries.keys()):
            f.write(f"## {user}\n\n")
            f.write("| Date | Event Summary |\n")
            f.write("|------|---------------|\n")
            for mmdd, line in grouped_summaries[user]:
                date_str = mmdd if mmdd else "N/A"
                escaped_line = line.replace("|", "\\|")
                f.write(f"| {date_str} | {escaped_line} |\n")
            f.write("\n")
    print(f"📝 Wrote {EVENT_LOG_MD} with {total_events} grouped event summaries (markdown)")
else:
    print("ℹ️ No matching rows found; nothing to summarize.")