11package scouter .plugin .server .influxdb ;
22
3+ import org .influxdb .InfluxDB ;
4+ import org .influxdb .InfluxDBFactory ;
35import org .influxdb .dto .Point ;
46import scouter .lang .TimeTypeEnum ;
57import scouter .lang .pack .PerfCounterPack ;
68import scouter .lang .plugin .PluginConstants ;
79import scouter .lang .plugin .annotation .ServerPlugin ;
810import scouter .lang .value .Value ;
11+ import scouter .server .ConfObserver ;
912import scouter .server .Configure ;
13+ import scouter .server .Logger ;
14+ import scouter .server .core .AgentManager ;
15+ import scouter .util .HashUtil ;
1016
1117import java .util .Map ;
1218import java .util .concurrent .TimeUnit ;
1723public class InfluxdbPlugin {
1824 Configure conf = Configure .getInstance ();
1925
20- final String confMeasurement = "ext_plugin_influxdb_measurement" ;
21- final String confTagObj = "ext_plugin_influxdb_tag_obj" ;
22- final String confTagTimeType = "ext_plugin_influxdb_tag_timetype" ;
26+ private static final String ext_plugin_influxdb_enabled = "ext_plugin_influxdb_enabled" ;
27+ private static final String ext_plugin_influxdb_measurement = "ext_plugin_influxdb_measurement" ;
2328
24- final String confMeasurementDefault = "counter" ;
25- final String confTagObjDefault = "obj" ;
26- final String confTagTimeTypeDefault = "timetype" ;
29+ private static final String ext_plugin_influxdb_udp = "ext_plugin_influxdb_udp" ;
30+ private static final String ext_plugin_influxdb_udp_local_ip = "ext_plugin_influxdb_udp_local_ip" ;
31+ private static final String ext_plugin_influxdb_udp_local_port = "ext_plugin_influxdb_udp_local_port" ;
32+
33+ private static final String ext_plugin_influxdb_udp_target_ip = "ext_plugin_influxdb_udp_target_ip" ;
34+ private static final String ext_plugin_influxdb_udp_target_port = "ext_plugin_influxdb_udp_target_port" ;
35+
36+ private static final String ext_plugin_influxdb_http_target_ip = "ext_plugin_influxdb_http_target_ip" ;
37+ private static final String ext_plugin_influxdb_http_target_port = "ext_plugin_influxdb_http_target_port" ;
38+ private static final String ext_plugin_influxdb_id = "ext_plugin_influxdb_id" ;
39+ private static final String ext_plugin_influxdb_password = "ext_plugin_influxdb_password" ;
40+ private static final String ext_plugin_influxdb_dbName = "ext_plugin_influxdb_dbName" ;
41+
42+ private static final String tagObjName = "obj" ;
43+ private static final String tagTimeTypeName = "timeType" ;
44+ private static final String tagObjType = "objType" ;
45+
46+ boolean enabled = conf .getBoolean (ext_plugin_influxdb_enabled , true );
47+
48+ private String measurementName = conf .getValue (ext_plugin_influxdb_measurement , "counter" );
49+
50+ boolean isUdp = conf .getBoolean (ext_plugin_influxdb_udp , true );
51+ String udpLocalIp = conf .getValue (ext_plugin_influxdb_udp_local_ip );
52+ int udpLocalPort = conf .getInt (ext_plugin_influxdb_udp_local_port , 0 );
53+ String udpTargetIp = conf .getValue (ext_plugin_influxdb_udp_target_ip , "127.0.0.1" );
54+ int udpTargetPort = conf .getInt (ext_plugin_influxdb_udp_target_port , 8089 );
55+
56+ UdpAgent udpAgent = null ;
57+
58+ String httpTargetIp = conf .getValue (ext_plugin_influxdb_http_target_ip , "127.0.0.1" );
59+ int httpTargetPort = conf .getInt (ext_plugin_influxdb_http_target_port , 8086 );
60+ String id = conf .getValue (ext_plugin_influxdb_id , "root" );
61+ String password = conf .getValue (ext_plugin_influxdb_password , "root" );
62+ String dbName = conf .getValue (ext_plugin_influxdb_dbName , "scouterCounter" );
63+
64+ InfluxDB influx = null ;
65+
66+ public InfluxdbPlugin () {
67+ if (isUdp ) {
68+ udpAgent = UdpAgent .getInstance ();
69+ udpAgent .setLocalAddr (udpLocalIp , udpLocalPort );
70+ udpAgent .setTarget (udpTargetIp , udpTargetPort );
71+ } else {
72+ influx = InfluxDBFactory .connect ("http//" + httpTargetIp + ":" + httpTargetPort , id , password );
73+ influx .enableBatch (200 , 200 , TimeUnit .MILLISECONDS );
74+ influx .createDatabase (dbName );
75+ }
76+
77+ ConfObserver .put ("InfluxdbPlugin" , new Runnable () {
78+ public void run () {
79+ enabled = conf .getBoolean (ext_plugin_influxdb_enabled , true );
80+ measurementName = conf .getValue (ext_plugin_influxdb_measurement , "counter" );
81+ boolean isUdpNew = conf .getBoolean (ext_plugin_influxdb_udp , true );
82+ if (isUdpNew != isUdp ) {
83+ isUdp = isUdpNew ;
84+ if (isUdp ) {
85+ udpAgent = UdpAgent .getInstance ();
86+ udpAgent .setLocalAddr (udpLocalIp , udpLocalPort );
87+ udpAgent .setTarget (udpTargetIp , udpTargetPort );
88+ } else {
89+ influx = InfluxDBFactory .connect ("http//" + httpTargetIp + ":" + httpTargetPort , id , password );
90+ influx .createDatabase (dbName );
91+ }
92+ }
93+
94+ //set udp local
95+ String newUdpLocalIp = conf .getValue (ext_plugin_influxdb_udp_local_ip );
96+ int newUdpLocalPort = conf .getInt (ext_plugin_influxdb_udp_local_port , 0 );
97+ if (!newUdpLocalIp .equals (udpLocalIp ) || newUdpLocalPort != udpLocalPort ) {
98+ udpLocalIp = newUdpLocalIp ;
99+ udpLocalPort = newUdpLocalPort ;
100+ udpAgent .setLocalAddr (udpLocalIp , udpLocalPort );
101+ }
102+
103+ //set udp target
104+ String newUdpTargetIp = conf .getValue (ext_plugin_influxdb_udp_target_ip , "127.0.0.1" );
105+ int newUdpTargetPort = conf .getInt (ext_plugin_influxdb_udp_local_port , 8089 );
106+ if (!newUdpTargetIp .equals (udpTargetIp ) || newUdpTargetPort != udpTargetPort ) {
107+ udpTargetIp = newUdpTargetIp ;
108+ udpTargetPort = newUdpTargetPort ;
109+ udpAgent .setTarget (udpTargetIp , udpTargetPort );
110+ }
111+
112+ //set http target
113+ String newHttpTargetIp = conf .getValue (ext_plugin_influxdb_http_target_ip , "127.0.0.1" );
114+ int newHttpTargetPort = conf .getInt (ext_plugin_influxdb_http_target_port , 8086 );
115+ String newId = conf .getValue (ext_plugin_influxdb_id , "root" );
116+ String newPassword = conf .getValue (ext_plugin_influxdb_password , "root" );
117+
118+ if (!newHttpTargetIp .equals (httpTargetIp ) || newHttpTargetPort != httpTargetPort
119+ || !newId .equals (id ) || !newPassword .equals (password )) {
120+ httpTargetIp = newHttpTargetIp ;
121+ httpTargetPort = newHttpTargetPort ;
122+ id = newId ;
123+ password = newPassword ;
124+ influx = InfluxDBFactory .connect ("http//" + httpTargetIp + ":" + httpTargetPort , id , password );
125+ }
126+
127+ }
128+ });
129+ }
27130
28131 @ ServerPlugin (PluginConstants .PLUGIN_SERVER_COUNTER )
29- public void counter (PerfCounterPack pack ) {
30- String measurementName = conf . getValue ( confMeasurement , confMeasurementDefault );
31- String objTagName = conf . getValue ( confTagObj , confTagObjDefault ) ;
32- String timeTypeTagName = conf . getValue ( confTagTimeType , confTagTimeTypeDefault );
132+ public void counter (final PerfCounterPack pack ) {
133+ if (! enabled ) {
134+ return ;
135+ }
33136
34137 try {
138+ String objName = pack .objName ;
139+ int objHash = HashUtil .hash (objName );
140+ String objType = AgentManager .getAgent (objHash ).objType ;
35141 Point .Builder builder = Point .measurement (measurementName )
36142 .time (pack .time , TimeUnit .MILLISECONDS )
37- .tag (objTagName , pack .objName )
38- .tag (timeTypeTagName , TimeTypeEnum .getString (pack .timetype ));
143+ .tag (tagObjName , objName )
144+ .tag (tagObjType , objType )
145+ .tag (tagTimeTypeName , TimeTypeEnum .getString (pack .timetype ));
39146
40147 Map <String , Value > dataMap = pack .data .toMap ();
41148 for (Map .Entry <String , Value > field : dataMap .entrySet ()) {
@@ -47,14 +154,22 @@ public void counter(PerfCounterPack pack) {
47154 String key = field .getKey ();
48155 builder .field (key , value );
49156 }
50-
51157 Point point = builder .build ();
52- String line = point .lineProtocol ();
53158
54- System .out .println (line );
159+ if (isUdp ) {
160+ String line = point .lineProtocol ();
161+ udpAgent .write (line );
162+ //System.out.println(line);
163+ } else { // http
164+ influx .write (dbName , "default" , point );
165+ }
55166
56167 } catch (Exception e ) {
57- e .getMessage ();
168+ if (conf ._trace ) {
169+ Logger .printStackTrace ("IFLX001" , e );
170+ } else {
171+ Logger .println ("IFLX002" , e .getMessage ());
172+ }
58173 }
59174 }
60175}
0 commit comments