Reorganize code for more granular imports

This commit is contained in:
2025-02-26 10:34:25 +02:00
parent a9983f3531
commit 4bceb75695
23 changed files with 1549 additions and 1232 deletions

160
db/db.go
View File

@@ -2,20 +2,22 @@ package db
import (
"encoding/json"
"errors"
"fmt"
"gemini-grc/common"
"os"
"strconv"
"time"
"gemini-grc/common/snapshot"
commonUrl "gemini-grc/common/url"
"gemini-grc/config"
"gemini-grc/errors"
"gemini-grc/logging"
_ "github.com/jackc/pgx/v5/stdlib" // PGX driver for PostgreSQL
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
)
func ConnectToDB() *sqlx.DB {
func ConnectToDB() (*sqlx.DB, error) {
connStr := fmt.Sprintf("postgres://%s:%s@%s:%s/%s", //nolint:nosprintfhostport
os.Getenv("PG_USER"),
os.Getenv("PG_PASSWORD"),
@@ -27,25 +29,26 @@ func ConnectToDB() *sqlx.DB {
// Create a connection pool
db, err := sqlx.Open("pgx", connStr)
if err != nil {
panic(fmt.Sprintf("Unable to connect to database with URL %s: %v\n", connStr, err))
return nil, errors.NewFatalError(fmt.Errorf("unable to connect to database with URL %s: %w", connStr, err))
}
// TODO move PG_MAX_OPEN_CONNECTIONS to config env variables
maxConnections, err := strconv.Atoi(os.Getenv("PG_MAX_OPEN_CONNECTIONS"))
if err != nil {
panic(fmt.Sprintf("Unable to set max DB connections: %s\n", err))
return nil, errors.NewFatalError(fmt.Errorf("unable to set DB max connections: %w", err))
}
db.SetMaxOpenConns(maxConnections)
err = db.Ping()
if err != nil {
panic(fmt.Sprintf("Unable to ping database: %v\n", err))
return nil, errors.NewFatalError(fmt.Errorf("unable to ping database: %w", err))
}
logging.LogDebug("Connected to database")
return db
return db, nil
}
// IsDeadlockError checks if the error is a PostgreSQL deadlock error
// IsDeadlockError checks if the error is a PostgreSQL deadlock error.
func IsDeadlockError(err error) bool {
err = errors.Unwrap(err)
var pqErr *pq.Error
if errors.As(err, &pqErr) {
return pqErr.Code == "40P01" // PostgreSQL deadlock error code
@@ -53,134 +56,85 @@ func IsDeadlockError(err error) bool {
return false
}
func GetURLsToVisit(tx *sqlx.Tx) ([]string, error) {
func GetRandomUrls(tx *sqlx.Tx) ([]string, error) {
var urls []string
err := tx.Select(&urls, SQL_SELECT_RANDOM_URLS_UNIQUE_HOSTS, config.CONFIG.WorkerBatchSize)
err := tx.Select(&urls, SQL_SELECT_RANDOM_URLS, config.CONFIG.WorkerBatchSize)
if err != nil {
return nil, fmt.Errorf("%w: %w", common.ErrDatabase, err)
return nil, errors.NewFatalError(err)
}
return urls, nil
}
func GetRandomUrlsWithBasePath(tx *sqlx.Tx) ([]string, error) {
SqlQuery := `SELECT url FROM snapshots WHERE url ~ '^[^:]+://[^/]+/?$' ORDER BY RANDOM() LIMIT $1`
var urls []string
err := tx.Select(&urls, SqlQuery, config.CONFIG.WorkerBatchSize)
if err != nil {
return nil, errors.NewFatalError(err)
}
return urls, nil
}
func InsertURL(tx *sqlx.Tx, url string) error {
logging.LogDebug("Inserting URL %s", url)
query := SQL_INSERT_URL
_, err := tx.NamedExec(query, url)
normalizedURL, err := commonUrl.ParseURL(url, "", true)
if err != nil {
return fmt.Errorf("%w inserting URL: %w", common.ErrDatabase, err)
return err
}
a := struct {
Url string
Host string
Timestamp time.Time
}{
Url: normalizedURL.Full,
Host: normalizedURL.Hostname,
Timestamp: time.Now(),
}
_, err = tx.NamedExec(query, a)
if err != nil {
return errors.NewFatalError(fmt.Errorf("cannot insert URL: database error %w URL %s", err, url))
}
return nil
}
func SaveSnapshotIfNew(tx *sqlx.Tx, s *common.Snapshot) error {
func DeleteURL(tx *sqlx.Tx, url string) error {
logging.LogDebug("Deleting URL %s", url)
query := SQL_DELETE_URL
_, err := tx.Exec(query, url)
if err != nil {
return errors.NewFatalError(fmt.Errorf("cannot delete URL: database error %w URL %s", err, url))
}
return nil
}
func OverwriteSnapshot(tx *sqlx.Tx, s *snapshot.Snapshot) (err error) {
if config.CONFIG.DryRun {
marshalled, err := json.MarshalIndent(s, "", " ")
if err != nil {
panic(fmt.Sprintf("JSON serialization error for %v", s))
return errors.NewFatalError(fmt.Errorf("JSON serialization error for %v", s))
}
logging.LogDebug("Would insert (if new) snapshot %s", marshalled)
logging.LogDebug("Would upsert snapshot %s", marshalled)
return nil
}
query := SQL_INSERT_SNAPSHOT_IF_NEW
_, err := tx.NamedExec(query, s)
if err != nil {
return fmt.Errorf("[%s] GeminiError inserting snapshot: %w", s.URL, err)
}
return nil
}
func OverwriteSnapshot(workedID int, tx *sqlx.Tx, s *common.Snapshot) (err error) {
// if config.CONFIG.DryRun {
//marshalled, err := json.MarshalIndent(s, "", " ")
//if err != nil {
// panic(fmt.Sprintf("JSON serialization error for %v", s))
//}
//logging.LogDebug("[%d] Would upsert snapshot %s", workedID, marshalled)
// return nil
// }
query := SQL_UPSERT_SNAPSHOT
rows, err := tx.NamedQuery(query, s)
if err != nil {
return fmt.Errorf("[%d] %w while upserting snapshot: %w", workedID, common.ErrDatabase, err)
return errors.NewFatalError(fmt.Errorf("cannot overwrite snapshot: %w", err))
}
defer func() {
_err := rows.Close()
if _err != nil {
err = fmt.Errorf("[%d] %w error closing rows: %w", workedID, common.ErrDatabase, _err)
if err == nil && _err != nil {
err = errors.NewFatalError(fmt.Errorf("cannot overwrite snapshot: error closing rows: %w", err))
}
}()
if rows.Next() {
var returnedID int
err = rows.Scan(&returnedID)
if err != nil {
return fmt.Errorf("[%d] %w error scanning returned id: %w", workedID, common.ErrDatabase, err)
return errors.NewFatalError(fmt.Errorf("cannot overwrite snapshot: error scanning rows: %w", err))
}
s.ID = returnedID
// logging.LogDebug("[%d] Upserted snapshot with ID %d", workedID, returnedID)
}
return nil
}
func UpdateSnapshot(workedID int, tx *sqlx.Tx, s *common.Snapshot) (err error) {
// if config.CONFIG.DryRun {
//marshalled, err := json.MarshalIndent(s, "", " ")
//if err != nil {
// panic(fmt.Sprintf("JSON serialization error for %v", s))
//}
//logging.LogDebug("[%d] Would upsert snapshot %s", workedID, marshalled)
// return nil
// }
query := SQL_UPDATE_SNAPSHOT
rows, err := tx.NamedQuery(query, s)
if err != nil {
return fmt.Errorf("[%d] %w while updating snapshot: %w", workedID, common.ErrDatabase, err)
}
defer func() {
_err := rows.Close()
if _err != nil {
err = fmt.Errorf("[%d] %w error closing rows: %w", workedID, common.ErrDatabase, _err)
}
}()
if rows.Next() {
var returnedID int
err = rows.Scan(&returnedID)
if err != nil {
return fmt.Errorf("[%d] %w error scanning returned id: %w", workedID, common.ErrDatabase, err)
}
s.ID = returnedID
// logging.LogDebug("[%d] Updated snapshot with ID %d", workedID, returnedID)
}
return nil
}
func SaveLinksToDBinBatches(tx *sqlx.Tx, snapshots []*common.Snapshot) error {
if config.CONFIG.DryRun {
return nil
}
const batchSize = 5000
query := SQL_INSERT_SNAPSHOT_IF_NEW
for i := 0; i < len(snapshots); i += batchSize {
end := i + batchSize
if end > len(snapshots) {
end = len(snapshots)
}
batch := snapshots[i:end]
_, err := tx.NamedExec(query, batch)
if err != nil {
return fmt.Errorf("%w: While saving links in batches: %w", common.ErrDatabase, err)
}
}
return nil
}
func SaveLinksToDB(tx *sqlx.Tx, snapshots []*common.Snapshot) error {
if config.CONFIG.DryRun {
return nil
}
query := SQL_INSERT_SNAPSHOT_IF_NEW
_, err := tx.NamedExec(query, snapshots)
if err != nil {
logging.LogError("GeminiError batch inserting snapshots: %w", err)
return fmt.Errorf("DB error: %w", err)
}
return nil
}