665 lines
25 KiB
Python
665 lines
25 KiB
Python
#!/usr/bin/env python3
|
|
import os
|
|
import re
|
|
import json
|
|
from urllib.parse import urljoin, urlencode
|
|
from datetime import datetime, timedelta
|
|
|
|
import requests
|
|
import pytz
|
|
from bs4 import BeautifulSoup
|
|
from dotenv import load_dotenv
|
|
from flask import (
|
|
Flask, request, jsonify, render_template, abort,
|
|
redirect, url_for, make_response, session
|
|
)
|
|
from flask_cors import CORS
|
|
from readability import Document
|
|
from werkzeug.middleware.proxy_fix import ProxyFix
|
|
from werkzeug.security import check_password_hash
|
|
|
|
from db import get_db
|
|
|
|
load_dotenv()
|
|
|
|
APP_ROOT = os.environ.get("APPLICATION_ROOT", "/readitlater")
|
|
API_TOKEN = os.environ.get("API_TOKEN", "")
|
|
SECRET_KEY = os.environ.get("SECRET_KEY", "dev")
|
|
|
|
# Login creds (single-user)
|
|
LOGIN_USERNAME = os.environ.get("LOGIN_USERNAME", "admin")
|
|
LOGIN_PASSWORD_HASH = os.environ.get("LOGIN_PASSWORD_HASH", "") # werkzeug hash
|
|
|
|
NY_TZ = pytz.timezone("America/New_York")
|
|
|
|
|
|
def create_app():
|
|
app = Flask(__name__, static_url_path=f"{APP_ROOT}/static", static_folder="static")
|
|
app.config.update(
|
|
SECRET_KEY=SECRET_KEY,
|
|
SESSION_COOKIE_SECURE=True,
|
|
SESSION_COOKIE_HTTPONLY=True,
|
|
SESSION_COOKIE_SAMESITE="Lax",
|
|
PERMANENT_SESSION_LIFETIME=timedelta(days=7),
|
|
)
|
|
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1)
|
|
|
|
# CORS for API (tighten as desired)
|
|
CORS(app, resources={rf"{APP_ROOT}/api/*": {"origins": [
|
|
"https://www.jaredlog.com", "https://jaredlog.com"
|
|
]}})
|
|
|
|
# -------------------------------------------------------------------------
|
|
# NOTE (extension patch): In your Chrome extension, prefer:
|
|
# document.documentElement.getHTML({ includeShadowRoots: true })
|
|
# and fall back to outerHTML. That captures Shadow DOM content IBM/others use.
|
|
# -------------------------------------------------------------------------
|
|
|
|
# --- Sanitization policy for captured HTML ---
|
|
ALLOWED_TAGS = {
|
|
"article", "section", "header", "footer",
|
|
"h1", "h2", "h3", "h4", "p", "blockquote", "pre", "code",
|
|
"ul", "ol", "li", "a", "em", "strong", "b", "i",
|
|
"img", "figure", "figcaption", "hr", "br",
|
|
"picture", "source"
|
|
}
|
|
ALLOWED_ATTRS = {
|
|
"href", "src", "alt", "title", "target", "rel", "loading",
|
|
"srcset", "sizes", "referrerpolicy"
|
|
}
|
|
|
|
TAG_RE = re.compile(r"\s+")
|
|
|
|
# ----------------------------- Auth gate -----------------------------
|
|
def is_ui_path(path: str) -> bool:
|
|
# Protect everything under APP_ROOT except allowlisted paths
|
|
if path == APP_ROOT or path.startswith(APP_ROOT + "/"):
|
|
if (path.startswith(f"{APP_ROOT}/static/")
|
|
or path.startswith(f"{APP_ROOT}/api/")
|
|
or path == f"{APP_ROOT}/healthz"
|
|
or path == f"{APP_ROOT}/login"
|
|
or path == f"{APP_ROOT}/logout"):
|
|
return False
|
|
return True
|
|
return False
|
|
|
|
@app.before_request
|
|
def _gate_ui():
|
|
if not is_ui_path(request.path):
|
|
return
|
|
if session.get("auth_ok") is True:
|
|
return
|
|
next_qs = urlencode({"next": request.full_path if request.query_string else request.path})
|
|
return redirect(f"{APP_ROOT}/login?{next_qs}", code=302)
|
|
|
|
# ----------------------------- Utils -----------------------------
|
|
def require_token() -> bool:
|
|
return bool(API_TOKEN) and request.headers.get("Authorization", "") == f"Bearer {API_TOKEN}"
|
|
|
|
def normalize_tag(s: str) -> str:
|
|
s = TAG_RE.sub(" ", (s or "").strip().lower())
|
|
return s
|
|
|
|
# --------- Helpers for images / shadow DOM / JSON-LD ---------
|
|
def is_image_url(u: str) -> bool:
|
|
u = (u or "").lower()
|
|
return any(u.split("?")[0].endswith(ext) for ext in (".png", ".jpg", ".jpeg", ".gif", ".webp", ".avif", ".svg"))
|
|
|
|
def absolutize_srcset(value: str, base_url: str) -> str:
|
|
parts = []
|
|
for part in (value or "").split(","):
|
|
bits = part.strip().split()
|
|
if bits:
|
|
bits[0] = urljoin(base_url, bits[0])
|
|
parts.append(" ".join(bits))
|
|
return ", ".join(parts)
|
|
|
|
def inline_declarative_shadow_dom(raw_html: str) -> str:
|
|
try:
|
|
soup = BeautifulSoup(raw_html, "lxml")
|
|
for tpl in soup.find_all("template"):
|
|
if tpl.has_attr("shadowrootmode"):
|
|
frag = BeautifulSoup(tpl.decode_contents(), "lxml")
|
|
tpl.replace_with(frag)
|
|
return str(soup)
|
|
except Exception:
|
|
return raw_html
|
|
|
|
def extract_jsonld_article_body(raw_html: str) -> tuple[str | None, str | None]:
|
|
try:
|
|
doc = BeautifulSoup(raw_html, "lxml")
|
|
for s in doc.find_all("script", type="application/ld+json"):
|
|
text = s.string
|
|
if not text:
|
|
continue
|
|
data = json.loads(text)
|
|
items = data if isinstance(data, list) else [data]
|
|
for it in items:
|
|
t = (it.get("@type") or it.get("type") or "")
|
|
t = t.lower() if isinstance(t, str) else str(t).lower()
|
|
if "article" in t or "newsarticle" in t or "blogposting" in t:
|
|
body = it.get("articleBody") or it.get("articlebody")
|
|
title = it.get("headline") or it.get("name")
|
|
if body and isinstance(body, str) and len(body.strip()) > 400:
|
|
html = "".join(
|
|
f"<p>{p.strip()}</p>"
|
|
for p in body.split("\n")
|
|
if p.strip()
|
|
)
|
|
return title, html
|
|
except Exception:
|
|
pass
|
|
return None, None
|
|
|
|
def pick_thumbnail(raw_html: str, cleaned_soup: BeautifulSoup, base_url: str) -> str | None:
|
|
img = cleaned_soup.find("img", src=True)
|
|
if img and img.get("src"):
|
|
return urljoin(base_url, img["src"])
|
|
|
|
for tag in cleaned_soup.find_all("img"):
|
|
lazy = (
|
|
tag.get("data-src")
|
|
or tag.get("data-lazy-src")
|
|
or tag.get("data-original")
|
|
or tag.get("data-srcset")
|
|
)
|
|
if not tag.get("src") and lazy:
|
|
return urljoin(base_url, lazy)
|
|
if not tag.get("src"):
|
|
ss = tag.get("srcset")
|
|
if ss:
|
|
first = ss.split(",")[0].strip().split(" ")[0]
|
|
if first:
|
|
return urljoin(base_url, first)
|
|
|
|
for pic in cleaned_soup.find_all("picture"):
|
|
for source in pic.find_all("source"):
|
|
ss = source.get("srcset")
|
|
if ss:
|
|
first = ss.split(",")[0].strip().split(" ")[0]
|
|
if first:
|
|
return urljoin(base_url, first)
|
|
|
|
try:
|
|
full = BeautifulSoup(raw_html, "lxml")
|
|
meta = (
|
|
full.find("meta", property="og:image") or
|
|
full.find("meta", attrs={"name": "og:image"}) or
|
|
full.find("meta", attrs={"name": "twitter:image"}) or
|
|
full.find("meta", property="twitter:image")
|
|
)
|
|
if meta and meta.get("content"):
|
|
return urljoin(base_url, meta["content"])
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
def extract_and_clean(raw_html: str, base_url: str) -> tuple[str, str, str, str | None]:
|
|
# 1) Prepass: inline Declarative Shadow DOM
|
|
pre_html = inline_declarative_shadow_dom(raw_html)
|
|
|
|
# 2) Readability extraction
|
|
doc = Document(pre_html)
|
|
title = (doc.short_title() or doc.title() or "").strip()
|
|
summary_html = doc.summary(html_partial=True)
|
|
soup = BeautifulSoup(summary_html, "lxml")
|
|
|
|
# Keep <noscript> for now; strip scripts/styles only
|
|
for t in soup(["script", "style"]):
|
|
t.decompose()
|
|
|
|
# 3) Allowlist + absolutize + lazy promotion
|
|
for tag in soup.find_all(True):
|
|
if tag.name not in ALLOWED_TAGS:
|
|
tag.unwrap()
|
|
continue
|
|
|
|
if tag.name == "img":
|
|
if not tag.get("src"):
|
|
lazy_src = (
|
|
tag.get("data-src")
|
|
or tag.get("data-lazy-src")
|
|
or tag.get("data-original")
|
|
or tag.get("data-srcset")
|
|
)
|
|
if lazy_src:
|
|
tag["src"] = urljoin(base_url, lazy_src)
|
|
if not tag.get("srcset") and tag.get("data-srcset"):
|
|
tag["srcset"] = tag["data-srcset"]
|
|
|
|
if tag.name == "source":
|
|
if not tag.get("srcset") and tag.get("data-srcset"):
|
|
tag["srcset"] = tag["data-srcset"]
|
|
|
|
safe = {}
|
|
for k, v in list(tag.attrs.items()):
|
|
if isinstance(v, list):
|
|
v = " ".join(v)
|
|
if k in ALLOWED_ATTRS:
|
|
if k in ("href", "src"):
|
|
v = urljoin(base_url, v)
|
|
elif k == "srcset":
|
|
v = absolutize_srcset(v, base_url)
|
|
safe[k] = v
|
|
tag.attrs = safe
|
|
|
|
if tag.name == "a":
|
|
tag.attrs.setdefault("rel", "noopener noreferrer")
|
|
tag.attrs.setdefault("target", "_blank")
|
|
if tag.name == "img":
|
|
tag.attrs.setdefault("loading", "lazy")
|
|
tag.attrs.setdefault("referrerpolicy", "no-referrer")
|
|
|
|
# 4) Convert bare image links into <img>
|
|
for a in list(soup.find_all("a", href=True)):
|
|
href = a.get("href")
|
|
if href and is_image_url(href) and not a.find("img"):
|
|
href = urljoin(base_url, href)
|
|
img = soup.new_tag("img", src=href, loading="lazy", referrerpolicy="no-referrer")
|
|
text_alt = a.get_text(" ", strip=True)
|
|
if text_alt:
|
|
img["alt"] = text_alt.replace("Image:", "").strip()
|
|
a.replace_with(img)
|
|
|
|
# 5) Pick thumbnail before we drop noscripts
|
|
thumb_url = pick_thumbnail(raw_html, soup, base_url)
|
|
|
|
# 6) Remove residual noscript wrappers
|
|
for t in soup(["noscript"]):
|
|
t.decompose()
|
|
|
|
cleaned_html = str(soup)
|
|
text = BeautifulSoup(cleaned_html, "lxml").get_text("\n", strip=True)
|
|
text = "\n\n".join([line.strip() for line in text.split("\n") if line.strip()])
|
|
|
|
# 7) Fallback: if content is suspiciously short, try JSON-LD articleBody
|
|
if len(text) < 800:
|
|
jt, jhtml = extract_jsonld_article_body(raw_html)
|
|
if jhtml:
|
|
jsoup = BeautifulSoup(jhtml, "lxml")
|
|
for tag in jsoup.find_all(True):
|
|
if tag.name == "a" and tag.get("href"):
|
|
tag["href"] = urljoin(base_url, tag["href"])
|
|
tag["target"] = "_blank"
|
|
tag["rel"] = "noopener noreferrer"
|
|
if tag.name == "img":
|
|
if tag.get("src"):
|
|
tag["src"] = urljoin(base_url, tag["src"])
|
|
tag["loading"] = "lazy"
|
|
tag["referrerpolicy"] = "no-referrer"
|
|
cleaned_html = str(jsoup)
|
|
text = BeautifulSoup(cleaned_html, "lxml").get_text("\n", strip=True)
|
|
text = "\n\n".join([line.strip() for line in text.split("\n") if line.strip()])
|
|
if not title:
|
|
title = jt or title
|
|
|
|
return title, cleaned_html, text, thumb_url
|
|
|
|
# ---------------- Tag helpers ----------------
|
|
def ensure_tags(conn, names: list[str]) -> list[int]:
|
|
tag_ids = []
|
|
for n in names:
|
|
n = normalize_tag(n)
|
|
if not n:
|
|
continue
|
|
row = conn.execute("SELECT id FROM tags WHERE name = ?", (n,)).fetchone()
|
|
if row:
|
|
tag_ids.append(row["id"])
|
|
else:
|
|
cur = conn.execute("INSERT INTO tags(name) VALUES (?)", (n,))
|
|
tag_ids.append(cur.lastrowid)
|
|
return tag_ids
|
|
|
|
def add_item_tags(conn, item_id: int, names: list[str]) -> None:
|
|
names = [normalize_tag(t) for t in names if normalize_tag(t)]
|
|
if not names:
|
|
return
|
|
tag_ids = ensure_tags(conn, names)
|
|
conn.executemany(
|
|
"INSERT OR IGNORE INTO item_tags(item_id, tag_id) VALUES (?, ?)",
|
|
[(item_id, tid) for tid in tag_ids],
|
|
)
|
|
|
|
# ---------------- Authentication routes ----------------
|
|
@app.get(f"{APP_ROOT}/login")
|
|
def login_form():
|
|
err = request.args.get("err", "")
|
|
return render_template("login.html", error=err)
|
|
|
|
@app.post(f"{APP_ROOT}/login")
|
|
def login_submit():
|
|
username = (request.form.get("username") or "").strip()
|
|
password = (request.form.get("password") or "")
|
|
next_url = request.args.get("next") or f"{APP_ROOT}/"
|
|
|
|
if username == LOGIN_USERNAME and LOGIN_PASSWORD_HASH and check_password_hash(LOGIN_PASSWORD_HASH, password):
|
|
session.clear()
|
|
session.permanent = True # honors PERMANENT_SESSION_LIFETIME=7 days
|
|
session["auth_ok"] = True
|
|
session["who"] = username
|
|
return redirect(next_url, code=302)
|
|
|
|
qs = urlencode({"err": "Invalid credentials"})
|
|
return redirect(f"{APP_ROOT}/login?{qs}", code=302)
|
|
|
|
@app.post(f"{APP_ROOT}/logout")
|
|
def logout():
|
|
session.clear()
|
|
return redirect(f"{APP_ROOT}/login", code=302)
|
|
|
|
# ---------------- UI ----------------
|
|
@app.get(f"{APP_ROOT}/")
|
|
def index():
|
|
q = request.args.get("q", "").strip()
|
|
tag = request.args.get("tag", "").strip().lower()
|
|
tags_csv = request.args.get("tags", "").strip().lower()
|
|
filter_tags = []
|
|
if tag:
|
|
filter_tags = [normalize_tag(tag)]
|
|
elif tags_csv:
|
|
filter_tags = [normalize_tag(t) for t in tags_csv.split(",") if normalize_tag(t)]
|
|
|
|
with get_db() as db:
|
|
params: list = []
|
|
if q:
|
|
base_sql = """
|
|
SELECT i.id, i.title, i.url, i.added_at, i.thumb_url
|
|
FROM items_fts f
|
|
JOIN items i ON i.id = f.rowid
|
|
WHERE items_fts MATCH ?
|
|
"""
|
|
params.append(q)
|
|
else:
|
|
base_sql = """
|
|
SELECT i.id, i.title, i.url, i.added_at, i.thumb_url
|
|
FROM items i
|
|
"""
|
|
|
|
if filter_tags:
|
|
sqls = []
|
|
for tname in filter_tags:
|
|
if q:
|
|
sqls.append(
|
|
base_sql + """
|
|
AND EXISTS (
|
|
SELECT 1
|
|
FROM item_tags it
|
|
JOIN tags tg ON tg.id = it.tag_id
|
|
WHERE it.item_id = i.id AND tg.name = ?
|
|
)
|
|
"""
|
|
)
|
|
else:
|
|
sqls.append(
|
|
base_sql + """
|
|
WHERE EXISTS (
|
|
SELECT 1
|
|
FROM item_tags it
|
|
JOIN tags tg ON tg.id = it.tag_id
|
|
WHERE it.item_id = i.id AND tg.name = ?
|
|
)
|
|
"""
|
|
)
|
|
params.append(tname)
|
|
|
|
final_sql = " INTERSECT ".join(sqls) + " ORDER BY i.id DESC LIMIT 100"
|
|
rows = db.execute(final_sql, params).fetchall()
|
|
else:
|
|
final_sql = base_sql + (" ORDER BY bm25(items_fts) LIMIT 100" if q else " ORDER BY i.id DESC LIMIT 100")
|
|
rows = db.execute(final_sql, params).fetchall()
|
|
|
|
# Collect tags for visible items
|
|
if rows:
|
|
ids = [r["id"] for r in rows]
|
|
qmarks = ",".join("?" for _ in ids)
|
|
tagmap = {}
|
|
for tr in db.execute(
|
|
f"""
|
|
SELECT it.item_id, tg.name
|
|
FROM item_tags it
|
|
JOIN tags tg ON tg.id = it.tag_id
|
|
WHERE it.item_id IN ({qmarks})
|
|
ORDER BY tg.name
|
|
""",
|
|
ids,
|
|
).fetchall():
|
|
tagmap.setdefault(tr["item_id"], []).append(tr["name"])
|
|
else:
|
|
tagmap = {}
|
|
|
|
# All tags (for top row), with counts
|
|
tags_all = db.execute("""
|
|
SELECT tg.name AS name, COUNT(*) AS cnt
|
|
FROM tags tg
|
|
JOIN item_tags it ON it.tag_id = tg.id
|
|
GROUP BY tg.id
|
|
ORDER BY tg.name
|
|
""").fetchall()
|
|
|
|
return render_template(
|
|
"index.html",
|
|
rows=rows,
|
|
q=q,
|
|
filter_tags=filter_tags,
|
|
tagmap=tagmap,
|
|
tags_all=tags_all,
|
|
)
|
|
|
|
@app.get(f"{APP_ROOT}/item/<int:item_id>")
|
|
def detail(item_id: int):
|
|
with get_db() as db:
|
|
row = db.execute("SELECT * FROM items WHERE id = ?", (item_id,)).fetchone()
|
|
if not row:
|
|
abort(404)
|
|
|
|
tags = db.execute("""
|
|
SELECT tg.name FROM item_tags it
|
|
JOIN tags tg ON tg.id = it.tag_id
|
|
WHERE it.item_id = ? ORDER BY tg.name
|
|
""", (item_id,)).fetchall()
|
|
|
|
# All tags (for quick-add palette)
|
|
tags_all = db.execute("""
|
|
SELECT tg.name AS name, COUNT(*) AS cnt
|
|
FROM tags tg
|
|
LEFT JOIN item_tags it ON it.tag_id = tg.id
|
|
GROUP BY tg.id
|
|
ORDER BY tg.name
|
|
""").fetchall()
|
|
|
|
return render_template(
|
|
"detail.html",
|
|
item=row,
|
|
tags=[t["name"] for t in tags],
|
|
tags_all=tags_all,
|
|
)
|
|
|
|
# ---------------- Add via URL (server-side fetch) ----------------
|
|
@app.post(f"{APP_ROOT}/add-url")
|
|
def add_url():
|
|
url = (request.form.get("u") or "").strip()
|
|
if not url:
|
|
return jsonify({"error": "missing url"}), 400
|
|
|
|
try:
|
|
resp = requests.get(
|
|
url,
|
|
timeout=12,
|
|
headers={
|
|
"User-Agent": (
|
|
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
|
|
"AppleWebKit/537.36 (KHTML, like Gecko) "
|
|
"Chrome/120.0.0.0 Safari/537.36"
|
|
)
|
|
},
|
|
)
|
|
resp.raise_for_status()
|
|
content = resp.content
|
|
if len(content) > 8 * 1024 * 1024:
|
|
return jsonify({"error": "page too large"}), 413
|
|
html_text = resp.text
|
|
except requests.RequestException:
|
|
return jsonify({"error": "fetch_failed"}), 502
|
|
|
|
# Try to extract a <title> and serialize <html> back out
|
|
title_guess = None
|
|
try:
|
|
soup = BeautifulSoup(html_text, "lxml")
|
|
t = soup.find("title")
|
|
title_guess = (t.get_text() if t else "").strip() or None
|
|
html_node = soup.find("html")
|
|
if html_node:
|
|
html_text = "<!doctype html>" + str(html_node)
|
|
else:
|
|
html_text = "<!doctype html>" + html_text
|
|
except Exception:
|
|
html_text = "<!doctype html>" + html_text
|
|
|
|
# Reuse the same extraction path
|
|
try:
|
|
etitle, cleaned, text, thumb_url = extract_and_clean(html_text, url)
|
|
title = title_guess or etitle or url or "Untitled"
|
|
except Exception as e:
|
|
return jsonify({"error": f"extract_failed:{type(e).__name__}"}), 400
|
|
|
|
with get_db() as db:
|
|
db.execute(
|
|
"""
|
|
INSERT INTO items (url, title, content_html, content_text, thumb_url)
|
|
VALUES (?, ?, ?, ?, ?)
|
|
ON CONFLICT(url) DO UPDATE SET
|
|
title = excluded.title,
|
|
content_html = excluded.content_html,
|
|
content_text = excluded.content_text,
|
|
thumb_url = excluded.thumb_url,
|
|
added_at = datetime('now')
|
|
""",
|
|
(url, title, cleaned, text, thumb_url),
|
|
)
|
|
item_id = db.execute("SELECT id FROM items WHERE url = ?", (url,)).fetchone()["id"]
|
|
db.commit()
|
|
|
|
return redirect(url_for("detail", item_id=item_id))
|
|
|
|
# ---------------- API (extension) ----------------
|
|
@app.post(f"{APP_ROOT}/api/v1/capture")
|
|
def capture():
|
|
if not require_token():
|
|
return jsonify({"error": "unauthorized"}), 401
|
|
|
|
data = request.get_json(silent=True) or {}
|
|
url = (data.get("url") or "").strip()
|
|
title = (data.get("title") or "").strip()
|
|
raw = data.get("html") or ""
|
|
tags = data.get("tags") or []
|
|
if isinstance(tags, str):
|
|
tags = [t.strip() for t in tags.split(",") if t.strip()]
|
|
|
|
if not raw:
|
|
return jsonify({"error": "missing html"}), 400
|
|
|
|
try:
|
|
etitle, cleaned, text, thumb_url = extract_and_clean(raw, url)
|
|
if not title:
|
|
title = etitle or url or "Untitled"
|
|
except Exception as e:
|
|
return jsonify({"error": f"extract_failed:{type(e).__name__}"}), 400
|
|
|
|
with get_db() as db:
|
|
db.execute(
|
|
"""
|
|
INSERT INTO items (url, title, content_html, content_text, thumb_url)
|
|
VALUES (?, ?, ?, ?, ?)
|
|
ON CONFLICT(url) DO UPDATE SET
|
|
title = excluded.title,
|
|
content_html = excluded.content_html,
|
|
content_text = excluded.content_text,
|
|
thumb_url = excluded.thumb_url,
|
|
added_at = datetime('now')
|
|
""",
|
|
(url, title, cleaned, text, thumb_url),
|
|
)
|
|
item_id = db.execute("SELECT id FROM items WHERE url = ?", (url,)).fetchone()["id"]
|
|
if tags:
|
|
add_item_tags(db, item_id, tags)
|
|
db.commit()
|
|
|
|
return jsonify({"ok": True, "id": item_id, "title": title})
|
|
|
|
@app.get(f"{APP_ROOT}/healthz")
|
|
def healthz():
|
|
return jsonify({"status": "ok"})
|
|
|
|
# Delete (HTMX refresh)
|
|
@app.post(f"{APP_ROOT}/item/<int:item_id>/delete")
|
|
@app.delete(f"{APP_ROOT}/item/<int:item_id>")
|
|
def delete_item(item_id: int):
|
|
with get_db() as db:
|
|
db.execute("DELETE FROM items WHERE id = ?", (item_id,))
|
|
db.commit()
|
|
|
|
if request.headers.get("HX-Request") == "true":
|
|
resp = make_response("", 204)
|
|
resp.headers["HX-Refresh"] = "true" # full page reload
|
|
return resp
|
|
|
|
return redirect(url_for("index"))
|
|
|
|
# -------- Tag HTMX endpoints --------
|
|
@app.post(f"{APP_ROOT}/item/<int:item_id>/tag")
|
|
def add_tag(item_id: int):
|
|
name = normalize_tag(request.form.get("name", ""))
|
|
if not name:
|
|
return ("", 204)
|
|
with get_db() as db:
|
|
item = db.execute("SELECT * FROM items WHERE id = ?", (item_id,)).fetchone()
|
|
if not item:
|
|
abort(404)
|
|
add_item_tags(db, item_id, [name])
|
|
db.commit()
|
|
tags = db.execute("""
|
|
SELECT tg.name FROM item_tags it JOIN tags tg ON tg.id = it.tag_id
|
|
WHERE it.item_id = ? ORDER BY tg.name
|
|
""", (item_id,)).fetchall()
|
|
return render_template("tags.html", item=item, tags=[t["name"] for t in tags])
|
|
|
|
@app.post(f"{APP_ROOT}/item/<int:item_id>/tag/<name>/delete")
|
|
def delete_tag(item_id: int, name: str):
|
|
name = normalize_tag(name)
|
|
with get_db() as db:
|
|
item = db.execute("SELECT * FROM items WHERE id = ?", (item_id,)).fetchone()
|
|
if not item:
|
|
abort(404)
|
|
db.execute("""
|
|
DELETE FROM item_tags
|
|
WHERE item_id = ? AND tag_id = (SELECT id FROM tags WHERE name = ?)
|
|
""", (item_id, name))
|
|
db.commit()
|
|
tags = db.execute("""
|
|
SELECT tg.name FROM item_tags it JOIN tags tg ON tg.id = it.tag_id
|
|
WHERE it.item_id = ? ORDER BY tg.name
|
|
""", (item_id,)).fetchall()
|
|
return render_template("tags.html", item=item, tags=[t["name"] for t in tags])
|
|
|
|
return app
|
|
|
|
|
|
app = create_app()
|
|
|
|
|
|
def format_est(dt_str: str) -> str:
|
|
"""Convert UTC string to EST/EDT in 12-hour format."""
|
|
try:
|
|
dt = datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S")
|
|
dt = pytz.utc.localize(dt).astimezone(NY_TZ)
|
|
return dt.strftime("%b %d, %Y %I:%M %p")
|
|
except Exception:
|
|
return dt_str
|
|
|
|
|
|
app.jinja_env.filters["format_est"] = format_est
|
|
|
|
|
|
if __name__ == "__main__":
|
|
app.run(host="127.0.0.1", port=8013, debug=True)
|