1414# limitations under the License.
1515#
1616
17+ # Gevent monkey patching - must be done before importing other modules
18+ import os
19+ if os .environ .get ('GUNICORN_WORKER_CLASS' ) == 'gevent' :
20+ from gevent import monkey
21+ monkey .patch_all ()
22+
23+ # Import gevent for greenlet spawning
24+ import gevent
25+ from gevent import spawn
26+ USE_GEVENT = True
27+ else :
28+ USE_GEVENT = False
29+
1730from api .utils .log_utils import initRootLogger
1831from plugin import GlobalPluginManager
1932
2033# Initialize logging first
2134initRootLogger ("ragflow_server" )
2235
2336import logging
24- import os
2537import signal
2638import threading
2739import uuid
4052from rag .settings import print_rag_settings
4153from rag .utils .redis_conn import RedisDistributedLock
4254
43- # Global stop event and executor for background tasks
44- stop_event = threading .Event ()
45- background_executor = None
55+ # Global variables for background tasks
56+ if USE_GEVENT :
57+ stop_event = None
58+ background_executor = None
59+ background_greenlet = None
60+ else :
61+ stop_event = threading .Event ()
62+ background_executor = None
63+ background_greenlet = None
4664
4765RAGFLOW_DEBUGPY_LISTEN = int (os .environ .get ('RAGFLOW_DEBUGPY_LISTEN' , "0" ))
4866
@@ -52,24 +70,50 @@ def update_progress():
5270 lock_value = str (uuid .uuid4 ())
5371 redis_lock = RedisDistributedLock ("update_progress" , lock_value = lock_value , timeout = 60 )
5472 logging .info (f"update_progress lock_value: { lock_value } " )
55- while not stop_event .is_set ():
56- try :
57- if redis_lock .acquire ():
58- DocumentService .update_progress ()
73+
74+ if USE_GEVENT :
75+ # Use gevent sleep and loop for greenlet compatibility
76+ while True :
77+ try :
78+ if redis_lock .acquire ():
79+ DocumentService .update_progress ()
80+ redis_lock .release ()
81+ gevent .sleep (6 ) # Use gevent.sleep instead of stop_event.wait
82+ except Exception :
83+ logging .exception ("update_progress exception" )
84+ redis_lock .release ()
85+ break
86+ else :
87+ # Traditional threading approach
88+ while not stop_event .is_set ():
89+ try :
90+ if redis_lock .acquire ():
91+ DocumentService .update_progress ()
92+ redis_lock .release ()
93+ stop_event .wait (6 )
94+ except Exception :
95+ logging .exception ("update_progress exception" )
96+ finally :
5997 redis_lock .release ()
60- stop_event .wait (6 )
61- except Exception :
62- logging .exception ("update_progress exception" )
63- finally :
64- redis_lock .release ()
6598
6699
67100def signal_handler (sig , frame ):
68101 """Handle shutdown signals gracefully"""
69- logging .info ("Received interrupt signal, shutting down..." )
70- stop_event .set ()
71- if background_executor :
72- background_executor .shutdown (wait = False )
102+ logging .info ("Received shutdown signal, stopping background tasks..." )
103+
104+ if USE_GEVENT :
105+ # Kill the background greenlet
106+ global background_greenlet
107+ if background_greenlet and not background_greenlet .dead :
108+ background_greenlet .kill ()
109+ else :
110+ # Traditional threading approach
111+ stop_event .set ()
112+ if hasattr (background_executor , 'shutdown' ):
113+ background_executor .shutdown (wait = False )
114+
115+ logging .info ("Background tasks stopped" )
116+ exit (0 )
73117
74118
75119def initialize_ragflow ():
@@ -113,8 +157,16 @@ def initialize_ragflow():
113157 signal .signal (signal .SIGTERM , signal_handler )
114158
115159 # Start background progress update task
116- background_executor = ThreadPoolExecutor (max_workers = 1 )
117- background_executor .submit (update_progress )
160+ if USE_GEVENT :
161+ # Use gevent spawn for greenlet-based execution
162+ global background_greenlet
163+ background_greenlet = spawn (update_progress )
164+ logging .info ("Started document progress update task in gevent mode" )
165+ else :
166+ # Use thread pool for traditional threading
167+ background_executor = ThreadPoolExecutor (max_workers = 1 )
168+ background_executor .submit (update_progress )
169+ logging .info ("Started document progress update task in threading mode" )
118170
119171 logging .info ("RAGFlow WSGI application initialized successfully in production mode" )
120172
0 commit comments