@@ -79,11 +79,13 @@ func (m *Manager) IssueTransaction(w http.ResponseWriter, r *http.Request) {
7979type handlerCtxKey string
8080
8181const (
82- CtxStreamUserSession handlerCtxKey = "stream_user_session"
83- CtxStreamUserTransactions handlerCtxKey = "stream_user_transactions"
84- CtxStreamAllSessions handlerCtxKey = "stream_all_sessions"
85- CtxStreamAllTransactions handlerCtxKey = "stream_all_transactions"
86- CtxStreamUserArchiveSession handlerCtxKey = "stream_user_archive_sessions"
82+ CtxStreamUserSession handlerCtxKey = "stream_user_session"
83+ CtxStreamUserTransactionsResults handlerCtxKey = "stream_user_transactions_results"
84+ CtxStreamUserTransactionsPending handlerCtxKey = "stream_user_transactions_pending"
85+ CtxStreamAllSessions handlerCtxKey = "stream_all_sessions"
86+ CtxStreamAllTransactions handlerCtxKey = "stream_all_transactions"
87+ CtxStreamUserArchiveSession handlerCtxKey = "stream_user_archive_sessions"
88+ CtxStreamUserArchiveResultsTransactions handlerCtxKey = "stream_user_archive_results_transactions"
8789 CtxStreamUserArchivePendingTransactions handlerCtxKey = "stream_user_archive_pending_transactions"
8890)
8991
@@ -150,11 +152,11 @@ func (m *Manager) StreamUserSession(w http.ResponseWriter, r *http.Request) {
150152}
151153
152154/*
153- get user transactions information
155+ get user transactions results information
154156requires user authentication from middleware
155157user/
156158*/
157- func (m * Manager ) StreamUserTransactions (w http.ResponseWriter , r * http.Request ) {
159+ func (m * Manager ) StreamUserTransactionsResults (w http.ResponseWriter , r * http.Request ) {
158160
159161 /* get the username */
160162 username , ok := r .Context ().Value (middleware .ContextKeyUsername ).(string )
@@ -196,16 +198,78 @@ func (m *Manager) StreamUserTransactions(w http.ResponseWriter, r *http.Request)
196198 defer cancel ()
197199
198200 /* sending initial list of transactions data */
199- if err := m .sendCurrentUserTransactions (conn , sessionID , 100 ); err != nil {
201+ if err := m .sendCurrentUserTransactionsResults (conn , sessionID , 100 ); err != nil {
200202 m .errCh <- fmt .Errorf ("error sending initial transactions: %w" , err )
201203 return
202204 }
203205
204206 /* stream changes in transactions made in redis */
205- go m .listenForTransactionsChanges (ctx , conn , sessionID )
207+ go m .listenForTransactionsChangesResults (ctx , conn , sessionID )
206208
207209 /* specify the handler context */
208- ctxVal := context .WithValue (ctx , "type" , CtxStreamUserTransactions )
210+ ctxVal := context .WithValue (ctx , "type" , CtxStreamUserTransactionsResults )
211+
212+ /* handle web socket instructions from client */
213+ m .handleWebSocketCommands (conn , username , sessionID , ctxVal , cancel )
214+ }
215+
216+ /*
217+ get user transactions pending information
218+ requires user authentication from middleware
219+ user/
220+ */
221+ func (m * Manager ) StreamUserTransactionsPending (w http.ResponseWriter , r * http.Request ) {
222+
223+ /* get the username */
224+ username , ok := r .Context ().Value (middleware .ContextKeyUsername ).(string )
225+ if ! ok {
226+ http .Error (w , "Invalid user context" , http .StatusInternalServerError )
227+ return
228+ }
229+
230+ /* get the session id */
231+ sessionID , ok := r .Context ().Value (middleware .ContextKeySessionID ).(string )
232+ if ! ok {
233+ http .Error (w , "Invalid session ID context" , http .StatusInternalServerError )
234+ return
235+ }
236+
237+ m .mutex .RLock ()
238+ session , exists := m .sessionsMap [username ]
239+ m .mutex .RUnlock ()
240+
241+ if ! exists || session .ID .String () != sessionID {
242+ http .Error (w , "unauthorized" , http .StatusUnauthorized )
243+ return
244+ }
245+
246+ /* user exists and verified, upgrade the websocket connection */
247+ conn , err := m .upgrader .Upgrade (w , r , nil )
248+ if err != nil {
249+ m .errCh <- fmt .Errorf ("websocket upgrade error: %w" , err )
250+ return
251+ }
252+ defer conn .Close ()
253+
254+ /*
255+ context with cancel for web socket handlers
256+ this is the official context for a websocket connection
257+ cancelling this means closing components of the websocket handler
258+ */
259+ ctx , cancel := context .WithCancel (context .Background ())
260+ defer cancel ()
261+
262+ /* sending initial list of transactions data */
263+ if err := m .sendCurrentUserTransactionsPending (conn , sessionID , 100 ); err != nil {
264+ m .errCh <- fmt .Errorf ("error sending initial transactions: %w" , err )
265+ return
266+ }
267+
268+ /* stream changes in transactions made in redis */
269+ go m .listenForTransactionsChangesPending (ctx , conn , sessionID )
270+
271+ /* specify the handler context */
272+ ctxVal := context .WithValue (ctx , "type" , CtxStreamUserTransactionsPending )
209273
210274 /* handle web socket instructions from client */
211275 m .handleWebSocketCommands (conn , username , sessionID , ctxVal , cancel )
0 commit comments