# keyword_ranking_crawler — Amazon keyword-ranking scraper
# (viewer metadata: 224 lines, 7.6 KiB, Python)
import json
import os
import pickle
import re
import time
from time import sleep

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager
|
|
|
|
|
|
# Which configuration profile below is in effect.
ACTIVE_ENV = 'prod'

# Per-environment file locations. "prod" uses absolute paths on the EC2
# host and the mounted reports share; "dev" uses paths relative to a
# local working directory.
config = {
    "prod": {
        "base_path": "/home/ec2-user/keyword_ranking_crawler",
        "marketplace_path": "/home/ec2-user/keyword_ranking_crawler/marketplaces.json",
        "keyword_path": "/home/ec2-user/keyword_ranking_crawler/keywords.json",
        "cookies_path": "/home/ec2-user/keyword_ranking_crawler/cookies.json",
        "data_path": "/mnt/AmazonReports/Amazon/keyword_ranking",
        "screenshots_path": "/mnt/AmazonReports/Amazon/keyword_ranking/screenshots",
    },
    "dev": {
        "base_path": "C:/Users/saif.haq/Desktop/Scrapper",
        "marketplace_path": "marketplaces.json",
        "keyword_path": "keywords.json",
        "cookies_path": "cookies.json",
        "data_path": "data",
        "screenshots_path": "screenshots",
    },
}
|
|
|
|
|
|
|
|
# Load the crawl inputs for the active environment at import time.
with open(config[ACTIVE_ENV]["marketplace_path"], "r", encoding="utf-8") as f:
    data = json.load(f)

with open(config[ACTIVE_ENV]["keyword_path"], "r", encoding="utf-8") as f:
    keywords = json.load(f)

with open(config[ACTIVE_ENV]["cookies_path"], "r", encoding="utf-8") as f:
    cookies_ref = json.load(f)

# Marketplace definitions live under the "marketplaces" key of the file.
marketplaces = data["marketplaces"]

# Output directory for ranking JSON files.
BASE_PATH = config[ACTIVE_ENV]["data_path"]
# Upper bound used by the page loop (see the driver script at the bottom).
MAX_PAGE = 2
|
|
|
|
|
|
def get_driver():
    """Build a Chrome WebDriver configured to look like a regular browser.

    Runs headless only in prod. Applies the usual anti-bot-detection
    flags, a realistic desktop user agent, and a CDP start-up script
    that hides ``navigator.webdriver``.
    """
    options = Options()
    if ACTIVE_ENV == "prod":
        options.add_argument("--headless")

    browser_flags = [
        "--disable-blink-features=AutomationControlled",
        "--disable-infobars",
        "--disable-extensions",
        "--start-maximized",
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--window-size=1920,1080",
        # Set a realistic user agent
        "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/115.0.0.0 Safari/537.36",
    ]
    for flag in browser_flags:
        options.add_argument(flag)

    # Experimental options to hide automation
    options.add_experimental_option("excludeSwitches", ["enable-automation"])
    options.add_experimental_option("useAutomationExtension", False)
    # NOTE(review): content-setting value 2 tells Chrome to BLOCK cookies,
    # yet the crawler saves/loads cookie jars elsewhere — confirm intent.
    options.add_experimental_option("prefs", {
        "profile.default_content_setting_values.cookies": 2
    })

    driver = webdriver.Chrome(options=options)
    # Scrub the webdriver flag before any page script can read it.
    driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
        "source": """
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            });
        """
    })
    return driver
|
|
|
|
def save_cookies(driver, path):
    """Pickle the driver's current session cookies to *path*."""
    cookies = driver.get_cookies()
    with open(path, "wb") as f:
        pickle.dump(cookies, f)
