package scheduler import ( "context" "fmt" "log/slog" "strconv" "strings" "sync" "time" "github.com/robfig/cron/v3" "veola/internal/apify" "veola/internal/config" "veola/internal/db" "veola/internal/ebay" "veola/internal/models" "veola/internal/ntfy" ) // Provider labels distinguish how a plan is executed: through an Apify actor // run, or through eBay's official Browse API. const ( providerApify = "apify" providerEbay = "ebay" ) type Scheduler struct { cfg *config.Config store *db.Store apify *apify.Client ebay *ebay.Client ntfy *ntfy.Client cron *cron.Cron mu sync.Mutex entries map[int64]cron.EntryID rootCtx context.Context cancel context.CancelFunc } func New(cfg *config.Config, store *db.Store, ap *apify.Client, nt *ntfy.Client) *Scheduler { rootCtx, cancel := context.WithCancel(context.Background()) return &Scheduler{ cfg: cfg, store: store, apify: ap, ebay: ebay.New(cfg.Ebay.ClientID, cfg.Ebay.ClientSecret, cfg.Ebay.Environment), ntfy: nt, cron: cron.New(), entries: make(map[int64]cron.EntryID), rootCtx: rootCtx, cancel: cancel, } } func (s *Scheduler) Start(ctx context.Context) error { items, err := s.store.ListActiveItems(ctx) if err != nil { return err } for _, it := range items { s.register(it) } s.cron.Start() slog.Info("scheduler started", "items", len(items)) return nil } // Stop blocks until running jobs complete. func (s *Scheduler) Stop() { s.cancel() stopCtx := s.cron.Stop() <-stopCtx.Done() slog.Info("scheduler stopped") } // SyncItem registers, re-registers, or removes the cron job for an item based // on its current Active flag. Call after create/update/toggle/delete. 
func (s *Scheduler) SyncItem(it models.Item) { s.mu.Lock() defer s.mu.Unlock() if existing, ok := s.entries[it.ID]; ok { s.cron.Remove(existing) delete(s.entries, it.ID) } if !it.Active { return } s.registerLocked(it) } func (s *Scheduler) RemoveItem(id int64) { s.mu.Lock() defer s.mu.Unlock() if existing, ok := s.entries[id]; ok { s.cron.Remove(existing) delete(s.entries, id) } } func (s *Scheduler) register(it models.Item) { s.mu.Lock() defer s.mu.Unlock() s.registerLocked(it) } func (s *Scheduler) registerLocked(it models.Item) { mins := it.PollIntervalMinutes if mins <= 0 { mins = s.cfg.Scheduler.GlobalPollIntervalMinutes } if mins <= 0 { mins = 60 } spec := fmt.Sprintf("@every %dm", mins) id := it.ID entryID, err := s.cron.AddFunc(spec, func() { ctx, cancel := context.WithTimeout(s.rootCtx, 10*time.Minute) defer cancel() fresh, err := s.store.GetItem(ctx, id) if err != nil || fresh == nil || !fresh.Active { return } s.RunPoll(ctx, *fresh) }) if err != nil { slog.Error("schedule failed", "item_id", it.ID, "err", err) return } s.entries[it.ID] = entryID } // RunPoll executes one poll cycle for an item. Public so handlers can trigger // "Run Now" without going through cron. Iterates over each (alias × marketplace) // pair; a single failing combo does not poison the others. 
func (s *Scheduler) RunPoll(ctx context.Context, it models.Item) { plans := s.buildAllInputs(it) if len(plans) == 0 { s.recordError(ctx, it.ID, "no marketplaces configured for this item") return } apifyClient := s.apifyClient(ctx) var results []apify.UnifiedResult var errs []string successes := 0 for _, p := range plans { decoded, err := s.ExecutePlan(ctx, p) if err != nil { label := p.marketplace if p.query != "" { label = fmt.Sprintf("query %q on %s", p.query, p.marketplace) } errs = append(errs, fmt.Sprintf("%s: %s", label, err.Error())) slog.Error("plan failed", "item_id", it.ID, "provider", p.provider, "marketplace", p.marketplace, "query", p.query, "err", err) continue } results = append(results, decoded...) successes++ } if successes == 0 { s.recordError(ctx, it.ID, strings.Join(errs, "; ")) return } if it.UsePriceComparison { pcID := it.ActorPriceCompare if pcID == "" { pcID = s.cfg.Apify.Actors.PriceComparison } if pcID != "" { pcQueries := it.SearchQueries() if len(pcQueries) == 0 && it.URL != "" { pcQueries = []string{""} } for _, q := range pcQueries { pcRaw, err := apifyClient.Run(ctx, pcID, apify.PriceComparisonInput{ Query: q, URL: it.URL, ProxyConfiguration: s.proxyConfig(), }) if err == nil { pc, _ := apify.Decode(pcRaw, apify.SourcePriceCompare) for i := range pc { pc[i].MatchedQuery = q } results = append(results, pc...) 
} else { slog.Warn("price comparison failed", "item_id", it.ID, "query", q, "err", err) } } } } beforeDedup := len(results) results = DedupByURL(results) threshold := s.cfg.Scheduler.MatchConfidenceThreshold beforeFilter := len(results) results = FilterResults(results, threshold, it.IncludeOutOfStock) results = ApplyItemFilters(results, it.MinPrice, it.ExcludeKeywordsList()) slog.Info("filter applied", "item_id", it.ID, "before_dedup", beforeDedup, "before_filter", beforeFilter, "after", len(results), "min_confidence", threshold, "min_price", it.MinPrice, "exclude_count", len(it.ExcludeKeywordsList()), "include_out_of_stock", it.IncludeOutOfStock, ) bestIdx := PickBest(results) alertsSent := 0 for _, r := range results { exists, err := s.store.ResultExists(ctx, it.ID, r.URL) if err != nil { slog.Error("dedup check failed", "err", err) continue } if exists { // Row already stored — but if this poll surfaced an end time we // didn't have before (or the row predates the ends_at column), // backfill it so countdowns light up for known auctions. 
if r.EndsAt != nil { if err := s.store.BackfillResultEndsAt(ctx, it.ID, r.URL, *r.EndsAt); err != nil { slog.Error("backfill ends_at failed", "err", err) } } continue } alerted := false if ShouldAlert(it.TargetPrice, r.Price) { if err := s.sendNotification(ctx, it, r); err != nil { slog.Error("ntfy send failed", "err", err) } else { alerted = true alertsSent++ } } price := r.Price _, err = s.store.InsertResult(ctx, &models.Result{ ItemID: it.ID, Title: r.Title, Price: &price, Currency: r.Currency, URL: r.URL, Source: r.Source, ImageURL: r.ImageURL, MatchedQuery: r.MatchedQuery, Alerted: alerted, EndsAt: r.EndsAt, }) if err != nil { slog.Error("insert result failed", "err", err) } } errMsg := "" if len(errs) > 0 { errMsg = strings.Join(errs, "; ") } if bestIdx >= 0 { best := results[bestIdx] bp := best.Price _ = s.store.UpdateItemPollResult(ctx, it.ID, &models.Item{ BestPrice: &bp, BestPriceCurrency: best.Currency, BestPriceStore: best.Store, BestPriceURL: best.URL, BestPriceImageURL: best.ImageURL, BestPriceTitle: best.Title, }, errMsg) _ = s.store.InsertPricePoint(ctx, &models.PricePoint{ ItemID: it.ID, Price: bp, Store: best.Store, }) } else { _ = s.store.UpdateItemPollResult(ctx, it.ID, nil, errMsg) } slog.Info("poll completed", "item_id", it.ID, "item_name", it.Name, "marketplaces", len(plans), "successes", successes, "results", len(results), "alerts_sent", alertsSent, ) } func (s *Scheduler) recordError(ctx context.Context, id int64, msg string) { if err := s.store.UpdateItemPollResult(ctx, id, nil, msg); err != nil { slog.Error("record error failed", "err", err) } } // apifyClient returns an apify.Client whose API key reflects the latest // value from settings, falling back to config.toml. 
func (s *Scheduler) apifyClient(ctx context.Context) *apify.Client { key := s.cfg.Apify.APIKey if v, _ := s.store.GetSetting(ctx, "apify_api_key"); v != "" { key = v } return apify.New(key) } // ebayClient returns the shared eBay client with credentials refreshed from // settings (falling back to config.toml). The client caches its OAuth token // in memory, so the same instance is reused across polls; credentials are // only re-applied when they actually change. func (s *Scheduler) ebayClient(ctx context.Context) *ebay.Client { id := s.cfg.Ebay.ClientID secret := s.cfg.Ebay.ClientSecret if v, _ := s.store.GetSetting(ctx, "ebay_client_id"); v != "" { id = v } if v, _ := s.store.GetSetting(ctx, "ebay_client_secret"); v != "" { secret = v } s.ebay.EnsureCredentials(id, secret) return s.ebay } // EbayUsage returns the number of eBay Browse API calls made so far today and // the configured daily limit. A limit <= 0 means uncapped. Settings override // config.toml for the limit, mirroring how credentials are resolved. func (s *Scheduler) EbayUsage(ctx context.Context) (used, limit int) { used, _ = s.store.EbayUsageToday(ctx) limit = s.cfg.Ebay.DailyCallLimit if v, _ := s.store.GetSetting(ctx, "ebay_daily_call_limit"); v != "" { if n, err := strconv.Atoi(strings.TrimSpace(v)); err == nil { limit = n } } return used, limit } // ExecutePlan runs one plan and returns decoded, provider-agnostic results // with MatchedQuery already stamped. eBay plans go through the official // Browse API; all other plans run an Apify actor. Callers handle per-plan // errors without poisoning sibling plans. 
func (s *Scheduler) ExecutePlan(ctx context.Context, p actorPlan) ([]apify.UnifiedResult, error) { var decoded []apify.UnifiedResult switch p.provider { case providerEbay: sp, ok := p.input.(ebay.SearchParams) if !ok { return nil, fmt.Errorf("ebay plan has wrong input type %T", p.input) } used, limit := s.EbayUsage(ctx) if limit > 0 && used >= limit { return nil, fmt.Errorf("ebay daily API call limit reached (%d/%d); polling halted until the next reset (midnight US Pacific)", used, limit) } listings, err := s.ebayClient(ctx).Search(ctx, sp) // The call hit eBay (or at least was attempted against it) whether // or not it succeeded, so it counts against the daily allowance. if n, incErr := s.store.IncrementEbayUsage(ctx); incErr != nil { slog.Error("ebay usage increment failed", "err", incErr) } else if limit > 0 && n >= limit { slog.Warn("ebay daily API call limit reached", "used", n, "limit", limit) } if err != nil { return nil, err } decoded = make([]apify.UnifiedResult, 0, len(listings)) for _, l := range listings { decoded = append(decoded, apify.UnifiedResult{ Title: l.Title, Price: l.Price, Currency: l.Currency, URL: l.URL, Store: l.Store, ImageURL: l.ImageURL, Source: apify.SourceActiveEbay, EndsAt: l.EndsAt, }) } default: if p.actorID == "" { return nil, fmt.Errorf("no actor configured for %s", p.marketplace) } raw, err := s.apifyClient(ctx).Run(ctx, p.actorID, p.input) if err != nil { return nil, err } decoded, _ = apify.Decode(raw, p.source) } usable := 0 for i := range decoded { decoded[i].MatchedQuery = p.query if decoded[i].URL != "" && decoded[i].Price > 0 { usable++ } } slog.Info("plan executed", "provider", p.provider, "marketplace", p.marketplace, "query", p.query, "decoded", len(decoded), "usable", usable, ) return decoded, nil } func (s *Scheduler) sendNotification(ctx context.Context, it models.Item, r apify.UnifiedResult) error { tags := []string{"mag"} if it.TargetPrice != nil && r.Price <= *it.TargetPrice { tags = []string{"shopping_cart", 
"tada"} } priority := it.NtfyPriority if priority == "" { priority = "default" } topic := it.NtfyTopic if topic == "" { if v, _ := s.store.GetSetting(ctx, "ntfy_default_topic"); v != "" { topic = v } else { topic = s.cfg.Ntfy.DefaultTopic } } msg := fmt.Sprintf("%s %s%.2f", r.Store, currencyPrefix(r.Currency), r.Price) if it.TargetPrice != nil { msg += fmt.Sprintf(" (target: %s%.2f)", currencyPrefix(r.Currency), *it.TargetPrice) } if r.Title != "" { msg += "\n" + r.Title } baseURL := s.cfg.Ntfy.BaseURL if v, _ := s.store.GetSetting(ctx, "ntfy_base_url"); v != "" { baseURL = v } token, _ := s.store.GetSetting(ctx, "ntfy_token") client := ntfy.NewWithToken(baseURL, token) return client.Send(ctx, ntfy.Notification{ Topic: topic, Title: fmt.Sprintf("Veola Alert: %s", it.Name), Message: msg, Priority: priority, Tags: tags, Click: r.URL, }) } func currencyPrefix(c string) string { switch c { case "USD", "": return "$" case "GBP": return "£" case "EUR": return "€" case "JPY": return "¥" } return c + " " } // BuildPreviewInputs returns one actor plan per alias for the first marketplace // on the item. Preview deliberately uses only one marketplace to limit actor // runs, but exercises every alias so the operator sees the full result set. func (s *Scheduler) BuildPreviewInputs(it models.Item) []actorPlan { queries := it.SearchQueries() if len(queries) == 0 { queries = []string{""} } markets := it.Marketplaces if len(markets) > 1 { markets = markets[:1] } var out []actorPlan for _, q := range queries { out = append(out, s.buildInputsForQuery(it, q, markets)...) } return out } type actorPlan struct { marketplace string source string provider string actorID string query string input any } // Marketplace returns the marketplace for this plan. func (p actorPlan) Marketplace() string { return p.marketplace } // Source returns the result-source label (used to pick a decoder). 
func (p actorPlan) Source() string {
	return p.source
}

