# keyword_ranking_crawler/scrapper.py

import json
import os
import pickle
import re
import time
from time import sleep

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager
with open("marketplaces.json", "r", encoding="utf-8") as f:
data = json.load(f)
with open("keywords.json", "r", encoding="utf-8") as f:
keywords = json.load(f)
with open("cookies.json", "r", encoding="utf-8") as f:
cookies_ref = json.load(f)
# Or if it's a Python dict already:
marketplaces = data["marketplaces"]
#BASE_PATH= '/mnt/AmazonReports/Amazon/keyword_ranking'
BASE_PATH= 'data'
MAX_PAGE = 10
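
# Illustrative shape of the three config files (all values hypothetical):
#   marketplaces.json -> {"marketplaces": {"AMAZON_USA": {"url": "amazon.com", "ratingPrefix": " out of"}}}
#   keywords.json     -> ["bed sheets", "bath towels"]
#   cookies.json      -> {"AMAZON_USA": {"cookies_name": "amazon_usa_cookies.pkl"}}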

def get_driver():
    """Build a Chrome driver with basic anti-bot-detection tweaks applied."""
    options = Options()
    # options.add_argument("--headless")
    options.add_argument("--disable-blink-features=AutomationControlled")  # Removes automation flag
    options.add_experimental_option("excludeSwitches", ["enable-automation"])
    options.add_experimental_option('useAutomationExtension', False)
    options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36")
    options.add_argument("--start-maximized")
    options.add_argument("user-data-dir=/home/ec2-user/keyword_ranking_crawler/chrome_path")
    options.add_argument("profile-directory=Default")
    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
    # Hide navigator.webdriver before any page script runs
    driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
        "source": """
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            })
        """
    })
    return driver
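
# Note: the user-data-dir above points at an EC2 home directory; adjust or drop
# that argument when running outside that environment.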

def save_cookies(driver, path):
    """Persist the current session's cookies to disk with pickle."""
    with open(path, "wb") as f:
        pickle.dump(driver.get_cookies(), f)

def parse_number_with_commas(text):
    """Parse a comma-grouped integer such as "2,607"; return 0 on failure."""
    try:
        return int(text.replace(",", ""))
    except Exception:
        return 0
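
# Example (illustrative): parse_number_with_commas("2,607") -> 2607;
# non-numeric input (or None) falls through to 0.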

def parse_rating(ratingHtml, prefix):
    """Extract the numeric rating that precedes `prefix` in the icon's innerHTML."""
    try:
        rating_str = ratingHtml.get_attribute("innerHTML")
        return float(rating_str.split(prefix)[0])
    except Exception:
        return ""

def parse_bought_count(text):
    """Convert labels like "1K+ bought in past month" to an integer count."""
    if not text:
        return 0
    # Extract the numeric part and optional K/M suffix using a regex
    match = re.search(r'([\d,.]+)([KM]?)\+?', text.upper())
    if not match:
        return 0
    number, unit = match.groups()
    number = float(number.replace(',', ''))
    if unit == 'K':
        return int(number * 1_000)
    elif unit == 'M':
        return int(number * 1_000_000)
    else:
        return int(number)
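
# Examples (illustrative): parse_bought_count("400+ bought in past month") -> 400,
# parse_bought_count("1K+ bought in past month") -> 1000, parse_bought_count("2.5M+") -> 2500000.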

def save_ranking(rankings, file_path):
    """Write the collected rankings to a JSON file, skipping empty result sets."""
    if len(rankings) > 0:
        with open(file_path, "w", encoding="utf-8") as f:
            json.dump(rankings, f, ensure_ascii=False, indent=4)

def load_cookies(driver, path):
    """Restore pickled cookies into the current browser session."""
    with open(path, "rb") as f:
        cookies = pickle.load(f)
    for cookie in cookies:
        if 'sameSite' in cookie:
            cookie.pop('sameSite')  # Optional fix if Chrome complains
        driver.add_cookie(cookie)
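
# Note: Selenium only accepts add_cookie() for the domain currently loaded in the
# browser, so callers must driver.get() the marketplace URL before load_cookies();
# get_amazon_ranks below does exactly that.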

def check_sponsored(item):
    """Return 1 if the result card carries a "Sponsored" label, else 0."""
    try:
        # Check if any element inside contains the exact text "Sponsored"
        sponsored_labels = item.find_elements(By.XPATH, './/*[contains(text(), "Sponsored")]')
        for label in sponsored_labels:
            if label.text.strip().lower() == "sponsored":
                return 1
        return 0
    except Exception:
        return 0

def check_consist_utopia(title):
    """Flag titles that contain the brand name "Utopia"."""
    return 1 if "Utopia" in title else 0
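
# Example: check_consist_utopia("Utopia Bedding Queen Sheet Set") -> 1.
# The match is case-sensitive, so a lowercase "utopia" would return 0.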

def get_amazon_ranks(url, marketplace, ratingPrefix, keyword, page, count):
    """Scrape one search-results page and save its organic listings to JSON.

    Returns the next rank counter, or -1 when the page yields no organic results.
    """
    print('[INFO] Getting Amazon Ranks for:', marketplace, keyword, page)
    url = f"https://www.{url}/s?k={keyword.replace(' ', '+')}&page={page}"
    # First visit puts the browser on the marketplace domain so cookies can be added
    driver.get(url)
    ranks = []
    COOKIE_FILE = cookies_ref[marketplace]['cookies_name']
    # Load cookies if available, then reload the page with them applied
    if os.path.exists(COOKIE_FILE):
        load_cookies(driver, COOKIE_FILE)
        driver.get(url)
    else:
        print("No cookie file found, visiting fresh")
        sleep(5)  # Give time to solve a CAPTCHA manually (if needed)
        save_cookies(driver, COOKIE_FILE)
    sleep(3)  # Wait for JS to load
    items = driver.find_elements(By.XPATH, '//div[contains(@class,"s-result-item") and @data-asin]')
    for item in items:
        asin = item.get_attribute("data-asin")
        try:
            sponsored = check_sponsored(item)
            title = item.find_element(By.XPATH, './/h2//span').text
            rating = item.find_element(By.XPATH, './/span[@class="a-icon-alt"]')
            reviews_count = item.find_element(By.XPATH, './/span[@class="a-size-base s-underline-text"]').text
            last_month_bought = item.find_element(By.XPATH, './/span[contains(@class, "a-size-base a-color-secondary") and contains(text(), "bought")]').text
            if title == 'Results':
                continue  # Skip the section-header card; it is not a real listing
            # Only organic (non-sponsored) results are ranked
            if sponsored == 0:
                ranks.append({
                    'rank': count,
                    'title': title,
                    'marketplace': marketplace,
                    'keyword': keyword,
                    'sponsored': 0,
                    'asin': asin,
                    'is_utopia': check_consist_utopia(title),
                    'url': url,
                    'rating': parse_rating(rating, ratingPrefix),
                    'reviews_count': parse_number_with_commas(reviews_count),
                    'last_month_bought': parse_bought_count(last_month_bought)
                })
                count += 1
        except Exception:
            # Skip cards missing any expected sub-element (including the optional
            # "bought in past month" label, which many listings lack)
            continue
    file_path = f"{BASE_PATH}/{int(time.time() * 1000)}-{marketplace}-{keyword}.json"
    save_ranking(ranks, file_path)
    if len(ranks) == 0:
        return -1
    return count
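
# A saved entry looks like this (values illustrative):
# {"rank": 1, "title": "Utopia Bedding Queen Sheet Set", "marketplace": "AMAZON_USA",
#  "keyword": "bed sheets", "sponsored": 0, "asin": "B0XXXXXXXX", "is_utopia": 1,
#  "url": "https://www.amazon.com/s?k=bed+sheets&page=1", "rating": 4.5,
#  "reviews_count": 2607, "last_month_bought": 1000}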

driver = get_driver()
for keyword in keywords:
    for marketplace, details in marketplaces.items():
        if marketplace == 'AMAZON_USA':
            url = details['url']
            ratingPrefix = details['ratingPrefix']
            count = 1
            # Crawl pages 1..MAX_PAGE, stopping early once a page has no organic results
            for i in range(1, MAX_PAGE + 1):
                count = get_amazon_ranks(url, marketplace, ratingPrefix, keyword, i, count)
                if count == -1:
                    break
                sleep(3)
driver.quit()