242 lines
6.6 KiB
JavaScript
242 lines
6.6 KiB
JavaScript
require("dotenv").config();
|
|
const fs = require("fs");
|
|
const { DateTime } = require("luxon");
|
|
|
|
const {
|
|
SQSClient,
|
|
ReceiveMessageCommand,
|
|
DeleteMessageBatchCommand,
|
|
} = require("@aws-sdk/client-sqs");
|
|
|
|
const mysql = require("mysql2/promise");
|
|
const { off } = require("process");
|
|
|
|
// Marketplace lookup table, read synchronously once at startup. The functions
// in this file index it by marketplace id and read the `marketplace`,
// `timezone` and `timezoneUTC` fields of each entry.
const raw = fs.readFileSync("./marketplaces.json", { encoding: "utf8" });
const marketplaces = JSON.parse(raw);
|
|
|
|
// Shared MySQL connection pool. Connection settings come from the environment;
// the pool is capped at 10 connections (connectionLimit) with
// waitForConnections enabled.
const pool = mysql.createPool({
  waitForConnections: true,
  connectionLimit: 10,
  host: process.env.DB_HOST,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
  database: process.env.DB_NAME,
});
|
|
|
|
// All 7 queues with their regions.
// Each pair names the environment variable holding the queue URL and the
// environment variable holding the AWS region that queue is polled in.
const QUEUES = [
  ["SQS_QUEUE_US_URL", "AWS_REGION_US_EAST"],
  ["SQS_QUEUE_CA_URL", "AWS_REGION_US_EAST"],
  ["SQS_QUEUE_UK_URL", "AWS_REGION_EU_WEST"],
  ["SQS_QUEUE_DE_URL", "AWS_REGION_EU_WEST"],
  ["SQS_QUEUE_FR_URL", "AWS_REGION_EU_WEST"],
  ["SQS_QUEUE_IT_URL", "AWS_REGION_EU_WEST"],
  ["SQS_QUEUE_ES_URL", "AWS_REGION_EU_WEST"],
].map(([urlVar, regionVar]) => ({
  url: process.env[urlVar],
  region: process.env[regionVar],
}));
|
|
|
|
// Cache of SQS clients, keyed by region, filled lazily by getClient().
const clients = {};

/**
 * Returns the SQS client for a region, creating and caching it on first use.
 *
 * Credentials are read from the AWS_ACCESS_KEY / AWS_SECRET_KEY environment
 * variables at the moment the client is created.
 *
 * @param {string} region - AWS region the client should talk to.
 * @returns {SQSClient} The cached client for that region.
 */
function getClient(region) {
  const cached = clients[region];
  if (cached) {
    return cached;
  }

  const credentials = {
    accessKeyId: process.env.AWS_ACCESS_KEY,
    secretAccessKey: process.env.AWS_SECRET_KEY,
  };
  const created = new SQSClient({ region, credentials });
  clients[region] = created;
  return created;
}
|
|
|
|
/**
 * Turns the JSON body of a queue message into a flat traffic record.
 *
 * Most fields are copied through unchanged; `marketplace`, `time_window_start`
 * and `utc_date_time` are derived, and the three counters are coerced to
 * numbers with a missing/falsy value becoming 0.
 *
 * @param {string} jsonStr - Raw message body.
 * @returns {object} The traffic record.
 * @throws {SyntaxError} When the body is not valid JSON (other errors from the
 *   date helpers propagate as well).
 */
