diff --git a/config.json b/config.json
index b16162c..2bacf02 100644
--- a/config.json
+++ b/config.json
@@ -4,11 +4,39 @@
   "metrics" : [
     {
       "command": "processes",
-      "tags_level" : [ 1 ]
+      "root" : "$.process.*",
+      "tags" : {
+        "name": "~",
+        "process_name" : "$.name",
+        "pid": "$.pid"
+      },
+      "values" : {
+        "cpu": "$.statistics.cpu.cur",
+        "memory": "$.vm-size"
+      }
     },
     {
       "command": "ip hotspot",
-      "tags_level" : [ ]
+      "root" : "$.host[*]",
+      "tags" : {
+        "hostname" : "$.hostname",
+        "name" : "$.name"
+      },
+      "values" : {
+        "active": "$.active"
+      }
+    },
+    {
+      "command": "ip nat",
+      "root" : "$.[*]",
+      "tags" : {
+        "src" : "$.src",
+        "dst" : "$.dst"
+      },
+      "values" : {
+        "bytes-in": "$.bytes",
+        "bytes-out": "$.bytes-out"
+      }
     }
   ]
 }
\ No newline at end of file
diff --git a/keentic_influxdb_exporter.py b/keentic_influxdb_exporter.py
index f492a94..812c907 100644
--- a/keentic_influxdb_exporter.py
+++ b/keentic_influxdb_exporter.py
@@ -2,84 +2,70 @@ import json
 import time
 import requests
 
-from typing import Dict
+from jsonpath_ng.ext import parse
 
-from value_normalizer import normalize_data, isvalidmetric
+from value_normalizer import normalize_value
+
+
+def json_path_init(paths):
+    queries = {}
+
+    for pathName, path in paths.items():
+        if path == "~":
+            queries[pathName] = path
+        else:
+            queries[pathName] = parse(path)
+
+    return queries
 
 
 class KeeneticCollector(object):
 
-    def __init__(self, endpoint, command, tags_levels):
+    def __init__(self, endpoint, command, root, tags, values):
         self._endpoint = endpoint
         self._command = command
-        self._tags_levels = tags_levels
+        self._root = parse(root)
+        self._tags = json_path_init(tags)
+        self._values = json_path_init(values)
 
     def collect(self):
         url = '{}/show/{}'.format(self._endpoint, self._command.replace(' ', '/'))
 
         response = json.loads(requests.get(url).content.decode('UTF-8'))
 
-        prefix = self._command.split(' ')
-        metrics = self.recursive_iterate(response, prefix, [], {}, 0)
-        for metric in metrics:
+        roots = self._root.find(response)
+
+        for root in roots:
+            tags = {}
+            values = {}
+
+            for tagName, tagPath in self._tags.items():
+                if tagPath == '~':
+                    tags[tagName] = root.path.fields[0]
+                else:
+                    tags[tagName] = self.get_first_value(tagPath.find(root.value))
+
+            for valueName, valuePath in self._values.items():
+                values[valueName] = normalize_value(self.get_first_value(valuePath.find(root.value)))
+
+            metric = self.create_metric(self._command, tags, values)
             print(json.dumps(metric))
 
-    def recursive_iterate(self, data, path, metrics, tags, level):
-
-        if isinstance(data, dict):
-
-            data = normalize_data(data)
-            tags = tags.copy()
-            tags.update(self.tags(data))
-            values = self.values(data)
-
-            if values.__len__() > 0:
-                metrics.append(self.create_metric(path, tags, values))
-
-            for key in data:
-                value = data.get(key)
-                if isinstance(value, list) or isinstance(value, dict):
-                    key_path = path.copy()
-
-                    if level in self._tags_levels: # Need for some API, like show processes
-                        tags['_'.join(path)] = key
-                    else:
-                        key_path.append(key)
-
-                    self.recursive_iterate(value, key_path, metrics, tags, level + 1)
-
-        if isinstance(data, list):
-            for idx, value in enumerate(data):
-                self.recursive_iterate(value, path, metrics, tags, level + 1)
-
-        return metrics
-
-    def create_metric(self, path, tags, values):
+    @staticmethod
+    def create_metric(measurement, tags, values):
         return {
-            "measurement": '_'.join(path),
+            "measurement": measurement,
             "tags": tags,
             "time": int(time.time() * 1000.0),
             "fields": values
         }
 
-    def tags(self, data):
-        labels: Dict[str, str] = {}
-
-        for key in data:
-            value = data.get(key)
-            if isinstance(value, str): labels[key] = value
-
-        return labels
-
-    def values(self, data):
-        values = {}
-
-        for key in data:
-            value = data.get(key)
-
-            if isvalidmetric(value): values[key] = value
-
-        return values
+    @staticmethod
+    def get_first_value(array):
+        if array and len(array) > 0:
+            return array[0].value
+        else:
+            return None
 
 
 if __name__ == '__main__':
@@ -90,7 +76,7 @@ if __name__ == '__main__':
 
     collectors = []
     for metric in metrics:
-        collectors.append( KeeneticCollector(endpoint, metric['command'], metric['tags_level']) )
+        collectors.append(KeeneticCollector(endpoint, metric['command'], metric['root'], metric['tags'], metric['values']))
     while True:
         for collector in collectors:
             collector.collect()
diff --git a/value_normalizer.py b/value_normalizer.py
index 2633f98..5aadeb1 100644
--- a/value_normalizer.py
+++ b/value_normalizer.py
@@ -2,33 +2,32 @@ import re
 
 
 def isstring(value): return isinstance(value, str)
-
-
 def isfloat(value: str): return (re.match(r'^-?\d+(?:\.\d+)?$', value) is not None)
-
-
 def isinteger(value: str): return (re.match('^\d+$', value) is not None)
-
-
 def isvalidmetric(value) : return isinstance(value, int) or isinstance(value, float) or isinstance(value, bool)
 
 
-def normalize_data(data):
-    if isinstance(data, dict):
-        for key in data:
+def normalize_value(value):
 
-            value = data.get(key)
+    if value is None:
+        return None
 
-            if isstring(value):
-                value = remove_data_unit(value)
-                if isinteger(value):
-                    data[key] = int(value)
-                    continue
-                if isfloat(value):
-                    data[key] = float(value)
-                    continue
+    if isstring(value):
+        value = parse_string(value)
+
+    if isvalidmetric(value):
+        return value
+    else:
+        print("WARN Value: " + str(value) + " is not valid metric type")
+        return None
 
-    return data
+def parse_string(value):
+    value = remove_data_unit(value)
+    if isinteger(value):
+        value = int(value)
+    elif isfloat(value):
+        value = float(value)
+    return value
 
 
 def remove_data_unit(value: str):
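Note for reviewers (illustration only, not part of the patch): the new config format replaces the depth-based "tags_level" lists with an explicit JSONPath mapping per command. "root" selects the objects to iterate over, each entry in "tags" and "values" is evaluated relative to a matched object, and the special path "~" resolves to the key under which the matched object sits. The sketch below shows that lookup with jsonpath_ng against a made-up, trimmed "show processes" response; the process names and numbers are invented for the example and are not taken from a real router.

    from jsonpath_ng.ext import parse

    # Hypothetical, trimmed response; a real "show processes" reply has more fields.
    response = {
        "process": {
            "ndm": {"pid": 170, "statistics": {"cpu": {"cur": 3}}, "vm-size": 19556},
            "ndnproxy": {"pid": 412, "statistics": {"cpu": {"cur": 1}}, "vm-size": 2048}
        }
    }

    root = parse("$.process.*")          # the "root" of the processes entry
    cpu = parse("$.statistics.cpu.cur")  # one of its "values" paths

    for match in root.find(response):
        name = match.path.fields[0]      # what the "~" tag ("name") resolves to
        cur = cpu.find(match.value)[0].value
        print(name, cur)                 # e.g. "ndm 3"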