Fix snapshot overwrite logic to preserve successful responses
- Prevent overwriting snapshots that have valid response codes
- Ensure URL is removed from queue when snapshot update is skipped
- Add last_crawled timestamp tracking for better crawl scheduling
- Remove SkipIdenticalContent flag, simplify content deduplication logic
- Update database schema with last_crawled column and indexes

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
TODO.md (new file, 38 lines)
@@ -0,0 +1,38 @@
# TODO

## Outstanding Issues

### 1. Ctrl+C Signal Handling Issue

**Problem**: The crawler sometimes doesn't exit properly when Ctrl+C is pressed.

**Root Cause**: The main thread gets stuck in blocking operations before it can check for signals:

- Database operations in the polling loop (`cmd/crawler/crawler.go:239-250`)
- Job queueing when the channel is full (`jobs <- url` can block if workers are slow)
- Long-running database transactions

**Location**: `cmd/crawler/crawler.go` - main polling loop starting at line 233

**Solution**: Add signal/context checking to blocking operations (a sketch follows this list):

- Use a cancellable context instead of `context.Background()` for database operations
- Make job queueing non-blocking or context-aware
- Add timeout/cancellation to database operations
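A minimal sketch of the context-aware job queueing described above; the helper name `enqueueJob`, the `jobs` channel, and its element type are illustrative assumptions, not code from this repository:

```go
package example

import "context"

// enqueueJob blocks on sending to the jobs channel only until ctx is
// cancelled, so a Ctrl+C handler that cancels ctx can always interrupt it.
func enqueueJob(ctx context.Context, jobs chan<- string, url string) error {
	select {
	case jobs <- url:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
```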
### 2. fetchSnapshotsFromHistory() Doesn't Work with --skip-identical-content=true

**Problem**: When `--skip-identical-content=true` (the default), URLs with unchanged content get continuously re-queued.

**Root Cause**: The function tracks when content last changed, not when URLs were last crawled:

- Identical content → no new snapshot created
- Query finds old snapshot timestamp → re-queues URL
- Creates an infinite loop of re-crawling unchanged content

**Location**: `cmd/crawler/crawler.go:388-470` - `fetchSnapshotsFromHistory()` function

**Solution Options**:

1. Add `last_crawled` timestamp to URLs table
2. Create separate `crawl_attempts` table
3. Always create snapshot entries (even for duplicates) but mark them as such
4. Modify logic to work with existing schema constraints

**Current Status**: Function assumes `SkipIdenticalContent=false` per original comment at line 391.
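Whichever option is chosen, the scheduling check they all converge on is an age comparison against the latest crawl attempt rather than the latest content change. A hypothetical helper (not part of this commit) to make that intent concrete:

```go
package example

import "time"

// needsRecrawl reports whether a URL whose most recent crawl attempt happened
// at lastCrawled is due again, regardless of whether its content changed.
func needsRecrawl(lastCrawled time.Time, maxAgeDays int) bool {
	cutoff := time.Now().AddDate(0, 0, -maxAgeDays)
	return lastCrawled.Before(cutoff)
}
```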
cmd/crawler/crawler.go
@@ -307,139 +307,17 @@ func enqueueSeedURLs(ctx context.Context, tx *sqlx.Tx) error {
 	return nil
 }
 
-//func fetchSnapshotsFromHistory(ctx context.Context, tx *sqlx.Tx, num int, age int) (int, error) {
-//	// Select <num> snapshots from snapshots table for recrawling
-//	// They should be at least <age> days old, random order,
-//	// one snapshot per host if possible.
-//	historyCtx := contextutil.ContextWithComponent(context.Background(), "fetchSnapshotsFromHistory")
-//	contextlog.LogDebugWithContext(historyCtx, logging.GetSlogger(), "Looking for %d snapshots at least %d days old to recrawl", num, age)
-//
-//	// Calculate the cutoff date
-//	cutoffDate := time.Now().AddDate(0, 0, -age)
-//
-//	// SQL to select one random URL per host from snapshots older than the cutoff date
-//	// TODO implement when gopher enabled
-//	query := `
-//		WITH hosts AS (
-//			SELECT DISTINCT host
-//			FROM snapshots
-//			WHERE url ~ '^gemini://[^/]+/?$'
-//			AND gemtext IS NOT NULL
-//			AND timestamp < $1
-//			AND error IS NULL
-//		),
-//		ranked_snapshots AS (
-//			SELECT
-//				s.url,
-//				s.host,
-//				s.timestamp,
-//				ROW_NUMBER() OVER (PARTITION BY s.host ORDER BY RANDOM()) as rank
-//			FROM snapshots s
-//			JOIN hosts h ON s.host = h.host
-//			WHERE s.timestamp < $1
-//		)
-//		SELECT url, host
-//		FROM ranked_snapshots
-//		WHERE rank = 1
-//		ORDER BY RANDOM()
-//		LIMIT $2
-//	`
-//
-//	type SnapshotURL struct {
-//		URL  string `db:"url"`
-//		Host string `db:"host"`
-//	}
-//
-//	// Execute the query
-//	var snapshotURLs []SnapshotURL
-//	err := tx.Select(&snapshotURLs, query, cutoffDate, num)
-//	if err != nil {
-//		return 0, err
-//	}
-//
-//	if len(snapshotURLs) == 0 {
-//		historyCtx := contextutil.ContextWithComponent(context.Background(), "fetchSnapshotsFromHistory")
-//		contextlog.LogInfoWithContext(historyCtx, logging.GetSlogger(), "No old snapshots found to recrawl")
-//		return 0, nil
-//	}
-//
-//	// For each selected snapshot, add the URL to the urls table
-//	insertCount := 0
-//	for _, snapshot := range snapshotURLs {
-//		err := gemdb.Database.InsertURL(ctx, tx, snapshot.URL)
-//		if err != nil {
-//			logging.LogError("Error inserting URL %s from old snapshot to queue: %v", snapshot.URL, err)
-//			return 0, err
-//		}
-//		insertCount++
-//	}
-//
-//	// Note: The transaction is committed by the caller (runJobScheduler),
-//	// not here. This function is called as part of a larger transaction.
-//	if insertCount > 0 {
-//		historyCtx := contextutil.ContextWithComponent(context.Background(), "fetchSnapshotsFromHistory")
-//		contextlog.LogInfoWithContext(historyCtx, logging.GetSlogger(), "Added %d old URLs to recrawl queue", insertCount)
-//	}
-//
-//	return insertCount, nil
-//}
-
 func fetchSnapshotsFromHistory(ctx context.Context, tx *sqlx.Tx, num int, age int) (int, error) {
 	// Select <num> snapshots from snapshots table for recrawling
 	// Find URLs where the LATEST crawl attempt (via last_crawled) is at least <age> days old
-	// Now works correctly with SkipIdenticalContent=true - fixes infinite recrawl loop
+	// Uses last_crawled timestamp to track actual crawl attempts regardless of content changes
 	historyCtx := contextutil.ContextWithComponent(context.Background(), "fetchSnapshotsFromHistory")
 	contextlog.LogDebugWithContext(historyCtx, logging.GetSlogger(), "Looking for %d URLs whose latest crawl attempt is at least %d days old to recrawl", num, age)
 
 	// Calculate the cutoff date
 	cutoffDate := time.Now().AddDate(0, 0, -age)
 
-	// SQL to select URLs where the latest crawl attempt is older than the cutoff date
-	// Now uses last_crawled timestamp to track actual crawl attempts
-	// Works correctly with SkipIdenticalContent setting - avoids infinite recrawl loop
-	// One URL per host for diversity, focusing on root domain URLs
-	query := `
-		WITH latest_attempts AS (
-			SELECT
-				url,
-				host,
-				COALESCE(MAX(last_crawled), '1970-01-01'::timestamp) as latest_attempt
-			FROM snapshots
-			WHERE url ~ '^gemini://[^/]+/?$'
-			GROUP BY url, host
-		),
-		root_urls_with_content AS (
-			SELECT DISTINCT
-				la.url,
-				la.host,
-				la.latest_attempt
-			FROM latest_attempts la
-			JOIN snapshots s ON s.url = la.url
-			WHERE (s.gemtext IS NOT NULL OR s.data IS NOT NULL)
-			AND s.response_code BETWEEN 20 AND 29
-		),
-		eligible_urls AS (
-			SELECT
-				url,
-				host,
-				latest_attempt
-			FROM root_urls_with_content
-			WHERE latest_attempt < $1
-		),
-		ranked_urls AS (
-			SELECT
-				url,
-				host,
-				latest_attempt,
-				ROW_NUMBER() OVER (PARTITION BY host ORDER BY RANDOM()) as rank
-			FROM eligible_urls
-		)
-		SELECT url, host
-		FROM ranked_urls
-		WHERE rank = 1
-		ORDER BY RANDOM()
-		LIMIT $2
-	`
+	// Use the query from db_queries.go to find URLs that need re-crawling
 
 	type SnapshotURL struct {
 		URL  string `db:"url"`
@@ -448,7 +326,7 @@ func fetchSnapshotsFromHistory(ctx context.Context, tx *sqlx.Tx, num int, age in
 
 	// Execute the query
 	var snapshotURLs []SnapshotURL
-	err := tx.Select(&snapshotURLs, query, cutoffDate, num)
+	err := tx.Select(&snapshotURLs, gemdb.SQL_FETCH_SNAPSHOTS_FROM_HISTORY, cutoffDate, num)
 	if err != nil {
 		return 0, err
 	}
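For context, a hedged sketch of how the scheduler side might drive this function; the wrapper name, signature, and batch size are assumptions (the removed comments above only say that the caller, `runJobScheduler`, owns the transaction):

```go
// queueRecrawls is a hypothetical wrapper: it opens a transaction, lets
// fetchSnapshotsFromHistory enqueue URLs whose last crawl attempt is older
// than ageDays, and commits so the workers can pick them up.
func queueRecrawls(ctx context.Context, db *sqlx.DB, num, ageDays int) (int, error) {
	tx, err := db.Beginx()
	if err != nil {
		return 0, err
	}
	added, err := fetchSnapshotsFromHistory(ctx, tx, num, ageDays)
	if err != nil {
		_ = tx.Rollback()
		return 0, err
	}
	return added, tx.Commit()
}
```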
@@ -196,15 +196,15 @@ func WorkOnUrl(ctx context.Context, tx *sqlx.Tx, url string) (err error) {
 	}
 
 	// Check if we should skip a potentially
-	// identical snapshot
-	skipIdentical, err := shouldSkipIdenticalSnapshot(ctx, tx, s)
+	// identical snapshot with one from history
+	isIdentical, err := isContentIdentical(ctx, tx, s)
 	if err != nil {
 		return err
 	}
-	if skipIdentical {
-		contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "Content identical to existing snapshot, recording crawl attempt")
-		// Record the crawl attempt to track that we processed this URL
-		err = gemdb.Database.RecordCrawlAttempt(ctx, tx, s)
+	if isIdentical {
+		contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "Content identical to existing snapshot, updating crawl timestamp")
+		// Update the last_crawled timestamp to track that we processed this URL
+		err = gemdb.Database.UpdateLastCrawled(ctx, tx, s.URL.String())
 		if err != nil {
 			return err
 		}
@@ -222,37 +222,46 @@ func WorkOnUrl(ctx context.Context, tx *sqlx.Tx, url string) (err error) {
 
 	// Save the snapshot and remove the URL from the queue
 	if s.Error.ValueOrZero() != "" {
-		contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "%2d %s", s.ResponseCode.ValueOrZero(), s.Error.ValueOrZero())
+		// Only save error if we didn't have any valid
+		// snapshot data from a previous crawl!
+		shouldUpdateSnapshot, err := shouldUpdateSnapshotData(ctx, tx, s)
+		if err != nil {
+			return err
+		}
+		if shouldUpdateSnapshot {
+			contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "%2d %s", s.ResponseCode.ValueOrZero(), s.Error.ValueOrZero())
+			return saveSnapshotAndRemoveURL(ctx, tx, s)
+		} else {
+			contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "%2d %s (but old content exists, not updating)", s.ResponseCode.ValueOrZero(), s.Error.ValueOrZero())
+			return removeURL(ctx, tx, s.URL.String())
+		}
 	} else {
 		contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "%2d", s.ResponseCode.ValueOrZero())
+		return saveSnapshotAndRemoveURL(ctx, tx, s)
 	}
-	return saveSnapshotAndRemoveURL(ctx, tx, s)
 }
 
-func shouldSkipIdenticalSnapshot(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) (bool, error) {
-	// Check if content is identical to previous snapshot and we should skip further processing
-	if config.CONFIG.SkipIdenticalContent {
-		identical, err := gemdb.Database.IsContentIdentical(ctx, tx, s)
-		if err != nil {
-			return false, err
-		}
-		if identical {
-			return true, nil
-		}
+func shouldUpdateSnapshotData(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) (bool, error) {
+	prevSnapshot, err := gemdb.Database.GetLatestSnapshot(ctx, tx, s.URL.String())
+	if err != nil {
+		return false, err
 	}
-	// We write every Gemini capsule, but still
-	// skip identical pages that aren't capsules.
-	if s.MimeType.String != "text/gemini" {
-		identical, err := gemdb.Database.IsContentIdentical(ctx, tx, s)
-		if err != nil {
-			return false, err
-		}
-		if identical {
-			return true, nil
-		}
+	if prevSnapshot == nil {
+		return true, nil
+	}
+	if prevSnapshot.ResponseCode.Valid {
+		return false, nil
+	}
+	return true, nil
+}
+
+func isContentIdentical(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) (bool, error) {
+	// Always check if content is identical to previous snapshot
+	identical, err := gemdb.Database.IsContentIdentical(ctx, tx, s)
+	if err != nil {
+		return false, err
 	}
-	return false, nil
+	return identical, nil
 }
 
 // storeLinks checks and stores the snapshot links in the database.
@@ -9,19 +9,18 @@ import (
 
 // Config holds the application configuration loaded from environment variables.
 type Config struct {
 	PgURL                string
 	LogLevel             slog.Level // Logging level (debug, info, warn, error)
 	MaxResponseSize      int        // Maximum size of response in bytes
 	MaxDbConnections     int        // Maximum number of database connections.
 	NumOfWorkers         int        // Number of concurrent workers
 	ResponseTimeout      int        // Timeout for responses in seconds
 	BlacklistPath        string     // File that has blacklisted strings of "host:port"
 	WhitelistPath        string     // File with URLs that should always be crawled regardless of blacklist
 	DryRun               bool       // If false, don't write to disk
 	GopherEnable         bool       // Enable Gopher crawling
 	SeedUrlPath          string     // Add URLs from file to queue
-	SkipIdenticalContent bool       // When true, skip storing snapshots with identical content
-	SkipIfUpdatedDays    int        // Skip re-crawling URLs updated within this many days (0 to disable, default 0)
+	SkipIfUpdatedDays    int        // Skip re-crawling URLs updated within this many days (0 to disable)
 }
 
 var CONFIG Config //nolint:gochecknoglobals
@@ -39,7 +38,6 @@ func Initialize() *Config {
 	maxResponseSize := flag.Int("max-response-size", 1024*1024, "Maximum size of response in bytes")
 	responseTimeout := flag.Int("response-timeout", 10, "Timeout for network responses in seconds")
 	blacklistPath := flag.String("blacklist-path", "", "File that has blacklist regexes")
-	skipIdenticalContent := flag.Bool("skip-identical-content", true, "Skip storing snapshots with identical content")
 	skipIfUpdatedDays := flag.Int("skip-if-updated-days", 60, "Skip re-crawling URLs updated within this many days (0 to disable)")
 	whitelistPath := flag.String("whitelist-path", "", "File with URLs that should always be crawled regardless of blacklist")
 	seedUrlPath := flag.String("seed-url-path", "", "File with seed URLs that should be added to the queue immediatelly")
@@ -56,7 +54,6 @@ func Initialize() *Config {
 	config.WhitelistPath = *whitelistPath
 	config.SeedUrlPath = *seedUrlPath
 	config.MaxDbConnections = *maxDbConnections
-	config.SkipIdenticalContent = *skipIdenticalContent
 	config.SkipIfUpdatedDays = *skipIfUpdatedDays
 
 	level, err := ParseSlogLevel(*loglevel)
db/db.go (47 changed lines)
@@ -41,7 +41,7 @@ type DbService interface {
 	// Snapshot methods
 	SaveSnapshot(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) error
 	OverwriteSnapshot(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) error
-	RecordCrawlAttempt(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) error
+	UpdateLastCrawled(ctx context.Context, tx *sqlx.Tx, url string) error
 	GetLatestSnapshot(ctx context.Context, tx *sqlx.Tx, url string) (*snapshot.Snapshot, error)
 	GetSnapshotAtTimestamp(ctx context.Context, tx *sqlx.Tx, url string, timestamp time.Time) (*snapshot.Snapshot, error)
 	GetAllSnapshotsForURL(ctx context.Context, tx *sqlx.Tx, url string) ([]*snapshot.Snapshot, error)
@@ -374,21 +374,10 @@ func (d *DbServiceImpl) SaveSnapshot(ctx context.Context, tx *sqlx.Tx, s *snapsh
 		return err
 	}
 
-	// Check if we should skip storing identical content
-	if config.CONFIG.SkipIdenticalContent {
-		// Use the context-aware version to check for identical content
-		identical, err := d.IsContentIdentical(ctx, tx, s)
-		if err != nil {
-			return err
-		} else if identical {
-			contextlog.LogDebugWithContext(dbCtx, logging.GetSlogger(), "Skipping URL with identical content to existing snapshot: %s", s.URL.String())
-			return nil
-		}
-	}
-
-	// Always ensure we have a current timestamp
-	s.Timestamp = null.TimeFrom(time.Now())
-	// last_crawled will be set automatically by database DEFAULT
+	// Always ensure we have current timestamps
+	currentTime := time.Now()
+	s.Timestamp = null.TimeFrom(currentTime)
+	s.LastCrawled = null.TimeFrom(currentTime)
 
 	// For PostgreSQL, use the global sqlx.NamedQueryContext function
 	// The SQL_INSERT_SNAPSHOT already has a RETURNING id clause
@@ -423,26 +412,20 @@ func (d *DbServiceImpl) OverwriteSnapshot(ctx context.Context, tx *sqlx.Tx, s *s
 	return d.SaveSnapshot(ctx, tx, s)
 }
 
-// RecordCrawlAttempt records a crawl attempt without saving full content (when content is identical)
-func (d *DbServiceImpl) RecordCrawlAttempt(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) error {
+// UpdateLastCrawled updates the last_crawled timestamp for the most recent snapshot of a URL
+func (d *DbServiceImpl) UpdateLastCrawled(ctx context.Context, tx *sqlx.Tx, url string) error {
 	dbCtx := contextutil.ContextWithComponent(ctx, "database")
-	contextlog.LogDebugWithContext(dbCtx, logging.GetSlogger(), "Recording crawl attempt for URL %s", s.URL.String())
+	contextlog.LogDebugWithContext(dbCtx, logging.GetSlogger(), "Updating last_crawled timestamp for URL %s", url)
 
 	// Check if the context is cancelled before proceeding
 	if err := ctx.Err(); err != nil {
 		return err
 	}
 
-	// Record the crawl attempt with minimal data
-	// timestamp and last_crawled will be set automatically by database DEFAULT
-	_, err := tx.ExecContext(ctx, SQL_RECORD_CRAWL_ATTEMPT,
-		s.URL.String(),
-		s.Host,
-		s.MimeType.String,
-		s.ResponseCode.ValueOrZero(),
-		s.Error.String)
+	// Update the last_crawled timestamp for the most recent snapshot
+	_, err := tx.ExecContext(ctx, SQL_UPDATE_LAST_CRAWLED, url)
 	if err != nil {
-		return xerrors.NewError(fmt.Errorf("cannot record crawl attempt for URL %s: %w", s.URL.String(), err), 0, "", true)
+		return xerrors.NewError(fmt.Errorf("cannot update last_crawled for URL %s: %w", url, err), 0, "", true)
 	}
 
 	return nil
@@ -541,14 +524,6 @@ func (d *DbServiceImpl) IsContentIdentical(ctx context.Context, tx *sqlx.Tx, s *
 		return false, err
 	}
 
-	// Update: Skipped this because empty pages can be valid
-	// ex. pages with redirect headers
-	// Only check for identical content if we have gemtext or data
-	//if (!s.GemText.Valid || s.GemText.String == "") &&
-	//	(!s.Data.Valid || len(s.Data.V) == 0) {
-	//	return false, nil
-	//}
-
 	// Try to get the latest snapshot for this URL
 	latestSnapshot := &snapshot.Snapshot{}
 	err := tx.GetContext(ctx, latestSnapshot, SQL_GET_LATEST_SNAPSHOT, s.URL.String())
db_queries.go
@@ -67,38 +67,10 @@ LIMIT $1
 `
 	// New query - always insert a new snapshot without conflict handling
 	SQL_INSERT_SNAPSHOT = `
-INSERT INTO snapshots (url, host, timestamp, mimetype, data, gemtext, links, lang, response_code, error, header)
-VALUES (:url, :host, :timestamp, :mimetype, :data, :gemtext, :links, :lang, :response_code, :error, :header)
+INSERT INTO snapshots (url, host, timestamp, mimetype, data, gemtext, links, lang, response_code, error, header, last_crawled)
+VALUES (:url, :host, :timestamp, :mimetype, :data, :gemtext, :links, :lang, :response_code, :error, :header, :last_crawled)
 RETURNING id
 `
-	// Keep for backward compatibility, but should be phased out
-	SQL_INSERT_SNAPSHOT_IF_NEW = `
-INSERT INTO snapshots (url, host, timestamp, mimetype, data, gemtext, links, lang, response_code, error, header)
-VALUES (:url, :host, :timestamp, :mimetype, :data, :gemtext, :links, :lang, :response_code, :error, :header)
-ON CONFLICT DO NOTHING
-`
-	// Update to match the SQL_INSERT_SNAPSHOT - we no longer want to upsert, just insert new versions
-	SQL_UPSERT_SNAPSHOT = `
-INSERT INTO snapshots (url, host, timestamp, mimetype, data, gemtext, links, lang, response_code, error, header)
-VALUES (:url, :host, :timestamp, :mimetype, :data, :gemtext, :links, :lang, :response_code, :error, :header)
-RETURNING id
-`
-	SQL_UPDATE_SNAPSHOT = `UPDATE snapshots
-SET url = :url,
-	host = :host,
-	timestamp = :timestamp,
-	mimetype = :mimetype,
-	data = :data,
-	gemtext = :gemtext,
-	links = :links,
-	lang = :lang,
-	response_code = :response_code,
-	error = :error,
-	header = :header,
-	last_crawled = CURRENT_TIMESTAMP
-WHERE id = :id
-RETURNING id
-`
 	SQL_INSERT_URL = `
 INSERT INTO urls (url, host, timestamp)
 VALUES (:url, :host, :timestamp)
@@ -115,7 +87,6 @@ RETURNING id
 	SQL_DELETE_URL = `
 DELETE FROM urls WHERE url=$1
 `
-	// New queries for retrieving snapshots
 	SQL_GET_LATEST_SNAPSHOT = `
 SELECT * FROM snapshots
 WHERE url = $1
@@ -140,9 +111,65 @@ RETURNING id
 	AND timestamp BETWEEN $2 AND $3
 	ORDER BY timestamp DESC
 `
-	// New query to record crawl attempt when content is identical (no new snapshot needed)
-	SQL_RECORD_CRAWL_ATTEMPT = `
-INSERT INTO snapshots (url, host, mimetype, response_code, error)
-VALUES ($1, $2, $3, $4, $5)
+	// Update last_crawled timestamp for the most recent snapshot of a URL
+	SQL_UPDATE_LAST_CRAWLED = `
+UPDATE snapshots
+SET last_crawled = CURRENT_TIMESTAMP
+WHERE id = (
+	SELECT id FROM snapshots
+	WHERE url = $1
+	ORDER BY timestamp DESC
+	LIMIT 1
+)
+`
+	// SQL_FETCH_SNAPSHOTS_FROM_HISTORY Fetches URLs from snapshots for re-crawling based on last_crawled timestamp
+	// This query finds root domain URLs that haven't been crawled recently and selects
+	// one URL per host for diversity. Uses CTEs to:
+	// 1. Find latest crawl attempt per URL (via MAX(last_crawled))
+	// 2. Filter to URLs with actual content and successful responses (20-29)
+	// 3. Select URLs where latest crawl is older than cutoff date
+	// 4. Rank randomly within each host and pick one URL per host
+	// Parameters: $1 = cutoff_date, $2 = limit
+	SQL_FETCH_SNAPSHOTS_FROM_HISTORY = `
+WITH latest_attempts AS (
+	SELECT
+		url,
+		host,
+		COALESCE(MAX(last_crawled), '1970-01-01'::timestamp) as latest_attempt
+	FROM snapshots
+	WHERE url ~ '^gemini://[^/]+/?$' AND mimetype = 'text/gemini'
+	GROUP BY url, host
+),
+root_urls_with_content AS (
+	SELECT DISTINCT
+		la.url,
+		la.host,
+		la.latest_attempt
+	FROM latest_attempts la
+	JOIN snapshots s ON s.url = la.url
+	WHERE (s.gemtext IS NOT NULL OR s.data IS NOT NULL)
+	AND s.response_code BETWEEN 20 AND 29
+),
+eligible_urls AS (
+	SELECT
+		url,
+		host,
+		latest_attempt
+	FROM root_urls_with_content
+	WHERE latest_attempt < $1
+),
+ranked_urls AS (
+	SELECT
+		url,
+		host,
+		latest_attempt,
+		ROW_NUMBER() OVER (PARTITION BY host ORDER BY RANDOM()) as rank
+	FROM eligible_urls
+)
+SELECT url, host
+FROM ranked_urls
+WHERE rank = 1
+ORDER BY RANDOM()
+LIMIT $2
 `
 )
misc/sql/cleanup_duplicate_snapshots.sql (new file, 115 lines)
@@ -0,0 +1,115 @@
-- Cleanup script for snapshots table after adding last_crawled column
-- This script consolidates multiple snapshots per URL by:
-- 1. Keeping the latest snapshot with content (non-null gemtext OR data)
-- 2. Setting its last_crawled to the most recent timestamp from any snapshot for that URL
-- 3. Deleting all other snapshots for URLs with multiple snapshots
--
-- IMPORTANT: This script will permanently delete data. Make sure to backup your database first!

BEGIN;

-- Update last_crawled for URLs with multiple snapshots
-- Keep the latest snapshot with content and update its last_crawled to the most recent timestamp
WITH url_snapshots AS (
    -- Get all snapshots grouped by URL with row numbers
    SELECT
        id,
        url,
        timestamp,
        last_crawled,
        gemtext,
        data,
        ROW_NUMBER() OVER (PARTITION BY url ORDER BY timestamp DESC) as rn_by_timestamp
    FROM snapshots
),
latest_content_snapshots AS (
    -- Find the latest snapshot with content for each URL
    SELECT
        url,
        id as keep_id,
        timestamp as keep_timestamp
    FROM url_snapshots
    WHERE (gemtext IS NOT NULL OR data IS NOT NULL)
    AND rn_by_timestamp = (
        SELECT MIN(rn_by_timestamp)
        FROM url_snapshots us2
        WHERE us2.url = url_snapshots.url
        AND (us2.gemtext IS NOT NULL OR us2.data IS NOT NULL)
    )
),
most_recent_timestamps AS (
    -- Get the most recent timestamp (last_crawled or timestamp) for each URL
    SELECT
        url,
        GREATEST(
            MAX(timestamp),
            COALESCE(MAX(last_crawled), '1970-01-01'::timestamp)
        ) as most_recent_time
    FROM snapshots
    GROUP BY url
)
-- Update the last_crawled of snapshots we're keeping
UPDATE snapshots
SET last_crawled = mrt.most_recent_time
FROM latest_content_snapshots lcs
JOIN most_recent_timestamps mrt ON lcs.url = mrt.url
WHERE snapshots.id = lcs.keep_id;

-- Delete all other snapshots for URLs that have multiple snapshots
WITH url_snapshots AS (
    SELECT
        id,
        url,
        timestamp,
        gemtext,
        data,
        ROW_NUMBER() OVER (PARTITION BY url ORDER BY timestamp DESC) as rn_by_timestamp
    FROM snapshots
),
latest_content_snapshots AS (
    -- Find the latest snapshot with content for each URL
    SELECT
        url,
        id as keep_id
    FROM url_snapshots
    WHERE (gemtext IS NOT NULL OR data IS NOT NULL)
    AND rn_by_timestamp = (
        SELECT MIN(rn_by_timestamp)
        FROM url_snapshots us2
        WHERE us2.url = url_snapshots.url
        AND (us2.gemtext IS NOT NULL OR us2.data IS NOT NULL)
    )
),
snapshots_to_delete AS (
    -- Find snapshots to delete (all except the ones we're keeping)
    SELECT s.id
    FROM snapshots s
    LEFT JOIN latest_content_snapshots lcs ON s.id = lcs.keep_id
    WHERE lcs.keep_id IS NULL
    AND s.url IN (
        -- Only for URLs that have multiple snapshots
        SELECT url
        FROM snapshots
        GROUP BY url
        HAVING COUNT(*) > 1
    )
)
DELETE FROM snapshots
WHERE id IN (SELECT id FROM snapshots_to_delete);

-- Show summary of changes
SELECT
    'Cleanup completed. Remaining snapshots: ' || COUNT(*) as summary
FROM snapshots;

-- Show URLs that still have multiple snapshots (should be 0 after cleanup)
SELECT
    'URLs with multiple snapshots after cleanup: ' || COUNT(*) as validation
FROM (
    SELECT url
    FROM snapshots
    GROUP BY url
    HAVING COUNT(*) > 1
) multi_snapshots;

COMMIT;
@@ -26,7 +26,8 @@ CREATE TABLE snapshots (
     lang TEXT,
     response_code INTEGER,
     error TEXT,
-    header TEXT
+    header TEXT,
+    last_crawled TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
 );
 
 CREATE UNIQUE INDEX idx_url_timestamp ON snapshots (url, timestamp);
@@ -40,4 +41,6 @@ CREATE INDEX idx_host ON snapshots (host);
 CREATE INDEX idx_response_code_error ON snapshots (response_code, error);
 CREATE INDEX idx_response_code_error_nulls ON snapshots (response_code, error) WHERE response_code IS NULL AND error IS NULL;
 CREATE INDEX idx_snapshots_unprocessed ON snapshots (host) WHERE response_code IS NULL AND error IS NULL;
 CREATE INDEX idx_url_latest ON snapshots (url, timestamp DESC);
+CREATE INDEX idx_last_crawled ON snapshots (last_crawled);
+CREATE INDEX idx_url_last_crawled ON snapshots (url, last_crawled DESC);
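The schema change above only covers fresh installs; databases created before this commit would need the column and indexes added separately. A hedged migration sketch (the function and its placement are assumptions; the column and index definitions are copied from the schema diff):

```go
package example

import (
	"context"

	"github.com/jmoiron/sqlx"
)

// migrateLastCrawled adds the last_crawled column and its indexes to an
// existing snapshots table, mirroring the new-install schema above.
func migrateLastCrawled(ctx context.Context, db *sqlx.DB) error {
	stmts := []string{
		`ALTER TABLE snapshots ADD COLUMN IF NOT EXISTS last_crawled TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP`,
		`CREATE INDEX IF NOT EXISTS idx_last_crawled ON snapshots (last_crawled)`,
		`CREATE INDEX IF NOT EXISTS idx_url_last_crawled ON snapshots (url, last_crawled DESC)`,
	}
	for _, q := range stmts {
		if _, err := db.ExecContext(ctx, q); err != nil {
			return err
		}
	}
	return nil
}
```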