1010import scouter .lang .value .Value ;
1111import scouter .server .ConfObserver ;
1212import scouter .server .Configure ;
13+ import scouter .server .CounterManager ;
1314import scouter .server .Logger ;
1415import scouter .server .core .AgentManager ;
1516import scouter .util .HashUtil ;
@@ -35,13 +36,15 @@ public class InfluxdbPlugin {
3536
3637 private static final String ext_plugin_influxdb_http_target_ip = "ext_plugin_influxdb_http_target_ip" ;
3738 private static final String ext_plugin_influxdb_http_target_port = "ext_plugin_influxdb_http_target_port" ;
39+ private static final String ext_plugin_influxdb_http_retention_policy = "ext_plugin_influxdb_http_retention_policy" ;
3840 private static final String ext_plugin_influxdb_id = "ext_plugin_influxdb_id" ;
3941 private static final String ext_plugin_influxdb_password = "ext_plugin_influxdb_password" ;
4042 private static final String ext_plugin_influxdb_dbName = "ext_plugin_influxdb_dbName" ;
4143
4244 private static final String tagObjName = "obj" ;
4345 private static final String tagTimeTypeName = "timeType" ;
4446 private static final String tagObjType = "objType" ;
47+ private static final String tagObjFamily = "objFamily" ;
4548
4649 boolean enabled = conf .getBoolean (ext_plugin_influxdb_enabled , true );
4750
@@ -57,14 +60,15 @@ public class InfluxdbPlugin {
5760
5861 String httpTargetIp = conf .getValue (ext_plugin_influxdb_http_target_ip , "127.0.0.1" );
5962 int httpTargetPort = conf .getInt (ext_plugin_influxdb_http_target_port , 8086 );
63+ String retentionPolicy = conf .getValue (ext_plugin_influxdb_http_retention_policy , "default" );
6064 String id = conf .getValue (ext_plugin_influxdb_id , "root" );
6165 String password = conf .getValue (ext_plugin_influxdb_password , "root" );
6266 String dbName = conf .getValue (ext_plugin_influxdb_dbName , "scouterCounter" );
6367
6468 InfluxDB influx = null ;
6569
6670 public InfluxdbPlugin () {
67- if (isUdp ) {
71+ if (isUdp ) {
6872 udpAgent = UdpAgent .getInstance ();
6973 udpAgent .setLocalAddr (udpLocalIp , udpLocalPort );
7074 udpAgent .setTarget (udpTargetIp , udpTargetPort );
@@ -79,9 +83,9 @@ public void run() {
7983 enabled = conf .getBoolean (ext_plugin_influxdb_enabled , true );
8084 measurementName = conf .getValue (ext_plugin_influxdb_measurement , "counter" );
8185 boolean isUdpNew = conf .getBoolean (ext_plugin_influxdb_udp , true );
82- if (isUdpNew != isUdp ) {
86+ if (isUdpNew != isUdp ) {
8387 isUdp = isUdpNew ;
84- if (isUdp ) {
88+ if (isUdp ) {
8589 udpAgent = UdpAgent .getInstance ();
8690 udpAgent .setLocalAddr (udpLocalIp , udpLocalPort );
8791 udpAgent .setTarget (udpTargetIp , udpTargetPort );
@@ -94,7 +98,7 @@ public void run() {
9498 //set udp local
9599 String newUdpLocalIp = conf .getValue (ext_plugin_influxdb_udp_local_ip );
96100 int newUdpLocalPort = conf .getInt (ext_plugin_influxdb_udp_local_port , 0 );
97- if (!newUdpLocalIp .equals (udpLocalIp ) || newUdpLocalPort != udpLocalPort ) {
101+ if (!newUdpLocalIp .equals (udpLocalIp ) || newUdpLocalPort != udpLocalPort ) {
98102 udpLocalIp = newUdpLocalIp ;
99103 udpLocalPort = newUdpLocalPort ;
100104 udpAgent .setLocalAddr (udpLocalIp , udpLocalPort );
@@ -103,7 +107,7 @@ public void run() {
103107 //set udp target
104108 String newUdpTargetIp = conf .getValue (ext_plugin_influxdb_udp_target_ip , "127.0.0.1" );
105109 int newUdpTargetPort = conf .getInt (ext_plugin_influxdb_udp_local_port , 8089 );
106- if (!newUdpTargetIp .equals (udpTargetIp ) || newUdpTargetPort != udpTargetPort ) {
110+ if (!newUdpTargetIp .equals (udpTargetIp ) || newUdpTargetPort != udpTargetPort ) {
107111 udpTargetIp = newUdpTargetIp ;
108112 udpTargetPort = newUdpTargetPort ;
109113 udpAgent .setTarget (udpTargetIp , udpTargetPort );
@@ -114,11 +118,14 @@ public void run() {
114118 int newHttpTargetPort = conf .getInt (ext_plugin_influxdb_http_target_port , 8086 );
115119 String newId = conf .getValue (ext_plugin_influxdb_id , "root" );
116120 String newPassword = conf .getValue (ext_plugin_influxdb_password , "root" );
121+ String newRetentionPolicy = conf .getValue (ext_plugin_influxdb_http_retention_policy , "default" );
117122
118- if (!newHttpTargetIp .equals (httpTargetIp ) || newHttpTargetPort != httpTargetPort
119- || !newId .equals (id ) || !newPassword .equals (password )) {
123+ if (!newHttpTargetIp .equals (httpTargetIp ) || newHttpTargetPort != httpTargetPort
124+ || !newId .equals (id ) || !newPassword .equals (password )
125+ || !newRetentionPolicy .equals (retentionPolicy )) {
120126 httpTargetIp = newHttpTargetIp ;
121127 httpTargetPort = newHttpTargetPort ;
128+ retentionPolicy = newRetentionPolicy ;
122129 id = newId ;
123130 password = newPassword ;
124131 influx = InfluxDBFactory .connect ("http//" + httpTargetIp + ":" + httpTargetPort , id , password );
@@ -130,18 +137,20 @@ public void run() {
130137
131138 @ ServerPlugin (PluginConstants .PLUGIN_SERVER_COUNTER )
132139 public void counter (final PerfCounterPack pack ) {
133- if (!enabled ) {
140+ if (!enabled ) {
134141 return ;
135142 }
136143
137144 try {
138145 String objName = pack .objName ;
139146 int objHash = HashUtil .hash (objName );
140147 String objType = AgentManager .getAgent (objHash ).objType ;
148+ String objFamily = CounterManager .getInstance ().getCounterEngine ().getObjectType (objType ).getFamily ().getName ();
141149 Point .Builder builder = Point .measurement (measurementName )
142150 .time (pack .time , TimeUnit .MILLISECONDS )
143151 .tag (tagObjName , objName )
144152 .tag (tagObjType , objType )
153+ .tag (tagObjFamily , objFamily )
145154 .tag (tagTimeTypeName , TimeTypeEnum .getString (pack .timetype ));
146155
147156 Map <String , Value > dataMap = pack .data .toMap ();
@@ -156,16 +165,16 @@ public void counter(final PerfCounterPack pack) {
156165 }
157166 Point point = builder .build ();
158167
159- if (isUdp ) {
168+ if (isUdp ) {
160169 String line = point .lineProtocol ();
161170 udpAgent .write (line );
162171 //System.out.println(line);
163172 } else { // http
164- influx .write (dbName , "default" , point );
173+ influx .write (dbName , retentionPolicy , point );
165174 }
166175
167176 } catch (Exception e ) {
168- if (conf ._trace ) {
177+ if (conf ._trace ) {
169178 Logger .printStackTrace ("IFLX001" , e );
170179 } else {
171180 Logger .println ("IFLX002" , e .getMessage ());
0 commit comments