From eca54b2f6892e1cffe34c031938f6f2aeabbb9a9 Mon Sep 17 00:00:00 2001 From: antanst Date: Thu, 22 May 2025 12:46:36 +0300 Subject: [PATCH] Implement context-aware database operations - Add context support to database operations - Implement versioned snapshots for URL history - Update database queries to support URL timestamps - Improve transaction handling with context - Add utility functions for snapshot history --- db/db.go | 548 +++++++++++++++++++++++++++++++++++++++++++++++ db/db_queries.go | 117 ++++++++-- 2 files changed, 644 insertions(+), 21 deletions(-) create mode 100644 db/db.go diff --git a/db/db.go b/db/db.go new file mode 100644 index 0000000..5243fa2 --- /dev/null +++ b/db/db.go @@ -0,0 +1,548 @@ +package db + +import ( + "bytes" + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + "strings" + "sync" + "time" + + "gemini-grc/common/contextlog" + "gemini-grc/common/snapshot" + commonUrl "gemini-grc/common/url" + "gemini-grc/config" + "gemini-grc/contextutil" + "gemini-grc/logging" + "git.antanst.com/antanst/xerrors" + "github.com/guregu/null/v5" + _ "github.com/jackc/pgx/v5/stdlib" // PGX driver for PostgreSQL + "github.com/jmoiron/sqlx" + "github.com/lib/pq" +) + +type DbService interface { + // Core database methods + Initialize(ctx context.Context) error + Shutdown(ctx context.Context) error + NewTx(ctx context.Context) (*sqlx.Tx, error) + + // URL methods + InsertURL(ctx context.Context, tx *sqlx.Tx, url string) error + NormalizeURL(ctx context.Context, tx *sqlx.Tx, url string, normalizedURL string) error + DeleteURL(ctx context.Context, tx *sqlx.Tx, url string) error + MarkURLsAsBeingProcessed(ctx context.Context, tx *sqlx.Tx, urls []string) error + GetUrlHosts(ctx context.Context, tx *sqlx.Tx) ([]string, error) + GetRandomUrlsFromHosts(ctx context.Context, hosts []string, limit int, tx *sqlx.Tx) ([]string, error) + + // Snapshot methods + SaveSnapshot(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) error + OverwriteSnapshot(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) error + GetLatestSnapshot(ctx context.Context, tx *sqlx.Tx, url string) (*snapshot.Snapshot, error) + GetSnapshotAtTimestamp(ctx context.Context, tx *sqlx.Tx, url string, timestamp time.Time) (*snapshot.Snapshot, error) + GetAllSnapshotsForURL(ctx context.Context, tx *sqlx.Tx, url string) ([]*snapshot.Snapshot, error) + GetSnapshotsByDateRange(ctx context.Context, tx *sqlx.Tx, url string, startTime, endTime time.Time) ([]*snapshot.Snapshot, error) + IsContentIdentical(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) (bool, error) +} + +type DbServiceImpl struct { + db *sqlx.DB + connected bool + + mu sync.Mutex +} + +var Database DbServiceImpl + +// IsDeadlockError checks if the error is a PostgreSQL deadlock error. +func IsDeadlockError(err error) bool { + err = errors.Unwrap(err) + var pqErr *pq.Error + if errors.As(err, &pqErr) { + return pqErr.Code == "40P01" // PostgreSQL deadlock error code + } + return false +} + +// Initialize initializes the database with context +func (d *DbServiceImpl) Initialize(ctx context.Context) error { + // Create a database-specific context + dbCtx := contextutil.ContextWithComponent(ctx, "database") + contextlog.LogDebugWithContext(dbCtx, logging.GetSlogger(), "Initializing database connection") + + d.mu.Lock() + defer d.mu.Unlock() + + if d.connected { + return nil + } + + // Check if the context is cancelled before proceeding + if err := ctx.Err(); err != nil { + return err + } + + // Create a connection pool + connStr := config.CONFIG.PgURL + db, err := sqlx.Open("pgx", connStr) + if err != nil { + contextlog.LogErrorWithContext(dbCtx, logging.GetSlogger(), "Unable to connect to database with URL %s: %v", connStr, err) + return xerrors.NewError(fmt.Errorf("unable to connect to database with URL %s: %w", connStr, err), 0, "", true) + } + + // Configure connection pool + db.SetMaxOpenConns(config.CONFIG.MaxDbConnections) + db.SetMaxIdleConns(config.CONFIG.MaxDbConnections / 2) + db.SetConnMaxLifetime(time.Minute * 5) + db.SetConnMaxIdleTime(time.Minute * 1) + + // Check if the context is cancelled before proceeding with ping + if err := ctx.Err(); err != nil { + return err + } + + // Use PingContext for context-aware ping + err = db.PingContext(ctx) + if err != nil { + contextlog.LogErrorWithContext(dbCtx, logging.GetSlogger(), "Unable to ping database: %v", err) + return xerrors.NewError(fmt.Errorf("unable to ping database: %w", err), 0, "", true) + } + + contextlog.LogDebugWithContext(dbCtx, logging.GetSlogger(), "Database connection initialized successfully") + d.db = db + d.connected = true + return nil +} + +// Shutdown the database with context +func (d *DbServiceImpl) Shutdown(ctx context.Context) error { + dbCtx := contextutil.ContextWithComponent(ctx, "database") + contextlog.LogDebugWithContext(dbCtx, logging.GetSlogger(), "Shutting down database connection") + + d.mu.Lock() + defer d.mu.Unlock() + + if !d.connected { + return nil + } + + // Check if the context is cancelled before proceeding + if err := ctx.Err(); err != nil { + return err + } + + err := d.db.Close() + if err != nil { + contextlog.LogErrorWithContext(dbCtx, logging.GetSlogger(), "Error closing database connection: %v", err) + } else { + contextlog.LogDebugWithContext(dbCtx, logging.GetSlogger(), "Database connection closed successfully") + d.connected = false + } + return err +} + +// NewTx creates a new transaction with context +func (d *DbServiceImpl) NewTx(ctx context.Context) (*sqlx.Tx, error) { + dbCtx := contextutil.ContextWithComponent(ctx, "database") + + // Check if the context is cancelled before proceeding + if err := ctx.Err(); err != nil { + contextlog.LogErrorWithContext(dbCtx, logging.GetSlogger(), "Context error before creating transaction: %v", err) + return nil, err + } + + contextlog.LogDebugWithContext(dbCtx, logging.GetSlogger(), "Creating new database transaction") + tx, err := d.db.BeginTxx(ctx, nil) + if err != nil { + contextlog.LogErrorWithContext(dbCtx, logging.GetSlogger(), "Failed to create transaction: %v", err) + return nil, err + } + + return tx, nil +} + +// InsertURL inserts a URL with context +func (d *DbServiceImpl) InsertURL(ctx context.Context, tx *sqlx.Tx, url string) error { + dbCtx := contextutil.ContextWithComponent(ctx, "database") + contextlog.LogDebugWithContext(dbCtx, logging.GetSlogger(), "Inserting URL %s", url) + + // Check if the context is cancelled before proceeding + if err := ctx.Err(); err != nil { + return err + } + + // Context-aware implementation + normalizedURL, err := commonUrl.ParseURL(url, "", true) + if err != nil { + return err + } + + a := struct { + Url string + Host string + Timestamp time.Time + }{ + Url: normalizedURL.Full, + Host: normalizedURL.Hostname, + Timestamp: time.Now(), + } + + query := SQL_INSERT_URL + _, err = tx.NamedExecContext(ctx, query, a) + if err != nil { + return xerrors.NewError(fmt.Errorf("cannot insert URL: database error %w URL %s", err, url), 0, "", true) + } + return nil +} + +// NormalizeURL normalizes a URL with context +func (d *DbServiceImpl) NormalizeURL(ctx context.Context, tx *sqlx.Tx, url string, normalizedURL string) error { + dbCtx := contextutil.ContextWithComponent(ctx, "database") + + // Check if URLs are already the same + if url == normalizedURL { + return nil + } + + // Check if the context is cancelled before proceeding + if err := ctx.Err(); err != nil { + return err + } + + contextlog.LogDebugWithContext(dbCtx, logging.GetSlogger(), "Updating normalized URL %s -> %s", url, normalizedURL) + + // Context-aware implementation + query := SQL_UPDATE_URL + a := struct { + Url string `db:"Url"` + NormalizedURL string `db:"NormalizedURL"` + }{ + Url: url, + NormalizedURL: normalizedURL, + } + _, err := tx.NamedExecContext(ctx, query, a) + if err != nil { + return xerrors.NewError(fmt.Errorf("cannot update normalized URL: %w URL %s", err, url), 0, "", true) + } + return nil +} + +// DeleteURL deletes a URL with context +func (d *DbServiceImpl) DeleteURL(ctx context.Context, tx *sqlx.Tx, url string) error { + dbCtx := contextutil.ContextWithComponent(ctx, "database") + contextlog.LogDebugWithContext(dbCtx, logging.GetSlogger(), "Deleting URL %s", url) + + // Check if the context is cancelled before proceeding + if err := ctx.Err(); err != nil { + return err + } + + // Context-aware implementation + query := SQL_DELETE_URL + _, err := tx.ExecContext(ctx, query, url) + if err != nil { + return xerrors.NewError(fmt.Errorf("cannot delete URL: database error %w URL %s", err, url), 0, "", true) + } + return nil +} + +// MarkURLsAsBeingProcessed marks URLs as being processed with context +func (d *DbServiceImpl) MarkURLsAsBeingProcessed(ctx context.Context, tx *sqlx.Tx, urls []string) error { + dbCtx := contextutil.ContextWithComponent(ctx, "database") + + // Skip if no URLs provided + if len(urls) == 0 { + return nil + } + + // Check if the context is cancelled before proceeding + if err := ctx.Err(); err != nil { + return err + } + + contextlog.LogDebugWithContext(dbCtx, logging.GetSlogger(), "Marking %d URLs as being processed", len(urls)) + + // Context-aware implementation + if len(urls) > 0 { + // Build a query with multiple parameters instead of using pq.Array + placeholders := make([]string, len(urls)) + args := make([]interface{}, len(urls)) + for i, url := range urls { + placeholders[i] = fmt.Sprintf("$%d", i+1) + args[i] = url + } + query := fmt.Sprintf(SQL_MARK_URLS_BEING_PROCESSED, strings.Join(placeholders, ",")) + _, err := tx.ExecContext(ctx, query, args...) + if err != nil { + return xerrors.NewError(fmt.Errorf("cannot mark URLs as being processed: %w", err), 0, "", true) + } + } + return nil +} + +// GetUrlHosts gets URL hosts with context +func (d *DbServiceImpl) GetUrlHosts(ctx context.Context, tx *sqlx.Tx) ([]string, error) { + dbCtx := contextutil.ContextWithComponent(ctx, "database") + contextlog.LogDebugWithContext(dbCtx, logging.GetSlogger(), "Getting URL hosts") + + // Check if the context is cancelled before proceeding + if err := ctx.Err(); err != nil { + return nil, err + } + + // Context-aware implementation + var hosts []string + var query string + if config.CONFIG.GopherEnable { + query = "SELECT DISTINCT(host) FROM urls WHERE being_processed IS NOT TRUE" + } else { + query = "SELECT DISTINCT(host) FROM urls WHERE url like 'gemini://%' AND being_processed IS NOT TRUE" + } + err := tx.SelectContext(ctx, &hosts, query) + if err != nil { + return nil, xerrors.NewError(err, 0, "", true) + } + return hosts, nil +} + +// GetRandomUrlsFromHosts gets random URLs from hosts with context +func (d *DbServiceImpl) GetRandomUrlsFromHosts(ctx context.Context, hosts []string, limit int, tx *sqlx.Tx) ([]string, error) { + dbCtx := contextutil.ContextWithComponent(ctx, "database") + contextlog.LogDebugWithContext(dbCtx, logging.GetSlogger(), "Getting random URLs from %d hosts with limit %d", len(hosts), limit) + + // Check if the context is cancelled before proceeding + if err := ctx.Err(); err != nil { + return nil, err + } + + // Context-aware implementation + var urls []string + var query string + for _, host := range hosts { + var results []string + if !config.CONFIG.GopherEnable { + query = "SELECT url FROM urls WHERE host=$1 AND url like 'gemini://%' AND being_processed IS NOT TRUE ORDER BY RANDOM() LIMIT $2" + } else { + query = "SELECT url FROM urls WHERE host=$1 AND being_processed IS NOT TRUE ORDER BY RANDOM() LIMIT $2" + } + err := tx.SelectContext(ctx, &results, query, host, limit) + if err != nil { + return nil, xerrors.NewError(err, 0, "", true) + } + urls = append(urls, results...) + } + + // Check context cancellation before mark operation + if err := ctx.Err(); err != nil { + return nil, err + } + + // Use context-aware method for marking URLs + err := d.MarkURLsAsBeingProcessed(ctx, tx, urls) + if err != nil { + return nil, err + } + return urls, nil +} + +// SaveSnapshot saves a snapshot with context +func (d *DbServiceImpl) SaveSnapshot(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) error { + dbCtx := contextutil.ContextWithComponent(ctx, "database") + contextlog.LogDebugWithContext(dbCtx, logging.GetSlogger(), "Saving snapshot for URL %s", s.URL.String()) + + // Check if the context is cancelled before proceeding + if err := ctx.Err(); err != nil { + return err + } + + // Context-aware implementation + if config.CONFIG.DryRun { + marshalled, err := json.MarshalIndent(s, "", " ") + if err != nil { + return xerrors.NewError(fmt.Errorf("JSON serialization error for %v", s), 0, "", true) + } + contextlog.LogDebugWithContext(dbCtx, logging.GetSlogger(), "Would save snapshot %s", marshalled) + return nil + } + + // Check context before expensive operations + if err := ctx.Err(); err != nil { + return err + } + + // Check if we should skip storing identical content + if config.CONFIG.SkipIdenticalContent { + // Use the context-aware version to check for identical content + identical, err := d.IsContentIdentical(ctx, tx, s) + if err != nil { + return err + } else if identical { + contextlog.LogDebugWithContext(dbCtx, logging.GetSlogger(), "Skipping URL with identical content to existing snapshot: %s", s.URL.String()) + return nil + } + } + + // Always ensure we have a current timestamp + s.Timestamp = null.TimeFrom(time.Now()) + + // For PostgreSQL, use the global sqlx.NamedQueryContext function + // The SQL_INSERT_SNAPSHOT already has a RETURNING id clause + query := SQL_INSERT_SNAPSHOT + rows, err := sqlx.NamedQueryContext(ctx, tx, query, s) + if err != nil { + return xerrors.NewError(fmt.Errorf("cannot save snapshot: %w", err), 0, "", true) + } + defer rows.Close() + + // Scan the returned ID + if rows.Next() { + err = rows.Scan(&s.ID) + if err != nil { + return xerrors.NewError(fmt.Errorf("cannot save snapshot: error scanning returned ID: %w", err), 0, "", true) + } + } + return nil +} + +// OverwriteSnapshot overwrites a snapshot with context (maintained for backward compatibility) +func (d *DbServiceImpl) OverwriteSnapshot(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) error { + dbCtx := contextutil.ContextWithComponent(ctx, "database") + contextlog.LogDebugWithContext(dbCtx, logging.GetSlogger(), "Overwriting snapshot for URL %s", s.URL.String()) + + // Check if the context is cancelled before proceeding + if err := ctx.Err(); err != nil { + return err + } + + // Now simply delegate to SaveSnapshot which is already context-aware + return d.SaveSnapshot(ctx, tx, s) +} + +// GetLatestSnapshot gets the latest snapshot with context +func (d *DbServiceImpl) GetLatestSnapshot(ctx context.Context, tx *sqlx.Tx, url string) (*snapshot.Snapshot, error) { + dbCtx := contextutil.ContextWithComponent(ctx, "database") + contextlog.LogDebugWithContext(dbCtx, logging.GetSlogger(), "Getting latest snapshot for URL %s", url) + + // Check if the context is cancelled before proceeding + if err := ctx.Err(); err != nil { + return nil, err + } + + // Context-aware implementation + s := &snapshot.Snapshot{} + err := tx.GetContext(ctx, s, SQL_GET_LATEST_SNAPSHOT, url) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, xerrors.NewError(fmt.Errorf("no snapshot found for URL %s", url), 0, "", false) + } + return nil, xerrors.NewError(fmt.Errorf("cannot get latest snapshot for URL %s: %w", url, err), 0, "", false) + } + return s, nil +} + +// GetSnapshotAtTimestamp gets a snapshot at a specific timestamp with context +func (d *DbServiceImpl) GetSnapshotAtTimestamp(ctx context.Context, tx *sqlx.Tx, url string, timestamp time.Time) (*snapshot.Snapshot, error) { + dbCtx := contextutil.ContextWithComponent(ctx, "database") + contextlog.LogDebugWithContext(dbCtx, logging.GetSlogger(), "Getting snapshot for URL %s at timestamp %v", url, timestamp) + + // Check if the context is cancelled before proceeding + if err := ctx.Err(); err != nil { + return nil, err + } + + // Context-aware implementation + s := &snapshot.Snapshot{} + err := tx.GetContext(ctx, s, SQL_GET_SNAPSHOT_AT_TIMESTAMP, url, timestamp) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, xerrors.NewError(fmt.Errorf("no snapshot found for URL %s at or before %v", url, timestamp), 0, "", false) + } + return nil, xerrors.NewError(fmt.Errorf("cannot get snapshot for URL %s at timestamp %v: %w", url, timestamp, err), 0, "", false) + } + return s, nil +} + +// GetAllSnapshotsForURL gets all snapshots for a URL with context +func (d *DbServiceImpl) GetAllSnapshotsForURL(ctx context.Context, tx *sqlx.Tx, url string) ([]*snapshot.Snapshot, error) { + dbCtx := contextutil.ContextWithComponent(ctx, "database") + contextlog.LogDebugWithContext(dbCtx, logging.GetSlogger(), "Getting all snapshots for URL %s", url) + + // Check if the context is cancelled before proceeding + if err := ctx.Err(); err != nil { + return nil, err + } + + // Context-aware implementation + snapshots := []*snapshot.Snapshot{} + err := tx.SelectContext(ctx, &snapshots, SQL_GET_ALL_SNAPSHOTS_FOR_URL, url) + if err != nil { + return nil, xerrors.NewError(fmt.Errorf("cannot get all snapshots for URL %s: %w", url, err), 0, "", false) + } + return snapshots, nil +} + +// GetSnapshotsByDateRange gets snapshots by date range with context +func (d *DbServiceImpl) GetSnapshotsByDateRange(ctx context.Context, tx *sqlx.Tx, url string, startTime, endTime time.Time) ([]*snapshot.Snapshot, error) { + dbCtx := contextutil.ContextWithComponent(ctx, "database") + contextlog.LogDebugWithContext(dbCtx, logging.GetSlogger(), "Getting snapshots for URL %s in date range %v to %v", url, startTime, endTime) + + // Check if the context is cancelled before proceeding + if err := ctx.Err(); err != nil { + return nil, err + } + + // Context-aware implementation + snapshots := []*snapshot.Snapshot{} + err := tx.SelectContext(ctx, &snapshots, SQL_GET_SNAPSHOTS_BY_DATE_RANGE, url, startTime, endTime) + if err != nil { + return nil, xerrors.NewError(fmt.Errorf("cannot get snapshots for URL %s in date range %v to %v: %w", + url, startTime, endTime, err), 0, "", false) + } + return snapshots, nil +} + +// IsContentIdentical checks if content is identical with context +func (d *DbServiceImpl) IsContentIdentical(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) (bool, error) { + dbCtx := contextutil.ContextWithComponent(ctx, "database") + contextlog.LogDebugWithContext(dbCtx, logging.GetSlogger(), "Checking if content is identical for URL %s", s.URL.String()) + + // Check if the context is cancelled before proceeding + if err := ctx.Err(); err != nil { + return false, err + } + + // Context-aware implementation + // Only check for identical content if we have gemtext or data + if (!s.GemText.Valid || s.GemText.String == "") && + (!s.Data.Valid || len(s.Data.V) == 0) { + return false, nil + } + + // Try to get the latest snapshot for this URL + latestSnapshot := &snapshot.Snapshot{} + err := tx.GetContext(ctx, latestSnapshot, SQL_GET_LATEST_SNAPSHOT, s.URL.String()) + if err != nil { + // If there's no snapshot yet, it can't be identical + if errors.Is(err, sql.ErrNoRows) { + return false, nil + } + return false, xerrors.NewError(err, 0, "", true) + } + + // Check context cancellation before potentially expensive comparison + if err := ctx.Err(); err != nil { + return false, err + } + + // Check if the content is identical + if s.GemText.Valid && latestSnapshot.GemText.Valid { + return s.GemText.String == latestSnapshot.GemText.String, nil + } else if s.Data.Valid && latestSnapshot.Data.Valid { + return bytes.Equal(s.Data.V, latestSnapshot.Data.V), nil + } + + return false, nil +} diff --git a/db/db_queries.go b/db/db_queries.go index 4e3c1a6..12ebb6b 100644 --- a/db/db_queries.go +++ b/db/db_queries.go @@ -8,7 +8,7 @@ WHERE u.id IN ( SELECT id FROM ( SELECT id, ROW_NUMBER() OVER (PARTITION BY host ORDER BY id) as rn FROM urls - ) t + ) t WHERE rn <= 3 ) ORDER BY RANDOM() @@ -18,30 +18,71 @@ LIMIT $1 SQL_SELECT_RANDOM_URLS = ` SELECT url FROM urls u +WHERE u.being_processed IS NOT TRUE ORDER BY RANDOM() FOR UPDATE SKIP LOCKED LIMIT $1 ` - SQL_INSERT_SNAPSHOT_IF_NEW = ` - INSERT INTO snapshots (url, host, timestamp, mimetype, data, gemtext, links, lang, response_code, error) - VALUES (:url, :host, :timestamp, :mimetype, :data, :gemtext, :links, :lang, :response_code, :error) - ON CONFLICT (url) DO NOTHING - ` - SQL_UPSERT_SNAPSHOT = `INSERT INTO snapshots (url, host, timestamp, mimetype, data, gemtext, links, lang, response_code, error) - VALUES (:url, :host, :timestamp, :mimetype, :data, :gemtext, :links, :lang, :response_code, :error) - ON CONFLICT (url) DO UPDATE SET - url = EXCLUDED.url, - host = EXCLUDED.host, - timestamp = EXCLUDED.timestamp, - mimetype = EXCLUDED.mimetype, - data = EXCLUDED.data, - gemtext = EXCLUDED.gemtext, - links = EXCLUDED.links, - lang = EXCLUDED.lang, - response_code = EXCLUDED.response_code, - error = EXCLUDED.error - RETURNING id + SQL_MARK_URLS_BEING_PROCESSED = `UPDATE urls SET being_processed = true WHERE url IN (%s)` + SQL_SELECT_RANDOM_URLS_GEMINI_ONLY = ` +SELECT url +FROM urls u +WHERE u.url like 'gemini://%' + AND u.being_processed IS NOT TRUE +ORDER BY RANDOM() +FOR UPDATE SKIP LOCKED +LIMIT $1 ` + SQL_SELECT_RANDOM_URLS_GEMINI_ONLY_2 = ` +WITH RankedUrls AS ( + -- Step 1: Assign a random rank to each URL within its host group + SELECT + url, + host, + ROW_NUMBER() OVER (PARTITION BY host ORDER BY RANDOM()) as rn + FROM + urls + WHERE url like 'gemini://%' + AND being_processed IS NOT TRUE +), +OneUrlPerHost AS ( + -- Step 2: Filter to keep only the first-ranked (random) URL per host + SELECT + url, + host + FROM + RankedUrls + WHERE + rn = 1 +) +-- Step 3: From the set of one URL per host, randomly select X +SELECT + url +FROM + OneUrlPerHost +ORDER BY + RANDOM() +FOR UPDATE SKIP LOCKED +LIMIT $1 +` + // New query - always insert a new snapshot without conflict handling + SQL_INSERT_SNAPSHOT = ` + INSERT INTO snapshots (url, host, timestamp, mimetype, data, gemtext, links, lang, response_code, error, header) + VALUES (:url, :host, :timestamp, :mimetype, :data, :gemtext, :links, :lang, :response_code, :error, :header) + RETURNING id + ` + // Keep for backward compatibility, but should be phased out + SQL_INSERT_SNAPSHOT_IF_NEW = ` + INSERT INTO snapshots (url, host, timestamp, mimetype, data, gemtext, links, lang, response_code, error, header) + VALUES (:url, :host, :timestamp, :mimetype, :data, :gemtext, :links, :lang, :response_code, :error, :header) + ON CONFLICT DO NOTHING + ` + // Update to match the SQL_INSERT_SNAPSHOT - we no longer want to upsert, just insert new versions + SQL_UPSERT_SNAPSHOT = ` + INSERT INTO snapshots (url, host, timestamp, mimetype, data, gemtext, links, lang, response_code, error, header) + VALUES (:url, :host, :timestamp, :mimetype, :data, :gemtext, :links, :lang, :response_code, :error, :header) + RETURNING id + ` SQL_UPDATE_SNAPSHOT = `UPDATE snapshots SET url = :url, host = :host, @@ -52,7 +93,8 @@ gemtext = :gemtext, links = :links, lang = :lang, response_code = :response_code, -error = :error +error = :error, +header = :header WHERE id = :id RETURNING id ` @@ -60,8 +102,41 @@ RETURNING id INSERT INTO urls (url, host, timestamp) VALUES (:url, :host, :timestamp) ON CONFLICT (url) DO NOTHING + ` + SQL_UPDATE_URL = ` + UPDATE urls + SET url = :NormalizedURL + WHERE url = :Url + AND NOT EXISTS ( + SELECT 1 FROM urls WHERE url = :NormalizedURL + ) ` SQL_DELETE_URL = ` DELETE FROM urls WHERE url=$1 ` + // New queries for retrieving snapshots + SQL_GET_LATEST_SNAPSHOT = ` + SELECT * FROM snapshots + WHERE url = $1 + ORDER BY timestamp DESC + LIMIT 1 + ` + SQL_GET_SNAPSHOT_AT_TIMESTAMP = ` + SELECT * FROM snapshots + WHERE url = $1 + AND timestamp <= $2 + ORDER BY timestamp DESC + LIMIT 1 + ` + SQL_GET_ALL_SNAPSHOTS_FOR_URL = ` + SELECT * FROM snapshots + WHERE url = $1 + ORDER BY timestamp DESC + ` + SQL_GET_SNAPSHOTS_BY_DATE_RANGE = ` + SELECT * FROM snapshots + WHERE url = $1 + AND timestamp BETWEEN $2 AND $3 + ORDER BY timestamp DESC + ` )