listing-radar/temu_scraper_api/storage.py

255 lines
8.2 KiB
Python

import json
import os
import re
import sqlite3
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
# Alias used in signatures for the string key naming the scraper a record belongs to.
ScraperKey = str
# Database path used when ScrapeStorage is built without an explicit path.
# Read once at import time: the TEMU_DB_PATH environment variable wins,
# otherwise the relative path "data/temu_scrapes.db" is used.
DEFAULT_DB_PATH = os.getenv("TEMU_DB_PATH", "data/temu_scrapes.db")
def _utc_now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _extract_product_fields(scraper: ScraperKey, item: Dict[str, Any]) -> Dict[str, Optional[str]]:
title = item.get("title")
url = item.get("url") or item.get("link_url")
product_id = item.get("productId") or item.get("product_id") or item.get("goods_id")
if product_id is not None:
product_id = str(product_id)
search_term = item.get("searchTerm") or item.get("search_term")
category = item.get("category")
if isinstance(category, dict):
category = json.dumps(category)
store = item.get("store")
brand = item.get("brand")
return {
"title": title,
"url": url,
"product_id": product_id,
"search_term": search_term,
"category": str(category) if category else None,
"store": store,
"brand": str(brand) if brand else None,
}
def _fts_query(user_query: str) -> str:
terms = re.findall(r"[\w]+", user_query, flags=re.UNICODE)
if not terms:
return '""'
return " ".join(f'"{term}"*' for term in terms)
class ScrapeStorage:
    """SQLite-backed store for scrape runs, their products and an FTS5 index."""

    def __init__(self, db_path: str = DEFAULT_DB_PATH):
        """Remember ``db_path`` and create its parent directory if missing."""
        self.db_path = db_path
        parent_dir = Path(db_path).parent
        parent_dir.mkdir(parents=True, exist_ok=True)

    @contextmanager
    def _connect(self):
        """Yield a connection whose rows are ``sqlite3.Row`` objects.

        The connection is committed only when the ``with`` body finishes
        without raising, and it is closed in every case.
        """
        db = sqlite3.connect(self.db_path)
        db.row_factory = sqlite3.Row
        try:
            yield db
            db.commit()
        finally:
            db.close()

    def init_db(self) -> None:
        """Create the tables, indexes and FTS5 virtual table if they are absent."""
        schema_script = """
            CREATE TABLE IF NOT EXISTS scrape_runs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                scraper TEXT NOT NULL,
                saved_at TEXT NOT NULL,
                actor_id TEXT,
                request_json TEXT NOT NULL,
                product_count INTEGER NOT NULL
            );
            CREATE TABLE IF NOT EXISTS scrape_products (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                run_id INTEGER NOT NULL,
                scraper TEXT NOT NULL,
                saved_at TEXT NOT NULL,
                title TEXT,
                url TEXT,
                product_id TEXT,
                search_term TEXT,
                category TEXT,
                store TEXT,
                brand TEXT,
                data_json TEXT NOT NULL,
                FOREIGN KEY (run_id) REFERENCES scrape_runs(id)
            );
            CREATE INDEX IF NOT EXISTS idx_products_scraper_saved
                ON scrape_products (scraper, saved_at DESC);
            CREATE INDEX IF NOT EXISTS idx_products_run_id
                ON scrape_products (run_id);
            CREATE VIRTUAL TABLE IF NOT EXISTS scrape_products_fts USING fts5(
                product_rowid UNINDEXED,
                scraper UNINDEXED,
                saved_at UNINDEXED,
                title,
                search_term,
                category,
                store,
                brand,
                url,
                tokenize='unicode61'
            );
        """
        with self._connect() as db:
            db.executescript(schema_script)

    def save_scrape(
        self,
        scraper: ScraperKey,
        actor_id: Optional[str],
        request_payload: Dict[str, Any],
        items: List[Dict[str, Any]],
    ) -> Tuple[int, str, int]:
        """Persist one scrape run together with all of its items.

        A row is written to ``scrape_runs``; then, for every item, one row
        goes into ``scrape_products`` and a matching row into the FTS index.
        All writes share a single connection and one commit.

        Returns:
            ``(run_id, saved_at, item_count)`` for the stored run.
        """
        saved_at = _utc_now_iso()
        request_json = json.dumps(request_payload, ensure_ascii=False)
        item_count = len(items)

        insert_run_sql = """
            INSERT INTO scrape_runs (scraper, saved_at, actor_id, request_json, product_count)
            VALUES (?, ?, ?, ?, ?)
        """
        insert_product_sql = """
            INSERT INTO scrape_products (
                run_id, scraper, saved_at, title, url, product_id,
                search_term, category, store, brand, data_json
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """
        insert_fts_sql = """
            INSERT INTO scrape_products_fts (
                product_rowid, scraper, saved_at, title, search_term,
                category, store, brand, url
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """
        # Extracted-field names, in the column order of each INSERT above.
        product_columns = (
            "title", "url", "product_id", "search_term", "category", "store", "brand",
        )
        fts_columns = ("title", "search_term", "category", "store", "brand", "url")

        with self._connect() as db:
            run_id = db.execute(
                insert_run_sql,
                (scraper, saved_at, actor_id, request_json, item_count),
            ).lastrowid

            for raw_item in items:
                extracted = _extract_product_fields(scraper, raw_item)
                raw_json = json.dumps(raw_item, ensure_ascii=False)

                product_row = (
                    run_id,
                    scraper,
                    saved_at,
                    *[extracted[column] for column in product_columns],
                    raw_json,
                )
                stored_rowid = db.execute(insert_product_sql, product_row).lastrowid

                # The index stores empty strings where a field is missing.
                fts_row = (
                    stored_rowid,
                    scraper,
                    saved_at,
                    *[extracted[column] or "" for column in fts_columns],
                )
                db.execute(insert_fts_sql, fts_row)

        return run_id, saved_at, item_count

    def search_products(
        self,
        scraper: ScraperKey,
        query: str,
        limit: int = 50,
        run_id: Optional[int] = None,
        since: Optional[str] = None,
    ) -> Tuple[List[Dict[str, Any]], int]:
        """Full-text search the stored products of one scraper.

        Args:
            scraper: Only rows saved under this scraper key are considered.
            query: Free-form text, converted to an FTS5 prefix query.
            limit: Value bound to the SQL ``LIMIT``.
            run_id: When not ``None``, restrict results to that run.
            since: When truthy, keep rows whose ``saved_at`` is ``>=`` this
                string (compared as text by SQLite).

        Returns:
            ``(results, total)``. ``results`` is ordered by bm25 rank and each
            entry carries the stored columns, the decoded original item under
            ``"product"``, and the ``"rank"`` score.
        """
        match_expr = _fts_query(query)

        conditions = [
            "scrape_products_fts MATCH ?",
            "scrape_products_fts.scraper = ?",
        ]
        bind_values: List[Any] = [match_expr, scraper]
        if run_id is not None:
            conditions.append("p.run_id = ?")
            bind_values.append(run_id)
        if since:
            conditions.append("p.saved_at >= ?")
            bind_values.append(since)
        bind_values.append(limit)

        select_sql = (
            """
            SELECT
                p.id,
                p.run_id,
                p.scraper,
                p.saved_at,
                p.title,
                p.url,
                p.product_id,
                p.search_term,
                p.data_json,
                bm25(scrape_products_fts) AS rank
            FROM scrape_products_fts
            JOIN scrape_products p ON p.id = scrape_products_fts.product_rowid
            WHERE """
            + " AND ".join(conditions)
            + " ORDER BY rank LIMIT ?"
        )

        with self._connect() as db:
            matches = db.execute(select_sql, bind_values).fetchall()
            # NOTE(review): this counts every stored product for the scraper,
            # not just the rows matching the query/filters -- confirm that is
            # what callers expect "total" to mean.
            total = db.execute(
                "SELECT COUNT(*) FROM scrape_products WHERE scraper = ?", (scraper,)
            ).fetchone()[0]

        copied_columns = (
            "id", "run_id", "scraper", "saved_at",
            "title", "url", "product_id", "search_term",
        )
        results = []
        for match in matches:
            entry = {column: match[column] for column in copied_columns}
            entry["product"] = json.loads(match["data_json"])
            entry["rank"] = match["rank"]
            results.append(entry)
        return results, total
# Process-wide ScrapeStorage instance, created on first use by get_storage().
_storage: Optional[ScrapeStorage] = None


def get_storage() -> ScrapeStorage:
    """Return the shared ScrapeStorage, building it with defaults on first call."""
    global _storage
    if _storage is not None:
        return _storage
    _storage = ScrapeStorage()
    return _storage