Skip to content
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 67 additions & 67 deletions react_on_rails_pro/spec/dummy/client/app/utils/redisReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,68 +34,55 @@ interface PendingPromise {
resolved?: boolean;
}

// Shared Redis client
let sharedRedisClient: RedisClientType | null = null;
let isClientConnected = false;

// Store active listeners by requestId
const activeListeners: Record<string, RequestListener | undefined> = {};

// Store pending promises
const pendingPromises: Record<string, PendingPromise | undefined> = {};

/**
* Gets or creates the shared Redis client
*/
async function getRedisClient() {
if (!sharedRedisClient) {
const url = process.env.REDIS_URL || 'redis://localhost:6379';
sharedRedisClient = createClient({ url });
}

if (!isClientConnected) {
await sharedRedisClient.connect();
isClientConnected = true;
}

return sharedRedisClient;
}

/**
* Closes the shared Redis client
*/
async function closeRedisClient() {
if (sharedRedisClient && isClientConnected) {
await sharedRedisClient.quit();
isClientConnected = false;
}
}

/**
* Listens to a Redis stream for data based on a requestId
* @param requestId - The stream key to listen on
* @returns An object with a getValue function to get values by key
*/
export function listenToRequestData(requestId: string): RequestListener {
// If a listener for this requestId already exists, return it
const existingListener = activeListeners[requestId];
if (existingListener) {
return existingListener;
}

// Private state for THIS listener only - no global state
const pendingPromises: Record<string, PendingPromise | undefined> = {};
const receivedKeys: string[] = [];

// Stream key for this request
const streamKey = `stream:${requestId}`;

// IDs of messages that need to be deleted
const messagesToDelete: string[] = [];

// Track if this listener is active
let isActive = true;
// Track if we've received the end message
let isEnded = false;

// Create dedicated Redis client for THIS listener
const url = process.env.REDIS_URL || 'redis://localhost:6379';
const redisClient: RedisClientType = createClient({ url });
Comment on lines +47 to +48
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Critical: Add error event listener to prevent process crash.

The Redis client requires at least one error listener; without it, unhandled errors will crash the Node.js process. This is a production-critical issue.

Apply this diff to add error handling immediately after client creation:

 const url = process.env.REDIS_URL || 'redis://localhost:6379';
 const redisClient: RedisClientType = createClient({ url });
+
+// MUST handle errors to prevent process crash
+redisClient.on('error', (err) => {
+  console.error('Redis client error:', err);
+  // Error will be handled by connection retry logic in ensureConnected
+});
+
 let isClientConnected = false;
 let connectionPromise: Promise<void> | null = null;
🤖 Prompt for AI Agents
In react_on_rails_pro/spec/dummy/client/app/utils/redisReceiver.ts around lines
54 to 55, the Redis client is created without an error listener which can cause
unhandled errors to crash the Node.js process; add an error handler immediately
after createClient by attaching redisClient.on('error', handler) (use the
project's logger if available or console.error) to log the error and prevent
process termination, and ensure this listener is in place before calling
redisClient.connect() or using the client.

let isClientConnected = false;
let connectionPromise: Promise<void> | null = null;

/**
* Ensures the Redis client is connected
* Prevents race condition where multiple concurrent calls try to connect
*/
async function ensureConnected(): Promise<RedisClientType> {
// Fast path: already connected
if (isClientConnected) {
return redisClient;
}

// Start connection if not already in progress
if (!connectionPromise) {
connectionPromise = redisClient
.connect()
.then(() => {
isClientConnected = true;
connectionPromise = null; // Clear after successful connection
})
.catch((error: unknown) => {
connectionPromise = null; // Clear on error to allow retry
throw error; // Re-throw to propagate error
});
}

// Wait for connection to complete (handles concurrent calls)
await connectionPromise;
return redisClient;
}

/**
* Process a message from the Redis stream
*/
Expand Down Expand Up @@ -154,7 +141,7 @@ export function listenToRequestData(requestId: string): RequestListener {
}

try {
const client = await getRedisClient();
const client = await ensureConnected();
await client.xDel(streamKey, messagesToDelete);
messagesToDelete.length = 0; // Clear the array
} catch (error) {
Expand All @@ -171,7 +158,7 @@ export function listenToRequestData(requestId: string): RequestListener {
}

try {
const client = await getRedisClient();
const client = await ensureConnected();

// Read all messages from the beginning of the stream
const results = (await client.xRead({ key: streamKey, id: '0' }, { COUNT: 100 })) as
Expand Down Expand Up @@ -203,7 +190,7 @@ export function listenToRequestData(requestId: string): RequestListener {
}

try {
const client = await getRedisClient();
const client = await ensureConnected();

// Use $ as the ID to read only new messages
let lastId = '$';
Expand Down Expand Up @@ -316,28 +303,41 @@ export function listenToRequestData(requestId: string): RequestListener {
close: async () => {
isActive = false;

// Delete this listener from active listeners
activeListeners[requestId] = undefined;

// Reject any pending promises
Object.entries(pendingPromises).forEach(([key, pendingPromise]) => {
if (pendingPromise) {
// Reject and cleanup all pending promises
Object.entries(pendingPromises).forEach(([_, pendingPromise]) => {
if (pendingPromise && !pendingPromise.resolved) {
clearTimeout(pendingPromise.timer);
pendingPromise.reject(new Error('Redis connection closed'));
pendingPromises[key] = undefined;
}
});

// Only close the Redis client if no other listeners are active
const hasActiveListeners = Object.values(activeListeners).some(Boolean);
if (!hasActiveListeners) {
await closeRedisClient();
// Clear the pendingPromises map completely
// eslint-disable-next-line @typescript-eslint/no-dynamic-delete
Object.keys(pendingPromises).forEach((key) => delete pendingPromises[key]);

// Wait for any pending connection attempt to complete
if (connectionPromise) {
try {
await connectionPromise;
} catch {
// Connection failed, but we still need to clean up state
connectionPromise = null;
}
}

// Always close THIS listener's Redis client
try {
if (isClientConnected) {
await redisClient.quit();
}
} catch (error) {
console.error('Error closing Redis client:', error);
} finally {
isClientConnected = false;
connectionPromise = null;
}
},
};

// Store the listener in active listeners
activeListeners[requestId] = listener;

return listener;
}
Loading