Compare commits

...

6 Commits

Author SHA1 Message Date
bahawal.baloch 3720c189e5 Add Temu scraper service with Docker configuration and API endpoints
- Introduced a new Dockerfile for the Temu scraper service.
- Updated docker-compose.yml to include the Temu scraper service with environment variables and volume configuration.
- Implemented the TemuScraperService for scraping functionality and integrated it with FastAPI.
- Added serializers for request and response handling.
- Created storage management for saving scrape results in SQLite.
- Developed API endpoints for scraping and searching saved results.
- Included a smoke test script for validating the scraper functionality.
2026-05-21 17:08:10 +05:00
bahawal.baloch 593079fb68 Update Dockerfile for caching and modify req.txt for platform-specific dependencies; add smoke test for Google Custom Search API 2026-05-14 17:43:44 +05:00
bahawal.baloch 40e3e26eef Add logging for Google CSE request parameters and update Qdrant client URL configuration 2026-05-14 17:10:57 +05:00
bahawal.baloch 96bd923387 Add Docker configuration with MySQL and Qdrant services, including health checks and environment variables 2026-05-14 17:10:52 +05:00
bahawal.baloch cc9466ab60 Refactor CustomSearchService to use updated environment variable names and add search endpoint with error handling 2026-05-14 15:53:21 +05:00
bahawal.baloch 8d9b084a0b Add custom search API with serializers and service implementation 2026-05-14 15:52:47 +05:00
21 changed files with 1200 additions and 23 deletions

18
.dockerignore Normal file
View File

@ -0,0 +1,18 @@
.git
.gitignore
.venv
**/__pycache__
**/*.pyc
**/*.pyo
**/*.pyd
.env
.env.*
*.log
.vscode
.idea
.DS_Store
Thumbs.db
**/downloaded_images
**/data
model_export/train.py
model_export/dataset.py

3
.gitignore vendored
View File

