Using admin api, instead open port (#1)

* influxdb2 integration
* using python alpine base image
* add authorization, refactoring config
* logging instead print, keenetic api exception handling when try to collect metrics, a little pep8
This commit is contained in:
Sergei Samokhvalov
2021-03-24 13:44:34 +03:00
committed by GitHub
parent 7661b5819d
commit 16a37bf0d9
12 changed files with 196 additions and 97 deletions

View File

@@ -1,16 +1,20 @@
import configparser
import json
import logging
import os
import time
import urllib
from typing import Dict, List
import requests
from jsonpath_rw import parse
from influxdb_writter import InfuxWritter
from influxdb_writter import InfuxWriter
from keenetic_api import KeeneticClient, KeeneticApiException
from value_normalizer import normalize_value
logging.basicConfig(level='INFO', format='%(asctime)s - %(filename)s - %(levelname)s - %(message)s')
def json_path_init(paths):
def json_path_init(paths: Dict[str, str]):
queries = {}
for pathName, path in paths.items():
@@ -24,20 +28,21 @@ def json_path_init(paths):
class KeeneticCollector(object):
def __init__(self, infuxdb_writter, endpoint, metric_configration):
self._influx = infuxdb_writter
self._endpoint = endpoint
self._command = metric_configration['command']
self._params = metric_configration.get('param', {})
self._root = parse(metric_configration['root'])
self._tags = json_path_init(metric_configration['tags'])
self._values = json_path_init(metric_configration['values'])
def __init__(self, keenetic_client: KeeneticClient, metric_configuration: Dict[str, object]):
self._keenetic_client = keenetic_client
self._command: str = metric_configuration['command']
self._params = metric_configuration.get('param', {})
self._root = parse(metric_configuration['root'])
self._tags = json_path_init(metric_configuration['tags'])
self._values = json_path_init(metric_configuration['values'])
def collect(self):
url = '{}/show/{}'.format(self._endpoint, self._command.replace(' ', '/')) + "?" + urllib.parse.urlencode(
self._params)
response = json.loads(requests.get(url).content.decode('UTF-8'))
def collect(self) -> List[dict]:
try:
response = self._keenetic_client.metric(self._command, self._params)
except KeeneticApiException as e:
logging.warning(f"Skipping metric '{self._command}' collection. Reason keenetic api exception, "
f"status: {e.status_code}, response: {e.response_text}")
return []
roots = self._root.find(response)
metrics = []
@@ -55,9 +60,11 @@ class KeeneticCollector(object):
for valueName, valuePath in self._values.items():
value = self.get_first_value(valuePath.find(root.value))
if value is not None: values[valueName] = normalize_value(value)
if value is not None:
values[valueName] = normalize_value(value)
if values.__len__() == 0: continue
if values.__len__() == 0:
continue
metric = self.create_metric(self._command, tags, values)
# print(json.dumps(metric))
@@ -66,7 +73,7 @@ class KeeneticCollector(object):
metrics.append(
self.create_metric("collector", {"command": self._command}, {"duration": (time.time_ns() - start_time)}))
infuxdb_writter.write_metrics(metrics)
return metrics
@staticmethod
def create_metric(measurement, tags, values):
@@ -86,28 +93,31 @@ class KeeneticCollector(object):
if __name__ == '__main__':
logging.info("\n\n" +
" _ __ _ _ _____ _ _ _ \n | |/ / | | (_) / ____| | | | | | \n | ' / ___ ___ _ __ ___| |_ _ ___ | | ___ | | | ___ ___| |_ ___ _ __ \n | < / _ \/ _ \ '_ \ / _ \ __| |/ __| | | / _ \| | |/ _ \/ __| __/ _ \| '__|\n | . \ __/ __/ | | | __/ |_| | (__ | |___| (_) | | | __/ (__| || (_) | | \n |_|\_\___|\___|_| |_|\___|\__|_|\___| \_____\___/|_|_|\___|\___|\__\___/|_| \n\n")
pwd = os.path.dirname(os.path.realpath(__file__))
metrics_configuration = json.load(open(pwd + "/config/metrics.json", "r"))
print(
" _ __ _ _ _____ _ _ _ \n | |/ / | | (_) / ____| | | | | | \n | ' / ___ ___ _ __ ___| |_ _ ___ | | ___ | | | ___ ___| |_ ___ _ __ \n | < / _ \/ _ \ '_ \ / _ \ __| |/ __| | | / _ \| | |/ _ \/ __| __/ _ \| '__|\n | . \ __/ __/ | | | __/ |_| | (__ | |___| (_) | | | __/ (__| || (_) | | \n |_|\_\___|\___|_| |_|\___|\__|_|\___| \_____\___/|_|_|\___|\___|\__\___/|_| \n \n ")
metrics_configuration = json.load(open(os.path.dirname(os.path.realpath(__file__)) + "/config/metrics.json", "r"))
influx_configuration = json.load(open(os.path.dirname(os.path.realpath(__file__)) + "/config/influx.json", "r"))
endpoint = metrics_configuration['endpoint']
metrics = metrics_configuration['metrics']
config = configparser.ConfigParser(interpolation=None)
config.read(pwd + "/config/config.ini")
infuxdb_writer = InfuxWriter(config['influxdb'])
keenetic_config = config['keenetic']
logging.info("Connecting to router: " + keenetic_config['admin_endpoint'])
collectors = []
with KeeneticClient(keenetic_config['admin_endpoint'], keenetic_config.getboolean('skip_auth'),
keenetic_config['login'], keenetic_config['password']) as kc:
for metric_configuration in metrics:
logging.info("Configuring metric: " + metric_configuration['command'])
collectors.append(KeeneticCollector(kc, metric_configuration))
infuxdb_writter = InfuxWritter(influx_configuration)
print("Connecting to router: " + endpoint)
for metric_configuration in metrics:
print("Configuring metric: " + metric_configuration['command'])
collectors.append(KeeneticCollector(infuxdb_writter, endpoint, metric_configuration))
print("Configuration done. Start collecting with interval: " + str(metrics_configuration['interval_sec']) + " sec")
while True:
for collector in collectors: collector.collect()
time.sleep(metrics_configuration['interval_sec'])
wait_interval = config['collector'].getint('interval_sec')
logging.info(f"Configuration done. Start collecting with interval: {wait_interval} sec")
while True:
for collector in collectors:
metrics = collector.collect()
infuxdb_writer.write_metrics(metrics)
time.sleep(wait_interval)