Files
veola/internal/scheduler/scheduler.go
2026-05-13 19:42:49 -07:00

600 lines
16 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package scheduler
import (
"context"
"fmt"
"log/slog"
"strings"
"sync"
"time"
"github.com/robfig/cron/v3"
"veola/internal/apify"
"veola/internal/config"
"veola/internal/db"
"veola/internal/models"
"veola/internal/ntfy"
)
// Scheduler owns the cron runner and the bookkeeping that maps item IDs to the
// cron entries scheduled for them.
type Scheduler struct {
	// Dependencies handed to New.
	cfg   *config.Config
	store *db.Store
	apify *apify.Client
	ntfy  *ntfy.Client

	// cron runs the per-item poll jobs.
	cron *cron.Cron

	// mu guards entries.
	mu      sync.Mutex
	entries map[int64]cron.EntryID // item ID -> scheduled cron entry

	// rootCtx is the parent of every job context; cancel ends it.
	rootCtx context.Context
	cancel  context.CancelFunc
}
// New builds a Scheduler around the given dependencies. It creates the cron
// runner, an empty entry table, and a cancellable root context. Nothing is
// scheduled or started by New.
func New(cfg *config.Config, store *db.Store, ap *apify.Client, nt *ntfy.Client) *Scheduler {
	s := &Scheduler{
		cfg:     cfg,
		store:   store,
		apify:   ap,
		ntfy:    nt,
		cron:    cron.New(),
		entries: map[int64]cron.EntryID{},
	}
	s.rootCtx, s.cancel = context.WithCancel(context.Background())
	return s
}
// Start loads the active items from the store, registers a cron job for each
// one, and starts the cron runner. If the listing fails the store error is
// returned unchanged and nothing is registered or started.
func (s *Scheduler) Start(ctx context.Context) error {
	active, err := s.store.ListActiveItems(ctx)
	if err != nil {
		return err
	}

	for _, item := range active {
		s.register(item)
	}

	s.cron.Start()
	slog.Info("scheduler started", "items", len(active))
	return nil
}
// Stop cancels the root context shared by all jobs, halts the cron runner, and
// blocks until the jobs that were already running have returned.
func (s *Scheduler) Stop() {
	s.cancel()

	// cron.Stop hands back a context that is done once running jobs finish.
	<-s.cron.Stop().Done()

	slog.Info("scheduler stopped")
}
// SyncItem brings the cron schedule in line with an item's current state.
// Any job already scheduled for the item is removed; a new one is registered
// only when the item is Active. Call after create/update/toggle/delete.
func (s *Scheduler) SyncItem(it models.Item) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Always drop the previous entry so a changed interval takes effect.
	if prev, scheduled := s.entries[it.ID]; scheduled {
		s.cron.Remove(prev)
		delete(s.entries, it.ID)
	}

	if it.Active {
		s.registerLocked(it)
	}
}
// RemoveItem unschedules the cron job for the given item ID, if one is
// registered. It is a no-op for IDs the scheduler does not know about.
func (s *Scheduler) RemoveItem(id int64) {
	s.mu.Lock()
	defer s.mu.Unlock()

	entry, scheduled := s.entries[id]
	if !scheduled {
		return
	}
	s.cron.Remove(entry)
	delete(s.entries, id)
}
// register is the locking wrapper around registerLocked: it takes s.mu for the
// duration of the call so callers that do not already hold the lock can
// schedule an item safely.
func (s *Scheduler) register(it models.Item) {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.registerLocked(it)
}
// registerLocked schedules a recurring poll for it and records the resulting
// cron entry in s.entries. The caller must hold s.mu.
//
// The interval is the item's PollIntervalMinutes, falling back to the global
// configured interval and finally to 60 minutes when neither is positive.
// Every run re-reads the item from the store and skips the poll when the item
// is missing or no longer active. Each run is bounded by a 10-minute timeout
// derived from the scheduler's root context.
//
// If the item already has an entry, that entry is removed once the new one has
// been added, so an item never ends up with two live cron jobs.
func (s *Scheduler) registerLocked(it models.Item) {
	mins := it.PollIntervalMinutes
	if mins <= 0 {
		mins = s.cfg.Scheduler.GlobalPollIntervalMinutes
	}
	if mins <= 0 {
		mins = 60
	}
	spec := fmt.Sprintf("@every %dm", mins)

	// Capture only the ID: the job reloads the item on every run.
	id := it.ID
	entryID, err := s.cron.AddFunc(spec, func() {
		ctx, cancel := context.WithTimeout(s.rootCtx, 10*time.Minute)
		defer cancel()

		fresh, err := s.store.GetItem(ctx, id)
		if err != nil {
			// Previously dropped silently, which hid store outages.
			slog.Warn("scheduled poll skipped: load item failed", "item_id", id, "err", err)
			return
		}
		if fresh == nil || !fresh.Active {
			return
		}
		s.RunPoll(ctx, *fresh)
	})
	if err != nil {
		slog.Error("schedule failed", "item_id", it.ID, "err", err)
		return
	}

	// Replace rather than overwrite: an overwritten entry ID could never be
	// removed again and its job would keep firing.
	if old, ok := s.entries[it.ID]; ok {
		s.cron.Remove(old)
	}
	s.entries[it.ID] = entryID
}
// RunPoll executes one poll cycle for an item. Public so handlers can trigger
// "Run Now" without going through cron. Iterates over each (alias × marketplace)
// pair; a single failing combo does not poison the others.
//
// The cycle runs in this order:
//  1. Run every actor plan and decode its output. Per-plan failures are
//     collected; the poll aborts (recording the joined errors on the item)
//     only when no plan succeeded.
//  2. When the item asks for price comparison and an actor is configured,
//     append the price-comparison results.
//  3. Dedup by URL, then apply FilterResults (confidence threshold and
//     out-of-stock flag) and ApplyItemFilters (minimum price and exclude
//     keywords).
//  4. For every result not already stored, send a notification when
//     ShouldAlert says so and insert the result row.
//  5. Persist the best result (if any) on the item together with any per-plan
//     error text, and append a price-history point.
//
// RunPoll returns nothing: every failure is logged and, where applicable,
// written to the item through the store.
func (s *Scheduler) RunPoll(ctx context.Context, it models.Item) {
	plans := s.buildAllInputs(it)
	if len(plans) == 0 {
		s.recordError(ctx, it.ID, "no marketplaces configured for this item")
		return
	}

	apifyClient := s.apifyClient(ctx)
	var results []apify.UnifiedResult
	var errs []string
	successes := 0
	for _, p := range plans {
		if p.actorID == "" {
			errs = append(errs, fmt.Sprintf("%s: no actor configured", p.marketplace))
			continue
		}
		raw, err := apifyClient.Run(ctx, p.actorID, p.input)
		if err != nil {
			label := p.marketplace
			if p.query != "" {
				label = fmt.Sprintf("query %q on %s", p.query, p.marketplace)
			}
			errs = append(errs, fmt.Sprintf("%s: %s", label, err.Error()))
			slog.Error("apify run failed", "item_id", it.ID, "marketplace", p.marketplace, "query", p.query, "err", err)
			continue
		}

		// The second return value of Decode is deliberately dropped here; the
		// "usable" count and the warning below are the diagnostics for a
		// decode that yields nothing.
		// NOTE(review): confirm Decode's second value carries nothing that
		// should fail the plan.
		decoded, _ := apify.Decode(raw, p.source)
		usable := 0
		for i := range decoded {
			decoded[i].MatchedQuery = p.query
			if decoded[i].URL != "" && decoded[i].Price > 0 {
				usable++
			}
		}
		slog.Info("apify run decoded",
			"item_id", it.ID,
			"marketplace", p.marketplace,
			"query", p.query,
			"actor", p.actorID,
			"raw", len(raw),
			"decoded", len(decoded),
			"usable", usable,
		)
		// The actor returned rows but none were usable: dump the key set of
		// the first raw row to help diagnose a schema mismatch.
		if usable == 0 && len(raw) > 0 {
			var sample map[string]any
			if err := jsonUnmarshal(raw[0], &sample); err == nil {
				keys := make([]string, 0, len(sample))
				for k := range sample {
					keys = append(keys, k)
				}
				slog.Warn("decoded zero usable rows; raw item keys",
					"item_id", it.ID,
					"marketplace", p.marketplace,
					"actor", p.actorID,
					"keys", keys,
				)
			}
		}
		results = append(results, decoded...)
		successes++
	}
	if successes == 0 {
		s.recordError(ctx, it.ID, strings.Join(errs, "; "))
		return
	}

	if it.UsePriceComparison {
		pcID := it.ActorPriceCompare
		if pcID == "" {
			pcID = s.cfg.Apify.Actors.PriceComparison
		}
		if pcID != "" {
			pcQueries := it.SearchQueries()
			// URL-only items still get one comparison run, keyed by URL.
			if len(pcQueries) == 0 && it.URL != "" {
				pcQueries = []string{""}
			}
			for _, q := range pcQueries {
				pcRaw, err := apifyClient.Run(ctx, pcID, apify.PriceComparisonInput{
					Query: q, URL: it.URL,
					ProxyConfiguration: s.proxyConfig(),
				})
				if err != nil {
					// Price comparison is supplementary: warn and carry on.
					slog.Warn("price comparison failed", "item_id", it.ID, "query", q, "err", err)
					continue
				}
				pc, _ := apify.Decode(pcRaw, apify.SourcePriceCompare)
				for i := range pc {
					pc[i].MatchedQuery = q
				}
				results = append(results, pc...)
			}
		}
	}

	beforeDedup := len(results)
	results = DedupByURL(results)
	threshold := s.cfg.Scheduler.MatchConfidenceThreshold
	excludes := it.ExcludeKeywordsList()
	beforeFilter := len(results)
	results = FilterResults(results, threshold, it.IncludeOutOfStock)
	results = ApplyItemFilters(results, it.MinPrice, excludes)
	slog.Info("filter applied",
		"item_id", it.ID,
		"before_dedup", beforeDedup,
		"before_filter", beforeFilter,
		"after", len(results),
		"min_confidence", threshold,
		"min_price", it.MinPrice,
		"exclude_count", len(excludes),
		"include_out_of_stock", it.IncludeOutOfStock,
	)

	bestIdx := PickBest(results)
	alertsSent := 0
	for _, r := range results {
		exists, err := s.store.ResultExists(ctx, it.ID, r.URL)
		if err != nil {
			slog.Error("dedup check failed", "item_id", it.ID, "err", err)
			continue
		}
		if exists {
			continue
		}
		alerted := false
		if ShouldAlert(it.TargetPrice, r.Price) {
			if err := s.sendNotification(ctx, it, r); err != nil {
				slog.Error("ntfy send failed", "item_id", it.ID, "err", err)
			} else {
				alerted = true
				alertsSent++
			}
		}
		price := r.Price
		_, err = s.store.InsertResult(ctx, &models.Result{
			ItemID:       it.ID,
			Title:        r.Title,
			Price:        &price,
			Currency:     r.Currency,
			URL:          r.URL,
			Source:       r.Source,
			ImageURL:     r.ImageURL,
			MatchedQuery: r.MatchedQuery,
			Alerted:      alerted,
		})
		if err != nil {
			slog.Error("insert result failed", "item_id", it.ID, "err", err)
		}
	}

	// Partial failures are stored alongside a successful poll; Join of an
	// empty slice is "", which means no error to record.
	errMsg := strings.Join(errs, "; ")
	if bestIdx >= 0 {
		best := results[bestIdx]
		bp := best.Price
		update := &models.Item{
			BestPrice:         &bp,
			BestPriceStore:    best.Store,
			BestPriceURL:      best.URL,
			BestPriceImageURL: best.ImageURL,
			BestPriceTitle:    best.Title,
		}
		if err := s.store.UpdateItemPollResult(ctx, it.ID, update, errMsg); err != nil {
			slog.Error("update poll result failed", "item_id", it.ID, "err", err)
		}
		point := &models.PricePoint{
			ItemID: it.ID,
			Price:  bp,
			Store:  best.Store,
		}
		if err := s.store.InsertPricePoint(ctx, point); err != nil {
			slog.Error("insert price point failed", "item_id", it.ID, "err", err)
		}
	} else if err := s.store.UpdateItemPollResult(ctx, it.ID, nil, errMsg); err != nil {
		slog.Error("update poll result failed", "item_id", it.ID, "err", err)
	}

	slog.Info("poll completed",
		"item_id", it.ID,
		"item_name", it.Name,
		"marketplaces", len(plans),
		"successes", successes,
		"results", len(results),
		"alerts_sent", alertsSent,
	)
}
// recordError stores msg as the poll result of item id, with no best-price
// data. A failure to write it is logged and not propagated.
func (s *Scheduler) recordError(ctx context.Context, id int64, msg string) {
	err := s.store.UpdateItemPollResult(ctx, id, nil, msg)
	if err == nil {
		return
	}
	slog.Error("record error failed", "err", err)
}
// apifyClient builds a fresh apify.Client for the current call. The API key
// comes from the "apify_api_key" setting when that is non-empty, otherwise
// from config.toml.
func (s *Scheduler) apifyClient(ctx context.Context) *apify.Client {
	// A failed settings lookup is treated like an unset setting.
	override, _ := s.store.GetSetting(ctx, "apify_api_key")
	if override != "" {
		return apify.New(override)
	}
	return apify.New(s.cfg.Apify.APIKey)
}
// sendNotification publishes an ntfy alert for result r of item it and returns
// the error from the ntfy client, if any.
//
// Topic, base URL and token are resolved on every call: the item's own topic
// wins, then the "ntfy_default_topic" setting, then config; the base URL comes
// from the "ntfy_base_url" setting or config; the token comes from the
// "ntfy_token" setting. Failed settings lookups are treated as unset.
func (s *Scheduler) sendNotification(ctx context.Context, it models.Item, r apify.UnifiedResult) error {
	// A result at or below the target price gets the celebratory tags.
	hitTarget := it.TargetPrice != nil && r.Price <= *it.TargetPrice
	tags := []string{"mag"}
	if hitTarget {
		tags = []string{"shopping_cart", "tada"}
	}

	priority := it.NtfyPriority
	if priority == "" {
		priority = "default"
	}

	topic := it.NtfyTopic
	if topic == "" {
		topic = s.cfg.Ntfy.DefaultTopic
		if stored, _ := s.store.GetSetting(ctx, "ntfy_default_topic"); stored != "" {
			topic = stored
		}
	}

	symbol := currencyPrefix(r.Currency)
	body := fmt.Sprintf("%s %s%.2f", r.Store, symbol, r.Price)
	if it.TargetPrice != nil {
		body = body + fmt.Sprintf(" (target: %s%.2f)", symbol, *it.TargetPrice)
	}
	if r.Title != "" {
		body = body + "\n" + r.Title
	}

	endpoint := s.cfg.Ntfy.BaseURL
	if stored, _ := s.store.GetSetting(ctx, "ntfy_base_url"); stored != "" {
		endpoint = stored
	}
	token, _ := s.store.GetSetting(ctx, "ntfy_token")

	note := ntfy.Notification{
		Topic:    topic,
		Title:    fmt.Sprintf("Veola Alert: %s", it.Name),
		Message:  body,
		Priority: priority,
		Tags:     tags,
		Click:    r.URL,
	}
	return ntfy.NewWithToken(endpoint, token).Send(ctx, note)
}
// currencyPrefix returns the text to print in front of an amount in currency
// c: a symbol for USD (also used when c is empty), GBP, EUR and JPY, and the
// code itself followed by a space for anything else.
func currencyPrefix(c string) string {
	if c == "" || c == "USD" {
		return "$"
	}
	if c == "GBP" {
		return "£"
	}
	if c == "EUR" {
		return "€"
	}
	if c == "JPY" {
		return "¥"
	}
	return c + " "
}
// BuildPreviewInputs returns one actor plan per alias for the first marketplace
// on the item. Preview deliberately uses only one marketplace to limit actor
// runs, but exercises every alias so the operator sees the full result set.
//
// An item with no aliases is previewed with a single empty query. An item with
// no marketplaces is previewed against "ebay.com", the same default the poll
// path uses, so the preview shows what a real poll of that item would run.
func (s *Scheduler) BuildPreviewInputs(it models.Item) []actorPlan {
	queries := it.SearchQueries()
	if len(queries) == 0 {
		queries = []string{""}
	}

	markets := it.Marketplaces
	switch {
	case len(markets) == 0:
		// Without this default the preview produced no plans at all.
		markets = []string{"ebay.com"}
	case len(markets) > 1:
		markets = markets[:1]
	}

	out := make([]actorPlan, 0, len(queries))
	for _, q := range queries {
		out = append(out, s.buildInputsForQuery(it, q, markets)...)
	}
	return out
}
// actorPlan describes a single actor invocation: which marketplace and query
// it is for, which actor to run, and the input payload to run it with.
// Field order matters because some callers build it with positional literals.
type actorPlan struct {
	marketplace string // marketplace the plan targets
	source      string // result-source label
	actorID     string // actor to invoke; may be empty
	query       string // alias searched for; may be empty
	input       any    // actor input payload
}

