Rework with go
77
internal/config/config.go
Normal file
@@ -0,0 +1,77 @@
package config

import (
    "strings"
    "time"

    "github.com/spf13/viper"
)

// Config stores all configuration for the application.
// The values are read by viper from environment variables.
type Config struct {
    StreamURL      string        `mapstructure:"STREAM_URL"`
    CheckInterval  time.Duration `mapstructure:"CHECK_INTERVAL"`
    RecordingsPath string        `mapstructure:"RECORDINGS_PATH"`
    TempPath       string        `mapstructure:"TEMP_PATH"`
    DatabasePath   string        `mapstructure:"DATABASE_PATH"`
    ServerAddress  string        `mapstructure:"SERVER_ADDRESS"`
    RSSFeedURL     string        `mapstructure:"RSS_FEED_URL"` // Base URL for the RSS feed links
    LogLevel       string        `mapstructure:"LOG_LEVEL"`
}

// LoadConfig reads configuration from environment variables.
func LoadConfig() (*Config, error) {
    viper.AutomaticEnv()
    viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))

    // Set default values
    viper.SetDefault("STREAM_URL", "")
    viper.SetDefault("CHECK_INTERVAL", "1m")
    viper.SetDefault("RECORDINGS_PATH", "./recordings")
    viper.SetDefault("TEMP_PATH", "./temp")
    viper.SetDefault("DATABASE_PATH", "./icecast-ripper.db")
    viper.SetDefault("SERVER_ADDRESS", ":8080")
    viper.SetDefault("RSS_FEED_URL", "http://localhost:8080/rss") // Example default
    viper.SetDefault("LOG_LEVEL", "info")

    var config Config
    // Bind environment variables to struct fields
    if err := viper.BindEnv("STREAM_URL"); err != nil {
        return nil, err
    }
    if err := viper.BindEnv("CHECK_INTERVAL"); err != nil {
        return nil, err
    }
    if err := viper.BindEnv("RECORDINGS_PATH"); err != nil {
        return nil, err
    }
    if err := viper.BindEnv("TEMP_PATH"); err != nil {
        return nil, err
    }
    if err := viper.BindEnv("DATABASE_PATH"); err != nil {
        return nil, err
    }
    if err := viper.BindEnv("SERVER_ADDRESS"); err != nil {
        return nil, err
    }
    if err := viper.BindEnv("RSS_FEED_URL"); err != nil {
        return nil, err
    }
    if err := viper.BindEnv("LOG_LEVEL"); err != nil {
        return nil, err
    }

    // Unmarshal the config
    if err := viper.Unmarshal(&config); err != nil {
        return nil, err
    }

    // Ensure required fields are set
    if config.StreamURL == "" {
        // Consider returning an error or logging a fatal error if essential config is missing
        // return nil, errors.New("STREAM_URL environment variable is required")
    }

    return &config, nil
}
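For reference, a minimal usage sketch (the main function below is illustrative, not part of this commit; the env value is an example):

package main

import (
    "fmt"
    "log"
    "os"

    "github.com/kemko/icecast-ripper/internal/config"
)

func main() {
    // Configuration comes from the environment; set a value before loading.
    os.Setenv("STREAM_URL", "http://example.com:8000/stream")

    cfg, err := config.LoadConfig()
    if err != nil {
        log.Fatal(err)
    }
    // CHECK_INTERVAL defaults to "1m" and is decoded into a time.Duration.
    fmt.Println(cfg.StreamURL, cfg.CheckInterval)
}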
114
internal/database/database.go
Normal file
@@ -0,0 +1,114 @@
package database

import (
    "database/sql"
    "fmt"
    "log/slog"
    "time"

    _ "github.com/mattn/go-sqlite3" // SQLite driver
)

// DB represents the database connection.
type DB struct {
    *sql.DB
}

// RecordedFile represents a row in the recorded_files table.
type RecordedFile struct {
    ID         int64
    Filename   string
    Hash       string
    FileSize   int64         // Size in bytes
    Duration   time.Duration // Duration of the recording
    RecordedAt time.Time
}

// InitDB initializes the SQLite database connection and creates tables if they don't exist.
func InitDB(dataSourceName string) (*DB, error) {
    slog.Info("Initializing database connection", "path", dataSourceName)
    db, err := sql.Open("sqlite3", dataSourceName)
    if err != nil {
        return nil, fmt.Errorf("failed to open database: %w", err)
    }

    if err = db.Ping(); err != nil {
        return nil, fmt.Errorf("failed to connect to database: %w", err)
    }

    // Create table if not exists
    query := `
    CREATE TABLE IF NOT EXISTS recorded_files (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        filename TEXT NOT NULL UNIQUE,
        hash TEXT NOT NULL UNIQUE,
        filesize INTEGER NOT NULL,
        duration INTEGER NOT NULL, -- Duration stored as whole seconds
        recorded_at DATETIME NOT NULL
    );
    CREATE INDEX IF NOT EXISTS idx_recorded_at ON recorded_files (recorded_at);
    `
    if _, err = db.Exec(query); err != nil {
        return nil, fmt.Errorf("failed to create table: %w", err)
    }

    slog.Info("Database initialized successfully")
    return &DB{db}, nil
}

// AddRecordedFile inserts a new record into the database.
func (db *DB) AddRecordedFile(filename, hash string, fileSize int64, duration time.Duration, recordedAt time.Time) (int64, error) {
    result, err := db.Exec(
        "INSERT INTO recorded_files (filename, hash, filesize, duration, recorded_at) VALUES (?, ?, ?, ?, ?)",
        filename, hash, fileSize, int64(duration.Seconds()), recordedAt, // Store duration as seconds
    )
    if err != nil {
        return 0, fmt.Errorf("failed to insert recorded file: %w", err)
    }

    id, err := result.LastInsertId()
    if err != nil {
        return 0, fmt.Errorf("failed to get last insert ID: %w", err)
    }
    slog.Debug("Added recorded file to database", "id", id, "filename", filename, "hash", hash)
    return id, nil
}

// GetRecordedFiles retrieves all recorded files, ordered by recording date descending.
func (db *DB) GetRecordedFiles(limit int) ([]RecordedFile, error) {
    query := "SELECT id, filename, hash, filesize, duration, recorded_at FROM recorded_files ORDER BY recorded_at DESC"
    args := []interface{}{}
    if limit > 0 {
        query += " LIMIT ?"
        args = append(args, limit)
    }

    rows, err := db.Query(query, args...)
    if err != nil {
        return nil, fmt.Errorf("failed to query recorded files: %w", err)
    }
    defer rows.Close()

    var files []RecordedFile
    for rows.Next() {
        var rf RecordedFile
        var durationSeconds int64
        if err := rows.Scan(&rf.ID, &rf.Filename, &rf.Hash, &rf.FileSize, &durationSeconds, &rf.RecordedAt); err != nil {
            return nil, fmt.Errorf("failed to scan recorded file row: %w", err)
        }
        rf.Duration = time.Duration(durationSeconds) * time.Second
        files = append(files, rf)
    }

    if err = rows.Err(); err != nil {
        return nil, fmt.Errorf("error iterating over recorded file rows: %w", err)
    }

    return files, nil
}

// Close closes the database connection.
func (db *DB) Close() error {
    slog.Info("Closing database connection")
    return db.DB.Close()
}
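A usage sketch (illustrative, not part of this commit; the filename, hash, and sizes are placeholders — note that github.com/mattn/go-sqlite3 requires cgo to build):

package main

import (
    "log"
    "time"

    "github.com/kemko/icecast-ripper/internal/database"
)

func main() {
    db, err := database.InitDB("./icecast-ripper.db")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // Insert a record, then read the latest entries back.
    id, err := db.AddRecordedFile("show.mp3", "deadbeef", 1024, 30*time.Minute, time.Now())
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("inserted id=%d", id)

    files, err := db.GetRecordedFiles(10)
    if err != nil {
        log.Fatal(err)
    }
    for _, f := range files {
        log.Printf("%s (%s)", f.Filename, f.Duration)
    }
}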
55
internal/hash/hash.go
Normal file
@@ -0,0 +1,55 @@
package hash

import (
    "crypto/sha256"
    "encoding/hex"
    "fmt"
    "io"
    "log/slog"
    "os"
    "path/filepath"
)

const (
    // hashChunkSize defines how many bytes of the file to read for hashing.
    // 1 MB should be sufficient for uniqueness in most cases while being fast.
    hashChunkSize = 1 * 1024 * 1024 // 1 MB
)

// GenerateFileHash generates a SHA256 hash based on the filename and the first `hashChunkSize` bytes of the file content.
// This avoids reading the entire file for large recordings.
func GenerateFileHash(filePath string) (string, error) {
    filename := filepath.Base(filePath)
    slog.Debug("Generating hash", "file", filePath, "chunkSize", hashChunkSize)

    file, err := os.Open(filePath)
    if err != nil {
        return "", fmt.Errorf("failed to open file %s for hashing: %w", filePath, err)
    }
    defer file.Close()

    hasher := sha256.New()

    // Include filename in the hash
    if _, err := hasher.Write([]byte(filename)); err != nil {
        return "", fmt.Errorf("failed to write filename to hasher: %w", err)
    }

    // Read only the first part of the file. io.ReadFull keeps reading until the
    // chunk is full or the file ends, so a short read cannot skew the hash.
    chunk := make([]byte, hashChunkSize)
    bytesRead, err := io.ReadFull(file, chunk)
    if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
        return "", fmt.Errorf("failed to read file chunk %s: %w", filePath, err)
    }

    // Hash the chunk that was read
    if _, err := hasher.Write(chunk[:bytesRead]); err != nil {
        return "", fmt.Errorf("failed to write file chunk to hasher: %w", err)
    }

    hashBytes := hasher.Sum(nil)
    hashString := hex.EncodeToString(hashBytes)

    slog.Debug("Generated hash", "file", filePath, "hash", hashString)
    return hashString, nil
}
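A usage sketch (illustrative, not part of this commit). Because the hash covers the filename plus at most the first 1 MB of content, files shorter than the chunk hash cleanly too:

package main

import (
    "fmt"
    "log"
    "os"

    "github.com/kemko/icecast-ripper/internal/hash"
)

func main() {
    // Create a small test file; the reader simply stops at EOF.
    tmp, err := os.CreateTemp("", "sample-*.mp3")
    if err != nil {
        log.Fatal(err)
    }
    defer os.Remove(tmp.Name())
    tmp.WriteString("audio bytes go here")
    tmp.Close()

    h, err := hash.GenerateFileHash(tmp.Name())
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(h) // 64 hex characters
}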
30
internal/logger/logger.go
Normal file
@@ -0,0 +1,30 @@
package logger

import (
    "log/slog"
    "os"
    "strings"
)

// Setup initializes the structured logger.
func Setup(logLevel string) {
    var level slog.Level
    switch strings.ToLower(logLevel) {
    case "debug":
        level = slog.LevelDebug
    case "warn":
        level = slog.LevelWarn
    case "error":
        level = slog.LevelError
    default:
        level = slog.LevelInfo
    }

    opts := &slog.HandlerOptions{
        Level: level,
    }
    handler := slog.NewJSONHandler(os.Stdout, opts) // Or slog.NewTextHandler
    slog.SetDefault(slog.New(handler))

    slog.Info("Logger initialized", "level", level.String())
}
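A usage sketch (illustrative, not part of this commit):

package main

import (
    "log/slog"

    "github.com/kemko/icecast-ripper/internal/logger"
)

func main() {
    logger.Setup("debug")
    // All subsequent slog calls emit JSON on stdout at the configured level.
    slog.Debug("starting up", "component", "example")
}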
256
internal/recorder/recorder.go
Normal file
@@ -0,0 +1,256 @@
package recorder

import (
    "context"
    "fmt"
    "io"
    "log/slog"
    "net/http"
    "os"
    "path/filepath"
    "strings"
    "sync"
    "time"

    "github.com/kemko/icecast-ripper/internal/database"
    "github.com/kemko/icecast-ripper/internal/hash"
)

// Recorder handles downloading the stream and saving recordings.
type Recorder struct {
    tempPath       string
    recordingsPath string
    db             *database.DB
    client         *http.Client
    mu             sync.Mutex // Protects access to isRecording
    isRecording    bool
    cancelFunc     context.CancelFunc // To stop the current recording
}

// New creates a new Recorder instance.
func New(tempPath, recordingsPath string, db *database.DB) (*Recorder, error) {
    // Ensure temp and recordings directories exist
    if err := os.MkdirAll(tempPath, 0755); err != nil {
        return nil, fmt.Errorf("failed to create temp directory %s: %w", tempPath, err)
    }
    if err := os.MkdirAll(recordingsPath, 0755); err != nil {
        return nil, fmt.Errorf("failed to create recordings directory %s: %w", recordingsPath, err)
    }

    return &Recorder{
        tempPath:       tempPath,
        recordingsPath: recordingsPath,
        db:             db,
        client: &http.Client{
            // No client timeout for the download itself; rely on context cancellation instead.
            Timeout: 0,
        },
    }, nil
}

// IsRecording returns true if a recording is currently in progress.
func (r *Recorder) IsRecording() bool {
    r.mu.Lock()
    defer r.mu.Unlock()
    return r.isRecording
}

// StartRecording starts downloading the stream to a temporary file.
// It runs asynchronously and returns immediately.
func (r *Recorder) StartRecording(ctx context.Context, streamURL string) error {
    r.mu.Lock()
    if r.isRecording {
        r.mu.Unlock()
        return fmt.Errorf("recording already in progress")
    }
    // Create a new context that can be cancelled to stop this specific recording
    recordingCtx, cancel := context.WithCancel(ctx)
    r.cancelFunc = cancel
    r.isRecording = true
    r.mu.Unlock()

    slog.Info("Starting recording", "url", streamURL)

    go r.recordStream(recordingCtx, streamURL) // Run in a goroutine

    return nil
}

// StopRecording stops the current recording if one is in progress.
func (r *Recorder) StopRecording() {
    r.mu.Lock()
    defer r.mu.Unlock()
    if r.isRecording && r.cancelFunc != nil {
        slog.Info("Stopping current recording")
        r.cancelFunc() // Signal the recording goroutine to stop
        // The recording goroutine resets isRecording in its deferred cleanup once it exits.
    }
}

// recordStream performs the actual download and processing.
func (r *Recorder) recordStream(ctx context.Context, streamURL string) {
    startTime := time.Now()
    var tempFilePath string

    // Defer setting isRecording to false and cleaning up cancelFunc
    defer func() {
        r.mu.Lock()
        r.isRecording = false
        r.cancelFunc = nil
        r.mu.Unlock()
        slog.Info("Recording process finished")
        // Clean up temp file if it still exists (e.g., due to early error)
        if tempFilePath != "" {
            if _, err := os.Stat(tempFilePath); err == nil {
                slog.Warn("Removing leftover temporary file", "path", tempFilePath)
                os.Remove(tempFilePath)
            }
        }
    }()

    // Create temporary file
    tempFile, err := os.CreateTemp(r.tempPath, "recording-*.tmp")
    if err != nil {
        slog.Error("Failed to create temporary file", "error", err)
        return
    }
    tempFilePath = tempFile.Name()
    slog.Debug("Created temporary file", "path", tempFilePath)

    // Download stream
    bytesWritten, err := r.downloadStream(ctx, streamURL, tempFile)
    // Close the file explicitly *before* hashing and moving
    if closeErr := tempFile.Close(); closeErr != nil {
        slog.Error("Failed to close temporary file", "path", tempFilePath, "error", closeErr)
        // If download also failed, prioritize that error
        if err == nil {
            err = closeErr // Report closing error if download was ok
        }
    }

    if err != nil {
        // Check if the error was due to context cancellation (graceful stop)
        if ctx.Err() == context.Canceled {
            slog.Info("Recording stopped via cancellation.")
            // Proceed to process the partially downloaded file
        } else {
            slog.Error("Failed to download stream", "error", err)
            // No need to keep the temp file if download failed unexpectedly
            os.Remove(tempFilePath)
            tempFilePath = "" // Prevent deferred cleanup from trying again
            return
        }
    }

    // If no bytes were written (e.g., stream closed immediately or cancelled before data), discard.
    if bytesWritten == 0 {
        slog.Warn("No data written to temporary file, discarding.", "path", tempFilePath)
        os.Remove(tempFilePath)
        tempFilePath = "" // Prevent deferred cleanup
        return
    }

    endTime := time.Now()
    duration := endTime.Sub(startTime)

    // Generate hash
    fileHash, err := hash.GenerateFileHash(tempFilePath)
    if err != nil {
        slog.Error("Failed to generate file hash", "path", tempFilePath, "error", err)
        os.Remove(tempFilePath) // Clean up if hashing fails
        tempFilePath = ""       // Prevent deferred cleanup
        return
    }

    // Generate final filename (e.g., based on timestamp)
    finalFilename := fmt.Sprintf("recording_%s.mp3", startTime.Format("20060102_150405")) // Assuming mp3, adjust if needed
    finalFilename = sanitizeFilename(finalFilename)                                       // Ensure filename is valid
    finalPath := filepath.Join(r.recordingsPath, finalFilename)

    // Move temporary file to final location
    if err := os.Rename(tempFilePath, finalPath); err != nil {
        slog.Error("Failed to move temporary file to final location", "temp", tempFilePath, "final", finalPath, "error", err)
        os.Remove(tempFilePath) // Attempt cleanup of temp file
        tempFilePath = ""       // Prevent deferred cleanup
        return
    }
    tempFilePath = "" // File moved, clear path to prevent deferred cleanup
    slog.Info("Recording saved", "path", finalPath, "size", bytesWritten, "duration", duration)

    // Add record to database
    _, err = r.db.AddRecordedFile(finalFilename, fileHash, bytesWritten, duration, startTime)
    if err != nil {
        slog.Error("Failed to add recording to database", "filename", finalFilename, "hash", fileHash, "error", err)
        // Consider how to handle this - maybe retry later? For now, just log.
    }
}

// downloadStream handles the HTTP GET request and writes the body to the writer.
func (r *Recorder) downloadStream(ctx context.Context, streamURL string, writer io.Writer) (int64, error) {
    req, err := http.NewRequestWithContext(ctx, "GET", streamURL, nil)
    if err != nil {
        return 0, fmt.Errorf("failed to create request: %w", err)
    }
    req.Header.Set("User-Agent", "icecast-ripper/1.0")

    resp, err := r.client.Do(req)
    if err != nil {
        // Check if context was cancelled
        if ctx.Err() == context.Canceled {
            return 0, ctx.Err()
        }
        return 0, fmt.Errorf("failed to connect to stream: %w", err)
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        return 0, fmt.Errorf("unexpected status code: %s", resp.Status)
    }

    slog.Debug("Connected to stream, starting download", "url", streamURL)
    // Copy the response body to the writer (temp file).
    // io.Copy will block until EOF or an error (including context cancellation via the request)
    bytesWritten, err := io.Copy(writer, resp.Body)
    if err != nil {
        // Check if the error is due to context cancellation
        if ctx.Err() == context.Canceled {
            slog.Info("Stream download cancelled.")
            return bytesWritten, ctx.Err() // Return context error
        }
        // Check for common stream disconnection errors (might need refinement)
        if err == io.ErrUnexpectedEOF || strings.Contains(err.Error(), "connection reset by peer") || strings.Contains(err.Error(), "broken pipe") {
            slog.Info("Stream disconnected gracefully (EOF or reset)", "bytes_written", bytesWritten)
            return bytesWritten, nil // Treat as normal end of stream
        }
        return bytesWritten, fmt.Errorf("failed during stream copy: %w", err)
    }

    slog.Info("Stream download finished (EOF)", "bytes_written", bytesWritten)
    return bytesWritten, nil
}

// sanitizeFilename removes potentially problematic characters from filenames.
func sanitizeFilename(filename string) string {
    // Replace common problematic characters
    replacer := strings.NewReplacer(
        " ", "_",
        "/", "-",
        "\\", "-",
        ":", "-",
        "*", "-",
        "?", "-",
        "\"", "'",
        "<", "-",
        ">", "-",
        "|", "-",
    )
    cleaned := replacer.Replace(filename)
    // Trim leading/trailing underscores/hyphens/dots
    cleaned = strings.Trim(cleaned, "_-. ")
    // Limit length if necessary (though less common on modern filesystems)
    // const maxLen = 200
    // if len(cleaned) > maxLen {
    //     cleaned = cleaned[:maxLen]
    // }
    return cleaned
}
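A usage sketch (illustrative, not part of this commit; the URL and sleep durations are placeholders). StartRecording returns immediately, so the caller decides how long to let the download run:

package main

import (
    "context"
    "log"
    "time"

    "github.com/kemko/icecast-ripper/internal/database"
    "github.com/kemko/icecast-ripper/internal/recorder"
)

func main() {
    db, err := database.InitDB("./icecast-ripper.db")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    rec, err := recorder.New("./temp", "./recordings", db)
    if err != nil {
        log.Fatal(err)
    }

    // The download runs in a goroutine; this call does not block.
    if err := rec.StartRecording(context.Background(), "http://example.com:8000/stream"); err != nil {
        log.Fatal(err)
    }

    time.Sleep(10 * time.Second) // record for a while
    rec.StopRecording()          // cancel the download; the partial file is still processed

    // Crude wait for the goroutine's cleanup; a real program would block on shutdown instead.
    time.Sleep(2 * time.Second)
}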
142
internal/rss/rss.go
Normal file
@@ -0,0 +1,142 @@
package rss

import (
    "encoding/xml"
    "fmt"
    "log/slog"
    "net/url"
    "time"

    "github.com/kemko/icecast-ripper/internal/config"
    "github.com/kemko/icecast-ripper/internal/database"
)

// Structs for RSS 2.0 feed generation
// Based on https://validator.w3.org/feed/docs/rss2.html

type RSS struct {
    XMLName xml.Name `xml:"rss"`
    Version string   `xml:"version,attr"`
    Channel Channel  `xml:"channel"`
}

type Channel struct {
    XMLName       xml.Name `xml:"channel"`
    Title         string   `xml:"title"`
    Link          string   `xml:"link"` // Link to the website/source
    Description   string   `xml:"description"`
    LastBuildDate string   `xml:"lastBuildDate,omitempty"` // RFC1123Z format
    Items         []Item   `xml:"item"`
}

type Item struct {
    XMLName     xml.Name  `xml:"item"`
    Title       string    `xml:"title"`
    Link        string    `xml:"link"`        // Link to the specific recording file
    Description string    `xml:"description"` // Can include duration, size etc.
    PubDate     string    `xml:"pubDate"`     // RFC1123Z format of recording time
    GUID        GUID      `xml:"guid"`        // Unique identifier (using file hash)
    Enclosure   Enclosure `xml:"enclosure"`   // Describes the media file
}

// GUID needs IsPermaLink attribute
type GUID struct {
    XMLName     xml.Name `xml:"guid"`
    IsPermaLink bool     `xml:"isPermaLink,attr"`
    Value       string   `xml:",chardata"`
}

// Enclosure describes the media file
type Enclosure struct {
    XMLName xml.Name `xml:"enclosure"`
    URL     string   `xml:"url,attr"`
    Length  int64    `xml:"length,attr"`
    Type    string   `xml:"type,attr"` // MIME type (e.g., "audio/mpeg")
}

// Generator creates RSS feeds.
type Generator struct {
    db             *database.DB
    feedBaseURL    string // Base URL for links in the feed (e.g., http://server.com/recordings/)
    recordingsPath string // Local path to recordings (needed for file info, maybe not directly used in feed)
    feedTitle      string
    feedDesc       string
}

// New creates a new RSS Generator instance.
func New(db *database.DB, cfg *config.Config, title, description string) *Generator {
    // Ensure the base URL for recordings ends with a slash
    baseURL := cfg.RSSFeedURL // This should be the URL base for *serving* files
    if baseURL == "" {
        slog.Warn("RSS_FEED_URL not set, RSS links might be incomplete. Using placeholder.")
        baseURL = "http://localhost:8080/recordings/" // Placeholder
    }
    if baseURL[len(baseURL)-1:] != "/" {
        baseURL += "/"
    }

    return &Generator{
        db:             db,
        feedBaseURL:    baseURL,
        recordingsPath: cfg.RecordingsPath,
        feedTitle:      title,
        feedDesc:       description,
    }
}

// GenerateFeed fetches recordings and produces the RSS feed XML as a byte slice.
func (g *Generator) GenerateFeed(maxItems int) ([]byte, error) {
    slog.Debug("Generating RSS feed", "maxItems", maxItems)
    recordings, err := g.db.GetRecordedFiles(maxItems)
    if err != nil {
        return nil, fmt.Errorf("failed to get recorded files for RSS feed: %w", err)
    }

    items := make([]Item, 0, len(recordings))
    for _, rec := range recordings {
        fileURL, err := url.JoinPath(g.feedBaseURL, rec.Filename)
        if err != nil {
            slog.Error("Failed to create file URL for RSS item", "filename", rec.Filename, "error", err)
            continue // Skip this item if URL creation fails
        }

        item := Item{
            Title:       fmt.Sprintf("Recording %s", rec.RecordedAt.Format("2006-01-02 15:04")), // Example title
            Link:        fileURL,
            Description: fmt.Sprintf("Icecast stream recording from %s. Duration: %s.", rec.RecordedAt.Format(time.RFC1123), rec.Duration.String()),
            PubDate:     rec.RecordedAt.Format(time.RFC1123Z), // Use RFC1123Z for pubDate
            GUID: GUID{
                IsPermaLink: false, // The hash itself is not a permalink URL
                Value:       rec.Hash,
            },
            Enclosure: Enclosure{
                URL:    fileURL,
                Length: rec.FileSize,
                Type:   "audio/mpeg", // Assuming MP3, adjust if format varies or is detectable
            },
        }
        items = append(items, item)
    }

    feed := RSS{
        Version: "2.0",
        Channel: Channel{
            Title:         g.feedTitle,
            Link:          g.feedBaseURL, // Link to the base URL or a relevant page
            Description:   g.feedDesc,
            LastBuildDate: time.Now().Format(time.RFC1123Z),
            Items:         items,
        },
    }

    xmlBytes, err := xml.MarshalIndent(feed, "", "  ")
    if err != nil {
        return nil, fmt.Errorf("failed to marshal RSS feed to XML: %w", err)
    }

    // Add the standard XML header
    output := append([]byte(xml.Header), xmlBytes...)

    slog.Debug("RSS feed generated successfully", "item_count", len(items))
    return output, nil
}
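A usage sketch (illustrative, not part of this commit; the feed title and description are placeholders):

package main

import (
    "log"
    "os"

    "github.com/kemko/icecast-ripper/internal/config"
    "github.com/kemko/icecast-ripper/internal/database"
    "github.com/kemko/icecast-ripper/internal/rss"
)

func main() {
    cfg, err := config.LoadConfig()
    if err != nil {
        log.Fatal(err)
    }
    db, err := database.InitDB(cfg.DatabasePath)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    gen := rss.New(db, cfg, "Icecast Recordings", "Automatically captured streams")
    feed, err := gen.GenerateFeed(50)
    if err != nil {
        log.Fatal(err)
    }
    os.Stdout.Write(feed) // <?xml ...?> followed by <rss version="2.0">...
}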
112
internal/scheduler/scheduler.go
Normal file
@@ -0,0 +1,112 @@
package scheduler

import (
    "context"
    "log/slog"
    "sync"
    "time"

    "github.com/kemko/icecast-ripper/internal/recorder"
    "github.com/kemko/icecast-ripper/internal/streamchecker"
)

// Scheduler manages the periodic checking of the stream and triggers recordings.
type Scheduler struct {
    interval      time.Duration
    checker       *streamchecker.Checker
    recorder      *recorder.Recorder
    stopChan      chan struct{}   // Channel to signal stopping the scheduler
    stopOnce      sync.Once       // Ensures Stop is only called once
    wg            sync.WaitGroup  // Waits for the run loop to finish
    parentContext context.Context // Base context for recordings
}

// New creates a new Scheduler instance.
func New(interval time.Duration, checker *streamchecker.Checker, recorder *recorder.Recorder) *Scheduler {
    return &Scheduler{
        interval: interval,
        checker:  checker,
        recorder: recorder,
        stopChan: make(chan struct{}),
    }
}

// Start begins the scheduling loop.
// It takes a parent context which will be used for recordings.
func (s *Scheduler) Start(ctx context.Context) {
    slog.Info("Starting scheduler", "interval", s.interval.String())
    s.parentContext = ctx
    s.wg.Add(1)
    go s.run()
}

// Stop signals the scheduling loop to terminate gracefully.
func (s *Scheduler) Stop() {
    s.stopOnce.Do(func() {
        slog.Info("Stopping scheduler...")
        close(s.stopChan) // Signal the run loop to stop
        // If a recording is in progress, stopping the scheduler doesn't automatically stop it.
        // The recorder needs to be stopped separately if immediate termination is desired.
        // s.recorder.StopRecording() // Uncomment if scheduler stop should also stop recording
        s.wg.Wait() // Wait for the run loop to exit
        slog.Info("Scheduler stopped.")
    })
}

// run is the main loop for the scheduler.
func (s *Scheduler) run() {
    defer s.wg.Done()
    ticker := time.NewTicker(s.interval)
    defer ticker.Stop()

    // Initial check immediately on start, then wait for the ticker.
    s.checkAndRecord()

    for {
        select {
        case <-ticker.C:
            // Check periodically based on the interval
            s.checkAndRecord()
        case <-s.stopChan:
            slog.Info("Scheduler run loop exiting.")
            return // Exit loop if stop signal received
        case <-s.parentContext.Done():
            slog.Info("Scheduler parent context cancelled, stopping.")
            // Return directly rather than calling s.Stop() here: Stop waits on
            // this goroutine via wg.Wait, so calling it from inside the loop
            // would deadlock. Callers should still invoke Stop for cleanup.
            return
        }
    }
}

// checkAndRecord performs the stream check and starts recording if necessary.
func (s *Scheduler) checkAndRecord() {
    // Do not check if a recording is already in progress
    if s.recorder.IsRecording() {
        slog.Debug("Skipping check, recording in progress.")
        return
    }

    slog.Debug("Scheduler checking stream status...")
    isLive, err := s.checker.IsLive()
    if err != nil {
        // Log the error but continue; maybe a temporary network issue
        slog.Warn("Error checking stream status", "error", err)
        return
    }

    if isLive {
        slog.Info("Stream is live, attempting to start recording.")
        // Use the parent context provided during Start for the recording
        err := s.recorder.StartRecording(s.parentContext, s.checker.GetStreamURL())
        if err != nil {
            // This likely means a recording was *just* started by another check, which is fine.
            slog.Warn("Failed to start recording (might already be in progress)", "error", err)
        } else {
            slog.Info("Recording started successfully by scheduler.")
            // Recording started, the check loop will skip until it finishes
        }
    } else {
        slog.Debug("Stream is not live, waiting for next interval.")
    }
}
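A sketch of how the scheduler might be wired together with the checker and recorder (illustrative, not part of this commit; paths and URL are placeholders):

package main

import (
    "context"
    "log"
    "os/signal"
    "syscall"
    "time"

    "github.com/kemko/icecast-ripper/internal/database"
    "github.com/kemko/icecast-ripper/internal/recorder"
    "github.com/kemko/icecast-ripper/internal/scheduler"
    "github.com/kemko/icecast-ripper/internal/streamchecker"
)

func main() {
    db, err := database.InitDB("./icecast-ripper.db")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    rec, err := recorder.New("./temp", "./recordings", db)
    if err != nil {
        log.Fatal(err)
    }

    checker := streamchecker.New("http://example.com:8000/stream")
    sched := scheduler.New(1*time.Minute, checker, rec)

    // Run until SIGINT/SIGTERM, then shut down in order.
    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    sched.Start(ctx)
    <-ctx.Done()
    sched.Stop()
    rec.StopRecording()
}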
109
internal/server/server.go
Normal file
@@ -0,0 +1,109 @@
package server

import (
    "context"
    "errors"
    "log/slog"
    "net/http"
    "path/filepath"
    "time"

    "github.com/kemko/icecast-ripper/internal/config"
    "github.com/kemko/icecast-ripper/internal/rss"
)

// Server handles HTTP requests for the RSS feed and recordings.
type Server struct {
    server         *http.Server
    rssGenerator   *rss.Generator
    recordingsPath string
    serverAddress  string
}

// New creates a new HTTP server instance.
func New(cfg *config.Config, rssGenerator *rss.Generator) *Server {
    mux := http.NewServeMux()

    s := &Server{
        rssGenerator:   rssGenerator,
        recordingsPath: cfg.RecordingsPath,
        serverAddress:  cfg.ServerAddress,
    }

    // Handler for the RSS feed
    mux.HandleFunc("GET /rss", s.handleRSS)

    // Handler for serving static recording files.
    // Ensure recordingsPath is absolute or relative to the working directory.
    absRecordingsPath, err := filepath.Abs(cfg.RecordingsPath)
    if err != nil {
        slog.Error("Failed to get absolute path for recordings directory", "path", cfg.RecordingsPath, "error", err)
        // Decide how to handle this - maybe panic or return an error from New?
        // For now, log and continue; file serving might fail.
        absRecordingsPath = cfg.RecordingsPath // Fallback to original path
    }
    slog.Info("Serving recordings from", "path", absRecordingsPath)
    fileServer := http.FileServer(http.Dir(absRecordingsPath))
    // The path must end with a trailing slash for FileServer to work correctly with subdirectories
    mux.Handle("GET /recordings/", http.StripPrefix("/recordings/", fileServer))

    s.server = &http.Server{
        Addr:         cfg.ServerAddress,
        Handler:      mux,
        ReadTimeout:  10 * time.Second,
        WriteTimeout: 10 * time.Second,
        IdleTimeout:  60 * time.Second,
    }

    return s
}

// handleRSS generates and serves the RSS feed.
func (s *Server) handleRSS(w http.ResponseWriter, r *http.Request) {
    slog.Debug("Received request for RSS feed")
    // TODO: Consider adding caching headers (ETag, Last-Modified)
    // based on the latest recording timestamp or a hash of the feed content.

    // Generate feed (limit to a reasonable number, e.g., 50 latest items)
    maxItems := 50
    feedBytes, err := s.rssGenerator.GenerateFeed(maxItems)
    if err != nil {
        slog.Error("Failed to generate RSS feed", "error", err)
        http.Error(w, "Internal Server Error", http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "application/rss+xml; charset=utf-8")
    w.WriteHeader(http.StatusOK)
    _, err = w.Write(feedBytes)
    if err != nil {
        slog.Error("Failed to write RSS feed response", "error", err)
    }
}

// Start begins listening for HTTP requests.
func (s *Server) Start() error {
    slog.Info("Starting HTTP server", "address", s.serverAddress)
    // Run the server in a separate goroutine so it doesn't block.
    go func() {
        if err := s.server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
            slog.Error("HTTP server ListenAndServe error", "error", err)
            // Consider signaling failure to the main application thread if needed
        }
    }()
    return nil // Return immediately
}

// Stop gracefully shuts down the HTTP server.
func (s *Server) Stop(ctx context.Context) error {
    slog.Info("Stopping HTTP server...")
    shutdownCtx, cancel := context.WithTimeout(ctx, 15*time.Second) // Give 15 seconds to shut down gracefully
    defer cancel()

    if err := s.server.Shutdown(shutdownCtx); err != nil {
        slog.Error("HTTP server graceful shutdown failed", "error", err)
        return err
    }
    slog.Info("HTTP server stopped.")
    return nil
}
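A usage sketch (illustrative, not part of this commit; the sleep stands in for the real application lifetime). Note that the "GET /rss" method-prefixed mux patterns require Go 1.22 or later:

package main

import (
    "context"
    "log"
    "time"

    "github.com/kemko/icecast-ripper/internal/config"
    "github.com/kemko/icecast-ripper/internal/database"
    "github.com/kemko/icecast-ripper/internal/rss"
    "github.com/kemko/icecast-ripper/internal/server"
)

func main() {
    cfg, err := config.LoadConfig()
    if err != nil {
        log.Fatal(err)
    }
    db, err := database.InitDB(cfg.DatabasePath)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    gen := rss.New(db, cfg, "Icecast Recordings", "Automatically captured streams")
    srv := server.New(cfg, gen)

    if err := srv.Start(); err != nil { // non-blocking; serves /rss and /recordings/
        log.Fatal(err)
    }
    time.Sleep(time.Minute)

    if err := srv.Stop(context.Background()); err != nil {
        log.Fatal(err)
    }
}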
62
internal/streamchecker/streamchecker.go
Normal file
@@ -0,0 +1,62 @@
package streamchecker

import (
    "fmt"
    "log/slog"
    "net/http"
    "time"
)

// Checker checks if an Icecast stream is live.
type Checker struct {
    streamURL string
    client    *http.Client
}

// New creates a new Checker instance.
func New(streamURL string) *Checker {
    return &Checker{
        streamURL: streamURL,
        client: &http.Client{
            Timeout: 10 * time.Second, // Sensible timeout for checking a stream
        },
    }
}

// IsLive checks if the stream URL is currently broadcasting.
// It performs a GET request and checks for a 200 OK status.
// Note: Some Icecast servers might behave differently; this might need adjustment.
func (c *Checker) IsLive() (bool, error) {
    slog.Debug("Checking stream status", "url", c.streamURL)
    req, err := http.NewRequest("GET", c.streamURL, nil)
    if err != nil {
        return false, fmt.Errorf("failed to create request for stream check: %w", err)
    }
    // Set a user agent; some servers might require it.
    req.Header.Set("User-Agent", "icecast-ripper/1.0")
    // Icy-Metadata header can sometimes force the server to respond even if the stream is idle,
    // but we want to know if it's *actually* streaming audio data.
    // req.Header.Set("Icy-Metadata", "1")

    resp, err := c.client.Do(req)
    if err != nil {
        // Network errors (DNS lookup failure, connection refused) usually mean not live.
        slog.Warn("Failed to connect to stream URL during check", "url", c.streamURL, "error", err)
        return false, nil // Treat connection errors as 'not live'
    }
    defer resp.Body.Close()

    // A 200 OK status generally indicates the stream is live and broadcasting.
    if resp.StatusCode == http.StatusOK {
        slog.Info("Stream is live", "url", c.streamURL, "status", resp.StatusCode)
        return true, nil
    }

    slog.Info("Stream is not live", "url", c.streamURL, "status", resp.StatusCode)
    return false, nil
}

// GetStreamURL returns the URL being checked.
func (c *Checker) GetStreamURL() string {
    return c.streamURL
}
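A usage sketch (illustrative, not part of this commit; the URL is a placeholder):

package main

import (
    "fmt"
    "log"

    "github.com/kemko/icecast-ripper/internal/streamchecker"
)

func main() {
    checker := streamchecker.New("http://example.com:8000/stream")

    // Connection failures are reported as "not live" rather than as errors.
    live, err := checker.IsLive()
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("live:", live)
}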