Files
mobile-pii-discovery-agent/RQs/RQ2/RQ2_t6_search_reduction.ipynb
2026-02-11 22:29:04 -05:00

243 lines
9.3 KiB
Plaintext

{
"cells": [
{
"cell_type": "code",
"execution_count": 3,
"id": "a30eef73",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"ID Apps CandidateCols ColsScanned Reduc(%)\n",
"-------------------------------------------------------------\n",
"A1 WhatsApp 1637 7 99.57%\n",
"A2 Snapchat 848 107 87.38%\n",
"A3 Telegram 1197 0 100.00%\n",
"A4 Google Maps 80 2 97.50%\n",
"A5 Samsung Internet 185 0 100.00%\n",
"I1 WhatsApp 328 0 100.00%\n",
"I2 Contacts 13 0 100.00%\n",
"I3 Apple Messages 186 0 100.00%\n",
"I4 Safari 74 0 100.00%\n",
"I5 Calendar 541 0 100.00%\n",
"test2 test2 12 0 100.00%\n",
"users users 3 0 100.00%\n",
"\n",
"Wrote LaTeX: I:\\project2026\\llmagent\\RQs\\RQ2\\RQ2_search_space_reduction_gpt4o.tex\n"
]
}
],
"source": [
"import csv\n",
"import json\n",
"from pathlib import Path\n",
"from collections import OrderedDict\n",
"from typing import Dict\n",
"\n",
"# Input: per-app candidate-column totals (columns: app_code, total_columns).\n",
"CSV_PATH = Path(r\"app_total_columns.csv\")\n",
"\n",
"# Input: per-app extraction results, one JSON object per line.\n",
"# NOTE(review): BASE_DIR points at the GPT-5.1 results while OUT_TEX below is\n",
"# named 'gpt4o' -- confirm which model run this table actually reports.\n",
"# NOTE(review): the backslash path is Windows-only; forward slashes would be portable.\n",
"BASE_DIR=Path(r\"..\\normalized_PII_results\\GPT-5.1\\app_level\")\n",
"JSONL_PATH = BASE_DIR /Path(r\"app_level.jsonl\")\n",
"# Output: LaTeX table written next to this notebook.\n",
"OUT_TEX = Path(\"RQ2_search_space_reduction_gpt4o.tex\")\n",
"\n",
"# Display names for known app codes; insertion order also fixes the row order\n",
"# of both report tables. (A* / I* presumably Android vs. iOS -- TODO confirm.)\n",
"APP_NAME_PLAIN = OrderedDict([\n",
" (\"A1\", \"WhatsApp\"),\n",
" (\"A2\", \"Snapchat\"),\n",
" (\"A3\", \"Telegram\"),\n",
" (\"A4\", \"Google Maps\"),\n",
" (\"A5\", \"Samsung Internet\"),\n",
" (\"I1\", \"WhatsApp\"),\n",
" (\"I2\", \"Contacts\"),\n",
" (\"I3\", \"Apple Messages\"),\n",
" (\"I4\", \"Safari\"),\n",
" (\"I5\", \"Calendar\"),\n",
"])\n",
"\n",
"\n",
"def get_app_code_from_db_path(db_path: str) -> str:\n",
"    \"\"\"Derive an app code from a database file path.\n",
"\n",
"    The code is the filename stem truncated at the first '_' (checked\n",
"    first) or, failing that, at the first '-'. A stem containing\n",
"    neither separator is returned unchanged.\n",
"    \"\"\"\n",
"    stem = Path(db_path).stem\n",
"    for separator in (\"_\", \"-\"):\n",
"        if separator in stem:\n",
"            return stem.split(separator, 1)[0]\n",
"    return stem\n",
"\n",
"\n",
"def read_candidate_totals(csv_path: Path) -> Dict[str, int]:\n",
"    \"\"\"Read app_code -> total candidate columns from the CSV.\n",
"\n",
"    Rows with a missing app_code or a blank total_columns value are\n",
"    skipped (the original int() call crashed on blank values); a\n",
"    non-numeric total_columns raises ValueError naming the offending\n",
"    app code instead of a bare int() traceback.\n",
"    \"\"\"\n",
"    totals: Dict[str, int] = {}\n",
"    with csv_path.open(\"r\", encoding=\"utf-8\", newline=\"\") as f:\n",
"        for row in csv.DictReader(f):\n",
"            code = (row.get(\"app_code\") or \"\").strip()\n",
"            tc = (row.get(\"total_columns\") or \"\").strip()\n",
"            if not code or not tc:\n",
"                continue\n",
"            try:\n",
"                totals[code] = int(tc)\n",
"            except ValueError as e:\n",
"                raise ValueError(\n",
"                    f\"Bad total_columns for app {code!r}: {tc!r}\"\n",
"                ) from e\n",
"    return totals\n",
"\n",
"\n",
"def read_scanned_cols_from_app_jsonl(jsonl_path: Path) -> Dict[str, int]:\n",
"    \"\"\"Read app_code -> number of scanned columns from the per-app JSONL.\n",
"\n",
"    Each record's count comes from 'Num_of_source_columns' when that\n",
"    field is a real number (bools rejected); otherwise it falls back to\n",
"    len('source_columns'). Records without a usable 'db_path' string, and\n",
"    non-dict records, are skipped. Raises ValueError on malformed JSON.\n",
"    NOTE(review): if several records map to the same app code, the last\n",
"    one wins -- confirm that is intended rather than summing.\n",
"    \"\"\"\n",
"    scanned: Dict[str, int] = {}\n",
"    with jsonl_path.open(\"r\", encoding=\"utf-8\") as f:\n",
"        for line_no, raw in enumerate(f, start=1):\n",
"            text = raw.strip()\n",
"            if not text:\n",
"                continue\n",
"\n",
"            try:\n",
"                rec = json.loads(text)\n",
"            except json.JSONDecodeError as e:\n",
"                raise ValueError(f\"Bad JSON in {jsonl_path} line {line_no}: {e}\") from e\n",
"\n",
"            if not isinstance(rec, dict):\n",
"                continue\n",
"\n",
"            db_path = rec.get(\"db_path\", \"\")\n",
"            if not isinstance(db_path, str) or not db_path:\n",
"                continue\n",
"\n",
"            count = rec.get(\"Num_of_source_columns\", None)\n",
"            is_numeric = isinstance(count, (int, float)) and not isinstance(count, bool)\n",
"            if not is_numeric:\n",
"                cols = rec.get(\"source_columns\", [])\n",
"                count = len(cols) if isinstance(cols, list) else 0\n",
"\n",
"            scanned[get_app_code_from_db_path(db_path)] = int(count)\n",
"\n",
"    return scanned\n",
"\n",
"\n",
"def format_reduction(total: int, scanned: int) -> float:\n",
"    \"\"\"Percentage of candidate columns that were NOT scanned.\n",
"\n",
"    Returns 0.0 when there are no candidates, 100.0 when nothing was\n",
"    scanned, otherwise (1 - scanned/total) * 100 clamped to [0, 100].\n",
"    \"\"\"\n",
"    if total <= 0:\n",
"        return 0.0\n",
"    if scanned <= 0:\n",
"        return 100.0\n",
"    pct = 100.0 * (1.0 - scanned / total)\n",
"    return min(100.0, max(0.0, pct))\n",
"\n",
"\n",
"def build_latex_table(candidate_totals: Dict[str, int], scanned_cols: Dict[str, int]) -> str:\n",
" \"\"\"Render the search-space reduction results as a complete LaTeX table.\n",
"\n",
" Rows follow APP_NAME_PLAIN order, then any extra app codes present in\n",
" either input dict (sorted); codes absent from both dicts are skipped.\n",
" Returns the table as one newline-joined string; the caller writes it.\n",
" \"\"\"\n",
" # Table preamble: caption, label, and a 5-column layout matching the header row.\n",
" lines = []\n",
" lines.append(r\"\\begin{table}[th]\")\n",
" lines.append(r\"\\centering\")\n",
" lines.append(r\"\\caption{Search space reduction during row-level PII extraction.}\")\n",
" lines.append(r\"\\label{tab:search_space_reduction}\")\n",
" lines.append(r\"\\small\")\n",
" lines.append(r\"\\begin{tabular}{|l|l|p{1.3cm}|p{1.7cm}|p{1.0cm}|}\")\n",
" lines.append(r\"\\hline\")\n",
" lines.append(\n",
" r\"\\textbf{ID} & \\textbf{Apps} & \"\n",
" r\"\\textbf{Candidate Cols (Total)} & \"\n",
" r\"\\textbf{Cols Scanned (Extraction)} & \"\n",
" r\"\\textbf{Reduc. (\\%)} \\\\\"\n",
" )\n",
" lines.append(r\"\\hline\")\n",
"\n",
" # Known apps first (fixed display order), then any unexpected codes, sorted.\n",
" app_order = list(APP_NAME_PLAIN.keys())\n",
" extra = sorted((set(candidate_totals) | set(scanned_cols)) - set(app_order))\n",
" app_order += extra\n",
"\n",
" for app in app_order:\n",
" if app not in candidate_totals and app not in scanned_cols:\n",
" continue\n",
"\n",
" # Missing counts default to 0; reduction is pre-clamped to [0, 100].\n",
" app_name = APP_NAME_PLAIN.get(app, app)\n",
" total = int(candidate_totals.get(app, 0))\n",
" scanned = int(scanned_cols.get(app, 0))\n",
" reduc = format_reduction(total, scanned)\n",
"\n",
" lines.append(f\"{app} & {app_name} & {total} & {scanned} & {reduc:.2f}\\\\% \\\\\\\\\")\n",
" lines.append(r\"\\hline\")\n",
"\n",
" lines.append(r\"\\end{tabular}\")\n",
" lines.append(r\"\\end{table}\")\n",
" return \"\\n\".join(lines)\n",
"\n",
"\n",
"def build_plaintext_table(candidate_totals: Dict[str, int], scanned_cols: Dict[str, int]) -> str:\n",
" \"\"\"Render the same results as an aligned plain-text table for stdout.\n",
"\n",
" Row selection and ordering mirror build_latex_table: APP_NAME_PLAIN\n",
" order first, then extra app codes (sorted); codes absent from both\n",
" input dicts are skipped. Returns one newline-joined string.\n",
" \"\"\"\n",
" headers = [\"ID\", \"Apps\", \"CandidateCols\", \"ColsScanned\", \"Reduc(%)\"]\n",
"\n",
" # Known apps first (fixed display order), then any unexpected codes, sorted.\n",
" app_order = list(APP_NAME_PLAIN.keys())\n",
" extra = sorted((set(candidate_totals) | set(scanned_cols)) - set(app_order))\n",
" app_order += extra\n",
"\n",
" # rows[0] is the header; data cells are pre-stringified for width math.\n",
" rows = [headers]\n",
" for app in app_order:\n",
" if app not in candidate_totals and app not in scanned_cols:\n",
" continue\n",
" app_name = APP_NAME_PLAIN.get(app, app)\n",
" total = int(candidate_totals.get(app, 0))\n",
" scanned = int(scanned_cols.get(app, 0))\n",
" reduc = format_reduction(total, scanned)\n",
" rows.append([app, app_name, str(total), str(scanned), f\"{reduc:.2f}%\"])\n",
"\n",
" # column widths: widest cell (header included) per column\n",
" widths = [0] * len(headers)\n",
" for r in rows:\n",
" for i, cell in enumerate(r):\n",
" widths[i] = max(widths[i], len(cell))\n",
"\n",
" def fmt_row(r):\n",
" # Pad each cell to its column width and join into one display line.\n",
" out = []\n",
" for i, cell in enumerate(r):\n",
" # left align text cols, right align numeric cols\n",
" if i in (0, 1):\n",
" out.append(cell.ljust(widths[i]))\n",
" else:\n",
" out.append(cell.rjust(widths[i]))\n",
" return \" \".join(out)\n",
"\n",
" # Header line, then a dash rule of the same length, then the data rows.\n",
" lines = [fmt_row(rows[0]), \"-\" * len(fmt_row(rows[0]))]\n",
" for r in rows[1:]:\n",
" lines.append(fmt_row(r))\n",
" return \"\\n\".join(lines)\n",
"\n",
"\n",
"if __name__ == \"__main__\":\n",
"    # Gather inputs: candidate-column totals (CSV) and scanned counts (JSONL).\n",
"    totals_by_app = read_candidate_totals(CSV_PATH)\n",
"    scanned_by_app = read_scanned_cols_from_app_jsonl(JSONL_PATH)\n",
"\n",
"    # Persist the LaTeX table; stdout gets only the plaintext rendering.\n",
"    OUT_TEX.write_text(build_latex_table(totals_by_app, scanned_by_app), encoding=\"utf-8\")\n",
"\n",
"    print(build_plaintext_table(totals_by_app, scanned_by_app))\n",
"    print(f\"\\nWrote LaTeX: {OUT_TEX.resolve()}\")\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.18"
}
},
"nbformat": 4,
"nbformat_minor": 5
}