@ -6,4 +6,5 @@
*agent/** *agent/**
**downloaded_images** **downloaded_images**
**model_export** **model_export**
**cpython** **cpython**
.claude

29
Dockerfile Normal file
View File

@ -0,0 +1,29 @@
# syntax=docker/dockerfile:1.6
FROM python:3.13-slim
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PYTHONPATH=/app
WORKDIR /app
RUN --mount=type=cache,target=/var/cache/apt,sharing=locked \
--mount=type=cache,target=/var/lib/apt,sharing=locked \
apt-get update && apt-get install -y --no-install-recommends \
build-essential \
curl \
git
COPY req.txt ./
RUN --mount=type=cache,target=/root/.cache/pip \
pip install --upgrade pip && pip install -r req.txt
COPY dev_backend ./dev_backend
COPY custom_search_api ./custom_search_api
COPY model_export ./model_export
WORKDIR /app/dev_backend
EXPOSE 8000
CMD ["uvicorn", "main:api", "--host", "0.0.0.0", "--port", "8000"]

23
Dockerfile.temu Normal file
View File

@ -0,0 +1,23 @@
# syntax=docker/dockerfile:1.6
FROM python:3.13-slim
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PYTHONPATH=/app
WORKDIR /app
RUN --mount=type=cache,target=/var/cache/apt,sharing=locked \
--mount=type=cache,target=/var/lib/apt,sharing=locked \
apt-get update && apt-get install -y --no-install-recommends \
curl
COPY temu_scraper_api/req.txt ./req.txt
RUN --mount=type=cache,target=/root/.cache/pip \
pip install --upgrade pip && pip install -r req.txt
COPY temu_scraper_api ./temu_scraper_api
EXPOSE 8001
CMD ["uvicorn", "temu_scraper_api.main:api", "--host", "0.0.0.0", "--port", "8001"]

View File

View File

@ -0,0 +1,19 @@
from pydantic import BaseModel, Field
from typing import List, Optional, Any, Dict
class SearchResultItem(BaseModel):
title: Optional[str] = None
link: Optional[str] = None
displayLink: Optional[str] = None
snippet: Optional[str] = None
formattedUrl: Optional[str] = None
pagemap: Optional[Dict[str, Any]] = None
class SearchResponse(BaseModel):
query: str
total_results: Optional[str] = None
search_time: Optional[float] = None
items: List[SearchResultItem] = Field(default_factory=list)
raw: Optional[Dict[str, Any]] = None

View File

@ -0,0 +1,47 @@
import os
import httpx
from typing import Optional, Dict, Any
from dotenv import load_dotenv
load_dotenv()
GOOGLE_CSE_ENDPOINT = "https://www.googleapis.com/customsearch/v1"
class CustomSearchService:
def __init__(
self,
api_key: Optional[str] = None,
cx: Optional[str] = None,
timeout: float = 30.0,
):
self.api_key = api_key or os.getenv("GEMINI_API_KEY")
self.cx = cx or os.getenv("SEARCH_ENGINE_ID")
self.timeout = timeout
if not self.api_key:
raise ValueError("GEMINI_API_KEY is not set in environment")
if not self.cx:
raise ValueError("SEARCH_ENGINE_ID is not set in environment")
async def search(
self,
query: str,
num: int = 10,
start: int = 1,
extra_params: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
params: Dict[str, Any] = {
"key": self.api_key,
"cx": self.cx,
"q": query,
"num": num,
"start": start,
}
if extra_params:
params.update(extra_params)
print(f"Making request to Google CSE with params: {params}")
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.get(GOOGLE_CSE_ENDPOINT, params=params)
response.raise_for_status()
return response.json()

View File

@ -0,0 +1,59 @@
import logging
from typing import Optional
from fastapi import APIRouter, HTTPException, Query
from fastapi.responses import JSONResponse
import httpx
from .service import CustomSearchService
from .serializers import SearchResponse, SearchResultItem
app_router = APIRouter()
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
log = logging.getLogger(__name__)
@app_router.get("/search", response_model=SearchResponse)
async def search_endpoint(
q: str = Query(..., description="Search query string"),
num: int = Query(10, ge=1, le=10, description="Number of results to return (1-10)"),
start: int = Query(1, ge=1, description="Start index for pagination"),
):
try:
service = CustomSearchService()
data = await service.search(query=q, num=num, start=start)
items = [
SearchResultItem(
title=item.get("title"),
link=item.get("link"),
displayLink=item.get("displayLink"),
snippet=item.get("snippet"),
formattedUrl=item.get("formattedUrl"),
pagemap=item.get("pagemap"),
)
for item in data.get("items", [])
]
search_info = data.get("searchInformation", {}) or {}
return SearchResponse(
query=q,
total_results=search_info.get("totalResults"),
search_time=search_info.get("searchTime"),
items=items,
raw=data,
)
except ValueError as e:
log.error(f"Configuration error: {e}")
raise HTTPException(status_code=500, detail=str(e))
except httpx.HTTPStatusError as e:
log.error(f"Google CSE returned {e.response.status_code}: {e.response.text}")
raise HTTPException(status_code=e.response.status_code, detail=e.response.text)
except httpx.RequestError as e:
log.error(f"Network error calling Google CSE: {e}")
raise HTTPException(status_code=502, detail=f"Upstream request failed: {e}")
except Exception as e:
log.error(f"Unexpected error: {e}")
raise HTTPException(status_code=500, detail=str(e))

View File

@ -25,8 +25,8 @@ DATABASE_URL = URL.create(
async def get_qdrant_client()->AsyncGenerator[AsyncQdrantClient,None]: async def get_qdrant_client()->AsyncGenerator[AsyncQdrantClient,None]:
# Replace with your Qdrant URL qdrant_url = os.getenv("QDRANT_URL", "http://localhost:6333")
client = AsyncQdrantClient(url="http://localhost:6333", timeout=60) client = AsyncQdrantClient(url=qdrant_url, timeout=60)
try: try:
yield client yield client
finally: finally:

View File

@ -10,6 +10,7 @@ from dotenv import load_dotenv
from db_setup import get_qdrant_client,get_session from db_setup import get_qdrant_client,get_session
from mysql_process.views import app_router as mysql_router from mysql_process.views import app_router as mysql_router
from vector_db_router.views import app_router as vector_db_router from vector_db_router.views import app_router as vector_db_router
from custom_search_api.views import app_router as custom_search_router
load_dotenv() load_dotenv()
api = FastAPI( api = FastAPI(
@ -18,4 +19,5 @@ api = FastAPI(
) )
api.include_router(mysql_router,prefix="/mysql",tags=["mysql_process"]) api.include_router(mysql_router,prefix="/mysql",tags=["mysql_process"])
api.include_router(vector_db_router,prefix="/collection",tags=["vector_db"]) api.include_router(vector_db_router,prefix="/collection",tags=["vector_db"])
api.include_router(custom_search_router,prefix="/custom_search",tags=["custom_search"])

80
docker-compose.yml Normal file
View File

@ -0,0 +1,80 @@
services:
mysql:
image: mysql:8.0
container_name: listing_radar_mysql
restart: unless-stopped
environment:
MYSQL_ROOT_PASSWORD: ${MYSQL_ROOT_PASSWORD:-rootpass}
MYSQL_DATABASE: ${MYSQL_DATABASE:-listing_radar}
MYSQL_USER: ${MYSQL_USER:-app}
MYSQL_PASSWORD: ${MYSQL_PASSWORD:-apppass}
ports:
- "${MYSQL_PORT:-3306}:3306"
volumes:
- mysql_data:/var/lib/mysql
healthcheck:
test: ["CMD", "mysqladmin", "ping", "-h", "localhost", "-u", "root", "-p${MYSQL_ROOT_PASSWORD:-rootpass}"]
interval: 10s
timeout: 5s
retries: 10
qdrant:
image: qdrant/qdrant:latest
container_name: listing_radar_qdrant
restart: unless-stopped
ports:
- "6333:6333"
- "6334:6334"
volumes:
- qdrant_data:/qdrant/storage
healthcheck:
test: ["CMD-SHELL", "bash -c ':> /dev/tcp/127.0.0.1/6333' || exit 1"]
interval: 10s
timeout: 5s
retries: 10
api:
build:
context: .
dockerfile: Dockerfile
container_name: listing_radar_api
restart: unless-stopped
depends_on:
mysql:
condition: service_healthy
qdrant:
condition: service_started
environment:
MYSQL_HOST: mysql
MYSQL_PORT: 3306
MYSQL_USER: ${MYSQL_USER:-app}
MYSQL_PASSWORD: ${MYSQL_PASSWORD:-apppass}
MYSQL_DATABASE: ${MYSQL_DATABASE:-listing_radar}
QDRANT_URL: http://qdrant:6333
GEMINI_API_KEY: ${GEMINI_API_KEY}
SEARCH_ENGINE_ID: ${SEARCH_ENGINE_ID}
ports:
- "8000:8000"
env_file:
- .env
temu_scraper:
build:
context: .
dockerfile: Dockerfile.temu
container_name: listing_radar_temu_scraper
restart: unless-stopped
environment:
APIFY_API_TOKEN: ${APIFY_API_TOKEN}
TEMU_DB_PATH: /data/temu_scrapes.db
ports:
- "8001:8001"
volumes:
- temu_scraper_data:/data
env_file:
- .env
volumes:
mysql_data:
qdrant_data:
temu_scraper_data:

41
req.txt
View File

@ -5,9 +5,9 @@ asyncmy==0.2.11
certifi==2026.4.22 certifi==2026.4.22
click==8.3.3 click==8.3.3
contourpy==1.3.3 contourpy==1.3.3
cuda-bindings==13.2.0 cuda-bindings==13.2.0 ; sys_platform == "linux"
cuda-pathfinder==1.5.4 cuda-pathfinder==1.5.4 ; sys_platform == "linux"
cuda-toolkit==13.0.2 cuda-toolkit==13.0.2 ; sys_platform == "linux"
cycler==0.12.1 cycler==0.12.1
et_xmlfile==2.0.0 et_xmlfile==2.0.0
fastapi==0.136.1 fastapi==0.136.1
@ -23,6 +23,7 @@ httpcore==1.0.9
httpx==0.28.1 httpx==0.28.1
hyperframe==6.1.0 hyperframe==6.1.0
idna==3.13 idna==3.13
imagehash==4.3.2
Jinja2==3.1.6 Jinja2==3.1.6
joblib==1.5.3 joblib==1.5.3
kiwisolver==1.5.0 kiwisolver==1.5.0
@ -31,21 +32,21 @@ matplotlib==3.10.9
mpmath==1.3.0 mpmath==1.3.0
networkx==3.6.1 networkx==3.6.1
numpy==2.4.4 numpy==2.4.4
nvidia-cublas==13.1.0.3 nvidia-cublas==13.1.0.3 ; sys_platform == "linux"
nvidia-cuda-cupti==13.0.85 nvidia-cuda-cupti==13.0.85 ; sys_platform == "linux"
nvidia-cuda-nvrtc==13.0.88 nvidia-cuda-nvrtc==13.0.88 ; sys_platform == "linux"
nvidia-cuda-runtime==13.0.96 nvidia-cuda-runtime==13.0.96 ; sys_platform == "linux"
nvidia-cudnn-cu13==9.19.0.56 nvidia-cudnn-cu13==9.19.0.56 ; sys_platform == "linux"
nvidia-cufft==12.0.0.61 nvidia-cufft==12.0.0.61 ; sys_platform == "linux"
nvidia-cufile==1.15.1.6 nvidia-cufile==1.15.1.6 ; sys_platform == "linux"
nvidia-curand==10.4.0.35 nvidia-curand==10.4.0.35 ; sys_platform == "linux"
nvidia-cusolver==12.0.4.66 nvidia-cusolver==12.0.4.66 ; sys_platform == "linux"
nvidia-cusparse==12.6.3.3 nvidia-cusparse==12.6.3.3 ; sys_platform == "linux"
nvidia-cusparselt-cu13==0.8.0 nvidia-cusparselt-cu13==0.8.0 ; sys_platform == "linux"
nvidia-nccl-cu13==2.28.9 nvidia-nccl-cu13==2.28.9 ; sys_platform == "linux"
nvidia-nvjitlink==13.0.88 nvidia-nvjitlink==13.0.88 ; sys_platform == "linux"
nvidia-nvshmem-cu13==3.4.5 nvidia-nvshmem-cu13==3.4.5 ; sys_platform == "linux"
nvidia-nvtx==13.0.85 nvidia-nvtx==13.0.85 ; sys_platform == "linux"
openpyxl==3.1.5 openpyxl==3.1.5
packaging==26.2 packaging==26.2
pandas==3.0.2 pandas==3.0.2
@ -58,10 +59,12 @@ pyparsing==3.3.2
python-dateutil==2.9.0.post0 python-dateutil==2.9.0.post0
python-dotenv==1.2.2 python-dotenv==1.2.2
qdrant-client==1.17.1 qdrant-client==1.17.1
requests==2.34.1
scikit-learn==1.8.0 scikit-learn==1.8.0
scipy==1.17.1 scipy==1.17.1
setuptools==81.0.0 setuptools==81.0.0
six==1.17.0 six==1.17.0
sqlmodel==0.0.22
SQLAlchemy==2.0.49 SQLAlchemy==2.0.49
starlette==1.0.0 starlette==1.0.0
sympy==1.14.0 sympy==1.14.0
@ -69,7 +72,7 @@ threadpoolctl==3.6.0
torch==2.11.0 torch==2.11.0
torchvision==0.26.0 torchvision==0.26.0
tqdm==4.67.3 tqdm==4.67.3
triton==3.6.0 triton==3.6.0 ; sys_platform == "linux"
typing-inspection==0.4.2 typing-inspection==0.4.2
typing_extensions==4.15.0 typing_extensions==4.15.0
urllib3==2.6.3 urllib3==2.6.3

View File

29
temu_scraper_api/main.py Normal file
View File

@ -0,0 +1,29 @@
from contextlib import asynccontextmanager
from dotenv import load_dotenv
from fastapi import FastAPI
from .storage import get_storage
from .views import app_router
load_dotenv()
@asynccontextmanager
async def lifespan(app: FastAPI):
get_storage().init_db()
yield
api = FastAPI(
title="Temu Scraper API",
description=(
"POST endpoints run expensive Apify scrapers and persist results with timestamps. "
"GET endpoints search the local FTS index only."
),
docs_url="/docs",
redoc_url="/redoc",
lifespan=lifespan,
)
api.include_router(app_router, tags=["temu_scraper"])

17
temu_scraper_api/req.txt Normal file
View File

@ -0,0 +1,17 @@
annotated-doc==0.0.4
annotated-types==0.7.0
anyio==4.13.0
certifi==2026.4.22
click==8.3.3
fastapi==0.136.1
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
idna==3.13
pydantic==2.13.4
pydantic_core==2.46.4
python-dotenv==1.2.2
starlette==1.0.0
typing-inspection==0.4.2
typing_extensions==4.15.0
uvicorn==0.46.0

View File

@ -0,0 +1,174 @@
from pydantic import BaseModel, ConfigDict, Field, model_validator
from typing import List, Literal, Optional, Any, Dict
CurrencyCode = Literal["USD", "EUR"]
class ProxyConfiguration(BaseModel):
use_apify_proxy: bool = Field(True, serialization_alias="useApifyProxy")
apify_proxy_groups: Optional[List[str]] = Field(
default=["RESIDENTIAL"],
serialization_alias="apifyProxyGroups",
)
class TemuScrapeRequest(BaseModel):
search_terms: List[str] = Field(
default_factory=list,
description="Keywords to search on Temu",
serialization_alias="searchTerms",
)
product_urls: List[str] = Field(
default_factory=list,
description="Direct Temu product page URLs",
serialization_alias="productUrls",
)
category: str = Field(
default="",
description="Category slug (e.g. electronics, women-clothing)",
)
max_price: int = Field(
default=0,
ge=0,
description="Max price in USD; 0 means no filter",
serialization_alias="maxPrice",
)
max_results: int = Field(
default=50,
ge=0,
description="Max products to scrape; 0 for unlimited",
serialization_alias="maxResults",
)
proxy_configuration: Optional[ProxyConfiguration] = Field(
default=None,
serialization_alias="proxyConfiguration",
)
@model_validator(mode="after")
def require_scrape_target(self) -> "TemuScrapeRequest":
if not self.search_terms and not self.product_urls and not self.category:
raise ValueError(
"Provide at least one of search_terms, product_urls, or category"
)
return self
def to_actor_input(self) -> Dict[str, Any]:
payload: Dict[str, Any] = {
"searchTerms": self.search_terms,
"productUrls": self.product_urls,
"category": self.category,
"maxPrice": self.max_price,
"maxResults": self.max_results,
}
proxy = self.proxy_configuration or ProxyConfiguration()
payload["proxyConfiguration"] = proxy.model_dump(
by_alias=True, exclude_none=True
)
return payload
class AmitTemuScrapeRequest(BaseModel):
"""Input for amit123/temu-products-scraper."""
search_queries: List[str] = Field(
...,
min_length=1,
description="Search keywords on Temu",
serialization_alias="searchQueries",
)
currency: CurrencyCode = Field(
default="USD",
description="Price currency (USD or EUR)",
)
max_results: int = Field(
default=40,
ge=20,
le=200,
description="Max products per search query (20-200)",
serialization_alias="maxResults",
)
def to_actor_input(self) -> Dict[str, Any]:
return {
"searchQueries": self.search_queries,
"currency": self.currency,
"maxResults": self.max_results,
}
class TemuProductVariant(BaseModel):
sku_id: Optional[str] = Field(None, alias="skuId")
title: Optional[str] = None
price: Optional[float] = None
original_price: Optional[float] = Field(None, alias="originalPrice")
image: Optional[str] = None
available: Optional[bool] = None
attributes: Optional[Dict[str, Any]] = None
model_config = ConfigDict(populate_by_name=True, extra="allow")
class TemuProduct(BaseModel):
title: Optional[str] = None
price: Optional[float] = None
original_price: Optional[float] = Field(None, alias="originalPrice")
discount: Optional[str] = None
sold: Optional[str] = None
rating: Optional[float] = None
reviews: Optional[int] = None
category: Optional[str] = None
images: List[str] = Field(default_factory=list)
variants: List[Dict[str, Any]] = Field(default_factory=list)
store: Optional[str] = None
shipping_info: Optional[str] = Field(None, alias="shippingInfo")
url: Optional[str] = None
product_id: Optional[str] = Field(None, alias="productId")
brand: Optional[str] = None
currency: Optional[str] = None
search_term: Optional[str] = Field(None, alias="searchTerm")
scraped_at: Optional[str] = Field(None, alias="scrapedAt")
model_config = ConfigDict(populate_by_name=True, extra="allow")
class SavedProduct(BaseModel):
id: int
run_id: int
scraper: str
saved_at: str
title: Optional[str] = None
url: Optional[str] = None
product_id: Optional[str] = None
search_term: Optional[str] = None
product: Dict[str, Any] = Field(default_factory=dict)
rank: Optional[float] = None
model_config = ConfigDict(extra="allow")
class TemuScrapeResponse(BaseModel):
scraper: str = "sovereigntaylor"
actor_id: Optional[str] = None
saved_run_id: Optional[int] = None
saved_at: Optional[str] = None
search_terms: List[str] = Field(default_factory=list)
search_queries: List[str] = Field(default_factory=list)
product_urls: List[str] = Field(default_factory=list)
category: str = ""
currency: Optional[str] = None
max_results: int = 0
product_count: int = 0
run_id: Optional[str] = None
dataset_id: Optional[str] = None
dataset_url: Optional[str] = None
products: List[TemuProduct] = Field(default_factory=list)
class TemuSearchResponse(BaseModel):
query: str
scraper: str
match_count: int = 0
total_saved_for_scraper: int = 0
run_id: Optional[int] = None
since: Optional[str] = None
results: List[SavedProduct] = Field(default_factory=list)

View File

@ -0,0 +1,82 @@
import os
from typing import Any, Dict, List, Literal, Optional, Tuple
import httpx
from dotenv import load_dotenv
from .serializers import AmitTemuScrapeRequest, TemuScrapeRequest
load_dotenv()
ScraperKey = Literal["sovereigntaylor", "amit123"]
ACTORS: Dict[ScraperKey, Dict[str, str]] = {
"sovereigntaylor": {
"id": "sovereigntaylor/temu-product-scraper",
"slug": "sovereigntaylor~temu-product-scraper",
},
"amit123": {
"id": "amit123/temu-products-scraper",
"slug": "amit123~temu-products-scraper",
},
}
class TemuScraperService:
"""Runs Temu scrapers on Apify."""
def __init__(
self,
api_token: Optional[str] = None,
timeout: float = 600.0,
):
self.api_token = api_token or os.getenv("APIFY_API_TOKEN")
self.timeout = timeout
if not self.api_token:
raise ValueError("APIFY_API_TOKEN is not set in environment")
def _run_sync_url(self, scraper: ScraperKey) -> str:
slug = ACTORS[scraper]["slug"]
return (
f"https://api.apify.com/v2/acts/{slug}/run-sync-get-dataset-items"
)
async def _run_actor(
self,
scraper: ScraperKey,
run_input: Dict[str, Any],
) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
params = {"token": self.api_token}
url = self._run_sync_url(scraper)
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.post(url, params=params, json=run_input)
response.raise_for_status()
items = response.json()
if not isinstance(items, list):
raise ValueError(f"Unexpected Apify response type: {type(items).__name__}")
meta = {
"scraper": scraper,
"actor_id": ACTORS[scraper]["id"],
"item_count": len(items),
}
return items, meta
async def scrape_sovereigntaylor(
self, request: TemuScrapeRequest
) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
return await self._run_actor("sovereigntaylor", request.to_actor_input())
async def scrape_amit123(
self, request: AmitTemuScrapeRequest
) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
return await self._run_actor("amit123", request.to_actor_input())
async def scrape(
self, request: TemuScrapeRequest
) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
"""Backward-compatible alias for sovereignigtaylor scraper."""
return await self.scrape_sovereigntaylor(request)

254
temu_scraper_api/storage.py Normal file
View File

@ -0,0 +1,254 @@
import json
import os
import re
import sqlite3
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
ScraperKey = str
DEFAULT_DB_PATH = os.getenv("TEMU_DB_PATH", "data/temu_scrapes.db")
def _utc_now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _extract_product_fields(scraper: ScraperKey, item: Dict[str, Any]) -> Dict[str, Optional[str]]:
title = item.get("title")
url = item.get("url") or item.get("link_url")
product_id = item.get("productId") or item.get("product_id") or item.get("goods_id")
if product_id is not None:
product_id = str(product_id)
search_term = item.get("searchTerm") or item.get("search_term")
category = item.get("category")
if isinstance(category, dict):
category = json.dumps(category)
store = item.get("store")
brand = item.get("brand")
return {
"title": title,
"url": url,
"product_id": product_id,
"search_term": search_term,
"category": str(category) if category else None,
"store": store,
"brand": str(brand) if brand else None,
}
def _fts_query(user_query: str) -> str:
terms = re.findall(r"[\w]+", user_query, flags=re.UNICODE)
if not terms:
return '""'
return " ".join(f'"{term}"*' for term in terms)
class ScrapeStorage:
def __init__(self, db_path: str = DEFAULT_DB_PATH):
self.db_path = db_path
Path(db_path).parent.mkdir(parents=True, exist_ok=True)
@contextmanager
def _connect(self):
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
try:
yield conn
conn.commit()
finally:
conn.close()
def init_db(self) -> None:
with self._connect() as conn:
conn.executescript(
"""
CREATE TABLE IF NOT EXISTS scrape_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
scraper TEXT NOT NULL,
saved_at TEXT NOT NULL,
actor_id TEXT,
request_json TEXT NOT NULL,
product_count INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS scrape_products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_id INTEGER NOT NULL,
scraper TEXT NOT NULL,
saved_at TEXT NOT NULL,
title TEXT,
url TEXT,
product_id TEXT,
search_term TEXT,
category TEXT,
store TEXT,
brand TEXT,
data_json TEXT NOT NULL,
FOREIGN KEY (run_id) REFERENCES scrape_runs(id)
);
CREATE INDEX IF NOT EXISTS idx_products_scraper_saved
ON scrape_products (scraper, saved_at DESC);
CREATE INDEX IF NOT EXISTS idx_products_run_id
ON scrape_products (run_id);
CREATE VIRTUAL TABLE IF NOT EXISTS scrape_products_fts USING fts5(
product_rowid UNINDEXED,
scraper UNINDEXED,
saved_at UNINDEXED,
title,
search_term,
category,
store,
brand,
url,
tokenize='unicode61'
);
"""
)
def save_scrape(
self,
scraper: ScraperKey,
actor_id: Optional[str],
request_payload: Dict[str, Any],
items: List[Dict[str, Any]],
) -> Tuple[int, str, int]:
saved_at = _utc_now_iso()
request_json = json.dumps(request_payload, ensure_ascii=False)
with self._connect() as conn:
cursor = conn.execute(
"""
INSERT INTO scrape_runs (scraper, saved_at, actor_id, request_json, product_count)
VALUES (?, ?, ?, ?, ?)
""",
(scraper, saved_at, actor_id, request_json, len(items)),
)
run_id = cursor.lastrowid
for item in items:
fields = _extract_product_fields(scraper, item)
data_json = json.dumps(item, ensure_ascii=False)
product_cursor = conn.execute(
"""
INSERT INTO scrape_products (
run_id, scraper, saved_at, title, url, product_id,
search_term, category, store, brand, data_json
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
run_id,
scraper,
saved_at,
fields["title"],
fields["url"],
fields["product_id"],
fields["search_term"],
fields["category"],
fields["store"],
fields["brand"],
data_json,
),
)
product_id = product_cursor.lastrowid
conn.execute(
"""
INSERT INTO scrape_products_fts (
product_rowid, scraper, saved_at, title, search_term,
category, store, brand, url
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
product_id,
scraper,
saved_at,
fields["title"] or "",
fields["search_term"] or "",
fields["category"] or "",
fields["store"] or "",
fields["brand"] or "",
fields["url"] or "",
),
)
return run_id, saved_at, len(items)
def search_products(
self,
scraper: ScraperKey,
query: str,
limit: int = 50,
run_id: Optional[int] = None,
since: Optional[str] = None,
) -> Tuple[List[Dict[str, Any]], int]:
fts_q = _fts_query(query)
sql = """
SELECT
p.id,
p.run_id,
p.scraper,
p.saved_at,
p.title,
p.url,
p.product_id,
p.search_term,
p.data_json,
bm25(scrape_products_fts) AS rank
FROM scrape_products_fts
JOIN scrape_products p ON p.id = scrape_products_fts.product_rowid
WHERE scrape_products_fts MATCH ?
AND scrape_products_fts.scraper = ?
"""
params: List[Any] = [fts_q, scraper]
if run_id is not None:
sql += " AND p.run_id = ?"
params.append(run_id)
if since:
sql += " AND p.saved_at >= ?"
params.append(since)
sql += " ORDER BY rank LIMIT ?"
params.append(limit)
with self._connect() as conn:
rows = conn.execute(sql, params).fetchall()
total = conn.execute(
"SELECT COUNT(*) FROM scrape_products WHERE scraper = ?", (scraper,)
).fetchone()[0]
results = []
for row in rows:
product = json.loads(row["data_json"])
results.append(
{
"id": row["id"],
"run_id": row["run_id"],
"scraper": row["scraper"],
"saved_at": row["saved_at"],
"title": row["title"],
"url": row["url"],
"product_id": row["product_id"],
"search_term": row["search_term"],
"product": product,
"rank": row["rank"],
}
)
return results, total
_storage: Optional[ScrapeStorage] = None
def get_storage() -> ScrapeStorage:
global _storage
if _storage is None:
_storage = ScrapeStorage()
return _storage

183
temu_scraper_api/views.py Normal file
View File

@ -0,0 +1,183 @@
import logging
from typing import List, Optional
import httpx
from fastapi import APIRouter, HTTPException, Query
from .serializers import (
AmitTemuScrapeRequest,
SavedProduct,
TemuScrapeRequest,
TemuScrapeResponse,
TemuSearchResponse,
TemuProduct,
)
from .service import TemuScraperService
from .storage import get_storage
app_router = APIRouter()
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
log = logging.getLogger(__name__)
def _products_from_items(items: list) -> List[TemuProduct]:
return [TemuProduct.model_validate(item) for item in items]
def _sovereigntaylor_response(
request: TemuScrapeRequest,
items: list,
meta: dict,
saved_run_id: int,
saved_at: str,
) -> TemuScrapeResponse:
products = _products_from_items(items)
return TemuScrapeResponse(
scraper=meta.get("scraper", "sovereigntaylor"),
actor_id=meta.get("actor_id"),
saved_run_id=saved_run_id,
saved_at=saved_at,
search_terms=request.search_terms,
product_urls=request.product_urls,
category=request.category,
max_results=request.max_results,
product_count=len(products),
products=products,
)
def _amit123_response(
request: AmitTemuScrapeRequest,
items: list,
meta: dict,
saved_run_id: int,
saved_at: str,
) -> TemuScrapeResponse:
products = _products_from_items(items)
return TemuScrapeResponse(
scraper=meta.get("scraper", "amit123"),
actor_id=meta.get("actor_id"),
saved_run_id=saved_run_id,
saved_at=saved_at,
search_queries=request.search_queries,
currency=request.currency,
max_results=request.max_results,
product_count=len(products),
products=products,
)
def _search_saved(
scraper: str,
q: str,
limit: int,
run_id: Optional[int],
since: Optional[str],
) -> TemuSearchResponse:
storage = get_storage()
results, total = storage.search_products(
scraper=scraper,
query=q,
limit=limit,
run_id=run_id,
since=since,
)
return TemuSearchResponse(
query=q,
scraper=scraper,
match_count=len(results),
total_saved_for_scraper=total,
run_id=run_id,
since=since,
results=[SavedProduct.model_validate(r) for r in results],
)
async def _handle_scrape(coro):
try:
return await coro
except ValueError as e:
log.error(f"Validation or config error: {e}")
raise HTTPException(status_code=400, detail=str(e))
except httpx.HTTPStatusError as e:
log.error(f"Apify returned {e.response.status_code}: {e.response.text}")
raise HTTPException(status_code=e.response.status_code, detail=e.response.text)
except httpx.RequestError as e:
log.error(f"Network error calling Apify: {e}")
raise HTTPException(status_code=502, detail=f"Upstream request failed: {e}")
except Exception as e:
log.error(f"Unexpected error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app_router.post("/scrape", response_model=TemuScrapeResponse)
async def scrape_and_save_sovereigntaylor(body: TemuScrapeRequest):
"""
Run Apify sovereignigtaylor scraper, save results with timestamp, and index them.
Expensive use GET /scrape to search saved results instead of re-scraping.
"""
async def run():
service = TemuScraperService()
items, meta = await service.scrape_sovereigntaylor(body)
storage = get_storage()
run_id, saved_at, _ = storage.save_scrape(
scraper="sovereigntaylor",
actor_id=meta.get("actor_id"),
request_payload=body.model_dump(),
items=items,
)
log.info(f"Saved {len(items)} products as run_id={run_id} at {saved_at}")
return _sovereigntaylor_response(body, items, meta, run_id, saved_at)
return await _handle_scrape(run())
@app_router.get("/scrape", response_model=TemuSearchResponse)
async def search_saved_sovereigntaylor(
q: str = Query(..., description="Search saved sovereignigtaylor results (FTS index)"),
limit: int = Query(50, ge=1, le=500),
run_id: Optional[int] = Query(None, description="Filter to a specific saved scrape run"),
since: Optional[str] = Query(None, description="Only results saved at or after this ISO timestamp"),
):
"""Search locally saved scrape results. Does not call Apify."""
if not q.strip():
raise HTTPException(status_code=400, detail="Query parameter q is required")
return _search_saved("sovereigntaylor", q, limit, run_id, since)
@app_router.post("/scrape/amit123", response_model=TemuScrapeResponse)
async def scrape_and_save_amit123(body: AmitTemuScrapeRequest):
"""
Run Apify amit123 scraper, save results with timestamp, and index them.
Expensive use GET /scrape/amit123 to search saved results.
"""
async def run():
service = TemuScraperService()
items, meta = await service.scrape_amit123(body)
storage = get_storage()
run_id, saved_at, _ = storage.save_scrape(
scraper="amit123",
actor_id=meta.get("actor_id"),
request_payload=body.model_dump(),
items=items,
)
log.info(f"Saved {len(items)} products as run_id={run_id} at {saved_at}")
return _amit123_response(body, items, meta, run_id, saved_at)
return await _handle_scrape(run())
@app_router.get("/scrape/amit123", response_model=TemuSearchResponse)
async def search_saved_amit123(
q: str = Query(..., description="Search saved amit123 results (FTS index)"),
limit: int = Query(50, ge=1, le=500),
run_id: Optional[int] = Query(None, description="Filter to a specific saved scrape run"),
since: Optional[str] = Query(None, description="Only results saved at or after this ISO timestamp"),
):
"""Search locally saved scrape results. Does not call Apify."""
if not q.strip():
raise HTTPException(status_code=400, detail="Query parameter q is required")
return _search_saved("amit123", q, limit, run_id, since)

60
test_search_api.py Normal file
View File

@ -0,0 +1,60 @@
"""Quick smoke test that hits Google Custom Search API directly.
Reads GEMINI_API_KEY and SEARCH_ENGINE_ID from .env and confirms credentials
work without going through the FastAPI service.
"""
import os
import sys
import requests
from dotenv import load_dotenv
load_dotenv()
GOOGLE_CSE_ENDPOINT = "https://www.googleapis.com/customsearch/v1"
API_KEY = os.getenv("GEMINI_API_KEY")
CX = os.getenv("SEARCH_ENGINE_ID")
QUERY = sys.argv[1] if len(sys.argv) > 1 else "lectures"
NUM = int(sys.argv[2]) if len(sys.argv) > 2 else 5
def main():
if not API_KEY:
print("[FAIL] GEMINI_API_KEY not set in .env")
sys.exit(1)
if not CX:
print("[FAIL] SEARCH_ENGINE_ID not set in .env")
sys.exit(1)
params = {"key": API_KEY, "cx": CX, "q": QUERY, "num": NUM}
print(f"GET {GOOGLE_CSE_ENDPOINT}?q={QUERY}&num={NUM} (key/cx hidden)\n")
try:
r = requests.get(GOOGLE_CSE_ENDPOINT, params=params, timeout=30)
except requests.RequestException as e:
print(f"[FAIL] Could not reach Google CSE: {e}")
sys.exit(1)
print(f"Status: {r.status_code}")
if r.status_code != 200:
print(f"Body: {r.text}")
sys.exit(1)
data = r.json()
info = data.get("searchInformation", {}) or {}
items = data.get("items", []) or []
print(f"Query: {QUERY}")
print(f"Total results: {info.get('totalResults')}")
print(f"Search time: {info.get('searchTime')}s")
print(f"Items returned: {len(items)}\n")
for i, item in enumerate(items, 1):
print(f"{i}. {item.get('title')}")
print(f" {item.get('link')}")
snippet = (item.get("snippet") or "").replace("\n", " ")
print(f" {snippet[:120]}{'...' if len(snippet) > 120 else ''}\n")
if __name__ == "__main__":
main()

97
test_temu_scraper.py Normal file
View File

@ -0,0 +1,97 @@
"""Smoke test for Temu scrapers (POST=Apify+save, GET=search cache).
Usage:
python test_temu_scraper.py scrape "wireless earbuds" 10
python test_temu_scraper.py search "wireless" 20
python test_temu_scraper.py scrape "women dress" 40 amit123
python test_temu_scraper.py search "dress" 10 amit123
Requires APIFY_API_TOKEN in .env for scrape mode only.
"""
import asyncio
import os
import sys
from dotenv import load_dotenv
load_dotenv()
from temu_scraper_api.serializers import AmitTemuScrapeRequest, TemuScrapeRequest
from temu_scraper_api.service import TemuScraperService
from temu_scraper_api.storage import get_storage
async def scrape(query: str, max_results: int, scraper: str) -> None:
if not os.getenv("APIFY_API_TOKEN"):
print("[FAIL] APIFY_API_TOKEN not set in .env")
sys.exit(1)
storage = get_storage()
storage.init_db()
service = TemuScraperService()
if scraper == "amit123":
max_results = max(max_results, 20)
request = AmitTemuScrapeRequest(search_queries=[query], max_results=max_results)
print(f"[amit123] Scraping '{query}' (max_results={max_results})...\n")
items, meta = await service.scrape_amit123(request)
request_payload = request.model_dump()
else:
request = TemuScrapeRequest(search_terms=[query], max_results=max_results)
print(f"[sovereigntaylor] Scraping '{query}' (max_results={max_results})...\n")
items, meta = await service.scrape_sovereigntaylor(request)
request_payload = request.model_dump()
run_id, saved_at, count = storage.save_scrape(
scraper=scraper,
actor_id=meta.get("actor_id"),
request_payload=request_payload,
items=items,
)
print(f"Actor: {meta.get('actor_id')}")
print(f"Saved run_id={run_id} at {saved_at} ({count} products)\n")
_print_items(items)
def search(query: str, limit: int, scraper: str) -> None:
storage = get_storage()
storage.init_db()
results, total = storage.search_products(scraper=scraper, query=query, limit=limit)
print(f"[{scraper}] Search '{query}' -> {len(results)} matches ({total} total saved)\n")
for i, row in enumerate(results[:10], 1):
product = row["product"]
title = row.get("title") or product.get("title", "N/A")
price = product.get("price") or product.get("price_info.price_str", "N/A")
print(f"{i}. [{row['saved_at']}] run={row['run_id']} {title}")
print(f" price={price} rank={row.get('rank')}")
print(f" {row.get('url')}\n")
if len(results) > 10:
print(f"... and {len(results) - 10} more")
def _print_items(items: list) -> None:
for i, item in enumerate(items[:10], 1):
title = item.get("title", "N/A")
price = item.get("price") or item.get("price_info.price_str", "N/A")
url = item.get("url") or item.get("link_url", "")
print(f"{i}. {title}")
print(f" price={price}")
print(f" {url}\n")
if len(items) > 10:
print(f"... and {len(items) - 10} more")
async def main():
mode = sys.argv[1] if len(sys.argv) > 1 else "scrape"
query = sys.argv[2] if len(sys.argv) > 2 else "wireless earbuds"
limit = int(sys.argv[3]) if len(sys.argv) > 3 else 5
scraper = sys.argv[4] if len(sys.argv) > 4 else "sovereigntaylor"
if mode == "search":
search(query, limit, scraper)
else:
await scrape(query, limit, scraper)
if __name__ == "__main__":
asyncio.run(main())