Migrate to json path

This commit is contained in:
Vitaliy Skrypnyk
2020-07-27 23:31:30 +03:00
committed by Vitaliy Skrypnyk
parent abc26e4875
commit cba96f79c3
3 changed files with 92 additions and 79 deletions

View File

@@ -4,11 +4,39 @@
"metrics" : [
{
"command": "processes",
"tags_level" : [ 1 ]
"root" : "$.process.*",
"tags" : {
"name": "~",
"process_name" : "$.name",
"pid": "$.pid"
},
"values" : {
"cpu": "$.statistics.cpu.cur",
"memory": "$.vm-size"
}
},
{
"command": "ip hotspot",
"tags_level" : [ ]
"root" : "$.host[*]",
"tags" : {
"hostname" : "$.hostname",
"name" : "$.name"
},
"values" : {
"active": "$.active"
}
},
{
"command": "ip nat",
"root" : "$.[*]",
"tags" : {
"src" : "$.src",
"dst" : "$.dst"
},
"values" : {
"bytes-in": "$.bytes",
"bytes-out": "$.bytes-out"
}
}
]
}

View File

@@ -2,84 +2,70 @@ import json
import time
import requests
from typing import Dict
from jsonpath_ng.ext import parse
from value_normalizer import normalize_data, isvalidmetric
from value_normalizer import normalize_value
def json_path_init(paths):
queries = {}
for pathName, path in paths.items():
if path == "~":
queries[pathName] = path
else:
queries[pathName] = parse(path)
return queries
class KeeneticCollector(object):
def __init__(self, endpoint, command, tags_levels):
def __init__(self, endpoint, command, root, tags, values):
self._endpoint = endpoint
self._command = command
self._tags_levels = tags_levels
self._root = parse(root)
self._tags = json_path_init(tags)
self._values = json_path_init(values)
def collect(self):
url = '{}/show/{}'.format(self._endpoint, self._command.replace(' ', '/'))
response = json.loads(requests.get(url).content.decode('UTF-8'))
prefix = self._command.split(' ')
metrics = self.recursive_iterate(response, prefix, [], {}, 0)
for metric in metrics:
roots = self._root.find(response)
for root in roots:
tags = {}
values = {}
for tagName, tagPath in self._tags.items():
if tagPath == '~':
tags[tagName] = root.path.fields[0]
else:
tags[tagName] = self.get_first_value(tagPath.find(root.value))
for valueName, valuePath in self._values.items():
values[valueName] = normalize_value(self.get_first_value(valuePath.find(root.value)))
metric = self.create_metric(self._command, tags, values)
print(json.dumps(metric))
def recursive_iterate(self, data, path, metrics, tags, level):
if isinstance(data, dict):
data = normalize_data(data)
tags = tags.copy()
tags.update(self.tags(data))
values = self.values(data)
if values.__len__() > 0:
metrics.append(self.create_metric(path, tags, values))
for key in data:
value = data.get(key)
if isinstance(value, list) or isinstance(value, dict):
key_path = path.copy()
if level in self._tags_levels: # Need for some API, like show processes
tags['_'.join(path)] = key
else:
key_path.append(key)
self.recursive_iterate(value, key_path, metrics, tags, level + 1)
if isinstance(data, list):
for idx, value in enumerate(data):
self.recursive_iterate(value, path, metrics, tags, level + 1)
return metrics
def create_metric(self, path, tags, values):
@staticmethod
def create_metric(measurement, tags, values):
return {
"measurement": '_'.join(path),
"measurement": measurement,
"tags": tags,
"time": int(time.time() * 1000.0),
"fields": values
}
def tags(self, data):
labels: Dict[str, str] = {}
for key in data:
value = data.get(key)
if isinstance(value, str): labels[key] = value
return labels
def values(self, data):
values = {}
for key in data:
value = data.get(key)
if isvalidmetric(value): values[key] = value
return values
@staticmethod
def get_first_value(array):
if array and len(array) > 0:
return array[0].value
else:
return None
if __name__ == '__main__':
@@ -90,7 +76,7 @@ if __name__ == '__main__':
collectors = []
for metric in metrics:
collectors.append( KeeneticCollector(endpoint, metric['command'], metric['tags_level']) )
collectors.append(KeeneticCollector(endpoint, metric['command'], metric['root'], metric['tags'], metric['values']))
while True:
for collector in collectors: collector.collect()

View File

@@ -2,33 +2,32 @@ import re
def isstring(value): return isinstance(value, str)
def isfloat(value: str): return (re.match(r'^-?\d+(?:\.\d+)?$', value) is not None)
def isinteger(value: str): return (re.match('^\d+$', value) is not None)
def isvalidmetric(value) : return isinstance(value, int) or isinstance(value, float) or isinstance(value, bool)
def normalize_data(data):
if isinstance(data, dict):
for key in data:
def normalize_value(value):
value = data.get(key)
if value is None:
return None
if isstring(value):
value = parse_string(value)
if isvalidmetric(value):
return value
else:
print("WARN Value: " + str(value) + " is not valid metric type")
return None
def parse_string(value):
value = remove_data_unit(value)
if isinteger(value):
data[key] = int(value)
continue
if isfloat(value):
data[key] = float(value)
continue
return data
value = int(value)
elif isfloat(value):
value = float(value)
return value
def remove_data_unit(value: str):