Enhance crawler with seed list and SQL utilities
Add seedList module for URL initialization, comprehensive SQL utilities for database analysis, and update project configuration.
This commit is contained in:
9
.gitignore
vendored
9
.gitignore
vendored
@@ -1,5 +1,6 @@
|
||||
**/.#*
|
||||
**/*~
|
||||
**/.DS_Store
|
||||
/.idea
|
||||
/.goroot
|
||||
/dist/**
|
||||
@@ -18,3 +19,11 @@ run*.sh
|
||||
/db/sql/**
|
||||
|
||||
**/.claude/settings.local.json
|
||||
|
||||
/crawl.sh
|
||||
/crawler.sh
|
||||
/get.sh
|
||||
/snapshot_history.sh
|
||||
/whitelist.txt
|
||||
|
||||
/CLAUDE.md
|
||||
|
||||
@@ -19,6 +19,13 @@ CREATE UNIQUE INDEX idx_url_timestamp ON snapshots (url, timestamp);
|
||||
CREATE INDEX idx_url_latest ON snapshots (url, timestamp DESC);
|
||||
```
|
||||
|
||||
## Error handling
|
||||
- `xerrors` library is used for error creation/wrapping.
|
||||
- The "Fatal" field is not used, we _always_ panic on fatal errors.
|
||||
- _All_ internal functions _must_ return `xerror` errors.
|
||||
- _All_ external errors are wrapped within `xerror` errors.
|
||||
|
||||
|
||||
### Code Changes
|
||||
|
||||
1. **Updated SQL Queries**:
|
||||
|
||||
@@ -7,7 +7,7 @@ import (
|
||||
"strings"
|
||||
|
||||
"gemini-grc/config"
|
||||
"gemini-grc/logging"
|
||||
"git.antanst.com/antanst/logging"
|
||||
"git.antanst.com/antanst/xerrors"
|
||||
)
|
||||
|
||||
|
||||
67
common/seedList/seedlist.go
Normal file
67
common/seedList/seedlist.go
Normal file
@@ -0,0 +1,67 @@
|
||||
package seedList
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"git.antanst.com/antanst/logging"
|
||||
"git.antanst.com/antanst/xerrors"
|
||||
)
|
||||
|
||||
var seedlist []string //nolint:gochecknoglobals
|
||||
|
||||
func Initialize() error {
|
||||
var err error
|
||||
|
||||
// Initialize seedlist from fixed path
|
||||
if err = loadSeedlist("seed_urls.txt"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func loadSeedlist(filePath string) error {
|
||||
if seedlist != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
data, err := os.ReadFile(filePath)
|
||||
if err != nil {
|
||||
seedlist = []string{}
|
||||
return xerrors.NewError(fmt.Errorf("could not load seedlist file: %w", err), 0, "", true)
|
||||
}
|
||||
|
||||
lines := strings.Split(string(data), "\n")
|
||||
seedlist = []string{}
|
||||
|
||||
for _, line := range lines {
|
||||
line = strings.TrimSpace(line)
|
||||
if line == "" || strings.HasPrefix(line, "#") {
|
||||
continue
|
||||
}
|
||||
seedlist = append(seedlist, line)
|
||||
}
|
||||
|
||||
if len(seedlist) > 0 {
|
||||
logging.LogInfo("Loaded %d seed URLs", len(seedlist))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func Shutdown() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetSeedURLs returns the list of seed URLs
|
||||
func GetSeedURLs() []string {
|
||||
if seedlist == nil {
|
||||
return []string{}
|
||||
}
|
||||
// Return a copy to prevent external modification
|
||||
result := make([]string, len(seedlist))
|
||||
copy(result, seedlist)
|
||||
return result
|
||||
}
|
||||
67
common/seedList/seedlist_test.go
Normal file
67
common/seedList/seedlist_test.go
Normal file
@@ -0,0 +1,67 @@
|
||||
package seedList
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestLoadSeedlist(t *testing.T) {
|
||||
// Create a temporary test file
|
||||
content := `# Test seed URLs
|
||||
gemini://example.com/
|
||||
gemini://test.com/
|
||||
|
||||
# Another comment
|
||||
gemini://demo.org/`
|
||||
|
||||
tmpFile, err := os.CreateTemp("", "seed_urls_test_*.txt")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create temp file: %v", err)
|
||||
}
|
||||
defer os.Remove(tmpFile.Name())
|
||||
|
||||
if _, err := tmpFile.WriteString(content); err != nil {
|
||||
t.Fatalf("Failed to write to temp file: %v", err)
|
||||
}
|
||||
tmpFile.Close()
|
||||
|
||||
// Reset global variable for test
|
||||
seedlist = nil
|
||||
|
||||
// Test loading
|
||||
err = loadSeedlist(tmpFile.Name())
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to load seedlist: %v", err)
|
||||
}
|
||||
|
||||
// Verify content
|
||||
expected := []string{
|
||||
"gemini://example.com/",
|
||||
"gemini://test.com/",
|
||||
"gemini://demo.org/",
|
||||
}
|
||||
|
||||
urls := GetSeedURLs()
|
||||
if len(urls) != len(expected) {
|
||||
t.Errorf("Expected %d URLs, got %d", len(expected), len(urls))
|
||||
}
|
||||
|
||||
for i, url := range urls {
|
||||
if url != expected[i] {
|
||||
t.Errorf("Expected URL %d to be %s, got %s", i, expected[i], url)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetSeedURLsEmptyList(t *testing.T) {
|
||||
// Reset global variable
|
||||
originalSeedlist := seedlist
|
||||
defer func() { seedlist = originalSeedlist }()
|
||||
|
||||
seedlist = nil
|
||||
|
||||
urls := GetSeedURLs()
|
||||
if len(urls) != 0 {
|
||||
t.Errorf("Expected empty list, got %d URLs", len(urls))
|
||||
}
|
||||
}
|
||||
@@ -27,7 +27,7 @@ type Snapshot struct {
|
||||
func SnapshotFromURL(u string, normalize bool) (*Snapshot, error) {
|
||||
url, err := commonUrl.ParseURL(u, "", normalize)
|
||||
if err != nil {
|
||||
return nil, xerrors.NewError(err, 0, "", false)
|
||||
return nil, xerrors.NewSimpleError(err)
|
||||
}
|
||||
newSnapshot := Snapshot{
|
||||
URL: *url,
|
||||
|
||||
@@ -117,7 +117,7 @@ func TestURLOperations(t *testing.T) {
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("NormalizeURL", func(t *testing.T) {
|
||||
t.Run("CheckAndUpdateNormalizedURL", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
tests := []struct {
|
||||
|
||||
@@ -7,7 +7,7 @@ import (
|
||||
"strings"
|
||||
|
||||
"gemini-grc/config"
|
||||
"gemini-grc/logging"
|
||||
"git.antanst.com/antanst/logging"
|
||||
"git.antanst.com/antanst/xerrors"
|
||||
)
|
||||
|
||||
|
||||
171
common/worker.go
171
common/worker.go
@@ -19,8 +19,8 @@ import (
|
||||
"gemini-grc/gemini"
|
||||
"gemini-grc/gopher"
|
||||
"gemini-grc/hostPool"
|
||||
"gemini-grc/logging"
|
||||
"gemini-grc/robotsMatch"
|
||||
"git.antanst.com/antanst/logging"
|
||||
"git.antanst.com/antanst/xerrors"
|
||||
"github.com/guregu/null/v5"
|
||||
"github.com/jmoiron/sqlx"
|
||||
@@ -30,7 +30,7 @@ func RunWorkerWithTx(workerID int, job string) {
|
||||
// Extract host from URL for the context.
|
||||
parsedURL, err := url2.ParseURL(job, "", true)
|
||||
if err != nil {
|
||||
logging.LogInfo("Failed to parse job URL: %s Error: %s", job, err)
|
||||
logging.LogInfo("Failed to parse URL: %s Error: %s", job, err)
|
||||
return
|
||||
}
|
||||
host := parsedURL.Hostname
|
||||
@@ -38,8 +38,9 @@ func RunWorkerWithTx(workerID int, job string) {
|
||||
// Create a new worker context
|
||||
baseCtx := context.Background()
|
||||
ctx, cancel := contextutil.NewRequestContext(baseCtx, job, host, workerID)
|
||||
defer cancel() // Ensure the context is cancelled when we're done
|
||||
ctx = contextutil.ContextWithComponent(ctx, "worker")
|
||||
defer cancel() // Ensure the context is cancelled when we're done
|
||||
// contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "======================================\n\n")
|
||||
contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "Starting worker for URL %s", job)
|
||||
|
||||
// Create a new db transaction
|
||||
@@ -51,66 +52,53 @@ func RunWorkerWithTx(workerID int, job string) {
|
||||
|
||||
err = runWorker(ctx, tx, []string{job})
|
||||
if err != nil {
|
||||
// Handle context cancellation and timeout errors gracefully, instead of treating them as fatal
|
||||
// Two cases to handle:
|
||||
// - context cancellation/timeout errors (log and ignore)
|
||||
// - fatal errors (log and send to chan)
|
||||
// non-fatal errors should've been handled within
|
||||
// the runWorker() function and not bubble up here.
|
||||
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
|
||||
contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "Worker timed out or canceled: %v", err)
|
||||
rollbackErr := SafeRollback(ctx, tx)
|
||||
rollbackErr := gemdb.SafeRollback(ctx, tx)
|
||||
if rollbackErr != nil {
|
||||
FatalErrorsChan <- rollbackErr
|
||||
return
|
||||
}
|
||||
return
|
||||
} else if xerrors.IsFatal(err) {
|
||||
contextlog.LogErrorWithContext(ctx, logging.GetSlogger(), "Worker failed: %v", err)
|
||||
rollbackErr := gemdb.SafeRollback(ctx, tx)
|
||||
if rollbackErr != nil {
|
||||
FatalErrorsChan <- rollbackErr
|
||||
return
|
||||
}
|
||||
FatalErrorsChan <- err
|
||||
return
|
||||
|
||||
}
|
||||
// For other errors, we treat them as fatal.
|
||||
contextlog.LogErrorWithContext(ctx, logging.GetSlogger(), "Worker failed: %v", err)
|
||||
rollbackErr := SafeRollback(ctx, tx)
|
||||
if rollbackErr != nil {
|
||||
FatalErrorsChan <- rollbackErr
|
||||
}
|
||||
FatalErrorsChan <- err
|
||||
return
|
||||
panic(err) // We shouldn't reach this point!
|
||||
}
|
||||
|
||||
contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "Committing transaction")
|
||||
err = tx.Commit()
|
||||
if err != nil && !errors.Is(err, sql.ErrTxDone) {
|
||||
contextlog.LogErrorWithContext(ctx, logging.GetSlogger(), "Failed to commit transaction: %v", err)
|
||||
if rollbackErr := SafeRollback(ctx, tx); rollbackErr != nil {
|
||||
if rollbackErr := gemdb.SafeRollback(ctx, tx); rollbackErr != nil {
|
||||
FatalErrorsChan <- err
|
||||
return
|
||||
}
|
||||
}
|
||||
contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "Worker done!")
|
||||
}
|
||||
|
||||
// SafeRollback attempts to roll back a transaction,
|
||||
// handling the case if the tx was already finalized.
|
||||
func SafeRollback(ctx context.Context, tx *sqlx.Tx) error {
|
||||
rollbackErr := tx.Rollback()
|
||||
if rollbackErr != nil {
|
||||
// Check if it's the standard "transaction already finalized" error
|
||||
if errors.Is(rollbackErr, sql.ErrTxDone) {
|
||||
contextlog.LogWarnWithContext(ctx, logging.GetSlogger(), "Rollback failed because transaction is already finalized")
|
||||
return nil
|
||||
}
|
||||
// Only panic for other types of rollback failures
|
||||
contextlog.LogErrorWithContext(ctx, logging.GetSlogger(), "Failed to rollback transaction: %v", rollbackErr)
|
||||
return xerrors.NewError(fmt.Errorf("failed to rollback transaction: %w", rollbackErr), 0, "", true)
|
||||
}
|
||||
return nil
|
||||
contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "Worker done.")
|
||||
}
|
||||
|
||||
func runWorker(ctx context.Context, tx *sqlx.Tx, urls []string) error {
|
||||
total := len(urls)
|
||||
for i, u := range urls {
|
||||
contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "Starting %d/%d %s", i+1, total, u)
|
||||
urlCtx, cancelFunc := context.WithCancel(ctx)
|
||||
err := WorkOnUrl(urlCtx, tx, u)
|
||||
cancelFunc()
|
||||
for _, u := range urls {
|
||||
err := WorkOnUrl(ctx, tx, u)
|
||||
if err != nil {
|
||||
return err
|
||||
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) || xerrors.IsFatal(err) {
|
||||
return err
|
||||
}
|
||||
contextlog.LogErrorWithContext(ctx, logging.GetSlogger(), "Worker failed: %v", err)
|
||||
}
|
||||
contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "Done %d/%d.", i+1, total)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -119,47 +107,44 @@ func runWorker(ctx context.Context, tx *sqlx.Tx, urls []string) error {
|
||||
// unexpected errors are returned.
|
||||
// expected errors are stored within the snapshot.
|
||||
func WorkOnUrl(ctx context.Context, tx *sqlx.Tx, url string) (err error) {
|
||||
// Create a context specifically for this URL with "url" component
|
||||
urlCtx := contextutil.ContextWithComponent(ctx, "url")
|
||||
|
||||
contextlog.LogDebugWithContext(urlCtx, logging.GetSlogger(), "Processing URL: %s", url)
|
||||
contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "Worker visiting URL %s", url)
|
||||
|
||||
s, err := snapshot.SnapshotFromURL(url, true)
|
||||
if err != nil {
|
||||
contextlog.LogErrorWithContext(urlCtx, logging.GetSlogger(), "Failed to parse URL: %v", err)
|
||||
contextlog.LogErrorWithContext(ctx, logging.GetSlogger(), "Failed to parse URL: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// We always use the normalized URL
|
||||
if url != s.URL.Full {
|
||||
//err = gemdb.Database.CheckAndUpdateNormalizedURL(ctx, tx, url, s.URL.Full)
|
||||
//if err != nil {
|
||||
// return err
|
||||
//}
|
||||
//contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "Normalized URL: %s → %s", url, s.URL.Full)
|
||||
url = s.URL.Full
|
||||
}
|
||||
|
||||
isGemini := url2.IsGeminiUrl(s.URL.String())
|
||||
isGopher := url2.IsGopherURL(s.URL.String())
|
||||
|
||||
if !isGemini && !isGopher {
|
||||
return xerrors.NewError(fmt.Errorf("not a Gopher or Gemini URL: %s", s.URL.String()), 0, "", false)
|
||||
return xerrors.NewSimpleError(fmt.Errorf("not a Gopher or Gemini URL: %s", s.URL.String()))
|
||||
}
|
||||
|
||||
if isGopher && !config.CONFIG.GopherEnable {
|
||||
contextlog.LogDebugWithContext(urlCtx, logging.GetSlogger(), "Skipping gopher URL (disabled in config)")
|
||||
return nil
|
||||
}
|
||||
|
||||
if url != s.URL.Full {
|
||||
err = gemdb.Database.NormalizeURL(ctx, tx, url, s.URL.Full)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
contextlog.LogDebugWithContext(urlCtx, logging.GetSlogger(), "Normalized URL: %s → %s", url, s.URL.Full)
|
||||
url = s.URL.Full
|
||||
return xerrors.NewSimpleError(fmt.Errorf("gopher disabled, not processing Gopher URL: %s", s.URL.String()))
|
||||
}
|
||||
|
||||
// Check if URL is whitelisted
|
||||
isUrlWhitelisted := whiteList.IsWhitelisted(s.URL.String())
|
||||
if isUrlWhitelisted {
|
||||
contextlog.LogInfoWithContext(urlCtx, logging.GetSlogger(), "URL matches whitelist, forcing crawl %s", url)
|
||||
contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "URL matches whitelist, forcing crawl %s", url)
|
||||
}
|
||||
|
||||
// Only check blacklist if URL is not whitelisted
|
||||
if !isUrlWhitelisted && blackList.IsBlacklisted(s.URL.String()) {
|
||||
contextlog.LogInfoWithContext(urlCtx, logging.GetSlogger(), "URL matches blacklist, ignoring %s", url)
|
||||
contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "URL matches blacklist, ignoring %s", url)
|
||||
s.Error = null.StringFrom(commonErrors.ErrBlacklistMatch.Error())
|
||||
return saveSnapshotAndRemoveURL(ctx, tx, s)
|
||||
}
|
||||
@@ -169,58 +154,44 @@ func WorkOnUrl(ctx context.Context, tx *sqlx.Tx, url string) (err error) {
|
||||
if !isUrlWhitelisted && isGemini {
|
||||
// If URL matches a robots.txt disallow line,
|
||||
// add it as an error and remove url
|
||||
robotMatch, err = robotsMatch.RobotMatch(urlCtx, s.URL.String())
|
||||
if err != nil {
|
||||
if commonErrors.IsHostError(err) {
|
||||
return removeURL(ctx, tx, s.URL.String())
|
||||
}
|
||||
return err
|
||||
}
|
||||
robotMatch = robotsMatch.RobotMatch(ctx, s.URL.String())
|
||||
if robotMatch {
|
||||
contextlog.LogInfoWithContext(urlCtx, logging.GetSlogger(), "URL matches robots.txt, skipping")
|
||||
contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "URL matches robots.txt, skipping")
|
||||
s.Error = null.StringFrom(commonErrors.ErrRobotsMatch.Error())
|
||||
return saveSnapshotAndRemoveURL(ctx, tx, s)
|
||||
}
|
||||
}
|
||||
|
||||
contextlog.LogDebugWithContext(urlCtx, logging.GetSlogger(), "Adding to host pool")
|
||||
err = hostPool.AddHostToHostPool(urlCtx, s.Host)
|
||||
err = hostPool.AddHostToHostPool(ctx, s.Host)
|
||||
if err != nil {
|
||||
contextlog.LogErrorWithContext(urlCtx, logging.GetSlogger(), "Failed to add host to pool: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
defer func(ctx context.Context, host string) {
|
||||
hostPool.RemoveHostFromPool(ctx, host)
|
||||
}(urlCtx, s.Host)
|
||||
}(ctx, s.Host)
|
||||
|
||||
contextlog.LogDebugWithContext(urlCtx, logging.GetSlogger(), "Visiting %s", s.URL.String())
|
||||
contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "Visiting %s", s.URL.String())
|
||||
|
||||
// Use context-aware visits for both protocols
|
||||
if isGopher {
|
||||
// Use the context-aware version for Gopher visits
|
||||
s, err = gopher.VisitWithContext(urlCtx, s.URL.String())
|
||||
s, err = gopher.VisitWithContext(ctx, s.URL.String())
|
||||
} else {
|
||||
// Use the context-aware version for Gemini visits
|
||||
s, err = gemini.Visit(urlCtx, s.URL.String())
|
||||
s, err = gemini.Visit(ctx, s.URL.String())
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
contextlog.LogErrorWithContext(urlCtx, logging.GetSlogger(), "Error visiting URL: %v", err)
|
||||
contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "Error visiting URL: %v", err)
|
||||
return err
|
||||
}
|
||||
if s == nil {
|
||||
contextlog.LogDebugWithContext(urlCtx, logging.GetSlogger(), "No snapshot returned")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Handle Gemini redirection.
|
||||
if isGemini &&
|
||||
s.ResponseCode.ValueOrZero() >= 30 &&
|
||||
s.ResponseCode.ValueOrZero() < 40 {
|
||||
err = handleRedirection(urlCtx, tx, s)
|
||||
err = saveRedirectURL(ctx, tx, s)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error while handling redirection: %s", err)
|
||||
return xerrors.NewSimpleError(fmt.Errorf("error while handling redirection: %s", err))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -231,14 +202,14 @@ func WorkOnUrl(ctx context.Context, tx *sqlx.Tx, url string) (err error) {
|
||||
return err
|
||||
}
|
||||
if identical {
|
||||
contextlog.LogDebugWithContext(urlCtx, logging.GetSlogger(), "Content identical to existing snapshot, skipping")
|
||||
contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "Content identical to existing snapshot, skipping")
|
||||
return removeURL(ctx, tx, s.URL.String())
|
||||
}
|
||||
}
|
||||
|
||||
// Process and store links since content has changed
|
||||
if len(s.Links.ValueOrZero()) > 0 {
|
||||
contextlog.LogDebugWithContext(urlCtx, logging.GetSlogger(), "Found %d links", len(s.Links.ValueOrZero()))
|
||||
contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "Found %d links", len(s.Links.ValueOrZero()))
|
||||
err = storeLinks(ctx, tx, s)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -246,7 +217,11 @@ func WorkOnUrl(ctx context.Context, tx *sqlx.Tx, url string) (err error) {
|
||||
}
|
||||
|
||||
// Save the snapshot and remove the URL from the queue
|
||||
contextlog.LogInfoWithContext(urlCtx, logging.GetSlogger(), "%2d %s", s.ResponseCode.ValueOrZero(), s.URL.String())
|
||||
if s.Error.ValueOrZero() != "" {
|
||||
contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "%2d %s", s.ResponseCode.ValueOrZero(), s.Error.ValueOrZero())
|
||||
} else {
|
||||
contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "%2d", s.ResponseCode.ValueOrZero())
|
||||
}
|
||||
return saveSnapshotAndRemoveURL(ctx, tx, s)
|
||||
}
|
||||
|
||||
@@ -273,12 +248,10 @@ func storeLinks(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Context-aware version of removeURL
|
||||
func removeURL(ctx context.Context, tx *sqlx.Tx, url string) error {
|
||||
return gemdb.Database.DeleteURL(ctx, tx, url)
|
||||
}
|
||||
|
||||
// Context-aware version of saveSnapshotAndRemoveURL
|
||||
func saveSnapshotAndRemoveURL(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) error {
|
||||
err := gemdb.Database.SaveSnapshot(ctx, tx, s)
|
||||
if err != nil {
|
||||
@@ -304,7 +277,7 @@ func haveWeVisitedURL(ctx context.Context, tx *sqlx.Tx, u string) (bool, error)
|
||||
|
||||
// Check if the context is cancelled
|
||||
if err := ctx.Err(); err != nil {
|
||||
return false, err
|
||||
return false, xerrors.NewSimpleError(err)
|
||||
}
|
||||
|
||||
// Check the urls table which holds the crawl queue.
|
||||
@@ -337,7 +310,6 @@ func haveWeVisitedURL(ctx context.Context, tx *sqlx.Tx, u string) (bool, error)
|
||||
}
|
||||
|
||||
if len(recentSnapshots) > 0 {
|
||||
contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "Skipping URL %s (updated within last %d days)", u, config.CONFIG.SkipIfUpdatedDays)
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
@@ -345,30 +317,25 @@ func haveWeVisitedURL(ctx context.Context, tx *sqlx.Tx, u string) (bool, error)
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func handleRedirection(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) error {
|
||||
// Create a context specifically for redirection handling
|
||||
redirectCtx := contextutil.ContextWithComponent(ctx, "redirect")
|
||||
|
||||
// Use the redirectCtx for all operations
|
||||
func saveRedirectURL(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) error {
|
||||
newURL, err := url2.ExtractRedirectTargetFromHeader(s.URL, s.Header.ValueOrZero())
|
||||
if err != nil {
|
||||
contextlog.LogErrorWithContext(redirectCtx, logging.GetSlogger(), "Failed to extract redirect target: %v", err)
|
||||
contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "Failed to extract redirect target: %v", err)
|
||||
return err
|
||||
}
|
||||
contextlog.LogDebugWithContext(redirectCtx, logging.GetSlogger(), "Page redirects to %s", newURL)
|
||||
contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "Page redirects to %s", newURL)
|
||||
|
||||
haveWeVisited, err := haveWeVisitedURL(redirectCtx, tx, newURL.String())
|
||||
haveWeVisited, err := haveWeVisitedURL(ctx, tx, newURL.String())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if shouldPersistURL(newURL) && !haveWeVisited {
|
||||
err = gemdb.Database.InsertURL(redirectCtx, tx, newURL.Full)
|
||||
err = gemdb.Database.InsertURL(ctx, tx, newURL.Full)
|
||||
if err != nil {
|
||||
contextlog.LogErrorWithContext(redirectCtx, logging.GetSlogger(), "Failed to insert redirect URL: %v", err)
|
||||
return err
|
||||
}
|
||||
contextlog.LogDebugWithContext(redirectCtx, logging.GetSlogger(), "Saved redirection URL %s", newURL.String())
|
||||
contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "Saved redirection URL %s", newURL.String())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
46
db/db.go
46
db/db.go
@@ -16,7 +16,7 @@ import (
|
||||
commonUrl "gemini-grc/common/url"
|
||||
"gemini-grc/config"
|
||||
"gemini-grc/contextutil"
|
||||
"gemini-grc/logging"
|
||||
"git.antanst.com/antanst/logging"
|
||||
"git.antanst.com/antanst/xerrors"
|
||||
"github.com/guregu/null/v5"
|
||||
_ "github.com/jackc/pgx/v5/stdlib" // PGX driver for PostgreSQL
|
||||
@@ -32,7 +32,7 @@ type DbService interface {
|
||||
|
||||
// URL methods
|
||||
InsertURL(ctx context.Context, tx *sqlx.Tx, url string) error
|
||||
NormalizeURL(ctx context.Context, tx *sqlx.Tx, url string, normalizedURL string) error
|
||||
CheckAndUpdateNormalizedURL(ctx context.Context, tx *sqlx.Tx, url string, normalizedURL string) error
|
||||
DeleteURL(ctx context.Context, tx *sqlx.Tx, url string) error
|
||||
MarkURLsAsBeingProcessed(ctx context.Context, tx *sqlx.Tx, urls []string) error
|
||||
GetUrlHosts(ctx context.Context, tx *sqlx.Tx) ([]string, error)
|
||||
@@ -117,10 +117,13 @@ func (d *DbServiceImpl) Initialize(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Shutdown the database with context
|
||||
func (d *DbServiceImpl) Shutdown(ctx context.Context) error {
|
||||
dbCtx := contextutil.ContextWithComponent(ctx, "database")
|
||||
contextlog.LogDebugWithContext(dbCtx, logging.GetSlogger(), "Shutting down database connection")
|
||||
contextlog.LogDebugWithContext(dbCtx, logging.GetSlogger(), "Shutting down database connections")
|
||||
_, err := d.db.Query("UPDATE urls SET being_processed=false")
|
||||
if err != nil {
|
||||
contextlog.LogErrorWithContext(dbCtx, logging.GetSlogger(), "Unable to update urls table: %v", err)
|
||||
}
|
||||
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
@@ -129,12 +132,11 @@ func (d *DbServiceImpl) Shutdown(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check if the context is cancelled before proceeding
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err := d.db.Close()
|
||||
err = d.db.Close()
|
||||
if err != nil {
|
||||
contextlog.LogErrorWithContext(dbCtx, logging.GetSlogger(), "Error closing database connection: %v", err)
|
||||
} else {
|
||||
@@ -154,7 +156,6 @@ func (d *DbServiceImpl) NewTx(ctx context.Context) (*sqlx.Tx, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
contextlog.LogDebugWithContext(dbCtx, logging.GetSlogger(), "Creating new database transaction")
|
||||
tx, err := d.db.BeginTxx(ctx, nil)
|
||||
if err != nil {
|
||||
contextlog.LogErrorWithContext(dbCtx, logging.GetSlogger(), "Failed to create transaction: %v", err)
|
||||
@@ -199,7 +200,7 @@ func (d *DbServiceImpl) InsertURL(ctx context.Context, tx *sqlx.Tx, url string)
|
||||
}
|
||||
|
||||
// NormalizeURL normalizes a URL with context
|
||||
func (d *DbServiceImpl) NormalizeURL(ctx context.Context, tx *sqlx.Tx, url string, normalizedURL string) error {
|
||||
func (d *DbServiceImpl) CheckAndUpdateNormalizedURL(ctx context.Context, tx *sqlx.Tx, url string, normalizedURL string) error {
|
||||
dbCtx := contextutil.ContextWithComponent(ctx, "database")
|
||||
|
||||
// Check if URLs are already the same
|
||||
@@ -214,7 +215,6 @@ func (d *DbServiceImpl) NormalizeURL(ctx context.Context, tx *sqlx.Tx, url strin
|
||||
|
||||
contextlog.LogDebugWithContext(dbCtx, logging.GetSlogger(), "Updating normalized URL %s -> %s", url, normalizedURL)
|
||||
|
||||
// Context-aware implementation
|
||||
query := SQL_UPDATE_URL
|
||||
a := struct {
|
||||
Url string `db:"Url"`
|
||||
@@ -514,12 +514,13 @@ func (d *DbServiceImpl) IsContentIdentical(ctx context.Context, tx *sqlx.Tx, s *
|
||||
return false, err
|
||||
}
|
||||
|
||||
// Context-aware implementation
|
||||
// Update: Skipped this because empty pages can be valid
|
||||
// ex. pages with redirect headers
|
||||
// Only check for identical content if we have gemtext or data
|
||||
if (!s.GemText.Valid || s.GemText.String == "") &&
|
||||
(!s.Data.Valid || len(s.Data.V) == 0) {
|
||||
return false, nil
|
||||
}
|
||||
//if (!s.GemText.Valid || s.GemText.String == "") &&
|
||||
// (!s.Data.Valid || len(s.Data.V) == 0) {
|
||||
// return false, nil
|
||||
//}
|
||||
|
||||
// Try to get the latest snapshot for this URL
|
||||
latestSnapshot := &snapshot.Snapshot{}
|
||||
@@ -546,3 +547,20 @@ func (d *DbServiceImpl) IsContentIdentical(ctx context.Context, tx *sqlx.Tx, s *
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// SafeRollback attempts to roll back a transaction,
|
||||
// handling the case if the tx was already finalized.
|
||||
func SafeRollback(ctx context.Context, tx *sqlx.Tx) error {
|
||||
rollbackErr := tx.Rollback()
|
||||
if rollbackErr != nil {
|
||||
// Check if it's the standard "transaction already finalized" error
|
||||
if errors.Is(rollbackErr, sql.ErrTxDone) {
|
||||
contextlog.LogWarnWithContext(ctx, logging.GetSlogger(), "Rollback failed because transaction is already finalized")
|
||||
return nil
|
||||
}
|
||||
// Only return error for other types of rollback failures
|
||||
contextlog.LogErrorWithContext(ctx, logging.GetSlogger(), "Failed to rollback transaction: %v", rollbackErr)
|
||||
return xerrors.NewError(fmt.Errorf("failed to rollback transaction: %w", rollbackErr), 0, "", true)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
16
gemget.sh
16
gemget.sh
@@ -1,16 +0,0 @@
|
||||
#!/bin/env bash
|
||||
set -eu
|
||||
set -o pipefail
|
||||
|
||||
# Max response size 10MiB
|
||||
LOG_LEVEL=debug \
|
||||
PRINT_WORKER_STATUS=false \
|
||||
DRY_RUN=false \
|
||||
NUM_OF_WORKERS=1 \
|
||||
WORKER_BATCH_SIZE=1 \
|
||||
BLACKLIST_PATH="$(pwd)/blacklist.txt" \
|
||||
MAX_RESPONSE_SIZE=10485760 \
|
||||
RESPONSE_TIMEOUT=10 \
|
||||
PANIC_ON_UNEXPECTED_ERROR=true \
|
||||
go run ./bin/gemget/main.go "$@"
|
||||
|
||||
@@ -7,8 +7,8 @@ import (
|
||||
|
||||
"gemini-grc/common/linkList"
|
||||
url2 "gemini-grc/common/url"
|
||||
"gemini-grc/logging"
|
||||
"gemini-grc/util"
|
||||
"git.antanst.com/antanst/logging"
|
||||
"git.antanst.com/antanst/xerrors"
|
||||
)
|
||||
|
||||
|
||||
@@ -1,23 +1,214 @@
|
||||
package gemini
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
stdurl "net/url"
|
||||
"regexp"
|
||||
"slices"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
commonErrors "gemini-grc/common/errors"
|
||||
"gemini-grc/common/contextlog"
|
||||
"gemini-grc/common/snapshot"
|
||||
_url "gemini-grc/common/url"
|
||||
"gemini-grc/config"
|
||||
"gemini-grc/contextutil"
|
||||
"git.antanst.com/antanst/logging"
|
||||
"git.antanst.com/antanst/xerrors"
|
||||
"github.com/guregu/null/v5"
|
||||
)
|
||||
|
||||
// ProcessData processes the raw data from a Gemini response and populates the Snapshot.
|
||||
// Visit visits a given URL using the Gemini protocol,
|
||||
// and returns a populated snapshot. Any relevant errors
|
||||
// when visiting the URL are stored in the snapshot;
|
||||
// an error is returned only when construction of a
|
||||
// snapshot was not possible (context cancellation errors,
|
||||
// not a valid URL etc.)
|
||||
func Visit(ctx context.Context, url string) (s *snapshot.Snapshot, err error) {
|
||||
geminiCtx := contextutil.ContextWithComponent(ctx, "network")
|
||||
|
||||
s, err = snapshot.SnapshotFromURL(url, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Check if the context has been canceled
|
||||
if err := ctx.Err(); err != nil {
|
||||
return nil, xerrors.NewSimpleError(err)
|
||||
}
|
||||
|
||||
data, err := ConnectAndGetData(geminiCtx, s.URL.String())
|
||||
if err != nil {
|
||||
s.Error = null.StringFrom(err.Error())
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// Check if the context has been canceled
|
||||
if err := ctx.Err(); err != nil {
|
||||
return nil, xerrors.NewSimpleError(err)
|
||||
}
|
||||
|
||||
s = UpdateSnapshotWithData(*s, data)
|
||||
|
||||
if !s.Error.Valid &&
|
||||
s.MimeType.Valid &&
|
||||
s.MimeType.String == "text/gemini" &&
|
||||
len(s.GemText.ValueOrZero()) > 0 {
|
||||
links := GetPageLinks(s.URL, s.GemText.String)
|
||||
if len(links) > 0 {
|
||||
s.Links = null.ValueFrom(links)
|
||||
}
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// ConnectAndGetData is a context-aware version of ConnectAndGetData
|
||||
// that returns the data from a GET request to a Gemini URL. It uses the context
|
||||
// for cancellation, timeout, and logging.
|
||||
func ConnectAndGetData(ctx context.Context, url string) ([]byte, error) {
|
||||
parsedURL, err := stdurl.Parse(url)
|
||||
if err != nil {
|
||||
return nil, xerrors.NewSimpleError(fmt.Errorf("error parsing URL: %w", err))
|
||||
}
|
||||
|
||||
hostname := parsedURL.Hostname()
|
||||
port := parsedURL.Port()
|
||||
if port == "" {
|
||||
port = "1965"
|
||||
}
|
||||
host := fmt.Sprintf("%s:%s", hostname, port)
|
||||
|
||||
// Check if the context has been canceled before proceeding
|
||||
if err := ctx.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
timeoutDuration := time.Duration(config.CONFIG.ResponseTimeout) * time.Second
|
||||
|
||||
// Establish the underlying TCP connection with context-based cancellation
|
||||
dialer := &net.Dialer{
|
||||
Timeout: timeoutDuration,
|
||||
}
|
||||
|
||||
conn, err := dialer.DialContext(ctx, "tcp", host)
|
||||
if err != nil {
|
||||
contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "Failed to establish TCP connection: %v", err)
|
||||
return nil, xerrors.NewSimpleError(err)
|
||||
}
|
||||
|
||||
// Make sure we always close the connection
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
err = conn.SetReadDeadline(time.Now().Add(timeoutDuration))
|
||||
if err != nil {
|
||||
return nil, xerrors.NewSimpleError(err)
|
||||
}
|
||||
err = conn.SetWriteDeadline(time.Now().Add(timeoutDuration))
|
||||
if err != nil {
|
||||
return nil, xerrors.NewSimpleError(err)
|
||||
}
|
||||
|
||||
// Check if the context has been canceled before proceeding with TLS handshake
|
||||
if err := ctx.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Perform the TLS handshake
|
||||
tlsConfig := &tls.Config{
|
||||
InsecureSkipVerify: true, //nolint:gosec // Accept all TLS certs, even if insecure.
|
||||
ServerName: parsedURL.Hostname(), // SNI says we should not include port in hostname
|
||||
}
|
||||
|
||||
tlsConn := tls.Client(conn, tlsConfig)
|
||||
err = tlsConn.SetReadDeadline(time.Now().Add(timeoutDuration))
|
||||
if err != nil {
|
||||
return nil, xerrors.NewSimpleError(err)
|
||||
}
|
||||
err = tlsConn.SetWriteDeadline(time.Now().Add(timeoutDuration))
|
||||
if err != nil {
|
||||
return nil, xerrors.NewSimpleError(err)
|
||||
}
|
||||
|
||||
// Check if the context is done before attempting handshake
|
||||
if err := ctx.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Perform TLS handshake with regular method
|
||||
// (HandshakeContext is only available in Go 1.17+)
|
||||
err = tlsConn.Handshake()
|
||||
if err != nil {
|
||||
return nil, xerrors.NewSimpleError(err)
|
||||
}
|
||||
|
||||
// Check again if the context is done after handshake
|
||||
if err := ctx.Err(); err != nil {
|
||||
return nil, xerrors.NewSimpleError(err)
|
||||
}
|
||||
|
||||
// We read `buf`-sized chunks and add data to `data`
|
||||
buf := make([]byte, 4096)
|
||||
var data []byte
|
||||
|
||||
// Check if the context has been canceled before sending request
|
||||
if err := ctx.Err(); err != nil {
|
||||
return nil, xerrors.NewSimpleError(err)
|
||||
}
|
||||
|
||||
// Send Gemini request to trigger server response
|
||||
// Fix for stupid server bug:
|
||||
// Some servers return 'Header: 53 No proxying to other hosts or ports!'
|
||||
// when the port is 1965 and is still specified explicitly in the URL.
|
||||
url2, _ := _url.ParseURL(url, "", true)
|
||||
_, err = tlsConn.Write([]byte(fmt.Sprintf("%s\r\n", url2.StringNoDefaultPort())))
|
||||
if err != nil {
|
||||
return nil, xerrors.NewSimpleError(err)
|
||||
}
|
||||
|
||||
// Read response bytes in len(buf) byte chunks
|
||||
for {
|
||||
// Check if the context has been canceled before each read
|
||||
if err := ctx.Err(); err != nil {
|
||||
return nil, xerrors.NewSimpleError(err)
|
||||
}
|
||||
|
||||
n, err := tlsConn.Read(buf)
|
||||
if n > 0 {
|
||||
data = append(data, buf[:n]...)
|
||||
}
|
||||
if len(data) > config.CONFIG.MaxResponseSize {
|
||||
contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "Response too large (max: %d bytes)", config.CONFIG.MaxResponseSize)
|
||||
return nil, xerrors.NewSimpleError(fmt.Errorf("response too large"))
|
||||
}
|
||||
if err != nil {
|
||||
if errors.Is(err, io.EOF) {
|
||||
break
|
||||
}
|
||||
contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "Error reading data: %v", err)
|
||||
return nil, xerrors.NewSimpleError(err)
|
||||
}
|
||||
}
|
||||
|
||||
contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "Received %d bytes of data", len(data))
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// UpdateSnapshotWithData processes the raw data from a Gemini response and populates the Snapshot.
|
||||
// This function is exported for use by the robotsMatch package.
|
||||
func ProcessData(s snapshot.Snapshot, data []byte) (*snapshot.Snapshot, error) {
|
||||
func UpdateSnapshotWithData(s snapshot.Snapshot, data []byte) *snapshot.Snapshot {
|
||||
header, body, err := getHeadersAndData(data)
|
||||
if err != nil {
|
||||
return &s, err
|
||||
s.Error = null.StringFrom(err.Error())
|
||||
return &s
|
||||
}
|
||||
code, mimeType, lang := getMimeTypeAndLang(header)
|
||||
|
||||
@@ -39,13 +230,14 @@ func ProcessData(s snapshot.Snapshot, data []byte) (*snapshot.Snapshot, error) {
|
||||
if mimeType == "text/gemini" {
|
||||
validBody, err := BytesToValidUTF8(body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
s.Error = null.StringFrom(err.Error())
|
||||
return &s
|
||||
}
|
||||
s.GemText = null.StringFrom(validBody)
|
||||
} else {
|
||||
s.Data = null.ValueFrom(body)
|
||||
}
|
||||
return &s, nil
|
||||
return &s
|
||||
}
|
||||
|
||||
// Checks for a Gemini header, which is
|
||||
@@ -55,7 +247,7 @@ func ProcessData(s snapshot.Snapshot, data []byte) (*snapshot.Snapshot, error) {
|
||||
func getHeadersAndData(data []byte) (string, []byte, error) {
|
||||
firstLineEnds := slices.Index(data, '\n')
|
||||
if firstLineEnds == -1 {
|
||||
return "", nil, commonErrors.NewHostError(fmt.Errorf("error parsing header"))
|
||||
return "", nil, xerrors.NewSimpleError(fmt.Errorf("error parsing header"))
|
||||
}
|
||||
firstLine := string(data[:firstLineEnds])
|
||||
rest := data[firstLineEnds+1:]
|
||||
@@ -106,7 +298,3 @@ func getMimeTypeAndLang(headers string) (int, string, string) {
|
||||
lang := matches[3] // Will be empty string if no lang parameter was found
|
||||
return code, mimeType, lang
|
||||
}
|
||||
|
||||
func isGeminiCapsule(s *snapshot.Snapshot) bool {
|
||||
return !s.Error.Valid && s.MimeType.Valid && s.MimeType.String == "text/gemini"
|
||||
}
|
||||
|
||||
@@ -1,276 +0,0 @@
|
||||
package gemini
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
stdurl "net/url"
|
||||
"time"
|
||||
|
||||
"gemini-grc/common/contextlog"
|
||||
commonErrors "gemini-grc/common/errors"
|
||||
"gemini-grc/common/snapshot"
|
||||
_url "gemini-grc/common/url"
|
||||
"gemini-grc/config"
|
||||
"gemini-grc/contextutil"
|
||||
"gemini-grc/logging"
|
||||
"git.antanst.com/antanst/xerrors"
|
||||
"github.com/guregu/null/v5"
|
||||
)
|
||||
|
||||
// Visit visits a given URL using the Gemini protocol.
|
||||
func Visit(ctx context.Context, url string) (s *snapshot.Snapshot, err error) {
|
||||
geminiCtx := contextutil.ContextWithComponent(ctx, "gemini")
|
||||
|
||||
contextlog.LogDebugWithContext(geminiCtx, logging.GetSlogger(), "Visiting Gemini URL: %s", url)
|
||||
|
||||
s, err = snapshot.SnapshotFromURL(url, true)
|
||||
if err != nil {
|
||||
contextlog.LogErrorWithContext(geminiCtx, logging.GetSlogger(), "Failed to create snapshot from URL: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
// GeminiError and HostError should
|
||||
// be stored in the snapshot.
|
||||
if commonErrors.IsHostError(err) {
|
||||
contextlog.LogInfoWithContext(geminiCtx, logging.GetSlogger(), "Host error: %v", err)
|
||||
s.Error = null.StringFrom(err.Error())
|
||||
err = nil
|
||||
return
|
||||
} else if IsGeminiError(err) {
|
||||
contextlog.LogInfoWithContext(geminiCtx, logging.GetSlogger(), "Gemini error: %v", err)
|
||||
s.Error = null.StringFrom(err.Error())
|
||||
s.Header = null.StringFrom(errors.Unwrap(err).(*GeminiError).Header)
|
||||
s.ResponseCode = null.IntFrom(int64(errors.Unwrap(err).(*GeminiError).Code))
|
||||
err = nil
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
// Check if the context has been canceled
|
||||
if err := ctx.Err(); err != nil {
|
||||
return s, err
|
||||
}
|
||||
|
||||
data, err := ConnectAndGetDataWithContext(geminiCtx, s.URL.String())
|
||||
if err != nil {
|
||||
return s, err
|
||||
}
|
||||
|
||||
// Check if the context has been canceled
|
||||
if err := ctx.Err(); err != nil {
|
||||
return s, err
|
||||
}
|
||||
|
||||
s, err = ProcessData(*s, data)
|
||||
if err != nil {
|
||||
return s, err
|
||||
}
|
||||
|
||||
if isGeminiCapsule(s) {
|
||||
links := GetPageLinks(s.URL, s.GemText.String)
|
||||
if len(links) > 0 {
|
||||
s.Links = null.ValueFrom(links)
|
||||
}
|
||||
}
|
||||
|
||||
contextlog.LogDebugWithContext(geminiCtx, logging.GetSlogger(), "Successfully visited URL: %s (Code: %d)", url, s.ResponseCode.ValueOrZero())
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// ConnectAndGetDataWithContext is a context-aware version of ConnectAndGetData
|
||||
// that returns the data from a GET request to a Gemini URL. It uses the context
|
||||
// for cancellation, timeout, and logging.
|
||||
func ConnectAndGetDataWithContext(ctx context.Context, url string) ([]byte, error) {
|
||||
// Parse the URL
|
||||
parsedURL, err := stdurl.Parse(url)
|
||||
if err != nil {
|
||||
return nil, xerrors.NewError(fmt.Errorf("error parsing URL: %w", err), 0, "", false)
|
||||
}
|
||||
|
||||
hostname := parsedURL.Hostname()
|
||||
port := parsedURL.Port()
|
||||
if port == "" {
|
||||
port = "1965"
|
||||
}
|
||||
host := fmt.Sprintf("%s:%s", hostname, port)
|
||||
|
||||
// Check if the context has been canceled before proceeding
|
||||
if err := ctx.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "Connecting to %s", host)
|
||||
|
||||
timeoutDuration := time.Duration(config.CONFIG.ResponseTimeout) * time.Second
|
||||
|
||||
// Establish the underlying TCP connection with context-based cancellation
|
||||
dialer := &net.Dialer{
|
||||
Timeout: timeoutDuration,
|
||||
}
|
||||
|
||||
// Use DialContext to allow cancellation via context
|
||||
conn, err := dialer.DialContext(ctx, "tcp", host)
|
||||
if err != nil {
|
||||
contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "Failed to establish TCP connection: %v", err)
|
||||
return nil, commonErrors.NewHostError(err)
|
||||
}
|
||||
|
||||
// Make sure we always close the connection
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
// Set read and write timeouts on the TCP connection
|
||||
err = conn.SetReadDeadline(time.Now().Add(timeoutDuration))
|
||||
if err != nil {
|
||||
return nil, commonErrors.NewHostError(err)
|
||||
}
|
||||
err = conn.SetWriteDeadline(time.Now().Add(timeoutDuration))
|
||||
if err != nil {
|
||||
return nil, commonErrors.NewHostError(err)
|
||||
}
|
||||
|
||||
// Check if the context has been canceled before proceeding with TLS handshake
|
||||
if err := ctx.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Perform the TLS handshake
|
||||
tlsConfig := &tls.Config{
|
||||
InsecureSkipVerify: true, //nolint:gosec // Accept all TLS certs, even if insecure.
|
||||
ServerName: parsedURL.Hostname(), // SNI says we should not include port in hostname
|
||||
}
|
||||
|
||||
tlsConn := tls.Client(conn, tlsConfig)
|
||||
err = tlsConn.SetReadDeadline(time.Now().Add(timeoutDuration))
|
||||
if err != nil {
|
||||
return nil, commonErrors.NewHostError(err)
|
||||
}
|
||||
err = tlsConn.SetWriteDeadline(time.Now().Add(timeoutDuration))
|
||||
if err != nil {
|
||||
return nil, commonErrors.NewHostError(err)
|
||||
}
|
||||
|
||||
// Check if the context is done before attempting handshake
|
||||
if err := ctx.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Perform TLS handshake with regular method
|
||||
// (HandshakeContext is only available in Go 1.17+)
|
||||
err = tlsConn.Handshake()
|
||||
if err != nil {
|
||||
contextlog.LogErrorWithContext(ctx, logging.GetSlogger(), "TLS handshake failed: %v", err)
|
||||
return nil, commonErrors.NewHostError(err)
|
||||
}
|
||||
|
||||
// Check again if the context is done after handshake
|
||||
if err := ctx.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// We read `buf`-sized chunks and add data to `data`
|
||||
buf := make([]byte, 4096)
|
||||
var data []byte
|
||||
|
||||
// Check if the context has been canceled before sending request
|
||||
if err := ctx.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Send Gemini request to trigger server response
|
||||
// Fix for stupid server bug:
|
||||
// Some servers return 'Header: 53 No proxying to other hosts or ports!'
|
||||
// when the port is 1965 and is still specified explicitly in the URL.
|
||||
url2, _ := _url.ParseURL(url, "", true)
|
||||
_, err = tlsConn.Write([]byte(fmt.Sprintf("%s\r\n", url2.StringNoDefaultPort())))
|
||||
if err != nil {
|
||||
contextlog.LogErrorWithContext(ctx, logging.GetSlogger(), "Failed to send request: %v", err)
|
||||
return nil, commonErrors.NewHostError(err)
|
||||
}
|
||||
|
||||
contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "Request sent, reading response")
|
||||
|
||||
// Read response bytes in len(buf) byte chunks
|
||||
for {
|
||||
// Check if the context has been canceled before each read
|
||||
if err := ctx.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
n, err := tlsConn.Read(buf)
|
||||
if n > 0 {
|
||||
data = append(data, buf[:n]...)
|
||||
}
|
||||
if len(data) > config.CONFIG.MaxResponseSize {
|
||||
contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "Response too large (max: %d bytes)", config.CONFIG.MaxResponseSize)
|
||||
return nil, commonErrors.NewHostError(fmt.Errorf("response too large"))
|
||||
}
|
||||
if err != nil {
|
||||
if errors.Is(err, io.EOF) {
|
||||
break
|
||||
}
|
||||
contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "Error reading data: %v", err)
|
||||
return nil, commonErrors.NewHostError(err)
|
||||
}
|
||||
}
|
||||
|
||||
contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "Received %d bytes of data", len(data))
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// ProcessDataWithContext is a context-aware version of ProcessData that processes
|
||||
// the raw data from a Gemini response and populates the Snapshot.
|
||||
func ProcessDataWithContext(ctx context.Context, s snapshot.Snapshot, data []byte) (*snapshot.Snapshot, error) {
|
||||
// Create a processing-specific context with the "process" component
|
||||
processCtx := contextutil.ContextWithComponent(ctx, "process")
|
||||
|
||||
contextlog.LogDebugWithContext(processCtx, logging.GetSlogger(), "Processing Gemini response data (%d bytes)", len(data))
|
||||
|
||||
header, body, err := getHeadersAndData(data)
|
||||
if err != nil {
|
||||
contextlog.LogErrorWithContext(processCtx, logging.GetSlogger(), "Failed to extract headers: %v", err)
|
||||
return &s, err
|
||||
}
|
||||
|
||||
code, mimeType, lang := getMimeTypeAndLang(header)
|
||||
contextlog.LogDebugWithContext(processCtx, logging.GetSlogger(), "Response code: %d, MimeType: %s, Lang: %s", code, mimeType, lang)
|
||||
|
||||
if code != 0 {
|
||||
s.ResponseCode = null.IntFrom(int64(code))
|
||||
}
|
||||
if header != "" {
|
||||
s.Header = null.StringFrom(header)
|
||||
}
|
||||
if mimeType != "" {
|
||||
s.MimeType = null.StringFrom(mimeType)
|
||||
}
|
||||
if lang != "" {
|
||||
s.Lang = null.StringFrom(lang)
|
||||
}
|
||||
|
||||
// If we've got a Gemini document, populate
|
||||
// `GemText` field, otherwise raw data goes to `Data`.
|
||||
if mimeType == "text/gemini" {
|
||||
validBody, err := BytesToValidUTF8(body)
|
||||
if err != nil {
|
||||
contextlog.LogErrorWithContext(processCtx, logging.GetSlogger(), "UTF-8 validation failed: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
s.GemText = null.StringFrom(validBody)
|
||||
contextlog.LogDebugWithContext(processCtx, logging.GetSlogger(), "Processed gemtext content (%d characters)", len(validBody))
|
||||
} else {
|
||||
s.Data = null.ValueFrom(body)
|
||||
contextlog.LogDebugWithContext(processCtx, logging.GetSlogger(), "Stored binary data (%d bytes)", len(body))
|
||||
}
|
||||
|
||||
return &s, nil
|
||||
}
|
||||
@@ -135,17 +135,7 @@ func TestProcessData(t *testing.T) {
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
s := snapshot.Snapshot{}
|
||||
result, err := ProcessData(s, test.inputData)
|
||||
|
||||
if test.expectedError && err == nil {
|
||||
t.Errorf("Expected error, got nil")
|
||||
return
|
||||
}
|
||||
|
||||
if !test.expectedError && err != nil {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
return
|
||||
}
|
||||
result := UpdateSnapshotWithData(s, test.inputData)
|
||||
|
||||
if test.expectedError {
|
||||
return
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"io"
|
||||
"unicode/utf8"
|
||||
|
||||
"gemini-grc/config"
|
||||
"git.antanst.com/antanst/xerrors"
|
||||
"golang.org/x/text/encoding/charmap"
|
||||
"golang.org/x/text/encoding/japanese"
|
||||
@@ -23,11 +24,16 @@ func BytesToValidUTF8(input []byte) (string, error) {
|
||||
if len(input) == 0 {
|
||||
return "", nil
|
||||
}
|
||||
const maxSize = 10 * 1024 * 1024 // 10MB
|
||||
if len(input) > maxSize {
|
||||
return "", xerrors.NewError(fmt.Errorf("%w: %d bytes (max %d)", ErrInputTooLarge, len(input), maxSize), 0, "", false)
|
||||
|
||||
maxSize := config.CONFIG.MaxResponseSize
|
||||
if maxSize == 0 {
|
||||
maxSize = 1024 * 1024 // Default 1MB for tests
|
||||
}
|
||||
// remove NULL byte 0x00 (ReplaceAll accepts slices)
|
||||
if len(input) > maxSize {
|
||||
return "", xerrors.NewError(fmt.Errorf("BytesToValidUTF8: %w: %d bytes (max %d)", ErrInputTooLarge, len(input), maxSize), 0, "", false)
|
||||
}
|
||||
|
||||
// Always remove NULL bytes first (before UTF-8 validity check)
|
||||
inputNoNull := bytes.ReplaceAll(input, []byte{byte(0)}, []byte{})
|
||||
if utf8.Valid(inputNoNull) {
|
||||
return string(inputNoNull), nil
|
||||
@@ -42,6 +48,8 @@ func BytesToValidUTF8(input []byte) (string, error) {
|
||||
japanese.EUCJP.NewDecoder(), // Japanese
|
||||
korean.EUCKR.NewDecoder(), // Korean
|
||||
}
|
||||
|
||||
// Still invalid Unicode. Try some encodings to convert to.
|
||||
// First successful conversion wins.
|
||||
var lastErr error
|
||||
for _, encoding := range encodings {
|
||||
@@ -56,5 +64,5 @@ func BytesToValidUTF8(input []byte) (string, error) {
|
||||
}
|
||||
}
|
||||
|
||||
return "", xerrors.NewError(fmt.Errorf("%w (tried %d encodings): %w", ErrUTF8Conversion, len(encodings), lastErr), 0, "", false)
|
||||
return "", xerrors.NewError(fmt.Errorf("BytesToValidUTF8: %w (tried %d encodings): %w", ErrUTF8Conversion, len(encodings), lastErr), 0, "", false)
|
||||
}
|
||||
|
||||
5
go.mod
5
go.mod
@@ -3,8 +3,9 @@ module gemini-grc
|
||||
go 1.24.3
|
||||
|
||||
require (
|
||||
git.antanst.com/antanst/logging v0.0.1
|
||||
git.antanst.com/antanst/uid v0.0.1
|
||||
git.antanst.com/antanst/xerrors v0.0.1
|
||||
git.antanst.com/antanst/xerrors v0.0.2
|
||||
github.com/guregu/null/v5 v5.0.0
|
||||
github.com/jackc/pgx/v5 v5.7.2
|
||||
github.com/jmoiron/sqlx v1.4.0
|
||||
@@ -29,3 +30,5 @@ require (
|
||||
replace git.antanst.com/antanst/xerrors => ../xerrors
|
||||
|
||||
replace git.antanst.com/antanst/uid => ../uid
|
||||
|
||||
replace git.antanst.com/antanst/logging => ../logging
|
||||
|
||||
@@ -12,7 +12,7 @@ import (
|
||||
|
||||
commonErrors "gemini-grc/common/errors"
|
||||
"gemini-grc/config"
|
||||
"gemini-grc/logging"
|
||||
"git.antanst.com/antanst/logging"
|
||||
"git.antanst.com/antanst/xerrors"
|
||||
)
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ import (
|
||||
_url "gemini-grc/common/url"
|
||||
"gemini-grc/config"
|
||||
"gemini-grc/contextutil"
|
||||
"gemini-grc/logging"
|
||||
"git.antanst.com/antanst/logging"
|
||||
"git.antanst.com/antanst/xerrors"
|
||||
"github.com/guregu/null/v5"
|
||||
)
|
||||
@@ -30,8 +30,6 @@ func VisitWithContext(ctx context.Context, url string) (*snapshot.Snapshot, erro
|
||||
// Create a gopher-specific context with the "gopher" component
|
||||
gopherCtx := contextutil.ContextWithComponent(ctx, "gopher")
|
||||
|
||||
contextlog.LogDebugWithContext(gopherCtx, logging.GetSlogger(), "Visiting Gopher URL: %s", url)
|
||||
|
||||
if !config.CONFIG.GopherEnable {
|
||||
contextlog.LogDebugWithContext(gopherCtx, logging.GetSlogger(), "Gopher protocol is disabled")
|
||||
return nil, nil
|
||||
|
||||
@@ -8,7 +8,8 @@ import (
|
||||
|
||||
"gemini-grc/common/contextlog"
|
||||
"gemini-grc/contextutil"
|
||||
"gemini-grc/logging"
|
||||
"git.antanst.com/antanst/logging"
|
||||
"git.antanst.com/antanst/xerrors"
|
||||
)
|
||||
|
||||
var hostPool = HostPool{hostnames: make(map[string]struct{})}
|
||||
@@ -20,18 +21,13 @@ type HostPool struct {
|
||||
|
||||
// RemoveHostFromPool removes a host from the pool with context awareness
|
||||
func RemoveHostFromPool(ctx context.Context, key string) {
|
||||
// Create a hostPool-specific context
|
||||
hostCtx := contextutil.ContextWithComponent(ctx, "hostPool")
|
||||
|
||||
contextlog.LogDebugWithContext(hostCtx, logging.GetSlogger(), "Removing host %s from pool", key)
|
||||
|
||||
hostPool.lock.Lock()
|
||||
delete(hostPool.hostnames, key)
|
||||
hostPool.lock.Unlock()
|
||||
|
||||
// Add some jitter
|
||||
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
|
||||
|
||||
contextlog.LogDebugWithContext(hostCtx, logging.GetSlogger(), "Host %s removed from pool", key)
|
||||
}
|
||||
|
||||
@@ -41,18 +37,18 @@ func AddHostToHostPool(ctx context.Context, key string) error {
|
||||
// Create a hostPool-specific context
|
||||
hostCtx := contextutil.ContextWithComponent(ctx, "hostPool")
|
||||
|
||||
contextlog.LogDebugWithContext(hostCtx, logging.GetSlogger(), "Adding host %s to pool", key)
|
||||
|
||||
// Use a ticker to periodically check if we can add the host
|
||||
ticker := time.NewTicker(500 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
|
||||
// We continuously poll the pool,
|
||||
// and if the host isn't already
|
||||
// there, we add it.
|
||||
for {
|
||||
// Check if context is done before attempting to acquire lock
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
contextlog.LogDebugWithContext(hostCtx, logging.GetSlogger(), "Context canceled while waiting to add host %s", key)
|
||||
return ctx.Err()
|
||||
return xerrors.NewSimpleError(ctx.Err())
|
||||
default:
|
||||
// Continue with attempt to add host
|
||||
}
|
||||
@@ -67,15 +63,13 @@ func AddHostToHostPool(ctx context.Context, key string) error {
|
||||
}
|
||||
hostPool.lock.Unlock()
|
||||
|
||||
contextlog.LogDebugWithContext(hostCtx, logging.GetSlogger(), "Host %s busy, waiting...", key)
|
||||
|
||||
// Wait for next tick or context cancellation
|
||||
select {
|
||||
case <-ticker.C:
|
||||
// Try again on next tick
|
||||
case <-ctx.Done():
|
||||
contextlog.LogDebugWithContext(hostCtx, logging.GetSlogger(), "Context canceled while waiting for host %s", key)
|
||||
return ctx.Err()
|
||||
return xerrors.NewSimpleError(ctx.Err())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,188 +0,0 @@
|
||||
// Package logging provides a simple, structured logging interface using slog.
|
||||
// It offers colored output for better readability in terminal environments.
|
||||
package logging
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Global logger instance.
|
||||
var slogLogger *slog.Logger
|
||||
|
||||
// Current log level - used to filter logs.
|
||||
var currentLogLevel = slog.LevelInfo
|
||||
|
||||
// ANSI color codes for terminal output.
|
||||
const (
|
||||
colorReset = "\033[0m"
|
||||
levelDebug = "\033[37m" // Gray
|
||||
levelInfo = "\033[32m" // Green
|
||||
levelWarn = "\033[33m" // Yellow
|
||||
levelError = "\033[31m" // Red
|
||||
)
|
||||
|
||||
// Standard helper functions for logging
|
||||
func LogDebug(format string, args ...interface{}) {
|
||||
if slogLogger != nil {
|
||||
slogLogger.Debug(fmt.Sprintf(format, args...))
|
||||
}
|
||||
}
|
||||
|
||||
func LogInfo(format string, args ...interface{}) {
|
||||
if slogLogger != nil {
|
||||
slogLogger.Info(fmt.Sprintf(format, args...))
|
||||
}
|
||||
}
|
||||
|
||||
func LogWarn(format string, args ...interface{}) {
|
||||
if slogLogger != nil {
|
||||
slogLogger.Warn(fmt.Sprintf(format, args...))
|
||||
}
|
||||
}
|
||||
|
||||
func LogError(format string, args ...interface{}) {
|
||||
if slogLogger != nil {
|
||||
msg := fmt.Sprintf(format, args...)
|
||||
slogLogger.Error(msg, slog.String("error", msg))
|
||||
}
|
||||
}
|
||||
|
||||
// InitSlogger initializes the slog logger with custom handler.
|
||||
func InitSlogger(level slog.Level) {
|
||||
// Set the global log level
|
||||
currentLogLevel = level
|
||||
|
||||
// Create the handler with color support
|
||||
baseHandler := NewColorHandler(os.Stderr)
|
||||
|
||||
// Create and set the new logger
|
||||
slogLogger = slog.New(baseHandler)
|
||||
|
||||
// Set as default logger
|
||||
slog.SetDefault(slogLogger)
|
||||
|
||||
// Print a startup message to verify logging is working
|
||||
slogLogger.Info("Slog initialized", "level", level.String())
|
||||
}
|
||||
|
||||
// GetSlogger returns the current global slog logger instance.
|
||||
// Can be used by other packages
|
||||
func GetSlogger() *slog.Logger {
|
||||
if slogLogger == nil {
|
||||
return slog.Default()
|
||||
}
|
||||
return slogLogger
|
||||
}
|
||||
|
||||
// ColorHandler formats logs with colors for better terminal readability
|
||||
type ColorHandler struct {
|
||||
out io.Writer
|
||||
mu *sync.Mutex
|
||||
attrs []slog.Attr // Store attributes for this handler
|
||||
}
|
||||
|
||||
// NewColorHandler creates a new handler that writes colored logs to the provided writer
|
||||
func NewColorHandler(w io.Writer) *ColorHandler {
|
||||
if w == nil {
|
||||
w = os.Stderr
|
||||
}
|
||||
return &ColorHandler{
|
||||
out: w,
|
||||
mu: &sync.Mutex{},
|
||||
attrs: make([]slog.Attr, 0),
|
||||
}
|
||||
}
|
||||
|
||||
// Enabled checks if the given log level is enabled
|
||||
func (h *ColorHandler) Enabled(_ context.Context, level slog.Level) bool {
|
||||
return level >= currentLogLevel
|
||||
}
|
||||
|
||||
// Handle processes a log record, formatting it with colors
|
||||
func (h *ColorHandler) Handle(ctx context.Context, r slog.Record) error {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
// Format time
|
||||
timeStr := fmt.Sprintf("[%s]", r.Time.Format("2006-01-02 15:04:05"))
|
||||
|
||||
// Format level
|
||||
var levelStr string
|
||||
switch r.Level {
|
||||
case slog.LevelDebug:
|
||||
levelStr = fmt.Sprintf("%sDEBUG%s", levelDebug, colorReset)
|
||||
case slog.LevelInfo:
|
||||
levelStr = fmt.Sprintf("%sINFO%s", levelInfo, colorReset)
|
||||
case slog.LevelWarn:
|
||||
levelStr = fmt.Sprintf("%sWARN%s", levelWarn, colorReset)
|
||||
case slog.LevelError:
|
||||
levelStr = fmt.Sprintf("%sERROR%s", levelError, colorReset)
|
||||
default:
|
||||
levelStr = r.Level.String()
|
||||
}
|
||||
|
||||
// Build prefix
|
||||
prefix := fmt.Sprintf("%s %s ", timeStr, levelStr)
|
||||
|
||||
// Format message - we'll collect any special fields separately
|
||||
attrMap := make(map[string]string)
|
||||
|
||||
// First collect attributes from the handler itself
|
||||
for _, attr := range h.attrs {
|
||||
attrMap[attr.Key] = attr.Value.String()
|
||||
}
|
||||
|
||||
// Then extract from record attributes, which might override handler attributes
|
||||
r.Attrs(func(a slog.Attr) bool {
|
||||
attrMap[a.Key] = a.Value.String()
|
||||
return true
|
||||
})
|
||||
|
||||
// Format message with attributes on the same line
|
||||
msg := fmt.Sprintf("%s%s", prefix, r.Message)
|
||||
|
||||
// Add attributes to the same line if present
|
||||
if len(attrMap) > 0 {
|
||||
// Add a space after the message
|
||||
msg += " "
|
||||
|
||||
// Build attribute string
|
||||
attrs := make([]string, 0, len(attrMap))
|
||||
for k, v := range attrMap {
|
||||
attrs = append(attrs, fmt.Sprintf("%s=%s", k, v))
|
||||
}
|
||||
|
||||
// Join all attributes with spaces
|
||||
msg += strings.Join(attrs, " ")
|
||||
}
|
||||
|
||||
// Add newline at the end
|
||||
msg += "\n"
|
||||
|
||||
// Write to output
|
||||
_, err := io.WriteString(h.out, msg)
|
||||
return err
|
||||
}
|
||||
|
||||
// WithGroup returns a new Handler that inherits from this Handler
|
||||
func (h *ColorHandler) WithGroup(name string) slog.Handler {
|
||||
return h // For simplicity, we don't support groups
|
||||
}
|
||||
|
||||
// WithAttrs returns a new Handler whose attributes include both
|
||||
// the receiver's attributes and the arguments
|
||||
func (h *ColorHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
|
||||
// Create a new handler with the same output but additional attributes
|
||||
newHandler := &ColorHandler{
|
||||
out: h.out,
|
||||
mu: h.mu,
|
||||
attrs: append(append([]slog.Attr{}, h.attrs...), attrs...),
|
||||
}
|
||||
return newHandler
|
||||
}
|
||||
28
misc/sql/README.md
Normal file
28
misc/sql/README.md
Normal file
@@ -0,0 +1,28 @@
|
||||
# SQL Queries for Snapshot Analysis
|
||||
|
||||
This directory contains SQL queries to analyze snapshot data in the gemini-grc database.
|
||||
|
||||
## Usage
|
||||
|
||||
You can run these queries directly from psql using the `\i` directive:
|
||||
|
||||
```
|
||||
\i misc/sql/snapshots_per_url.sql
|
||||
```
|
||||
|
||||
## Available Queries
|
||||
|
||||
- **snapshots_per_url.sql** - Basic count of snapshots per URL
|
||||
- **snapshots_date_range.sql** - Shows snapshot count with date range information for each URL
|
||||
- **host_snapshot_stats.sql** - Groups snapshots by hosts and shows URLs with multiple snapshots
|
||||
- **content_changes.sql** - Finds URLs with the most content changes between consecutive snapshots
|
||||
- **snapshot_distribution.sql** - Shows the distribution of snapshots per URL (how many URLs have 1, 2, 3, etc. snapshots)
|
||||
- **recent_snapshot_activity.sql** - Shows URLs with most snapshots in the last 7 days
|
||||
- **storage_efficiency.sql** - Shows potential storage savings from deduplication
|
||||
- **snapshots_by_timeframe.sql** - Shows snapshot count by timeframe (day, week, month)
|
||||
|
||||
## Notes
|
||||
|
||||
- These queries are designed to work with PostgreSQL and the gemini-grc database schema
|
||||
- Some queries may be resource-intensive on large databases
|
||||
- The results can help optimize storage and understand the effectiveness of the versioned snapshot feature
|
||||
26
misc/sql/content_changes.sql
Normal file
26
misc/sql/content_changes.sql
Normal file
@@ -0,0 +1,26 @@
|
||||
-- File: content_changes.sql
|
||||
-- Finds URLs with the most content changes between consecutive snapshots
|
||||
-- Usage: \i misc/sql/content_changes.sql
|
||||
|
||||
WITH snapshot_changes AS (
|
||||
SELECT
|
||||
s1.url,
|
||||
s1.timestamp as prev_timestamp,
|
||||
s2.timestamp as next_timestamp,
|
||||
s1.gemtext IS DISTINCT FROM s2.gemtext as gemtext_changed,
|
||||
s1.data IS DISTINCT FROM s2.data as data_changed
|
||||
FROM snapshots s1
|
||||
JOIN snapshots s2 ON s1.url = s2.url AND s1.timestamp < s2.timestamp
|
||||
WHERE NOT EXISTS (
|
||||
SELECT 1 FROM snapshots s3
|
||||
WHERE s3.url = s1.url AND s1.timestamp < s3.timestamp AND s3.timestamp < s2.timestamp
|
||||
)
|
||||
)
|
||||
SELECT
|
||||
url,
|
||||
COUNT(*) + 1 as snapshot_count,
|
||||
SUM(CASE WHEN gemtext_changed OR data_changed THEN 1 ELSE 0 END) as content_changes
|
||||
FROM snapshot_changes
|
||||
GROUP BY url
|
||||
HAVING COUNT(*) + 1 > 1
|
||||
ORDER BY content_changes DESC, snapshot_count DESC;
|
||||
30
misc/sql/crawl_top_level.sql
Normal file
30
misc/sql/crawl_top_level.sql
Normal file
@@ -0,0 +1,30 @@
|
||||
BEGIN;
|
||||
|
||||
WITH matching_urls AS (
|
||||
SELECT url, host
|
||||
FROM snapshots
|
||||
WHERE url ~ '^gemini://[^/]+/$'
|
||||
AND timestamp < (NOW() - INTERVAL '1 week')
|
||||
ORDER BY random()
|
||||
LIMIT 500
|
||||
)
|
||||
INSERT INTO urls (url, host)
|
||||
SELECT url, host
|
||||
FROM matching_urls
|
||||
ON CONFLICT DO NOTHING;
|
||||
|
||||
-- WITH matching_urls AS (
|
||||
-- SELECT url, host
|
||||
-- FROM snapshots
|
||||
-- WHERE url ~ '^gemini://[^/]+/$'
|
||||
-- AND timestamp < (NOW() - INTERVAL '1 week')
|
||||
-- ORDER BY random()
|
||||
-- LIMIT 500
|
||||
-- )
|
||||
-- DELETE FROM snapshots
|
||||
-- WHERE url IN (
|
||||
-- SELECT url
|
||||
-- FROM matching_urls
|
||||
-- );
|
||||
|
||||
COMMIT;
|
||||
20
misc/sql/host_snapshot_stats.sql
Normal file
20
misc/sql/host_snapshot_stats.sql
Normal file
@@ -0,0 +1,20 @@
-- File: host_snapshot_stats.sql
-- Groups snapshots by host and shows how many URLs have multiple snapshots
-- Usage: \i misc/sql/host_snapshot_stats.sql

SELECT
    host,
    COUNT(DISTINCT url) as unique_urls,
    -- A URL has "multiple snapshots" when its per-URL row count exceeds 1.
    -- (The previous version compared a window count of (host, url) groups
    -- per URL, which is 1 after GROUP BY host, url, so the column was
    -- always 0.)
    SUM(CASE WHEN snapshot_count > 1 THEN 1 ELSE 0 END) as urls_with_multiple_snapshots,
    SUM(snapshot_count) as total_snapshots
FROM (
    SELECT
        host,
        url,
        COUNT(*) as snapshot_count
    FROM snapshots
    GROUP BY host, url
) subquery
GROUP BY host
ORDER BY total_snapshots DESC;
1
misc/sql/mark_urls_processed_false.sql
Normal file
1
misc/sql/mark_urls_processed_false.sql
Normal file
@@ -0,0 +1 @@
-- Clears the in-progress flag on every URL (e.g. after a crawler restart).
UPDATE urls
SET being_processed = FALSE
WHERE being_processed IS TRUE;
13
misc/sql/recent_snapshot_activity.sql
Normal file
13
misc/sql/recent_snapshot_activity.sql
Normal file
@@ -0,0 +1,13 @@
-- File: recent_snapshot_activity.sql
-- Shows the 20 URLs crawled most often during the last 7 days
-- Usage: \i misc/sql/recent_snapshot_activity.sql

SELECT url,
       COUNT(*) AS snapshot_count
FROM snapshots
WHERE timestamp > NOW() - INTERVAL '7 days'
GROUP BY url
-- Only URLs that were snapshotted more than once in the window.
HAVING COUNT(*) > 1
ORDER BY snapshot_count DESC
LIMIT 20;
16
misc/sql/snapshot_distribution.sql
Normal file
16
misc/sql/snapshot_distribution.sql
Normal file
@@ -0,0 +1,16 @@
-- File: snapshot_distribution.sql
-- Shows the distribution of snapshots per URL (how many URLs have 1, 2, 3, ... snapshots)
-- Usage: \i misc/sql/snapshot_distribution.sql

WITH per_url AS (
    SELECT url, COUNT(*) AS snapshot_count
    FROM snapshots
    GROUP BY url
)
SELECT
    snapshot_count,
    COUNT(*) AS url_count,
    -- Share of all URLs that have exactly this many snapshots.
    ROUND(100.0 * COUNT(*) / SUM(COUNT(*)) OVER (), 2) AS percentage
FROM per_url
GROUP BY snapshot_count
ORDER BY snapshot_count;
37
misc/sql/snapshots_by_timeframe.sql
Normal file
37
misc/sql/snapshots_by_timeframe.sql
Normal file
@@ -0,0 +1,37 @@
-- File: snapshots_by_timeframe.sql
-- Shows snapshot count by timeframe (day, week, month)
-- Usage: \i misc/sql/snapshots_by_timeframe.sql

-- One pass over snapshots, cross-joined with the three granularities,
-- instead of three near-identical CTEs glued together with UNION ALL.
-- The period column is named "day" for every timeframe to match the
-- column naming of the original UNION (first branch wins).
SELECT
    granularity.label AS timeframe,
    date_trunc(granularity.unit, s.timestamp) AS day,
    COUNT(*) AS snapshot_count,
    COUNT(DISTINCT s.url) AS unique_urls
FROM snapshots s
CROSS JOIN (
    VALUES ('Daily', 'day'),
           ('Weekly', 'week'),
           ('Monthly', 'month')
) AS granularity(label, unit)
GROUP BY timeframe, day
ORDER BY timeframe, day;
14
misc/sql/snapshots_date_range.sql
Normal file
14
misc/sql/snapshots_date_range.sql
Normal file
@@ -0,0 +1,14 @@
-- File: snapshots_date_range.sql
-- Shows snapshot count plus first/last snapshot times and the covered
-- time span for every URL with more than one snapshot
-- Usage: \i misc/sql/snapshots_date_range.sql

SELECT url,
       COUNT(*)                    AS snapshot_count,
       MIN(timestamp)              AS first_snapshot,
       MAX(timestamp)              AS last_snapshot,
       -- Interval between the oldest and newest snapshot of the URL.
       MAX(timestamp) - MIN(timestamp) AS time_span
FROM snapshots
GROUP BY url
HAVING COUNT(*) > 1
ORDER BY snapshot_count DESC;
8
misc/sql/snapshots_per_url.sql
Normal file
8
misc/sql/snapshots_per_url.sql
Normal file
@@ -0,0 +1,8 @@
-- File: snapshots_per_url.sql
-- Basic count of snapshots per URL, most-snapshotted first
-- Usage: \i misc/sql/snapshots_per_url.sql

SELECT url,
       COUNT(*) AS snapshot_count
FROM snapshots
GROUP BY url
ORDER BY snapshot_count DESC;
20
misc/sql/storage_efficiency.sql
Normal file
20
misc/sql/storage_efficiency.sql
Normal file
@@ -0,0 +1,20 @@
-- File: storage_efficiency.sql
-- Shows potential storage savings from deduplicating snapshot content
-- Usage: \i misc/sql/storage_efficiency.sql
--
-- NOTE(review): the estimate assumes each snapshot row populates at most
-- one of gemtext/data (COUNT(DISTINCT ...) ignores NULLs). If both columns
-- are regularly non-NULL for the same row, unique_contents can exceed
-- total_snapshots and the duplicate counts go negative — confirm against
-- the crawler's write path.

WITH duplicate_stats AS (
    SELECT
        url,
        COUNT(*) AS snapshot_count,
        COUNT(DISTINCT gemtext) AS unique_gemtexts,
        COUNT(DISTINCT data) AS unique_datas
    FROM snapshots
    GROUP BY url
    -- Only URLs that actually have more than one snapshot can hold duplicates.
    HAVING COUNT(*) > 1
)
SELECT
    SUM(snapshot_count) AS total_snapshots,
    SUM(unique_gemtexts + unique_datas) AS unique_contents,
    SUM(snapshot_count) - SUM(unique_gemtexts + unique_datas) AS duplicate_content_count,
    ROUND((SUM(snapshot_count) - SUM(unique_gemtexts + unique_datas)) * 100.0 / SUM(snapshot_count), 2) AS duplicate_percentage
FROM duplicate_stats;
@@ -7,7 +7,7 @@ import (
|
||||
|
||||
"gemini-grc/common/contextlog"
|
||||
"gemini-grc/contextutil"
|
||||
"gemini-grc/logging"
|
||||
"git.antanst.com/antanst/logging"
|
||||
)
|
||||
|
||||
// ParseRobotsTxt takes robots.txt content and a host, and
|
||||
|
||||
@@ -10,9 +10,10 @@ import (
|
||||
"gemini-grc/common/contextlog"
|
||||
"gemini-grc/common/snapshot"
|
||||
geminiUrl "gemini-grc/common/url"
|
||||
"gemini-grc/config"
|
||||
"gemini-grc/contextutil"
|
||||
"gemini-grc/gemini"
|
||||
"gemini-grc/logging"
|
||||
"git.antanst.com/antanst/logging"
|
||||
)
|
||||
|
||||
// RobotsCache is a map of blocked URLs
|
||||
@@ -38,7 +39,7 @@ func populateRobotsCache(ctx context.Context, key string) (entries []string, _er
|
||||
contextlog.LogDebugWithContext(cacheCtx, logging.GetSlogger(), "Fetching robots.txt from %s", url)
|
||||
|
||||
// Use the context-aware version to honor timeout and cancellation
|
||||
robotsContent, err := gemini.ConnectAndGetDataWithContext(cacheCtx, url)
|
||||
robotsContent, err := gemini.ConnectAndGetData(cacheCtx, url)
|
||||
if err != nil {
|
||||
// Check for context timeout or cancellation specifically
|
||||
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
|
||||
@@ -59,12 +60,7 @@ func populateRobotsCache(ctx context.Context, key string) (entries []string, _er
|
||||
return []string{}, nil
|
||||
}
|
||||
|
||||
// TODO: Update gemini.ProcessData to accept context
|
||||
s, err = gemini.ProcessData(*s, robotsContent)
|
||||
if err != nil {
|
||||
contextlog.LogDebugWithContext(cacheCtx, logging.GetSlogger(), "robots.txt error: %s", err)
|
||||
return []string{}, nil
|
||||
}
|
||||
s = gemini.UpdateSnapshotWithData(*s, robotsContent)
|
||||
|
||||
if s.ResponseCode.ValueOrZero() != 20 {
|
||||
contextlog.LogDebugWithContext(cacheCtx, logging.GetSlogger(), "robots.txt error code %d, ignoring", s.ResponseCode.ValueOrZero())
|
||||
@@ -91,14 +87,18 @@ func populateRobotsCache(ctx context.Context, key string) (entries []string, _er
|
||||
|
||||
// RobotMatch checks if the snapshot URL matches
|
||||
// a robots.txt allow rule.
|
||||
func RobotMatch(ctx context.Context, u string) (bool, error) {
|
||||
func RobotMatch(ctx context.Context, u string) bool {
|
||||
// Create a context for robots operations
|
||||
robotsCtx := contextutil.ContextWithComponent(ctx, "robotsMatch")
|
||||
|
||||
// TODO Missing Gopher functionality
|
||||
if config.CONFIG.GopherEnable {
|
||||
return false
|
||||
}
|
||||
|
||||
url, err := geminiUrl.ParseURL(u, "", true)
|
||||
if err != nil {
|
||||
contextlog.LogErrorWithContext(robotsCtx, logging.GetSlogger(), "Failed to parse URL: %v", err)
|
||||
return false, err
|
||||
return false
|
||||
}
|
||||
|
||||
key := strings.ToLower(fmt.Sprintf("%s:%d", url.Hostname, url.Port))
|
||||
@@ -112,16 +112,7 @@ func RobotMatch(ctx context.Context, u string) (bool, error) {
|
||||
var fetchErr error
|
||||
disallowedURLs, fetchErr = populateRobotsCache(ctx, key)
|
||||
if fetchErr != nil {
|
||||
contextlog.LogDebugWithContext(robotsCtx, logging.GetSlogger(), "Error populating robots.txt cache for %s: %v", key, fetchErr)
|
||||
|
||||
// Handle context timeouts by propagating the error
|
||||
if errors.Is(fetchErr, context.DeadlineExceeded) || errors.Is(fetchErr, context.Canceled) {
|
||||
contextlog.LogDebugWithContext(robotsCtx, logging.GetSlogger(), "Timeout or cancellation while checking robots.txt")
|
||||
return false, fetchErr
|
||||
}
|
||||
|
||||
// For other errors, assume we can proceed without robots.txt
|
||||
return false, nil
|
||||
return false
|
||||
}
|
||||
if len(disallowedURLs) > 0 {
|
||||
contextlog.LogDebugWithContext(robotsCtx, logging.GetSlogger(), "Added to robots.txt cache: %v => %v", key, disallowedURLs)
|
||||
@@ -137,7 +128,7 @@ func RobotMatch(ctx context.Context, u string) (bool, error) {
|
||||
}
|
||||
contextlog.LogDebugWithContext(robotsCtx, logging.GetSlogger(), "Found %d disallowed paths in robots.txt cache for %s", len(disallowedURLs), key)
|
||||
}
|
||||
return isURLblocked(ctx, disallowedURLs, url.Full), nil
|
||||
return isURLblocked(ctx, disallowedURLs, url.Full)
|
||||
}
|
||||
|
||||
// Initialize initializes the robots.txt match package
|
||||
@@ -157,12 +148,9 @@ func isURLblocked(ctx context.Context, disallowedURLs []string, input string) bo
|
||||
blockCtx := contextutil.ContextWithComponent(ctx, "robotsMatch.isURLblocked")
|
||||
|
||||
inputLower := strings.ToLower(input)
|
||||
contextlog.LogDebugWithContext(blockCtx, logging.GetSlogger(), "Checking URL against robots.txt rules: %s", input)
|
||||
|
||||
for _, url := range disallowedURLs {
|
||||
urlLower := strings.ToLower(url)
|
||||
contextlog.LogDebugWithContext(blockCtx, logging.GetSlogger(), "Comparing against rule: %s (lower: %s vs %s)", url, inputLower, urlLower)
|
||||
|
||||
if strings.HasPrefix(inputLower, urlLower) {
|
||||
contextlog.LogDebugWithContext(blockCtx, logging.GetSlogger(), "MATCH! robots.txt rule: %s blocks URL: %s", url, input)
|
||||
return true
|
||||
|
||||
@@ -2,7 +2,6 @@ package robotsMatch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
@@ -32,15 +31,7 @@ func TestRobotMatch_EmptyCache(t *testing.T) {
|
||||
|
||||
// For empty cache or DNS errors, RobotMatch should return false (allow the URL) without an error
|
||||
ctx := context.Background()
|
||||
blocked, err := RobotMatch(ctx, "gemini://nonexistent.example.com/")
|
||||
// We expect no error for non-existent host because we changed our error handling
|
||||
// to be more tolerant of DNS/connectivity issues
|
||||
if err != nil {
|
||||
// The only errors we should get are context-related (timeout, cancellation)
|
||||
if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
|
||||
t.Errorf("Expected nil error for non-existent host, got: %v", err)
|
||||
}
|
||||
}
|
||||
blocked := RobotMatch(ctx, "gemini://nonexistent.example.com/")
|
||||
|
||||
// The URL should be allowed (not blocked) when robots.txt can't be fetched
|
||||
if blocked {
|
||||
|
||||
10
seed_urls.txt
Normal file
10
seed_urls.txt
Normal file
@@ -0,0 +1,10 @@
|
||||
gemini://geminiprotocol.net/
|
||||
gemini://warmedal.se/~antenna/
|
||||
gemini://skyjake.fi/~Cosmos/
|
||||
gemini://gemini.circumlunar.space/capcom/
|
||||
gemini://auragem.letz.dev/
|
||||
gemini://gemplex.space/
|
||||
gemini://kennedy.gemi.dev/
|
||||
gemini://tlgs.one/
|
||||
gemini://yesterday.gemlog.org/
|
||||
gemini://gemini.cyberbot.space/feed.gmi
|
||||
22
test.txt
Normal file
22
test.txt
Normal file
@@ -0,0 +1,22 @@
|
||||
# Test redirect full url:
|
||||
gemini://gemini.circumlunar.space
|
||||
|
||||
# Test blacklist:
|
||||
gemi.dev
|
||||
|
||||
# Test robots disallow:
|
||||
gemini://tlgs.one/search?aa
|
||||
|
||||
# Test TLS cert required:
|
||||
gemini://astrobotany.mozz.us/app/plant
|
||||
// 31 redirect
|
||||
gemini://gemini.circumlunar.space
|
||||
|
||||
// body with null byte
|
||||
gemini://kennedy.gemi.dev/archive/cached?url=gemini://spam.works/mirrors/textfiles/fun/consult.how&t=638427244900000000&raw=False
|
||||
|
||||
// has invalid url
|
||||
gemini://tlgs.one/known-hosts
|
||||
|
||||
// Needs SNI TLS info (our bug)
|
||||
gemini://hanzbrix.pollux.casa/gemlog/20241002.gmi
|
||||
Reference in New Issue
Block a user