|
|
|
|
def parse_number_with_commas(value):
    """Parse a comma-grouped integer string such as "1,234" into an int.

    Returns 0 for anything unparseable (None, empty string, non-numeric
    text), preserving the original best-effort behavior.
    """
    # Fixes: parameter used to shadow the builtin `str`, and a bare
    # `except` swallowed everything (including KeyboardInterrupt).
    try:
        return int(value.replace(",", ""))
    except (AttributeError, ValueError):
        # AttributeError: value is None / not a str; ValueError: not numeric.
        return 0
|
|
|
|
def parse_rating(ratingHtml, prefix):
    """Extract a float rating from a rating element's innerHTML.

    The innerHTML looks like "4.5 out of 5 stars", with *prefix*
    (e.g. " out of") delimiting the end of the number. Returns "" when
    the element is missing or the text cannot be parsed; callers store
    that value as-is.
    """
    # Fix: bare `except` replaced by `except Exception`. It stays broad
    # on purpose — stale-element / WebDriver errors must not abort the
    # crawl, and "" mirrors the original fallback.
    try:
        rating_str = ratingHtml.get_attribute("innerHTML")
        return float(rating_str.split(prefix)[0])
    except Exception:
        return ""
|
|
|
|
|
|
def parse_bought_count(text):
    """Convert an Amazon "bought in past month" label to an integer.

    Accepts strings like "300+", "2.5K+", "1M+"; returns 0 for None,
    empty text, or text with no numeric part.
    """
    if not text:
        return 0

    # Extract numeric part using regex
    match = re.search(r'([\d,.]+)([KM]?)\+?', text.upper())
    if match is None:
        return 0

    digits, suffix = match.groups()
    value = float(digits.replace(',', ''))
    multiplier = {'K': 1_000, 'M': 1_000_000}.get(suffix, 1)
    return int(value * multiplier)
|
|
|
|
def save_ranking(rankings, file_path):
    """Write *rankings* to *file_path* as pretty-printed JSON.

    Nothing is written when the list is empty, so an empty crawl never
    creates or clobbers a file.
    """
    if not rankings:
        return
    with open(file_path, "w", encoding="utf-8") as f:
        json.dump(rankings, f, ensure_ascii=False, indent=4)
|
|
|
|
def load_cookies(driver, path):
    """Load pickled cookies from *path* into the driver's session.

    The 'sameSite' attribute is stripped first because Chrome rejects
    some previously pickled values for it.
    """
    with open(path, "rb") as f:
        stored = pickle.load(f)
    for cookie in stored:
        cookie.pop('sameSite', None)  # drop if present; no-op otherwise
        driver.add_cookie(cookie)
|
|
|
|
|
|
def check_sponsored(item):
    """Return 1 when *item* carries a "Sponsored" badge.

    NOTE(review): on a non-sponsored item this falls off the end and
    returns None (not 0); the caller in get_amazon_ranks depends on that
    via `sponsored == None`. 0 is returned only when the DOM lookup
    itself raises. Do not "normalize" the return values without
    updating the caller.
    """
    try:
        # Any descendant whose text contains the exact word "Sponsored".
        candidates = item.find_elements(By.XPATH, './/*[contains(text(), "Sponsored")]')
        for candidate in candidates:
            if candidate.text.strip().lower() == "sponsored":
                return 1
    except:
        return 0
|
|
|
|
def check_consist_utopia(title):
    """Return 1 when the listing title mentions the "Utopia" brand, else 0."""
    return int("Utopia" in title)
