From a3038b43abefb51d52a3c2cac357824993ea716b Mon Sep 17 00:00:00 2001 From: Dmitrii Andreev Date: Mon, 7 Apr 2025 12:36:56 +0300 Subject: [PATCH] refactoring, simplification --- cmd/icecast-ripper/main.go | 72 +++++------ go.mod | 3 +- go.sum | 2 - internal/config/config.go | 63 ++++------ internal/database/database.go | 53 +++------ internal/hash/hash.go | 28 ++--- internal/logger/logger.go | 37 +++--- internal/recorder/recorder.go | 151 +++++++++--------------- internal/rss/rss.go | 46 ++++---- internal/scheduler/scheduler.go | 55 +++------ internal/server/server.go | 76 +++++------- internal/streamchecker/streamchecker.go | 28 ++--- 12 files changed, 234 insertions(+), 380 deletions(-) diff --git a/cmd/icecast-ripper/main.go b/cmd/icecast-ripper/main.go index 8249abb..d06afef 100644 --- a/cmd/icecast-ripper/main.go +++ b/cmd/icecast-ripper/main.go @@ -21,37 +21,33 @@ import ( "github.com/kemko/icecast-ripper/internal/streamchecker" ) +const version = "0.2.0" + func main() { // Load configuration cfg, err := config.LoadConfig() if err != nil { - _, err2 := fmt.Fprintf(os.Stderr, "Error loading configuration: %v\n", err) - if err2 != nil { - return - } + fmt.Fprintf(os.Stderr, "Error loading configuration: %v\n", err) os.Exit(1) } // Setup logger logger.Setup(cfg.LogLevel) + slog.Info("Starting icecast-ripper", "version", version) - slog.Info("Starting icecast-ripper", "version", "0.1.0") // Updated version - - // Validate essential config + // Validate essential configuration if cfg.StreamURL == "" { - slog.Error("Configuration error: STREAM_URL must be set.") + slog.Error("Configuration error: STREAM_URL must be set") os.Exit(1) } - // Extract stream name from URL for use in GUID generation + // Extract stream name for GUID generation streamName := extractStreamName(cfg.StreamURL) - slog.Info("Using stream name for GUID generation", "name", streamName) + slog.Info("Using stream name for identification", "name", streamName) - // Initialize file store (replacing SQLite database) - // If DatabasePath is provided, use it for JSON persistence + // Initialize file store storePath := "" if cfg.DatabasePath != "" { - // Change extension from .db to .json for the file store storePath = changeExtension(cfg.DatabasePath, ".json") } @@ -60,89 +56,81 @@ func main() { slog.Error("Failed to initialize file store", "error", err) os.Exit(1) } + // Properly handle Close() error defer func() { if err := fileStore.Close(); err != nil { - slog.Error("Failed to close file store", "error", err) + slog.Error("Error closing file store", "error", err) } }() - // Create a context that cancels on shutdown signal + // Create main context that cancels on shutdown signal ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) - defer stop() // Ensure context is cancelled even if not explicitly stopped later + defer stop() - // --- Initialize components --- - slog.Info("Initializing components...") + // Initialize components streamChecker := streamchecker.New(cfg.StreamURL) recorderInstance, err := recorder.New(cfg.TempPath, cfg.RecordingsPath, fileStore, streamName) if err != nil { slog.Error("Failed to initialize recorder", "error", err) os.Exit(1) } + rssGenerator := rss.New(fileStore, cfg, "Icecast Recordings", "Recordings from stream: "+cfg.StreamURL) schedulerInstance := scheduler.New(cfg.CheckInterval, streamChecker, recorderInstance) httpServer := server.New(cfg, rssGenerator) - // --- Start components --- + // Start services slog.Info("Starting services...") - schedulerInstance.Start(ctx) // Pass the main context to the scheduler + + // Start the scheduler which will check for streams and record them + schedulerInstance.Start(ctx) + + // Start the HTTP server for RSS feed if err := httpServer.Start(); err != nil { slog.Error("Failed to start HTTP server", "error", err) - stop() // Trigger shutdown if server fails to start + stop() os.Exit(1) } slog.Info("Application started successfully. Press Ctrl+C to shut down.") - // Wait for context cancellation (due to signal) + // Wait for termination signal <-ctx.Done() - slog.Info("Shutting down application...") - // --- Graceful shutdown --- - // Create a shutdown context with a timeout + // Graceful shutdown shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second) defer shutdownCancel() - // Stop scheduler first (prevents new recordings) - schedulerInstance.Stop() // This waits for the scheduler loop to exit + // First stop the scheduler to prevent new recordings + schedulerInstance.Stop() - // Stop the recorder (ensures any ongoing recording is finalized or cancelled) - // Note: Stopping the scheduler doesn't automatically stop an *ongoing* recording. - // We need to explicitly stop the recorder if we want it to terminate immediately. - recorderInstance.StopRecording() // Signal any active recording to stop + // Then stop any ongoing recording + recorderInstance.StopRecording() - // Stop the HTTP server + // Finally, stop the HTTP server if err := httpServer.Stop(shutdownCtx); err != nil { slog.Warn("HTTP server shutdown error", "error", err) } - // File store is closed by the deferred function call - slog.Info("Application shut down gracefully") } -// extractStreamName extracts a meaningful stream name from the URL. -// This is used as part of the GUID generation. +// extractStreamName extracts a meaningful identifier from the URL func extractStreamName(streamURL string) string { - // Try to parse the URL parsedURL, err := url.Parse(streamURL) if err != nil { - // If we can't parse it, just use the hostname part or the whole URL return streamURL } - // Use the host as the base name streamName := parsedURL.Hostname() - // If there's a path that appears to be a stream identifier, use that too if parsedURL.Path != "" && parsedURL.Path != "/" { - // Remove leading slash and use just the first part of the path if it has multiple segments path := parsedURL.Path if path[0] == '/' { path = path[1:] } - // Take just the first segment of the path pathSegments := filepath.SplitList(path) if len(pathSegments) > 0 { streamName += "_" + pathSegments[0] diff --git a/go.mod b/go.mod index 98321b1..b7a0be8 100644 --- a/go.mod +++ b/go.mod @@ -3,14 +3,13 @@ module github.com/kemko/icecast-ripper go 1.24.2 require ( - github.com/mattn/go-sqlite3 v1.14.27 + github.com/gorilla/feeds v1.2.0 github.com/spf13/viper v1.20.1 ) require ( github.com/fsnotify/fsnotify v1.9.0 // indirect github.com/go-viper/mapstructure/v2 v2.2.1 // indirect - github.com/gorilla/feeds v1.2.0 // indirect github.com/pelletier/go-toml/v2 v2.2.3 // indirect github.com/sagikazarmark/locafero v0.9.0 // indirect github.com/sourcegraph/conc v0.3.0 // indirect diff --git a/go.sum b/go.sum index 396b503..0af7a4c 100644 --- a/go.sum +++ b/go.sum @@ -14,8 +14,6 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/mattn/go-sqlite3 v1.14.27 h1:drZCnuvf37yPfs95E5jd9s3XhdVWLal+6BOK6qrv6IU= -github.com/mattn/go-sqlite3 v1.14.27/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M= github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/internal/config/config.go b/internal/config/config.go index 21a3188..eba2fc4 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1,14 +1,12 @@ package config import ( - "strings" "time" "github.com/spf13/viper" ) -// Config stores all configuration for the application. -// The values are read by viper from environment variables or a config file. +// Config stores all configuration for the application type Config struct { StreamURL string `mapstructure:"STREAM_URL"` CheckInterval time.Duration `mapstructure:"CHECK_INTERVAL"` @@ -16,54 +14,33 @@ type Config struct { TempPath string `mapstructure:"TEMP_PATH"` DatabasePath string `mapstructure:"DATABASE_PATH"` ServerAddress string `mapstructure:"SERVER_ADDRESS"` - RSSFeedURL string `mapstructure:"RSS_FEED_URL"` // Base URL for the RSS feed links + RSSFeedURL string `mapstructure:"RSS_FEED_URL"` LogLevel string `mapstructure:"LOG_LEVEL"` } -// LoadConfig reads configuration from environment variables. +// LoadConfig reads configuration from environment variables func LoadConfig() (*Config, error) { - viper.AutomaticEnv() - viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) + v := viper.New() + v.AutomaticEnv() // Set default values - viper.SetDefault("STREAM_URL", "") - viper.SetDefault("CHECK_INTERVAL", "1m") - viper.SetDefault("RECORDINGS_PATH", "./recordings") - viper.SetDefault("TEMP_PATH", "./temp") - viper.SetDefault("DATABASE_PATH", "./icecast-ripper.db") - viper.SetDefault("SERVER_ADDRESS", ":8080") - viper.SetDefault("RSS_FEED_URL", "http://localhost:8080/rss") // Example default - viper.SetDefault("LOG_LEVEL", "info") + defaults := map[string]interface{}{ + "STREAM_URL": "", + "CHECK_INTERVAL": "1m", + "RECORDINGS_PATH": "./recordings", + "TEMP_PATH": "./temp", + "DATABASE_PATH": "./icecast-ripper.db", + "SERVER_ADDRESS": ":8080", + "RSS_FEED_URL": "http://localhost:8080/rss", + "LOG_LEVEL": "info", + } + + for key, value := range defaults { + v.SetDefault(key, value) + } var config Config - // Bind environment variables to struct fields - if err := viper.BindEnv("STREAM_URL"); err != nil { - return nil, err - } - if err := viper.BindEnv("CHECK_INTERVAL"); err != nil { - return nil, err - } - if err := viper.BindEnv("RECORDINGS_PATH"); err != nil { - return nil, err - } - if err := viper.BindEnv("TEMP_PATH"); err != nil { - return nil, err - } - if err := viper.BindEnv("DATABASE_PATH"); err != nil { - return nil, err - } - if err := viper.BindEnv("SERVER_ADDRESS"); err != nil { - return nil, err - } - if err := viper.BindEnv("RSS_FEED_URL"); err != nil { - return nil, err - } - if err := viper.BindEnv("LOG_LEVEL"); err != nil { - return nil, err - } - - // Unmarshal the config - if err := viper.Unmarshal(&config); err != nil { + if err := v.Unmarshal(&config); err != nil { return nil, err } diff --git a/internal/database/database.go b/internal/database/database.go index 42516cb..329bea6 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -11,25 +11,24 @@ import ( "time" ) -// FileStore represents an in-memory store with optional file persistence -type FileStore struct { - files map[string]*RecordedFile // Map of filename -> file data - storePath string // Path to persistence file, if empty no persistence - mu sync.RWMutex // Mutex for thread safety -} - -// RecordedFile represents information about a recorded file. +// RecordedFile represents information about a recorded file type RecordedFile struct { ID int64 `json:"id"` Filename string `json:"filename"` - Hash string `json:"hash"` // GUID for the file + Hash string `json:"hash"` FileSize int64 `json:"fileSize"` Duration time.Duration `json:"duration"` RecordedAt time.Time `json:"recordedAt"` } -// InitDB initializes a new FileStore. -// If dataSourceName is provided, it will be used as the path to persist the store. +// FileStore represents an in-memory store with optional file persistence +type FileStore struct { + files map[string]*RecordedFile + storePath string + mu sync.RWMutex +} + +// InitDB initializes a new FileStore func InitDB(dataSourceName string) (*FileStore, error) { slog.Info("Initializing file store", "path", dataSourceName) @@ -38,27 +37,21 @@ func InitDB(dataSourceName string) (*FileStore, error) { storePath: dataSourceName, } - // If a data source is provided, try to load existing data if dataSourceName != "" { - if err := fs.loadFromFile(); err != nil { - // If file doesn't exist, that's fine - we'll create it when we save - if !os.IsNotExist(err) { - slog.Error("Failed to load file store data", "error", err) - } + if err := fs.loadFromFile(); err != nil && !os.IsNotExist(err) { + slog.Error("Failed to load file store data", "error", err) } } - slog.Info("File store initialized successfully") return fs, nil } -// loadFromFile loads the file store data from the persistence file. func (fs *FileStore) loadFromFile() error { fs.mu.Lock() defer fs.mu.Unlock() if fs.storePath == "" { - return nil // No persistence path set + return nil } data, err := os.ReadFile(fs.storePath) @@ -71,7 +64,6 @@ func (fs *FileStore) loadFromFile() error { return fmt.Errorf("failed to parse file store data: %w", err) } - // Reset the map and repopulate it fs.files = make(map[string]*RecordedFile, len(files)) for _, file := range files { fs.files[file.Filename] = file @@ -81,22 +73,19 @@ func (fs *FileStore) loadFromFile() error { return nil } -// saveToFile persists the file store data to disk. func (fs *FileStore) saveToFile() error { fs.mu.RLock() defer fs.mu.RUnlock() if fs.storePath == "" { - return nil // No persistence path set + return nil } - // Create directory if it doesn't exist dir := filepath.Dir(fs.storePath) if err := os.MkdirAll(dir, 0755); err != nil { return fmt.Errorf("failed to create directory for file store: %w", err) } - // Convert map to slice files := make([]*RecordedFile, 0, len(fs.files)) for _, file := range fs.files { files = append(files, file) @@ -115,15 +104,13 @@ func (fs *FileStore) saveToFile() error { return nil } -// AddRecordedFile adds a file to the store. +// AddRecordedFile adds a file to the store func (fs *FileStore) AddRecordedFile(filename, hash string, fileSize int64, duration time.Duration, recordedAt time.Time) (int64, error) { fs.mu.Lock() defer fs.mu.Unlock() - // Generate a unique ID (just use timestamp as we don't need real DB IDs) id := time.Now().UnixNano() - // Store the file fs.files[filename] = &RecordedFile{ ID: id, Filename: filename, @@ -133,32 +120,27 @@ func (fs *FileStore) AddRecordedFile(filename, hash string, fileSize int64, dura RecordedAt: recordedAt, } - // Persist changes if err := fs.saveToFile(); err != nil { slog.Error("Failed to save file store data", "error", err) } - slog.Debug("Added recorded file to store", "id", id, "filename", filename, "hash", hash) return id, nil } -// GetRecordedFiles retrieves all recorded files, ordered by recording date descending. +// GetRecordedFiles retrieves all recorded files, ordered by recording date descending func (fs *FileStore) GetRecordedFiles(limit int) ([]RecordedFile, error) { fs.mu.RLock() defer fs.mu.RUnlock() - // Convert map to slice for sorting files := make([]RecordedFile, 0, len(fs.files)) for _, file := range fs.files { files = append(files, *file) } - // Sort by recorded_at descending sort.Slice(files, func(i, j int) bool { return files[i].RecordedAt.After(files[j].RecordedAt) }) - // Apply limit if provided if limit > 0 && limit < len(files) { files = files[:limit] } @@ -166,8 +148,7 @@ func (fs *FileStore) GetRecordedFiles(limit int) ([]RecordedFile, error) { return files, nil } -// Close closes the file store and ensures all data is persisted. +// Close ensures all data is persisted func (fs *FileStore) Close() error { - slog.Info("Closing file store") return fs.saveToFile() } diff --git a/internal/hash/hash.go b/internal/hash/hash.go index c46606e..eebbd61 100644 --- a/internal/hash/hash.go +++ b/internal/hash/hash.go @@ -10,46 +10,36 @@ import ( "time" ) -// GenerateGUID generates a unique identifier based on file metadata without reading file content. -// It uses recording metadata (stream name, recording start time, filename) to create a deterministic GUID. +// GenerateGUID creates a unique identifier based on metadata +// Uses stream name, recording time and filename to create a deterministic GUID func GenerateGUID(streamName string, recordedAt time.Time, filePath string) string { filename := filepath.Base(filePath) - // Create a unique string combining stream name, recording time (rounded to seconds), - // and filename for uniqueness + // Create a unique string combining metadata input := fmt.Sprintf("%s:%s:%s", streamName, recordedAt.UTC().Format(time.RFC3339), filename) - slog.Debug("Generating GUID", "input", input) - - // Hash the combined string to create a GUID + // Hash the combined string hasher := sha256.New() hasher.Write([]byte(input)) guid := hex.EncodeToString(hasher.Sum(nil)) - slog.Debug("Generated GUID", "file", filePath, "guid", guid) + slog.Debug("Generated GUID", "input", input, "guid", guid) return guid } -// GenerateFileHash remains for backward compatibility but is now built on GenerateGUID -// This uses file metadata from the path and mtime instead of reading file content +// GenerateFileHash is maintained for backwards compatibility +// Uses file metadata instead of content func GenerateFileHash(filePath string) (string, error) { fileInfo, err := os.Stat(filePath) if err != nil { return "", fmt.Errorf("failed to stat file %s: %w", filePath, err) } - // Extract stream name from directory structure if possible, otherwise use parent dir - dir := filepath.Dir(filePath) - streamName := filepath.Base(dir) - - // Use file modification time as a proxy for recording time + streamName := filepath.Base(filepath.Dir(filePath)) recordedAt := fileInfo.ModTime() - guid := GenerateGUID(streamName, recordedAt, filePath) - - slog.Debug("Generated hash using metadata", "file", filePath, "hash", guid) - return guid, nil + return GenerateGUID(streamName, recordedAt, filePath), nil } diff --git a/internal/logger/logger.go b/internal/logger/logger.go index a640f34..77c633b 100644 --- a/internal/logger/logger.go +++ b/internal/logger/logger.go @@ -6,25 +6,30 @@ import ( "strings" ) -// Setup initializes the structured logger. +// Setup initializes the structured logger with the specified log level func Setup(logLevel string) { - var level slog.Level - switch strings.ToLower(logLevel) { - case "debug": - level = slog.LevelDebug - case "warn": - level = slog.LevelWarn - case "error": - level = slog.LevelError - default: - level = slog.LevelInfo - } + level := parseLogLevel(logLevel) - opts := &slog.HandlerOptions{ + handler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ Level: level, - } - handler := slog.NewJSONHandler(os.Stdout, opts) // Or slog.NewTextHandler - slog.SetDefault(slog.New(handler)) + }) + + logger := slog.New(handler) + slog.SetDefault(logger) slog.Info("Logger initialized", "level", level.String()) } + +// parseLogLevel converts a string log level to slog.Level +func parseLogLevel(level string) slog.Level { + switch strings.ToLower(level) { + case "debug": + return slog.LevelDebug + case "warn", "warning": + return slog.LevelWarn + case "error": + return slog.LevelError + default: + return slog.LevelInfo + } +} diff --git a/internal/recorder/recorder.go b/internal/recorder/recorder.go index 4f46b82..0f68a07 100644 --- a/internal/recorder/recorder.go +++ b/internal/recorder/recorder.go @@ -2,6 +2,7 @@ package recorder import ( "context" + "errors" "fmt" "io" "log/slog" @@ -16,26 +17,22 @@ import ( "github.com/kemko/icecast-ripper/internal/hash" ) -// Recorder handles downloading the stream and saving recordings. type Recorder struct { tempPath string recordingsPath string db *database.FileStore client *http.Client - mu sync.Mutex // Protects access to isRecording + mu sync.Mutex isRecording bool - cancelFunc context.CancelFunc // To stop the current recording - streamName string // Name of the stream being recorded + cancelFunc context.CancelFunc + streamName string } -// New creates a new Recorder instance. func New(tempPath, recordingsPath string, db *database.FileStore, streamName string) (*Recorder, error) { - // Ensure temp and recordings directories exist - if err := os.MkdirAll(tempPath, 0755); err != nil { - return nil, fmt.Errorf("failed to create temp directory %s: %w", tempPath, err) - } - if err := os.MkdirAll(recordingsPath, 0755); err != nil { - return nil, fmt.Errorf("failed to create recordings directory %s: %w", recordingsPath, err) + for _, dir := range []string{tempPath, recordingsPath} { + if err := os.MkdirAll(dir, 0755); err != nil { + return nil, fmt.Errorf("failed to create directory %s: %w", dir, err) + } } return &Recorder{ @@ -43,76 +40,65 @@ func New(tempPath, recordingsPath string, db *database.FileStore, streamName str recordingsPath: recordingsPath, db: db, streamName: streamName, - client: &http.Client{ - // Use a longer timeout for downloading the stream - Timeout: 0, // No timeout for the download itself, rely on context cancellation - }, + client: &http.Client{Timeout: 0}, // No timeout, rely on context cancellation }, nil } -// IsRecording returns true if a recording is currently in progress. func (r *Recorder) IsRecording() bool { r.mu.Lock() defer r.mu.Unlock() return r.isRecording } -// StartRecording starts downloading the stream to a temporary file. -// It runs asynchronously and returns immediately. func (r *Recorder) StartRecording(ctx context.Context, streamURL string) error { r.mu.Lock() + defer r.mu.Unlock() + if r.isRecording { - r.mu.Unlock() - return fmt.Errorf("recording already in progress") + return errors.New("recording already in progress") } - // Create a new context that can be cancelled to stop this specific recording + recordingCtx, cancel := context.WithCancel(ctx) r.cancelFunc = cancel r.isRecording = true - r.mu.Unlock() slog.Info("Starting recording", "url", streamURL) - - go r.recordStream(recordingCtx, streamURL) // Run in a goroutine + go r.recordStream(recordingCtx, streamURL) return nil } -// StopRecording stops the current recording if one is in progress. func (r *Recorder) StopRecording() { r.mu.Lock() defer r.mu.Unlock() + if r.isRecording && r.cancelFunc != nil { slog.Info("Stopping current recording") - r.cancelFunc() // Signal the recording goroutine to stop - // The lock ensures that isRecording is set to false *after* the goroutine finishes + r.cancelFunc() } } -// recordStream performs the actual download and processing. func (r *Recorder) recordStream(ctx context.Context, streamURL string) { startTime := time.Now() var tempFilePath string - // Defer setting isRecording to false and cleaning up cancelFunc defer func() { r.mu.Lock() r.isRecording = false r.cancelFunc = nil r.mu.Unlock() slog.Info("Recording process finished") - // Clean up temp file if it still exists (e.g., due to early error) + if tempFilePath != "" { if _, err := os.Stat(tempFilePath); err == nil { - slog.Warn("Removing leftover temporary file", "path", tempFilePath) + slog.Warn("Cleaning up temporary file", "path", tempFilePath) if err := os.Remove(tempFilePath); err != nil { - slog.Error("Failed to remove temporary file", "path", tempFilePath, "error", err) + slog.Error("Failed to remove temporary file", "error", err) } } } }() - // Create temporary file tempFile, err := os.CreateTemp(r.tempPath, "recording-*.tmp") if err != nil { slog.Error("Failed to create temporary file", "error", err) @@ -121,77 +107,54 @@ func (r *Recorder) recordStream(ctx context.Context, streamURL string) { tempFilePath = tempFile.Name() slog.Debug("Created temporary file", "path", tempFilePath) - // Download stream bytesWritten, err := r.downloadStream(ctx, streamURL, tempFile) - // Close the file explicitly *before* hashing and moving + if closeErr := tempFile.Close(); closeErr != nil { - slog.Error("Failed to close temporary file", "path", tempFilePath, "error", closeErr) - // If download also failed, prioritize that error + slog.Error("Failed to close temporary file", "error", closeErr) if err == nil { - err = closeErr // Report closing error if download was ok + err = closeErr } } + // Handle context cancellation or download errors if err != nil { - // Check if the error was due to context cancellation (graceful stop) - if ctx.Err() == context.Canceled { - slog.Info("Recording stopped via cancellation.") - // Proceed to process the partially downloaded file + if errors.Is(err, context.Canceled) { + slog.Info("Recording stopped via cancellation") } else { slog.Error("Failed to download stream", "error", err) - // No need to keep the temp file if download failed unexpectedly - if err := os.Remove(tempFilePath); err != nil { - slog.Error("Failed to remove temporary file after download error", "path", tempFilePath, "error", err) - } - tempFilePath = "" // Prevent deferred cleanup from trying again return } } - // If no bytes were written (e.g., stream closed immediately or cancelled before data), discard. + // Skip empty recordings if bytesWritten == 0 { - slog.Warn("No data written to temporary file, discarding.", "path", tempFilePath) - if err := os.Remove(tempFilePath); err != nil { - slog.Error("Failed to remove temporary file after no data written", "path", tempFilePath, "error", err) - } - tempFilePath = "" // Prevent deferred cleanup + slog.Warn("No data written, discarding recording") return } + // Process successful recording endTime := time.Now() duration := endTime.Sub(startTime) - - // Generate final filename (e.g., based on timestamp) - finalFilename := fmt.Sprintf("recording_%s.mp3", startTime.Format("20060102_150405")) // Assuming mp3, adjust if needed - finalFilename = sanitizeFilename(finalFilename) // Ensure filename is valid + finalFilename := fmt.Sprintf("recording_%s.mp3", startTime.Format("20060102_150405")) + finalFilename = sanitizeFilename(finalFilename) finalPath := filepath.Join(r.recordingsPath, finalFilename) - // Move temporary file to final location if err := os.Rename(tempFilePath, finalPath); err != nil { - slog.Error("Failed to move temporary file to final location", "temp", tempFilePath, "final", finalPath, "error", err) - if err := os.Remove(tempFilePath); err != nil { - slog.Error("Failed to remove temporary file after move error", "path", tempFilePath, "error", err) - } - tempFilePath = "" // Prevent deferred cleanup + slog.Error("Failed to move recording to final location", "error", err) return } - tempFilePath = "" // File moved, clear path to prevent deferred cleanup + + tempFilePath = "" // Prevent cleanup in defer slog.Info("Recording saved", "path", finalPath, "size", bytesWritten, "duration", duration) - // Generate GUID for this recording using the new metadata-based approach guid := hash.GenerateGUID(r.streamName, startTime, finalFilename) - - // Add record to database - _, err = r.db.AddRecordedFile(finalFilename, guid, bytesWritten, duration, startTime) - if err != nil { - slog.Error("Failed to add recording to database", "filename", finalFilename, "guid", guid, "error", err) - // Consider how to handle this - maybe retry later? For now, just log. + if _, err = r.db.AddRecordedFile(finalFilename, guid, bytesWritten, duration, startTime); err != nil { + slog.Error("Failed to add recording to database", "error", err) } } -// downloadStream handles the HTTP GET request and writes the body to the writer. func (r *Recorder) downloadStream(ctx context.Context, streamURL string, writer io.Writer) (int64, error) { - req, err := http.NewRequestWithContext(ctx, "GET", streamURL, nil) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, streamURL, nil) if err != nil { return 0, fmt.Errorf("failed to create request: %w", err) } @@ -199,9 +162,8 @@ func (r *Recorder) downloadStream(ctx context.Context, streamURL string, writer resp, err := r.client.Do(req) if err != nil { - // Check if context was cancelled - if ctx.Err() == context.Canceled { - return 0, ctx.Err() + if errors.Is(err, context.Canceled) { + return 0, err } return 0, fmt.Errorf("failed to connect to stream: %w", err) } @@ -215,31 +177,31 @@ func (r *Recorder) downloadStream(ctx context.Context, streamURL string, writer return 0, fmt.Errorf("unexpected status code: %s", resp.Status) } - slog.Debug("Connected to stream, starting download", "url", streamURL) - // Copy the response body to the writer (temp file) - // io.Copy will block until EOF or an error (including context cancellation via the request) + slog.Debug("Connected to stream, downloading", "url", streamURL) bytesWritten, err := io.Copy(writer, resp.Body) + if err != nil { - // Check if the error is due to context cancellation - if ctx.Err() == context.Canceled { - slog.Info("Stream download cancelled.") - return bytesWritten, ctx.Err() // Return context error + if errors.Is(err, context.Canceled) { + slog.Info("Stream download cancelled") + return bytesWritten, ctx.Err() } - // Check for common stream disconnection errors (might need refinement) - if err == io.ErrUnexpectedEOF || strings.Contains(err.Error(), "connection reset by peer") || strings.Contains(err.Error(), "broken pipe") { - slog.Info("Stream disconnected gracefully (EOF or reset)", "bytes_written", bytesWritten) - return bytesWritten, nil // Treat as normal end of stream + + // Handle common stream disconnections gracefully + if errors.Is(err, io.ErrUnexpectedEOF) || + strings.Contains(err.Error(), "connection reset by peer") || + strings.Contains(err.Error(), "broken pipe") { + slog.Info("Stream disconnected normally", "bytesWritten", bytesWritten) + return bytesWritten, nil } + return bytesWritten, fmt.Errorf("failed during stream copy: %w", err) } - slog.Info("Stream download finished (EOF)", "bytes_written", bytesWritten) + slog.Info("Stream download finished normally", "bytesWritten", bytesWritten) return bytesWritten, nil } -// sanitizeFilename removes potentially problematic characters from filenames. func sanitizeFilename(filename string) string { - // Replace common problematic characters replacer := strings.NewReplacer( " ", "_", "/", "-", @@ -253,12 +215,5 @@ func sanitizeFilename(filename string) string { "|", "-", ) cleaned := replacer.Replace(filename) - // Trim leading/trailing underscores/hyphens/dots - cleaned = strings.Trim(cleaned, "_-. ") - // Limit length if necessary (though less common on modern filesystems) - // const maxLen = 200 - // if len(cleaned) > maxLen { - // cleaned = cleaned[:maxLen] - // } - return cleaned + return strings.Trim(cleaned, "_-. ") } diff --git a/internal/rss/rss.go b/internal/rss/rss.go index 35617c7..137945d 100644 --- a/internal/rss/rss.go +++ b/internal/rss/rss.go @@ -4,6 +4,7 @@ import ( "fmt" "log/slog" "net/url" + "strings" "time" "github.com/gorilla/feeds" @@ -11,24 +12,25 @@ import ( "github.com/kemko/icecast-ripper/internal/database" ) -// Generator creates RSS feeds. +// Generator creates RSS feeds type Generator struct { fileStore *database.FileStore - feedBaseURL string // Base URL for links in the feed (e.g., http://server.com/recordings/) - recordingsPath string // Local path to recordings (needed for file info, maybe not directly used in feed) + feedBaseURL string + recordingsPath string feedTitle string feedDesc string } -// New creates a new RSS Generator instance. +// New creates a new RSS Generator instance func New(fileStore *database.FileStore, cfg *config.Config, title, description string) *Generator { - // Ensure the base URL for recordings ends with a slash - baseURL := cfg.RSSFeedURL // This should be the URL base for *serving* files + baseURL := cfg.RSSFeedURL if baseURL == "" { - slog.Warn("RSS_FEED_URL not set, RSS links might be incomplete. Using placeholder.") - baseURL = "http://localhost:8080/recordings/" // Placeholder + slog.Warn("RSS_FEED_URL not set, using default") + baseURL = "http://localhost:8080/recordings/" } - if baseURL[len(baseURL)-1:] != "/" { + + // Ensure base URL ends with a slash + if !strings.HasSuffix(baseURL, "/") { baseURL += "/" } @@ -41,46 +43,50 @@ func New(fileStore *database.FileStore, cfg *config.Config, title, description s } } -// GenerateFeed fetches recordings and produces the RSS feed XML as a byte slice. +// GenerateFeed produces the RSS feed XML as a byte slice func (g *Generator) GenerateFeed(maxItems int) ([]byte, error) { - slog.Debug("Generating RSS feed", "maxItems", maxItems) recordings, err := g.fileStore.GetRecordedFiles(maxItems) if err != nil { - return nil, fmt.Errorf("failed to get recorded files for RSS feed: %w", err) + return nil, fmt.Errorf("failed to get recorded files: %w", err) } - now := time.Now() feed := &feeds.Feed{ Title: g.feedTitle, Link: &feeds.Link{Href: g.feedBaseURL}, Description: g.feedDesc, - Created: now, + Created: time.Now(), } + feed.Items = make([]*feeds.Item, 0, len(recordings)) + for _, rec := range recordings { fileURL, err := url.JoinPath(g.feedBaseURL, rec.Filename) if err != nil { - slog.Error("Failed to create file URL for RSS item", "filename", rec.Filename, "error", err) - continue // Skip this item if URL creation fails + slog.Error("Failed to create file URL", "filename", rec.Filename, "error", err) + continue } item := &feeds.Item{ Title: fmt.Sprintf("Recording %s", rec.RecordedAt.Format("2006-01-02 15:04")), Link: &feeds.Link{Href: fileURL}, - Description: fmt.Sprintf("Icecast stream recording from %s. Duration: %s.", rec.RecordedAt.Format(time.RFC1123), rec.Duration.String()), + Description: fmt.Sprintf("Icecast stream recording from %s. Duration: %s", + rec.RecordedAt.Format(time.RFC1123), rec.Duration.String()), Created: rec.RecordedAt, Id: rec.Hash, - Enclosure: &feeds.Enclosure{Url: fileURL, Length: rec.FileSize, Type: "audio/mpeg"}, + Enclosure: &feeds.Enclosure{ + Url: fileURL, + Length: fmt.Sprintf("%d", rec.FileSize), // Convert int64 to string + Type: "audio/mpeg", + }, } feed.Items = append(feed.Items, item) } - // Create RSS 2.0 feed rssFeed, err := feed.ToRss() if err != nil { return nil, fmt.Errorf("failed to generate RSS feed: %w", err) } - slog.Debug("RSS feed generated successfully", "item_count", len(feed.Items)) + slog.Debug("RSS feed generated", "itemCount", len(feed.Items)) return []byte(rssFeed), nil } diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 7500059..7d3fa30 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -10,18 +10,16 @@ import ( "github.com/kemko/icecast-ripper/internal/streamchecker" ) -// Scheduler manages the periodic checking of the stream and triggers recordings. type Scheduler struct { interval time.Duration checker *streamchecker.Checker recorder *recorder.Recorder - stopChan chan struct{} // Channel to signal stopping the scheduler - stopOnce sync.Once // Ensures Stop is only called once - wg sync.WaitGroup // Waits for the run loop to finish - parentContext context.Context // Base context for recordings + stopChan chan struct{} + stopOnce sync.Once + wg sync.WaitGroup + parentContext context.Context } -// New creates a new Scheduler instance. func New(interval time.Duration, checker *streamchecker.Checker, recorder *recorder.Recorder) *Scheduler { return &Scheduler{ interval: interval, @@ -31,8 +29,6 @@ func New(interval time.Duration, checker *streamchecker.Checker, recorder *recor } } -// Start begins the scheduling loop. -// It takes a parent context which will be used for recordings. func (s *Scheduler) Start(ctx context.Context) { slog.Info("Starting scheduler", "interval", s.interval.String()) s.parentContext = ctx @@ -40,73 +36,56 @@ func (s *Scheduler) Start(ctx context.Context) { go s.run() } -// Stop signals the scheduling loop to terminate gracefully. func (s *Scheduler) Stop() { s.stopOnce.Do(func() { slog.Info("Stopping scheduler...") - close(s.stopChan) // Signal the run loop to stop - // If a recording is in progress, stopping the scheduler doesn't automatically stop it. - // The recorder needs to be stopped separately if immediate termination is desired. - // s.recorder.StopRecording() // Uncomment if scheduler stop should also stop recording - s.wg.Wait() // Wait for the run loop to exit + close(s.stopChan) + s.wg.Wait() slog.Info("Scheduler stopped.") }) } -// run is the main loop for the scheduler. func (s *Scheduler) run() { defer s.wg.Done() ticker := time.NewTicker(s.interval) defer ticker.Stop() - for { - // Initial check immediately on start, then wait for ticker - s.checkAndRecord() + // Initial check immediately on start + s.checkAndRecord() + for { select { case <-ticker.C: - // Check periodically based on the interval s.checkAndRecord() case <-s.stopChan: slog.Info("Scheduler run loop exiting.") - return // Exit loop if stop signal received + return case <-s.parentContext.Done(): - slog.Info("Scheduler parent context cancelled, stopping.") - // Ensure Stop is called to clean up resources if not already stopped - s.Stop() + slog.Info("Parent context cancelled, stopping scheduler.") return } } } -// checkAndRecord performs the stream check and starts recording if necessary. func (s *Scheduler) checkAndRecord() { - // Do not check if a recording is already in progress if s.recorder.IsRecording() { - slog.Debug("Skipping check, recording in progress.") + slog.Debug("Recording in progress, skipping stream check.") return } - slog.Debug("Scheduler checking stream status...") + slog.Debug("Checking stream status") isLive, err := s.checker.IsLive() if err != nil { - // Log the error but continue; maybe a temporary network issue slog.Warn("Error checking stream status", "error", err) return } if isLive { - slog.Info("Stream is live, attempting to start recording.") - // Use the parent context provided during Start for the recording - err := s.recorder.StartRecording(s.parentContext, s.checker.GetStreamURL()) - if err != nil { - // This likely means a recording was *just* started by another check, which is fine. - slog.Warn("Failed to start recording (might already be in progress)", "error", err) - } else { - slog.Info("Recording started successfully by scheduler.") - // Recording started, the check loop will skip until it finishes + slog.Info("Stream is live, starting recording") + if err := s.recorder.StartRecording(s.parentContext, s.checker.GetStreamURL()); err != nil { + slog.Warn("Failed to start recording", "error", err) } } else { - slog.Debug("Stream is not live, waiting for next interval.") + slog.Debug("Stream is not live") } } diff --git a/internal/server/server.go b/internal/server/server.go index af05f0d..c3a66fd 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -12,41 +12,38 @@ import ( "github.com/kemko/icecast-ripper/internal/rss" ) -// Server handles HTTP requests for the RSS feed and recordings. +// Server handles HTTP requests for the RSS feed and recordings type Server struct { server *http.Server rssGenerator *rss.Generator recordingsPath string - serverAddress string } -// New creates a new HTTP server instance. +// New creates a new HTTP server instance func New(cfg *config.Config, rssGenerator *rss.Generator) *Server { mux := http.NewServeMux() - s := &Server{ - rssGenerator: rssGenerator, - recordingsPath: cfg.RecordingsPath, - serverAddress: cfg.ServerAddress, - } - - // Handler for the RSS feed - mux.HandleFunc("GET /rss", s.handleRSS) - - // Handler for serving static recording files - // Ensure recordingsPath is absolute or relative to the working directory + // Get absolute path for recordings directory absRecordingsPath, err := filepath.Abs(cfg.RecordingsPath) if err != nil { - slog.Error("Failed to get absolute path for recordings directory", "path", cfg.RecordingsPath, "error", err) - // Decide how to handle this - maybe panic or return an error from New? - // For now, log and continue, file serving might fail. + slog.Error("Failed to get absolute path for recordings directory", "error", err) absRecordingsPath = cfg.RecordingsPath // Fallback to original path } + + s := &Server{ + rssGenerator: rssGenerator, + recordingsPath: absRecordingsPath, + } + + // Set up routes + mux.HandleFunc("GET /rss", s.handleRSS) + + // Serve static recordings slog.Info("Serving recordings from", "path", absRecordingsPath) fileServer := http.FileServer(http.Dir(absRecordingsPath)) - // The path must end with a trailing slash for FileServer to work correctly with subdirectories mux.Handle("GET /recordings/", http.StripPrefix("/recordings/", fileServer)) + // Configure server with sensible timeouts s.server = &http.Server{ Addr: cfg.ServerAddress, Handler: mux, @@ -58,14 +55,11 @@ func New(cfg *config.Config, rssGenerator *rss.Generator) *Server { return s } -// handleRSS generates and serves the RSS feed. +// handleRSS generates and serves the RSS feed func (s *Server) handleRSS(w http.ResponseWriter, r *http.Request) { - slog.Debug("Received request for RSS feed") - // TODO: Consider adding caching headers (ETag, Last-Modified) - // based on the latest recording timestamp or a hash of the feed content. + slog.Debug("Serving RSS feed") - // Generate feed (limit to a reasonable number, e.g., 50 latest items) - maxItems := 50 + const maxItems = 50 feedBytes, err := s.rssGenerator.GenerateFeed(maxItems) if err != nil { slog.Error("Failed to generate RSS feed", "error", err) @@ -75,35 +69,27 @@ func (s *Server) handleRSS(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/rss+xml; charset=utf-8") w.WriteHeader(http.StatusOK) - _, err = w.Write(feedBytes) - if err != nil { - slog.Error("Failed to write RSS feed response", "error", err) + if _, err = w.Write(feedBytes); err != nil { + slog.Error("Failed to write RSS response", "error", err) } } -// Start begins listening for HTTP requests. +// Start begins listening for HTTP requests func (s *Server) Start() error { - slog.Info("Starting HTTP server", "address", s.serverAddress) - // Run the server in a separate goroutine so it doesn't block. + slog.Info("Starting HTTP server", "address", s.server.Addr) + go func() { if err := s.server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { - slog.Error("HTTP server ListenAndServe error", "error", err) - // Consider signaling failure to the main application thread if needed + slog.Error("HTTP server error", "error", err) } }() - return nil // Return immediately -} -// Stop gracefully shuts down the HTTP server. -func (s *Server) Stop(ctx context.Context) error { - slog.Info("Stopping HTTP server...") - shutdownCtx, cancel := context.WithTimeout(ctx, 15*time.Second) // Give 15 seconds to shutdown gracefully - defer cancel() - - if err := s.server.Shutdown(shutdownCtx); err != nil { - slog.Error("HTTP server graceful shutdown failed", "error", err) - return err - } - slog.Info("HTTP server stopped.") return nil } + +// Stop gracefully shuts down the HTTP server +func (s *Server) Stop(ctx context.Context) error { + slog.Info("Stopping HTTP server") + + return s.server.Shutdown(ctx) +} diff --git a/internal/streamchecker/streamchecker.go b/internal/streamchecker/streamchecker.go index 709324b..659f0f4 100644 --- a/internal/streamchecker/streamchecker.go +++ b/internal/streamchecker/streamchecker.go @@ -7,42 +7,34 @@ import ( "time" ) -// Checker checks if an Icecast stream is live. type Checker struct { streamURL string client *http.Client } -// New creates a new Checker instance. func New(streamURL string) *Checker { return &Checker{ streamURL: streamURL, client: &http.Client{ - Timeout: 10 * time.Second, // Sensible timeout for checking a stream + Timeout: 10 * time.Second, }, } } -// IsLive checks if the stream URL is currently broadcasting. -// It performs a GET request and checks for a 200 OK status. -// Note: Some Icecast servers might behave differently; this might need adjustment. func (c *Checker) IsLive() (bool, error) { slog.Debug("Checking stream status", "url", c.streamURL) - req, err := http.NewRequest("GET", c.streamURL, nil) + + req, err := http.NewRequest(http.MethodGet, c.streamURL, nil) if err != nil { - return false, fmt.Errorf("failed to create request for stream check: %w", err) + return false, fmt.Errorf("failed to create request: %w", err) } - // Set a user agent; some servers might require it. + req.Header.Set("User-Agent", "icecast-ripper/1.0") - // Icy-Metadata header can sometimes force the server to respond even if the stream is idle, - // but we want to know if it's *actually* streaming audio data. - // req.Header.Set("Icy-Metadata", "1") resp, err := c.client.Do(req) if err != nil { - // Network errors (DNS lookup failure, connection refused) usually mean not live. - slog.Warn("Failed to connect to stream URL during check", "url", c.streamURL, "error", err) - return false, nil // Treat connection errors as 'not live' + slog.Debug("Connection to stream failed, considering not live", "error", err) + return false, nil // Connection failures mean the stream is not available } defer func() { if err := resp.Body.Close(); err != nil { @@ -50,17 +42,15 @@ func (c *Checker) IsLive() (bool, error) { } }() - // A 200 OK status generally indicates the stream is live and broadcasting. if resp.StatusCode == http.StatusOK { - slog.Info("Stream is live", "url", c.streamURL, "status", resp.StatusCode) + slog.Info("Stream is live", "status", resp.StatusCode) return true, nil } - slog.Info("Stream is not live", "url", c.streamURL, "status", resp.StatusCode) + slog.Debug("Stream is not live", "status", resp.StatusCode) return false, nil } -// GetStreamURL returns the URL being checked. func (c *Checker) GetStreamURL() string { return c.streamURL }