Refactoring and simplification

This commit is contained in:
Dmitrii Andreev
2025-04-07 12:36:56 +03:00
parent 535020aa0f
commit a3038b43ab
12 changed files with 234 additions and 380 deletions

View File

@@ -21,37 +21,33 @@ import (
"github.com/kemko/icecast-ripper/internal/streamchecker"
)
const version = "0.2.0"
func main() {
// Load configuration
cfg, err := config.LoadConfig()
if err != nil {
_, err2 := fmt.Fprintf(os.Stderr, "Error loading configuration: %v\n", err)
if err2 != nil {
return
}
fmt.Fprintf(os.Stderr, "Error loading configuration: %v\n", err)
os.Exit(1)
}
// Setup logger
logger.Setup(cfg.LogLevel)
slog.Info("Starting icecast-ripper", "version", version)
slog.Info("Starting icecast-ripper", "version", "0.1.0") // Updated version
// Validate essential config
// Validate essential configuration
if cfg.StreamURL == "" {
slog.Error("Configuration error: STREAM_URL must be set.")
slog.Error("Configuration error: STREAM_URL must be set")
os.Exit(1)
}
// Extract stream name from URL for use in GUID generation
// Extract stream name for GUID generation
streamName := extractStreamName(cfg.StreamURL)
slog.Info("Using stream name for GUID generation", "name", streamName)
slog.Info("Using stream name for identification", "name", streamName)
// Initialize file store (replacing SQLite database)
// If DatabasePath is provided, use it for JSON persistence
// Initialize file store
storePath := ""
if cfg.DatabasePath != "" {
// Change extension from .db to .json for the file store
storePath = changeExtension(cfg.DatabasePath, ".json")
}
@@ -60,89 +56,81 @@ func main() {
slog.Error("Failed to initialize file store", "error", err)
os.Exit(1)
}
// Properly handle Close() error
defer func() {
if err := fileStore.Close(); err != nil {
slog.Error("Failed to close file store", "error", err)
slog.Error("Error closing file store", "error", err)
}
}()
// Create a context that cancels on shutdown signal
// Create main context that cancels on shutdown signal
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop() // Ensure context is cancelled even if not explicitly stopped later
defer stop()
// --- Initialize components ---
slog.Info("Initializing components...")
// Initialize components
streamChecker := streamchecker.New(cfg.StreamURL)
recorderInstance, err := recorder.New(cfg.TempPath, cfg.RecordingsPath, fileStore, streamName)
if err != nil {
slog.Error("Failed to initialize recorder", "error", err)
os.Exit(1)
}
rssGenerator := rss.New(fileStore, cfg, "Icecast Recordings", "Recordings from stream: "+cfg.StreamURL)
schedulerInstance := scheduler.New(cfg.CheckInterval, streamChecker, recorderInstance)
httpServer := server.New(cfg, rssGenerator)
// --- Start components ---
// Start services
slog.Info("Starting services...")
schedulerInstance.Start(ctx) // Pass the main context to the scheduler
// Start the scheduler which will check for streams and record them
schedulerInstance.Start(ctx)
// Start the HTTP server for RSS feed
if err := httpServer.Start(); err != nil {
slog.Error("Failed to start HTTP server", "error", err)
stop() // Trigger shutdown if server fails to start
stop()
os.Exit(1)
}
slog.Info("Application started successfully. Press Ctrl+C to shut down.")
// Wait for context cancellation (due to signal)
// Wait for termination signal
<-ctx.Done()
slog.Info("Shutting down application...")
// --- Graceful shutdown ---
// Create a shutdown context with a timeout
// Graceful shutdown
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer shutdownCancel()
// Stop scheduler first (prevents new recordings)
schedulerInstance.Stop() // This waits for the scheduler loop to exit
// First stop the scheduler to prevent new recordings
schedulerInstance.Stop()
// Stop the recorder (ensures any ongoing recording is finalized or cancelled)
// Note: Stopping the scheduler doesn't automatically stop an *ongoing* recording.
// We need to explicitly stop the recorder if we want it to terminate immediately.
recorderInstance.StopRecording() // Signal any active recording to stop
// Then stop any ongoing recording
recorderInstance.StopRecording()
// Stop the HTTP server
// Finally, stop the HTTP server
if err := httpServer.Stop(shutdownCtx); err != nil {
slog.Warn("HTTP server shutdown error", "error", err)
}
// File store is closed by the deferred function call
slog.Info("Application shut down gracefully")
}
// extractStreamName extracts a meaningful stream name from the URL.
// This is used as part of the GUID generation.
// extractStreamName extracts a meaningful identifier from the URL
func extractStreamName(streamURL string) string {
// Try to parse the URL
parsedURL, err := url.Parse(streamURL)
if err != nil {
// If we can't parse it, just use the hostname part or the whole URL
return streamURL
}
// Use the host as the base name
streamName := parsedURL.Hostname()
// If there's a path that appears to be a stream identifier, use that too
if parsedURL.Path != "" && parsedURL.Path != "/" {
// Remove leading slash and use just the first part of the path if it has multiple segments
path := parsedURL.Path
if path[0] == '/' {
path = path[1:]
}
// Take just the first segment of the path
pathSegments := filepath.SplitList(path)
if len(pathSegments) > 0 {
streamName += "_" + pathSegments[0]

3
go.mod
View File

@@ -3,14 +3,13 @@ module github.com/kemko/icecast-ripper
go 1.24.2
require (
github.com/mattn/go-sqlite3 v1.14.27
github.com/gorilla/feeds v1.2.0
github.com/spf13/viper v1.20.1
)
require (
github.com/fsnotify/fsnotify v1.9.0 // indirect
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
github.com/gorilla/feeds v1.2.0 // indirect
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
github.com/sagikazarmark/locafero v0.9.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect

2
go.sum
View File

@@ -14,8 +14,6 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/mattn/go-sqlite3 v1.14.27 h1:drZCnuvf37yPfs95E5jd9s3XhdVWLal+6BOK6qrv6IU=
github.com/mattn/go-sqlite3 v1.14.27/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M=
github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=

View File

@@ -1,14 +1,12 @@
package config
import (
"strings"
"time"
"github.com/spf13/viper"
)
// Config stores all configuration for the application.
// The values are read by viper from environment variables or a config file.
// Config stores all configuration for the application
type Config struct {
StreamURL string `mapstructure:"STREAM_URL"`
CheckInterval time.Duration `mapstructure:"CHECK_INTERVAL"`
@@ -16,54 +14,33 @@ type Config struct {
TempPath string `mapstructure:"TEMP_PATH"`
DatabasePath string `mapstructure:"DATABASE_PATH"`
ServerAddress string `mapstructure:"SERVER_ADDRESS"`
RSSFeedURL string `mapstructure:"RSS_FEED_URL"` // Base URL for the RSS feed links
RSSFeedURL string `mapstructure:"RSS_FEED_URL"`
LogLevel string `mapstructure:"LOG_LEVEL"`
}
// LoadConfig reads configuration from environment variables.
// LoadConfig reads configuration from environment variables
func LoadConfig() (*Config, error) {
viper.AutomaticEnv()
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
v := viper.New()
v.AutomaticEnv()
// Set default values
viper.SetDefault("STREAM_URL", "")
viper.SetDefault("CHECK_INTERVAL", "1m")
viper.SetDefault("RECORDINGS_PATH", "./recordings")
viper.SetDefault("TEMP_PATH", "./temp")
viper.SetDefault("DATABASE_PATH", "./icecast-ripper.db")
viper.SetDefault("SERVER_ADDRESS", ":8080")
viper.SetDefault("RSS_FEED_URL", "http://localhost:8080/rss") // Example default
viper.SetDefault("LOG_LEVEL", "info")
defaults := map[string]interface{}{
"STREAM_URL": "",
"CHECK_INTERVAL": "1m",
"RECORDINGS_PATH": "./recordings",
"TEMP_PATH": "./temp",
"DATABASE_PATH": "./icecast-ripper.db",
"SERVER_ADDRESS": ":8080",
"RSS_FEED_URL": "http://localhost:8080/rss",
"LOG_LEVEL": "info",
}
for key, value := range defaults {
v.SetDefault(key, value)
}
var config Config
// Bind environment variables to struct fields
if err := viper.BindEnv("STREAM_URL"); err != nil {
return nil, err
}
if err := viper.BindEnv("CHECK_INTERVAL"); err != nil {
return nil, err
}
if err := viper.BindEnv("RECORDINGS_PATH"); err != nil {
return nil, err
}
if err := viper.BindEnv("TEMP_PATH"); err != nil {
return nil, err
}
if err := viper.BindEnv("DATABASE_PATH"); err != nil {
return nil, err
}
if err := viper.BindEnv("SERVER_ADDRESS"); err != nil {
return nil, err
}
if err := viper.BindEnv("RSS_FEED_URL"); err != nil {
return nil, err
}
if err := viper.BindEnv("LOG_LEVEL"); err != nil {
return nil, err
}
// Unmarshal the config
if err := viper.Unmarshal(&config); err != nil {
if err := v.Unmarshal(&config); err != nil {
return nil, err
}

View File

@@ -11,25 +11,24 @@ import (
"time"
)
// FileStore represents an in-memory store with optional file persistence
type FileStore struct {
files map[string]*RecordedFile // Map of filename -> file data
storePath string // Path to persistence file, if empty no persistence
mu sync.RWMutex // Mutex for thread safety
}
// RecordedFile represents information about a recorded file.
// RecordedFile represents information about a recorded file
type RecordedFile struct {
ID int64 `json:"id"`
Filename string `json:"filename"`
Hash string `json:"hash"` // GUID for the file
Hash string `json:"hash"`
FileSize int64 `json:"fileSize"`
Duration time.Duration `json:"duration"`
RecordedAt time.Time `json:"recordedAt"`
}
// InitDB initializes a new FileStore.
// If dataSourceName is provided, it will be used as the path to persist the store.
// FileStore represents an in-memory store with optional file persistence
type FileStore struct {
files map[string]*RecordedFile
storePath string
mu sync.RWMutex
}
// InitDB initializes a new FileStore
func InitDB(dataSourceName string) (*FileStore, error) {
slog.Info("Initializing file store", "path", dataSourceName)
@@ -38,27 +37,21 @@ func InitDB(dataSourceName string) (*FileStore, error) {
storePath: dataSourceName,
}
// If a data source is provided, try to load existing data
if dataSourceName != "" {
if err := fs.loadFromFile(); err != nil {
// If file doesn't exist, that's fine - we'll create it when we save
if !os.IsNotExist(err) {
if err := fs.loadFromFile(); err != nil && !os.IsNotExist(err) {
slog.Error("Failed to load file store data", "error", err)
}
}
}
slog.Info("File store initialized successfully")
return fs, nil
}
// loadFromFile loads the file store data from the persistence file.
func (fs *FileStore) loadFromFile() error {
fs.mu.Lock()
defer fs.mu.Unlock()
if fs.storePath == "" {
return nil // No persistence path set
return nil
}
data, err := os.ReadFile(fs.storePath)
@@ -71,7 +64,6 @@ func (fs *FileStore) loadFromFile() error {
return fmt.Errorf("failed to parse file store data: %w", err)
}
// Reset the map and repopulate it
fs.files = make(map[string]*RecordedFile, len(files))
for _, file := range files {
fs.files[file.Filename] = file
@@ -81,22 +73,19 @@ func (fs *FileStore) loadFromFile() error {
return nil
}
// saveToFile persists the file store data to disk.
func (fs *FileStore) saveToFile() error {
fs.mu.RLock()
defer fs.mu.RUnlock()
if fs.storePath == "" {
return nil // No persistence path set
return nil
}
// Create directory if it doesn't exist
dir := filepath.Dir(fs.storePath)
if err := os.MkdirAll(dir, 0755); err != nil {
return fmt.Errorf("failed to create directory for file store: %w", err)
}
// Convert map to slice
files := make([]*RecordedFile, 0, len(fs.files))
for _, file := range fs.files {
files = append(files, file)
@@ -115,15 +104,13 @@ func (fs *FileStore) saveToFile() error {
return nil
}
// AddRecordedFile adds a file to the store.
// AddRecordedFile adds a file to the store
func (fs *FileStore) AddRecordedFile(filename, hash string, fileSize int64, duration time.Duration, recordedAt time.Time) (int64, error) {
fs.mu.Lock()
defer fs.mu.Unlock()
// Generate a unique ID (just use timestamp as we don't need real DB IDs)
id := time.Now().UnixNano()
// Store the file
fs.files[filename] = &RecordedFile{
ID: id,
Filename: filename,
@@ -133,32 +120,27 @@ func (fs *FileStore) AddRecordedFile(filename, hash string, fileSize int64, dura
RecordedAt: recordedAt,
}
// Persist changes
if err := fs.saveToFile(); err != nil {
slog.Error("Failed to save file store data", "error", err)
}
slog.Debug("Added recorded file to store", "id", id, "filename", filename, "hash", hash)
return id, nil
}
// GetRecordedFiles retrieves all recorded files, ordered by recording date descending.
// GetRecordedFiles retrieves all recorded files, ordered by recording date descending
func (fs *FileStore) GetRecordedFiles(limit int) ([]RecordedFile, error) {
fs.mu.RLock()
defer fs.mu.RUnlock()
// Convert map to slice for sorting
files := make([]RecordedFile, 0, len(fs.files))
for _, file := range fs.files {
files = append(files, *file)
}
// Sort by recorded_at descending
sort.Slice(files, func(i, j int) bool {
return files[i].RecordedAt.After(files[j].RecordedAt)
})
// Apply limit if provided
if limit > 0 && limit < len(files) {
files = files[:limit]
}
@@ -166,8 +148,7 @@ func (fs *FileStore) GetRecordedFiles(limit int) ([]RecordedFile, error) {
return files, nil
}
// Close persists the in-memory store to its backing file via saveToFile
// (a no-op when no persistence path is configured) and returns any write
// error. Callers defer it at shutdown.
func (fs *FileStore) Close() error {
slog.Info("Closing file store")
return fs.saveToFile()
}

View File

@@ -10,46 +10,36 @@ import (
"time"
)
// GenerateGUID generates a unique identifier based on file metadata without reading file content.
// It uses recording metadata (stream name, recording start time, filename) to create a deterministic GUID.
// GenerateGUID creates a unique identifier based on metadata
// Uses stream name, recording time and filename to create a deterministic GUID
func GenerateGUID(streamName string, recordedAt time.Time, filePath string) string {
filename := filepath.Base(filePath)
// Create a unique string combining stream name, recording time (rounded to seconds),
// and filename for uniqueness
// Create a unique string combining metadata
input := fmt.Sprintf("%s:%s:%s",
streamName,
recordedAt.UTC().Format(time.RFC3339),
filename)
slog.Debug("Generating GUID", "input", input)
// Hash the combined string to create a GUID
// Hash the combined string
hasher := sha256.New()
hasher.Write([]byte(input))
guid := hex.EncodeToString(hasher.Sum(nil))
slog.Debug("Generated GUID", "file", filePath, "guid", guid)
slog.Debug("Generated GUID", "input", input, "guid", guid)
return guid
}
// GenerateFileHash remains for backward compatibility but is now built on GenerateGUID
// This uses file metadata from the path and mtime instead of reading file content
// GenerateFileHash is maintained for backwards compatibility
// Uses file metadata instead of content
func GenerateFileHash(filePath string) (string, error) {
fileInfo, err := os.Stat(filePath)
if err != nil {
return "", fmt.Errorf("failed to stat file %s: %w", filePath, err)
}
// Extract stream name from directory structure if possible, otherwise use parent dir
dir := filepath.Dir(filePath)
streamName := filepath.Base(dir)
// Use file modification time as a proxy for recording time
streamName := filepath.Base(filepath.Dir(filePath))
recordedAt := fileInfo.ModTime()
guid := GenerateGUID(streamName, recordedAt, filePath)
slog.Debug("Generated hash using metadata", "file", filePath, "hash", guid)
return guid, nil
return GenerateGUID(streamName, recordedAt, filePath), nil
}

View File

@@ -6,25 +6,30 @@ import (
"strings"
)
// Setup initializes the structured logger.
// Setup initializes the structured logger with the specified log level
func Setup(logLevel string) {
var level slog.Level
switch strings.ToLower(logLevel) {
case "debug":
level = slog.LevelDebug
case "warn":
level = slog.LevelWarn
case "error":
level = slog.LevelError
default:
level = slog.LevelInfo
}
level := parseLogLevel(logLevel)
opts := &slog.HandlerOptions{
handler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: level,
}
handler := slog.NewJSONHandler(os.Stdout, opts) // Or slog.NewTextHandler
slog.SetDefault(slog.New(handler))
})
logger := slog.New(handler)
slog.SetDefault(logger)
slog.Info("Logger initialized", "level", level.String())
}
// parseLogLevel converts a string log level to slog.Level
func parseLogLevel(level string) slog.Level {
switch strings.ToLower(level) {
case "debug":
return slog.LevelDebug
case "warn", "warning":
return slog.LevelWarn
case "error":
return slog.LevelError
default:
return slog.LevelInfo
}
}

View File

@@ -2,6 +2,7 @@ package recorder
import (
"context"
"errors"
"fmt"
"io"
"log/slog"
@@ -16,26 +17,22 @@ import (
"github.com/kemko/icecast-ripper/internal/hash"
)
// Recorder handles downloading the stream and saving recordings.
type Recorder struct {
tempPath string
recordingsPath string
db *database.FileStore
client *http.Client
mu sync.Mutex // Protects access to isRecording
mu sync.Mutex
isRecording bool
cancelFunc context.CancelFunc // To stop the current recording
streamName string // Name of the stream being recorded
cancelFunc context.CancelFunc
streamName string
}
// New creates a new Recorder instance.
func New(tempPath, recordingsPath string, db *database.FileStore, streamName string) (*Recorder, error) {
// Ensure temp and recordings directories exist
if err := os.MkdirAll(tempPath, 0755); err != nil {
return nil, fmt.Errorf("failed to create temp directory %s: %w", tempPath, err)
for _, dir := range []string{tempPath, recordingsPath} {
if err := os.MkdirAll(dir, 0755); err != nil {
return nil, fmt.Errorf("failed to create directory %s: %w", dir, err)
}
if err := os.MkdirAll(recordingsPath, 0755); err != nil {
return nil, fmt.Errorf("failed to create recordings directory %s: %w", recordingsPath, err)
}
return &Recorder{
@@ -43,76 +40,65 @@ func New(tempPath, recordingsPath string, db *database.FileStore, streamName str
recordingsPath: recordingsPath,
db: db,
streamName: streamName,
client: &http.Client{
// Use a longer timeout for downloading the stream
Timeout: 0, // No timeout for the download itself, rely on context cancellation
},
client: &http.Client{Timeout: 0}, // No timeout, rely on context cancellation
}, nil
}
// IsRecording reports whether a recording is currently in progress.
// The mutex guards the isRecording flag against concurrent writers.
func (r *Recorder) IsRecording() bool {
	r.mu.Lock()
	recording := r.isRecording
	r.mu.Unlock()
	return recording
}
// StartRecording starts downloading the stream to a temporary file.
// It runs asynchronously and returns immediately.
func (r *Recorder) StartRecording(ctx context.Context, streamURL string) error {
r.mu.Lock()
defer r.mu.Unlock()
if r.isRecording {
r.mu.Unlock()
return fmt.Errorf("recording already in progress")
return errors.New("recording already in progress")
}
// Create a new context that can be cancelled to stop this specific recording
recordingCtx, cancel := context.WithCancel(ctx)
r.cancelFunc = cancel
r.isRecording = true
r.mu.Unlock()
slog.Info("Starting recording", "url", streamURL)
go r.recordStream(recordingCtx, streamURL) // Run in a goroutine
go r.recordStream(recordingCtx, streamURL)
return nil
}
// StopRecording stops the current recording if one is in progress.
func (r *Recorder) StopRecording() {
r.mu.Lock()
defer r.mu.Unlock()
if r.isRecording && r.cancelFunc != nil {
slog.Info("Stopping current recording")
r.cancelFunc() // Signal the recording goroutine to stop
// The lock ensures that isRecording is set to false *after* the goroutine finishes
r.cancelFunc()
}
}
// recordStream performs the actual download and processing.
func (r *Recorder) recordStream(ctx context.Context, streamURL string) {
startTime := time.Now()
var tempFilePath string
// Defer setting isRecording to false and cleaning up cancelFunc
defer func() {
r.mu.Lock()
r.isRecording = false
r.cancelFunc = nil
r.mu.Unlock()
slog.Info("Recording process finished")
// Clean up temp file if it still exists (e.g., due to early error)
if tempFilePath != "" {
if _, err := os.Stat(tempFilePath); err == nil {
slog.Warn("Removing leftover temporary file", "path", tempFilePath)
slog.Warn("Cleaning up temporary file", "path", tempFilePath)
if err := os.Remove(tempFilePath); err != nil {
slog.Error("Failed to remove temporary file", "path", tempFilePath, "error", err)
slog.Error("Failed to remove temporary file", "error", err)
}
}
}
}()
// Create temporary file
tempFile, err := os.CreateTemp(r.tempPath, "recording-*.tmp")
if err != nil {
slog.Error("Failed to create temporary file", "error", err)
@@ -121,77 +107,54 @@ func (r *Recorder) recordStream(ctx context.Context, streamURL string) {
tempFilePath = tempFile.Name()
slog.Debug("Created temporary file", "path", tempFilePath)
// Download stream
bytesWritten, err := r.downloadStream(ctx, streamURL, tempFile)
// Close the file explicitly *before* hashing and moving
if closeErr := tempFile.Close(); closeErr != nil {
slog.Error("Failed to close temporary file", "path", tempFilePath, "error", closeErr)
// If download also failed, prioritize that error
slog.Error("Failed to close temporary file", "error", closeErr)
if err == nil {
err = closeErr // Report closing error if download was ok
err = closeErr
}
}
// Handle context cancellation or download errors
if err != nil {
// Check if the error was due to context cancellation (graceful stop)
if ctx.Err() == context.Canceled {
slog.Info("Recording stopped via cancellation.")
// Proceed to process the partially downloaded file
if errors.Is(err, context.Canceled) {
slog.Info("Recording stopped via cancellation")
} else {
slog.Error("Failed to download stream", "error", err)
// No need to keep the temp file if download failed unexpectedly
if err := os.Remove(tempFilePath); err != nil {
slog.Error("Failed to remove temporary file after download error", "path", tempFilePath, "error", err)
}
tempFilePath = "" // Prevent deferred cleanup from trying again
return
}
}
// If no bytes were written (e.g., stream closed immediately or cancelled before data), discard.
// Skip empty recordings
if bytesWritten == 0 {
slog.Warn("No data written to temporary file, discarding.", "path", tempFilePath)
if err := os.Remove(tempFilePath); err != nil {
slog.Error("Failed to remove temporary file after no data written", "path", tempFilePath, "error", err)
}
tempFilePath = "" // Prevent deferred cleanup
slog.Warn("No data written, discarding recording")
return
}
// Process successful recording
endTime := time.Now()
duration := endTime.Sub(startTime)
// Generate final filename (e.g., based on timestamp)
finalFilename := fmt.Sprintf("recording_%s.mp3", startTime.Format("20060102_150405")) // Assuming mp3, adjust if needed
finalFilename = sanitizeFilename(finalFilename) // Ensure filename is valid
finalFilename := fmt.Sprintf("recording_%s.mp3", startTime.Format("20060102_150405"))
finalFilename = sanitizeFilename(finalFilename)
finalPath := filepath.Join(r.recordingsPath, finalFilename)
// Move temporary file to final location
if err := os.Rename(tempFilePath, finalPath); err != nil {
slog.Error("Failed to move temporary file to final location", "temp", tempFilePath, "final", finalPath, "error", err)
if err := os.Remove(tempFilePath); err != nil {
slog.Error("Failed to remove temporary file after move error", "path", tempFilePath, "error", err)
}
tempFilePath = "" // Prevent deferred cleanup
slog.Error("Failed to move recording to final location", "error", err)
return
}
tempFilePath = "" // File moved, clear path to prevent deferred cleanup
tempFilePath = "" // Prevent cleanup in defer
slog.Info("Recording saved", "path", finalPath, "size", bytesWritten, "duration", duration)
// Generate GUID for this recording using the new metadata-based approach
guid := hash.GenerateGUID(r.streamName, startTime, finalFilename)
// Add record to database
_, err = r.db.AddRecordedFile(finalFilename, guid, bytesWritten, duration, startTime)
if err != nil {
slog.Error("Failed to add recording to database", "filename", finalFilename, "guid", guid, "error", err)
// Consider how to handle this - maybe retry later? For now, just log.
if _, err = r.db.AddRecordedFile(finalFilename, guid, bytesWritten, duration, startTime); err != nil {
slog.Error("Failed to add recording to database", "error", err)
}
}
// downloadStream handles the HTTP GET request and writes the body to the writer.
func (r *Recorder) downloadStream(ctx context.Context, streamURL string, writer io.Writer) (int64, error) {
req, err := http.NewRequestWithContext(ctx, "GET", streamURL, nil)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, streamURL, nil)
if err != nil {
return 0, fmt.Errorf("failed to create request: %w", err)
}
@@ -199,9 +162,8 @@ func (r *Recorder) downloadStream(ctx context.Context, streamURL string, writer
resp, err := r.client.Do(req)
if err != nil {
// Check if context was cancelled
if ctx.Err() == context.Canceled {
return 0, ctx.Err()
if errors.Is(err, context.Canceled) {
return 0, err
}
return 0, fmt.Errorf("failed to connect to stream: %w", err)
}
@@ -215,31 +177,31 @@ func (r *Recorder) downloadStream(ctx context.Context, streamURL string, writer
return 0, fmt.Errorf("unexpected status code: %s", resp.Status)
}
slog.Debug("Connected to stream, starting download", "url", streamURL)
// Copy the response body to the writer (temp file)
// io.Copy will block until EOF or an error (including context cancellation via the request)
slog.Debug("Connected to stream, downloading", "url", streamURL)
bytesWritten, err := io.Copy(writer, resp.Body)
if err != nil {
// Check if the error is due to context cancellation
if ctx.Err() == context.Canceled {
slog.Info("Stream download cancelled.")
return bytesWritten, ctx.Err() // Return context error
}
// Check for common stream disconnection errors (might need refinement)
if err == io.ErrUnexpectedEOF || strings.Contains(err.Error(), "connection reset by peer") || strings.Contains(err.Error(), "broken pipe") {
slog.Info("Stream disconnected gracefully (EOF or reset)", "bytes_written", bytesWritten)
return bytesWritten, nil // Treat as normal end of stream
}
return bytesWritten, fmt.Errorf("failed during stream copy: %w", err)
if errors.Is(err, context.Canceled) {
slog.Info("Stream download cancelled")
return bytesWritten, ctx.Err()
}
slog.Info("Stream download finished (EOF)", "bytes_written", bytesWritten)
// Handle common stream disconnections gracefully
if errors.Is(err, io.ErrUnexpectedEOF) ||
strings.Contains(err.Error(), "connection reset by peer") ||
strings.Contains(err.Error(), "broken pipe") {
slog.Info("Stream disconnected normally", "bytesWritten", bytesWritten)
return bytesWritten, nil
}
return bytesWritten, fmt.Errorf("failed during stream copy: %w", err)
}
slog.Info("Stream download finished normally", "bytesWritten", bytesWritten)
return bytesWritten, nil
}
// sanitizeFilename removes potentially problematic characters from filenames.
func sanitizeFilename(filename string) string {
// Replace common problematic characters
replacer := strings.NewReplacer(
" ", "_",
"/", "-",
@@ -253,12 +215,5 @@ func sanitizeFilename(filename string) string {
"|", "-",
)
cleaned := replacer.Replace(filename)
// Trim leading/trailing underscores/hyphens/dots
cleaned = strings.Trim(cleaned, "_-. ")
// Limit length if necessary (though less common on modern filesystems)
// const maxLen = 200
// if len(cleaned) > maxLen {
// cleaned = cleaned[:maxLen]
// }
return cleaned
return strings.Trim(cleaned, "_-. ")
}

View File

@@ -4,6 +4,7 @@ import (
"fmt"
"log/slog"
"net/url"
"strings"
"time"
"github.com/gorilla/feeds"
@@ -11,24 +12,25 @@ import (
"github.com/kemko/icecast-ripper/internal/database"
)
// Generator creates RSS feeds.
// Generator creates RSS feeds
type Generator struct {
fileStore *database.FileStore
feedBaseURL string // Base URL for links in the feed (e.g., http://server.com/recordings/)
recordingsPath string // Local path to recordings (needed for file info, maybe not directly used in feed)
feedBaseURL string
recordingsPath string
feedTitle string
feedDesc string
}
// New creates a new RSS Generator instance.
// New creates a new RSS Generator instance
func New(fileStore *database.FileStore, cfg *config.Config, title, description string) *Generator {
// Ensure the base URL for recordings ends with a slash
baseURL := cfg.RSSFeedURL // This should be the URL base for *serving* files
baseURL := cfg.RSSFeedURL
if baseURL == "" {
slog.Warn("RSS_FEED_URL not set, RSS links might be incomplete. Using placeholder.")
baseURL = "http://localhost:8080/recordings/" // Placeholder
slog.Warn("RSS_FEED_URL not set, using default")
baseURL = "http://localhost:8080/recordings/"
}
if baseURL[len(baseURL)-1:] != "/" {
// Ensure base URL ends with a slash
if !strings.HasSuffix(baseURL, "/") {
baseURL += "/"
}
@@ -41,46 +43,50 @@ func New(fileStore *database.FileStore, cfg *config.Config, title, description s
}
}
// GenerateFeed fetches recordings and produces the RSS feed XML as a byte slice.
// GenerateFeed produces the RSS feed XML as a byte slice
func (g *Generator) GenerateFeed(maxItems int) ([]byte, error) {
slog.Debug("Generating RSS feed", "maxItems", maxItems)
recordings, err := g.fileStore.GetRecordedFiles(maxItems)
if err != nil {
return nil, fmt.Errorf("failed to get recorded files for RSS feed: %w", err)
return nil, fmt.Errorf("failed to get recorded files: %w", err)
}
now := time.Now()
feed := &feeds.Feed{
Title: g.feedTitle,
Link: &feeds.Link{Href: g.feedBaseURL},
Description: g.feedDesc,
Created: now,
Created: time.Now(),
}
feed.Items = make([]*feeds.Item, 0, len(recordings))
for _, rec := range recordings {
fileURL, err := url.JoinPath(g.feedBaseURL, rec.Filename)
if err != nil {
slog.Error("Failed to create file URL for RSS item", "filename", rec.Filename, "error", err)
continue // Skip this item if URL creation fails
slog.Error("Failed to create file URL", "filename", rec.Filename, "error", err)
continue
}
item := &feeds.Item{
Title: fmt.Sprintf("Recording %s", rec.RecordedAt.Format("2006-01-02 15:04")),
Link: &feeds.Link{Href: fileURL},
Description: fmt.Sprintf("Icecast stream recording from %s. Duration: %s.", rec.RecordedAt.Format(time.RFC1123), rec.Duration.String()),
Description: fmt.Sprintf("Icecast stream recording from %s. Duration: %s",
rec.RecordedAt.Format(time.RFC1123), rec.Duration.String()),
Created: rec.RecordedAt,
Id: rec.Hash,
Enclosure: &feeds.Enclosure{Url: fileURL, Length: rec.FileSize, Type: "audio/mpeg"},
Enclosure: &feeds.Enclosure{
Url: fileURL,
Length: fmt.Sprintf("%d", rec.FileSize), // Convert int64 to string
Type: "audio/mpeg",
},
}
feed.Items = append(feed.Items, item)
}
// Create RSS 2.0 feed
rssFeed, err := feed.ToRss()
if err != nil {
return nil, fmt.Errorf("failed to generate RSS feed: %w", err)
}
slog.Debug("RSS feed generated successfully", "item_count", len(feed.Items))
slog.Debug("RSS feed generated", "itemCount", len(feed.Items))
return []byte(rssFeed), nil
}

View File

@@ -10,18 +10,16 @@ import (
"github.com/kemko/icecast-ripper/internal/streamchecker"
)
// Scheduler manages the periodic checking of the stream and triggers recordings.
type Scheduler struct {
interval time.Duration
checker *streamchecker.Checker
recorder *recorder.Recorder
stopChan chan struct{} // Channel to signal stopping the scheduler
stopOnce sync.Once // Ensures Stop is only called once
wg sync.WaitGroup // Waits for the run loop to finish
parentContext context.Context // Base context for recordings
stopChan chan struct{}
stopOnce sync.Once
wg sync.WaitGroup
parentContext context.Context
}
// New creates a new Scheduler instance.
func New(interval time.Duration, checker *streamchecker.Checker, recorder *recorder.Recorder) *Scheduler {
return &Scheduler{
interval: interval,
@@ -31,8 +29,6 @@ func New(interval time.Duration, checker *streamchecker.Checker, recorder *recor
}
}
// Start begins the scheduling loop.
// It takes a parent context which will be used for recordings.
func (s *Scheduler) Start(ctx context.Context) {
slog.Info("Starting scheduler", "interval", s.interval.String())
s.parentContext = ctx
@@ -40,73 +36,56 @@ func (s *Scheduler) Start(ctx context.Context) {
go s.run()
}
// Stop signals the scheduling loop to terminate and blocks until it has
// exited. It is safe to call multiple times; only the first call acts.
// An in-progress recording is NOT stopped here — the recorder must be
// stopped separately if immediate termination is desired.
func (s *Scheduler) Stop() {
	s.stopOnce.Do(func() {
		slog.Info("Stopping scheduler...")
		close(s.stopChan)
		s.wg.Wait()
		slog.Info("Scheduler stopped.")
	})
}
// run is the main loop for the scheduler.
func (s *Scheduler) run() {
defer s.wg.Done()
ticker := time.NewTicker(s.interval)
defer ticker.Stop()
for {
// Initial check immediately on start, then wait for ticker
// Initial check immediately on start
s.checkAndRecord()
for {
select {
case <-ticker.C:
// Check periodically based on the interval
s.checkAndRecord()
case <-s.stopChan:
slog.Info("Scheduler run loop exiting.")
return // Exit loop if stop signal received
return
case <-s.parentContext.Done():
slog.Info("Scheduler parent context cancelled, stopping.")
// Ensure Stop is called to clean up resources if not already stopped
s.Stop()
slog.Info("Parent context cancelled, stopping scheduler.")
return
}
}
}
// checkAndRecord performs the stream check and starts recording if necessary.
func (s *Scheduler) checkAndRecord() {
// Do not check if a recording is already in progress
if s.recorder.IsRecording() {
slog.Debug("Skipping check, recording in progress.")
slog.Debug("Recording in progress, skipping stream check.")
return
}
slog.Debug("Scheduler checking stream status...")
slog.Debug("Checking stream status")
isLive, err := s.checker.IsLive()
if err != nil {
// Log the error but continue; maybe a temporary network issue
slog.Warn("Error checking stream status", "error", err)
return
}
if isLive {
slog.Info("Stream is live, attempting to start recording.")
// Use the parent context provided during Start for the recording
err := s.recorder.StartRecording(s.parentContext, s.checker.GetStreamURL())
if err != nil {
// This likely means a recording was *just* started by another check, which is fine.
slog.Warn("Failed to start recording (might already be in progress)", "error", err)
} else {
slog.Info("Recording started successfully by scheduler.")
// Recording started, the check loop will skip until it finishes
slog.Info("Stream is live, starting recording")
if err := s.recorder.StartRecording(s.parentContext, s.checker.GetStreamURL()); err != nil {
slog.Warn("Failed to start recording", "error", err)
}
} else {
slog.Debug("Stream is not live, waiting for next interval.")
slog.Debug("Stream is not live")
}
}

View File

@@ -12,41 +12,38 @@ import (
"github.com/kemko/icecast-ripper/internal/rss"
)
// Server handles HTTP requests for the RSS feed and recorded files.
// The listen address lives on the embedded *http.Server (server.Addr),
// so no separate address field is kept.
type Server struct {
	server         *http.Server   // configured HTTP server (address, timeouts, mux)
	rssGenerator   *rss.Generator // produces the RSS feed payload
	recordingsPath string         // directory recordings are served from
}
// New creates a new HTTP server instance.
// New creates a new HTTP server instance
func New(cfg *config.Config, rssGenerator *rss.Generator) *Server {
mux := http.NewServeMux()
s := &Server{
rssGenerator: rssGenerator,
recordingsPath: cfg.RecordingsPath,
serverAddress: cfg.ServerAddress,
}
// Handler for the RSS feed
mux.HandleFunc("GET /rss", s.handleRSS)
// Handler for serving static recording files
// Ensure recordingsPath is absolute or relative to the working directory
// Get absolute path for recordings directory
absRecordingsPath, err := filepath.Abs(cfg.RecordingsPath)
if err != nil {
slog.Error("Failed to get absolute path for recordings directory", "path", cfg.RecordingsPath, "error", err)
// Decide how to handle this - maybe panic or return an error from New?
// For now, log and continue, file serving might fail.
slog.Error("Failed to get absolute path for recordings directory", "error", err)
absRecordingsPath = cfg.RecordingsPath // Fallback to original path
}
s := &Server{
rssGenerator: rssGenerator,
recordingsPath: absRecordingsPath,
}
// Set up routes
mux.HandleFunc("GET /rss", s.handleRSS)
// Serve static recordings
slog.Info("Serving recordings from", "path", absRecordingsPath)
fileServer := http.FileServer(http.Dir(absRecordingsPath))
// The path must end with a trailing slash for FileServer to work correctly with subdirectories
mux.Handle("GET /recordings/", http.StripPrefix("/recordings/", fileServer))
// Configure server with sensible timeouts
s.server = &http.Server{
Addr: cfg.ServerAddress,
Handler: mux,
@@ -58,14 +55,11 @@ func New(cfg *config.Config, rssGenerator *rss.Generator) *Server {
return s
}
// handleRSS generates and serves the RSS feed.
// handleRSS generates and serves the RSS feed
func (s *Server) handleRSS(w http.ResponseWriter, r *http.Request) {
slog.Debug("Received request for RSS feed")
// TODO: Consider adding caching headers (ETag, Last-Modified)
// based on the latest recording timestamp or a hash of the feed content.
slog.Debug("Serving RSS feed")
// Generate feed (limit to a reasonable number, e.g., 50 latest items)
maxItems := 50
const maxItems = 50
feedBytes, err := s.rssGenerator.GenerateFeed(maxItems)
if err != nil {
slog.Error("Failed to generate RSS feed", "error", err)
@@ -75,35 +69,27 @@ func (s *Server) handleRSS(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/rss+xml; charset=utf-8")
w.WriteHeader(http.StatusOK)
_, err = w.Write(feedBytes)
if err != nil {
slog.Error("Failed to write RSS feed response", "error", err)
if _, err = w.Write(feedBytes); err != nil {
slog.Error("Failed to write RSS response", "error", err)
}
}
// Start begins listening for HTTP requests. It returns immediately; the
// server runs in a background goroutine until Stop shuts it down.
func (s *Server) Start() error {
	slog.Info("Starting HTTP server", "address", s.server.Addr)
	go func() {
		// ErrServerClosed is the expected result of a graceful Stop and
		// is therefore not reported as an error.
		if err := s.server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			slog.Error("HTTP server error", "error", err)
		}
	}()
	return nil
}
// Stop gracefully shuts down the HTTP server, waiting for in-flight
// requests up to the deadline carried by ctx. The shutdown timeout is now
// the caller's responsibility (the previous hard-coded 15s was removed).
func (s *Server) Stop(ctx context.Context) error {
	slog.Info("Stopping HTTP server")
	return s.server.Shutdown(ctx)
}

View File

@@ -7,42 +7,34 @@ import (
"time"
)
// Checker checks whether an Icecast stream is currently live.
type Checker struct {
	streamURL string       // URL of the stream to probe
	client    *http.Client // HTTP client carrying the probe timeout
}
// New creates a Checker for streamURL using an HTTP client with a
// 10-second request timeout, so a hung server cannot stall liveness checks.
func New(streamURL string) *Checker {
	return &Checker{
		streamURL: streamURL,
		client: &http.Client{
			Timeout: 10 * time.Second,
		},
	}
}
// IsLive checks if the stream URL is currently broadcasting.
// It performs a GET request and checks for a 200 OK status.
// Note: Some Icecast servers might behave differently; this might need adjustment.
func (c *Checker) IsLive() (bool, error) {
slog.Debug("Checking stream status", "url", c.streamURL)
req, err := http.NewRequest("GET", c.streamURL, nil)
req, err := http.NewRequest(http.MethodGet, c.streamURL, nil)
if err != nil {
return false, fmt.Errorf("failed to create request for stream check: %w", err)
return false, fmt.Errorf("failed to create request: %w", err)
}
// Set a user agent; some servers might require it.
req.Header.Set("User-Agent", "icecast-ripper/1.0")
// Icy-Metadata header can sometimes force the server to respond even if the stream is idle,
// but we want to know if it's *actually* streaming audio data.
// req.Header.Set("Icy-Metadata", "1")
resp, err := c.client.Do(req)
if err != nil {
// Network errors (DNS lookup failure, connection refused) usually mean not live.
slog.Warn("Failed to connect to stream URL during check", "url", c.streamURL, "error", err)
return false, nil // Treat connection errors as 'not live'
slog.Debug("Connection to stream failed, considering not live", "error", err)
return false, nil // Connection failures mean the stream is not available
}
defer func() {
if err := resp.Body.Close(); err != nil {
@@ -50,17 +42,15 @@ func (c *Checker) IsLive() (bool, error) {
}
}()
// A 200 OK status generally indicates the stream is live and broadcasting.
if resp.StatusCode == http.StatusOK {
slog.Info("Stream is live", "url", c.streamURL, "status", resp.StatusCode)
slog.Info("Stream is live", "status", resp.StatusCode)
return true, nil
}
slog.Info("Stream is not live", "url", c.streamURL, "status", resp.StatusCode)
slog.Debug("Stream is not live", "status", resp.StatusCode)
return false, nil
}
// GetStreamURL returns the URL this Checker probes.
func (c *Checker) GetStreamURL() string {
	return c.streamURL
}