|
|
|
|
|
|
|
|
def get_amazon_ranks(url, marketplace, ratingPrefix, keyword, page, count):
    """Scrape one Amazon search-results page and persist organic rankings.

    url: marketplace domain (e.g. "amazon.com"); marketplace: key into
    cookies_ref; ratingPrefix: delimiter passed to parse_rating;
    keyword: search term; page: 1-based results page; count: next rank
    number to assign.

    Returns the updated rank counter, or -1 when no organic items were
    collected (signals the caller to stop paging for this keyword).

    Side effects: saves a screenshot and a rankings JSON file. Relies on
    module globals driver, config, cookies_ref, ACTIVE_ENV, BASE_PATH.
    """
    print( '[INFO] Getting Amazon Ranks for: ', marketplace, keyword, page)

    search_url = f"https://www.{url}/s?k={keyword.replace(' ', '+')}&page={page}"
    driver.get(search_url)

    ranks = []
    cookie_file = f"{config[ACTIVE_ENV]['base_path']}/{cookies_ref[marketplace]['cookies_name']}"

    # Reuse a saved cookie jar when one exists; otherwise visit fresh and
    # persist whatever cookies the session picked up.
    if os.path.exists(cookie_file):
        load_cookies(driver, cookie_file)
        driver.get(search_url)
    else:
        print("No cookie file found, visiting fresh")
        driver.get(search_url)
        sleep(1)  # Give time to solve CAPTCHA manually (if needed)
        save_cookies(driver, cookie_file)

    sleep(1)  # Wait for JS to load

    items = driver.find_elements(By.XPATH, '//div[contains(@class,"s-result-item") and @data-asin]')
    for idx, item in enumerate(items, start=1):
        try:
            asin = item.get_attribute("data-asin")
            sponsored = check_sponsored(item)
            title = item.find_element(By.XPATH, './/h2//span').text
            rating = item.find_element(By.XPATH, './/span[@class="a-icon-alt"]')
            reviews_count = item.find_element(By.XPATH, './/span[@class="a-size-base s-underline-text"]').text
            last_month_bought = item.find_element(By.XPATH, './/span[contains(@class, "a-size-base a-color-secondary") and contains(text(), "bought")]').text

            if title == 'Results':
                # Header row of the results grid, not a product.
                continue

            # check_sponsored yields None (not 0) for organic items, so
            # this keeps only non-sponsored listings.
            if sponsored == None :
                ranks.append({
                    'rank' : count,
                    'title' : title,
                    'marketplace' : marketplace,
                    'keyword': keyword,
                    'sponsored' : 0,
                    'asin' : asin,
                    'is_utopia' : check_consist_utopia(title),
                    'url' : search_url,
                    'rating' : parse_rating(rating, ratingPrefix),
                    'reviews_count' : parse_number_with_commas(reviews_count),
                    'last_month_bought': parse_bought_count(last_month_bought)
                })
                # Rank numbers advance only for items actually recorded.
                count += 1
        except:
            # Items missing a sub-element (no rating, no "bought" badge,
            # ...) are simply skipped.
            continue

    screenshot_path = config[ACTIVE_ENV]['screenshots_path'] + f"/{marketplace}-{keyword}-{page}--{int(time.time() * 1000)}.png"
    driver.save_screenshot(screenshot_path)

    file_path = f"{BASE_PATH}/{int(time.time() * 1000)}-{marketplace}-{keyword}.json"
    save_ranking(ranks, file_path)

    if( len(ranks) == 0 ):
        return -1
    return count
|
|
|
|
|
|
# Drive the crawl: every keyword against the US marketplace, one results
# page at a time, stopping a keyword early when a page yields no ranks.
driver = None  # fix: ensure `finally` never hits NameError if get_driver() raises
try:
    driver = get_driver()
    for keyword in keywords:
        for marketplace, details in marketplaces.items():
            if marketplace == 'AMAZON_USA':
                url = details['url']
                ratingPrefix = details['ratingPrefix']
                count = 1
                # NOTE(review): range(1, MAX_PAGE) visits pages 1..MAX_PAGE-1,
                # so with MAX_PAGE = 2 only page 1 is crawled — confirm whether
                # range(1, MAX_PAGE + 1) was intended.
                for i in range(1, MAX_PAGE):
                    count = get_amazon_ranks(url, marketplace, ratingPrefix, keyword, i, count)
                    if count == -1:
                        # Empty page: no point requesting further pages.
                        break
                    sleep(1)  # polite delay between page requests
finally:
    print("[INFO] Closing WebDriver...")
    if driver is not None:
        driver.quit()