marketing-SQS-RDS-script/script.js

242 lines
6.6 KiB
JavaScript

require("dotenv").config();
const fs = require("fs");
const { DateTime } = require("luxon");
const {
SQSClient,
ReceiveMessageCommand,
DeleteMessageBatchCommand,
} = require("@aws-sdk/client-sqs");
const mysql = require("mysql2/promise");
const { off } = require("process");
// Marketplace metadata keyed by marketplace_id. Fields read elsewhere in this file:
// `marketplace`, `timezone` and `timezoneUTC`. The relative path is resolved
// against the process working directory, not this script's directory.
const raw = fs.readFileSync("./marketplaces.json", { encoding: "utf8" });
const marketplaces = JSON.parse(raw);

// Shared mysql2/promise connection pool; connection settings come from the
// environment (populated by dotenv at the top of the file).
const { DB_HOST, DB_USER, DB_PASSWORD, DB_NAME } = process.env;
const pool = mysql.createPool({
  host: DB_HOST,
  user: DB_USER,
  password: DB_PASSWORD,
  database: DB_NAME,
  // When all 10 connections are busy, queue the request instead of failing.
  waitForConnections: true,
  connectionLimit: 10,
});
// All 7 marketplace queues, each paired with the AWS region that hosts it.
// Entries whose queue-URL or region env var is unset are skipped with a startup
// warning, rather than handing `undefined` to the SQS client on every pass.
const QUEUES = [
  { name: "US", url: process.env.SQS_QUEUE_US_URL, region: process.env.AWS_REGION_US_EAST },
  { name: "CA", url: process.env.SQS_QUEUE_CA_URL, region: process.env.AWS_REGION_US_EAST },
  { name: "UK", url: process.env.SQS_QUEUE_UK_URL, region: process.env.AWS_REGION_EU_WEST },
  { name: "DE", url: process.env.SQS_QUEUE_DE_URL, region: process.env.AWS_REGION_EU_WEST },
  { name: "FR", url: process.env.SQS_QUEUE_FR_URL, region: process.env.AWS_REGION_EU_WEST },
  { name: "IT", url: process.env.SQS_QUEUE_IT_URL, region: process.env.AWS_REGION_EU_WEST },
  { name: "ES", url: process.env.SQS_QUEUE_ES_URL, region: process.env.AWS_REGION_EU_WEST },
].filter(({ name, url, region }) => {
  if (url && region) return true;
  console.warn(`Skipping ${name} queue: SQS queue URL or AWS region is not configured`);
  return false;
});
// One SQSClient per region, created lazily and reused across polling passes.
const clients = {};

/**
 * Return the cached SQSClient for `region`, creating it on first use.
 *
 * Explicit keys from AWS_ACCESS_KEY / AWS_SECRET_KEY are used when both are set.
 * Otherwise `credentials` is omitted so the SDK's default credential provider chain
 * (instance role, shared config, standard AWS_* env vars, ...) can resolve them,
 * instead of signing with an object full of `undefined` values.
 *
 * @param {string} region - AWS region of the queue.
 * @returns {SQSClient}
 */
function getClient(region) {
  if (!clients[region]) {
    const { AWS_ACCESS_KEY: accessKeyId, AWS_SECRET_KEY: secretAccessKey } = process.env;
    const config = { region };
    if (accessKeyId && secretAccessKey) {
      config.credentials = { accessKeyId, secretAccessKey };
    }
    clients[region] = new SQSClient(config);
  }
  return clients[region];
}
/**
 * Parse one SQS message body (a JSON string) into a sponsored_product_traffic row.
 *
 * @param {string} jsonStr - raw SQS message body.
 * @returns {object} row fields; clicks/impressions/cost are coerced with Number(),
 *   with missing or falsy values becoming 0.
 * @throws {SyntaxError} when the body is not valid JSON; errors thrown by the
 *   timestamp helpers (e.g. unknown marketplace) propagate as well.
 */
