commit fastapi qdrant structure

AMB_DEV
ambag12 2026-05-07 21:18:59 +05:00
parent 953f1f868f
commit dcaa4cc8c9
7 changed files with 210 additions and 7 deletions

6
.gitignore vendored Normal file
View File

@ -0,0 +1,6 @@
.env
**venv**
**data**
*pyc**
**pycache**
*agent/**

View File

@ -2,8 +2,8 @@ from fastapi import FastAPI, status
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from dotenv import load_dotenv from dotenv import load_dotenv
from db_setup import get_qdrant_client,get_session from db_setup import get_qdrant_client,get_session
from mysql_process.views import app_router from mysql_process.views import app_router as mysql_router
from vector_db_router.views from vector_db_router.views import app_router as vector_db_router
load_dotenv() load_dotenv()
api = FastAPI( api = FastAPI(
@ -11,5 +11,5 @@ api = FastAPI(
redoc_url="/redocs", redoc_url="/redocs",
) )
api.include_router(app_router,tags=["mysql_process"]) api.include_router(mysql_router,prefix="/mysql",tags=["mysql_process"])
api.include_router(vector_db_router,tags=["vector_db"]) api.include_router(vector_db_router,prefix="/collection",tags=["vector_db"])

View File

@ -1 +1,80 @@
from qdrant_client import AsyncQdrantClient, models from qdrant_client import AsyncQdrantClient, models
from qdrant_client.models import PointStruct
from typing import Dict, Any
class CollectionHandler:
def __init__(self, collection_name: str, vector: Any, vector_size: int, payload: Dict, id: int):
self.collection_name = collection_name
self.vector = vector
self.vector_size = vector_size
self.payload = payload
self.id = id
self.client = AsyncQdrantClient("localhost", port=6333)
async def create_collection(self):
try:
if await self.client.collection_exists(self.collection_name):
return {"message": "Collection already exists"}
await self.client.create_collection(
collection_name=self.collection_name,
vectors_config=models.VectorParams(size=self.vector_size, distance=models.Distance.COSINE),
optimizers_config=models.OptimizersConfigDiff(indexing_threshold=20000)
)
# Creating payload indexes as per project logic
await self.client.create_payload_index(
collection_name=self.collection_name,
field_name="Product_ID",
field_schema=models.PayloadSchemaType.KEYWORD
)
await self.client.create_payload_index(
collection_name=self.collection_name,
field_name="Product_Link",
field_schema=models.PayloadSchemaType.KEYWORD
)
return {"message": f"Collection {self.collection_name} created successfully"}
except Exception as e:
return {"message": str(e)}
async def insertion(self):
try:
await self.client.upsert(
collection_name=self.collection_name,
points=[
PointStruct(id=self.id, vector=self.vector, payload=self.payload)
]
)
print("Data inserted successfully")
return True
except Exception as e:
print("Insertion failed: ", e)
return False
async def upsert_point(self):
return await self.insertion()
async def search(self, query_vector):
try:
result = await self.client.search(
collection_name=self.collection_name,
query_vector=query_vector,
limit=10
)
return result
except Exception as e:
print("Search failed: ", e)
return None
async def update_collection(self):
"""Update is implemented as an upsert of the point data."""
return await self.upsert_point()
async def delete_collection(self):
try:
await self.client.delete_collection(collection_name=self.collection_name)
return {"message": f"Collection {self.collection_name} deleted successfully"}
except Exception as e:
return {"message": str(e)}

View File

@ -0,0 +1,22 @@
from pydantic import BaseModel
from typing import Dict, List, Any
class CreateCollectionSerializer(BaseModel):
collection_name: str
vector: List[float]
vector_size: int
payload: Dict[str, Any]
id: int
class QueryCollectionSerializer(BaseModel):
collection_name: str
query_vector: List[float]
class UpdateCollectionSerializer(BaseModel):
collection_name: str
vector: List[float]
payload: Dict[str, Any]
id: int
class DeleteCollectionSerializer(BaseModel):
collection_name: str

View File

@ -1,4 +1,100 @@
from db_setup import get_session,get_qdrant_client from db_setup import get_qdrant_client
# from .models import Product from typing import Annotated
from sqlalchemy.ext.asyncio import AsyncSession from fastapi import Depends, HTTPException, APIRouter
from qdrant_client import AsyncQdrantClient from qdrant_client import AsyncQdrantClient
from fastapi.responses import JSONResponse
from .serializers import (
CreateCollectionSerializer,
QueryCollectionSerializer,
UpdateCollectionSerializer,
DeleteCollectionSerializer
)
from .models import CollectionHandler
app_router = APIRouter()
@app_router.post("/create")
async def create_collection_endpoint(
q: Annotated[AsyncQdrantClient, Depends(get_qdrant_client)],
body: CreateCollectionSerializer = None
):
try:
if body is None:
raise HTTPException(status_code=400, detail="Collection name is required")
print("collection_name: ", body.collection_name)
handler = CollectionHandler(
collection_name=body.collection_name,
vector=body.vector,
vector_size=body.vector_size,
payload=body.payload,
id=body.id
)
# 1. Create collection
result = await handler.create_collection()
# 2. Automatically call upsert_point
await handler.upsert_point()
return JSONResponse(result)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app_router.get("/query")
async def query_collection_endpoint(
q: Annotated[AsyncQdrantClient, Depends(get_qdrant_client)],
body: QueryCollectionSerializer
):
try:
handler = CollectionHandler(
collection_name=body.collection_name,
vector=body.query_vector,
vector_size=len(body.query_vector),
payload={},
id=0
)
result = await handler.search(body.query_vector)
return JSONResponse({"results": str(result)})
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app_router.put("/update")
async def update_collection_endpoint(
q: Annotated[AsyncQdrantClient, Depends(get_qdrant_client)],
body: UpdateCollectionSerializer
):
try:
handler = CollectionHandler(
collection_name=body.collection_name,
vector=body.vector,
vector_size=len(body.vector),
payload=body.payload,
id=body.id
)
result = await handler.update_collection()
return JSONResponse({"status": "success", "result": result})
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app_router.delete("/delete")
async def delete_collection_endpoint(
q: Annotated[AsyncQdrantClient, Depends(get_qdrant_client)],
body: DeleteCollectionSerializer
):
try:
handler = CollectionHandler(
collection_name=body.collection_name,
vector=[],
vector_size=0,
payload={},
id=0
)
result = await handler.delete_collection()
return JSONResponse(result)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))