Add integration with influx

This commit is contained in:
Vitaliy Skrypnyk
2020-07-28 19:28:25 +03:00
committed by Vitaliy Skrypnyk
parent 4dc7c9be5e
commit ab8426f893
3 changed files with 44 additions and 8 deletions

View File

@@ -1,6 +1,13 @@
{
"endpoint" : "http://192.168.1.1:79/rci",
"interval_sec" : 10,
"interval_sec" : 30,
"influxdb" : {
"host" : "",
"port" : 80,
"username" : "",
"password" : "",
"db" : ""
},
"metrics" : [
{
"command": "processes",
@@ -24,10 +31,10 @@
"ssid": "$.ssid",
"mode": "$.mode",
"ip": "$.ip",
"mac": "$.mac"
"mac": "$.mac",
"active": "$.active"
},
"values" : {
"active": "$.active",
"rxbytes": "$.rxbytes",
"txbytes": "$.txbytes",
"txrate": "$.txrate",

21
influxdb_writter.py Normal file
View File

@@ -0,0 +1,21 @@
import requests
from influxdb import InfluxDBClient
class InfuxWritter(object):
def __init__(self, configuration):
requests.packages.urllib3.disable_warnings()
self._configuration = configuration['influxdb']
self._client = InfluxDBClient(self._configuration['host'], self._configuration['port'], self._configuration['username'], self._configuration['password'], self._configuration['db'])
self.init_database()
def init_database(self):
db_name = self._configuration['db']
self._client.drop_database(db_name)
if db_name not in self._client.get_list_database():
print("Creating: " + db_name)
self._client.create_database(db_name)
def write_metrics(self, metrics):
self._client.write_points( metrics )

View File

@@ -4,6 +4,7 @@ import requests
from jsonpath_ng.ext import parse
from influxdb_writter import InfuxWritter
from value_normalizer import normalize_value
@@ -21,7 +22,8 @@ def json_path_init(paths):
class KeeneticCollector(object):
def __init__(self, endpoint, command, root, tags, values):
def __init__(self, infuxdb_writter, endpoint, command, root, tags, values):
self._influx = infuxdb_writter
self._endpoint = endpoint
self._command = command
self._root = parse(root)
@@ -34,6 +36,7 @@ class KeeneticCollector(object):
response = json.loads(requests.get(url).content.decode('UTF-8'))
roots = self._root.find(response)
metrics = []
for root in roots:
tags = {}
@@ -41,22 +44,25 @@ class KeeneticCollector(object):
for tagName, tagPath in self._tags.items():
if tagPath == '~':
tags[tagName] = root.path.fields[0]
tags[tagName] = str(root.path.fields[0])
else:
tags[tagName] = self.get_first_value(tagPath.find(root.value))
tags[tagName] = str(self.get_first_value(tagPath.find(root.value)))
for valueName, valuePath in self._values.items():
values[valueName] = normalize_value(self.get_first_value(valuePath.find(root.value)))
metric = self.create_metric(self._command, tags, values)
metrics.append(metric)
print(json.dumps(metric))
infuxdb_writter.write_metrics(metrics)
@staticmethod
def create_metric(measurement, tags, values):
return {
"measurement": measurement,
"tags": tags,
"time": int(time.time() * 1000.0),
"time": time.time_ns(),
"fields": values
}
@@ -75,8 +81,10 @@ if __name__ == '__main__':
collectors = []
infuxdb_writter = InfuxWritter(configuration)
for metric in metrics:
collectors.append(KeeneticCollector(endpoint, metric['command'], metric['root'], metric['tags'], metric['values']))
collectors.append(KeeneticCollector(infuxdb_writter, endpoint, metric['command'], metric['root'], metric['tags'], metric['values']))
while True:
for collector in collectors: collector.collect()