first release

Dmitrii Andreev
2024-01-07 14:33:44 +03:00
parent 13149411f9
commit 2d2820f9cd
14 changed files with 441 additions and 0 deletions

6
.env.example Normal file

@@ -0,0 +1,6 @@
STREAM_URL=http://example.com/stream
CHECK_INTERVAL=60
RECORD_DIRECTORY=/records
CONNECT_TIMEOUT=10
FIRST_BYTE_TIMEOUT=30
WEB_SERVER_PORT=8080

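These variables correspond to the settings read in src/config.py. For local runs, one option (assuming a POSIX shell) is to copy the example and edit it; python-dotenv picks the file up automatically:

cp .env.example .env
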
19
.gitignore vendored Normal file

@@ -0,0 +1,19 @@
records
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
*.swp

20
Dockerfile Normal file

@@ -0,0 +1,20 @@
# Use an official Python runtime as the parent image
FROM python:3.11-slim

# Set the working directory in the container
WORKDIR /app

# Install dependencies first so this layer is cached across source-only changes
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the rest of the source into the container
COPY . .

# Make port 8080 available to the world outside this container
EXPOSE 8080

# Define environment variable for record directory
ENV RECORD_DIRECTORY=/records

# Run main.py when the container launches
CMD [ "python", "./src/main.py" ]

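A minimal build-and-run sketch, assuming Docker is available and a populated .env file exists; the bind mount keeps recordings on the host (paths illustrative):

docker build -t icecast-recorder .
docker run --rm -p 8080:8080 --env-file .env -v "$(pwd)/records:/records" icecast-recorder
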
22
Makefile Normal file

@@ -0,0 +1,22 @@
run:
	@echo "Starting Icecast Recorder Service"
	python src/main.py

test:
	@echo "Running tests"
	python -m unittest discover -s tests/

build:
	@echo "Building Docker image for Icecast Recorder Service"
	docker build -t icecast-recorder .

docker-run: build
	@echo "Running Icecast Recorder Service in a Docker container"
	docker run -p 8080:8080 --env-file .env.example icecast-recorder

clean:
	@echo "Cleaning up __pycache__ and .pyc files"
	find . -type d -name __pycache__ -exec rm -r {} +
	find . -type f -name '*.pyc' -delete

.PHONY: run test build docker-run clean

16
docker-compose.yml Normal file

@@ -0,0 +1,16 @@
version: '3'

services:
  icecast-recorder:
    build: .
    ports:
      - "8080:8080"
    environment:
      - STREAM_URL=http://example.com/stream
      - CHECK_INTERVAL=60
      - RECORD_DIRECTORY=/records
      - CONNECT_TIMEOUT=10
      - FIRST_BYTE_TIMEOUT=30
      - WEB_SERVER_PORT=8080
    volumes:
      - ./records:/records

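With Docker Compose v2 installed, the service builds and starts in one step:

docker compose up --build
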
3
requirements.txt Normal file

@@ -0,0 +1,3 @@
aiohttp==3.9.1
yattag==1.15.2
python-dotenv==1.0.0

48
src/config.py Normal file

@@ -0,0 +1,48 @@
import argparse
import os
from dotenv import load_dotenv

# Load .env file if available
load_dotenv()

# Default configuration values
DEFAULTS = {
    'server_host': 'https://example.org',
    'server_port': 8080,
    'stream_url': 'http://example.com/stream',
    'output_directory': './records',
    'check_interval': 60,
    'timeout_connect': 10,
    'timeout_read': 30,
}


def parse_arguments():
    parser = argparse.ArgumentParser(description='Icecast Recorder Service')
    parser.add_argument('--server-host', help='Server host name with protocol')
    parser.add_argument('--server-port', type=int, help='Server port number')
    parser.add_argument('--stream-url', help='URL of the Icecast stream to monitor and record')
    parser.add_argument('--output-directory', help='Directory to save the recordings')
    parser.add_argument('--check-interval', type=int, help='Interval to check the stream in seconds')
    parser.add_argument('--timeout-connect', type=int, help='Timeout for connecting to the stream in seconds')
    parser.add_argument('--timeout-read', type=int, help='Read timeout in seconds')
    return vars(parser.parse_args())


def load_configuration():
    cmd_args = parse_arguments()
    # Configuration is established using a priority: CommandLine > EnvironmentVars > Defaults.
    # Environment variable names match .env.example and docker-compose.yml.
    config = {
        'server_host': cmd_args['server_host'] or os.getenv('SERVER_HOST') or DEFAULTS['server_host'],
        'server_port': cmd_args['server_port'] or os.getenv('WEB_SERVER_PORT') or DEFAULTS['server_port'],
        'stream_url': cmd_args['stream_url'] or os.getenv('STREAM_URL') or DEFAULTS['stream_url'],
        'output_directory': cmd_args['output_directory'] or os.getenv('RECORD_DIRECTORY') or DEFAULTS['output_directory'],
        'check_interval': cmd_args['check_interval'] or os.getenv('CHECK_INTERVAL') or DEFAULTS['check_interval'],
        'timeout_connect': cmd_args['timeout_connect'] or os.getenv('CONNECT_TIMEOUT') or DEFAULTS['timeout_connect'],
        'timeout_read': cmd_args['timeout_read'] or os.getenv('FIRST_BYTE_TIMEOUT') or DEFAULTS['timeout_read'],
    }
    # Values read from the environment arrive as strings; coerce numeric settings to int
    for key in ('server_port', 'check_interval', 'timeout_connect', 'timeout_read'):
        config[key] = int(config[key])
    # Converting string paths to absolute paths
    config['output_directory'] = os.path.abspath(config['output_directory'])
    return argparse.Namespace(**config)

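To illustrate the precedence with a hypothetical run: the flag beats the environment variable, which in turn beats the built-in default of 60.

CHECK_INTERVAL=120 python src/main.py --check-interval 30   # effective check_interval: 30
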
37
src/logger.py Normal file

@@ -0,0 +1,37 @@
import json
import sys
from datetime import datetime

# Log levels
DEBUG = "DEBUG"
INFO = "INFO"
WARNING = "WARNING"
ERROR = "ERROR"
FATAL = "FATAL"


def log_event(event, details, level=INFO):
    log_entry = {
        "timestamp": datetime.utcnow().isoformat(),
        "event": event,
        "level": level,
        "details": details
    }
    json_log_entry = json.dumps(log_entry)
    print(json_log_entry, file=sys.stdout)  # Write to stdout
    sys.stdout.flush()  # Immediately flush the log entry


# Specific log functions per level for convenience
def log_debug(event, details):
    log_event(event, details, level=DEBUG)


def log_info(event, details):
    log_event(event, details, level=INFO)


def log_warning(event, details):
    log_event(event, details, level=WARNING)


def log_error(event, details):
    log_event(event, details, level=ERROR)


def log_fatal(event, details):
    log_event(event, details, level=FATAL)

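For example, log_info("recording_started", {"file_name": "example.mp3"}) would emit a single JSON line on stdout, roughly (timestamp illustrative):

{"timestamp": "2024-01-07T11:33:44.123456", "event": "recording_started", "level": "INFO", "details": {"file_name": "example.mp3"}}
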
37
src/main.py Normal file

@@ -0,0 +1,37 @@
import asyncio

from server import start_server
from stream_checker import StreamChecker
from config import load_configuration


async def main():
    # Load configuration from command line arguments and environment variables
    config = load_configuration()
    # Create the StreamChecker instance
    checker = StreamChecker(
        stream_url=config.stream_url,
        check_interval=config.check_interval,
        timeout_connect=config.timeout_connect,
        timeout_read=config.timeout_read,
        output_directory=config.output_directory
    )
    # Run the stream checking/recording loop and the health check /
    # file serving server concurrently in a single event loop
    await asyncio.gather(checker.run(), start_server(config))


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass

67
src/recorder.py Normal file

@@ -0,0 +1,67 @@
import aiohttp
import os
from datetime import datetime

from logger import log_event
from utils import sanitize_filename


class Recorder:
    def __init__(self, stream_url, output_directory, timeout_connect=10, timeout_read=30):
        self.stream_url = stream_url
        self.output_directory = output_directory
        self.timeout_read = timeout_read
        self.timeout_connect = timeout_connect
        self.file_name = None
        self.file_path = None
        self.start_time = None
        self.last_data_time = None
        self.is_recording = False

    async def start_recording(self):
        self.start_time = datetime.utcnow()
        domain = self.stream_url.split("//")[-1].split("/")[0]
        sanitized_domain = sanitize_filename(domain)
        date_str = self.start_time.strftime("%Y%m%d_%H%M%S")
        self.file_name = f"{sanitized_domain}_{date_str}.mp3.tmp"
        self.file_path = os.path.join(self.output_directory, self.file_name)
        try:
            # sock_read raises asyncio.TimeoutError if no data arrives for
            # timeout_read seconds, which ends a stalled recording
            timeout = aiohttp.ClientTimeout(total=None, connect=self.timeout_connect, sock_read=self.timeout_read)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(self.stream_url) as response:
                    if response.status == 200:
                        self.is_recording = True
                        log_event("recording_started", {"file_name": self.file_name, "stream_url": self.stream_url})
                        # Open the output file once and append chunks as they arrive
                        with open(self.file_path, 'ab') as f:
                            async for data, _ in response.content.iter_chunks():
                                if not data:
                                    break
                                self.last_data_time = datetime.utcnow()
                                f.write(data)
                        log_event("recording_finished", {"file_name": self.file_name, "stream_url": self.stream_url})
                    else:
                        log_event("stream_unavailable", {"http_status": response.status})
        except Exception as e:
            log_event('recording_error', {"error": str(e)}, level="ERROR")
        finally:
            self.is_recording = False
            self.end_recording()

    def end_recording(self):
        if self.file_path and os.path.exists(self.file_path):
            finished_file = self.file_path.replace('.tmp', '')
            os.rename(self.file_path, finished_file)
            log_event("recording_saved", {
                "file_name": finished_file,
                "duration": (datetime.utcnow() - self.start_time).total_seconds() if self.start_time else 0
            })

    def is_active(self):
        return self.is_recording

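For STREAM_URL=http://example.com/stream started at 2024-01-07 11:33:44 UTC, the recorder would write example.com_20240107_113344.mp3.tmp and rename it to example.com_20240107_113344.mp3 once the stream ends, so consumers of the directory only ever see completed .mp3 files.
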
38
src/rss_generator.py Normal file

@@ -0,0 +1,38 @@
import os
from datetime import datetime

from yattag import Doc

from utils import generate_file_hash, file_hash_to_id


def generate_rss_feed(files, output_directory, server_host):
    doc, tag, text = Doc().tagtext()
    doc.asis('<?xml version="1.0" encoding="UTF-8"?>')
    with tag('rss', version='2.0'):
        with tag('channel'):
            with tag('title'):
                text('Icecast Stream Recordings')
            with tag('description'):
                text('The latest recordings from the Icecast server.')
            with tag('link'):
                text(server_host)
            for file_name in files:
                file_path = os.path.join(output_directory, file_name)
                file_hash = generate_file_hash(file_path)
                file_id = file_hash_to_id(file_hash)
                with tag('item'):
                    with tag('title'):
                        text(file_name)
                    with tag('link'):
                        text(f'{server_host}/files/{file_name}')
                    with tag('guid', isPermaLink='false'):
                        text(file_id)
                    with tag('pubDate'):
                        # RFC 822 date with GMT zone, as RSS 2.0 expects
                        pub_date = datetime.utcfromtimestamp(os.path.getctime(file_path)).strftime('%a, %d %b %Y %H:%M:%S GMT')
                        text(pub_date)
    return doc.getvalue()

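For a single recording, the generated feed would contain an item roughly like the following (values illustrative; yattag emits it on one line, wrapped here for readability):

<item>
  <title>example.com_20240107_113344.mp3</title>
  <link>https://example.org/files/example.com_20240107_113344.mp3</link>
  <guid isPermaLink="false">0f3c9a1b2d4e5f60718293a4b5c6d7e8</guid>
  <pubDate>Sun, 07 Jan 2024 11:35:02 GMT</pubDate>
</item>
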
59
src/server.py Normal file

@@ -0,0 +1,59 @@
from aiohttp import web
import os
import mimetypes
from pathlib import Path

from rss_generator import generate_rss_feed
from logger import log_event

routes = web.RouteTableDef()


@routes.get('/health')
async def health_check(request):
    log_event("health_check_requested", {"method": "GET", "path": request.path}, level="INFO")
    return web.Response(text="OK")


@routes.get('/rss')
async def rss_feed(request):
    log_event("rss_feed_requested", {"method": "GET", "path": request.path}, level="INFO")
    output_directory = request.app['config'].output_directory
    files = [f for f in os.listdir(output_directory) if f.endswith('.mp3')]
    rss_xml = generate_rss_feed(files, output_directory, request.app['config'].server_host)
    return web.Response(text=rss_xml, content_type='application/rss+xml')


@routes.get('/files/{file_name}')
async def serve_file(request):
    file_name = request.match_info['file_name']
    log_event("file_serve_requested", {"method": "GET", "path": request.path, "file_name": file_name}, level="INFO")
    output_directory = request.app['config'].output_directory
    file_path = os.path.join(output_directory, file_name)
    # Prevent path traversal: the resolved target must stay inside the output directory
    resolved = Path(output_directory).joinpath(file_name).resolve()
    if not resolved.is_relative_to(Path(output_directory).resolve()):
        log_event("file_access_denied", {"file_name": file_name}, level="WARNING")
        return web.Response(status=403, text='Access denied')
    if not os.path.exists(file_path):
        log_event("file_not_found", {"file_name": file_name}, level="WARNING")
        return web.Response(status=404, text='File not found')
    file = os.path.basename(file_path)
    content_type, _ = mimetypes.guess_type(file)
    headers = {
        'Content-Disposition': f'attachment; filename="{file}"',
        'Content-Type': content_type or 'application/octet-stream',
    }
    return web.FileResponse(file_path, headers=headers)


async def start_server(config):
    app = web.Application()
    app['config'] = config
    app.add_routes(routes)
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, '0.0.0.0', config.server_port)
    log_event('server_starting', {'port': config.server_port}, level="INFO")
    await site.start()
    log_event('server_started', {'port': config.server_port}, level="INFO")

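Assuming the service is up on the default port, the endpoints can be exercised with curl (file name illustrative):

curl http://localhost:8080/health
curl http://localhost:8080/rss
curl -O http://localhost:8080/files/example.com_20240107_113344.mp3
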
42
src/stream_checker.py Normal file

@@ -0,0 +1,42 @@
import asyncio

from aiohttp import ClientSession, ClientTimeout

from recorder import Recorder
from logger import log_event


class StreamChecker:
    def __init__(self, stream_url, check_interval, timeout_connect, output_directory, timeout_read=30):
        self.stream_url = stream_url
        self.check_interval = check_interval
        self.timeout_connect = timeout_connect
        self.timeout_read = timeout_read
        self.output_directory = output_directory
        self.recorder = None
        self.is_stream_live = False

    async def check_stream(self, session):
        try:
            timeout = ClientTimeout(connect=self.timeout_connect)
            async with session.get(self.stream_url, timeout=timeout, allow_redirects=True) as response:
                if response.status == 200:
                    self.is_stream_live = True
                    log_event("stream_live", {"stream_url": self.stream_url})
                else:
                    self.is_stream_live = False
                    log_event("stream_offline", {"stream_url": self.stream_url})
        except asyncio.TimeoutError:
            # A failed check means we cannot assume the stream is still live
            self.is_stream_live = False
            log_event("check_stream_timeout", {"stream_url": self.stream_url})
        except Exception as e:
            self.is_stream_live = False
            log_event("check_stream_error", {"stream_url": self.stream_url, "error": str(e)})

    async def run(self):
        while True:
            async with ClientSession() as session:
                await self.check_stream(session)
            if self.is_stream_live and (self.recorder is None or not self.recorder.is_active()):
                # Pass timeouts by keyword so timeout_read is not mistaken for timeout_connect
                self.recorder = Recorder(
                    self.stream_url,
                    self.output_directory,
                    timeout_connect=self.timeout_connect,
                    timeout_read=self.timeout_read,
                )
                await self.recorder.start_recording()
            await asyncio.sleep(self.check_interval)

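One design consequence worth noting: start_recording is awaited inside the run loop, so no availability checks happen while a recording is in progress; the next check occurs check_interval seconds after the stream drops. This guarantees at most one active recording per StreamChecker instance.
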
27
src/utils.py Normal file

@@ -0,0 +1,27 @@
import hashlib
import string


def sanitize_filename(filename):
    """
    Sanitize the filename by removing or replacing invalid characters.
    """
    valid_chars = "-_.() %s%s" % (string.ascii_letters, string.digits)
    cleaned_filename = "".join(c for c in filename if c in valid_chars)
    cleaned_filename = cleaned_filename.replace(' ', '_')  # Replace spaces with underscores
    return cleaned_filename


def generate_file_hash(file_path):
    """
    Generate a hash for file contents to uniquely identify files.
    """
    hasher = hashlib.sha256()
    with open(file_path, 'rb') as f:
        while chunk := f.read(8192):
            hasher.update(chunk)
    return hasher.hexdigest()


def file_hash_to_id(file_hash, length=32):
    """
    Convert file hash to a shorter file ID, considering only the first `length` characters.
    """
    return file_hash[:length]
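
A quick illustration of the helpers, assuming they are imported from src/utils.py (hash input illustrative):

>>> sanitize_filename('my stream:2024?.mp3')
'my_stream2024.mp3'
>>> file_hash_to_id('a' * 64, length=8)
'aaaaaaaa'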