# NOTE(review): removed scrape-viewer metadata ("110 lines / 3.2 KiB / Python")
# that was pasted into the file and is not valid Python.
import json
import os
import pickle
import time
from time import sleep

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
|
# Marketplace definitions: one entry per Amazon storefront to scrape.
with open("marketplaces.json", "r", encoding="utf-8") as f:
    data = json.load(f)

# Per-marketplace cookie metadata, keyed by marketplace id
# (each entry carries at least a 'cookies_name' file name).
with open("cookies.json", "r", encoding="utf-8") as f:
    cookies_ref = json.load(f)

# The marketplaces mapping itself lives under the "marketplaces" key.
marketplaces = data["marketplaces"]
|
|
|
|
|
|
def get_driver():
    """Build a headless Chrome driver with the automation flag disabled."""
    opts = Options()
    for flag in ("--headless", "--disable-blink-features=AutomationControlled"):
        opts.add_argument(flag)
    return webdriver.Chrome(options=opts)
|
|
|
|
def save_cookies(driver, path):
    """Persist the driver's current cookies to *path* as a pickle file."""
    cookies = driver.get_cookies()
    with open(path, "wb") as handle:
        pickle.dump(cookies, handle)
|
|
|
|
|
|
def save_ranking(rankings, file_path):
    """Write the collected ranking records to *file_path* as pretty JSON.

    Non-ASCII titles are written verbatim (ensure_ascii=False).
    """
    with open(file_path, "w", encoding="utf-8") as out:
        json.dump(rankings, out, ensure_ascii=False, indent=4)
|
|
|
|
def load_cookies(driver, path):
    """Read pickled cookies from *path* and install each one into *driver*.

    The 'sameSite' attribute is stripped first because Chrome may reject it.
    NOTE(review): pickle.load on an untrusted file can execute arbitrary
    code; the file is presumably one produced locally by save_cookies.
    """
    with open(path, "rb") as handle:
        stored = pickle.load(handle)
    for cookie in stored:
        # pop with a default is equivalent to the original `in`-guarded pop.
        cookie.pop('sameSite', None)
        driver.add_cookie(cookie)
|
|
|
|
|
|
def check_sponsored(item):
    """Return whether *item* (a search-result WebElement) is a sponsored listing.

    Tri-state result, kept as-is because the caller distinguishes all three:
      * 1    -- some descendant's text is exactly "sponsored" (case-insensitive)
      * None -- no sponsored label found (was an implicit fall-through;
                now returned explicitly)
      * 0    -- the lookup raised (item is treated as unusable)
    """
    try:
        # Any descendant element whose text contains the literal "Sponsored".
        labels = item.find_elements(By.XPATH, './/*[contains(text(), "Sponsored")]')
        for label in labels:
            if label.text.strip().lower() == "sponsored":
                return 1
    except Exception:  # was a bare except; narrowed so Ctrl-C/SystemExit escape
        return 0
    return None  # explicit: no sponsored label and no error
|
|
|
|
def check_consist_utopia(title):
    """Return 1 when *title* contains the substring "Utopia" (case-sensitive), else 0."""
    return int("Utopia" in title)
|
|
|
|
|
|
|
|
def get_amazon_ranks(url, marketplace, keyword):
    """Scrape one Amazon search-results page and save organic rank data.

    Parameters:
        url         -- marketplace domain, e.g. "amazon.com" (scheme added here)
        marketplace -- marketplace id; used to look up the cookie file in
                       cookies_ref and stamped into each output record
        keyword     -- search term; spaces become '+' in the query string

    Side effects: drives the module-level `driver`, may create/refresh the
    per-marketplace cookie file, and writes a timestamped JSON ranking file.
    """
    print('[INFO] Getting Amazon Ranks for: ', marketplace, keyword)
    url = f"https://www.{url}/s?k={keyword.replace(' ', '+')}"
    driver.get(url)

    count = 1
    ranks = []

    # Cookie file name comes from cookies.json (was wrapped in a redundant
    # f-string and terminated with a stray semicolon).
    cookie_file = cookies_ref[marketplace]['cookies_name']
    print(cookie_file)

    # Reuse saved cookies when possible; otherwise visit fresh and save them.
    if os.path.exists(cookie_file):
        load_cookies(driver, cookie_file)
        driver.get(url)
    else:
        print("No cookie file found, visiting fresh")
        driver.get(url)
        sleep(5)  # give time to solve CAPTCHA manually (if needed)
        save_cookies(driver, cookie_file)

    sleep(3)  # wait for JS to load
    items = driver.find_elements(By.XPATH, '//div[contains(@class,"s-result-item") and @data-asin]')
    for item in items:  # was enumerate(); the index was never used
        asin = item.get_attribute("data-asin")
        try:
            sponsored = check_sponsored(item)
            title = item.find_element(By.XPATH, './/h2//span').text
            if title == 'Results':
                continue  # section-header row, not a product
            # check_sponsored returns None only for organic (non-sponsored,
            # no-error) items; only those are ranked. Was `== None`; identity
            # comparison is the correct idiom for None (PEP 8).
            if sponsored is None:
                ranks.append({'rank': count, 'title': title, 'marketplace': marketplace, 'keyword': keyword, 'sponsored': 0, 'asin': asin, 'is_utopia': check_consist_utopia(title)})
                count += 1
        except Exception:  # was a bare except; still skip items that fail to parse
            continue

    # Millisecond timestamp keeps successive runs from overwriting each other.
    file_path = f"{int(time.time() * 1000)}-{marketplace}-{keyword}.json"
    save_ranking(ranks, file_path)
|
|
|
|
|
|
# Script entry: rank the keyword "pillows" on every configured marketplace.
driver = get_driver()
try:
    for marketplace, details in marketplaces.items():
        get_amazon_ranks(details['url'], marketplace, 'pillows')
finally:
    # Always release the browser, even if one marketplace scrape raises
    # (the original leaked the Chrome process on any exception).
    driver.quit()
|
|
|
|
|
|
|
|
|