function parseMessage(jsonStr) {
  const data = JSON.parse(jsonStr);
  const marketplaceId = data.marketplace_id;
  return {
    idempotency_id: data.idempotency_id,
    dataset_id: data.dataset_id,
    marketplace_id: marketplaceId,
    marketplace: getMarketplaceName(marketplaceId),
    currency: data.currency,
    advertiser_id: data.advertiser_id,
    campaign_id: data.campaign_id,
    ad_group_id: data.ad_group_id,
    ad_id: data.ad_id,
    keyword_id: data.keyword_id,
    keyword_text: data.keyword_text,
    match_type: data.match_type,
    placement: data.placement,
    time_window_start_original: data.time_window_start,
    // formatToMysqlDatetime needs the marketplace id to look up its timezone;
    // it was previously omitted, so the lookup always missed.
    time_window_start: formatToMysqlDatetime(data.time_window_start, marketplaceId),
    utc_date_time: getUtcDateTime(data.time_window_start, marketplaceId),
    clicks: Number(data.clicks || 0),
    impressions: Number(data.impressions || 0),
    cost: Number(data.cost || 0),
  };
}
/**
 * Look up the `marketplace` field configured for a marketplace id.
 *
 * @param {string} marketplaceId - key into marketplaces.json.
 * @returns {*} the configured value, or undefined when the id has no entry.
 */
function getMarketplaceName(marketplaceId) {
  const entry = marketplaces[marketplaceId];
  if (entry == null) {
    return undefined;
  }
  return entry.marketplace;
}
/**
 * Convert a "UTC±HH:MM" offset string to signed minutes east of UTC.
 * A bare "UTC" is accepted as a zero offset.
 *
 * @param {string} offsetStr - e.g. "UTC+05:30" or "UTC-08:00".
 * @returns {number} e.g. 330 or -480.
 * @throws {Error} "Invalid offset format: ..." for any other input, including
 *   non-strings (previously a missing value surfaced as an opaque TypeError).
 */
function offsetToMinutes(offsetStr) {
  if (typeof offsetStr !== "string") {
    throw new Error(`Invalid offset format: ${offsetStr}`);
  }
  if (offsetStr === "UTC") return 0;
  const match = /^UTC([+-])(\d{2}):(\d{2})$/.exec(offsetStr);
  if (!match) throw new Error(`Invalid offset format: ${offsetStr}`);
  const [, sign, hours, minutes] = match;
  const total = Number.parseInt(hours, 10) * 60 + Number.parseInt(minutes, 10);
  return sign === "+" ? total : -total;
}
/**
 * Shift a wall-clock ISO timestamp by a fixed "UTC±HH:MM" offset and format the
 * result as "YYYY-MM-DD HH:mm:ss" in UTC.
 *
 * `new Date()` parses an ISO date-time *without* a zone designator in the host's
 * local time zone, which made the result depend on the server's TZ setting. Such
 * strings are now pinned to UTC first, so the wall-clock fields are taken
 * literally. This matches what the original produced on a UTC host.
 * NOTE(review): strings that already carry "Z" or an offset are still shifted by
 * `offsetStr` as before; confirm the incoming data is offset-less local time.
 *
 * @param {string} localIsoString - e.g. "2024-01-01T10:00:00".
 * @param {string} offsetStr - marketplace offset, e.g. "UTC-08:00".
 * @returns {string} e.g. "2024-01-01 18:00:00".
 * @throws {Error} when the timestamp is unparseable (previously "NaN-NaN-NaN ..."
 *   was returned) or the offset is malformed.
 */
function convertToUtcFromOffset(localIsoString, offsetStr) {
  const isoText = String(localIsoString);
  // Date-time form with no trailing "Z"/"±hh:mm" -> append "Z" to parse as UTC.
  const hasTimeWithoutZone = /T[\d:.]+$/.test(isoText);
  const wallClock = new Date(hasTimeWithoutZone ? `${isoText}Z` : isoText);
  if (Number.isNaN(wallClock.getTime())) {
    throw new Error(`Invalid date-time: ${localIsoString}`);
  }
  const offsetMinutes = offsetToMinutes(offsetStr);
  const utcDate = new Date(wallClock.getTime() - offsetMinutes * 60 * 1000);
  // toISOString() is always UTC: "YYYY-MM-DDTHH:mm:ss.sssZ".
  return utcDate.toISOString().slice(0, 19).replace("T", " ");
}
/**
 * Convert an ISO-8601 timestamp to a UTC MySQL DATETIME string ("yyyy-MM-dd HH:mm:ss").
 *
 * Input without an explicit offset is interpreted in the marketplace's `timezone`
 * from marketplaces.json. When the marketplace or its timezone is unknown, the
 * zone option is undefined and Luxon's default zone applies.
 *
 * @param {string} dateTime - ISO-8601 date-time.
 * @param {string} marketplace_id - key into marketplaces.json.
 * @returns {string} UTC timestamp formatted for MySQL.
 * @throws {Error} when Luxon cannot parse the input or the zone. Previously the
 *   literal "Invalid DateTime" was returned and later written to the database.
 */