// Marketplace reports which marketplace the plan targets.
func (ap actorPlan) Marketplace() string {
	return ap.marketplace
}

// Source reports the result-source label (used to pick a decoder).
func (ap actorPlan) Source() string {
	return ap.source
}

// ActorID reports the Apify actor ID the plan will invoke.
func (ap actorPlan) ActorID() string {
	return ap.actorID
}

// Query reports the alias string the plan searches for. It is empty for
// URL-only items.
func (ap actorPlan) Query() string {
	return ap.query
}

// Input reports the actor input payload, in the form apify.Client.Run expects.
func (ap actorPlan) Input() any {
	return ap.input
}
// buildAllInputs expands an item into one actor plan per (alias × marketplace)
// pair. URL-only items (no aliases) get a single empty query, and items with
// no marketplaces fall back to "ebay.com".
func (s *Scheduler) buildAllInputs(it models.Item) []actorPlan {
	markets := it.Marketplaces
	if len(markets) == 0 {
		markets = []string{"ebay.com"}
	}

	aliases := it.SearchQueries()
	if len(aliases) == 0 {
		aliases = []string{""}
	}

	var plans []actorPlan
	for _, alias := range aliases {
		perMarket := s.buildInputsForQuery(it, alias, markets)
		plans = append(plans, perMarket...)
	}
	return plans
}
// buildInputsForQuery produces one actor plan per entry in markets, every plan
// carrying the same query. It is shared by the scheduler and the preview path.
//
// The actor family is chosen per marketplace: Yahoo Auctions JP when the
// marketplace name contains "yahoo" or the item URL contains "yahoo.co.jp",
// Mercari JP when either contains "mercari", and eBay active listings
// otherwise. In every case the item's ActorActive overrides the configured
// actor ID.
func (s *Scheduler) buildInputsForQuery(it models.Item, query string, markets []string) []actorPlan {
	// The URL-based checks do not depend on the marketplace; evaluate once.
	itemURL := strings.ToLower(it.URL)
	urlIsYahoo := strings.Contains(itemURL, "yahoo.co.jp")
	urlIsMercari := strings.Contains(itemURL, "mercari")

	plans := make([]actorPlan, 0, len(markets))
	for _, market := range markets {
		name := strings.ToLower(market)

		var plan actorPlan
		switch {
		case strings.Contains(name, "yahoo") || urlIsYahoo:
			plan = actorPlan{
				marketplace: market,
				source:      apify.SourceYahooJP,
				actorID:     firstNonEmpty(it.ActorActive, s.cfg.Apify.Actors.YahooAuctionsJP),
				query:       query,
				input: apify.YahooAuctionsJPInput{
					SearchTerm: query,
					MaxPages:   1,
				},
			}
		case strings.Contains(name, "mercari") || urlIsMercari:
			plan = actorPlan{
				marketplace: market,
				source:      apify.SourceMercariJP,
				actorID:     firstNonEmpty(it.ActorActive, s.cfg.Apify.Actors.MercariJP),
				query:       query,
				input: apify.MercariJPInput{
					SearchKeywords: []string{query},
					Status:         "on_sale",
					MaxResults:     30,
				},
			}
		default:
			plan = actorPlan{
				marketplace: market,
				source:      apify.SourceActiveEbay,
				actorID:     firstNonEmpty(it.ActorActive, s.cfg.Apify.Actors.ActiveListings),
				query:       query,
				input: apify.ActiveListingInput{
					SearchQueries:        []string{query},
					MaxProductsPerSearch: 30,
					MaxSearchPages:       1,
					Sort:                 "best_match",
					ListingType:          mapListingType(it.ListingType),
					ProxyConfiguration:   s.proxyConfig(),
				},
			}
		}
		plans = append(plans, plan)
	}
	return plans
}
// DedupByURL removes repeated listings from a single result set, keeping input
// order. Two results are duplicates when they share both Source and URL; the
// first one seen is kept, MatchedQuery tag included. Results with an empty URL
// are never treated as duplicates and are all kept.
func DedupByURL(in []apify.UnifiedResult) []apify.UnifiedResult {
	kept := make([]apify.UnifiedResult, 0, len(in))
	seen := make(map[string]struct{})
	for _, r := range in {
		if r.URL != "" {
			key := r.Source + "|" + r.URL
			if _, dup := seen[key]; dup {
				continue
			}
			seen[key] = struct{}{}
		}
		kept = append(kept, r)
	}
	return kept
}
// proxyConfig translates the proxy section of config.toml into the apify
// proxyConfiguration block. When use_apify_proxy is false it returns nil,
// meaning the field is omitted from the actor input entirely; group and
// country are then ignored so the input cannot contradict itself.
func (s *Scheduler) proxyConfig() *apify.ProxyConfiguration {
	proxy := s.cfg.Apify.Proxy
	if proxy.UseApifyProxy {
		return &apify.ProxyConfiguration{
			UseApifyProxy:     true,
			ApifyProxyGroups:  proxy.Groups,
			ApifyProxyCountry: proxy.Country,
		}
	}
	return nil
}
// mapListingType converts Veola's listing-type vocabulary ("all", "BIN",
// "auction") into the automation-lab/ebay-scraper input vocabulary
// ("all", "buy_it_now", "auction"). Matching is case-insensitive and an empty
// value means "all". Anything unrecognized is returned exactly as given, in
// case the user pasted a value the actor accepts but we don't know about.
func mapListingType(s string) string {
	lowered := strings.ToLower(s)
	if lowered == "" || lowered == "all" {
		return "all"
	}
	if lowered == "bin" || lowered == "buy_it_now" {
		return "buy_it_now"
	}
	if lowered == "auction" {
		return "auction"
	}
	return s
}
// firstNonEmpty returns the first argument that is not the empty string, or ""
// when every argument is empty or none are given.
func firstNonEmpty(vs ...string) string {
	for i := 0; i < len(vs); i++ {
		if candidate := vs[i]; len(candidate) > 0 {
			return candidate
		}
	}
	return ""
}
// SeedSoldHistory seeds price history for a newly added item by running the
// sold-listings actor once per (alias, marketplace) pair. Items without any
// search query are skipped, and items without marketplaces are seeded from
// "ebay.com". Nothing is returned: a missing baseline is not fatal, so
// failures are handled inside the per-pair helper.
func (s *Scheduler) SeedSoldHistory(ctx context.Context, it models.Item) {
	aliases := it.SearchQueries()
	if len(aliases) == 0 {
		return
	}

	markets := it.Marketplaces
	if len(markets) == 0 {
		markets = []string{"ebay.com"}
	}

	for _, alias := range aliases {
		for _, market := range markets {
			s.seedSoldHistoryFor(ctx, it, alias, market)
		}
	}
}
// seedSoldHistoryFor runs the sold-listings actor for one query on one
// marketplace and stores every row with a positive sold price as a price
// point on the item.
//
// Marketplaces whose name contains "yahoo" use the Yahoo Auctions JP sold
// actor and store label; everything else uses the eBay sold actor. The item's
// ActorSold overrides the configured actor in both cases. When no actor ID is
// available the call is a no-op.
//
// Nothing is returned. An actor failure is logged and ends the call; rows that
// fail to decode are skipped; price points that fail to store are counted and
// reported in one warning after the loop.
func (s *Scheduler) seedSoldHistoryFor(ctx context.Context, it models.Item, query, marketplace string) {
	actorID := firstNonEmpty(it.ActorSold, s.cfg.Apify.Actors.SoldListings)
	source := apify.SourceSoldEbay
	if strings.Contains(strings.ToLower(marketplace), "yahoo") {
		actorID = firstNonEmpty(it.ActorSold, s.cfg.Apify.Actors.YahooAuctionsJPSold)
		source = apify.SourceSoldYahooJP
	}
	if actorID == "" {
		return
	}

	raw, err := s.apifyClient(ctx).Run(ctx, actorID, apify.SoldListingInput{
		Query: query, Marketplace: marketplace, MaxResults: 50, DaysBack: 30,
		ProxyConfiguration: s.proxyConfig(),
	})
	if err != nil {
		slog.Warn("sold history seed failed", "item_id", it.ID, "marketplace", marketplace, "query", query, "err", err)
		return
	}

	// The store label is the same for every row of this run.
	storeLabel := sourceLabelToStore(source)

	failed := 0
	var lastErr error
	for _, r := range raw {
		var sold apify.SoldListingResult
		if err := jsonUnmarshal(r, &sold); err != nil || sold.SoldPrice <= 0 {
			continue
		}
		// An unparseable or missing sale date falls back to the current
		// time, so the parse error itself carries no extra information.
		t, _ := time.Parse(time.RFC3339, sold.SoldAt)
		if t.IsZero() {
			t = time.Now()
		}
		err := s.store.InsertPricePoint(ctx, &models.PricePoint{
			ItemID:   it.ID,
			Price:    sold.SoldPrice,
			Store:    storeLabel,
			PolledAt: t,
		})
		if err != nil {
			failed++
			lastErr = err
		}
	}

	// Report once per run instead of once per row to keep the log readable.
	if failed > 0 {
		slog.Warn("sold history seed: price points not stored",
			"item_id", it.ID,
			"marketplace", marketplace,
			"query", query,
			"failed", failed,
			"err", lastErr,
		)
	}
}
// sourceLabelToStore maps a sold-listing source label to the store name
// written on price points: "yahoo-auctions-jp-sold" for the Yahoo JP sold
// source and "ebay-sold" for every other value.
func sourceLabelToStore(src string) string {
	if src == apify.SourceSoldYahooJP {
		return "yahoo-auctions-jp-sold"
	}
	return "ebay-sold"
}