eBay marketplaces are now polled through eBay's official Buy > Browse API (client-credentials OAuth2) instead of an Apify scraper actor; Apify still handles Yahoo JP and Mercari. Browse API calls are tracked per day in a new ebay_api_usage table and capped (default 5000, configurable) on eBay's Pacific-time reset clock, so polling halts before the limit is hit. Credentials live in config.toml [ebay] and are overridable via /settings, which also surfaces the day's running call count. Also carries the server.secure_cookies config plumbing (field, accessor, example) consumed by the following commit. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
700 lines
20 KiB
Go
700 lines
20 KiB
Go
package scheduler
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"log/slog"
|
||
"strconv"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
|
||
"github.com/robfig/cron/v3"
|
||
|
||
"veola/internal/apify"
|
||
"veola/internal/config"
|
||
"veola/internal/db"
|
||
"veola/internal/ebay"
|
||
"veola/internal/models"
|
||
"veola/internal/ntfy"
|
||
)
|
||
|
||
// Provider labels distinguish how a plan is executed: through an Apify actor
|
||
// run, or through eBay's official Browse API.
|
||
const (
|
||
providerApify = "apify"
|
||
providerEbay = "ebay"
|
||
)
|
||
|
||
type Scheduler struct {
|
||
cfg *config.Config
|
||
store *db.Store
|
||
apify *apify.Client
|
||
ebay *ebay.Client
|
||
ntfy *ntfy.Client
|
||
cron *cron.Cron
|
||
|
||
mu sync.Mutex
|
||
entries map[int64]cron.EntryID
|
||
|
||
rootCtx context.Context
|
||
cancel context.CancelFunc
|
||
}
|
||
|
||
func New(cfg *config.Config, store *db.Store, ap *apify.Client, nt *ntfy.Client) *Scheduler {
|
||
rootCtx, cancel := context.WithCancel(context.Background())
|
||
return &Scheduler{
|
||
cfg: cfg,
|
||
store: store,
|
||
apify: ap,
|
||
ebay: ebay.New(cfg.Ebay.ClientID, cfg.Ebay.ClientSecret, cfg.Ebay.Environment),
|
||
ntfy: nt,
|
||
cron: cron.New(),
|
||
entries: make(map[int64]cron.EntryID),
|
||
rootCtx: rootCtx,
|
||
cancel: cancel,
|
||
}
|
||
}
|
||
|
||
func (s *Scheduler) Start(ctx context.Context) error {
|
||
items, err := s.store.ListActiveItems(ctx)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
for _, it := range items {
|
||
s.register(it)
|
||
}
|
||
s.cron.Start()
|
||
slog.Info("scheduler started", "items", len(items))
|
||
return nil
|
||
}
|
||
|
||
// Stop blocks until running jobs complete.
|
||
func (s *Scheduler) Stop() {
|
||
s.cancel()
|
||
stopCtx := s.cron.Stop()
|
||
<-stopCtx.Done()
|
||
slog.Info("scheduler stopped")
|
||
}
|
||
|
||
// SyncItem registers, re-registers, or removes the cron job for an item based
|
||
// on its current Active flag. Call after create/update/toggle/delete.
|
||
func (s *Scheduler) SyncItem(it models.Item) {
|
||
s.mu.Lock()
|
||
defer s.mu.Unlock()
|
||
if existing, ok := s.entries[it.ID]; ok {
|
||
s.cron.Remove(existing)
|
||
delete(s.entries, it.ID)
|
||
}
|
||
if !it.Active {
|
||
return
|
||
}
|
||
s.registerLocked(it)
|
||
}
|
||
|
||
func (s *Scheduler) RemoveItem(id int64) {
|
||
s.mu.Lock()
|
||
defer s.mu.Unlock()
|
||
if existing, ok := s.entries[id]; ok {
|
||
s.cron.Remove(existing)
|
||
delete(s.entries, id)
|
||
}
|
||
}
|
||
|
||
func (s *Scheduler) register(it models.Item) {
|
||
s.mu.Lock()
|
||
defer s.mu.Unlock()
|
||
s.registerLocked(it)
|
||
}
|
||
|
||
func (s *Scheduler) registerLocked(it models.Item) {
|
||
mins := it.PollIntervalMinutes
|
||
if mins <= 0 {
|
||
mins = s.cfg.Scheduler.GlobalPollIntervalMinutes
|
||
}
|
||
if mins <= 0 {
|
||
mins = 60
|
||
}
|
||
spec := fmt.Sprintf("@every %dm", mins)
|
||
id := it.ID
|
||
entryID, err := s.cron.AddFunc(spec, func() {
|
||
ctx, cancel := context.WithTimeout(s.rootCtx, 10*time.Minute)
|
||
defer cancel()
|
||
fresh, err := s.store.GetItem(ctx, id)
|
||
if err != nil || fresh == nil || !fresh.Active {
|
||
return
|
||
}
|
||
s.RunPoll(ctx, *fresh)
|
||
})
|
||
if err != nil {
|
||
slog.Error("schedule failed", "item_id", it.ID, "err", err)
|
||
return
|
||
}
|
||
s.entries[it.ID] = entryID
|
||
}
|
||
|
||
// RunPoll executes one poll cycle for an item. Public so handlers can trigger
|
||
// "Run Now" without going through cron. Iterates over each (alias × marketplace)
|
||
// pair; a single failing combo does not poison the others.
|
||
func (s *Scheduler) RunPoll(ctx context.Context, it models.Item) {
|
||
plans := s.buildAllInputs(it)
|
||
if len(plans) == 0 {
|
||
s.recordError(ctx, it.ID, "no marketplaces configured for this item")
|
||
return
|
||
}
|
||
apifyClient := s.apifyClient(ctx)
|
||
var results []apify.UnifiedResult
|
||
var errs []string
|
||
successes := 0
|
||
for _, p := range plans {
|
||
decoded, err := s.ExecutePlan(ctx, p)
|
||
if err != nil {
|
||
label := p.marketplace
|
||
if p.query != "" {
|
||
label = fmt.Sprintf("query %q on %s", p.query, p.marketplace)
|
||
}
|
||
errs = append(errs, fmt.Sprintf("%s: %s", label, err.Error()))
|
||
slog.Error("plan failed", "item_id", it.ID, "provider", p.provider, "marketplace", p.marketplace, "query", p.query, "err", err)
|
||
continue
|
||
}
|
||
results = append(results, decoded...)
|
||
successes++
|
||
}
|
||
if successes == 0 {
|
||
s.recordError(ctx, it.ID, strings.Join(errs, "; "))
|
||
return
|
||
}
|
||
|
||
if it.UsePriceComparison {
|
||
pcID := it.ActorPriceCompare
|
||
if pcID == "" {
|
||
pcID = s.cfg.Apify.Actors.PriceComparison
|
||
}
|
||
if pcID != "" {
|
||
pcQueries := it.SearchQueries()
|
||
if len(pcQueries) == 0 && it.URL != "" {
|
||
pcQueries = []string{""}
|
||
}
|
||
for _, q := range pcQueries {
|
||
pcRaw, err := apifyClient.Run(ctx, pcID, apify.PriceComparisonInput{
|
||
Query: q, URL: it.URL,
|
||
ProxyConfiguration: s.proxyConfig(),
|
||
})
|
||
if err == nil {
|
||
pc, _ := apify.Decode(pcRaw, apify.SourcePriceCompare)
|
||
for i := range pc {
|
||
pc[i].MatchedQuery = q
|
||
}
|
||
results = append(results, pc...)
|
||
} else {
|
||
slog.Warn("price comparison failed", "item_id", it.ID, "query", q, "err", err)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
beforeDedup := len(results)
|
||
results = DedupByURL(results)
|
||
|
||
threshold := s.cfg.Scheduler.MatchConfidenceThreshold
|
||
beforeFilter := len(results)
|
||
results = FilterResults(results, threshold, it.IncludeOutOfStock)
|
||
results = ApplyItemFilters(results, it.MinPrice, it.ExcludeKeywordsList())
|
||
slog.Info("filter applied",
|
||
"item_id", it.ID,
|
||
"before_dedup", beforeDedup,
|
||
"before_filter", beforeFilter,
|
||
"after", len(results),
|
||
"min_confidence", threshold,
|
||
"min_price", it.MinPrice,
|
||
"exclude_count", len(it.ExcludeKeywordsList()),
|
||
"include_out_of_stock", it.IncludeOutOfStock,
|
||
)
|
||
|
||
bestIdx := PickBest(results)
|
||
alertsSent := 0
|
||
for _, r := range results {
|
||
exists, err := s.store.ResultExists(ctx, it.ID, r.URL)
|
||
if err != nil {
|
||
slog.Error("dedup check failed", "err", err)
|
||
continue
|
||
}
|
||
if exists {
|
||
continue
|
||
}
|
||
alerted := false
|
||
if ShouldAlert(it.TargetPrice, r.Price) {
|
||
if err := s.sendNotification(ctx, it, r); err != nil {
|
||
slog.Error("ntfy send failed", "err", err)
|
||
} else {
|
||
alerted = true
|
||
alertsSent++
|
||
}
|
||
}
|
||
price := r.Price
|
||
_, err = s.store.InsertResult(ctx, &models.Result{
|
||
ItemID: it.ID,
|
||
Title: r.Title,
|
||
Price: &price,
|
||
Currency: r.Currency,
|
||
URL: r.URL,
|
||
Source: r.Source,
|
||
ImageURL: r.ImageURL,
|
||
MatchedQuery: r.MatchedQuery,
|
||
Alerted: alerted,
|
||
})
|
||
if err != nil {
|
||
slog.Error("insert result failed", "err", err)
|
||
}
|
||
}
|
||
|
||
errMsg := ""
|
||
if len(errs) > 0 {
|
||
errMsg = strings.Join(errs, "; ")
|
||
}
|
||
if bestIdx >= 0 {
|
||
best := results[bestIdx]
|
||
bp := best.Price
|
||
_ = s.store.UpdateItemPollResult(ctx, it.ID, &models.Item{
|
||
BestPrice: &bp,
|
||
BestPriceStore: best.Store,
|
||
BestPriceURL: best.URL,
|
||
BestPriceImageURL: best.ImageURL,
|
||
BestPriceTitle: best.Title,
|
||
}, errMsg)
|
||
_ = s.store.InsertPricePoint(ctx, &models.PricePoint{
|
||
ItemID: it.ID,
|
||
Price: bp,
|
||
Store: best.Store,
|
||
})
|
||
} else {
|
||
_ = s.store.UpdateItemPollResult(ctx, it.ID, nil, errMsg)
|
||
}
|
||
|
||
slog.Info("poll completed",
|
||
"item_id", it.ID,
|
||
"item_name", it.Name,
|
||
"marketplaces", len(plans),
|
||
"successes", successes,
|
||
"results", len(results),
|
||
"alerts_sent", alertsSent,
|
||
)
|
||
}
|
||
|
||
func (s *Scheduler) recordError(ctx context.Context, id int64, msg string) {
|
||
if err := s.store.UpdateItemPollResult(ctx, id, nil, msg); err != nil {
|
||
slog.Error("record error failed", "err", err)
|
||
}
|
||
}
|
||
|
||
// apifyClient returns an apify.Client whose API key reflects the latest
|
||
// value from settings, falling back to config.toml.
|
||
func (s *Scheduler) apifyClient(ctx context.Context) *apify.Client {
|
||
key := s.cfg.Apify.APIKey
|
||
if v, _ := s.store.GetSetting(ctx, "apify_api_key"); v != "" {
|
||
key = v
|
||
}
|
||
return apify.New(key)
|
||
}
|
||
|
||
// ebayClient returns the shared eBay client with credentials refreshed from
|
||
// settings (falling back to config.toml). The client caches its OAuth token
|
||
// in memory, so the same instance is reused across polls; credentials are
|
||
// only re-applied when they actually change.
|
||
func (s *Scheduler) ebayClient(ctx context.Context) *ebay.Client {
|
||
id := s.cfg.Ebay.ClientID
|
||
secret := s.cfg.Ebay.ClientSecret
|
||
if v, _ := s.store.GetSetting(ctx, "ebay_client_id"); v != "" {
|
||
id = v
|
||
}
|
||
if v, _ := s.store.GetSetting(ctx, "ebay_client_secret"); v != "" {
|
||
secret = v
|
||
}
|
||
s.ebay.EnsureCredentials(id, secret)
|
||
return s.ebay
|
||
}
|
||
|
||
// EbayUsage returns the number of eBay Browse API calls made so far today and
|
||
// the configured daily limit. A limit <= 0 means uncapped. Settings override
|
||
// config.toml for the limit, mirroring how credentials are resolved.
|
||
func (s *Scheduler) EbayUsage(ctx context.Context) (used, limit int) {
|
||
used, _ = s.store.EbayUsageToday(ctx)
|
||
limit = s.cfg.Ebay.DailyCallLimit
|
||
if v, _ := s.store.GetSetting(ctx, "ebay_daily_call_limit"); v != "" {
|
||
if n, err := strconv.Atoi(strings.TrimSpace(v)); err == nil {
|
||
limit = n
|
||
}
|
||
}
|
||
return used, limit
|
||
}
|
||
|
||
// ExecutePlan runs one plan and returns decoded, provider-agnostic results
|
||
// with MatchedQuery already stamped. eBay plans go through the official
|
||
// Browse API; all other plans run an Apify actor. Callers handle per-plan
|
||
// errors without poisoning sibling plans.
|
||
func (s *Scheduler) ExecutePlan(ctx context.Context, p actorPlan) ([]apify.UnifiedResult, error) {
|
||
var decoded []apify.UnifiedResult
|
||
switch p.provider {
|
||
case providerEbay:
|
||
sp, ok := p.input.(ebay.SearchParams)
|
||
if !ok {
|
||
return nil, fmt.Errorf("ebay plan has wrong input type %T", p.input)
|
||
}
|
||
used, limit := s.EbayUsage(ctx)
|
||
if limit > 0 && used >= limit {
|
||
return nil, fmt.Errorf("ebay daily API call limit reached (%d/%d); polling halted until the next reset (midnight US Pacific)", used, limit)
|
||
}
|
||
listings, err := s.ebayClient(ctx).Search(ctx, sp)
|
||
// The call hit eBay (or at least was attempted against it) whether
|
||
// or not it succeeded, so it counts against the daily allowance.
|
||
if n, incErr := s.store.IncrementEbayUsage(ctx); incErr != nil {
|
||
slog.Error("ebay usage increment failed", "err", incErr)
|
||
} else if limit > 0 && n >= limit {
|
||
slog.Warn("ebay daily API call limit reached", "used", n, "limit", limit)
|
||
}
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
decoded = make([]apify.UnifiedResult, 0, len(listings))
|
||
for _, l := range listings {
|
||
decoded = append(decoded, apify.UnifiedResult{
|
||
Title: l.Title,
|
||
Price: l.Price,
|
||
Currency: l.Currency,
|
||
URL: l.URL,
|
||
Store: l.Store,
|
||
ImageURL: l.ImageURL,
|
||
Source: apify.SourceActiveEbay,
|
||
})
|
||
}
|
||
default:
|
||
if p.actorID == "" {
|
||
return nil, fmt.Errorf("no actor configured for %s", p.marketplace)
|
||
}
|
||
raw, err := s.apifyClient(ctx).Run(ctx, p.actorID, p.input)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
decoded, _ = apify.Decode(raw, p.source)
|
||
}
|
||
usable := 0
|
||
for i := range decoded {
|
||
decoded[i].MatchedQuery = p.query
|
||
if decoded[i].URL != "" && decoded[i].Price > 0 {
|
||
usable++
|
||
}
|
||
}
|
||
slog.Info("plan executed",
|
||
"provider", p.provider,
|
||
"marketplace", p.marketplace,
|
||
"query", p.query,
|
||
"decoded", len(decoded),
|
||
"usable", usable,
|
||
)
|
||
return decoded, nil
|
||
}
|
||
|
||
func (s *Scheduler) sendNotification(ctx context.Context, it models.Item, r apify.UnifiedResult) error {
|
||
tags := []string{"mag"}
|
||
if it.TargetPrice != nil && r.Price <= *it.TargetPrice {
|
||
tags = []string{"shopping_cart", "tada"}
|
||
}
|
||
priority := it.NtfyPriority
|
||
if priority == "" {
|
||
priority = "default"
|
||
}
|
||
topic := it.NtfyTopic
|
||
if topic == "" {
|
||
if v, _ := s.store.GetSetting(ctx, "ntfy_default_topic"); v != "" {
|
||
topic = v
|
||
} else {
|
||
topic = s.cfg.Ntfy.DefaultTopic
|
||
}
|
||
}
|
||
msg := fmt.Sprintf("%s %s%.2f", r.Store, currencyPrefix(r.Currency), r.Price)
|
||
if it.TargetPrice != nil {
|
||
msg += fmt.Sprintf(" (target: %s%.2f)", currencyPrefix(r.Currency), *it.TargetPrice)
|
||
}
|
||
if r.Title != "" {
|
||
msg += "\n" + r.Title
|
||
}
|
||
baseURL := s.cfg.Ntfy.BaseURL
|
||
if v, _ := s.store.GetSetting(ctx, "ntfy_base_url"); v != "" {
|
||
baseURL = v
|
||
}
|
||
token, _ := s.store.GetSetting(ctx, "ntfy_token")
|
||
client := ntfy.NewWithToken(baseURL, token)
|
||
return client.Send(ctx, ntfy.Notification{
|
||
Topic: topic,
|
||
Title: fmt.Sprintf("Veola Alert: %s", it.Name),
|
||
Message: msg,
|
||
Priority: priority,
|
||
Tags: tags,
|
||
Click: r.URL,
|
||
})
|
||
}
|
||
|
||
func currencyPrefix(c string) string {
|
||
switch c {
|
||
case "USD", "":
|
||
return "$"
|
||
case "GBP":
|
||
return "£"
|
||
case "EUR":
|
||
return "€"
|
||
case "JPY":
|
||
return "¥"
|
||
}
|
||
return c + " "
|
||
}
|
||
|
||
// BuildPreviewInputs returns one actor plan per alias for the first marketplace
|
||
// on the item. Preview deliberately uses only one marketplace to limit actor
|
||
// runs, but exercises every alias so the operator sees the full result set.
|
||
func (s *Scheduler) BuildPreviewInputs(it models.Item) []actorPlan {
|
||
queries := it.SearchQueries()
|
||
if len(queries) == 0 {
|
||
queries = []string{""}
|
||
}
|
||
markets := it.Marketplaces
|
||
if len(markets) > 1 {
|
||
markets = markets[:1]
|
||
}
|
||
var out []actorPlan
|
||
for _, q := range queries {
|
||
out = append(out, s.buildInputsForQuery(it, q, markets)...)
|
||
}
|
||
return out
|
||
}
|
||
|
||
type actorPlan struct {
|
||
marketplace string
|
||
source string
|
||
provider string
|
||
actorID string
|
||
query string
|
||
input any
|
||
}
|
||
|
||
// Marketplace returns the marketplace for this plan.
|
||
func (p actorPlan) Marketplace() string { return p.marketplace }
|
||
|
||
// Source returns the result-source label (used to pick a decoder).
|
||
func (p actorPlan) Source() string { return p.source }
|
||
|
||
// ActorID returns the Apify actor ID this plan will invoke.
|
||
func (p actorPlan) ActorID() string { return p.actorID }
|
||
|
||
// Query returns the alias string this plan searches for. Empty for URL-only items.
|
||
func (p actorPlan) Query() string { return p.query }
|
||
|
||
// Input returns the actor input payload as expected by apify.Client.Run.
|
||
func (p actorPlan) Input() any { return p.input }
|
||
|
||
// Provider returns "apify" or "ebay" — how this plan is executed.
|
||
func (p actorPlan) Provider() string { return p.provider }
|
||
|
||
// buildAllInputs returns one actor plan per (alias × marketplace) for the item.
|
||
// For URL-only items (no aliases), produces one plan per marketplace with an
|
||
// empty query string.
|
||
func (s *Scheduler) buildAllInputs(it models.Item) []actorPlan {
|
||
queries := it.SearchQueries()
|
||
if len(queries) == 0 {
|
||
queries = []string{""}
|
||
}
|
||
markets := it.Marketplaces
|
||
if len(markets) == 0 {
|
||
markets = []string{"ebay.com"}
|
||
}
|
||
var out []actorPlan
|
||
for _, q := range queries {
|
||
out = append(out, s.buildInputsForQuery(it, q, markets)...)
|
||
}
|
||
return out
|
||
}
|
||
|
||
// buildInputsForQuery returns one actor plan per marketplace, all using the
|
||
// same query string. Used by both the scheduler and the preview path.
|
||
func (s *Scheduler) buildInputsForQuery(it models.Item, query string, markets []string) []actorPlan {
|
||
url := strings.ToLower(it.URL)
|
||
plans := make([]actorPlan, 0, len(markets))
|
||
for _, m := range markets {
|
||
mk := strings.ToLower(m)
|
||
switch {
|
||
case strings.Contains(mk, "yahoo") || strings.Contains(url, "yahoo.co.jp"):
|
||
actorID := firstNonEmpty(it.ActorActive, s.cfg.Apify.Actors.YahooAuctionsJP)
|
||
plans = append(plans, actorPlan{
|
||
marketplace: m, source: apify.SourceYahooJP, provider: providerApify,
|
||
actorID: actorID, query: query,
|
||
input: apify.YahooAuctionsJPInput{SearchTerm: query, MaxPages: 1},
|
||
})
|
||
case strings.Contains(mk, "mercari") || strings.Contains(url, "mercari"):
|
||
actorID := firstNonEmpty(it.ActorActive, s.cfg.Apify.Actors.MercariJP)
|
||
plans = append(plans, actorPlan{
|
||
marketplace: m, source: apify.SourceMercariJP, provider: providerApify,
|
||
actorID: actorID, query: query,
|
||
input: apify.MercariJPInput{
|
||
SearchKeywords: []string{query},
|
||
Status: "on_sale",
|
||
MaxResults: 30,
|
||
},
|
||
})
|
||
case ebay.IsEbayMarketplace(mk):
|
||
// eBay marketplaces are polled through eBay's official Browse
|
||
// API, not an Apify scraper actor.
|
||
plans = append(plans, actorPlan{
|
||
marketplace: m, source: apify.SourceActiveEbay, provider: providerEbay,
|
||
query: query,
|
||
input: ebay.SearchParams{
|
||
MarketplaceID: ebay.MarketplaceID(mk),
|
||
Query: query,
|
||
ListingType: it.ListingType,
|
||
Limit: 30,
|
||
},
|
||
})
|
||
default:
|
||
// Non-eBay custom marketplaces still fall back to the Apify
|
||
// active-listings actor.
|
||
actorID := firstNonEmpty(it.ActorActive, s.cfg.Apify.Actors.ActiveListings)
|
||
plans = append(plans, actorPlan{
|
||
marketplace: m, source: apify.SourceActiveEbay, provider: providerApify,
|
||
actorID: actorID, query: query,
|
||
input: apify.ActiveListingInput{
|
||
SearchQueries: []string{query},
|
||
MaxProductsPerSearch: 30,
|
||
MaxSearchPages: 1,
|
||
Sort: "best_match",
|
||
ListingType: mapListingType(it.ListingType),
|
||
ProxyConfiguration: s.proxyConfig(),
|
||
},
|
||
})
|
||
}
|
||
}
|
||
return plans
|
||
}
|
||
|
||
// DedupByURL collapses duplicates within a single result set. When the same
|
||
// listing matches multiple aliases the first occurrence wins, including its
|
||
// MatchedQuery tag.
|
||
func DedupByURL(in []apify.UnifiedResult) []apify.UnifiedResult {
|
||
seen := map[string]bool{}
|
||
out := make([]apify.UnifiedResult, 0, len(in))
|
||
for _, r := range in {
|
||
if r.URL == "" {
|
||
out = append(out, r)
|
||
continue
|
||
}
|
||
key := r.Source + "|" + r.URL
|
||
if seen[key] {
|
||
continue
|
||
}
|
||
seen[key] = true
|
||
out = append(out, r)
|
||
}
|
||
return out
|
||
}
|
||
|
||
// proxyConfig returns the apify proxyConfiguration block built from
|
||
// config.toml. Returns nil — meaning omit the field from actor input
|
||
// entirely — if use_apify_proxy is false. Group and country are ignored when
|
||
// use_apify_proxy is false to prevent contradictory input.
|
||
func (s *Scheduler) proxyConfig() *apify.ProxyConfiguration {
|
||
p := s.cfg.Apify.Proxy
|
||
if !p.UseApifyProxy {
|
||
return nil
|
||
}
|
||
return &apify.ProxyConfiguration{
|
||
UseApifyProxy: true,
|
||
ApifyProxyGroups: p.Groups,
|
||
ApifyProxyCountry: p.Country,
|
||
}
|
||
}
|
||
|
||
// mapListingType translates Veola's listing-type vocabulary ("all", "BIN",
|
||
// "auction") into the automation-lab/ebay-scraper input vocabulary
|
||
// ("all", "buy_it_now", "auction"). Unrecognized values fall through as-is
|
||
// in case the user pasted a value the actor accepts but we don't.
|
||
func mapListingType(s string) string {
|
||
switch strings.ToLower(s) {
|
||
case "", "all":
|
||
return "all"
|
||
case "bin", "buy_it_now":
|
||
return "buy_it_now"
|
||
case "auction":
|
||
return "auction"
|
||
}
|
||
return s
|
||
}
|
||
|
||
func firstNonEmpty(vs ...string) string {
|
||
for _, v := range vs {
|
||
if v != "" {
|
||
return v
|
||
}
|
||
}
|
||
return ""
|
||
}
|
||
|
||
// SeedSoldHistory runs the sold-listings actor and writes price_history rows
|
||
// for an item just added. Errors are logged and swallowed: a missing baseline
|
||
// is not fatal.
|
||
func (s *Scheduler) SeedSoldHistory(ctx context.Context, it models.Item) {
|
||
queries := it.SearchQueries()
|
||
if len(queries) == 0 {
|
||
return
|
||
}
|
||
markets := it.Marketplaces
|
||
if len(markets) == 0 {
|
||
markets = []string{"ebay.com"}
|
||
}
|
||
for _, q := range queries {
|
||
for _, m := range markets {
|
||
s.seedSoldHistoryFor(ctx, it, q, m)
|
||
}
|
||
}
|
||
}
|
||
|
||
func (s *Scheduler) seedSoldHistoryFor(ctx context.Context, it models.Item, query, marketplace string) {
|
||
actorID := firstNonEmpty(it.ActorSold, s.cfg.Apify.Actors.SoldListings)
|
||
source := apify.SourceSoldEbay
|
||
if strings.Contains(strings.ToLower(marketplace), "yahoo") {
|
||
actorID = firstNonEmpty(it.ActorSold, s.cfg.Apify.Actors.YahooAuctionsJPSold)
|
||
source = apify.SourceSoldYahooJP
|
||
}
|
||
if actorID == "" {
|
||
return
|
||
}
|
||
raw, err := s.apifyClient(ctx).Run(ctx, actorID, apify.SoldListingInput{
|
||
Query: query, Marketplace: marketplace, MaxResults: 50, DaysBack: 30,
|
||
ProxyConfiguration: s.proxyConfig(),
|
||
})
|
||
if err != nil {
|
||
slog.Warn("sold history seed failed", "item_id", it.ID, "marketplace", marketplace, "query", query, "err", err)
|
||
return
|
||
}
|
||
for _, r := range raw {
|
||
var sold apify.SoldListingResult
|
||
if err := jsonUnmarshal(r, &sold); err != nil || sold.SoldPrice <= 0 {
|
||
continue
|
||
}
|
||
t, _ := time.Parse(time.RFC3339, sold.SoldAt)
|
||
if t.IsZero() {
|
||
t = time.Now()
|
||
}
|
||
_ = s.store.InsertPricePoint(ctx, &models.PricePoint{
|
||
ItemID: it.ID,
|
||
Price: sold.SoldPrice,
|
||
Store: sourceLabelToStore(source),
|
||
PolledAt: t,
|
||
})
|
||
}
|
||
}
|
||
|
||
func sourceLabelToStore(src string) string {
|
||
switch src {
|
||
case apify.SourceSoldYahooJP:
|
||
return "yahoo-auctions-jp-sold"
|
||
}
|
||
return "ebay-sold"
|
||
}
|