Add pylint configuration file and update docstrings

This commit is contained in:
Dmitrii Andreev
2024-01-07 16:29:30 +03:00
parent 5ce68e3bc5
commit cafb2fa516
9 changed files with 44 additions and 18 deletions

2
.pylintrc Normal file
View File

@@ -0,0 +1,2 @@
[MESSAGES CONTROL]
disable=line-too-long

View File

@@ -1,3 +1,5 @@
"""Module for loading configuration values from command line arguments, environment variables and defaults"""
import argparse
import os
from dotenv import load_dotenv
@@ -17,6 +19,7 @@ DEFAULTS = {
}
def parse_arguments():
"""Parse command line arguments"""
parser = argparse.ArgumentParser(description='Icecast Recorder Service')
parser.add_argument('--server-host', help='Server host name with protocol')
parser.add_argument('--server-port', type=int, help='Server port number')
@@ -29,6 +32,7 @@ def parse_arguments():
return vars(parser.parse_args())
def load_configuration():
"""Get values from command line arguments, environment variables and defaults"""
cmd_args = parse_arguments()
# Configuration is established using a priority: CommandLine > EnvironmentVars > Defaults

View File

@@ -1,3 +1,4 @@
"""This module contains the logger functions for the application"""
import json
import sys
from datetime import datetime
@@ -10,6 +11,7 @@ ERROR = "ERROR"
FATAL = "FATAL"
def log_event(event, details, level=INFO):
"""Log an event to stdout in JSON format"""
log_entry = {
"timestamp": datetime.utcnow().isoformat(),
"event": event,
@@ -22,16 +24,21 @@ def log_event(event, details, level=INFO):
# Specific log functions per level for convenience
def log_debug(event, details):
"""Log a debug event"""
log_event(event, details, level=DEBUG)
def log_info(event, details):
"""Log an info event"""
log_event(event, details, level=INFO)
def log_warning(event, details):
"""Log a warning event"""
log_event(event, details, level=WARNING)
def log_error(event, details):
"""Log an error event"""
log_event(event, details, level=ERROR)
def log_fatal(event, details):
"""Log a fatal event"""
log_event(event, details, level=FATAL)

View File

@@ -1,9 +1,11 @@
"""Main entry point for the Icecast stream checker and recorder"""
import asyncio
from server import start_server
from stream_checker import StreamChecker
from config import load_configuration
def main():
"""Main entry point for the Icecast stream checker and recorder"""
# Load configuration from command line arguments and environment variables
config = load_configuration()

View File

@@ -1,28 +1,30 @@
import aiohttp
"""Recorder class for recording a stream to a file"""
import os
from datetime import datetime, timedelta
from logger import log_event, log_error
import aiohttp
from logger import log_event
from utils import sanitize_filename
from pprint import pprint
class Recorder:
class Recorder: # pylint: disable=too-many-instance-attributes
"""Recorder class for recording a stream to a file"""
def __init__(self, stream_url, output_directory, timeout_connect=10, timeout_read=30):
self.stream_url = stream_url
self.output_directory = output_directory
self.timeout_read = timeout_read
self.timeout_connect = timeout_connect
self.file_name = None
self.file_path = os.path.join(self.output_directory, self.file_name)
self.start_time = None
self.last_data_time = None
self.is_recording = False
async def start_recording(self):
"""Start recording the stream to a file"""
self.start_time = datetime.utcnow()
domain = self.stream_url.split("//")[-1].split("/")[0]
sanitized_domain = sanitize_filename(domain)
date_str = self.start_time.strftime("%Y%m%d_%H%M%S")
self.file_name = f"{sanitized_domain}_{date_str}.mp3.tmp"
self.file_path = os.path.join(self.output_directory, self.file_name)
try:
timeout = aiohttp.ClientTimeout(total=None, connect=self.timeout_connect, sock_read=self.timeout_read)
async with aiohttp.ClientSession(timeout=timeout) as session:
@@ -38,7 +40,7 @@ class Recorder:
f.write(data)
# Check if timeout exceeded between data chunks
if datetime.utcnow() - self.last_data_time > timedelta(seconds=self.timeout_read):
log_error("timeout_exceeded", {
log_event("timeout_exceeded", {
"stream_url": self.stream_url,
"elapsed_seconds": (datetime.utcnow() - self.last_data_time).total_seconds()
}, level="WARNING")
@@ -47,14 +49,14 @@ class Recorder:
log_event("recording_finished", {"file_name": self.file_name, "stream_url": self.stream_url})
else:
log_event("stream_unavailable", {"http_status": response.status})
except Exception as e:
except Exception as e: # pylint: disable=broad-except
log_event('recording_error', {"error": str(e)}, level="ERROR")
pprint(e)
finally:
self.is_recording = False
self.end_recording()
def end_recording(self):
"""Rename the temporary file to a finished file"""
if os.path.exists(self.file_path):
finished_file = self.file_path.replace('.tmp', '')
os.rename(self.file_path, finished_file)
@@ -64,4 +66,5 @@ class Recorder:
})
def is_active(self):
"""Check if the recorder is currently recording a stream"""
return self.is_recording

View File

@@ -1,12 +1,11 @@
from yattag import Doc
"""Generates an RSS feed from the files in the output directory"""
import os
from utils import generate_file_hash, file_hash_to_id
from datetime import datetime
from yattag import Doc
import os
from utils import generate_file_hash, file_hash_to_id
def generate_rss_feed(files, output_directory, server_host):
"""Generates an RSS feed from the files in the output directory"""
doc, tag, text = Doc().tagtext()
doc.asis('<?xml version="1.0" encoding="UTF-8"?>')

View File

@@ -1,20 +1,23 @@
from aiohttp import web
"""Server module for the application"""
import os
import mimetypes
from rss_generator import generate_rss_feed
from logger import log_event
from pprint import pprint
from pathlib import Path
from aiohttp import web
from rss_generator import generate_rss_feed
from logger import log_event
routes = web.RouteTableDef()
@routes.get('/health')
async def helth_check(request):
"""Health check endpoint"""
log_event("health_check_requested", {"method": "GET", "path": request.path}, level="INFO")
return web.Response(text="OK")
@routes.get('/rss')
async def rss_feed(request):
"""RSS feed endpoint"""
log_event("rss_feed_requested", {"method": "GET", "path": request.path}, level="INFO")
output_directory = request.app['config'].output_directory
files = [f for f in os.listdir(output_directory) if f.endswith('.mp3')]
@@ -23,6 +26,7 @@ async def rss_feed(request):
@routes.get('/files/{file_name}')
async def serve_file(request):
"""File serving endpoint"""
file_name = request.match_info['file_name']
log_event("file_serve_requested", {"method": "GET", "path": request.path, "file_name": file_name}, level="INFO")
@@ -47,6 +51,7 @@ async def serve_file(request):
return web.FileResponse(file_path, headers=headers)
async def start_server(config):
"""Start the web server"""
app = web.Application()
app['config'] = config
app.add_routes(routes)

View File

@@ -1,11 +1,12 @@
"""Checking the stream status and starting the recorder"""
import asyncio
from pprint import pprint
from aiohttp import ClientSession, ClientTimeout
from recorder import Recorder
from logger import log_event
class StreamChecker:
def __init__(self, stream_url, check_interval, timeout_connect, output_directory, timeout_read=30):
"""Checking the stream status and starting the recorder"""
def __init__(self, stream_url, check_interval, timeout_connect, output_directory, timeout_read=30): # pylint: disable=too-many-arguments
self.stream_url = stream_url
self.check_interval = check_interval
self.timeout_connect = timeout_connect
@@ -15,6 +16,7 @@ class StreamChecker:
self.is_stream_live = False
async def check_stream(self, session):
"""Check if the stream is live and start the recorder if needed"""
try:
timeout = ClientTimeout(connect=self.timeout_connect)
async with session.get(self.stream_url, timeout=timeout, allow_redirects=True) as response:
@@ -26,11 +28,12 @@ class StreamChecker:
log_event("stream_offline", {"stream_url": self.stream_url})
except asyncio.TimeoutError:
log_event("check_stream_timeout", {"stream_url": self.stream_url})
except Exception as e:
except Exception as e: # pylint: disable=broad-except
print(self.stream_url)
log_event("check_stream_error", {"error": str(e)})
async def run(self):
"""Start the stream checking and recording loop"""
while True:
async with ClientSession() as session:
await self.check_stream(session)

View File

@@ -1,3 +1,4 @@
"""Utility functions for the application"""
import hashlib
import string
@@ -5,7 +6,7 @@ def sanitize_filename(filename):
"""
Sanitize the filename by removing or replacing invalid characters.
"""
valid_chars = "-_.() %s%s" % (string.ascii_letters, string.digits)
valid_chars = f"-_.() {string.ascii_letters}{string.digits}"
cleaned_filename = "".join(c for c in filename if c in valid_chars)
cleaned_filename = cleaned_filename.replace(' ', '_') # Replace spaces with underscores
return cleaned_filename