readitlater/app.py

#!/usr/bin/env python3
import os
import re
import json
import hmac
from urllib.parse import urljoin, urlencode
from datetime import datetime, timedelta

import requests
import pytz
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from flask import (
    Flask, request, jsonify, render_template, abort,
    redirect, url_for, make_response, session
)
from flask_cors import CORS
from readability import Document
from werkzeug.middleware.proxy_fix import ProxyFix
from werkzeug.security import check_password_hash

from db import get_db

load_dotenv()

APP_ROOT = os.environ.get("APPLICATION_ROOT", "/readitlater")
API_TOKEN = os.environ.get("API_TOKEN", "")
SECRET_KEY = os.environ.get("SECRET_KEY", "dev")

# Login creds (single-user)
LOGIN_USERNAME = os.environ.get("LOGIN_USERNAME", "admin")
LOGIN_PASSWORD_HASH = os.environ.get("LOGIN_PASSWORD_HASH", "")  # werkzeug hash

NY_TZ = pytz.timezone("America/New_York")

def create_app():
    app = Flask(__name__, static_url_path=f"{APP_ROOT}/static", static_folder="static")
    app.config.update(
        SECRET_KEY=SECRET_KEY,
        SESSION_COOKIE_SECURE=True,
        SESSION_COOKIE_HTTPONLY=True,
        SESSION_COOKIE_SAMESITE="Lax",
        PERMANENT_SESSION_LIFETIME=timedelta(days=7),
    )
    app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1)

    # CORS for API (tighten as desired)
    CORS(app, resources={rf"{APP_ROOT}/api/*": {"origins": [
        "https://www.jaredlog.com", "https://jaredlog.com"
    ]}})
    # -------------------------------------------------------------------------
    # NOTE (extension patch): In your Chrome extension, prefer:
    #   document.documentElement.getHTML({ includeShadowRoots: true })
    # and fall back to outerHTML. That captures Shadow DOM content IBM/others use.
    # -------------------------------------------------------------------------
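    # A minimal extension-side sketch of that call (illustrative JS, not part of
    # this app; feature-detects getHTML and falls back to outerHTML as above):
    #   const html = document.documentElement.getHTML
    #     ? document.documentElement.getHTML({ includeShadowRoots: true })
    #     : document.documentElement.outerHTML;
    # The result is what gets POSTed to /api/v1/capture below.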
    # --- Sanitization policy for captured HTML ---
    ALLOWED_TAGS = {
        "article", "section", "header", "footer",
        "h1", "h2", "h3", "h4", "p", "blockquote", "pre", "code",
        "ul", "ol", "li", "a", "em", "strong", "b", "i",
        "img", "figure", "figcaption", "hr", "br",
        "picture", "source",
    }
    ALLOWED_ATTRS = {
        "href", "src", "alt", "title", "target", "rel", "loading",
        "srcset", "sizes", "referrerpolicy",
    }
    # Whitespace-collapsing regex used by normalize_tag() below
    TAG_RE = re.compile(r"\s+")
    # ----------------------------- Auth gate -----------------------------
    def is_ui_path(path: str) -> bool:
        # Protect everything under APP_ROOT except allowlisted paths
        if path == APP_ROOT or path.startswith(APP_ROOT + "/"):
            if (path.startswith(f"{APP_ROOT}/static/")
                    or path.startswith(f"{APP_ROOT}/api/")
                    or path == f"{APP_ROOT}/healthz"
                    or path == f"{APP_ROOT}/login"
                    or path == f"{APP_ROOT}/logout"):
                return False
            return True
        return False

    @app.before_request
    def _gate_ui():
        if not is_ui_path(request.path):
            return
        if session.get("auth_ok") is True:
            return
        next_qs = urlencode({"next": request.full_path if request.query_string else request.path})
        return redirect(f"{APP_ROOT}/login?{next_qs}", code=302)
    # ----------------------------- Utils -----------------------------
    def require_token() -> bool:
        supplied = request.headers.get("Authorization", "")
        expected = f"Bearer {API_TOKEN}"
        # Constant-time comparison so the token can't be probed via timing
        return bool(API_TOKEN) and hmac.compare_digest(supplied.encode(), expected.encode())

    def normalize_tag(s: str) -> str:
        # Lowercase, trim, and collapse internal whitespace to single spaces
        return TAG_RE.sub(" ", (s or "").strip().lower())
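    # Example: normalize_tag("  Machine   Learning ") -> "machine learning"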
    # --------- Helpers for images / shadow DOM / JSON-LD ---------
    def is_image_url(u: str) -> bool:
        u = (u or "").lower()
        return any(u.split("?")[0].endswith(ext)
                   for ext in (".png", ".jpg", ".jpeg", ".gif", ".webp", ".avif", ".svg"))

    def absolutize_srcset(value: str, base_url: str) -> str:
        parts = []
        for part in (value or "").split(","):
            bits = part.strip().split()
            if bits:
                # first token is the URL; any remaining tokens are the descriptor
                bits[0] = urljoin(base_url, bits[0])
            parts.append(" ".join(bits))
        return ", ".join(parts)
    def inline_declarative_shadow_dom(raw_html: str) -> str:
        try:
            soup = BeautifulSoup(raw_html, "lxml")
            for tpl in soup.find_all("template"):
                if tpl.has_attr("shadowrootmode"):
                    frag = BeautifulSoup(tpl.decode_contents(), "lxml")
                    tpl.replace_with(frag)
            return str(soup)
        except Exception:
            return raw_html
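    # Declarative Shadow DOM markup looks like (illustrative):
    #   <my-widget>
    #     <template shadowrootmode="open"><p>shadow content</p></template>
    #   </my-widget>
    # Inlining the template body makes that content visible to readability below.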
    def extract_jsonld_article_body(raw_html: str) -> tuple[str | None, str | None]:
        try:
            doc = BeautifulSoup(raw_html, "lxml")
            for s in doc.find_all("script", type="application/ld+json"):
                text = s.string
                if not text:
                    continue
                try:
                    data = json.loads(text)
                except ValueError:
                    continue  # one malformed block shouldn't abort the scan
                items = data if isinstance(data, list) else [data]
                for it in items:
                    if not isinstance(it, dict):
                        continue
                    t = (it.get("@type") or it.get("type") or "")
                    t = t.lower() if isinstance(t, str) else str(t).lower()
                    # "article" also matches Article/NewsArticle variants
                    if "article" in t or "blogposting" in t:
                        body = it.get("articleBody") or it.get("articlebody")
                        title = it.get("headline") or it.get("name")
                        if body and isinstance(body, str) and len(body.strip()) > 400:
                            html = "".join(
                                f"<p>{p.strip()}</p>"
                                for p in body.split("\n")
                                if p.strip()
                            )
                            return title, html
        except Exception:
            pass
        return None, None
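    # A matching JSON-LD block looks like (illustrative):
    #   <script type="application/ld+json">
    #     {"@type": "NewsArticle", "headline": "...", "articleBody": "Para one\nPara two"}
    #   </script>
    # Each newline-separated articleBody paragraph becomes a <p> element.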
    def pick_thumbnail(raw_html: str, cleaned_soup: BeautifulSoup, base_url: str) -> str | None:
        """Best-effort thumbnail: first <img src>, then lazy-load attrs,
        then srcset / <picture><source>, finally og:image / twitter:image."""
        img = cleaned_soup.find("img", src=True)
        if img and img.get("src"):
            return urljoin(base_url, img["src"])
        for tag in cleaned_soup.find_all("img"):
            lazy = (
                tag.get("data-src")
                or tag.get("data-lazy-src")
                or tag.get("data-original")
                or tag.get("data-srcset")
            )
            if not tag.get("src") and lazy:
                return urljoin(base_url, lazy)
            if not tag.get("src"):
                ss = tag.get("srcset")
                if ss:
                    first = ss.split(",")[0].strip().split(" ")[0]
                    if first:
                        return urljoin(base_url, first)
        for pic in cleaned_soup.find_all("picture"):
            for source in pic.find_all("source"):
                ss = source.get("srcset")
                if ss:
                    first = ss.split(",")[0].strip().split(" ")[0]
                    if first:
                        return urljoin(base_url, first)
        try:
            full = BeautifulSoup(raw_html, "lxml")
            meta = (
                full.find("meta", property="og:image") or
                full.find("meta", attrs={"name": "og:image"}) or
                full.find("meta", attrs={"name": "twitter:image"}) or
                full.find("meta", property="twitter:image")
            )
            if meta and meta.get("content"):
                return urljoin(base_url, meta["content"])
        except Exception:
            pass
        return None
    def extract_and_clean(raw_html: str, base_url: str) -> tuple[str, str, str, str | None]:
        # 1) Prepass: inline Declarative Shadow DOM
        pre_html = inline_declarative_shadow_dom(raw_html)

        # 2) Readability extraction
        doc = Document(pre_html)
        title = (doc.short_title() or doc.title() or "").strip()
        summary_html = doc.summary(html_partial=True)
        soup = BeautifulSoup(summary_html, "lxml")

        # Keep <noscript> for now; strip scripts/styles only
        for t in soup(["script", "style"]):
            t.decompose()

        # 3) Allowlist + absolutize + lazy promotion
        for tag in soup.find_all(True):
            if tag.name not in ALLOWED_TAGS:
                tag.unwrap()
                continue
            if tag.name == "img":
                if not tag.get("src"):
                    lazy_src = (
                        tag.get("data-src")
                        or tag.get("data-lazy-src")
                        or tag.get("data-original")
                        or tag.get("data-srcset")
                    )
                    if lazy_src:
                        tag["src"] = urljoin(base_url, lazy_src)
                if not tag.get("srcset") and tag.get("data-srcset"):
                    tag["srcset"] = tag["data-srcset"]
            if tag.name == "source":
                if not tag.get("srcset") and tag.get("data-srcset"):
                    tag["srcset"] = tag["data-srcset"]
            safe = {}
            for k, v in list(tag.attrs.items()):
                if isinstance(v, list):
                    v = " ".join(v)
                if k in ALLOWED_ATTRS:
                    if k in ("href", "src"):
                        v = urljoin(base_url, v)
                    elif k == "srcset":
                        v = absolutize_srcset(v, base_url)
                    safe[k] = v
            tag.attrs = safe
            if tag.name == "a":
                tag.attrs.setdefault("rel", "noopener noreferrer")
                tag.attrs.setdefault("target", "_blank")
            if tag.name == "img":
                tag.attrs.setdefault("loading", "lazy")
                tag.attrs.setdefault("referrerpolicy", "no-referrer")

        # 4) Convert bare image links into <img>
        for a in list(soup.find_all("a", href=True)):
            href = a.get("href")
            if href and is_image_url(href) and not a.find("img"):
                href = urljoin(base_url, href)
                img = soup.new_tag("img", src=href, loading="lazy", referrerpolicy="no-referrer")
                text_alt = a.get_text(" ", strip=True)
                if text_alt:
                    img["alt"] = text_alt.replace("Image:", "").strip()
                a.replace_with(img)

        # 5) Pick thumbnail before we drop noscripts
        thumb_url = pick_thumbnail(raw_html, soup, base_url)

        # 6) Remove residual noscript wrappers
        for t in soup(["noscript"]):
            t.decompose()

        cleaned_html = str(soup)
        text = BeautifulSoup(cleaned_html, "lxml").get_text("\n", strip=True)
        text = "\n\n".join([line.strip() for line in text.split("\n") if line.strip()])

        # 7) Fallback: if content is suspiciously short, try JSON-LD articleBody
        if len(text) < 800:
            jt, jhtml = extract_jsonld_article_body(raw_html)
            if jhtml:
                jsoup = BeautifulSoup(jhtml, "lxml")
                for tag in jsoup.find_all(True):
                    if tag.name == "a" and tag.get("href"):
                        tag["href"] = urljoin(base_url, tag["href"])
                        tag["target"] = "_blank"
                        tag["rel"] = "noopener noreferrer"
                    if tag.name == "img":
                        if tag.get("src"):
                            tag["src"] = urljoin(base_url, tag["src"])
                        tag["loading"] = "lazy"
                        tag["referrerpolicy"] = "no-referrer"
                cleaned_html = str(jsoup)
                text = BeautifulSoup(cleaned_html, "lxml").get_text("\n", strip=True)
                text = "\n\n".join([line.strip() for line in text.split("\n") if line.strip()])
                if not title:
                    title = jt or title
        return title, cleaned_html, text, thumb_url
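    # Usage sketch (illustrative values):
    #   title, html, text, thumb = extract_and_clean(raw, "https://example.com/post")
    # yields the readability summary sanitized to ALLOWED_TAGS/ALLOWED_ATTRS with
    # absolute URLs, a plain-text rendering, and a best-effort thumbnail URL.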
    # ---------------- Tag helpers ----------------
    def ensure_tags(conn, names: list[str]) -> list[int]:
        tag_ids = []
        for n in names:
            n = normalize_tag(n)
            if not n:
                continue
            row = conn.execute("SELECT id FROM tags WHERE name = ?", (n,)).fetchone()
            if row:
                tag_ids.append(row["id"])
            else:
                cur = conn.execute("INSERT INTO tags(name) VALUES (?)", (n,))
                tag_ids.append(cur.lastrowid)
        return tag_ids

    def add_item_tags(conn, item_id: int, names: list[str]) -> None:
        names = [normalize_tag(t) for t in names if normalize_tag(t)]
        if not names:
            return
        tag_ids = ensure_tags(conn, names)
        conn.executemany(
            "INSERT OR IGNORE INTO item_tags(item_id, tag_id) VALUES (?, ?)",
            [(item_id, tid) for tid in tag_ids],
        )
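    # Example (item id 42 is illustrative):
    #   add_item_tags(db, 42, ["ML", " machine  learning "])
    # normalizes the names, creates any missing rows in `tags`, and links them
    # in `item_tags`; INSERT OR IGNORE keeps repeat calls idempotent.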
    # ---------------- Authentication routes ----------------
    @app.get(f"{APP_ROOT}/login")
    def login_form():
        err = request.args.get("err", "")
        return render_template("login.html", error=err)

    @app.post(f"{APP_ROOT}/login")
    def login_submit():
        username = (request.form.get("username") or "").strip()
        password = request.form.get("password") or ""
        next_url = request.args.get("next") or f"{APP_ROOT}/"
        # Guard against open redirects: only follow app-local paths
        if not (next_url == APP_ROOT or next_url.startswith(f"{APP_ROOT}/")):
            next_url = f"{APP_ROOT}/"
        if username == LOGIN_USERNAME and LOGIN_PASSWORD_HASH and check_password_hash(LOGIN_PASSWORD_HASH, password):
            session.clear()
            session.permanent = True  # honors PERMANENT_SESSION_LIFETIME=7 days
            session["auth_ok"] = True
            session["who"] = username
            return redirect(next_url, code=302)
        qs = urlencode({"err": "Invalid credentials"})
        return redirect(f"{APP_ROOT}/login?{qs}", code=302)

    @app.post(f"{APP_ROOT}/logout")
    def logout():
        session.clear()
        return redirect(f"{APP_ROOT}/login", code=302)
    # ---------------- UI ----------------
    @app.get(f"{APP_ROOT}/")
    def index():
        q = request.args.get("q", "").strip()
        tag = request.args.get("tag", "").strip().lower()
        tags_csv = request.args.get("tags", "").strip().lower()

        filter_tags = []
        if tag:
            filter_tags = [normalize_tag(tag)]
        elif tags_csv:
            filter_tags = [normalize_tag(t) for t in tags_csv.split(",") if normalize_tag(t)]

        with get_db() as db:
            params: list = []
            if q:
                base_sql = """
                    SELECT i.id, i.title, i.url, i.added_at, i.thumb_url
                    FROM items_fts f
                    JOIN items i ON i.id = f.rowid
                    WHERE items_fts MATCH ?
                """
            else:
                base_sql = """
                    SELECT i.id, i.title, i.url, i.added_at, i.thumb_url
                    FROM items i
                """
            if filter_tags:
                # AND semantics across tags via INTERSECT of per-tag selects.
                # Each compound member repeats the base query, so each needs its
                # own bound parameters; build them per member rather than
                # appending q only once (which broke q + multiple tags).
                sqls = []
                params = []
                for tname in filter_tags:
                    if q:
                        sqls.append(
                            base_sql + """
                            AND EXISTS (
                                SELECT 1
                                FROM item_tags it
                                JOIN tags tg ON tg.id = it.tag_id
                                WHERE it.item_id = i.id AND tg.name = ?
                            )
                            """
                        )
                        params.extend([q, tname])
                    else:
                        sqls.append(
                            base_sql + """
                            WHERE EXISTS (
                                SELECT 1
                                FROM item_tags it
                                JOIN tags tg ON tg.id = it.tag_id
                                WHERE it.item_id = i.id AND tg.name = ?
                            )
                            """
                        )
                        params.append(tname)
                # ORDER BY after a compound SELECT must name a result column,
                # so use "id" rather than the table-qualified "i.id".
                final_sql = " INTERSECT ".join(sqls) + " ORDER BY id DESC LIMIT 100"
                rows = db.execute(final_sql, params).fetchall()
            else:
                if q:
                    params.append(q)
                final_sql = base_sql + (" ORDER BY bm25(items_fts) LIMIT 100" if q else " ORDER BY i.id DESC LIMIT 100")
                rows = db.execute(final_sql, params).fetchall()

            # Collect tags for visible items
            if rows:
                ids = [r["id"] for r in rows]
                qmarks = ",".join("?" for _ in ids)
                tagmap = {}
                for tr in db.execute(
                    f"""
                    SELECT it.item_id, tg.name
                    FROM item_tags it
                    JOIN tags tg ON tg.id = it.tag_id
                    WHERE it.item_id IN ({qmarks})
                    ORDER BY tg.name
                    """,
                    ids,
                ).fetchall():
                    tagmap.setdefault(tr["item_id"], []).append(tr["name"])
            else:
                tagmap = {}

            # All tags (for top row), with counts
            tags_all = db.execute("""
                SELECT tg.name AS name, COUNT(*) AS cnt
                FROM tags tg
                JOIN item_tags it ON it.tag_id = tg.id
                GROUP BY tg.id
                ORDER BY tg.name
            """).fetchall()

            return render_template(
                "index.html",
                rows=rows,
                q=q,
                filter_tags=filter_tags,
                tagmap=tagmap,
                tags_all=tags_all,
            )
@app.get(f"{APP_ROOT}/item/<int:item_id>")
def detail(item_id: int):
with get_db() as db:
row = db.execute("SELECT * FROM items WHERE id = ?", (item_id,)).fetchone()
if not row:
abort(404)
tags = db.execute("""
SELECT tg.name FROM item_tags it
JOIN tags tg ON tg.id = it.tag_id
WHERE it.item_id = ? ORDER BY tg.name
""", (item_id,)).fetchall()
# All tags (for quick-add palette)
tags_all = db.execute("""
SELECT tg.name AS name, COUNT(*) AS cnt
FROM tags tg
LEFT JOIN item_tags it ON it.tag_id = tg.id
GROUP BY tg.id
ORDER BY tg.name
""").fetchall()
return render_template(
"detail.html",
item=row,
tags=[t["name"] for t in tags],
tags_all=tags_all,
)
    # ---------------- Add via URL (server-side fetch) ----------------
    @app.post(f"{APP_ROOT}/add-url")
    def add_url():
        url = (request.form.get("u") or "").strip()
        if not url:
            return jsonify({"error": "missing url"}), 400
        try:
            resp = requests.get(
                url,
                timeout=12,
                headers={
                    "User-Agent": (
                        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                        "AppleWebKit/537.36 (KHTML, like Gecko) "
                        "Chrome/120.0.0.0 Safari/537.36"
                    )
                },
            )
            resp.raise_for_status()
            content = resp.content
            if len(content) > 8 * 1024 * 1024:
                return jsonify({"error": "page too large"}), 413
            html_text = resp.text
        except requests.RequestException:
            return jsonify({"error": "fetch_failed"}), 502

        # Try to extract a <title> and serialize <html> back out
        title_guess = None
        try:
            soup = BeautifulSoup(html_text, "lxml")
            t = soup.find("title")
            title_guess = (t.get_text() if t else "").strip() or None
            html_node = soup.find("html")
            if html_node:
                html_text = "<!doctype html>" + str(html_node)
            else:
                html_text = "<!doctype html>" + html_text
        except Exception:
            html_text = "<!doctype html>" + html_text

        # Reuse the same extraction path
        try:
            etitle, cleaned, text, thumb_url = extract_and_clean(html_text, url)
            title = title_guess or etitle or url or "Untitled"
        except Exception as e:
            return jsonify({"error": f"extract_failed:{type(e).__name__}"}), 400

        with get_db() as db:
            db.execute(
                """
                INSERT INTO items (url, title, content_html, content_text, thumb_url)
                VALUES (?, ?, ?, ?, ?)
                ON CONFLICT(url) DO UPDATE SET
                    title = excluded.title,
                    content_html = excluded.content_html,
                    content_text = excluded.content_text,
                    thumb_url = excluded.thumb_url,
                    added_at = datetime('now')
                """,
                (url, title, cleaned, text, thumb_url),
            )
            item_id = db.execute("SELECT id FROM items WHERE url = ?", (url,)).fetchone()["id"]
            db.commit()
        return redirect(url_for("detail", item_id=item_id))
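    # Usage sketch (requires a logged-in session cookie; host is illustrative):
    #   curl -b cookies.txt -X POST https://example.com/readitlater/add-url \
    #        -d "u=https://example.org/some-story"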
    # ---------------- API (extension) ----------------
    @app.post(f"{APP_ROOT}/api/v1/capture")
    def capture():
        if not require_token():
            return jsonify({"error": "unauthorized"}), 401
        data = request.get_json(silent=True) or {}
        url = (data.get("url") or "").strip()
        title = (data.get("title") or "").strip()
        raw = data.get("html") or ""
        tags = data.get("tags") or []
        if isinstance(tags, str):
            tags = [t.strip() for t in tags.split(",") if t.strip()]
        if not raw:
            return jsonify({"error": "missing html"}), 400
        try:
            etitle, cleaned, text, thumb_url = extract_and_clean(raw, url)
            if not title:
                title = etitle or url or "Untitled"
        except Exception as e:
            return jsonify({"error": f"extract_failed:{type(e).__name__}"}), 400
        with get_db() as db:
            db.execute(
                """
                INSERT INTO items (url, title, content_html, content_text, thumb_url)
                VALUES (?, ?, ?, ?, ?)
                ON CONFLICT(url) DO UPDATE SET
                    title = excluded.title,
                    content_html = excluded.content_html,
                    content_text = excluded.content_text,
                    thumb_url = excluded.thumb_url,
                    added_at = datetime('now')
                """,
                (url, title, cleaned, text, thumb_url),
            )
            item_id = db.execute("SELECT id FROM items WHERE url = ?", (url,)).fetchone()["id"]
            if tags:
                add_item_tags(db, item_id, tags)
            db.commit()
        return jsonify({"ok": True, "id": item_id, "title": title})
@app.get(f"{APP_ROOT}/healthz")
def healthz():
return jsonify({"status": "ok"})
# Delete (HTMX refresh)
@app.post(f"{APP_ROOT}/item/<int:item_id>/delete")
@app.delete(f"{APP_ROOT}/item/<int:item_id>")
def delete_item(item_id: int):
with get_db() as db:
db.execute("DELETE FROM items WHERE id = ?", (item_id,))
db.commit()
if request.headers.get("HX-Request") == "true":
resp = make_response("", 204)
resp.headers["HX-Refresh"] = "true" # full page reload
return resp
return redirect(url_for("index"))
    # -------- Tag HTMX endpoints --------
    @app.post(f"{APP_ROOT}/item/<int:item_id>/tag")
    def add_tag(item_id: int):
        name = normalize_tag(request.form.get("name", ""))
        if not name:
            return ("", 204)
        with get_db() as db:
            item = db.execute("SELECT * FROM items WHERE id = ?", (item_id,)).fetchone()
            if not item:
                abort(404)
            add_item_tags(db, item_id, [name])
            db.commit()
            tags = db.execute("""
                SELECT tg.name FROM item_tags it JOIN tags tg ON tg.id = it.tag_id
                WHERE it.item_id = ? ORDER BY tg.name
            """, (item_id,)).fetchall()
        return render_template("tags.html", item=item, tags=[t["name"] for t in tags])

    @app.post(f"{APP_ROOT}/item/<int:item_id>/tag/<name>/delete")
    def delete_tag(item_id: int, name: str):
        name = normalize_tag(name)
        with get_db() as db:
            item = db.execute("SELECT * FROM items WHERE id = ?", (item_id,)).fetchone()
            if not item:
                abort(404)
            db.execute("""
                DELETE FROM item_tags
                WHERE item_id = ? AND tag_id = (SELECT id FROM tags WHERE name = ?)
            """, (item_id, name))
            db.commit()
            tags = db.execute("""
                SELECT tg.name FROM item_tags it JOIN tags tg ON tg.id = it.tag_id
                WHERE it.item_id = ? ORDER BY tg.name
            """, (item_id,)).fetchall()
        return render_template("tags.html", item=item, tags=[t["name"] for t in tags])
    return app


app = create_app()


def format_est(dt_str: str) -> str:
    """Convert UTC string to EST/EDT in 12-hour format."""
    try:
        dt = datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S")
        dt = pytz.utc.localize(dt).astimezone(NY_TZ)
        return dt.strftime("%b %d, %Y %I:%M %p")
    except Exception:
        return dt_str
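# Example: format_est("2026-01-20 03:53:49") -> "Jan 19, 2026 10:53 PM"
# (03:53 UTC is 10:53 PM the previous evening in EST, UTC-5.)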

app.jinja_env.filters["format_est"] = format_est

if __name__ == "__main__":
    app.run(host="127.0.0.1", port=8013, debug=True)