diff --git a/.env.example b/.env.example
new file mode 100644
index 0000000..eaeef99
--- /dev/null
+++ b/.env.example
@@ -0,0 +1,7 @@
+SERVER_HOST=https://example.org
+STREAM_URL=http://example.com/stream
+CHECK_INTERVAL=60
+RECORD_DIRECTORY=/records
+CONNECT_TIMEOUT=10
+FIRST_BYTE_TIMEOUT=30
+WEB_SERVER_PORT=8080
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..df29777
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,19 @@
+records
+
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+*.swp
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..6457c5f
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,20 @@
+# Use an official Python runtime as the parent image
+FROM python:3.11-slim
+
+# Set the working directory in the container
+WORKDIR /app
+
+# Copy the current directory contents into the container at the working directory
+COPY . .
+
+# Install any needed packages specified in requirements.txt
+RUN pip install --no-cache-dir -r requirements.txt
+
+# Make port 8080 available to the world outside this container
+EXPOSE 8080
+
+# Define environment variable for the record directory
+ENV RECORD_DIRECTORY=/records
+
+# Run main.py when the container launches
+CMD [ "python", "./src/main.py" ]
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..46e3c48
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,22 @@
+run:
+	@echo "Starting Icecast Recorder Service"
+	python src/main.py
+
+test:
+	@echo "Running tests"
+	python -m unittest discover -s tests/
+
+build:
+	@echo "Building Docker image for Icecast Recorder Service"
+	docker build -t icecast-recorder .
+
+docker-run: build
+	@echo "Running Icecast Recorder Service in a Docker container"
+	docker run -p 8080:8080 --env-file .env.example icecast-recorder
+
+clean:
+	@echo "Cleaning up __pycache__ and .pyc files"
+	find . -type d -name __pycache__ -exec rm -r {} +
+	find . -type f -name '*.pyc' -delete
+
+.PHONY: run test build docker-run clean
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..a63df8a
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,19 @@
+version: '3'
+
+services:
+  icecast-recorder:
+    build: .
+    ports:
+      - "8080:8080"
+    environment:
+      - SERVER_HOST=https://example.org
+      - STREAM_URL=http://example.com/stream
+      - CHECK_INTERVAL=60
+      - RECORD_DIRECTORY=/records
+      - CONNECT_TIMEOUT=10
+      - FIRST_BYTE_TIMEOUT=30
+      - WEB_SERVER_PORT=8080
+    volumes:
+      - ./records:/records
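+# Note: RECORD_DIRECTORY inside the container must match the right-hand side of
+# the volume mapping above, or finished recordings will not appear on the host.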
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..1bdfa2f
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,3 @@
+aiohttp==3.9.1
+yattag==1.15.2
+python-dotenv==1.0.0
diff --git a/src/config.py b/src/config.py
new file mode 100644
index 0000000..4762f53
--- /dev/null
+++ b/src/config.py
@@ -0,0 +1,52 @@
+import argparse
+import os
+from dotenv import load_dotenv
+
+# Load .env file if available
+load_dotenv()
+
+# Default configuration values
+DEFAULTS = {
+    'server_host': 'https://example.org',
+    'server_port': 8080,
+    'stream_url': 'http://example.com/stream',
+    'output_directory': './records',
+    'check_interval': 60,
+    'timeout_connect': 10,
+    'timeout_read': 30,
+}
+
+def parse_arguments():
+    parser = argparse.ArgumentParser(description='Icecast Recorder Service')
+    parser.add_argument('--server-host', help='Server host name with protocol, used for links in the RSS feed')
+    parser.add_argument('--server-port', type=int, help='Server port number')
+    parser.add_argument('--stream-url', help='URL of the Icecast stream to monitor and record')
+    parser.add_argument('--output-directory', help='Directory to save the recordings')
+    parser.add_argument('--check-interval', type=int, help='Interval between stream checks in seconds')
+    parser.add_argument('--timeout-connect', type=int, help='Timeout for connecting to the stream in seconds')
+    parser.add_argument('--timeout-read', type=int, help='Read timeout between data chunks in seconds')
+    return vars(parser.parse_args())
+
+def load_configuration():
+    cmd_args = parse_arguments()
+
+    # Configuration priority: command line > environment variables > defaults.
+    # The environment variable names match .env.example and docker-compose.yml.
+    config = {
+        'server_host': cmd_args['server_host'] or os.getenv('SERVER_HOST') or DEFAULTS['server_host'],
+        'server_port': cmd_args['server_port'] or os.getenv('WEB_SERVER_PORT') or DEFAULTS['server_port'],
+        'stream_url': cmd_args['stream_url'] or os.getenv('STREAM_URL') or DEFAULTS['stream_url'],
+        'output_directory': cmd_args['output_directory'] or os.getenv('RECORD_DIRECTORY') or DEFAULTS['output_directory'],
+        'check_interval': cmd_args['check_interval'] or os.getenv('CHECK_INTERVAL') or DEFAULTS['check_interval'],
+        'timeout_connect': cmd_args['timeout_connect'] or os.getenv('CONNECT_TIMEOUT') or DEFAULTS['timeout_connect'],
+        'timeout_read': cmd_args['timeout_read'] or os.getenv('FIRST_BYTE_TIMEOUT') or DEFAULTS['timeout_read'],
+    }
+
+    # Environment variables arrive as strings; coerce the numeric settings
+    for key in ('server_port', 'check_interval', 'timeout_connect', 'timeout_read'):
+        config[key] = int(config[key])
+
+    # Convert the output directory to an absolute path
+    config['output_directory'] = os.path.abspath(config['output_directory'])
+
+    return argparse.Namespace(**config)
diff --git a/src/logger.py b/src/logger.py
new file mode 100644
index 0000000..c99b3a5
--- /dev/null
+++ b/src/logger.py
@@ -0,0 +1,40 @@
+import json
+import sys
+from datetime import datetime
+
+# Log levels
+DEBUG = "DEBUG"
+INFO = "INFO"
+WARNING = "WARNING"
+ERROR = "ERROR"
+FATAL = "FATAL"
+
+def log_event(event, details, level=INFO):
+    log_entry = {
+        "timestamp": datetime.utcnow().isoformat(),
+        "event": event,
+        "level": level,
+        "details": details
+    }
+    json_log_entry = json.dumps(log_entry)
+    print(json_log_entry, file=sys.stdout)  # Write to stdout
+    sys.stdout.flush()  # Immediately flush the log entry
+
+# Specific log functions per level for convenience
+def log_debug(event, details):
+    log_event(event, details, level=DEBUG)
+
+def log_info(event, details):
+    log_event(event, details, level=INFO)
+
+def log_warning(event, details):
+    log_event(event, details, level=WARNING)
+
+def log_error(event, details):
+    log_event(event, details, level=ERROR)
+
+def log_fatal(event, details):
+    log_event(event, details, level=FATAL)
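+# Example of an emitted log line (illustrative values):
+# {"timestamp": "2024-01-01T12:00:00", "event": "stream_live",
+#  "level": "INFO", "details": {"stream_url": "http://example.com/stream"}}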
diff --git a/src/main.py b/src/main.py
new file mode 100644
index 0000000..05518aa
--- /dev/null
+++ b/src/main.py
@@ -0,0 +1,32 @@
+import asyncio
+import os
+
+from config import load_configuration
+from server import start_server
+from stream_checker import StreamChecker
+
+async def main():
+    # Load configuration from command line arguments and environment variables
+    config = load_configuration()
+
+    # Make sure the output directory exists before the recorder writes to it
+    os.makedirs(config.output_directory, exist_ok=True)
+
+    # Create the StreamChecker instance
+    checker = StreamChecker(
+        stream_url=config.stream_url,
+        check_interval=config.check_interval,
+        timeout_connect=config.timeout_connect,
+        timeout_read=config.timeout_read,
+        output_directory=config.output_directory
+    )
+
+    # Run the stream checking/recording loop and the health check and
+    # file serving server concurrently in the same event loop
+    await asyncio.gather(checker.run(), start_server(config))
+
+if __name__ == "__main__":
+    try:
+        asyncio.run(main())
+    except KeyboardInterrupt:
+        pass
diff --git a/src/recorder.py b/src/recorder.py
new file mode 100644
index 0000000..efcf83e
--- /dev/null
+++ b/src/recorder.py
@@ -0,0 +1,72 @@
+import aiohttp
+import os
+from datetime import datetime, timedelta
+from logger import log_event
+from utils import sanitize_filename
+
+class Recorder:
+    def __init__(self, stream_url, output_directory, timeout_connect=10, timeout_read=30):
+        self.stream_url = stream_url
+        self.output_directory = output_directory
+        self.timeout_connect = timeout_connect
+        self.timeout_read = timeout_read
+        self.file_name = None
+        self.file_path = None
+        self.start_time = None
+        self.last_data_time = None
+        self.is_recording = False
+
+    async def start_recording(self):
+        self.start_time = datetime.utcnow()
+        domain = self.stream_url.split("//")[-1].split("/")[0]
+        sanitized_domain = sanitize_filename(domain)
+        date_str = self.start_time.strftime("%Y%m%d_%H%M%S")
+        # Record to a .tmp file; it is renamed once the recording is complete
+        self.file_name = f"{sanitized_domain}_{date_str}.mp3.tmp"
+        self.file_path = os.path.join(self.output_directory, self.file_name)
+        try:
+            timeout = aiohttp.ClientTimeout(total=None, connect=self.timeout_connect, sock_read=self.timeout_read)
+            async with aiohttp.ClientSession(timeout=timeout) as session:
+                async with session.get(self.stream_url) as response:
+                    if response.status == 200:
+                        self.is_recording = True
+                        log_event("recording_started", {"file_name": self.file_name, "stream_url": self.stream_url})
+                        with open(self.file_path, 'wb') as f:
+                            async for data, _ in response.content.iter_chunks():
+                                if not data:
+                                    break
+                                # Check whether the gap since the previous chunk exceeded
+                                # the read timeout (the sock_read timeout usually fires first)
+                                now = datetime.utcnow()
+                                if self.last_data_time and now - self.last_data_time > timedelta(seconds=self.timeout_read):
+                                    log_event("timeout_exceeded", {
+                                        "stream_url": self.stream_url,
+                                        "elapsed_seconds": (now - self.last_data_time).total_seconds()
+                                    }, level="WARNING")
+                                    break
+                                self.last_data_time = now
+                                f.write(data)
+                        log_event("recording_finished", {"file_name": self.file_name, "stream_url": self.stream_url})
+                    else:
+                        log_event("stream_unavailable", {"http_status": response.status})
+        except Exception as e:
+            log_event("recording_error", {"error": str(e)}, level="ERROR")
+        finally:
+            self.is_recording = False
+            self.end_recording()
+
+    def end_recording(self):
+        if self.file_path and os.path.exists(self.file_path):
+            # Drop the .tmp suffix so the file shows up in the RSS feed
+            finished_file = self.file_path.replace('.tmp', '')
+            os.rename(self.file_path, finished_file)
+            log_event("recording_saved", {
+                "file_name": finished_file,
+                "duration": (datetime.utcnow() - self.start_time).total_seconds() if self.start_time else 0
+            })
+
+    def is_active(self):
+        return self.is_recording
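+# Usage sketch (illustrative; must run inside an async function):
+#   recorder = Recorder("http://example.com/stream", "./records")
+#   await recorder.start_recording()  # returns when the stream ends or stalls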
diff --git a/src/rss_generator.py b/src/rss_generator.py
new file mode 100644
index 0000000..729310f
--- /dev/null
+++ b/src/rss_generator.py
@@ -0,0 +1,36 @@
+import os
+from datetime import datetime
+from yattag import Doc
+from utils import generate_file_hash, file_hash_to_id
+
+def generate_rss_feed(files, output_directory, server_host):
+    doc, tag, text = Doc().tagtext()
+
+    doc.asis('<?xml version="1.0" encoding="UTF-8"?>')
+    with tag('rss', version='2.0'):
+        with tag('channel'):
+            with tag('title'):
+                text('Icecast Stream Recordings')
+            with tag('description'):
+                text('The latest recordings from the Icecast server.')
+            with tag('link'):
+                text(server_host)
+
+            for file_name in files:
+                file_path = os.path.join(output_directory, file_name)
+                file_hash = generate_file_hash(file_path)
+                file_id = file_hash_to_id(file_hash)
+
+                with tag('item'):
+                    with tag('title'):
+                        text(file_name)
+                    with tag('link'):
+                        text(f'{server_host}/files/{file_name}')
+                    with tag('guid', isPermaLink='false'):
+                        text(file_id)
+                    with tag('pubDate'):
+                        # RFC 822 date format, as required by the RSS 2.0 spec
+                        pub_date = datetime.utcfromtimestamp(os.path.getctime(file_path)).strftime('%a, %d %b %Y %H:%M:%S GMT')
+                        text(pub_date)
+
+    return doc.getvalue()
diff --git a/src/server.py b/src/server.py
new file mode 100644
index 0000000..41ea078
--- /dev/null
+++ b/src/server.py
@@ -0,0 +1,68 @@
+import asyncio
+import mimetypes
+import os
+from pathlib import Path
+
+from aiohttp import web
+
+from logger import log_event
+from rss_generator import generate_rss_feed
+
+routes = web.RouteTableDef()
+
+@routes.get('/health')
+async def health_check(request):
+    log_event("health_check_requested", {"method": "GET", "path": request.path}, level="INFO")
+    return web.Response(text="OK")
+
+@routes.get('/rss')
+async def rss_feed(request):
+    log_event("rss_feed_requested", {"method": "GET", "path": request.path}, level="INFO")
+    output_directory = request.app['config'].output_directory
+    # Only finished recordings (.mp3) are listed; in-progress .tmp files are skipped
+    files = [f for f in os.listdir(output_directory) if f.endswith('.mp3')]
+    rss_xml = generate_rss_feed(files, output_directory, request.app['config'].server_host)
+    return web.Response(text=rss_xml, content_type='application/rss+xml')
+
+@routes.get('/files/{file_name}')
+async def serve_file(request):
+    file_name = request.match_info['file_name']
+    log_event("file_serve_requested", {"method": "GET", "path": request.path, "file_name": file_name}, level="INFO")
+
+    output_directory = Path(request.app['config'].output_directory).resolve()
+    file_path = (output_directory / file_name).resolve()
+
+    # Reject path traversal attempts such as ../../etc/passwd
+    if not file_path.is_relative_to(output_directory):
+        log_event("file_access_denied", {"file_name": file_name}, level="WARNING")
+        return web.Response(status=403, text='Access denied')
+
+    if not file_path.exists():
+        log_event("file_not_found", {"file_name": file_name}, level="WARNING")
+        return web.Response(status=404, text='File not found')
+
+    content_type, _ = mimetypes.guess_type(file_path.name)
+
+    headers = {
+        'Content-Disposition': f'attachment; filename="{file_path.name}"',
+        'Content-Type': content_type or 'application/octet-stream',
+    }
+    return web.FileResponse(file_path, headers=headers)
+
+async def start_server(config):
+    app = web.Application()
+    app['config'] = config
+    app.add_routes(routes)
+    runner = web.AppRunner(app)
+    await runner.setup()
+    site = web.TCPSite(runner, '0.0.0.0', config.server_port)
+    log_event('server_starting', {'port': config.server_port}, level="INFO")
+    await site.start()
+    log_event('server_started', {'port': config.server_port}, level="INFO")
+    # Keep this coroutine alive so the runner and its sockets stay referenced
+    while True:
+        await asyncio.sleep(3600)
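+# Endpoints (illustrative curl calls, assuming the default port):
+#   curl http://localhost:8080/health          -> "OK"
+#   curl http://localhost:8080/rss             -> RSS 2.0 feed of recordings
+#   curl -O http://localhost:8080/files/<name>.mp3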
diff --git a/src/stream_checker.py b/src/stream_checker.py
new file mode 100644
index 0000000..e8a85f3
--- /dev/null
+++ b/src/stream_checker.py
@@ -0,0 +1,49 @@
+import asyncio
+from aiohttp import ClientSession, ClientTimeout
+from logger import log_event
+from recorder import Recorder
+
+class StreamChecker:
+    def __init__(self, stream_url, check_interval, timeout_connect, output_directory, timeout_read=30):
+        self.stream_url = stream_url
+        self.check_interval = check_interval
+        self.timeout_connect = timeout_connect
+        self.timeout_read = timeout_read
+        self.output_directory = output_directory
+        self.recorder = None
+        self.is_stream_live = False
+
+    async def check_stream(self, session):
+        try:
+            timeout = ClientTimeout(connect=self.timeout_connect)
+            async with session.get(self.stream_url, timeout=timeout, allow_redirects=True) as response:
+                if response.status == 200:
+                    self.is_stream_live = True
+                    log_event("stream_live", {"stream_url": self.stream_url})
+                else:
+                    self.is_stream_live = False
+                    log_event("stream_offline", {"stream_url": self.stream_url})
+        except asyncio.TimeoutError:
+            self.is_stream_live = False
+            log_event("check_stream_timeout", {"stream_url": self.stream_url})
+        except Exception as e:
+            self.is_stream_live = False
+            log_event("check_stream_error", {"stream_url": self.stream_url, "error": str(e)})
+
+    async def run(self):
+        # Reuse one HTTP session for all checks instead of opening one per iteration
+        async with ClientSession() as session:
+            while True:
+                await self.check_stream(session)
+
+                if self.is_stream_live and (self.recorder is None or not self.recorder.is_active()):
+                    self.recorder = Recorder(
+                        self.stream_url,
+                        self.output_directory,
+                        timeout_connect=self.timeout_connect,
+                        timeout_read=self.timeout_read
+                    )
+                    # start_recording blocks until the stream ends; checking then resumes
+                    await self.recorder.start_recording()
+
+                await asyncio.sleep(self.check_interval)
diff --git a/src/utils.py b/src/utils.py
new file mode 100644
index 0000000..fec978b
--- /dev/null
+++ b/src/utils.py
@@ -0,0 +1,30 @@
+import hashlib
+import string
+
+def sanitize_filename(filename):
+    """
+    Sanitize the filename by removing invalid characters and replacing spaces.
+    """
+    valid_chars = "-_.() %s%s" % (string.ascii_letters, string.digits)
+    cleaned_filename = "".join(c for c in filename if c in valid_chars)
+    cleaned_filename = cleaned_filename.replace(' ', '_')  # Replace spaces with underscores
+    return cleaned_filename
+
+def generate_file_hash(file_path):
+    """
+    Generate a SHA-256 hash of the file contents to uniquely identify files.
+    """
+    hasher = hashlib.sha256()
+    with open(file_path, 'rb') as f:
+        while chunk := f.read(8192):
+            hasher.update(chunk)
+    return hasher.hexdigest()
+
+def file_hash_to_id(file_hash, length=32):
+    """
+    Shorten a file hash to an ID by keeping only the first `length` characters.
+    """
+    return file_hash[:length]
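+# Examples (illustrative):
+#   sanitize_filename("radio.example.com:8000")  -> "radio.example.com8000"
+#   file_hash_to_id("d2a84f4b8b65...", length=8) -> "d2a84f4b"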