Add custom search API with serializers and service implementation

main
bahawal.baloch 2026-05-14 15:52:47 +05:00
parent 0fe92f182f
commit 8d9b084a0b
4 changed files with 69 additions and 1 deletions

View File

View File

@ -0,0 +1,19 @@
from pydantic import BaseModel, Field
from typing import List, Optional, Any, Dict
class SearchResultItem(BaseModel):
title: Optional[str] = None
link: Optional[str] = None
displayLink: Optional[str] = None
snippet: Optional[str] = None
formattedUrl: Optional[str] = None
pagemap: Optional[Dict[str, Any]] = None
class SearchResponse(BaseModel):
query: str
total_results: Optional[str] = None
search_time: Optional[float] = None
items: List[SearchResultItem] = Field(default_factory=list)
raw: Optional[Dict[str, Any]] = None

View File

@ -0,0 +1,47 @@
import os
import httpx
from typing import Optional, Dict, Any
from dotenv import load_dotenv
load_dotenv()
GOOGLE_CSE_ENDPOINT = "https://www.googleapis.com/customsearch/v1"
class CustomSearchService:
def __init__(
self,
api_key: Optional[str] = None,
cx: Optional[str] = None,
timeout: float = 30.0,
):
self.api_key = api_key or os.getenv("GOOGLE_CSE_API_KEY")
self.cx = cx or os.getenv("GOOGLE_CSE_CX")
self.timeout = timeout
if not self.api_key:
raise ValueError("GOOGLE_CSE_API_KEY is not set in environment")
if not self.cx:
raise ValueError("GOOGLE_CSE_CX is not set in environment")
async def search(
self,
query: str,
num: int = 10,
start: int = 1,
extra_params: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
params: Dict[str, Any] = {
"key": self.api_key,
"cx": self.cx,
"q": query,
"num": num,
"start": start,
}
if extra_params:
params.update(extra_params)
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.get(GOOGLE_CSE_ENDPOINT, params=params)
response.raise_for_status()
return response.json()

View File

@ -10,6 +10,7 @@ from dotenv import load_dotenv
from db_setup import get_qdrant_client,get_session
from mysql_process.views import app_router as mysql_router
from vector_db_router.views import app_router as vector_db_router
from custom_search_api.views import app_router as custom_search_router
load_dotenv()
api = FastAPI(
@ -18,4 +19,5 @@ api = FastAPI(
)
api.include_router(mysql_router,prefix="/mysql",tags=["mysql_process"])
api.include_router(vector_db_router,prefix="/collection",tags=["vector_db"])
api.include_router(vector_db_router,prefix="/collection",tags=["vector_db"])
api.include_router(custom_search_router,prefix="/custom_search",tags=["custom_search"])