@@ -17,31 +17,158 @@ limitations under the License.
1717package main
1818
1919import (
20+ "context"
21+ "fmt"
22+ "net/http"
23+ "os"
24+ "os/signal"
25+ "sync"
26+ "syscall"
27+ "time"
28+
2029 "github.com/Project-HAMi/HAMi/pkg/monitor/nvidia"
30+ "github.com/Project-HAMi/HAMi/pkg/util"
31+ "github.com/Project-HAMi/HAMi/pkg/util/flag"
32+
33+ "github.com/prometheus/client_golang/prometheus"
34+ "github.com/prometheus/client_golang/prometheus/promhttp"
2135
36+ "github.com/NVIDIA/go-nvml/pkg/nvml"
37+ "github.com/spf13/cobra"
2238 "k8s.io/klog/v2"
2339)
2440
25- //var addr = flag.String("listen-address", ":9394", "The address to listen on for HTTP requests.")
26-
27- //const shared_directory = "/usr/local/vgpu/shared"
41+ var (
42+ rootCmd = & cobra.Command {
43+ Use : "vGPUmonitor" ,
44+ Short : "Hami vgpu vGPUmonitor" ,
45+ Run : func (cmd * cobra.Command , args []string ) {
46+ flag .PrintPFlags (cmd .Flags ())
47+ start ()
48+ },
49+ }
50+ )
2851
29- func main () {
52+ func init () {
53+ rootCmd .Flags ().SortFlags = false
54+ rootCmd .PersistentFlags ().SortFlags = false
55+ rootCmd .Flags ().AddGoFlagSet (util .InitKlogFlags ())
56+ }
3057
58+ func start () {
3159 if err := ValidateEnvVars (); err != nil {
3260 klog .Fatalf ("Failed to validate environment variables: %v" , err )
3361 }
62+
3463 containerLister , err := nvidia .NewContainerLister ()
3564 if err != nil {
3665 klog .Fatalf ("Failed to create container lister: %v" , err )
3766 }
38- cgroupDriver = 0
39- errchannel := make (chan error )
40- //go serveInfo(errchannel)
41- go initMetrics (containerLister )
42- go watchAndFeedback (containerLister )
67+
68+ cgroupDriver = 0 // Explicitly initialize
69+
70+ ctx , cancel := context .WithCancel (context .Background ())
71+ defer cancel ()
72+
73+ var wg sync.WaitGroup
74+ errCh := make (chan error , 2 )
75+
76+ // Start the metrics service
77+ wg .Add (1 )
78+ go func () {
79+ defer wg .Done ()
80+ if err := initMetrics (ctx , containerLister ); err != nil {
81+ errCh <- err
82+ }
83+ }()
84+
85+ // Start the monitoring and feedback service
86+ wg .Add (1 )
87+ go func () {
88+ defer wg .Done ()
89+ if err := watchAndFeedback (ctx , containerLister ); err != nil {
90+ errCh <- err
91+ }
92+ }()
93+
94+ // Capture system signals
95+ signalCh := make (chan os.Signal , 1 )
96+ signal .Notify (signalCh , syscall .SIGINT , syscall .SIGTERM )
97+
98+ select {
99+ case sig := <- signalCh :
100+ klog .Infof ("Received signal: %s" , sig )
101+ cancel ()
102+ case err := <- errCh :
103+ klog .Errorf ("Received error: %v" , err )
104+ cancel ()
105+ }
106+
107+ // Wait for all goroutines to complete
108+ wg .Wait ()
109+ close (errCh )
110+ }
111+
112+ func initMetrics (ctx context.Context , containerLister * nvidia.ContainerLister ) error {
113+ klog .V (4 ).Info ("Initializing metrics for vGPUmonitor" )
114+ reg := prometheus .NewRegistry ()
115+ //reg := prometheus.NewPedanticRegistry()
116+
117+ // Construct cluster managers. In real code, we would assign them to
118+ // variables to then do something with them.
119+ NewClusterManager ("vGPU" , reg , containerLister )
120+ //NewClusterManager("ca", reg)
121+
122+ // Uncomment to add the standard process and Go metrics to the custom registry.
123+ //reg.MustRegister(
124+ // prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}),
125+ // prometheus.NewGoCollector(),
126+ //)
127+
128+ http .Handle ("/metrics" , promhttp .HandlerFor (reg , promhttp.HandlerOpts {}))
129+ server := & http.Server {Addr : ":9394" , Handler : nil }
130+
131+ // Starting the HTTP server in a goroutine
132+ go func () {
133+ if err := server .ListenAndServe (); err != nil && err != http .ErrServerClosed {
134+ klog .Errorf ("Failed to serve metrics: %v" , err )
135+ }
136+ }()
137+
138+ // Graceful shutdown on context cancellation
139+ <- ctx .Done ()
140+ klog .V (4 ).Info ("Shutting down metrics server" )
141+ if err := server .Shutdown (context .Background ()); err != nil {
142+ return err
143+ }
144+
145+ return nil
146+ }
147+
148+ func watchAndFeedback (ctx context.Context , lister * nvidia.ContainerLister ) error {
149+ if nvret := nvml .Init (); nvret != nvml .SUCCESS {
150+ return fmt .Errorf ("failed to initialize NVML: %s" , nvml .ErrorString (nvret ))
151+ }
152+ defer nvml .Shutdown ()
153+
43154 for {
44- err := <- errchannel
45- klog .Errorf ("failed to serve: %v" , err )
155+ select {
156+ case <- ctx .Done ():
157+ klog .Info ("Shutting down watchAndFeedback" )
158+ return nil
159+ case <- time .After (time .Second * 5 ):
160+ if err := lister .Update (); err != nil {
161+ klog .Errorf ("Failed to update container list: %v" , err )
162+ continue
163+ }
164+ //klog.Infof("WatchAndFeedback srPodList=%v", srPodList)
165+ Observe (lister )
166+ }
167+ }
168+ }
169+
170+ func main () {
171+ if err := rootCmd .Execute (); err != nil {
172+ klog .Fatal (err )
46173 }
47174}
0 commit comments