"""Amazon keyword-ranking crawler.

Fetches the active keyword list from the Cosmos service, then drives a
Selenium Chrome session over Amazon search results for each keyword,
recording the organic (non-sponsored) rank of every listing and flagging
titles that contain the brand name "Utopia".  Each crawled page produces
a JSON ranking file and a full-page screenshot.
"""

import os
import pickle
import requests
import sys
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from time import sleep
import json
import time
import re

# Which config profile below is active ("prod" on EC2, "dev" locally).
ACTIVE_ENV = 'prod'

config = {
    "prod": {
        "base_path": "/home/ec2-user/keyword_ranking_crawler",
        "marketplace_path": "/home/ec2-user/keyword_ranking_crawler/marketplaces.json",
        "keyword_path": "/home/ec2-user/keyword_ranking_crawler/keywords.json",
        "cookies_path": "/home/ec2-user/keyword_ranking_crawler/cookies.json",
        "data_path": "/mnt/AmazonReports/Amazon/keyword_ranking",
        "screenshots_path": "/mnt/AmazonReports/Amazon/keyword_ranking/screenshots"
    },
    "dev": {
        "base_path": "C:/Users/saif.haq/Desktop/Scrapper",
        "marketplace_path": "marketplaces.json",
        "keyword_path": "keywords.json",
        "cookies_path": "cookies.json",
        "data_path": "data",
        "screenshots_path": "screenshots"
    }
}

with open(config[ACTIVE_ENV]["marketplace_path"], "r", encoding="utf-8") as f:
    data = json.load(f)

# Keywords come from the Cosmos REST endpoint; abort the run if it is down.
keywords = []
url = "https://cosmos.utopiadeals.com/cosmos/rest/get-keywords-on-automation"
response = requests.get(url)
if response.status_code == 200:
    keywords = response.json()
    print(keywords)
else:
    print("Error:", response.status_code)
    sys.exit()

with open(config[ACTIVE_ENV]["cookies_path"], "r", encoding="utf-8") as f:
    cookies_ref = json.load(f)

marketplaces = data["marketplaces"]
BASE_PATH = config[ACTIVE_ENV]["data_path"]
# NOTE(review): the page loop at the bottom uses range(1, MAX_PAGE), so only
# pages 1..MAX_PAGE-1 are crawled (a single page while MAX_PAGE == 2).  If
# MAX_PAGE is meant to be inclusive, change that loop to range(1, MAX_PAGE + 1).
MAX_PAGE = 2


def get_driver():
    """Build a Chrome WebDriver configured to look like a regular browser.

    Runs headless in prod.  Automation fingerprints (navigator.webdriver,
    the "Chrome is being controlled" infobar, automation switches) are
    suppressed so Amazon is less likely to serve a CAPTCHA.
    """
    options = Options()
    if ACTIVE_ENV == "prod":
        options.add_argument("--headless")
    options.add_argument("--disable-blink-features=AutomationControlled")
    options.add_argument("--disable-infobars")
    options.add_argument("--disable-extensions")
    options.add_argument("--start-maximized")
    options.add_argument("--no-sandbox")
    options.add_argument("--disable-dev-shm-usage")
    options.add_argument("--window-size=1920,1080")
    # Set a realistic user agent.
    options.add_argument(
        "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/115.0.0.0 Safari/537.36"
    )
    # Experimental options to hide automation.
    options.add_experimental_option("excludeSwitches", ["enable-automation"])
    options.add_experimental_option("useAutomationExtension", False)
    # NOTE(review): content-setting value 2 *blocks* cookies, which appears to
    # conflict with the save/load_cookies persistence below — confirm intent.
    options.add_experimental_option("prefs", {
        "profile.default_content_setting_values.cookies": 2
    })
    driver = webdriver.Chrome(options=options)
    # Hide navigator.webdriver before any page script can read it.
    driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
        "source": """
        Object.defineProperty(navigator, 'webdriver', {
        get: () => undefined
        });
        """
    })
    return driver


def save_cookies(driver, path):
    """Persist the driver's current cookies to *path* (pickle format)."""
    with open(path, "wb") as f:
        pickle.dump(driver.get_cookies(), f)


def load_cookies(driver, path):
    """Load pickled cookies from *path* into the driver.

    The driver must already be on the cookies' domain, otherwise
    add_cookie raises.
    """
    with open(path, "rb") as f:
        cookies = pickle.load(f)
    for cookie in cookies:
        # Chrome sometimes rejects a round-tripped sameSite attribute.
        cookie.pop('sameSite', None)
        driver.add_cookie(cookie)


def parse_number_with_commas(text):
    """Parse a comma-grouped count such as "1,234" to int; 0 on failure."""
    try:
        return int(text.replace(",", ""))
    except (ValueError, AttributeError):
        return 0


def parse_rating(ratingHtml, prefix):
    """Extract the numeric star rating from an a-icon-alt element.

    The element's innerHTML starts with the number followed by a
    marketplace-specific *prefix* (e.g. "4.5 out of 5 stars").
    Returns "" when the element is missing or unparsable, matching the
    JSON output's existing convention.
    """
    try:
        rating_str = ratingHtml.get_attribute("innerHTML")
        return float(rating_str.split(prefix)[0])
    except (AttributeError, ValueError, IndexError):
        return ""


def parse_bought_count(text):
    """Convert Amazon's "bought in past month" text to an integer.

    Examples: "2K+ bought" -> 2000, "1.5M+" -> 1500000, "400+" -> 400.
    Returns 0 for empty or unmatched text.
    """
    if not text:
        return 0
    # Extract numeric part plus an optional K/M magnitude suffix.
    match = re.search(r'([\d,.]+)([KM]?)\+?', text.upper())
    if not match:
        return 0
    number, unit = match.groups()
    number = float(number.replace(',', ''))
    if unit == 'K':
        return int(number * 1_000)
    if unit == 'M':
        return int(number * 1_000_000)
    return int(number)


