Commit ef65af44 authored by Torstein's avatar Torstein
Browse files

DEICH-5896 various improvements to Mailchimp Synchronizer

- improve linting setup (no more false positives in tests)
- use dotenv to load .env-file loccally
- add message object validation
- fix producer code
- improve logging
- format with prettier
- remove most hard-coded constants that now comes from dotenv
- switch to upsert when subscribing a new member
parent 9e7cc55c
{
"parser": "babel-eslint",
"env": {
"node": true
"node": true,
"mocha": true
},
"extends": ["airbnb-base", "plugin:import/warnings", "prettier"],
"plugins": ["prettier", "import"],
"extends": [
"airbnb-base",
"plugin:import/warnings",
"prettier",
"plugin:mocha/recommended",
"plugin:chai-friendly/recommended"
],
"plugins": ["prettier", "import", "mocha", "chai-friendly"],
"globals": {},
"rules": {
"no-continue": 0,
"no-underscore-dangle": 0,
"class-methods-use-this": 0,
"import/order": 0,
"no-unused-expressions": ["error", { "allowTaggedTemplates": true }],
"consistent-return": 0,
"import/no-extraneous-dependencies": 0,
"semi": [2, "always"],
......@@ -23,6 +29,16 @@
"arrow-parens": 0,
"no-case-declarations": 0,
"prettier/prettier": "error",
"no-restricted-syntax": ["error", "LabeledStatement", "WithStatement"]
}
"no-restricted-syntax": ["error", "LabeledStatement", "WithStatement"],
"no-unused-expressions": 0,
"chai-friendly/no-unused-expressions": 2
},
"overrides": [
{
"files": ["*_spec.js", "*.spec.js"],
"rules": {
"prefer-arrow-callback": "off"
}
}
]
}
......@@ -29,7 +29,7 @@ Action: Unsubscribe
| source | System that initiated the change. E.g. `koha`, `krr` |
| timestamp | The time the update happened in the source system. Used to ensure idempotency. |
### Update
### borrowerEmailUpdated
Action: Update email
......
This diff is collapsed.
......@@ -4,33 +4,36 @@
"description": "",
"main": "index.js",
"scripts": {
"start": "node src/server.js",
"start:demo-producer": "node src/demo-producer.js",
"test": "mocha --require esm",
"start": "DOTENV_CONFIG_PATH=../.env node -r dotenv/config src/server.js",
"start:demo-producer": "DOTENV_CONFIG_PATH=../.env node -r dotenv/config src/demo-producer.js",
"test": "DOTENV_CONFIG_PATH=../.env mocha -r esm -r dotenv/config",
"lint": "eslint --ext .js ."
},
"author": "",
"license": "GPL-3.0-only",
"dependencies": {
"@mailchimp/mailchimp_marketing": "^3.0.36",
"@mailchimp/mailchimp_marketing": "^3.0.42",
"amqplib": "^0.7.1",
"chai": "^4.3.4",
"dotenv": "^8.2.0",
"joi": "^17.4.0",
"md5": "^2.3.0",
"prom-client": "^13.1.0",
"rascal": "^13.0.0",
"rascal": "^13.0.2",
"winston": "^3.3.3"
},
"devDependencies": {
"@types/amqplib": "^0.5.17",
"babel-eslint": "10.0.1",
"chai": "^4.3.0",
"chance": "^1.1.7",
"eslint": "^5.16.0",
"eslint-config-airbnb": "17.1.0",
"eslint-config-prettier": "4.0.0",
"eslint-plugin-import": "2.16.0",
"eslint-plugin-prettier": "3.0.1",
"eslint": "^7.26.0",
"eslint-config-airbnb": "18.2.1",
"eslint-config-prettier": "8.3.0",
"eslint-plugin-chai-friendly": "^0.7.1",
"eslint-plugin-mocha": "^8.1.0",
"eslint-plugin-prettier": "3.4.0",
"esm": "^3.2.25",
"mocha": "^8.2.1",
"prettier": "1.15.3"
"mocha": "^8.4.0",
"prettier": "2.3.0"
}
}
const Rascal = require("rascal");
const config = require("./rascal.config.js");
const config = require("./rascal.config");
const Chance = require("chance");
const winston = require("winston");
const md5 = require("md5");
const crypto = require("crypto");
const chance = new Chance();
const { createLogger, format, transports } = winston;
......@@ -10,25 +11,27 @@ const { createLogger, format, transports } = winston;
const logger = createLogger({
level: "silly",
format: format.combine(format.colorize(), format.splat(), format.simple()),
transports: [new transports.Console()]
transports: [new transports.Console()],
});
Rascal.Broker.create(Rascal.withDefaultConfig(config.rascal), (err, broker) => {
if (err) fatal(err);
broker.on("error", err => {
broker.on("error", (err) => {
logger.error("[BROKER] Error", { error: err });
});
// Simulate a borrower subscribing
setInterval(() => {
const encryptedEmail = encryptEmail(chance.email());
const message = {
eventType: "borrowerCreated",
email: chance.email(),
email: encryptedEmail,
borrowernumber: chance.integer(),
source: "koha",
timestamp: new Date(),
crash: randomInt(10) === 10
crash: randomInt(10) === 10,
};
const routingKey = `koha.borrower.${message.borrowernumber}.newsletter_update`;
......@@ -37,8 +40,8 @@ Rascal.Broker.create(Rascal.withDefaultConfig(config.rascal), (err, broker) => {
routingKey,
options: {
correlationId: md5(message.email?.toLowerCase()),
contentType: "application/json"
}
contentType: "application/json",
},
};
broker.publish("borrower_event", message, metadata, (err, publication) => {
......@@ -51,7 +54,7 @@ Rascal.Broker.create(Rascal.withDefaultConfig(config.rascal), (err, broker) => {
.on("success", () => {
logger.debug("[PUBLICATION] Message published");
})
.on("error", err => {
.on("error", (err) => {
logger.error("[PUBLICATION] ERROR", { error: err });
});
});
......@@ -66,3 +69,22 @@ function fatal(err) {
logger.error("Fatal error. ERROR", { error: err });
process.exit(1);
}
function encryptEmail(email) {
const PRIMARY_KEY = process.env.SHARED_SECRET_AES_128_KEY_PRIMARY;
const authTagLength = 16;
const algorithm = "aes-128-gcm";
const decodedKey = Buffer.from(PRIMARY_KEY, "hex");
const iv = crypto.randomBytes(16);
const ivHex = iv.toString("hex");
const cipher = crypto.createCipheriv(algorithm, decodedKey, iv, { authTagLength });
let encryptedText = cipher.update(email, "utf-8", "base64");
encryptedText += cipher.final("base64");
const authTag = cipher.getAuthTag().toString("base64");
return `${algorithm}|${ivHex}|${authTag}|${encryptedText}`;
}
const mailChimpFacade = require("../mailchimp");
const cryptoUtils = require("../utils/cryptoUtils");
const { validateMessage } = require("../utils/message-schema");
const { tryGettingKnownMailchimpError, ERROR } = require("../utils/errorUtils");
......@@ -7,11 +8,19 @@ module.exports = () => {
return async (message, parentLogger) => {
const logger = parentLogger.child({
eventType: message?.eventType,
borrowernumber: message?.borrowernumber
borrowernumber: message?.borrowernumber,
});
logger.info(`Handling newsletter update of type '${message?.eventType}'`);
const { error } = validateMessage(message);
if (error) {
logger.info(`Message did not validate, skipping`, { error });
error.recoverable = false;
throw error;
}
try {
await doMailchimpActions(message, logger);
} catch (err) {
......
const mailchimp = require("@mailchimp/mailchimp_marketing");
const md5 = require("md5");
const listId = process.env.MAILCHIMP_LIST_ID || "f7f3cf3e91";
const listId = process.env.MAILCHIMP_LIST_ID;
mailchimp.setConfig({
apiKey: process.env.MAILCHIMP_API_KEY || "7cacf736a65a3c570dcc013661f1f64a",
server: process.env.MAILCHIMP_SERVER || "us1"
apiKey: process.env.MAILCHIMP_API_KEY,
server: process.env.MAILCHIMP_SERVER,
});
async function ping(logger) {
......@@ -47,16 +47,15 @@ async function updateEmail(oldEmail, newEmail, borrowernumber, lastUpdatedAt, lo
// At this point the user was found, and the state was good
await mailchimp.lists.updateListMember(listId, oldMemberHash, {
email_address: newEmail,
merge_tags: {
borrowernumber,
updated_in_koha: lastUpdatedAt
}
merge_fields: {
SYSTEM_ID: borrowernumber,
},
});
logger.info("Successfully updated subscription for borrower", {
listId,
memberHash: oldMemberHash,
borrowernumber
borrowernumber,
});
}
......@@ -66,18 +65,22 @@ async function updateEmail(oldEmail, newEmail, borrowernumber, lastUpdatedAt, lo
* @param {*} logger Logger instance
*/
async function subscribe(email, borrowernumber, logger) {
await mailchimp.lists.addListMember(listId, {
const memberHash = generateMemberHash(email);
// Will update if it exist, and add with status "subscribed" if new.
// An existing member will not change status.
await mailchimp.lists.setListMember(listId, memberHash, {
email_address: email,
status: "subscribed",
merge_tags: {
borrowernumber
}
status_if_new: "subscribed",
merge_fields: {
SYSTEM_ID: borrowernumber,
},
});
logger.info("Successfully subscribed borrower", {
listId,
memberHash: generateMemberHash(email),
borrowernumber
memberHash,
borrowernumber,
});
}
......@@ -102,7 +105,7 @@ async function unsubscribe(memberHash, lastUpdatedAt, logger) {
logger.info("Successfully unsubscribed borrower", {
listId,
memberHash
memberHash,
});
}
......@@ -122,5 +125,5 @@ module.exports = {
updateEmail,
subscribe,
unsubscribe,
getAllMembers
getAllMembers,
};
......@@ -7,16 +7,19 @@ const config = require("./rascal.config.js");
const { getCallId: getCorrelationId } = require("./utils");
module.exports = {
setup
setup,
};
async function setup({ logger: parentLogger, counter }) {
async function setup({ logFactory, counter }) {
const parentLogger = logFactory.child({ context: "Worker" });
try {
const broker = await Broker.create(Rascal.withDefaultConfig(config.rascal));
broker.on("error", error => {
broker.on("error", (error) => {
parentLogger.error("[BROKER] Error", { error });
});
const entries = Object.entries(broker.config.subscriptions);
// Set up each subscription on at the time to not spam queue and make code as simple as possible.
for (const [subscriptionName, subscriptionConfig] of entries) {
......@@ -26,7 +29,7 @@ async function setup({ logger: parentLogger, counter }) {
// Logger specific for this subscription
const logger = parentLogger.child({
subscription: subscriptionName
subscription: subscriptionName,
});
// Load the the handler module on demand using the name in the configuration
......@@ -84,10 +87,10 @@ async function setup({ logger: parentLogger, counter }) {
logger.error("Redeliveries exceeded", { error });
ackOrNack(error, broker.config.recovery.dead_letter);
})
.on("cancel", err => {
.on("cancel", (err) => {
logger.warn(err.message);
})
.on("error", err => {
.on("error", (err) => {
logger.error(err.message);
});
}
......
......@@ -13,16 +13,16 @@ module.exports = {
slashes: true,
protocol: "amqp",
hostname: process.env.MAILCHIMP_SYNCHRONIZER_RABBITMQ_HOST || "rabbitmq.localhost",
user: process.env.MAILCHIMP_SYNCHRONIZER_RABBITMQ_USER || "mailchimp_synchronizer",
password: process.env.MAILCHIMP_SYNCHRONIZER_RABBITMQ_PASSWORD || "secret",
user: process.env.MAILCHIMP_SYNCHRONIZER_RABBITMQ_USER,
password: process.env.MAILCHIMP_SYNCHRONIZER_RABBITMQ_PASSWORD,
port: 5672,
vhost: "deichman",
options: {
heartbeat: 5
heartbeat: 5,
},
socketOptions: {
timeout: 5000
}
timeout: 5000,
},
},
// Define exchanges within the registration vhost
......@@ -30,23 +30,23 @@ module.exports = {
// Shared exchange for all services within this vhost
service: {
assert: false,
check: true
check: true,
},
// To delay failed messages before a retry
delay: {
assert: false,
check: true
check: true,
},
// To retry failed messages up to maximum number of times
retry: {
assert: false,
check: true
check: true,
},
// When retrying fails, messages end up here
dead_letters: {
assert: false,
check: true
}
check: true,
},
},
// Define queues within the registration vhost
......@@ -58,25 +58,28 @@ module.exports = {
arguments: {
// Route nacked messages to a service specific dead letter queue
"x-dead-letter-exchange": "dead_letters",
"x-dead-letter-routing-key": "mailchimp_synchronizer.dead_letter"
}
}
"x-dead-letter-routing-key": "mailchimp_synchronizer.dead_letter",
},
},
},
// Queue for holding dead letters until they can be resolved
"dead_letters:mailchimp_synchronizer": {}
"dead_letters:mailchimp_synchronizer": {},
},
// Bind the queues to the exchanges.
// A good naming convention for routing keys is producer.entity.event
bindings: {
"service[koha.borrower.*.newsletter_update] -> mailchimp_synchronizer:borrower:newsletter_updates": {},
"service[koha.borrower.*.newsletter_update] -> mailchimp_synchronizer:borrower:newsletter_updates":
{},
// Route retried messages back to their original queue using the CC routing keys set by Rascal
"retry[mailchimp_synchronizer:borrower:newsletter_updates.#] -> mailchimp_synchronizer:borrower:newsletter_updates": {},
"retry[mailchimp_synchronizer:borrower:newsletter_updates.#] -> mailchimp_synchronizer:borrower:newsletter_updates":
{},
// Route dead letters the service specific dead letter queue
"dead_letters[mailchimp_synchronizer.dead_letter] -> dead_letters:mailchimp_synchronizer": {}
"dead_letters[mailchimp_synchronizer.dead_letter] -> dead_letters:mailchimp_synchronizer":
{},
},
// Setup subscriptions
......@@ -89,9 +92,9 @@ module.exports = {
deferCloseChannel: 7000,
redeliveries: {
limit: 5,
counter: "shared"
}
}
counter: "shared",
},
},
},
// Setup publications
......@@ -100,17 +103,17 @@ module.exports = {
retry_in_1m: {
exchange: "delay",
options: {
CC: ["delay.1m"]
}
CC: ["delay.1m"],
},
},
// TODO This should not not be a part of the final production code.
// IDEA Split it out in a separate file and merge it with the config when testing.
borrower_event: {
exchange: "service"
}
}
}
exchange: "service",
},
},
},
},
// Define recovery strategies for different error scenarios
recovery: {
......@@ -120,11 +123,11 @@ module.exports = {
strategy: "forward",
attempts: 10,
publication: "retry_in_1m",
xDeathFix: true // See https://github.com/rabbitmq/rabbitmq-server/issues/161
xDeathFix: true, // See https://github.com/rabbitmq/rabbitmq-server/issues/161
},
{
strategy: "nack"
}
strategy: "nack",
},
],
// Republishing with immediate nack returns the message to the original queue but decorates
......@@ -133,18 +136,18 @@ module.exports = {
dead_letter: [
{
strategy: "republish",
immediateNack: true
}
]
immediateNack: true,
},
],
},
// Define counter(s) for counting redeliveries
redeliveries: {
counters: {
shared: {
size: 10,
type: "inMemory"
}
}
}
}
type: "inMemory",
},
},
},
},
};
const client = require("prom-client");
const http = require("http");
const winston = require("winston");
const { setup } = require("./rascal-worker");
const { createLogFactory } = require("./utils/logger");
const worker = require("./rascal-worker");
const { exit } = require("process");
const { createLogger, format, transports } = winston;
const port = 3000;
const { Counter, register } = client;
const c = new Counter({
name: "processed_counter",
help: "Processed items from queue",
labelNames: ["ok", "error"]
labelNames: ["ok", "error"],
});
client.collectDefaultMetrics();
const logger = createLogger({
level: "silly",
format: format.json(),
transports: [new transports.Console()]
// Also an instance of a logger
const logFactory = createLogFactory({
logLevel: "info",
});
// Feature toggle to enable/disable this worker
if (process.env.MAILCHIMP_SYNCHRONIZATION_ENABLED !== "true") {
logger.warn("Service deactivated. MAILCHIMP_SYNCHRONIZATION_ENABLED is not set to 'true'");
logFactory.warn("Service deactivated. MAILCHIMP_SYNCHRONIZATION_ENABLED is not set to 'true'");
process.exit(0);
}
......@@ -38,10 +35,10 @@ if (process.env.MAILCHIMP_SYNCHRONIZATION_ENABLED !== "true") {
});
server.listen(port, () => {
logger.debug(`Metrics server running on port ${port}`);
logFactory.debug(`Metrics server running on port ${port}`);
});
const broker = await setup({ logger, counter: params => c.inc(params) });
const broker = await worker.setup({ logFactory, counter: (params) => c.inc(params) });
setupGracefulShutdown(broker, server);
})();
......@@ -49,12 +46,12 @@ if (process.env.MAILCHIMP_SYNCHRONIZATION_ENABLED !== "true") {
// The goal is to allow the service to gracefully shut down
function setupGracefulShutdown(broker, server) {
if (!broker) {
logger.error("broker can not be null or undefined");
logFactory.error("broker can not be null or undefined");
exit(1);
}
if (!server) {
logger.error("server can not be null or undefined");
logFactory.error("server can not be null or undefined");
exit(1);
}
......@@ -62,34 +59,28 @@ function setupGracefulShutdown(broker, server) {
const gracefulShutdown = (eventName, exitCode = 0) => {
return () => {
logger.info(`${eventName} received. Shutting down (Might take some time)`);
logFactory.info(`${eventName} received. Shutting down (Might take some time)`);
setTimeout(() => {
logger.warn("Did not shut down gracefully within %d ms, hard stop!", maxShutDownWaitTime);
logFactory.warn(
"Did not shut down gracefully within %d ms, hard stop!",
maxShutDownWaitTime
);
process.exit(exitCode);
}, maxShutDownWaitTime);
new Promise(r => broker.shutdown(r))
.then(() => logger.debug("Closed broker..."))
.then(() => new Promise(r => server.close(r)))
.then(() => logger.debug("Closed server..."))
new Promise((r) => broker.shutdown(r))
.then(() => logFactory.debug("Closed broker..."))
.then(() => new Promise((r) => server.close(r)))
.then(() => logFactory.debug("Closed server..."))
.then(() => process.exit(exitCode))
.catch(error => {
logger.error("Failed to shut down gracefully", { error });
.catch((error) => {
logFactory.error("Failed to shut down gracefully", { error });
process.exit(-1);
});