moves away from content-based GUIDs to filename-based GUIDs

Dmitrii Andreev
2025-04-07 11:55:08 +03:00
parent 42aed9decb
commit b0b46579b5
5 changed files with 251 additions and 153 deletions

View File

@@ -1,118 +1,173 @@
package database
import (
"database/sql"
"encoding/json"
"fmt"
"log/slog"
"os"
"path/filepath"
"sort"
"sync"
"time"
_ "github.com/mattn/go-sqlite3" // SQLite driver
)
// DB represents the database connection.
type DB struct {
*sql.DB
// FileStore represents an in-memory store with optional file persistence
type FileStore struct {
files map[string]*RecordedFile // Map of filename -> file data
storePath string // Path to persistence file, if empty no persistence
mu sync.RWMutex // Mutex for thread safety
}
// RecordedFile represents a row in the recorded_files table.
// RecordedFile represents information about a recorded file.
type RecordedFile struct {
ID int64
Filename string
Hash string
FileSize int64 // Size in bytes
Duration time.Duration // Duration of the recording
RecordedAt time.Time
ID int64 `json:"id"`
Filename string `json:"filename"`
Hash string `json:"hash"` // GUID for the file
FileSize int64 `json:"fileSize"`
Duration time.Duration `json:"duration"`
RecordedAt time.Time `json:"recordedAt"`
}
// InitDB initializes the SQLite database connection and creates tables if they don't exist.
func InitDB(dataSourceName string) (*DB, error) {
slog.Info("Initializing database connection", "path", dataSourceName)
db, err := sql.Open("sqlite3", dataSourceName)
if err != nil {
return nil, fmt.Errorf("failed to open database: %w", err)
// InitDB initializes a new FileStore.
// If dataSourceName is provided, it will be used as the path to persist the store.
func InitDB(dataSourceName string) (*FileStore, error) {
slog.Info("Initializing file store", "path", dataSourceName)
fs := &FileStore{
files: make(map[string]*RecordedFile),
storePath: dataSourceName,
}
if err = db.Ping(); err != nil {
return nil, fmt.Errorf("failed to connect to database: %w", err)
// If a data source is provided, try to load existing data
if dataSourceName != "" {
if err := fs.loadFromFile(); err != nil {
// If file doesn't exist, that's fine - we'll create it when we save
if !os.IsNotExist(err) {
slog.Error("Failed to load file store data", "error", err)
}
}
}
// Create table if not exists
query := `
CREATE TABLE IF NOT EXISTS recorded_files (
id INTEGER PRIMARY KEY AUTOINCREMENT,
filename TEXT NOT NULL UNIQUE,
hash TEXT NOT NULL UNIQUE,
filesize INTEGER NOT NULL,
duration INTEGER NOT NULL, -- Store duration in seconds or milliseconds
recorded_at DATETIME NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_recorded_at ON recorded_files (recorded_at);
`
if _, err = db.Exec(query); err != nil {
return nil, fmt.Errorf("failed to create table: %w", err)
}
slog.Info("Database initialized successfully")
return &DB{db}, nil
slog.Info("File store initialized successfully")
return fs, nil
}
// AddRecordedFile inserts a new record into the database.
func (db *DB) AddRecordedFile(filename, hash string, fileSize int64, duration time.Duration, recordedAt time.Time) (int64, error) {
result, err := db.Exec(
"INSERT INTO recorded_files (filename, hash, filesize, duration, recorded_at) VALUES (?, ?, ?, ?, ?)",
filename, hash, fileSize, int64(duration.Seconds()), recordedAt, // Store duration as seconds
)
if err != nil {
return 0, fmt.Errorf("failed to insert recorded file: %w", err)
// loadFromFile loads the file store data from the persistence file.
func (fs *FileStore) loadFromFile() error {
fs.mu.Lock()
defer fs.mu.Unlock()
if fs.storePath == "" {
return nil // No persistence path set
}
id, err := result.LastInsertId()
data, err := os.ReadFile(fs.storePath)
if err != nil {
return 0, fmt.Errorf("failed to get last insert ID: %w", err)
return err
}
slog.Debug("Added recorded file to database", "id", id, "filename", filename, "hash", hash)
var files []*RecordedFile
if err := json.Unmarshal(data, &files); err != nil {
return fmt.Errorf("failed to parse file store data: %w", err)
}
// Reset the map and repopulate it
fs.files = make(map[string]*RecordedFile, len(files))
for _, file := range files {
fs.files[file.Filename] = file
}
slog.Info("Loaded file store data", "count", len(fs.files))
return nil
}
// saveToFile persists the file store data to disk.
// The caller must already hold fs.mu: sync.RWMutex is not reentrant, so
// locking again here while AddRecordedFile holds the write lock would deadlock.
func (fs *FileStore) saveToFile() error {
if fs.storePath == "" {
return nil // No persistence path set
}
// Create directory if it doesn't exist
dir := filepath.Dir(fs.storePath)
if err := os.MkdirAll(dir, 0755); err != nil {
return fmt.Errorf("failed to create directory for file store: %w", err)
}
// Convert map to slice
files := make([]*RecordedFile, 0, len(fs.files))
for _, file := range fs.files {
files = append(files, file)
}
data, err := json.MarshalIndent(files, "", " ")
if err != nil {
return fmt.Errorf("failed to marshal file store data: %w", err)
}
if err := os.WriteFile(fs.storePath, data, 0644); err != nil {
return fmt.Errorf("failed to write file store data: %w", err)
}
slog.Debug("Saved file store data", "count", len(files))
return nil
}
// AddRecordedFile adds a file to the store.
func (fs *FileStore) AddRecordedFile(filename, hash string, fileSize int64, duration time.Duration, recordedAt time.Time) (int64, error) {
fs.mu.Lock()
defer fs.mu.Unlock()
// Generate a unique ID (just use timestamp as we don't need real DB IDs)
id := time.Now().UnixNano()
// Store the file
fs.files[filename] = &RecordedFile{
ID: id,
Filename: filename,
Hash: hash,
FileSize: fileSize,
Duration: duration,
RecordedAt: recordedAt,
}
// Persist changes
if err := fs.saveToFile(); err != nil {
slog.Error("Failed to save file store data", "error", err)
}
slog.Debug("Added recorded file to store", "id", id, "filename", filename, "hash", hash)
return id, nil
}
// GetRecordedFiles retrieves all recorded files, ordered by recording date descending.
func (db *DB) GetRecordedFiles(limit int) ([]RecordedFile, error) {
query := "SELECT id, filename, hash, filesize, duration, recorded_at FROM recorded_files ORDER BY recorded_at DESC"
args := []interface{}{}
if limit > 0 {
query += " LIMIT ?"
args = append(args, limit)
func (fs *FileStore) GetRecordedFiles(limit int) ([]RecordedFile, error) {
fs.mu.RLock()
defer fs.mu.RUnlock()
// Convert map to slice for sorting
files := make([]RecordedFile, 0, len(fs.files))
for _, file := range fs.files {
files = append(files, *file)
}
rows, err := db.Query(query, args...)
if err != nil {
return nil, fmt.Errorf("failed to query recorded files: %w", err)
}
defer func() {
if err := rows.Close(); err != nil {
slog.Error("Failed to close rows", "error", err)
}
}()
// Sort by recorded_at descending
sort.Slice(files, func(i, j int) bool {
return files[i].RecordedAt.After(files[j].RecordedAt)
})
var files []RecordedFile
for rows.Next() {
var rf RecordedFile
var durationSeconds int64
if err := rows.Scan(&rf.ID, &rf.Filename, &rf.Hash, &rf.FileSize, &durationSeconds, &rf.RecordedAt); err != nil {
return nil, fmt.Errorf("failed to scan recorded file row: %w", err)
}
rf.Duration = time.Duration(durationSeconds) * time.Second
files = append(files, rf)
}
if err = rows.Err(); err != nil {
return nil, fmt.Errorf("error iterating over recorded file rows: %w", err)
// Apply limit if provided
if limit > 0 && limit < len(files) {
files = files[:limit]
}
return files, nil
}
// Close closes the database connection.
func (db *DB) Close() error {
slog.Info("Closing database connection")
return db.DB.Close()
// Close closes the file store and ensures all data is persisted.
func (fs *FileStore) Close() error {
slog.Info("Closing file store")
fs.mu.RLock()
defer fs.mu.RUnlock()
return fs.saveToFile()
}
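
For orientation, here is a minimal sketch of how the new FileStore API is driven end to end; the import path and store path are hypothetical and error handling is abbreviated:

package main

import (
    "log"
    "time"

    "example.com/recorder/internal/database" // hypothetical import path
)

func main() {
    // Pass a JSON file path to enable persistence, or "" for a purely in-memory store.
    fs, err := database.InitDB("/var/lib/recorder/files.json") // hypothetical path
    if err != nil {
        log.Fatal(err)
    }
    defer fs.Close()

    // The hash argument now carries the metadata-based GUID.
    if _, err := fs.AddRecordedFile("recording_20250407_115508.mp3",
        "3fa9c2...", 42_000_000, 30*time.Minute, time.Now()); err != nil {
        log.Fatal(err)
    }

    // Newest recordings first, capped at 10 entries.
    files, err := fs.GetRecordedFiles(10)
    if err != nil {
        log.Fatal(err)
    }
    for _, f := range files {
        log.Printf("%s  %s  %s", f.Filename, f.Duration, f.Hash)
    }
}

Given the struct tags on RecordedFile, the persisted JSON stores duration as integer nanoseconds and recordedAt as an RFC 3339 string (the encoding/json defaults for time.Duration and time.Time), which is worth knowing before editing the store file by hand.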

View File

@@ -4,56 +4,52 @@ import (
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"log/slog"
"os"
"path/filepath"
"time"
)
const (
// hashChunkSize defines how many bytes of the file to read for hashing.
// 1MB should be sufficient for uniqueness in most cases while being fast.
hashChunkSize = 1 * 1024 * 1024 // 1 MB
)
// GenerateFileHash generates a SHA256 hash based on the filename and the first `hashChunkSize` bytes of the file content.
// This avoids reading the entire file for large recordings.
func GenerateFileHash(filePath string) (string, error) {
// GenerateGUID generates a unique identifier based on file metadata without reading file content.
// It uses recording metadata (stream name, recording start time, filename) to create a deterministic GUID.
func GenerateGUID(streamName string, recordedAt time.Time, filePath string) string {
filename := filepath.Base(filePath)
slog.Debug("Generating hash", "file", filePath, "chunkSize", hashChunkSize)
file, err := os.Open(filePath)
if err != nil {
return "", fmt.Errorf("failed to open file %s for hashing: %w", filePath, err)
}
defer func() {
if err := file.Close(); err != nil {
slog.Error("Failed to close file", "path", filePath, "error", err)
}
}()
// Combine stream name, recording time (second precision via RFC 3339),
// and filename into a single deterministic input string
input := fmt.Sprintf("%s:%s:%s",
streamName,
recordedAt.UTC().Format(time.RFC3339),
filename)
slog.Debug("Generating GUID", "input", input)
// Hash the combined string to create a GUID
hasher := sha256.New()
hasher.Write([]byte(input))
guid := hex.EncodeToString(hasher.Sum(nil))
// Include filename in the hash
if _, err := hasher.Write([]byte(filename)); err != nil {
return "", fmt.Errorf("failed to write filename to hasher: %w", err)
}
// Read only the first part of the file
chunk := make([]byte, hashChunkSize)
bytesRead, err := file.Read(chunk)
if err != nil && err != io.EOF {
return "", fmt.Errorf("failed to read file chunk %s: %w", filePath, err)
}
// Hash the chunk that was read
if _, err := hasher.Write(chunk[:bytesRead]); err != nil {
return "", fmt.Errorf("failed to write file chunk to hasher: %w", err)
}
hashBytes := hasher.Sum(nil)
hashString := hex.EncodeToString(hashBytes)
slog.Debug("Generated hash", "file", filePath, "hash", hashString)
return hashString, nil
slog.Debug("Generated GUID", "file", filePath, "guid", guid)
return guid
}
// GenerateFileHash remains for backward compatibility but is now built on GenerateGUID.
// It derives the GUID from path metadata and mtime instead of reading file content.
func GenerateFileHash(filePath string) (string, error) {
fileInfo, err := os.Stat(filePath)
if err != nil {
return "", fmt.Errorf("failed to stat file %s: %w", filePath, err)
}
// Extract stream name from directory structure if possible, otherwise use parent dir
dir := filepath.Dir(filePath)
streamName := filepath.Base(dir)
// Use file modification time as a proxy for recording time
recordedAt := fileInfo.ModTime()
guid := GenerateGUID(streamName, recordedAt, filePath)
slog.Debug("Generated hash using metadata", "file", filePath, "hash", guid)
return guid, nil
}
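
Because the GUID is now derived purely from metadata, it is deterministic and requires no file I/O; the file does not even have to exist. A small sketch (stream name, time, path, and import path are all made up):

package main

import (
    "fmt"
    "time"

    "example.com/recorder/internal/hash" // hypothetical import path
)

func main() {
    recordedAt := time.Date(2025, 4, 7, 11, 55, 8, 0, time.UTC)
    path := "/recordings/morning-show/recording_20250407_115508.mp3"

    a := hash.GenerateGUID("morning-show", recordedAt, path)
    b := hash.GenerateGUID("morning-show", recordedAt, path)
    fmt.Println(a == b) // true: same metadata always yields the same GUID

    // GenerateFileHash, by contrast, stats the file and substitutes the parent
    // directory for the stream name and mtime for the recording time, so it
    // still needs the file on disk.
}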

View File

@@ -20,15 +20,16 @@ import (
type Recorder struct {
tempPath string
recordingsPath string
db *database.DB
db *database.FileStore
client *http.Client
mu sync.Mutex // Protects access to isRecording
isRecording bool
cancelFunc context.CancelFunc // To stop the current recording
streamName string // Name of the stream being recorded
}
// New creates a new Recorder instance.
func New(tempPath, recordingsPath string, db *database.DB) (*Recorder, error) {
func New(tempPath, recordingsPath string, db *database.FileStore, streamName string) (*Recorder, error) {
// Ensure temp and recordings directories exist
if err := os.MkdirAll(tempPath, 0755); err != nil {
return nil, fmt.Errorf("failed to create temp directory %s: %w", tempPath, err)
@@ -41,6 +42,7 @@ func New(tempPath, recordingsPath string, db *database.DB) (*Recorder, error) {
tempPath: tempPath,
recordingsPath: recordingsPath,
db: db,
streamName: streamName,
client: &http.Client{
// Use a longer timeout for downloading the stream
Timeout: 0, // No timeout for the download itself, rely on context cancellation
@@ -159,17 +161,6 @@ func (r *Recorder) recordStream(ctx context.Context, streamURL string) {
endTime := time.Now()
duration := endTime.Sub(startTime)
// Generate hash
fileHash, err := hash.GenerateFileHash(tempFilePath)
if err != nil {
slog.Error("Failed to generate file hash", "path", tempFilePath, "error", err)
if err := os.Remove(tempFilePath); err != nil {
slog.Error("Failed to remove temporary file after hash error", "path", tempFilePath, "error", err)
}
tempFilePath = "" // Prevent deferred cleanup
return
}
// Generate final filename (e.g., based on timestamp)
finalFilename := fmt.Sprintf("recording_%s.mp3", startTime.Format("20060102_150405")) // Assuming mp3, adjust if needed
finalFilename = sanitizeFilename(finalFilename) // Ensure filename is valid
@@ -187,10 +178,13 @@ func (r *Recorder) recordStream(ctx context.Context, streamURL string) {
tempFilePath = "" // File moved, clear path to prevent deferred cleanup
slog.Info("Recording saved", "path", finalPath, "size", bytesWritten, "duration", duration)
// Generate GUID for this recording using the new metadata-based approach
guid := hash.GenerateGUID(r.streamName, startTime, finalFilename)
// Add record to database
_, err = r.db.AddRecordedFile(finalFilename, fileHash, bytesWritten, duration, startTime)
_, err = r.db.AddRecordedFile(finalFilename, guid, bytesWritten, duration, startTime)
if err != nil {
slog.Error("Failed to add recording to database", "filename", finalFilename, "hash", fileHash, "error", err)
slog.Error("Failed to add recording to database", "filename", finalFilename, "guid", guid, "error", err)
// Consider how to handle this - maybe retry later? For now, just log.
}
}

View File

@@ -35,7 +35,7 @@ type Item struct {
Link string `xml:"link"` // Link to the specific recording file
Description string `xml:"description"` // Can include duration, size etc.
PubDate string `xml:"pubDate"` // RFC1123Z format of recording time
GUID GUID `xml:"guid"` // Unique identifier (using file hash)
GUID GUID `xml:"guid"` // Unique identifier (using metadata-based GUID)
Enclosure Enclosure `xml:"enclosure"` // Describes the media file
}
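
As a sanity check on what lands in the feed, a minimal sketch of how a guid element renders via encoding/xml. The field layout of the GUID type (isPermaLink attribute plus character data) is an assumption; the diff references the type but does not show its definition:

package main

import (
    "encoding/xml"
    "fmt"
)

// Assumed shape of the GUID type used in Item; not shown in this diff.
type GUID struct {
    XMLName     xml.Name `xml:"guid"`
    IsPermaLink bool     `xml:"isPermaLink,attr"`
    Value       string   `xml:",chardata"`
}

func main() {
    g := GUID{IsPermaLink: false, Value: "3fa9c2d4e5b6..."}
    out, _ := xml.Marshal(g)
    fmt.Println(string(out)) // <guid isPermaLink="false">3fa9c2d4e5b6...</guid>
}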
@@ -56,7 +56,7 @@ type Enclosure struct {
// Generator creates RSS feeds.
type Generator struct {
db *database.DB
fileStore *database.FileStore
feedBaseURL string // Base URL for links in the feed (e.g., http://server.com/recordings/)
recordingsPath string // Local path to recordings (needed for file info, maybe not directly used in feed)
feedTitle string
@@ -64,7 +64,7 @@ type Generator struct {
}
// New creates a new RSS Generator instance.
func New(db *database.DB, cfg *config.Config, title, description string) *Generator {
func New(fileStore *database.FileStore, cfg *config.Config, title, description string) *Generator {
// Ensure the base URL for recordings ends with a slash
baseURL := cfg.RSSFeedURL // This should be the URL base for *serving* files
if baseURL == "" {
@@ -76,7 +76,7 @@ func New(db *database.DB, cfg *config.Config, title, description string) *Genera
}
return &Generator{
db: db,
fileStore: fileStore,
feedBaseURL: baseURL,
recordingsPath: cfg.RecordingsPath,
feedTitle: title,
@@ -87,7 +87,7 @@ func New(db *database.DB, cfg *config.Config, title, description string) *Genera
// GenerateFeed fetches recordings and produces the RSS feed XML as a byte slice.
func (g *Generator) GenerateFeed(maxItems int) ([]byte, error) {
slog.Debug("Generating RSS feed", "maxItems", maxItems)
recordings, err := g.db.GetRecordedFiles(maxItems)
recordings, err := g.fileStore.GetRecordedFiles(maxItems)
if err != nil {
return nil, fmt.Errorf("failed to get recorded files for RSS feed: %w", err)
}
@@ -106,7 +106,7 @@ func (g *Generator) GenerateFeed(maxItems int) ([]byte, error) {
Description: fmt.Sprintf("Icecast stream recording from %s. Duration: %s.", rec.RecordedAt.Format(time.RFC1123), rec.Duration.String()),
PubDate: rec.RecordedAt.Format(time.RFC1123Z), // Use RFC1123Z for pubDate
GUID: GUID{
IsPermaLink: false, // The hash itself is not a permalink URL
IsPermaLink: false, // The guid itself is not a permalink URL
Value: rec.Hash,
},
Enclosure: Enclosure{