function formatToMysqlDatetime(dateTime, marketplace_id) {
  const timezone = marketplaces[marketplace_id]?.timezone;
  const parsed = DateTime.fromISO(dateTime, { zone: timezone });
  if (!parsed.isValid) {
    throw new Error(
      `Invalid date-time "${dateTime}" for marketplace ${marketplace_id}: ${parsed.invalidReason}`
    );
  }
  return parsed.toUTC().toFormat("yyyy-MM-dd HH:mm:ss");
}
/**
 * Convert a marketplace-local start time to a UTC "YYYY-MM-DD HH:mm:ss" string,
 * using the fixed `timezoneUTC` offset configured for that marketplace.
 *
 * @param {string} startDateTime - ISO timestamp from the message.
 * @param {string} marketplaceId - key into marketplaces.json.
 * @returns {string}
 * @throws {Error} when the marketplace has no `timezoneUTC` configured, with a
 *   message naming the marketplace instead of a TypeError from offset parsing.
 */
function getUtcDateTime(startDateTime, marketplaceId) {
  const offset = marketplaces[marketplaceId]?.timezoneUTC;
  if (offset == null) {
    throw new Error(`No timezoneUTC configured for marketplace ${marketplaceId}`);
  }
  return convertToUtcFromOffset(startDateTime, offset);
}
/**
 * Write parsed traffic rows to sponsored_product_traffic in one REPLACE statement.
 *
 * REPLACE overwrites existing rows that collide on a PRIMARY/UNIQUE key, which makes
 * redelivered messages harmless. Presumably the key is idempotency_id; confirm
 * against the schema.
 *
 * @param {object[]} traffics - rows as produced by parseMessage; extra fields ignored.
 * @returns {Promise<boolean>} true when stored (or nothing to store), false on any
 *   database error, including failure to obtain a connection, which previously
 *   escaped as a rejection. Errors are logged, not thrown.
 */
async function saveToDbBulk(traffics) {
  if (traffics.length === 0) return true;
  // Single source of truth for column order in both the SQL and each value tuple.
  const columns = [
    "idempotency_id",
    "dataset_id",
    "marketplace_id",
    "marketplace",
    "currency",
    "advertiser_id",
    "campaign_id",
    "ad_group_id",
    "ad_id",
    "keyword_id",
    "keyword_text",
    "match_type",
    "placement",
    "time_window_start",
    "utc_date_time",
    "clicks",
    "impressions",
    "cost",
  ];
  // `VALUES ?` with a nested array is expanded by mysql2's query() into
  // "(...), (...)" groups; column names are constants, never user input.
  const sql = `REPLACE INTO sponsored_product_traffic (${columns.join(", ")}) VALUES ?`;
  const values = traffics.map((row) => columns.map((column) => row[column]));
  try {
    // pool.query acquires and releases a connection itself, so connection
    // errors are handled by the same catch as query errors.
    await pool.query(sql, [values]);
    return true;
  } catch (err) {
    console.error("DB Bulk Error:", err);
    return false;
  }
}
/**
 * Receive up to `maxBatches` ReceiveMessage batches (10 messages each) from one queue.
 *
 * Results are keyed by MessageId, so a message delivered twice in the same pass is
 * kept once with its most recent receipt handle. SQS only reliably deletes with the
 * latest handle, and DeleteMessageBatch rejects duplicate entry Ids. Bodies that
 * fail to parse are logged and left on the queue.
 *
 * @returns {Promise<Map<string, {traffic: object, entry: {Id: string, ReceiptHandle: string}}>>}
 */