// ActorID returns the Apify actor ID stored on this plan.
func (p actorPlan) ActorID() string {
	return p.actorID
}

// Query returns the alias string this plan searches for. It is empty for
// URL-only items.
func (p actorPlan) Query() string {
	return p.query
}

// Input returns the input payload stored on this plan.
func (p actorPlan) Input() any {
	return p.input
}

// Provider returns "apify" or "ebay" — how this plan is executed.
func (p actorPlan) Provider() string {
	return p.provider
}

// buildAllInputs expands an item into one plan per (alias × marketplace)
// pair. An item without aliases contributes a single empty query, and an
// item without marketplaces is polled against "ebay.com".
func (s *Scheduler) buildAllInputs(it models.Item) []actorPlan {
	markets := it.Marketplaces
	if len(markets) == 0 {
		markets = []string{"ebay.com"}
	}

	aliases := it.SearchQueries()
	if len(aliases) == 0 {
		aliases = []string{""}
	}

	var plans []actorPlan
	for i := range aliases {
		perAlias := s.buildInputsForQuery(it, aliases[i], markets)
		plans = append(plans, perAlias...)
	}
	return plans
}

// buildInputsForQuery returns one actor plan per marketplace, all using the
// same query string. Used by both the scheduler and the preview path.
func (s *Scheduler) buildInputsForQuery(it models.Item, query string, markets []string) []actorPlan { url := strings.ToLower(it.URL) plans := make([]actorPlan, 0, len(markets)) for _, m := range markets { mk := strings.ToLower(m) switch { case strings.Contains(mk, "yahoo") || strings.Contains(url, "yahoo.co.jp"): actorID := firstNonEmpty(it.ActorActive, s.cfg.Apify.Actors.YahooAuctionsJP) plans = append(plans, actorPlan{ marketplace: m, source: apify.SourceYahooJP, provider: providerApify, actorID: actorID, query: query, input: apify.YahooAuctionsJPInput{SearchTerm: query, MaxPages: 1}, }) case strings.Contains(mk, "mercari") || strings.Contains(url, "mercari"): actorID := firstNonEmpty(it.ActorActive, s.cfg.Apify.Actors.MercariJP) plans = append(plans, actorPlan{ marketplace: m, source: apify.SourceMercariJP, provider: providerApify, actorID: actorID, query: query, input: apify.MercariJPInput{ SearchKeywords: []string{query}, Status: "on_sale", MaxResults: 30, }, }) case ebay.IsEbayMarketplace(mk): // eBay marketplaces are polled through eBay's official Browse // API, not an Apify scraper actor. plans = append(plans, actorPlan{ marketplace: m, source: apify.SourceActiveEbay, provider: providerEbay, query: query, input: ebay.SearchParams{ MarketplaceID: ebay.MarketplaceID(mk), Query: query, ListingType: it.ListingType, Condition: it.Condition, Region: it.Region, Limit: 30, }, }) default: // Non-eBay custom marketplaces still fall back to the Apify // active-listings actor. 
actorID := firstNonEmpty(it.ActorActive, s.cfg.Apify.Actors.ActiveListings) plans = append(plans, actorPlan{ marketplace: m, source: apify.SourceActiveEbay, provider: providerApify, actorID: actorID, query: query, input: apify.ActiveListingInput{ SearchQueries: []string{query}, MaxProductsPerSearch: 30, MaxSearchPages: 1, Sort: "best_match", ListingType: mapListingType(it.ListingType), ProxyConfiguration: s.proxyConfig(), }, }) } } return plans } // DedupByURL collapses duplicates within a single result set. When the same // listing matches multiple aliases the first occurrence wins, including its // MatchedQuery tag. func DedupByURL(in []apify.UnifiedResult) []apify.UnifiedResult { seen := map[string]bool{} out := make([]apify.UnifiedResult, 0, len(in)) for _, r := range in { if r.URL == "" { out = append(out, r) continue } key := r.Source + "|" + r.URL if seen[key] { continue } seen[key] = true out = append(out, r) } return out } // proxyConfig returns the apify proxyConfiguration block built from // config.toml. Returns nil — meaning omit the field from actor input // entirely — if use_apify_proxy is false. Group and country are ignored when // use_apify_proxy is false to prevent contradictory input. func (s *Scheduler) proxyConfig() *apify.ProxyConfiguration { p := s.cfg.Apify.Proxy if !p.UseApifyProxy { return nil } return &apify.ProxyConfiguration{ UseApifyProxy: true, ApifyProxyGroups: p.Groups, ApifyProxyCountry: p.Country, } } // mapListingType translates Veola's listing-type vocabulary ("all", "BIN", // "auction") into the automation-lab/ebay-scraper input vocabulary // ("all", "buy_it_now", "auction"). Unrecognized values fall through as-is // in case the user pasted a value the actor accepts but we don't. 
func mapListingType(s string) string { switch strings.ToLower(s) { case "", "all": return "all" case "bin", "buy_it_now": return "buy_it_now" case "auction": return "auction" } return s } func firstNonEmpty(vs ...string) string { for _, v := range vs { if v != "" { return v } } return "" } // SeedSoldHistory runs the sold-listings actor and writes price_history rows // for an item just added. Errors are logged and swallowed: a missing baseline // is not fatal. func (s *Scheduler) SeedSoldHistory(ctx context.Context, it models.Item) { queries := it.SearchQueries() if len(queries) == 0 { return } markets := it.Marketplaces if len(markets) == 0 { markets = []string{"ebay.com"} } for _, q := range queries { for _, m := range markets { s.seedSoldHistoryFor(ctx, it, q, m) } } } func (s *Scheduler) seedSoldHistoryFor(ctx context.Context, it models.Item, query, marketplace string) { actorID := firstNonEmpty(it.ActorSold, s.cfg.Apify.Actors.SoldListings) source := apify.SourceSoldEbay if strings.Contains(strings.ToLower(marketplace), "yahoo") { actorID = firstNonEmpty(it.ActorSold, s.cfg.Apify.Actors.YahooAuctionsJPSold) source = apify.SourceSoldYahooJP } if actorID == "" { return } raw, err := s.apifyClient(ctx).Run(ctx, actorID, apify.SoldListingInput{ Query: query, Marketplace: marketplace, MaxResults: 50, DaysBack: 30, ProxyConfiguration: s.proxyConfig(), }) if err != nil { slog.Warn("sold history seed failed", "item_id", it.ID, "marketplace", marketplace, "query", query, "err", err) return } for _, r := range raw { var sold apify.SoldListingResult if err := jsonUnmarshal(r, &sold); err != nil || sold.SoldPrice <= 0 { continue } t, _ := time.Parse(time.RFC3339, sold.SoldAt) if t.IsZero() { t = time.Now() } _ = s.store.InsertPricePoint(ctx, &models.PricePoint{ ItemID: it.ID, Price: sold.SoldPrice, Store: sourceLabelToStore(source), PolledAt: t, }) } } func sourceLabelToStore(src string) string { switch src { case apify.SourceSoldYahooJP: return "yahoo-auctions-jp-sold" } 
return "ebay-sold" }