/**
 * SQS -> MySQL ingestion worker.
 *
 * Polls every queue listed in QUEUES, parses each message body into a traffic
 * row, bulk-writes the rows into `sponsored_product_traffic`, deletes the
 * messages that were persisted, and repeats after a 5 second pause.
 */
require("dotenv").config();
const fs = require("fs");
const { DateTime } = require("luxon");
const {
  SQSClient,
  ReceiveMessageCommand,
  DeleteMessageBatchCommand,
} = require("@aws-sdk/client-sqs");
const mysql = require("mysql2/promise");
// NOTE(review): `off` is never used in this file; it looks like an accidental
// auto-import. Retained so the file-level dependency list is unchanged.
const { off } = require("process");

// Marketplace lookup table keyed by marketplace id. The code below reads the
// `marketplace`, `timezone` and `timezoneUTC` properties of each entry.
// The path is resolved against the current working directory.
const raw = fs.readFileSync("./marketplaces.json", "utf8");
const marketplaces = JSON.parse(raw);

const pool = mysql.createPool({
  host: process.env.DB_HOST,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
  database: process.env.DB_NAME,
  waitForConnections: true,
  connectionLimit: 10,
});

// All 7 queues with their regions
const QUEUES = [
  { url: process.env.SQS_QUEUE_US_URL, region: process.env.AWS_REGION_US_EAST },
  { url: process.env.SQS_QUEUE_CA_URL, region: process.env.AWS_REGION_US_EAST },
  { url: process.env.SQS_QUEUE_UK_URL, region: process.env.AWS_REGION_EU_WEST },
  { url: process.env.SQS_QUEUE_DE_URL, region: process.env.AWS_REGION_EU_WEST },
  { url: process.env.SQS_QUEUE_FR_URL, region: process.env.AWS_REGION_EU_WEST },
  { url: process.env.SQS_QUEUE_IT_URL, region: process.env.AWS_REGION_EU_WEST },
  { url: process.env.SQS_QUEUE_ES_URL, region: process.env.AWS_REGION_EU_WEST },
];

// One SQS client per region, created on first use.
const clients = {};

/**
 * Returns the cached SQS client for a region, creating it on first request.
 *
 * @param {string} region - AWS region name.
 * @returns {SQSClient} Client bound to that region.
 */
function getClient(region) {
  if (!clients[region]) {
    const accessKeyId = process.env.AWS_ACCESS_KEY;
    const secretAccessKey = process.env.AWS_SECRET_KEY;
    const config = { region };
    // AWS SDK v3 reads static keys from the `credentials` option; top-level
    // `accessKeyId` / `secretAccessKey` are not part of the client config.
    // When either variable is missing, the SDK's default provider chain is
    // left in charge instead of passing undefined credentials.
    if (accessKeyId && secretAccessKey) {
      config.credentials = { accessKeyId, secretAccessKey };
    }
    clients[region] = new SQSClient(config);
  }
  return clients[region];
}

/**
 * Parses one SQS message body into a row for `sponsored_product_traffic`.
 *
 * @param {string} jsonStr - Raw message body (JSON text).
 * @returns {object} Row object; clicks, impressions and cost are coerced to
 *   numbers, with missing or falsy values becoming 0.
 * @throws {Error} If the body is not valid JSON or a timestamp / marketplace
 *   offset cannot be resolved.
 */
function parseMessage(jsonStr) {
  const data = JSON.parse(jsonStr);
  return {
    idempotency_id: data.idempotency_id,
    dataset_id: data.dataset_id,
    marketplace_id: data.marketplace_id,
    marketplace: getMarketplaceName(data.marketplace_id),
    currency: data.currency,
    advertiser_id: data.advertiser_id,
    campaign_id: data.campaign_id,
    ad_group_id: data.ad_group_id,
    ad_id: data.ad_id,
    keyword_id: data.keyword_id,
    keyword_text: data.keyword_text,
    match_type: data.match_type,
    placement: data.placement,
    time_window_start_original: data.time_window_start,
    // NOTE(review): no marketplace id is passed here, so the `zone` option
    // inside formatToMysqlDatetime is undefined and Luxon presumably falls
    // back to its default zone. Confirm whether this column is meant to hold
    // the marketplace-local time before changing the call.
    time_window_start: formatToMysqlDatetime(data.time_window_start),
    utc_date_time: getUtcDateTime(data.time_window_start, data.marketplace_id),
    clicks: Number(data.clicks || 0),
    impressions: Number(data.impressions || 0),
    cost: Number(data.cost || 0),
  };
}

/**
 * Looks up the display name of a marketplace.
 *
 * @param {string} marketplaceId - Key into the marketplaces table.
 * @returns {string|undefined} The name, or undefined for an unknown id.
 */