function parseMessage(jsonStr) {
  const payload = JSON.parse(jsonStr);
  const record = {};
  const copy = (...keys) => {
    for (const key of keys) {
      record[key] = payload[key];
    }
  };

  copy("idempotency_id", "dataset_id", "marketplace_id");
  record.marketplace = getMarketplaceName(payload.marketplace_id);
  copy(
    "currency",
    "advertiser_id",
    "campaign_id",
    "ad_group_id",
    "ad_id",
    "keyword_id",
    "keyword_text",
    "match_type",
    "placement"
  );

  record.time_window_start_original = payload.time_window_start;
  // NOTE(review): formatToMysqlDatetime also accepts a marketplace id, which is
  // not passed here — confirm whether that is intentional.
  record.time_window_start = formatToMysqlDatetime(payload.time_window_start);
  record.utc_date_time = getUtcDateTime(
    payload.time_window_start,
    payload.marketplace_id
  );

  record.clicks = Number(payload.clicks || 0);
  record.impressions = Number(payload.impressions || 0);
  record.cost = Number(payload.cost || 0);

  return record;
}
|
|
|
|
/**
 * Looks up the display name configured for a marketplace id.
 *
 * @param {string} marketplaceId - Key into the `marketplaces` table.
 * @returns {string|undefined} The entry's `marketplace` field, or `undefined`
 *   when the id has no entry.
 */
function getMarketplaceName(marketplaceId) {
  return marketplaces[marketplaceId]?.marketplace;
}
|
|
|
|
/**
 * Converts an offset label such as "UTC+05:30" or "UTC-08:00" into a signed
 * number of minutes east of UTC.
 *
 * @param {string} offsetStr - Offset in the exact form `UTC±HH:MM`.
 * @returns {number} Offset in minutes (positive for "+", negative for "-").
 * @throws {Error} "Invalid offset format: …" when the value is not a string in
 *   that form. This includes `undefined`, which previously surfaced as an
 *   unrelated TypeError from calling `.match` on a non-string.
 */
function offsetToMinutes(offsetStr) {
  const match =
    typeof offsetStr === "string"
      ? offsetStr.match(/^UTC([+-])(\d{2}):(\d{2})$/)
      : null;
  if (!match) throw new Error(`Invalid offset format: ${offsetStr}`);

  const [, sign, hours, minutes] = match;
  const total = Number.parseInt(hours, 10) * 60 + Number.parseInt(minutes, 10);

  // Guard on total so "UTC-00:00" yields 0 rather than -0.
  return sign === "-" && total !== 0 ? -total : total;
}
|
|
|
|
/**
 * Shifts a wall-clock timestamp by a fixed UTC offset and formats the result
 * as `YYYY-MM-DD HH:mm:ss` in UTC.
 *
 * A date-time string that carries no `Z`/offset designator is read as a plain
 * wall-clock value. JavaScript would otherwise parse such a string in the
 * host's local timezone, making the result depend on where the process runs.
 * Strings that already carry a designator are parsed exactly as before.
 *
 * @param {string} localIsoString - ISO 8601 timestamp to shift.
 * @param {string} offsetStr - Offset in the form `UTC±HH:MM`.
 * @returns {string} The shifted timestamp, formatted for MySQL DATETIME.
 * @throws {Error} When the offset is malformed (from offsetToMinutes) or the
 *   timestamp cannot be parsed.
 */
function convertToUtcFromOffset(localIsoString, offsetStr) {
  const offsetMinutes = offsetToMinutes(offsetStr);

  const isNaiveDateTime =
    typeof localIsoString === "string" &&
    /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}(?::\d{2}(?:\.\d+)?)?$/.test(localIsoString);
  const local = new Date(isNaiveDateTime ? `${localIsoString}Z` : localIsoString);

  if (Number.isNaN(local.getTime())) {
    // Fail loudly instead of emitting "NaN-NaN-NaN NaN:NaN:NaN".
    throw new Error(`Invalid date-time: ${localIsoString}`);
  }

  const utcDate = new Date(local.getTime() - offsetMinutes * 60 * 1000);

  // toISOString() is always UTC: "YYYY-MM-DDTHH:mm:ss.sssZ".
  return utcDate.toISOString().slice(0, 19).replace("T", " ");
}
|
|
|
|
/**
 * Parses an ISO 8601 timestamp, converts it to UTC and formats it as
 * `yyyy-MM-dd HH:mm:ss`.
 *
 * The marketplace's configured `timezone` is handed to luxon as the zone to
 * use for the input. When no marketplace id is given, or it has no entry, the
 * zone is `undefined` and luxon falls back to its default zone.
 *
 * @param {string} dateTime - ISO 8601 timestamp.
 * @param {string} [marketplace_id] - Key into the `marketplaces` table.
 * @returns {string} The UTC timestamp formatted for MySQL DATETIME.
 * @throws {Error} When luxon cannot produce a valid DateTime. Previously the
 *   literal text "Invalid DateTime" was returned in that case.
 */
