Build cache and message queues with Valkey in-memory data store and Open Source Cloud

Eyevinn Technology

Valkey is an open-source, high-performance key/value data store that supports a variety of workloads such as caching and message queues. In this blog post we describe how a continue-watching microservice can use Valkey in Open Source Cloud to store playback positions, so that viewers can resume watching where they left off. For streaming services with many simultaneous viewers, it is critical that this store is highly performant.

As an example we will use a simple continue-watching microservice available on our GitHub. The service provides a simple HTTP API:

  • POST /position/:userId/:assetId/:position — store a position, given in seconds
  • GET /position/:userId/:assetId — get the position for a given asset
  • GET /position/:userId — get all positions for a specific user, newest first

To store a position, the following code is used:

const store = async (userId, assetId, position) => {
  if (!userId || !assetId || !position) return false;
  const key = generateKey(KEY_PREFIX, userId, assetId);
  const success = await redisClient.setex(
    key,
    ONE_YEAR,
    position
  );
  return success;
};
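The `generateKey` helper is not shown in the snippet above. A minimal sketch (the actual implementation in the repository may differ) could join the prefix and identifiers with a separator, and the read path would then be the mirror image of `store()`, using `get`:

```javascript
// Hypothetical sketch — the repository's actual generateKey may differ.
const KEY_PREFIX = 'position';

// Join the prefix and identifiers into one key, e.g. "position:jonas:1".
const generateKey = (prefix, userId, assetId) =>
  [prefix, userId, assetId].join(':');

// Read path mirroring store(): fetch the stored position for one asset.
// The Redis-compatible client is passed in rather than taken from scope.
const getPosition = async (redisClient, userId, assetId) => {
  const key = generateKey(KEY_PREFIX, userId, assetId);
  const position = await redisClient.get(key);
  return position !== null ? Number(position) : null;
};
```

Keeping the key scheme in one helper means the store and read paths can never drift apart, which matters once you also scan keys to list all positions for a user.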

Here you might think that we would need to modify this code, since it is currently built to use a Redis data store, but we actually don't: Valkey provides an API that is compatible with Redis clients.

What we will modify is the startup: when the service starts, we will create a Valkey data store in Open Source Cloud and use that.

Let us look at index.js in the code repository and modify the function server(). Before modification it looks like this:

async function server() {
  const fastify = require("fastify")({
    ignoreTrailingSlash: true
  });
  await fastify.register(require("fastify-express"));
  const cors = require("cors");
  fastify.use(cors());

  const fastifyRateLimit = require("fastify-rate-limit");
  fastify.register(fastifyRateLimit, {
    max: 100,
    timeWindow: "1 minute"
  });

  fastify.register(require("./src/routes"), {
    prefix: "/position"
  });

  return fastify;
}

What we will add is the following:

const { Context } = require('@osaas/client-core');
const { ValkeyDb } = require('@osaas/client-db');

...

const context = new Context();
const db = new ValkeyDb({ context, name: 'mydb' });
await db.init();

And then the function server() looks like this:

const { Context } = require('@osaas/client-core');
const { ValkeyDb } = require('@osaas/client-db');

async function server() {
  const context = new Context();
  const db = new ValkeyDb({ context, name: 'mydb' });
  await db.init();

  const fastify = require("fastify")({
    ignoreTrailingSlash: true
  });
  await fastify.register(require("fastify-express"));
  const cors = require("cors");
  fastify.use(cors());

  const fastifyRateLimit = require("fastify-rate-limit");
  fastify.register(fastifyRateLimit, {
    max: 100,
    timeWindow: "1 minute"
  });

  fastify.register(require("./src/routes"), {
    prefix: "/position"
  });

  return fastify;
}

Now, when the server starts, it will first create a Valkey data store in Open Source Cloud. Since the current implementation reads the Redis host and port from environment variables, the quickest (but also dirty) way is to simply set these variables to the address and port of the newly created data store. We could refactor the code to make this prettier, but we will not do that in this blog post. To get the URL to the data store and override the environment variables we do:

const redisUrl = await db.getRedisUrl();
process.env.REDIS_URL = redisUrl.hostname;
process.env.REDIS_PORT = redisUrl.port;
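Assuming `getRedisUrl()` resolves to a standard WHATWG URL object (check the `@osaas/client-db` documentation to confirm), the `hostname` and `port` properties map directly onto the two environment variables. A quick sanity check with a made-up address shows the shape of the values:

```javascript
// Hypothetical example address — your data store will have its own URL.
const redisUrl = new URL('redis://my-valkey.osc.example:6379');

process.env.REDIS_URL = redisUrl.hostname; // "my-valkey.osc.example"
process.env.REDIS_PORT = redisUrl.port;    // "6379"
```

Note that both properties are strings, which is fine here since environment variables are strings anyway; a client library reading REDIS_PORT will typically coerce it to a number itself.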

Adding this to the server() function, we now have:

const { Context } = require('@osaas/client-core');
const { ValkeyDb } = require('@osaas/client-db');

async function server() {
  const context = new Context();
  const db = new ValkeyDb({ context, name: 'mydb' });
  await db.init();

  const redisUrl = await db.getRedisUrl();
  process.env.REDIS_URL = redisUrl.hostname;
  process.env.REDIS_PORT = redisUrl.port;

  const fastify = require("fastify")({
    ignoreTrailingSlash: true
  });
  await fastify.register(require("fastify-express"));
  const cors = require("cors");
  fastify.use(cors());

  const fastifyRateLimit = require("fastify-rate-limit");
  fastify.register(fastifyRateLimit, {
    max: 100,
    timeWindow: "1 minute"
  });

  fastify.register(require("./src/routes"), {
    prefix: "/position"
  });

  return fastify;
}

We can now start the service and use curl to test storing a position for a user and an asset. Before we start we need to install the client libraries:

% npm install --save @osaas/client-core
% npm install --save @osaas/client-db
% OSC_ACCESS_TOKEN=<personal-access-token> npm start
% curl -X POST http://localhost:3000/position/jonas/1/20
OK

This stores position 20 for the user jonas on the asset with id 1. We can verify this with the following curl command:

% curl http://localhost:3000/position/jonas
[{"assetId":"1","position":"20","expiration":31535988}]
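Note the `expiration` field in the response: since every position is written with the same one-year SETEX expiry, the most recently stored key has the largest remaining TTL, so sorting entries by remaining TTL in descending order is one plausible way for the service to return positions "newest first" (the repository's actual implementation may differ):

```javascript
// Hypothetical sketch: with a uniform one-year expiry on every key,
// the most recently written position has the largest remaining TTL,
// so sorting by expiration descending yields newest-first ordering.
const sortNewestFirst = (entries) =>
  [...entries].sort((a, b) => b.expiration - a.expiration);

// Example entries as returned by the listing endpoint.
const entries = [
  { assetId: '2', position: '95', expiration: 31535200 },
  { assetId: '1', position: '20', expiration: 31535988 },
];
// sortNewestFirst(entries) puts assetId "1" first (larger remaining TTL).
```

In the sample response above, 31535988 is just 12 seconds short of a full year (31536000 seconds), matching a position stored moments earlier.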

And that is all you need to quickly enable a highly performant data store for your continue-watching service. This Continue Watching API is also available in Open Source Cloud.

What is Open Source Cloud

Open Source Cloud is a software-as-a-service based on open source, with a unique, transparent model where revenue is shared with the open source authors. It offers media industry companies a quick way to incorporate open source in their solutions, with the option to run the same software in-house since the source code is publicly available.


Eyevinn Technology

We are consultants sharing a passion for technology for the media consumer of the future.