function getMarketplaceName(marketplaceId) {
  const map = marketplaces[marketplaceId];
  return map?.marketplace;
}

/**
 * Converts an offset such as "UTC+05:30" into signed minutes.
 *
 * @param {string} offsetStr - Offset in the form `UTC+HH:MM` or `UTC-HH:MM`.
 * @returns {number} Offset in minutes, negative for "-" offsets.
 * @throws {Error} If the value is not a string in that exact form.
 */
function offsetToMinutes(offsetStr) {
  // The type guard turns a missing offset into the descriptive error below
  // rather than a TypeError from calling `.match` on undefined.
  const match =
    typeof offsetStr === "string"
      ? offsetStr.match(/^UTC([+-])(\d{2}):(\d{2})$/)
      : null;
  if (!match) throw new Error(`Invalid offset format: ${offsetStr}`);
  const [, sign, hours, minutes] = match;
  const total = Number.parseInt(hours, 10) * 60 + Number.parseInt(minutes, 10);
  return sign === "+" ? total : -total;
}

/**
 * Shifts a timestamp by a UTC offset and formats it as `YYYY-MM-DD HH:mm:ss`.
 *
 * @param {string} localIsoString - Timestamp accepted by the `Date` constructor.
 * @param {string} offsetStr - Offset in the form accepted by offsetToMinutes.
 * @returns {string} The shifted instant rendered from its UTC fields.
 * @throws {Error} If the timestamp cannot be parsed or the offset is malformed.
 */
function convertToUtcFromOffset(localIsoString, offsetStr) {
  // NOTE(review): `new Date()` gives an offset-less date-time string the host
  // machine's zone, so the result depends on the server's TZ setting - confirm
  // the expected input format.
  const local = new Date(localIsoString);
  if (Number.isNaN(local.getTime())) {
    // Without this guard the function would return "NaN-NaN-NaN NaN:NaN:NaN",
    // which would be sent to MySQL as part of the bulk insert.
    throw new Error(`Invalid date-time: ${localIsoString}`);
  }
  const offsetMinutes = offsetToMinutes(offsetStr);
  const utcDate = new Date(local.getTime() - offsetMinutes * 60 * 1000);
  const pad = (n) => String(n).padStart(2, "0");
  const datePart = [
    utcDate.getUTCFullYear(),
    pad(utcDate.getUTCMonth() + 1),
    pad(utcDate.getUTCDate()),
  ].join("-");
  const timePart = [
    pad(utcDate.getUTCHours()),
    pad(utcDate.getUTCMinutes()),
    pad(utcDate.getUTCSeconds()),
  ].join(":");
  return `${datePart} ${timePart}`;
}

/**
 * Parses an ISO timestamp with Luxon, converts it to UTC and formats it as
 * `yyyy-MM-dd HH:mm:ss`.
 *
 * @param {string} dateTime - ISO 8601 timestamp.
 * @param {string} [marketplace_id] - Marketplace whose `timezone` entry is
 *   passed to Luxon as the `zone` option.
 * @returns {string} Formatted UTC date-time.
 * @throws {Error} If Luxon reports the parsed value as invalid.
 */
function formatToMysqlDatetime(dateTime, marketplace_id) {
  const timezone = marketplaces[marketplace_id]?.timezone;
  const local = DateTime.fromISO(dateTime, { zone: timezone }).toUTC();
  if (!local.isValid) {
    // An invalid DateTime formats as the literal text "Invalid DateTime";
    // reject it here instead of handing that text to the database.
    throw new Error(`Invalid date-time: ${dateTime} (${local.invalidReason})`);
  }
  return local.toFormat("yyyy-MM-dd HH:mm:ss");
}

/**
 * Computes the UTC date-time for a timestamp using the marketplace's
 * configured `timezoneUTC` offset.
 *
 * @param {string} startDateTime - Timestamp taken from the message.
 * @param {string} marketplaceId - Key into the marketplaces table.
 * @returns {string} UTC date-time as `YYYY-MM-DD HH:mm:ss`.
 * @throws {Error} If the marketplace has no offset or the inputs are malformed.
 */
function getUtcDateTime(startDateTime, marketplaceId) {
  const offset = marketplaces[marketplaceId]?.timezoneUTC;
  if (offset === undefined) {
    throw new Error(`No UTC offset configured for marketplace: ${marketplaceId}`);
  }
  return convertToUtcFromOffset(startDateTime, offset);
}

/**
 * Writes all rows with a single `REPLACE INTO` statement.
 *
 * @param {object[]} traffics - Rows as produced by parseMessage.
 * @returns {Promise<boolean>} true when the write succeeded or there was
 *   nothing to write; false when acquiring the connection or running the
 *   query failed (the error is logged).
 */
