removed filestore metadata caching
Some checks failed
CodeQL / Analyze (python) (push) Has been cancelled
Docker / build (push) Has been cancelled
golangci-lint / lint (push) Has been cancelled

This commit is contained in:
Dmitrii Andreev
2025-04-07 13:44:49 +03:00
parent dd96de7a3a
commit 926a9ca08b
3 changed files with 114 additions and 51 deletions

View File

@@ -12,7 +12,6 @@ import (
"time"
"github.com/kemko/icecast-ripper/internal/config"
"github.com/kemko/icecast-ripper/internal/filestore"
"github.com/kemko/icecast-ripper/internal/logger"
"github.com/kemko/icecast-ripper/internal/recorder"
"github.com/kemko/icecast-ripper/internal/rss"
@@ -25,7 +24,10 @@ const version = "0.2.0"
func main() {
if err := run(); err != nil {
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
_, err := fmt.Fprintf(os.Stderr, "Error: %v\n", err)
if err != nil {
return
}
os.Exit(1)
}
}
@@ -46,39 +48,22 @@ func run() error {
return fmt.Errorf("configuration error: STREAM_URL must be set")
}
// Extract stream name for GUID generation
// Extract stream name for identification
streamName := extractStreamName(cfg.StreamURL)
slog.Info("Using stream name for identification", "name", streamName)
// Initialize file store
storePath := ""
if cfg.DatabasePath != "" {
storePath = changeExtension(cfg.DatabasePath, ".json")
}
fileStore, err := filestore.Init(storePath)
if err != nil {
return fmt.Errorf("failed to initialize file store: %w", err)
}
// Properly handle Close() error
defer func() {
if err := fileStore.Close(); err != nil {
slog.Error("Error closing file store", "error", err)
}
}()
// Create main context that cancels on shutdown signal
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
// Initialize components
streamChecker := streamchecker.New(cfg.StreamURL)
recorderInstance, err := recorder.New(cfg.TempPath, cfg.RecordingsPath, fileStore, streamName)
recorderInstance, err := recorder.New(cfg.TempPath, cfg.RecordingsPath, streamName)
if err != nil {
return fmt.Errorf("failed to initialize recorder: %w", err)
}
rssGenerator := rss.New(fileStore, cfg, "Icecast Recordings", "Recordings from stream: "+cfg.StreamURL)
rssGenerator := rss.New(cfg, "Icecast Recordings", "Recordings from stream: "+cfg.StreamURL, streamName)
schedulerInstance := scheduler.New(cfg.CheckInterval, streamChecker, recorderInstance)
httpServer := server.New(cfg, rssGenerator)
@@ -142,12 +127,3 @@ func extractStreamName(streamURL string) string {
return streamName
}
// changeExtension swaps the extension of path for newExt; when path has
// no extension, newExt is simply appended. newExt is expected to include
// the leading dot (e.g. ".json").
func changeExtension(path string, newExt string) string {
	if ext := filepath.Ext(path); ext != "" {
		path = path[:len(path)-len(ext)]
	}
	return path + newExt
}

View File

@@ -12,15 +12,12 @@ import (
"strings"
"sync"
"time"
"github.com/kemko/icecast-ripper/internal/filestore"
"github.com/kemko/icecast-ripper/internal/hash"
)
// Recorder handles recording streams
type Recorder struct {
tempPath string
recordingsPath string
db *filestore.Store
client *http.Client
mu sync.Mutex
isRecording bool
@@ -28,7 +25,8 @@ type Recorder struct {
streamName string
}
func New(tempPath, recordingsPath string, db *filestore.Store, streamName string) (*Recorder, error) {
// New creates a recorder instance
func New(tempPath, recordingsPath string, streamName string) (*Recorder, error) {
for _, dir := range []string{tempPath, recordingsPath} {
if err := os.MkdirAll(dir, 0755); err != nil {
return nil, fmt.Errorf("failed to create directory %s: %w", dir, err)
@@ -38,18 +36,19 @@ func New(tempPath, recordingsPath string, db *filestore.Store, streamName string
return &Recorder{
tempPath: tempPath,
recordingsPath: recordingsPath,
db: db,
streamName: streamName,
client: &http.Client{Timeout: 0}, // No timeout, rely on context cancellation
}, nil
}
// IsRecording reports whether a recording is currently in progress.
// The mutex guards concurrent access to the isRecording flag.
func (r *Recorder) IsRecording() bool {
	r.mu.Lock()
	recording := r.isRecording
	r.mu.Unlock()
	return recording
}
// StartRecording begins recording a stream
func (r *Recorder) StartRecording(ctx context.Context, streamURL string) error {
r.mu.Lock()
defer r.mu.Unlock()
@@ -68,6 +67,7 @@ func (r *Recorder) StartRecording(ctx context.Context, streamURL string) error {
return nil
}
// StopRecording stops an in-progress recording
func (r *Recorder) StopRecording() {
r.mu.Lock()
defer r.mu.Unlock()
@@ -146,11 +146,6 @@ func (r *Recorder) recordStream(ctx context.Context, streamURL string) {
tempFilePath = "" // Prevent cleanup in defer
slog.Info("Recording saved", "path", finalPath, "size", bytesWritten, "duration", duration)
guid := hash.GenerateGUID(r.streamName, startTime, finalFilename)
if _, err = r.db.AddRecordedFile(finalFilename, guid, bytesWritten, duration, startTime); err != nil {
slog.Error("Failed to add recording to database", "error", err)
}
}
func (r *Recorder) downloadStream(ctx context.Context, streamURL string, writer io.Writer) (int64, error) {

View File

@@ -2,27 +2,40 @@ package rss
import (
"fmt"
"io/fs"
"log/slog"
"net/url"
"path/filepath"
"regexp"
"sort"
"strings"
"time"
"github.com/gorilla/feeds"
"github.com/kemko/icecast-ripper/internal/config"
"github.com/kemko/icecast-ripper/internal/filestore"
"github.com/kemko/icecast-ripper/internal/hash"
)
// RecordingInfo contains metadata about a recording
type RecordingInfo struct {
	Filename   string        // base name of the recording file on disk
	Hash       string        // stable GUID derived from stream name, timestamp and filename
	FileSize   int64         // file size in bytes
	Duration   time.Duration // estimated duration (derived from file size, not exact)
	RecordedAt time.Time     // recording start time parsed from the filename
}
// Generator creates RSS feeds
type Generator struct {
fileStore *filestore.Store
feedBaseURL string
recordingsPath string
feedTitle string
feedDesc string
streamName string
}
// New creates a new RSS Generator instance
func New(fileStore *filestore.Store, cfg *config.Config, title, description string) *Generator {
func New(cfg *config.Config, title, description, streamName string) *Generator {
baseURL := cfg.RSSFeedURL
if baseURL == "" {
slog.Warn("RSS_FEED_URL not set, using default")
@@ -35,19 +48,22 @@ func New(fileStore *filestore.Store, cfg *config.Config, title, description stri
}
return &Generator{
fileStore: fileStore,
feedBaseURL: baseURL,
recordingsPath: cfg.RecordingsPath,
feedTitle: title,
feedDesc: description,
streamName: streamName,
}
}
// recordingPattern extracts the timestamp portion (YYYYMMDD_HHMMSS) from
// recording filenames of the form recording_20230505_120000.mp3.
var recordingPattern = regexp.MustCompile(`recording_(\d{8}_\d{6})\.mp3$`)
// GenerateFeed produces the RSS feed XML as a byte slice
func (g *Generator) GenerateFeed(maxItems int) ([]byte, error) {
recordings, err := g.fileStore.GetRecordedFiles(maxItems)
recordings, err := g.scanRecordings(maxItems)
if err != nil {
return nil, fmt.Errorf("failed to get recorded files: %w", err)
return nil, fmt.Errorf("failed to scan recordings: %w", err)
}
feed := &feeds.Feed{
@@ -70,12 +86,12 @@ func (g *Generator) GenerateFeed(maxItems int) ([]byte, error) {
Title: fmt.Sprintf("Recording %s", rec.RecordedAt.Format("2006-01-02 15:04")),
Link: &feeds.Link{Href: fileURL},
Description: fmt.Sprintf("Icecast stream recording from %s. Duration: %s",
rec.RecordedAt.Format(time.RFC1123), rec.Duration.String()),
rec.RecordedAt.Format(time.RFC1123), rec.Duration.String()),
Created: rec.RecordedAt,
Id: rec.Hash,
Enclosure: &feeds.Enclosure{
Enclosure: &feeds.Enclosure{
Url: fileURL,
Length: fmt.Sprintf("%d", rec.FileSize), // Convert int64 to string
Length: fmt.Sprintf("%d", rec.FileSize),
Type: "audio/mpeg",
},
}
@@ -90,3 +106,79 @@ func (g *Generator) GenerateFeed(maxItems int) ([]byte, error) {
slog.Debug("RSS feed generated", "itemCount", len(feed.Items))
return []byte(rssFeed), nil
}
// scanRecordings walks the recordings directory and collects metadata for
// every MP3 file whose name matches recordingPattern. Results are sorted
// newest-first and, when maxItems > 0, truncated to at most maxItems entries.
// Non-conforming or unreadable files are skipped with a log message rather
// than aborting the scan.
func (g *Generator) scanRecordings(maxItems int) ([]RecordingInfo, error) {
	var found []RecordingInfo

	visit := func(path string, entry fs.DirEntry, walkErr error) error {
		if walkErr != nil {
			return walkErr
		}
		// Descend into subdirectories; only files are of interest.
		if entry.IsDir() {
			return nil
		}
		name := entry.Name()
		if !strings.HasSuffix(strings.ToLower(name), ".mp3") {
			return nil
		}
		// Pull the timestamp out of names like recording_20230505_120000.mp3.
		m := recordingPattern.FindStringSubmatch(name)
		if len(m) < 2 {
			slog.Debug("Skipping non-conforming filename", "filename", name)
			return nil
		}
		recordedAt, parseErr := time.Parse("20060102_150405", m[1])
		if parseErr != nil {
			slog.Warn("Failed to parse timestamp from filename", "filename", name, "error", parseErr)
			return nil
		}
		fi, infoErr := entry.Info()
		if infoErr != nil {
			slog.Warn("Failed to get file info", "filename", name, "error", infoErr)
			return nil
		}
		// Estimate duration from size assuming ~128kbps MP3 (16KB per second).
		base := filepath.Base(path)
		found = append(found, RecordingInfo{
			Filename:   base,
			Hash:       hash.GenerateGUID(g.streamName, recordedAt, base),
			FileSize:   fi.Size(),
			Duration:   time.Duration(fi.Size()/16000) * time.Second,
			RecordedAt: recordedAt,
		})
		return nil
	}

	if err := filepath.WalkDir(g.recordingsPath, visit); err != nil {
		return nil, fmt.Errorf("failed to walk recordings directory: %w", err)
	}

	// Newest recordings first.
	sort.Slice(found, func(i, j int) bool {
		return found[i].RecordedAt.After(found[j].RecordedAt)
	})

	if maxItems > 0 && len(found) > maxItems {
		found = found[:maxItems]
	}
	return found, nil
}