async function receiveQueueTraffic(client, queue, maxBatches = 50) {
  const byMessageId = new Map();
  for (let batch = 0; batch < maxBatches; batch++) {
    const response = await client.send(
      new ReceiveMessageCommand({
        QueueUrl: queue.url,
        MaxNumberOfMessages: 10,
        WaitTimeSeconds: 10,
      })
    );
    const messages = response.Messages ?? [];
    // Long poll came back empty: nothing more to drain this pass.
    if (messages.length === 0) break;
    for (const msg of messages) {
      try {
        byMessageId.set(msg.MessageId, {
          traffic: parseMessage(msg.Body),
          entry: { Id: msg.MessageId, ReceiptHandle: msg.ReceiptHandle },
        });
      } catch (err) {
        console.error(`Invalid message in ${queue.url}:`, err.message);
      }
    }
  }
  return byMessageId;
}

/** Delete entries in chunks of 10 (the DeleteMessageBatch limit), logging per-entry failures. */
async function deleteQueueMessages(client, queueUrl, entries) {
  for (let start = 0; start < entries.length; start += 10) {
    const response = await client.send(
      new DeleteMessageBatchCommand({
        QueueUrl: queueUrl,
        Entries: entries.slice(start, start + 10),
      })
    );
    // A batch call can succeed overall while individual entries fail.
    for (const failure of response?.Failed ?? []) {
      console.error(
        `Failed to delete message ${failure.Id} from ${queueUrl}:`,
        failure.Message ?? failure.Code
      );
    }
  }
}

/**
 * Drain one queue: receive messages, bulk-store the parsed rows, and delete the
 * messages only after the rows were stored successfully.
 *
 * NOTE(review): unparseable messages are never deleted here and will be redelivered
 * indefinitely unless the queue has a redrive (dead-letter) policy; confirm one exists.
 *
 * @param {{url: string, region: string}} queue
 */
async function processQueue(queue) {
  const client = getClient(queue.region);
  const received = await receiveQueueTraffic(client, queue);
  const traffics = [];
  const deleteEntries = [];
  for (const { traffic, entry } of received.values()) {
    traffics.push(traffic);
    deleteEntries.push(entry);
  }
  if (traffics.length > 0) {
    const saved = await saveToDbBulk(traffics);
    // If the DB write failed, leave the messages so they are received again later.
    if (saved) {
      await deleteQueueMessages(client, queue.url, deleteEntries);
    }
  }
  console.log(
    `[${queue.region}] ${queue.url} -> processed ${traffics.length} messages ${DateTime.now().toFormat(
      "yyyy-MM-dd HH:mm:ss"
    )}`
  );
}
/**
 * Run one polling pass over every configured queue in parallel.
 *
 * Uses Promise.allSettled so the pass only finishes once *every* queue has settled.
 * With Promise.all, the first failure returned early while other queues were still
 * processing, so the caller could start an overlapping pass, and later failures
 * went unreported.
 *
 * @returns {Promise<void>}
 * @throws {AggregateError} when any queue failed; each entry is an Error naming the
 *   queue URL, with the original failure as its `cause`.
 */
async function main() {
  const results = await Promise.allSettled(QUEUES.map(processQueue));
  const failures = results
    .map((result, index) => ({ result, queue: QUEUES[index] }))
    .filter(({ result }) => result.status === "rejected")
    .map(({ result, queue }) => new Error(`Queue ${queue.url} failed`, { cause: result.reason }));
  if (failures.length > 0) {
    throw new AggregateError(failures, `${failures.length} of ${results.length} queue(s) failed`);
  }
}
/**
 * Poll forever. Run one pass via main() and log any error it rejects with instead
 * of propagating it. Then schedule the next pass 5 seconds after this one settled.
 */
async function runLoop() {
  const delayMs = 5000;
  try {
    await main();
  } catch (error) {
    console.error("Error in main():", error);
  }
  setTimeout(runLoop, delayMs);
}

// runLoop catches main()'s errors itself, so its promise is deliberately not awaited.
void runLoop();