function formatToMysqlDatetime(dateTime, marketplace_id) {
  const timezone = marketplaces[marketplace_id]?.timezone;
  const utc = DateTime.fromISO(dateTime, { zone: timezone }).toUTC();

  if (!utc.isValid) {
    throw new Error(`Invalid date-time "${dateTime}": ${utc.invalidReason}`);
  }

  return utc.toFormat("yyyy-MM-dd HH:mm:ss");
}
|
|
|
|
/**
 * Converts a marketplace-local start time to a UTC datetime string, using the
 * `timezoneUTC` offset configured for that marketplace.
 *
 * @param {string} startDateTime - Timestamp to convert.
 * @param {string} marketplaceId - Key into the `marketplaces` table.
 * @returns {string} Whatever convertToUtcFromOffset returns for the pair.
 */
function getUtcDateTime(startDateTime, marketplaceId) {
  return convertToUtcFromOffset(
    startDateTime,
    marketplaces[marketplaceId]?.timezoneUTC
  );
}
|
|
|
|
/**
 * Writes a batch of traffic records to `sponsored_product_traffic` with a
 * single multi-row `REPLACE INTO` statement.
 *
 * Every failure is logged and reported as `false`. That now includes failing
 * to obtain a connection from the pool, which previously escaped as a
 * rejection instead of following the boolean contract.
 *
 * @param {object[]} traffics - Records as produced by parseMessage.
 * @returns {Promise<boolean>} `true` when the batch was written (or was
 *   empty), `false` when the database call failed.
 */
async function saveToDbBulk(traffics) {
  if (traffics.length === 0) return true;

  const sql = `
    REPLACE INTO sponsored_product_traffic (
      idempotency_id, dataset_id, marketplace_id, marketplace, currency, advertiser_id,
      campaign_id, ad_group_id, ad_id, keyword_id, keyword_text, match_type, placement,
      time_window_start, utc_date_time, clicks, impressions, cost
    ) VALUES ?
  `;

  // One inner array per row, in the same order as the column list above.
  const values = traffics.map((t) => [
    t.idempotency_id,
    t.dataset_id,
    t.marketplace_id,
    t.marketplace,
    t.currency,
    t.advertiser_id,
    t.campaign_id,
    t.ad_group_id,
    t.ad_id,
    t.keyword_id,
    t.keyword_text,
    t.match_type,
    t.placement,
    t.time_window_start,
    t.utc_date_time,
    t.clicks,
    t.impressions,
    t.cost,
  ]);

  let conn;
  try {
    // Acquired inside the try so a pool failure is handled like a query failure.
    conn = await pool.getConnection();
    await conn.query(sql, [values]);
    return true;
  } catch (err) {
    console.error("DB Bulk Error:", err);
    return false;
  } finally {
    // conn is still undefined when getConnection() itself failed.
    conn?.release();
  }
}
|
|
|
|
/**
 * Drains up to 50 receive calls (10 messages each) from one queue, stores the
 * parsed records in a single bulk write, and deletes the stored messages.
 *
 * Messages are deleted only after the bulk write reports success. Messages
 * whose body cannot be parsed are logged and left on the queue.
 *
 * @param {{url: string, region: string}} queue - Queue URL and its region.
 * @returns {Promise<void>}
 */
