Files
mobile-pii-discovery-agent/RQs/RQ2/RQ2_t6_0_total_columns.ipynb
2026-02-18 16:22:33 -05:00

197 lines
6.0 KiB
Plaintext

{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "2c488f9e",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Wrote: I:\\project2026\\llmagent\\RQs\\RQ2\\app_total_columns.csv\n"
]
}
],
"source": [
"import sqlite3\n",
"from pathlib import Path\n",
"from collections import defaultdict, OrderedDict\n",
"from typing import Dict, Iterable, Optional, Tuple\n",
"\n",
"APP_NAME_PLAIN = OrderedDict([\n",
" (\"A1\", \"WhatsApp\"),\n",
" (\"A2\", \"Snapchat\"),\n",
" (\"A3\", \"Telegram\"),\n",
" (\"A4\", \"Google Maps\"),\n",
" (\"A5\", \"Samsung Internet\"),\n",
" (\"I1\", \"WhatsApp\"),\n",
" (\"I2\", \"Contacts\"),\n",
" (\"I3\", \"Apple Messages\"),\n",
" (\"I4\", \"Safari\"),\n",
" (\"I5\", \"Calendar\"),\n",
"])\n",
"\n",
"PATTERNS = (\"*.db\", \"*.sqlite\", \"*.sqlitedb\", \"*.sqlite3\")\n",
"\n",
"\n",
"# -------------------------\n",
"# Utilities\n",
"# -------------------------\n",
"\n",
"def get_app_code_from_filename(db_file: Path) -> str:\n",
" stem = db_file.stem\n",
" if \"_\" in stem:\n",
" return stem.split(\"_\", 1)[0]\n",
" if \"-\" in stem:\n",
" return stem.split(\"-\", 1)[0]\n",
" return stem\n",
"\n",
"\n",
"def count_columns_in_db(db_path: Path) -> int:\n",
" \"\"\"\n",
" Counts only real physical tables.\n",
" Excludes:\n",
" - sqlite_* internal tables\n",
" - VIRTUAL TABLE definitions (prevents tokenizer errors)\n",
" \"\"\"\n",
" conn: Optional[sqlite3.Connection] = None\n",
" total_cols = 0\n",
"\n",
" try:\n",
" conn = sqlite3.connect(str(db_path))\n",
" cur = conn.cursor()\n",
"\n",
" # Only physical tables, skip virtual tables entirely\n",
" cur.execute(\"\"\"\n",
" SELECT name\n",
" FROM sqlite_master\n",
" WHERE type='table'\n",
" AND name NOT LIKE 'sqlite_%'\n",
" AND sql NOT LIKE '%VIRTUAL TABLE%';\n",
" \"\"\")\n",
"\n",
" tables = [row[0] for row in cur.fetchall()]\n",
"\n",
" for table_name in tables:\n",
" try:\n",
" cur.execute(f'PRAGMA table_info(\"{table_name}\");')\n",
" cols = cur.fetchall()\n",
" total_cols += len(cols)\n",
" except sqlite3.Error:\n",
" # Skip problematic tables safely\n",
" continue\n",
"\n",
" except sqlite3.Error as e:\n",
" raise RuntimeError(f\"{db_path}: {e}\")\n",
"\n",
" finally:\n",
" if conn:\n",
" conn.close()\n",
"\n",
" return total_cols\n",
"\n",
"\n",
"def iter_db_files(in_dir: Path, patterns: Iterable[str]) -> Iterable[Path]:\n",
" seen = set()\n",
" for pat in patterns:\n",
" for fp in in_dir.glob(pat):\n",
" p = fp.resolve()\n",
" if p in seen:\n",
" continue\n",
" seen.add(p)\n",
" yield fp\n",
"\n",
"\n",
"# -------------------------\n",
"# Main CSV writer\n",
"# -------------------------\n",
"\n",
"def write_app_column_totals(\n",
" in_dir: str | Path,\n",
" out_csv: str | Path,\n",
" patterns: Tuple[str, ...] = PATTERNS,\n",
") -> Path:\n",
"\n",
" in_dir = Path(in_dir)\n",
" out_csv = Path(out_csv)\n",
" out_csv.parent.mkdir(parents=True, exist_ok=True)\n",
"\n",
" if not in_dir.exists():\n",
" raise FileNotFoundError(f\"Input folder not found: {in_dir.resolve()}\")\n",
"\n",
" totals_by_app: Dict[str, int] = defaultdict(int)\n",
"\n",
" files = list(iter_db_files(in_dir, patterns))\n",
"\n",
" if not files:\n",
" out_csv.write_text(\"app_code,app_name,total_columns\\n\", encoding=\"utf-8\")\n",
" return out_csv\n",
"\n",
" for fp in sorted(files):\n",
" app_code = get_app_code_from_filename(fp)\n",
" col_count = count_columns_in_db(fp)\n",
" totals_by_app[app_code] += col_count\n",
"\n",
" # Deterministic ordering\n",
" app_order = list(APP_NAME_PLAIN.keys()) + [\n",
" a for a in sorted(totals_by_app.keys()) if a not in APP_NAME_PLAIN\n",
" ]\n",
"\n",
" lines = [\"app_code,app_name,total_columns\"]\n",
"\n",
" for app_code in app_order:\n",
" if app_code not in totals_by_app:\n",
" continue\n",
" app_name = APP_NAME_PLAIN.get(app_code, app_code)\n",
" lines.append(f\"{app_code},{app_name},{totals_by_app[app_code]}\")\n",
"\n",
" out_csv.write_text(\"\\n\".join(lines) + \"\\n\", encoding=\"utf-8\")\n",
" return out_csv\n",
"\n",
"\n",
"# -------------------------\n",
"# Runner\n",
"# -------------------------\n",
"\n",
"if __name__ == \"__main__\":\n",
" IN_DIR = Path(r\"..\\..\\selectedDBs\")\n",
" OUT_CSV = Path(\"app_total_columns.csv\")\n",
"\n",
" out = write_app_column_totals(IN_DIR, OUT_CSV, patterns=PATTERNS)\n",
" print(f\"Wrote: {out.resolve()}\")\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3bfab4bb",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.18"
}
},
"nbformat": 4,
"nbformat_minor": 5
}