def save_ranking(rankings, file_path):
    """Write the collected rankings to *file_path*; empty runs are skipped."""
    if rankings:
        with open(file_path, "w", encoding="utf-8") as f:
            json.dump(rankings, f, ensure_ascii=False, indent=4)


def check_sponsored(item):
    """Return 1 if the result tile carries a "Sponsored" label, else 0.

    Bug fix: the original implicitly returned None for organic tiles while
    its except clause returned 0, so a lookup error was indistinguishable
    from "sponsored" at the call site.  Now always returns 0 or 1.
    """
    try:
        sponsored_labels = item.find_elements(
            By.XPATH, './/*[contains(text(), "Sponsored")]')
        for label in sponsored_labels:
            if label.text.strip().lower() == "sponsored":
                return 1
    except Exception:
        pass
    return 0


def check_consist_utopia(title):
    """Return 1 if the listing title contains the brand name "Utopia"."""
    return 1 if "Utopia" in title else 0


def _find_text(item, xpath):
    """Text of the first descendant of *item* matching *xpath*, or ""."""
    try:
        return item.find_element(By.XPATH, xpath).text
    except Exception:
        return ""


def get_amazon_ranks(url, marketplace, ratingPrefix, keyword, page, count):
    """Crawl one Amazon search-results page and record organic ranks.

    Parameters:
        url: marketplace domain, e.g. "amazon.com" (no scheme / www).
        marketplace: marketplace key, e.g. "AMAZON_USA".
        ratingPrefix: marketplace-specific separator used by parse_rating.
        keyword: search term.
        page: 1-based results page number.
        count: running organic-rank counter carried across pages.

    Returns the updated rank counter, or -1 when no results were parsed
    (callers use that to stop paging, e.g. on a CAPTCHA or empty page).
    Side effects: writes a JSON ranking file and a page screenshot.
    """
    print('[INFO] Getting Amazon Ranks for: ', marketplace, keyword, page)
    url = f"https://www.{url}/s?k={keyword.replace(' ', '+')}&page={page}"
    driver.get(url)
    ranks = []

    COOKIE_FILE = f"{config[ACTIVE_ENV]['base_path']}/{cookies_ref[marketplace]['cookies_name']}"
    if os.path.exists(COOKIE_FILE):
        # Cookies can only be added once we are on the domain (the get
        # above), then reload so the restored session takes effect.
        load_cookies(driver, COOKIE_FILE)
        driver.get(url)
    else:
        print("No cookie file found, visiting fresh")
        driver.get(url)
        sleep(1)  # give time to solve CAPTCHA manually (if needed)
        save_cookies(driver, COOKIE_FILE)

    sleep(1)  # wait for JS to render the result grid

    items = driver.find_elements(
        By.XPATH, '//div[contains(@class,"s-result-item") and @data-asin]')
    for item in items:
        try:
            asin = item.get_attribute("data-asin")
            sponsored = check_sponsored(item)
            # Title is mandatory — a tile without one is not a listing.
            title = item.find_element(By.XPATH, './/h2//span').text
            if title == 'Results':
                continue  # section-header tile, not a product
            # Rating / review-count / bought-count are optional per tile;
            # a missing one no longer discards the whole listing.
            try:
                rating = item.find_element(
                    By.XPATH, './/span[@class="a-icon-alt"]')
            except Exception:
                rating = None
            reviews_count = _find_text(
                item, './/span[@class="a-size-base s-underline-text"]')
            last_month_bought = _find_text(
                item,
                './/span[contains(@class, "a-size-base a-color-secondary")'
                ' and contains(text(), "bought")]')
            if not sponsored:
                ranks.append({
                    'rank': count,
                    'title': title,
                    'marketplace': marketplace,
                    'keyword': keyword,
                    'sponsored': 0,
                    'asin': asin,
                    'is_utopia': check_consist_utopia(title),
                    'url': url,
                    'rating': parse_rating(rating, ratingPrefix),
                    'reviews_count': parse_number_with_commas(reviews_count),
                    'last_month_bought': parse_bought_count(last_month_bought)
                })
                count += 1
        except Exception:
            continue  # malformed tile — skip it, keep crawling the page

    screenshot_path = (config[ACTIVE_ENV]['screenshots_path']
                       + f"/{marketplace}-{keyword}-{page}--{int(time.time() * 1000)}.png")
    driver.save_screenshot(screenshot_path)

    file_path = f"{BASE_PATH}/{int(time.time() * 1000)}-{marketplace}-{keyword}.json"
    save_ranking(ranks, file_path)

    if not ranks:
        return -1
    return count


# Bug fix: driver is initialized to None so the finally clause cannot raise
# NameError when get_driver() itself fails.
driver = None
try:
    driver = get_driver()
    for keyword in keywords:
        for marketplace, details in marketplaces.items():
            # Only the US marketplace is crawled for now.
            if marketplace != 'AMAZON_USA':
                continue
            count = 1
            # NOTE(review): crawls pages 1..MAX_PAGE-1 — see MAX_PAGE above.
            for i in range(1, MAX_PAGE):
                count = get_amazon_ranks(
                    details['url'], marketplace, details['ratingPrefix'],
                    keyword, i, count)
                if count == -1:
                    break  # page produced no results — stop paging
                sleep(1)  # be polite between page loads
finally:
    print("[INFO] Closing WebDriver...")
    if driver is not None:
        driver.quit()