async function saveToDbBulk(traffics) {
  if (traffics.length === 0) return true;

  let conn;
  try {
    // Acquired inside the try block so that a pool failure is reported as
    // `false` like any other database error instead of rejecting.
    conn = await pool.getConnection();
    const sql = `
      REPLACE INTO sponsored_product_traffic (
        idempotency_id, dataset_id, marketplace_id, marketplace, currency,
        advertiser_id, campaign_id, ad_group_id, ad_id, keyword_id,
        keyword_text, match_type, placement, time_window_start, utc_date_time,
        clicks, impressions, cost
      ) VALUES ?
    `;
    const values = traffics.map((t) => [
      t.idempotency_id,
      t.dataset_id,
      t.marketplace_id,
      t.marketplace,
      t.currency,
      t.advertiser_id,
      t.campaign_id,
      t.ad_group_id,
      t.ad_id,
      t.keyword_id,
      t.keyword_text,
      t.match_type,
      t.placement,
      t.time_window_start,
      t.utc_date_time,
      t.clicks,
      t.impressions,
      t.cost,
    ]);
    await conn.query(sql, [values]);
    return true;
  } catch (err) {
    console.error("DB Bulk Error:", err);
    return false;
  } finally {
    conn?.release();
  }
}

/**
 * Drains up to 50 receive batches from one queue, stores the parsed rows and
 * deletes the stored messages from the queue.
 *
 * Messages that fail to parse are logged and never deleted. When the database
 * write fails nothing is deleted, so the messages stay on the queue.
 *
 * @param {{url: string, region: string}} queue - Queue URL and its region.
 * @returns {Promise<void>}
 */
async function processQueue(queue) {
  const client = getClient(queue.region);
  const BATCHES = 50;
  const DELETE_BATCH_SIZE = 10;

  // Keyed by MessageId. Polling may outlast the queue's visibility timeout, in
  // which case the same message is delivered again; keeping one entry per id
  // (with the newest receipt handle) keeps delete-batch entry Ids distinct.
  const received = new Map();

  for (let i = 0; i < BATCHES; i++) {
    const response = await client.send(
      new ReceiveMessageCommand({
        QueueUrl: queue.url,
        MaxNumberOfMessages: 10,
        WaitTimeSeconds: 10,
      })
    );
    const messages = response.Messages || [];
    if (messages.length === 0) {
      console.log(`No Messages in Queue ${queue.url}`);
      break;
    }

    for (const msg of messages) {
      try {
        received.set(msg.MessageId, {
          traffic: parseMessage(msg.Body),
          receiptHandle: msg.ReceiptHandle,
        });
      } catch (err) {
        console.error(`Invalid message in ${queue.url}:`, err.message);
      }
    }
  }

  const entries = [...received.entries()];
  const traffics = entries.map(([, entry]) => entry.traffic);

  if (traffics.length > 0) {
    const saved = await saveToDbBulk(traffics);
    if (saved) {
      for (let i = 0; i < entries.length; i += DELETE_BATCH_SIZE) {
        const batch = entries
          .slice(i, i + DELETE_BATCH_SIZE)
          .map(([messageId, entry]) => ({
            Id: messageId,
            ReceiptHandle: entry.receiptHandle,
          }));
        const result = await client.send(
          new DeleteMessageBatchCommand({
            QueueUrl: queue.url,
            Entries: batch,
          })
        );
        // A batch delete can succeed as a request while individual entries
        // fail; surface those so redelivered messages are explainable.
        for (const failure of result?.Failed ?? []) {
          console.error(
            `Failed to delete message ${failure.Id} from ${queue.url}:`,
            failure.Message
          );
        }
      }
    }
  }

  console.log(
    `[${queue.region}] ${queue.url} -> processed ${traffics.length} messages`
  );
}

/**
 * Processes all queues in parallel and waits for every one of them to finish.
 *
 * A failure in one queue is logged and does not cut the wait short, so the
 * next pass never starts while another queue is still being processed.
 *
 * @returns {Promise<void>}
 */
async function main() {
  const results = await Promise.allSettled(
    QUEUES.map((queue) => processQueue(queue))
  );
  results.forEach((result, index) => {
    if (result.status === "rejected") {
      console.error(`Queue ${QUEUES[index].url} failed:`, result.reason);
    }
  });
}

/**
 * Runs main() and schedules the next run 5 seconds after it completes.
 *
 * @returns {Promise<void>}
 */
async function runLoop() {
  try {
    await main(); // wait for current execution to finish
  } catch (err) {
    console.error('Error in main():', err);
  }
  // Wait 5 seconds before running again
  setTimeout(runLoop, 5000);
}

runLoop();