Add configuration via env vars
This commit is contained in:
103
main.go
103
main.go
@@ -1,99 +1,87 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"math/rand/v2"
|
||||
"os"
|
||||
"sync"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
zlog "github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
const ROOTPATH string = "./a"
|
||||
|
||||
func main() {
|
||||
config := *getConfig()
|
||||
zerolog.TimeFieldFormat = zerolog.TimeFormatUnix
|
||||
zerolog.SetGlobalLevel(zerolog.DebugLevel)
|
||||
//zerolog.SetGlobalLevel(zerolog.InfoLevel)
|
||||
zerolog.SetGlobalLevel(config.logLevel)
|
||||
zlog.Logger = zlog.Output(zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: "[2006-01-02 15:04:05]"})
|
||||
if err := runApp(); err != nil {
|
||||
if err := runApp(&config); err != nil {
|
||||
LogError("Application error: %w", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
func runApp() error {
|
||||
//urls := []string{"gemini://smol.gr"}
|
||||
urls := []string{"gemini://smol.gr", "gemini://gmi.noulin.net/"}
|
||||
func runApp(config *Config) error {
|
||||
// urls := []string{"gemini://smol.gr"}
|
||||
urls := []string{"gemini://gmi.noulin.net/", "gemini://warmedal.se/~antenna/"}
|
||||
|
||||
queue := make(chan string)
|
||||
queue := make(chan string, 10000)
|
||||
results := make(chan Snapshot, 100)
|
||||
done := make(chan struct{})
|
||||
|
||||
// Start the crawler.
|
||||
go crawler(queue, done)
|
||||
go spawnStats(queue, results)
|
||||
go resultsHandler(queue, results)
|
||||
spawnWorkers(config, queue, results)
|
||||
|
||||
// Send URLs to the queue
|
||||
for _, url := range urls {
|
||||
// Send URL to queue; blocks until crawler receives it
|
||||
queue <- url
|
||||
}
|
||||
|
||||
// All URLs have been sent and received
|
||||
// because queue is unbuffered; safe to close the queue
|
||||
close(queue)
|
||||
|
||||
// Wait until crawler signals finish
|
||||
<-done
|
||||
return nil
|
||||
}
|
||||
|
||||
func crawler(queue <-chan string, done chan struct{}) {
|
||||
// Start processing results.
|
||||
results := make(chan Snapshot)
|
||||
resultsDone := make(chan struct{})
|
||||
go resultsHandler(results, resultsDone)
|
||||
|
||||
// Create workers that consume the queue channel,
|
||||
// and send their result to results channel.
|
||||
workers := 3
|
||||
LogInfo("Spawning %d workers", workers)
|
||||
var wg sync.WaitGroup
|
||||
// Start worker goroutines
|
||||
for range workers {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
worker(queue, results)
|
||||
wg.Done()
|
||||
}()
|
||||
func spawnStats(queue chan string, results chan Snapshot) {
|
||||
ticker := time.NewTicker(time.Duration(time.Second * 10))
|
||||
defer ticker.Stop()
|
||||
for range ticker.C {
|
||||
LogInfo("Queue length: %d\n", len(queue))
|
||||
LogInfo("Results length: %d\n", len(results))
|
||||
}
|
||||
|
||||
// Wait until all workers have finished.
|
||||
wg.Wait()
|
||||
LogInfo("All workers have finished")
|
||||
|
||||
// Nobody left to send to results, so we
|
||||
// close it, and the SnapshotsProcessor can
|
||||
// finish
|
||||
close(results)
|
||||
<-resultsDone
|
||||
|
||||
close(done)
|
||||
}
|
||||
|
||||
func resultsHandler(results <-chan Snapshot, done chan struct{}) {
|
||||
func spawnWorkers(config *Config, queue <-chan string, results chan Snapshot) {
|
||||
workers := config.numOfWorkers
|
||||
LogInfo("Spawning %d workers", workers)
|
||||
// Start worker goroutines
|
||||
for i := 0; i < workers; i++ {
|
||||
go func(i int) {
|
||||
worker(i, config.rootPath, queue, results)
|
||||
}(i)
|
||||
}
|
||||
}
|
||||
|
||||
func resultsHandler(queue chan string, results <-chan Snapshot) {
|
||||
for result := range results {
|
||||
if result.Error != nil {
|
||||
LogError("[%s] %w", result.Url, result.Error)
|
||||
} else {
|
||||
LogInfo("[%s] Done", result.Url)
|
||||
LogDebug("[%s] Done", result.Url)
|
||||
for _, link := range result.Links {
|
||||
if strings.HasPrefix(link.Full, "gemini://") {
|
||||
go func(link GeminiUrl) {
|
||||
queue <- link.Full
|
||||
// fmt.Printf("Sent %s to queue\n", link.Full)
|
||||
}(link)
|
||||
}
|
||||
}
|
||||
// fmt.Printf(SnapshotToJSON(result))
|
||||
}
|
||||
}
|
||||
LogInfo("All results have been processed")
|
||||
close(done)
|
||||
}
|
||||
|
||||
func worker(queue <-chan string, results chan Snapshot) {
|
||||
func worker(id int, rootPath string, queue <-chan string, results chan Snapshot) {
|
||||
for url := range queue {
|
||||
LogDebug("Worker %d visiting %s", id, url)
|
||||
result := Visit(url)
|
||||
// If we encountered an error when
|
||||
// visiting, skip processing
|
||||
@@ -101,12 +89,15 @@ func worker(queue <-chan string, results chan Snapshot) {
|
||||
results <- *result
|
||||
continue
|
||||
}
|
||||
LogDebug("Worker %d processing %s", id, url)
|
||||
result = Process(result)
|
||||
if result.Error != nil {
|
||||
results <- *result
|
||||
continue
|
||||
}
|
||||
SaveResult(ROOTPATH, result)
|
||||
LogDebug("Worker %d saving %s", id, url)
|
||||
SaveResult(rootPath, result)
|
||||
results <- *result
|
||||
time.Sleep(time.Duration(rand.IntN(5)) * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user