async function processQueue(queue) {
  const client = getClient(queue.region);
  const BATCHES = 50;

  // Keyed by MessageId: if a message is delivered again during this run it is
  // stored once and deleted with its most recent receipt handle. This also
  // keeps the Ids inside each delete batch distinct, as SQS requires.
  const received = new Map();

  for (let i = 0; i < BATCHES; i++) {
    const command = new ReceiveMessageCommand({
      QueueUrl: queue.url,
      MaxNumberOfMessages: 10,
      WaitTimeSeconds: 10,
    });

    const response = await client.send(command);
    const messages = response.Messages || [];

    if (messages.length === 0) {
      // This line was a bare template literal before, so nothing was logged.
      console.log(`No Messages in Queue ${queue.url}`);
      break;
    }

    for (const msg of messages) {
      try {
        received.set(msg.MessageId, {
          traffic: parseMessage(msg.Body),
          receiptHandle: msg.ReceiptHandle,
        });
      } catch (err) {
        console.error(`Invalid message in ${queue.url}:`, err.message);
      }
    }
  }

  const traffics = [];
  const deleteEntries = [];
  for (const [messageId, { traffic, receiptHandle }] of received) {
    traffics.push(traffic);
    deleteEntries.push({ Id: messageId, ReceiptHandle: receiptHandle });
  }

  if (traffics.length > 0) {
    const saved = await saveToDbBulk(traffics);
    if (saved) {
      // DeleteMessageBatch accepts at most 10 entries per request.
      for (let i = 0; i < deleteEntries.length; i += 10) {
        const batch = deleteEntries.slice(i, i + 10);
        const result = await client.send(
          new DeleteMessageBatchCommand({
            QueueUrl: queue.url,
            Entries: batch,
          })
        );

        // A batch call can succeed overall while individual entries fail.
        for (const failure of result?.Failed ?? []) {
          console.error(
            `Failed to delete message ${failure.Id} from ${queue.url}:`,
            failure.Message
          );
        }
      }
    }
  }

  console.log(
    `[${queue.region}] ${queue.url} -> processed ${traffics.length} messages ${DateTime.now().toFormat(
      "yyyy-MM-dd HH:mm:ss"
    )}`
  );
}
|
|
|
|
// async function main() {
|
|
// // Parallel processing of all queues
|
|
// await Promise.all(QUEUES.map(processQueue));
|
|
// }
|
|
|
|
// main().catch(console.error);
|
|
|
|
/**
 * Processes every configured queue in parallel and waits until all of them
 * have finished.
 *
 * With `Promise.all` the first failure rejected immediately while the other
 * queues were still being processed, so the caller could start the next run
 * on top of the unfinished one. All queues are now allowed to settle first;
 * the first failure is then rethrown and any further failures are logged.
 *
 * @returns {Promise<void>}
 * @throws The rejection reason of the first queue that failed.
 */
async function main() {
  const outcomes = await Promise.allSettled(
    QUEUES.map((queue) => processQueue(queue))
  );

  const failures = outcomes.filter((outcome) => outcome.status === "rejected");
  for (const extra of failures.slice(1)) {
    console.error("Queue processing failed:", extra.reason);
  }
  if (failures.length > 0) {
    throw failures[0].reason;
  }
}
|
|
|
|
/**
 * Runs main() once, logs any error it raises, then schedules itself to run
 * again 5 seconds after that run has finished.
 *
 * @returns {Promise<void>}
 */
async function runLoop() {
  const RERUN_DELAY_MS = 5000;

  try {
    // The next run is only scheduled after this one has completed.
    await main();
  } catch (failure) {
    console.error("Error in main():", failure);
  }

  setTimeout(runLoop, RERUN_DELAY_MS);
}
|
|
|
|
// Start the polling loop. The returned promise is intentionally not awaited.
void runLoop();
|