Mirror of https://github.com/kemko/icecast-ripper.git (synced 2026-01-01 15:55:42 +03:00)
Integrate MP3 duration extraction and silence handling (#6)
* feat: integrate MP3 duration extraction and silence handling
  - Added a new dependency on github.com/tcolgate/mp3 for MP3 frame decoding.
  - Implemented actual MP3 duration retrieval in the scanRecordings function, falling back to an estimation if retrieval fails.
  - Introduced a silent frame asset for generating silence in audio streams.
  - Created a silence reader to provide a continuous stream of silent frames.
  - Added necessary documentation and licensing for the new MP3 package.
  - Updated .gitignore to exclude MP3 files except for the internal data directory.
* feat: update README and improve application configuration and logging
* refactor: remove unused GenerateFileHash function and improve resource cleanup in MP3 and recorder modules
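The silent-frame asset and silence reader mentioned in the commit message are not part of the hunks shown below. As a rough sketch of the idea only (package, type, and function names here are hypothetical, not taken from the repository), such a reader can loop an embedded silent MP3 frame indefinitely:

package silence

import "io"

// NewReader returns an io.Reader that repeats the given silent MP3 frame forever.
// The frame bytes would come from the embedded silent-frame asset.
func NewReader(frame []byte) io.Reader {
    return &reader{frame: frame}
}

type reader struct {
    frame []byte
    pos   int
}

func (r *reader) Read(p []byte) (int, error) {
    if len(r.frame) == 0 {
        return 0, io.EOF
    }
    n := 0
    for n < len(p) {
        c := copy(p[n:], r.frame[r.pos:])
        n += c
        r.pos = (r.pos + c) % len(r.frame)
    }
    return n, nil
}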
@@ -1,20 +1,21 @@
 package config
 
 import (
+    "fmt"
     "time"
 
     "github.com/spf13/viper"
 )
 
-// Config stores all configuration for the application
+// Config stores application configuration loaded from environment variables
 type Config struct {
-    StreamURL      string        `mapstructure:"STREAM_URL"`
-    CheckInterval  time.Duration `mapstructure:"CHECK_INTERVAL"`
-    RecordingsPath string        `mapstructure:"RECORDINGS_PATH"`
-    TempPath       string        `mapstructure:"TEMP_PATH"`
-    BindAddress    string        `mapstructure:"BIND_ADDRESS"`
-    PublicUrl      string        `mapstructure:"PUBLIC_URL"`
-    LogLevel       string        `mapstructure:"LOG_LEVEL"`
+    StreamURL      string        `mapstructure:"STREAM_URL"`      // URL of the Icecast stream to record
+    CheckInterval  time.Duration `mapstructure:"CHECK_INTERVAL"`  // How often to check if the stream is live
+    RecordingsPath string        `mapstructure:"RECORDINGS_PATH"` // Where to store recordings
+    TempPath       string        `mapstructure:"TEMP_PATH"`       // Where to store temporary files during recording
+    BindAddress    string        `mapstructure:"BIND_ADDRESS"`    // HTTP server address:port
+    PublicURL      string        `mapstructure:"PUBLIC_URL"`      // Public-facing URL for RSS feed links
+    LogLevel       string        `mapstructure:"LOG_LEVEL"`       // Logging level (debug, info, warn, error)
 }
 
 // LoadConfig reads configuration from environment variables

@@ -39,7 +40,12 @@ func LoadConfig() (*Config, error) {
 
     var config Config
     if err := v.Unmarshal(&config); err != nil {
-        return nil, err
+        return nil, fmt.Errorf("failed to parse configuration: %w", err)
     }
 
+    // Validate required fields
+    if config.StreamURL == "" {
+        return nil, fmt.Errorf("STREAM_URL is required")
+    }
+
     return &config, nil
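The configuration above is driven entirely by environment variables named after the mapstructure tags. A minimal usage sketch; the values are illustrative, and it assumes LoadConfig binds these keys (only the tail of that function appears in the hunk above):

package main

import (
    "log"
    "os"

    "github.com/kemko/icecast-ripper/internal/config"
)

func main() {
    // Illustrative values; STREAM_URL is the only key the shown code enforces.
    os.Setenv("STREAM_URL", "https://stream.example.com/live.mp3")
    os.Setenv("CHECK_INTERVAL", "1m")
    os.Setenv("LOG_LEVEL", "debug")

    cfg, err := config.LoadConfig()
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("checking %s every %s", cfg.StreamURL, cfg.CheckInterval)
}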
@@ -4,8 +4,6 @@ import (
     "crypto/sha256"
     "encoding/hex"
     "fmt"
     "log/slog"
-    "os"
-    "path/filepath"
     "time"
 )

@@ -26,20 +24,5 @@ func GenerateGUID(streamName string, recordedAt time.Time, filePath string) stri
     hasher.Write([]byte(input))
     guid := hex.EncodeToString(hasher.Sum(nil))
 
     slog.Debug("Generated GUID", "input", input, "guid", guid)
     return guid
 }
-
-// GenerateFileHash is maintained for backwards compatibility
-// Uses file metadata instead of content
-func GenerateFileHash(filePath string) (string, error) {
-    fileInfo, err := os.Stat(filePath)
-    if err != nil {
-        return "", fmt.Errorf("failed to stat file %s: %w", filePath, err)
-    }
-
-    streamName := filepath.Base(filepath.Dir(filePath))
-    recordedAt := fileInfo.ModTime()
-
-    return GenerateGUID(streamName, recordedAt, filePath), nil
-}
@@ -6,18 +6,43 @@ import (
     "strings"
 )
 
-// Setup initializes the structured logger with the specified log level
-func Setup(logLevel string) {
+// Format represents the output format for logs
+type Format string
+
+const (
+    // JSON outputs logs in JSON format for machine readability
+    JSON Format = "json"
+    // Text outputs logs in a human-readable format
+    Text Format = "text"
+)
+
+// Setup initializes the structured logger with the specified log level and format
+func Setup(logLevel string, format ...Format) {
     level := parseLogLevel(logLevel)
 
-    handler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
-        Level: level,
-    })
+    // Default to JSON format if not specified
+    logFormat := JSON
+    if len(format) > 0 {
+        logFormat = format[0]
+    }
+
+    var handler slog.Handler
+
+    switch logFormat {
+    case Text:
+        handler = slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
+            Level: level,
+        })
+    default: // JSON
+        handler = slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
+            Level: level,
+        })
+    }
 
     logger := slog.New(handler)
     slog.SetDefault(logger)
 
-    slog.Info("Logger initialized", "level", level.String())
+    slog.Debug("Logger initialized", "level", level.String(), "format", string(logFormat))
 }
 
 // parseLogLevel converts a string log level to slog.Level
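The variadic format parameter keeps existing Setup(level) call sites compiling while allowing an explicit output format. A usage sketch, assuming the package is imported as logger in line with the module's other internal packages:

package main

import "github.com/kemko/icecast-ripper/internal/logger"

func main() {
    logger.Setup("debug", logger.Text) // human-readable output for local runs
    logger.Setup("info")               // falls back to JSON when no format is passed
}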
internal/mp3util/mp3util.go (new file, 55 lines)
@@ -0,0 +1,55 @@
+// Package mp3util provides utilities for working with MP3 files
+package mp3util
+
+import (
+    "errors"
+    "fmt"
+    "io"
+    "os"
+    "time"
+
+    "github.com/tcolgate/mp3"
+)
+
+// GetDuration returns the actual duration of an MP3 file by analyzing its frames
+func GetDuration(filePath string) (time.Duration, error) {
+    file, err := os.Open(filePath)
+    if err != nil {
+        return 0, fmt.Errorf("failed to open MP3 file: %w", err)
+    }
+    defer func(file *os.File) {
+        err := file.Close()
+        if err != nil {
+            fmt.Printf("Failed to close file: %v", err)
+        }
+    }(file)
+
+    decoder := mp3.NewDecoder(file)
+    var frame mp3.Frame
+    var skipped int
+    var totalSamples int
+    sampleRate := 0
+
+    // Process the frames to calculate the total duration
+    for {
+        if err := decoder.Decode(&frame, &skipped); err != nil {
+            if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
+                break
+            }
+            return 0, fmt.Errorf("failed to decode MP3 frame: %w", err)
+        }
+
+        if sampleRate == 0 {
+            sampleRate = frame.Samples()
+        }
+
+        totalSamples += frame.Samples()
+    }
+
+    if totalSamples == 0 || sampleRate == 0 {
+        return 0, errors.New("could not determine MP3 duration")
+    }
+
+    durationSeconds := float64(totalSamples) / float64(sampleRate)
+    return time.Duration(durationSeconds * float64(time.Second)), nil
+}
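One caveat on GetDuration above: in github.com/tcolgate/mp3, Frame.Samples reports the number of samples in a single frame, not the stream's sample rate, so dividing the sample total by it yields a frame count rather than seconds. Summing the per-frame durations the decoder already exposes sidesteps that bookkeeping; a minimal sketch (not part of this commit):

package mp3util

import (
    "errors"
    "fmt"
    "io"
    "time"

    "github.com/tcolgate/mp3"
)

// durationByFrames sums the duration of every decoded frame.
func durationByFrames(r io.Reader) (time.Duration, error) {
    decoder := mp3.NewDecoder(r)
    var (
        frame   mp3.Frame
        skipped int
        total   time.Duration
    )
    for {
        if err := decoder.Decode(&frame, &skipped); err != nil {
            if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
                return total, nil
            }
            return 0, fmt.Errorf("failed to decode MP3 frame: %w", err)
        }
        total += frame.Duration() // per-frame duration derived from bitrate and sample rate
    }
}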
@@ -23,22 +23,40 @@ type Recorder struct {
     isRecording bool
     cancelFunc  context.CancelFunc
     streamName  string
+    userAgent   string
 }
 
+// Option represents a functional option for configuring the recorder
+type Option func(*Recorder)
+
+// WithUserAgent sets a custom User-Agent string for HTTP requests
+func WithUserAgent(userAgent string) Option {
+    return func(r *Recorder) {
+        r.userAgent = userAgent
+    }
+}
+
 // New creates a recorder instance
-func New(tempPath, recordingsPath string, streamName string) (*Recorder, error) {
+func New(tempPath, recordingsPath string, streamName string, opts ...Option) (*Recorder, error) {
     for _, dir := range []string{tempPath, recordingsPath} {
         if err := os.MkdirAll(dir, 0755); err != nil {
             return nil, fmt.Errorf("failed to create directory %s: %w", dir, err)
         }
     }
 
-    return &Recorder{
+    r := &Recorder{
         tempPath:       tempPath,
         recordingsPath: recordingsPath,
         streamName:     streamName,
-        client:         &http.Client{Timeout: 0}, // No timeout, rely on context cancellation
-    }, nil
+        client:         &http.Client{Timeout: 0}, // No timeout for long-running downloads
+    }
+
+    // Apply any provided options
+    for _, opt := range opts {
+        opt(r)
+    }
+
+    return r, nil
 }
 
 // IsRecording returns whether a recording is currently in progress
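With the functional options above, the previously hard-coded User-Agent can be supplied by the caller. A construction sketch; paths and stream name are illustrative, and the User-Agent string is the one the old code hard-coded:

package main

import (
    "log"

    "github.com/kemko/icecast-ripper/internal/recorder"
)

func main() {
    rec, err := recorder.New("/tmp/icecast-ripper", "/data/recordings", "stream.example.com",
        recorder.WithUserAgent("icecast-ripper/1.0"))
    if err != nil {
        log.Fatal(err)
    }
    _ = rec // wired into the scheduler in the real application
}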
@@ -90,19 +108,19 @@ func (r *Recorder) recordStream(ctx context.Context, streamURL string) {
         r.mu.Unlock()
         slog.Info("Recording process finished")
 
-        // Only clean up temp file if it was successfully moved to final location
-        if tempFilePath != "" && moveSuccessful {
-            if _, err := os.Stat(tempFilePath); err == nil {
-                slog.Debug("Cleaning up temporary file", "path", tempFilePath)
-                if err := os.Remove(tempFilePath); err != nil {
-                    slog.Error("Failed to remove temporary file", "error", err)
-                }
-            }
-        } else if tempFilePath != "" && !moveSuccessful {
-            slog.Warn("Temporary file preserved for manual inspection", "path", tempFilePath)
+        if tempFilePath != "" && !moveSuccessful {
+            slog.Warn("Temporary file preserved for inspection", "path", tempFilePath)
+            return
+        }
+
+        if tempFilePath != "" {
+            if err := cleanupTempFile(tempFilePath); err != nil {
+                slog.Error("Failed to remove temporary file", "path", tempFilePath, "error", err)
+            }
         }
     }()
 
     // Create temp file for recording
     tempFile, err := os.CreateTemp(r.tempPath, "recording-*.tmp")
     if err != nil {
         slog.Error("Failed to create temporary file", "error", err)
@@ -120,7 +138,7 @@ func (r *Recorder) recordStream(ctx context.Context, streamURL string) {
         }
     }
 
-    // Handle context cancellation or download errors
+    // Handle errors and early termination
     if err != nil {
         if errors.Is(err, context.Canceled) {
             slog.Info("Recording stopped via cancellation")
@@ -137,30 +155,96 @@ func (r *Recorder) recordStream(ctx context.Context, streamURL string) {
     }
 
     // Process successful recording
-    endTime := time.Now()
-    duration := endTime.Sub(startTime)
+    finalPath := r.generateFinalPath(startTime)
+    moveSuccessful = r.moveToFinalLocation(tempFilePath, finalPath)
+
+    if moveSuccessful {
+        duration := time.Since(startTime)
+        slog.Info("Recording saved", "path", finalPath, "size", bytesWritten, "duration", duration)
+    }
+}
+
+func (r *Recorder) generateFinalPath(startTime time.Time) string {
     finalFilename := fmt.Sprintf("%s_%s.mp3", r.streamName, startTime.Format("20060102_150405"))
     finalFilename = sanitizeFilename(finalFilename)
-    finalPath := filepath.Join(r.recordingsPath, finalFilename)
+    return filepath.Join(r.recordingsPath, finalFilename)
+}
 
+func (r *Recorder) moveToFinalLocation(tempPath, finalPath string) bool {
     // Try rename first (fastest)
-    if err := os.Rename(tempFilePath, finalPath); err != nil {
-        slog.Warn("Failed to move recording with rename, trying copy fallback", "error", err)
-
-        // Fallback to manual copy
-        if err := copyFile(tempFilePath, finalPath); err != nil {
-            slog.Error("Failed to move recording to final location", "error", err)
-            return
-        }
-
-        // Copy successful, mark for cleanup
-        moveSuccessful = true
-        slog.Info("Recording copied successfully using fallback method", "path", finalPath)
-    } else {
-        moveSuccessful = true
+    if err := os.Rename(tempPath, finalPath); err == nil {
+        return true
     }
 
-    slog.Info("Recording saved", "path", finalPath, "size", bytesWritten, "duration", duration)
-}
+    // Fallback to manual copy
+    if err := copyFile(tempPath, finalPath); err != nil {
+        slog.Error("Failed to move recording to final location", "error", err)
+        return false
+    }
+
+    slog.Info("Recording copied successfully using fallback method")
+    return true
+}
+
+func (r *Recorder) downloadStream(ctx context.Context, streamURL string, writer io.Writer) (int64, error) {
+    req, err := http.NewRequestWithContext(ctx, http.MethodGet, streamURL, nil)
+    if err != nil {
+        return 0, fmt.Errorf("failed to create request: %w", err)
+    }
+    req.Header.Set("User-Agent", r.userAgent)
+
+    resp, err := r.client.Do(req)
+    if err != nil {
+        if errors.Is(err, context.Canceled) {
+            return 0, err
+        }
+        return 0, fmt.Errorf("failed to connect to stream: %w", err)
+    }
+    defer func(Body io.ReadCloser) {
+        err := Body.Close()
+        if err != nil {
+            slog.Error("Failed to close response body", "error", err)
+        }
+    }(resp.Body)
+
+    if resp.StatusCode != http.StatusOK {
+        return 0, fmt.Errorf("unexpected status code: %s", resp.Status)
+    }
+
+    slog.Debug("Connected to stream, downloading", "url", streamURL)
+    bytesWritten, err := io.Copy(writer, resp.Body)
+
+    if err != nil {
+        if errors.Is(err, context.Canceled) {
+            slog.Info("Stream download cancelled")
+            return bytesWritten, ctx.Err()
+        }
+
+        // Handle common stream disconnections gracefully
+        if isNormalDisconnect(err) {
+            slog.Info("Stream disconnected normally", "bytesWritten", bytesWritten)
+            return bytesWritten, nil
+        }
+
+        return bytesWritten, fmt.Errorf("failed during stream copy: %w", err)
+    }
+
+    slog.Info("Stream download finished normally", "bytesWritten", bytesWritten)
+    return bytesWritten, nil
+}
+
+func isNormalDisconnect(err error) bool {
+    return errors.Is(err, io.ErrUnexpectedEOF) ||
+        strings.Contains(err.Error(), "connection reset by peer") ||
+        strings.Contains(err.Error(), "broken pipe")
+}
+
+func cleanupTempFile(path string) error {
+    if _, err := os.Stat(path); err == nil {
+        slog.Debug("Cleaning up temporary file", "path", path)
+        return os.Remove(path)
+    }
+    return nil
+}
 
 // copyFile copies a file from src to dst
@@ -194,54 +278,6 @@ func copyFile(src, dst string) error {
     return nil
 }
 
-func (r *Recorder) downloadStream(ctx context.Context, streamURL string, writer io.Writer) (int64, error) {
-    req, err := http.NewRequestWithContext(ctx, http.MethodGet, streamURL, nil)
-    if err != nil {
-        return 0, fmt.Errorf("failed to create request: %w", err)
-    }
-    req.Header.Set("User-Agent", "icecast-ripper/1.0")
-
-    resp, err := r.client.Do(req)
-    if err != nil {
-        if errors.Is(err, context.Canceled) {
-            return 0, err
-        }
-        return 0, fmt.Errorf("failed to connect to stream: %w", err)
-    }
-    defer func() {
-        if err := resp.Body.Close(); err != nil {
-            slog.Error("Failed to close response body", "error", err)
-        }
-    }()
-
-    if resp.StatusCode != http.StatusOK {
-        return 0, fmt.Errorf("unexpected status code: %s", resp.Status)
-    }
-
-    slog.Debug("Connected to stream, downloading", "url", streamURL)
-    bytesWritten, err := io.Copy(writer, resp.Body)
-
-    if err != nil {
-        if errors.Is(err, context.Canceled) {
-            slog.Info("Stream download cancelled")
-            return bytesWritten, ctx.Err()
-        }
-
-        // Handle common stream disconnections gracefully
-        if errors.Is(err, io.ErrUnexpectedEOF) ||
-            strings.Contains(err.Error(), "connection reset by peer") ||
-            strings.Contains(err.Error(), "broken pipe") {
-            slog.Info("Stream disconnected normally", "bytesWritten", bytesWritten)
-            return bytesWritten, nil
-        }
-
-        return bytesWritten, fmt.Errorf("failed during stream copy: %w", err)
-    }
-
-    slog.Info("Stream download finished normally", "bytesWritten", bytesWritten)
-    return bytesWritten, nil
-}
 
 func sanitizeFilename(filename string) string {
     replacer := strings.NewReplacer(
         " ", "_",
@@ -13,6 +13,7 @@ import (
     "github.com/gorilla/feeds"
     "github.com/kemko/icecast-ripper/internal/config"
     "github.com/kemko/icecast-ripper/internal/hash"
+    "github.com/kemko/icecast-ripper/internal/mp3util"
 )
 
 // RecordingInfo contains metadata about a recording

@@ -24,9 +25,9 @@ type RecordingInfo struct {
     RecordedAt time.Time
 }
 
-// Generator creates RSS feeds
+// Generator creates RSS feeds for recorded streams
 type Generator struct {
-    baseUrl        string
+    baseURL        string
     recordingsPath string
     feedTitle      string
     feedDesc       string

@@ -35,15 +36,15 @@ type Generator struct {
 
 // New creates a new RSS Generator instance
 func New(cfg *config.Config, title, description, streamName string) *Generator {
-    baseUrl := cfg.PublicUrl
+    baseURL := cfg.PublicURL
 
     // Ensure base URL ends with a slash
-    if !strings.HasSuffix(baseUrl, "/") {
-        baseUrl += "/"
+    if !strings.HasSuffix(baseURL, "/") {
+        baseURL += "/"
     }
 
     return &Generator{
-        baseUrl:        cfg.PublicUrl,
+        baseURL:        baseURL,
         recordingsPath: cfg.RecordingsPath,
         feedTitle:      title,
         feedDesc:       description,

@@ -54,7 +55,7 @@ func New(cfg *config.Config, title, description, streamName string) *Generator {
 // Pattern to extract timestamp from recording filename (stream.somesite.com_20240907_195622.mp3)
 var recordingPattern = regexp.MustCompile(`([^_]+)_(\d{8}_\d{6})\.mp3$`)
 
-// GenerateFeed produces the RSS feed XML as a byte slice
+// GenerateFeed produces the RSS feed XML
 func (g *Generator) GenerateFeed(maxItems int) ([]byte, error) {
     recordings, err := g.scanRecordings(maxItems)
     if err != nil {
@@ -63,15 +64,27 @@ func (g *Generator) GenerateFeed(maxItems int) ([]byte, error) {
 
     feed := &feeds.Feed{
         Title:       g.feedTitle,
-        Link:        &feeds.Link{Href: g.baseUrl},
+        Link:        &feeds.Link{Href: g.baseURL},
         Description: g.feedDesc,
         Created:     time.Now(),
     }
 
-    feed.Items = make([]*feeds.Item, 0, len(recordings))
+    feed.Items = g.createFeedItems(recordings)
 
-    baseURL := g.baseUrl
-    baseURL = strings.TrimSuffix(baseURL, "/")
+    rssFeed, err := feed.ToRss()
+    if err != nil {
+        return nil, fmt.Errorf("failed to generate RSS feed: %w", err)
+    }
+
+    slog.Debug("RSS feed generated", "itemCount", len(feed.Items))
+    return []byte(rssFeed), nil
+}
+
+// createFeedItems converts recording info to RSS feed items
+func (g *Generator) createFeedItems(recordings []RecordingInfo) []*feeds.Item {
+    items := make([]*feeds.Item, 0, len(recordings))
+
+    baseURL := strings.TrimSuffix(g.baseURL, "/")
+
     for _, rec := range recordings {
         fileURL := fmt.Sprintf("%s/recordings/%s", baseURL, rec.Filename)

@@ -89,46 +102,29 @@ func (g *Generator) GenerateFeed(maxItems int) ([]byte, error) {
                 Type:   "audio/mpeg",
             },
         }
-        feed.Items = append(feed.Items, item)
+        items = append(items, item)
     }
 
-    rssFeed, err := feed.ToRss()
-    if err != nil {
-        return nil, fmt.Errorf("failed to generate RSS feed: %w", err)
-    }
-
-    slog.Debug("RSS feed generated", "itemCount", len(feed.Items))
-    return []byte(rssFeed), nil
+    return items
 }
 
-// scanRecordings scans the recordings directory and returns metadata about the files
+// scanRecordings scans the recordings directory and returns metadata
 func (g *Generator) scanRecordings(maxItems int) ([]RecordingInfo, error) {
     var recordings []RecordingInfo
 
     err := filepath.WalkDir(g.recordingsPath, func(path string, d fs.DirEntry, err error) error {
-        if err != nil {
+        if err != nil || d.IsDir() || !strings.HasSuffix(strings.ToLower(d.Name()), ".mp3") {
             return err
         }
 
-        // Skip directories
-        if d.IsDir() {
-            return nil
-        }
-
-        // Only process mp3 files
-        if !strings.HasSuffix(strings.ToLower(d.Name()), ".mp3") {
-            return nil
-        }
-
         // Extract timestamp from filename
         matches := recordingPattern.FindStringSubmatch(d.Name())
         if len(matches) < 3 {
             // Skip files not matching our pattern
             slog.Debug("Skipping non-conforming filename", "filename", d.Name())
             return nil
         }
 
-        // Parse the timestamp (now in the 3rd capture group [2])
+        // Parse the timestamp from the filename
         timestamp, err := time.Parse("20060102_150405", matches[2])
         if err != nil {
             slog.Warn("Failed to parse timestamp from filename", "filename", d.Name(), "error", err)
@@ -141,9 +137,13 @@ func (g *Generator) scanRecordings(maxItems int) ([]RecordingInfo, error) {
             return nil
         }
 
-        // Calculate an estimated duration based on file size
-        // Assuming ~128kbps MP3 bitrate: 16KB per second
-        estimatedDuration := time.Duration(info.Size()/16000) * time.Second
+        // Get the actual duration from the MP3 file
+        duration, err := mp3util.GetDuration(path)
+        if err != nil {
+            slog.Warn("Failed to get MP3 duration, estimating", "filename", d.Name(), "error", err)
+            // Estimate: ~128kbps MP3 bitrate = 16KB per second
+            duration = time.Duration(info.Size()/16000) * time.Second
+        }
 
         // Generate a stable hash for the recording
         filename := filepath.Base(path)

@@ -153,7 +153,7 @@ func (g *Generator) scanRecordings(maxItems int) ([]RecordingInfo, error) {
             Filename:   filename,
             Hash:       fileHash,
             FileSize:   info.Size(),
-            Duration:   estimatedDuration,
+            Duration:   duration,
             RecordedAt: timestamp,
         })
@@ -10,6 +10,7 @@ import (
     "github.com/kemko/icecast-ripper/internal/streamchecker"
 )
 
+// Scheduler periodically checks if a stream is live and starts recording
 type Scheduler struct {
     interval time.Duration
     checker  *streamchecker.Checker

@@ -20,6 +21,7 @@ type Scheduler struct {
     parentContext context.Context
 }
 
+// New creates a scheduler instance
 func New(interval time.Duration, checker *streamchecker.Checker, recorder *recorder.Recorder) *Scheduler {
     return &Scheduler{
         interval: interval,

@@ -29,6 +31,7 @@ func New(interval time.Duration, checker *streamchecker.Checker, recorder *recor
     }
 }
 
+// Start begins the scheduling process
 func (s *Scheduler) Start(ctx context.Context) {
     slog.Info("Starting scheduler", "interval", s.interval.String())
     s.parentContext = ctx

@@ -36,17 +39,19 @@ func (s *Scheduler) Start(ctx context.Context) {
     go s.run()
 }
 
+// Stop gracefully shuts down the scheduler
 func (s *Scheduler) Stop() {
     s.stopOnce.Do(func() {
         slog.Info("Stopping scheduler...")
         close(s.stopChan)
         s.wg.Wait()
-        slog.Info("Scheduler stopped.")
+        slog.Info("Scheduler stopped")
     })
 }
 
 func (s *Scheduler) run() {
     defer s.wg.Done()
 
     ticker := time.NewTicker(s.interval)
     defer ticker.Stop()

@@ -58,10 +63,9 @@ func (s *Scheduler) run() {
         case <-ticker.C:
             s.checkAndRecord()
         case <-s.stopChan:
-            slog.Info("Scheduler run loop exiting.")
             return
         case <-s.parentContext.Done():
-            slog.Info("Parent context cancelled, stopping scheduler.")
+            slog.Info("Parent context cancelled, stopping scheduler")
             return
         }
     }

@@ -69,12 +73,11 @@ func (s *Scheduler) run() {
 
 func (s *Scheduler) checkAndRecord() {
     if s.recorder.IsRecording() {
-        slog.Debug("Recording in progress, skipping stream check.")
+        slog.Debug("Recording in progress, skipping stream check")
        return
     }
 
-    slog.Debug("Checking stream status")
-    isLive, err := s.checker.IsLive()
+    isLive, err := s.checker.IsLiveWithContext(s.parentContext)
     if err != nil {
         slog.Warn("Error checking stream status", "error", err)
         return
@@ -43,7 +43,7 @@ func New(cfg *config.Config, rssGenerator *rss.Generator) *Server {
     fileServer := http.FileServer(http.Dir(absRecordingsPath))
     mux.Handle("GET /recordings/", http.StripPrefix("/recordings/", fileServer))
 
-    // Configure server with sensible timeouts
+    // Configure server with timeouts for robustness
     s.server = &http.Server{
         Addr:    cfg.BindAddress,
         Handler: mux,

@@ -68,7 +68,6 @@ func (s *Server) handleRSS(w http.ResponseWriter, _ *http.Request) {
     }
 
     w.Header().Set("Content-Type", "application/rss+xml; charset=utf-8")
-    w.WriteHeader(http.StatusOK)
     if _, err = w.Write(feedBytes); err != nil {
         slog.Error("Failed to write RSS response", "error", err)
     }

@@ -90,6 +89,5 @@ func (s *Server) Start() error {
 // Stop gracefully shuts down the HTTP server
 func (s *Server) Stop(ctx context.Context) error {
     slog.Info("Stopping HTTP server")
-
     return s.server.Shutdown(ctx)
 }
@@ -1,6 +1,7 @@
 package streamchecker
 
 import (
+    "context"
     "fmt"
     "log/slog"
     "net/http"

@@ -10,26 +11,51 @@ import (
 type Checker struct {
     streamURL string
     client    *http.Client
+    userAgent string
 }
 
-func New(streamURL string) *Checker {
-    return &Checker{
+// Option represents a functional option for configuring the Checker
+type Option func(*Checker)
+
+// WithUserAgent sets a custom User-Agent header
+func WithUserAgent(userAgent string) Option {
+    return func(c *Checker) {
+        c.userAgent = userAgent
+    }
+}
+
+// New creates a new stream checker with sensible defaults
+func New(streamURL string, opts ...Option) *Checker {
+    c := &Checker{
         streamURL: streamURL,
         client: &http.Client{
             Timeout: 10 * time.Second,
         },
     }
+
+    // Apply any provided options
+    for _, opt := range opts {
+        opt(c)
+    }
+
+    return c
 }
 
+// IsLive checks if the stream is currently broadcasting
 func (c *Checker) IsLive() (bool, error) {
+    return c.IsLiveWithContext(context.Background())
+}
+
+// IsLiveWithContext checks if the stream is live using the provided context
+func (c *Checker) IsLiveWithContext(ctx context.Context) (bool, error) {
     slog.Debug("Checking stream status", "url", c.streamURL)
 
-    req, err := http.NewRequest(http.MethodGet, c.streamURL, nil)
+    req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.streamURL, nil)
     if err != nil {
         return false, fmt.Errorf("failed to create request: %w", err)
     }
 
-    req.Header.Set("User-Agent", "icecast-ripper/1.0")
+    req.Header.Set("User-Agent", c.userAgent)
 
     resp, err := c.client.Do(req)
     if err != nil {

@@ -51,6 +77,7 @@ func (c *Checker) IsLive() (bool, error) {
     return false, nil
 }
 
+// GetStreamURL returns the URL being monitored
 func (c *Checker) GetStreamURL() string {
     return c.streamURL
 }
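The checker gains the same option treatment plus a context-aware probe, which is what the scheduler now calls with its parent context. A usage sketch; the URL and timeout are illustrative:

package main

import (
    "context"
    "log"
    "time"

    "github.com/kemko/icecast-ripper/internal/streamchecker"
)

func main() {
    checker := streamchecker.New("https://stream.example.com/live.mp3",
        streamchecker.WithUserAgent("icecast-ripper/1.0"))

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    live, err := checker.IsLiveWithContext(ctx)
    if err != nil {
        log.Fatal(err)
    }
    log.Println("stream live:", live)
}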