From 3720c189e5a18a41bd88e595e28e92eb553afa7c Mon Sep 17 00:00:00 2001 From: "bahawal.baloch" Date: Thu, 21 May 2026 17:08:10 +0500 Subject: [PATCH] Add Temu scraper service with Docker configuration and API endpoints - Introduced a new Dockerfile for the Temu scraper service. - Updated docker-compose.yml to include the Temu scraper service with environment variables and volume configuration. - Implemented the TemuScraperService for scraping functionality and integrated it with FastAPI. - Added serializers for request and response handling. - Created storage management for saving scrape results in SQLite. - Developed API endpoints for scraping and searching saved results. - Included a smoke test script for validating the scraper functionality. --- Dockerfile.temu | 23 +++ docker-compose.yml | 17 +++ temu_scraper_api/__init__.py | 0 temu_scraper_api/main.py | 29 ++++ temu_scraper_api/req.txt | 17 +++ temu_scraper_api/serializers.py | 174 ++++++++++++++++++++++ temu_scraper_api/service.py | 82 +++++++++++ temu_scraper_api/storage.py | 254 ++++++++++++++++++++++++++++++++ temu_scraper_api/views.py | 183 +++++++++++++++++++++++ test_temu_scraper.py | 97 ++++++++++++ 10 files changed, 876 insertions(+) create mode 100644 Dockerfile.temu create mode 100644 temu_scraper_api/__init__.py create mode 100644 temu_scraper_api/main.py create mode 100644 temu_scraper_api/req.txt create mode 100644 temu_scraper_api/serializers.py create mode 100644 temu_scraper_api/service.py create mode 100644 temu_scraper_api/storage.py create mode 100644 temu_scraper_api/views.py create mode 100644 test_temu_scraper.py diff --git a/Dockerfile.temu b/Dockerfile.temu new file mode 100644 index 0000000..f8d899d --- /dev/null +++ b/Dockerfile.temu @@ -0,0 +1,23 @@ +# syntax=docker/dockerfile:1.6 +FROM python:3.13-slim + +ENV PYTHONDONTWRITEBYTECODE=1 \ + PYTHONUNBUFFERED=1 \ + PYTHONPATH=/app + +WORKDIR /app + +RUN --mount=type=cache,target=/var/cache/apt,sharing=locked \ + --mount=type=cache,target=/var/lib/apt,sharing=locked \ + apt-get update && apt-get install -y --no-install-recommends \ + curl + +COPY temu_scraper_api/req.txt ./req.txt +RUN --mount=type=cache,target=/root/.cache/pip \ + pip install --upgrade pip && pip install -r req.txt + +COPY temu_scraper_api ./temu_scraper_api + +EXPOSE 8001 + +CMD ["uvicorn", "temu_scraper_api.main:api", "--host", "0.0.0.0", "--port", "8001"] diff --git a/docker-compose.yml b/docker-compose.yml index d2bb284..5d1257c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -58,6 +58,23 @@ services: env_file: - .env + temu_scraper: + build: + context: . + dockerfile: Dockerfile.temu + container_name: listing_radar_temu_scraper + restart: unless-stopped + environment: + APIFY_API_TOKEN: ${APIFY_API_TOKEN} + TEMU_DB_PATH: /data/temu_scrapes.db + ports: + - "8001:8001" + volumes: + - temu_scraper_data:/data + env_file: + - .env + volumes: mysql_data: qdrant_data: + temu_scraper_data: diff --git a/temu_scraper_api/__init__.py b/temu_scraper_api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/temu_scraper_api/main.py b/temu_scraper_api/main.py new file mode 100644 index 0000000..ab87471 --- /dev/null +++ b/temu_scraper_api/main.py @@ -0,0 +1,29 @@ +from contextlib import asynccontextmanager + +from dotenv import load_dotenv +from fastapi import FastAPI + +from .storage import get_storage +from .views import app_router + +load_dotenv() + + +@asynccontextmanager +async def lifespan(app: FastAPI): + get_storage().init_db() + yield + + +api = FastAPI( + title="Temu Scraper API", + description=( + "POST endpoints run expensive Apify scrapers and persist results with timestamps. " + "GET endpoints search the local FTS index only." + ), + docs_url="/docs", + redoc_url="/redoc", + lifespan=lifespan, +) + +api.include_router(app_router, tags=["temu_scraper"]) diff --git a/temu_scraper_api/req.txt b/temu_scraper_api/req.txt new file mode 100644 index 0000000..f73c94a --- /dev/null +++ b/temu_scraper_api/req.txt @@ -0,0 +1,17 @@ +annotated-doc==0.0.4 +annotated-types==0.7.0 +anyio==4.13.0 +certifi==2026.4.22 +click==8.3.3 +fastapi==0.136.1 +h11==0.16.0 +httpcore==1.0.9 +httpx==0.28.1 +idna==3.13 +pydantic==2.13.4 +pydantic_core==2.46.4 +python-dotenv==1.2.2 +starlette==1.0.0 +typing-inspection==0.4.2 +typing_extensions==4.15.0 +uvicorn==0.46.0 diff --git a/temu_scraper_api/serializers.py b/temu_scraper_api/serializers.py new file mode 100644 index 0000000..8af7618 --- /dev/null +++ b/temu_scraper_api/serializers.py @@ -0,0 +1,174 @@ +from pydantic import BaseModel, ConfigDict, Field, model_validator +from typing import List, Literal, Optional, Any, Dict + +CurrencyCode = Literal["USD", "EUR"] + + +class ProxyConfiguration(BaseModel): + use_apify_proxy: bool = Field(True, serialization_alias="useApifyProxy") + apify_proxy_groups: Optional[List[str]] = Field( + default=["RESIDENTIAL"], + serialization_alias="apifyProxyGroups", + ) + + +class TemuScrapeRequest(BaseModel): + search_terms: List[str] = Field( + default_factory=list, + description="Keywords to search on Temu", + serialization_alias="searchTerms", + ) + product_urls: List[str] = Field( + default_factory=list, + description="Direct Temu product page URLs", + serialization_alias="productUrls", + ) + category: str = Field( + default="", + description="Category slug (e.g. electronics, women-clothing)", + ) + max_price: int = Field( + default=0, + ge=0, + description="Max price in USD; 0 means no filter", + serialization_alias="maxPrice", + ) + max_results: int = Field( + default=50, + ge=0, + description="Max products to scrape; 0 for unlimited", + serialization_alias="maxResults", + ) + proxy_configuration: Optional[ProxyConfiguration] = Field( + default=None, + serialization_alias="proxyConfiguration", + ) + + @model_validator(mode="after") + def require_scrape_target(self) -> "TemuScrapeRequest": + if not self.search_terms and not self.product_urls and not self.category: + raise ValueError( + "Provide at least one of search_terms, product_urls, or category" + ) + return self + + def to_actor_input(self) -> Dict[str, Any]: + payload: Dict[str, Any] = { + "searchTerms": self.search_terms, + "productUrls": self.product_urls, + "category": self.category, + "maxPrice": self.max_price, + "maxResults": self.max_results, + } + proxy = self.proxy_configuration or ProxyConfiguration() + payload["proxyConfiguration"] = proxy.model_dump( + by_alias=True, exclude_none=True + ) + return payload + + +class AmitTemuScrapeRequest(BaseModel): + """Input for amit123/temu-products-scraper.""" + + search_queries: List[str] = Field( + ..., + min_length=1, + description="Search keywords on Temu", + serialization_alias="searchQueries", + ) + currency: CurrencyCode = Field( + default="USD", + description="Price currency (USD or EUR)", + ) + max_results: int = Field( + default=40, + ge=20, + le=200, + description="Max products per search query (20-200)", + serialization_alias="maxResults", + ) + + def to_actor_input(self) -> Dict[str, Any]: + return { + "searchQueries": self.search_queries, + "currency": self.currency, + "maxResults": self.max_results, + } + + +class TemuProductVariant(BaseModel): + sku_id: Optional[str] = Field(None, alias="skuId") + title: Optional[str] = None + price: Optional[float] = None + original_price: Optional[float] = Field(None, alias="originalPrice") + image: Optional[str] = None + available: Optional[bool] = None + attributes: Optional[Dict[str, Any]] = None + + model_config = ConfigDict(populate_by_name=True, extra="allow") + + +class TemuProduct(BaseModel): + title: Optional[str] = None + price: Optional[float] = None + original_price: Optional[float] = Field(None, alias="originalPrice") + discount: Optional[str] = None + sold: Optional[str] = None + rating: Optional[float] = None + reviews: Optional[int] = None + category: Optional[str] = None + images: List[str] = Field(default_factory=list) + variants: List[Dict[str, Any]] = Field(default_factory=list) + store: Optional[str] = None + shipping_info: Optional[str] = Field(None, alias="shippingInfo") + url: Optional[str] = None + product_id: Optional[str] = Field(None, alias="productId") + brand: Optional[str] = None + currency: Optional[str] = None + search_term: Optional[str] = Field(None, alias="searchTerm") + scraped_at: Optional[str] = Field(None, alias="scrapedAt") + + model_config = ConfigDict(populate_by_name=True, extra="allow") + + +class SavedProduct(BaseModel): + id: int + run_id: int + scraper: str + saved_at: str + title: Optional[str] = None + url: Optional[str] = None + product_id: Optional[str] = None + search_term: Optional[str] = None + product: Dict[str, Any] = Field(default_factory=dict) + rank: Optional[float] = None + + model_config = ConfigDict(extra="allow") + + +class TemuScrapeResponse(BaseModel): + scraper: str = "sovereigntaylor" + actor_id: Optional[str] = None + saved_run_id: Optional[int] = None + saved_at: Optional[str] = None + search_terms: List[str] = Field(default_factory=list) + search_queries: List[str] = Field(default_factory=list) + product_urls: List[str] = Field(default_factory=list) + category: str = "" + currency: Optional[str] = None + max_results: int = 0 + product_count: int = 0 + run_id: Optional[str] = None + dataset_id: Optional[str] = None + dataset_url: Optional[str] = None + products: List[TemuProduct] = Field(default_factory=list) + + +class TemuSearchResponse(BaseModel): + query: str + scraper: str + match_count: int = 0 + total_saved_for_scraper: int = 0 + run_id: Optional[int] = None + since: Optional[str] = None + results: List[SavedProduct] = Field(default_factory=list) diff --git a/temu_scraper_api/service.py b/temu_scraper_api/service.py new file mode 100644 index 0000000..fc0f8e9 --- /dev/null +++ b/temu_scraper_api/service.py @@ -0,0 +1,82 @@ +import os +from typing import Any, Dict, List, Literal, Optional, Tuple + +import httpx +from dotenv import load_dotenv + +from .serializers import AmitTemuScrapeRequest, TemuScrapeRequest + +load_dotenv() + +ScraperKey = Literal["sovereigntaylor", "amit123"] + +ACTORS: Dict[ScraperKey, Dict[str, str]] = { + "sovereigntaylor": { + "id": "sovereigntaylor/temu-product-scraper", + "slug": "sovereigntaylor~temu-product-scraper", + }, + "amit123": { + "id": "amit123/temu-products-scraper", + "slug": "amit123~temu-products-scraper", + }, +} + + +class TemuScraperService: + """Runs Temu scrapers on Apify.""" + + def __init__( + self, + api_token: Optional[str] = None, + timeout: float = 600.0, + ): + self.api_token = api_token or os.getenv("APIFY_API_TOKEN") + self.timeout = timeout + + if not self.api_token: + raise ValueError("APIFY_API_TOKEN is not set in environment") + + def _run_sync_url(self, scraper: ScraperKey) -> str: + slug = ACTORS[scraper]["slug"] + return ( + f"https://api.apify.com/v2/acts/{slug}/run-sync-get-dataset-items" + ) + + async def _run_actor( + self, + scraper: ScraperKey, + run_input: Dict[str, Any], + ) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]: + params = {"token": self.api_token} + url = self._run_sync_url(scraper) + + async with httpx.AsyncClient(timeout=self.timeout) as client: + response = await client.post(url, params=params, json=run_input) + response.raise_for_status() + items = response.json() + + if not isinstance(items, list): + raise ValueError(f"Unexpected Apify response type: {type(items).__name__}") + + meta = { + "scraper": scraper, + "actor_id": ACTORS[scraper]["id"], + "item_count": len(items), + } + return items, meta + + async def scrape_sovereigntaylor( + self, request: TemuScrapeRequest + ) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]: + return await self._run_actor("sovereigntaylor", request.to_actor_input()) + + async def scrape_amit123( + self, request: AmitTemuScrapeRequest + ) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]: + return await self._run_actor("amit123", request.to_actor_input()) + + async def scrape( + self, request: TemuScrapeRequest + ) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]: + """Backward-compatible alias for sovereignigtaylor scraper.""" + return await self.scrape_sovereigntaylor(request) diff --git a/temu_scraper_api/storage.py b/temu_scraper_api/storage.py new file mode 100644 index 0000000..116ce27 --- /dev/null +++ b/temu_scraper_api/storage.py @@ -0,0 +1,254 @@ +import json +import os +import re +import sqlite3 +from contextlib import contextmanager +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +ScraperKey = str + +DEFAULT_DB_PATH = os.getenv("TEMU_DB_PATH", "data/temu_scrapes.db") + + +def _utc_now_iso() -> str: + return datetime.now(timezone.utc).isoformat() + + +def _extract_product_fields(scraper: ScraperKey, item: Dict[str, Any]) -> Dict[str, Optional[str]]: + title = item.get("title") + url = item.get("url") or item.get("link_url") + product_id = item.get("productId") or item.get("product_id") or item.get("goods_id") + if product_id is not None: + product_id = str(product_id) + + search_term = item.get("searchTerm") or item.get("search_term") + category = item.get("category") + if isinstance(category, dict): + category = json.dumps(category) + store = item.get("store") + brand = item.get("brand") + + return { + "title": title, + "url": url, + "product_id": product_id, + "search_term": search_term, + "category": str(category) if category else None, + "store": store, + "brand": str(brand) if brand else None, + } + + +def _fts_query(user_query: str) -> str: + terms = re.findall(r"[\w]+", user_query, flags=re.UNICODE) + if not terms: + return '""' + return " ".join(f'"{term}"*' for term in terms) + + +class ScrapeStorage: + def __init__(self, db_path: str = DEFAULT_DB_PATH): + self.db_path = db_path + Path(db_path).parent.mkdir(parents=True, exist_ok=True) + + @contextmanager + def _connect(self): + conn = sqlite3.connect(self.db_path) + conn.row_factory = sqlite3.Row + try: + yield conn + conn.commit() + finally: + conn.close() + + def init_db(self) -> None: + with self._connect() as conn: + conn.executescript( + """ + CREATE TABLE IF NOT EXISTS scrape_runs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + scraper TEXT NOT NULL, + saved_at TEXT NOT NULL, + actor_id TEXT, + request_json TEXT NOT NULL, + product_count INTEGER NOT NULL + ); + + CREATE TABLE IF NOT EXISTS scrape_products ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + run_id INTEGER NOT NULL, + scraper TEXT NOT NULL, + saved_at TEXT NOT NULL, + title TEXT, + url TEXT, + product_id TEXT, + search_term TEXT, + category TEXT, + store TEXT, + brand TEXT, + data_json TEXT NOT NULL, + FOREIGN KEY (run_id) REFERENCES scrape_runs(id) + ); + + CREATE INDEX IF NOT EXISTS idx_products_scraper_saved + ON scrape_products (scraper, saved_at DESC); + CREATE INDEX IF NOT EXISTS idx_products_run_id + ON scrape_products (run_id); + + CREATE VIRTUAL TABLE IF NOT EXISTS scrape_products_fts USING fts5( + product_rowid UNINDEXED, + scraper UNINDEXED, + saved_at UNINDEXED, + title, + search_term, + category, + store, + brand, + url, + tokenize='unicode61' + ); + """ + ) + + def save_scrape( + self, + scraper: ScraperKey, + actor_id: Optional[str], + request_payload: Dict[str, Any], + items: List[Dict[str, Any]], + ) -> Tuple[int, str, int]: + saved_at = _utc_now_iso() + request_json = json.dumps(request_payload, ensure_ascii=False) + + with self._connect() as conn: + cursor = conn.execute( + """ + INSERT INTO scrape_runs (scraper, saved_at, actor_id, request_json, product_count) + VALUES (?, ?, ?, ?, ?) + """, + (scraper, saved_at, actor_id, request_json, len(items)), + ) + run_id = cursor.lastrowid + + for item in items: + fields = _extract_product_fields(scraper, item) + data_json = json.dumps(item, ensure_ascii=False) + product_cursor = conn.execute( + """ + INSERT INTO scrape_products ( + run_id, scraper, saved_at, title, url, product_id, + search_term, category, store, brand, data_json + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + run_id, + scraper, + saved_at, + fields["title"], + fields["url"], + fields["product_id"], + fields["search_term"], + fields["category"], + fields["store"], + fields["brand"], + data_json, + ), + ) + product_id = product_cursor.lastrowid + conn.execute( + """ + INSERT INTO scrape_products_fts ( + product_rowid, scraper, saved_at, title, search_term, + category, store, brand, url + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + product_id, + scraper, + saved_at, + fields["title"] or "", + fields["search_term"] or "", + fields["category"] or "", + fields["store"] or "", + fields["brand"] or "", + fields["url"] or "", + ), + ) + + return run_id, saved_at, len(items) + + def search_products( + self, + scraper: ScraperKey, + query: str, + limit: int = 50, + run_id: Optional[int] = None, + since: Optional[str] = None, + ) -> Tuple[List[Dict[str, Any]], int]: + fts_q = _fts_query(query) + + sql = """ + SELECT + p.id, + p.run_id, + p.scraper, + p.saved_at, + p.title, + p.url, + p.product_id, + p.search_term, + p.data_json, + bm25(scrape_products_fts) AS rank + FROM scrape_products_fts + JOIN scrape_products p ON p.id = scrape_products_fts.product_rowid + WHERE scrape_products_fts MATCH ? + AND scrape_products_fts.scraper = ? + """ + params: List[Any] = [fts_q, scraper] + + if run_id is not None: + sql += " AND p.run_id = ?" + params.append(run_id) + if since: + sql += " AND p.saved_at >= ?" + params.append(since) + + sql += " ORDER BY rank LIMIT ?" + params.append(limit) + + with self._connect() as conn: + rows = conn.execute(sql, params).fetchall() + total = conn.execute( + "SELECT COUNT(*) FROM scrape_products WHERE scraper = ?", (scraper,) + ).fetchone()[0] + + results = [] + for row in rows: + product = json.loads(row["data_json"]) + results.append( + { + "id": row["id"], + "run_id": row["run_id"], + "scraper": row["scraper"], + "saved_at": row["saved_at"], + "title": row["title"], + "url": row["url"], + "product_id": row["product_id"], + "search_term": row["search_term"], + "product": product, + "rank": row["rank"], + } + ) + return results, total + + +_storage: Optional[ScrapeStorage] = None + + +def get_storage() -> ScrapeStorage: + global _storage + if _storage is None: + _storage = ScrapeStorage() + return _storage diff --git a/temu_scraper_api/views.py b/temu_scraper_api/views.py new file mode 100644 index 0000000..0afe5be --- /dev/null +++ b/temu_scraper_api/views.py @@ -0,0 +1,183 @@ +import logging +from typing import List, Optional + +import httpx +from fastapi import APIRouter, HTTPException, Query + +from .serializers import ( + AmitTemuScrapeRequest, + SavedProduct, + TemuScrapeRequest, + TemuScrapeResponse, + TemuSearchResponse, + TemuProduct, +) +from .service import TemuScraperService +from .storage import get_storage + +app_router = APIRouter() + +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") +log = logging.getLogger(__name__) + + +def _products_from_items(items: list) -> List[TemuProduct]: + return [TemuProduct.model_validate(item) for item in items] + + +def _sovereigntaylor_response( + request: TemuScrapeRequest, + items: list, + meta: dict, + saved_run_id: int, + saved_at: str, +) -> TemuScrapeResponse: + products = _products_from_items(items) + return TemuScrapeResponse( + scraper=meta.get("scraper", "sovereigntaylor"), + actor_id=meta.get("actor_id"), + saved_run_id=saved_run_id, + saved_at=saved_at, + search_terms=request.search_terms, + product_urls=request.product_urls, + category=request.category, + max_results=request.max_results, + product_count=len(products), + products=products, + ) + + +def _amit123_response( + request: AmitTemuScrapeRequest, + items: list, + meta: dict, + saved_run_id: int, + saved_at: str, +) -> TemuScrapeResponse: + products = _products_from_items(items) + return TemuScrapeResponse( + scraper=meta.get("scraper", "amit123"), + actor_id=meta.get("actor_id"), + saved_run_id=saved_run_id, + saved_at=saved_at, + search_queries=request.search_queries, + currency=request.currency, + max_results=request.max_results, + product_count=len(products), + products=products, + ) + + +def _search_saved( + scraper: str, + q: str, + limit: int, + run_id: Optional[int], + since: Optional[str], +) -> TemuSearchResponse: + storage = get_storage() + results, total = storage.search_products( + scraper=scraper, + query=q, + limit=limit, + run_id=run_id, + since=since, + ) + return TemuSearchResponse( + query=q, + scraper=scraper, + match_count=len(results), + total_saved_for_scraper=total, + run_id=run_id, + since=since, + results=[SavedProduct.model_validate(r) for r in results], + ) + + +async def _handle_scrape(coro): + try: + return await coro + except ValueError as e: + log.error(f"Validation or config error: {e}") + raise HTTPException(status_code=400, detail=str(e)) + except httpx.HTTPStatusError as e: + log.error(f"Apify returned {e.response.status_code}: {e.response.text}") + raise HTTPException(status_code=e.response.status_code, detail=e.response.text) + except httpx.RequestError as e: + log.error(f"Network error calling Apify: {e}") + raise HTTPException(status_code=502, detail=f"Upstream request failed: {e}") + except Exception as e: + log.error(f"Unexpected error: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@app_router.post("/scrape", response_model=TemuScrapeResponse) +async def scrape_and_save_sovereigntaylor(body: TemuScrapeRequest): + """ + Run Apify sovereignigtaylor scraper, save results with timestamp, and index them. + + Expensive — use GET /scrape to search saved results instead of re-scraping. + """ + async def run(): + service = TemuScraperService() + items, meta = await service.scrape_sovereigntaylor(body) + storage = get_storage() + run_id, saved_at, _ = storage.save_scrape( + scraper="sovereigntaylor", + actor_id=meta.get("actor_id"), + request_payload=body.model_dump(), + items=items, + ) + log.info(f"Saved {len(items)} products as run_id={run_id} at {saved_at}") + return _sovereigntaylor_response(body, items, meta, run_id, saved_at) + + return await _handle_scrape(run()) + + +@app_router.get("/scrape", response_model=TemuSearchResponse) +async def search_saved_sovereigntaylor( + q: str = Query(..., description="Search saved sovereignigtaylor results (FTS index)"), + limit: int = Query(50, ge=1, le=500), + run_id: Optional[int] = Query(None, description="Filter to a specific saved scrape run"), + since: Optional[str] = Query(None, description="Only results saved at or after this ISO timestamp"), +): + """Search locally saved scrape results. Does not call Apify.""" + if not q.strip(): + raise HTTPException(status_code=400, detail="Query parameter q is required") + return _search_saved("sovereigntaylor", q, limit, run_id, since) + + +@app_router.post("/scrape/amit123", response_model=TemuScrapeResponse) +async def scrape_and_save_amit123(body: AmitTemuScrapeRequest): + """ + Run Apify amit123 scraper, save results with timestamp, and index them. + + Expensive — use GET /scrape/amit123 to search saved results. + """ + async def run(): + service = TemuScraperService() + items, meta = await service.scrape_amit123(body) + storage = get_storage() + run_id, saved_at, _ = storage.save_scrape( + scraper="amit123", + actor_id=meta.get("actor_id"), + request_payload=body.model_dump(), + items=items, + ) + log.info(f"Saved {len(items)} products as run_id={run_id} at {saved_at}") + return _amit123_response(body, items, meta, run_id, saved_at) + + return await _handle_scrape(run()) + + +@app_router.get("/scrape/amit123", response_model=TemuSearchResponse) +async def search_saved_amit123( + q: str = Query(..., description="Search saved amit123 results (FTS index)"), + limit: int = Query(50, ge=1, le=500), + run_id: Optional[int] = Query(None, description="Filter to a specific saved scrape run"), + since: Optional[str] = Query(None, description="Only results saved at or after this ISO timestamp"), +): + """Search locally saved scrape results. Does not call Apify.""" + if not q.strip(): + raise HTTPException(status_code=400, detail="Query parameter q is required") + return _search_saved("amit123", q, limit, run_id, since) diff --git a/test_temu_scraper.py b/test_temu_scraper.py new file mode 100644 index 0000000..5455507 --- /dev/null +++ b/test_temu_scraper.py @@ -0,0 +1,97 @@ +"""Smoke test for Temu scrapers (POST=Apify+save, GET=search cache). + +Usage: + python test_temu_scraper.py scrape "wireless earbuds" 10 + python test_temu_scraper.py search "wireless" 20 + python test_temu_scraper.py scrape "women dress" 40 amit123 + python test_temu_scraper.py search "dress" 10 amit123 + +Requires APIFY_API_TOKEN in .env for scrape mode only. +""" +import asyncio +import os +import sys + +from dotenv import load_dotenv + +load_dotenv() + +from temu_scraper_api.serializers import AmitTemuScrapeRequest, TemuScrapeRequest +from temu_scraper_api.service import TemuScraperService +from temu_scraper_api.storage import get_storage + + +async def scrape(query: str, max_results: int, scraper: str) -> None: + if not os.getenv("APIFY_API_TOKEN"): + print("[FAIL] APIFY_API_TOKEN not set in .env") + sys.exit(1) + + storage = get_storage() + storage.init_db() + service = TemuScraperService() + + if scraper == "amit123": + max_results = max(max_results, 20) + request = AmitTemuScrapeRequest(search_queries=[query], max_results=max_results) + print(f"[amit123] Scraping '{query}' (max_results={max_results})...\n") + items, meta = await service.scrape_amit123(request) + request_payload = request.model_dump() + else: + request = TemuScrapeRequest(search_terms=[query], max_results=max_results) + print(f"[sovereigntaylor] Scraping '{query}' (max_results={max_results})...\n") + items, meta = await service.scrape_sovereigntaylor(request) + request_payload = request.model_dump() + + run_id, saved_at, count = storage.save_scrape( + scraper=scraper, + actor_id=meta.get("actor_id"), + request_payload=request_payload, + items=items, + ) + print(f"Actor: {meta.get('actor_id')}") + print(f"Saved run_id={run_id} at {saved_at} ({count} products)\n") + _print_items(items) + + +def search(query: str, limit: int, scraper: str) -> None: + storage = get_storage() + storage.init_db() + results, total = storage.search_products(scraper=scraper, query=query, limit=limit) + print(f"[{scraper}] Search '{query}' -> {len(results)} matches ({total} total saved)\n") + for i, row in enumerate(results[:10], 1): + product = row["product"] + title = row.get("title") or product.get("title", "N/A") + price = product.get("price") or product.get("price_info.price_str", "N/A") + print(f"{i}. [{row['saved_at']}] run={row['run_id']} {title}") + print(f" price={price} rank={row.get('rank')}") + print(f" {row.get('url')}\n") + if len(results) > 10: + print(f"... and {len(results) - 10} more") + + +def _print_items(items: list) -> None: + for i, item in enumerate(items[:10], 1): + title = item.get("title", "N/A") + price = item.get("price") or item.get("price_info.price_str", "N/A") + url = item.get("url") or item.get("link_url", "") + print(f"{i}. {title}") + print(f" price={price}") + print(f" {url}\n") + if len(items) > 10: + print(f"... and {len(items) - 10} more") + + +async def main(): + mode = sys.argv[1] if len(sys.argv) > 1 else "scrape" + query = sys.argv[2] if len(sys.argv) > 2 else "wireless earbuds" + limit = int(sys.argv[3]) if len(sys.argv) > 3 else 5 + scraper = sys.argv[4] if len(sys.argv) > 4 else "sovereigntaylor" + + if mode == "search": + search(query, limit, scraper) + else: + await scrape(query, limit, scraper) + + +if __name__ == "__main__": + asyncio.run(main())