Documentation Index
Fetch the complete documentation index at: https://mintlify.com/cowprotocol/solver-rewards/llms.txt
Use this file to discover all available pages before exploring further.
The solver rewards pipeline draws on two primary data sources: Dune Analytics for on-chain block interval resolution and PostgreSQL analytics databases for pre-computed solver performance metrics. A third source, CoinPaprika, supplies the token exchange rates needed to convert native-token reward caps into COW amounts.
- Dune Analytics
- PostgreSQL analytics DB
- Price data
Dune Analytics
The DuneFetcher class (src/fetch/dune.py) wraps the dune-client library and is the sole interface to Dune queries. It holds a DuneClient instance, the target blockchain string, and an AccountingPeriod, and resolves block numbers for that period at construction time.

```python
class DuneFetcher:  # pylint: disable=too-few-public-methods
    """
    Class Contains DuneAPI Instance and Accounting Period along with several get methods
    for various Dune Queries.
    """

    dune: DuneClient
    period: AccountingPeriod
    blockchain: str

    def __init__(
        self,
        dune: DuneClient,
        blockchain: str,
        period: AccountingPeriod,
    ):
        self.dune = dune
        self.blockchain = blockchain
        self.period = period
        self.start_block, self.end_block = self.get_block_interval()
```
Block interval resolution
get_block_interval() is called during __init__ and maps the accounting period's start and end dates to on-chain block numbers. It executes the PERIOD_BLOCK_INTERVAL Dune query (query ID 3333356) with both the network's blockchain name and the period date parameters, and asserts that exactly one row is returned.

```python
def get_block_interval(self) -> tuple[str, str]:
    """Returns block numbers corresponding to date interval"""
    results = self._get_query_results(
        self._parameterized_query(
            QUERIES["PERIOD_BLOCK_INTERVAL"], self._network_and_period_params()
        )
    )
    assert len(results) == 1, "Block Interval Query should return only 1 result!"
    return str(results[0]["start_block"]), str(results[0]["end_block"])
```
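The single-row invariant can be exercised offline with a stubbed result set. Here, block_interval_from_rows and the block numbers are illustrative stand-ins, not part of the codebase:

```python
def block_interval_from_rows(results: list[dict[str, str]]) -> tuple[str, str]:
    # Same single-row check as get_block_interval() applies to the Dune rows.
    assert len(results) == 1, "Block Interval Query should return only 1 result!"
    return str(results[0]["start_block"]), str(results[0]["end_block"])

# A well-formed query result contains exactly one row with both bounds.
rows = [{"start_block": "18000000", "end_block": "18050000"}]
interval = block_interval_from_rows(rows)
```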
Query execution
All Dune queries are routed through _get_query_results(). When no cached execution ID (job_id) is supplied, the method calls dune.refresh(query, ping_frequency=15), which submits the query and polls every 15 seconds until results are available. The execution ID is logged for auditing.

```python
def _get_query_results(
    self, query: QueryBase, job_id: Optional[str] = None
) -> list[dict[str, str]]:
    """Internally every dune query execution is routed through here."""
    log.info(f"Fetching {query.name} from query: {query}")
    if not job_id:
        exec_result = self.dune.refresh(query, ping_frequency=15)
    else:
        exec_result = self.dune.get_result(job_id)
    log.info(f"Fetch completed for execution {exec_result.execution_id}")
    log_saver.print(
        f"{query.name} execution ID: {exec_result.execution_id}", Category.EXECUTION
    )
    return exec_result.get_rows()
```
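The refresh-vs-replay control flow can be sketched offline with a fake client. StubDune and get_query_results below are illustrative stand-ins for the dune-client calls, not the real API:

```python
class StubDune:
    """Fake DuneClient: refresh() runs a query, get_result() replays a past run."""

    def refresh(self, query, ping_frequency=15):
        # A fresh submission gets a new execution ID.
        return {"execution_id": "exec-fresh", "rows": [{"start_block": "1"}]}

    def get_result(self, job_id):
        # Replaying by job_id skips re-execution entirely.
        return {"execution_id": job_id, "rows": [{"start_block": "1"}]}


def get_query_results(dune, query, job_id=None):
    # Route through refresh() for a new run, get_result() for a cached one.
    if not job_id:
        result = dune.refresh(query, ping_frequency=15)
    else:
        result = dune.get_result(job_id)
    return result["rows"]
```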
Query parameters
Two helpers build the parameter lists passed to each query:
- _period_params() — returns the accounting period's start and end dates as QueryParameter objects.
- _network_and_period_params() — appends a blockchain text parameter (e.g. "ethereum", "gnosis") to the period params.
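A minimal sketch of the shapes these helpers produce, using plain dicts in place of dune-client QueryParameter objects; the parameter names (start_time, end_time, blockchain) are assumptions for illustration:

```python
from datetime import datetime


def period_params(start: datetime, end: datetime) -> list[dict[str, str]]:
    # Stand-in for _period_params(): one date parameter per period boundary.
    fmt = "%Y-%m-%d %H:%M:%S"
    return [
        {"name": "start_time", "value": start.strftime(fmt)},
        {"name": "end_time", "value": end.strftime(fmt)},
    ]


def network_and_period_params(start, end, blockchain: str) -> list[dict[str, str]]:
    # Stand-in for _network_and_period_params(): period params plus the chain name.
    return period_params(start, end) + [{"name": "blockchain", "value": blockchain}]


params = network_and_period_params(datetime(2024, 7, 2), datetime(2024, 7, 9), "ethereum")
```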
Registered queries
All queries are defined in src/queries.py in the QUERIES dict:

| Key | Name | Dune query ID |
|---|---|---|
| PERIOD_BLOCK_INTERVAL | Block Interval for Accounting Period | 3333356 |
| DASHBOARD_SLIPPAGE | Period Solver Rewards | 2510345 |
Set DUNE_API_KEY in your environment before running any pipeline step. The key is read from the environment by DuneConfig.from_network().
PostgreSQL analytics DB
The MultiInstanceDBFetcher class (src/pg_client.py) queries a CoW Protocol analytics database that stores pre-aggregated, dbt-transformed solver metrics. Rather than querying a single database, it fans out to both the prod and staging environments in one call and concatenates the results into a single DataFrame.

Connection pattern

Each environment gets its own SQLAlchemy engine with TCP keepalive settings to handle long-running queries:

```python
for environment in ["prod", "staging"]:
    pg_engine = create_engine(
        f"postgresql+psycopg2://{db_url}/{environment}_{network}",
        pool_pre_ping=True,
        connect_args={
            "keepalives": 1,
            "keepalives_idle": 30,
            "keepalives_interval": 10,
            "keepalives_count": 5,
        },
    )
```

The database name is constructed as {environment}_{network_db_name} — for example, prod_mainnet or staging_xdai. The connection string is read from the ANALYTICS_DB_URL environment variable.

Available fetch methods
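The fan-out over environments can be sketched as a URL builder; analytics_db_urls is an illustrative helper, not part of the codebase:

```python
def analytics_db_urls(db_url: str, network_db_name: str) -> dict[str, str]:
    # One SQLAlchemy connection string per environment, following the
    # {environment}_{network_db_name} naming scheme described above.
    return {
        env: f"postgresql+psycopg2://{db_url}/{env}_{network_db_name}"
        for env in ("prod", "staging")
    }


urls = analytics_db_urls("user:secret@db.example.com:5432", "mainnet")
```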
get_data_per_solver()
Queries the dbt.fct_data_per_solver_and_accounting_period table for solver performance data within the given accounting period. Address columns (solver, pool_address, reward_target) are stored as bytearrays in the database and are converted to 0x-prefixed hex strings by bytearray2hex().

```python
def get_data_per_solver(
    self, accounting_period: AccountingPeriod, config: AccountingConfig
) -> DataFrame:
    results = self.get_analytics_db_table_prod_and_barn(
        "fct_data_per_solver_and_accounting_period", accounting_period, config
    )
    hex_columns = ["solver", "pool_address", "reward_target"]
    for column in hex_columns:
        results[column] = results[column].apply(bytearray2hex)
    return results
```
get_partner_and_protocol_fees()
Queries the dbt.fct_partner_and_protocol_fees table for partner and protocol fee data. The partner_fee_recipient column is converted from bytearray to hex.

```python
def get_partner_and_protocol_fees(
    self, accounting_period: AccountingPeriod, config: AccountingConfig
) -> DataFrame:
    results = self.get_analytics_db_table_prod_and_barn(
        "fct_partner_and_protocol_fees", accounting_period, config
    )
    hex_columns = ["partner_fee_recipient"]
    for column in hex_columns:
        results[column] = results[column].apply(bytearray2hex)
    return results
```
Address conversion
bytearray2hex() converts PostgreSQL bytea columns to Ethereum-style hex addresses:

```python
def bytearray2hex(address: bytearray) -> str | None:
    return "0x" + address.hex() if address else None
```

The inverse, pg_hex2bytea(), converts 0x-prefixed hex addresses to the \x-prefixed format that PostgreSQL expects in query literals:

```python
def pg_hex2bytea(hex_address: str) -> str:
    return hex_address.replace("0x", "\\x")
```
Query construction
Both fetch methods delegate to get_analytics_db_table_prod_and_barn(), which builds a simple filter query on the accounting_period column:

```sql
SELECT * FROM dbt.<table_name>
WHERE accounting_period = '<start_time> - <end_time>'
```

The period string is formatted as YYYY-MM-DD HH:MM:SS - YYYY-MM-DD HH:MM:SS.

Solvers that exist in both prod and staging will appear as duplicate rows in the concatenated result. The pipeline logs a warning for any such duplicates and merges them downstream.
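The concatenate-and-warn behaviour can be sketched with plain lists of dicts standing in for the prod and staging DataFrames; concat_and_flag_duplicates is an illustrative helper, not the pipeline's actual function:

```python
def concat_and_flag_duplicates(prod_rows, barn_rows):
    # Combine both environments, then flag solvers that appear in each.
    combined = prod_rows + barn_rows
    seen, duplicates = set(), set()
    for row in combined:
        if row["solver"] in seen:
            duplicates.add(row["solver"])
        seen.add(row["solver"])
    if duplicates:
        # The real pipeline logs a warning and merges duplicates downstream.
        print(f"Warning: solvers present in prod and staging: {sorted(duplicates)}")
    return combined


rows = concat_and_flag_duplicates(
    [{"solver": "0xaaa"}],
    [{"solver": "0xaaa"}, {"solver": "0xbbb"}],
)
```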
Price data
Exchange rates for converting between native tokens and COW are fetched from CoinPaprika's free-tier API (src/fetch/prices.py). Results are cached per (token, day) pair using functools.cache to avoid redundant API calls within a single pipeline run.

Supported tokens

```python
class TokenId(Enum):
    """Coin Ids for coin paprika"""

    ETH = "eth-ethereum"
    XDAI = "dai-dai"
    COW = "cow-cow-protocol-token"
    USDC = "usdc-usd-coin"
    AVAX = "avax-avalanche"
    POL = "pol-polygon-ecosystem-token"
    GHO = "gho-gho"
    BNB = "bnb-bnb"
    XPL = "xpl-plasma"
```
Exchange rate calculation
exchange_rate_atoms() computes the exchange rate between any two tokens by fetching their USD prices on a given day and dividing:

```python
def exchange_rate_atoms(
    token_1_address: Address, token_2_address: Address, day: datetime
) -> Fraction:
    token_1 = TOKEN_ADDRESS_TO_ID[token_1_address]
    token_2 = TOKEN_ADDRESS_TO_ID[token_2_address]
    price_1 = Fraction(usd_price(token_1, day)) / Fraction(10 ** token_1.decimals())
    price_2 = Fraction(usd_price(token_2, day)) / Fraction(10 ** token_2.decimals())
    return price_1 / price_2
```

The convention is: x atoms of token_1 have the same value as x * r atoms of token_2, where r is the returned exchange rate.

CoinPaprika's free tier only provides hourly historical data for the last 24 hours, so the pipeline fetches daily prices with interval="1d" and asserts that exactly one row is returned for the requested date.
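A worked example with hypothetical USD prices standing in for the usd_price() lookup; both tokens are assumed to have 18 decimals, so the decimal factors cancel:

```python
from fractions import Fraction

# Stub prices for a single day -- real values come from CoinPaprika.
USD_PRICE = {"eth-ethereum": Fraction(3000), "cow-cow-protocol-token": Fraction(1, 2)}
DECIMALS = {"eth-ethereum": 18, "cow-cow-protocol-token": 18}


def exchange_rate_atoms_stub(token_1: str, token_2: str) -> Fraction:
    # Per-atom USD price of each token, then their ratio, exactly as above.
    price_1 = USD_PRICE[token_1] / Fraction(10 ** DECIMALS[token_1])
    price_2 = USD_PRICE[token_2] / Fraction(10 ** DECIMALS[token_2])
    return price_1 / price_2


rate = exchange_rate_atoms_stub("eth-ethereum", "cow-cow-protocol-token")
# With these stub prices, 1 atom of ETH is worth 3000 / 0.5 = 6000 atoms of COW.
```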
Data flow
The following describes how data flows through the pipeline from raw sources to payout transfers:
- Block resolution — DuneFetcher runs the PERIOD_BLOCK_INTERVAL query on Dune to map accounting period dates to block numbers for the target network.
- Solver metrics — MultiInstanceDBFetcher queries fct_data_per_solver_and_accounting_period across prod and staging databases, returning per-solver reward data pre-filtered to the accounting period.
- Fee data — MultiInstanceDBFetcher queries fct_partner_and_protocol_fees for partner and protocol fee breakdowns.
- Price conversion — exchange_rate_atoms() fetches COW and native token USD prices from CoinPaprika to apply quote reward caps.
- Payout computation — construct_payouts() combines the above inputs to produce Transfer objects split into COW and native token buckets.
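The last two steps can be sketched end to end with stubbed inputs. Every name and value below (compute_payouts, the cap, the rate) is an illustrative stand-in, not the real construct_payouts() API:

```python
from fractions import Fraction


def compute_payouts(solver_rows, cap_native, native_to_cow):
    """Cap each solver's native-denominated reward, then convert to COW atoms."""
    transfers = []
    for row in solver_rows:
        capped = min(row["reward_native"], cap_native)  # apply the native-token cap
        transfers.append(
            {
                "token": "COW",
                "receiver": row["solver"],
                "amount": capped * native_to_cow,  # convert via the exchange rate
            }
        )
    return transfers


rows = [{"solver": "0xsolver1", "reward_native": Fraction(2, 10)}]
payouts = compute_payouts(rows, cap_native=Fraction(1, 10), native_to_cow=Fraction(6000))
# The 0.2-native reward is capped at 0.1, then converted at a rate of 6000.
```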