I was attending a DuckDB meetup in Leuven, where a presenter was detailing how they replaced Spark with DuckDB to cut costs, when, rather abruptly, someone in the audience asked, “How do I know if I should really use DuckDB now, me not being a data engineer? What if my query is heavier than expected? What if my data fluctuates and sometimes just goes beyond a single machine?” The presenter admitted that you set up the architecture once, do your best benchmarking, and hope the data dynamics stay the same.

I was intrigued, but not really convinced, yet.

Catalog is the bridge

The day after the event, I dove into the idea that the query engine should be selected automatically based on the query itself. Having some scars from hefty BigQuery bills, I built a SQL executor that switched between DuckDB and BigQuery. The challenge, though, was that I couldn’t use the same query with both engines due to the lack of shared catalog support.

Fast forward a year, and we now have Unity Catalog support for DuckDB and Polars — yes, the same catalog well supported by distributed engines like Spark and platforms like Databricks. Since we only care about read-only access, I think we can go pretty far with this.

I quickly threw together a prototype — let’s see it in action:

1) Launch the Unity catalog:

$ git clone https://github.com/unitycatalog/unitycatalog.git unity-catalog 
$ cd unity-catalog
$ bin/start-uc-server

This should start the server at localhost:8080.
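For a quick sanity check that the server is actually up, you can hit its REST API and list the catalogs; the OSS build ships a pre-seeded unity catalog. The endpoint path below is my assumption based on the Unity Catalog REST spec:

import json
import urllib.request

# List catalogs on the freshly started local UC server
with urllib.request.urlopen("http://localhost:8080/api/2.1/unity-catalog/catalogs") as resp:
    print(json.loads(resp.read()))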

2) Load a decent chunk of the ClickBench data into UC:

spark.read.parquet("data/hits.parquet") \
    .limit(5_000_000) \
    .write \
    .format("delta") \
    .option("overwriteSchema", "true") \
    .mode("overwrite") \
    .option("path", "<abs_path>/hits_delta") \
    .saveAsTable("default.hits")

That’s all it takes to expose a Unity Catalog table, backed by Delta files, to other query engines!
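For context, the spark session above needs to be pointed at the local UC server. Here is a minimal sketch of how that might look; the package coordinates, versions, and catalog class are assumptions based on the OSS Unity Catalog Spark integration docs, so adjust them to your installation:

from pyspark.sql import SparkSession

# Sketch of a local Spark session wired to the OSS Unity Catalog server.
# Package versions and the catalog implementation class are assumptions -
# check the UC Spark integration docs for your setup.
spark = (
    SparkSession.builder.appName("uc-loader")
    .master("local[*]")
    .config("spark.jars.packages",
            "io.delta:delta-spark_2.12:3.2.1,io.unitycatalog:unitycatalog-spark_2.12:0.2.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.unity", "io.unitycatalog.spark.UCSingleCatalog")
    .config("spark.sql.catalog.unity.uri", "http://localhost:8080")
    .config("spark.sql.catalog.unity.token", "")
    .config("spark.sql.defaultCatalog", "unity")  # so default.hits lands in unity.default.hits
    .getOrCreate()
)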

3) Pull in frugal-engine:

$ pip install git+https://github.com/astronautas/frugal-engine

4) Set up the executor:

import logging
from frugal_engine.main import setup

logging.basicConfig(level=logging.INFO)
setup()

This sets up the hybrid engine with a single-node executor and a cluster executor as a fallback.

Now, let’s run a few experiments 🧪. We’ll try a simple select first:

# named %%sequel deliberately, so it doesn't interfere with the built-in %%sql
%%sequel 
SELECT * 
FROM unity.default.hits LIMIT 10

You should see in the logs that it picked the single-node executor, since the data size is relatively small:

INFO:frugal_engine.main:single_node_executor has been chosen, because the top source count is less than or equal to the threshold: 6000000
INFO:frugal_engine.main:Executing query in single_node_executor: SELECT * 
FROM unity.default.hits LIMIT 10

Let’s assume our machine is tiny and lower the threshold for single-node workflows:

from frugal_engine.main import setup
setup(threshold=100_000)

%%sequel 
SELECT * 
FROM unity.default.hits LIMIT 10

INFO:frugal_engine.main:Executing query in cluster_executor: SELECT * 
FROM unity.default.hits LIMIT 10

From the logs, we see it falls back to a cluster engine since it “thinks” a single machine won’t process that many rows in a reasonable time. It should also fall back if the single-node executor reaches its memory limits—or even proactively, using ML to forecast that it won’t finish in a reasonable time.
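The proactive check isn't in the prototype yet, but it could be as simple as this sketch (purely illustrative, not part of frugal-engine; psutil, the function name, and the safety factor are my choices):

import psutil

def fits_in_memory(estimated_bytes: int, safety_factor: float = 0.5) -> bool:
    # Only pick the single-node engine if the estimated working set
    # fits comfortably within the RAM currently available on this machine.
    available_bytes = psutil.virtual_memory().available
    return estimated_bytes <= available_bytes * safety_factor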

Identical SQL, totally different engines! Convenient-ish to use - check. What about the promise of improved performance and reduced costs?

Quantifying the cost savings

We can’t draw firm conclusions on costs yet - proper testing means a real Spark cluster, Delta access from S3, and the full ClickBench dataset loaded, which is a complex evaluation in its own right. I’ll tackle this in the next chapter.

With usability and (some) cost aspects explored, let’s examine what makes the package tick.

DuckDB UC - not quite there yet

I initially tried to use DuckDB for the single-node executor, but… it didn’t go well.

I'd been hyped by plenty of talks claiming that DuckDB at least has read-only support for Unity Catalog. And it does work fine for the pre-loaded OSS UC tables:

import duckdb
import logging

# Install and load the (nightly, experimental) UC catalog extension plus Delta support
duckdb.sql("""
    install uc_catalog from core_nightly;
    load uc_catalog;
    install delta;
    load delta;
""")

# Point DuckDB at the local Unity Catalog server
duckdb.sql("""
CREATE SECRET (
    TYPE UC,
    TOKEN 'not-used',
    ENDPOINT 'http://127.0.0.1:8080',
    AWS_REGION 'us-east-2'
);
""")

duckdb.sql("ATTACH 'unity' AS unity (TYPE UC_CATALOG);")

logging.basicConfig(level=logging.DEBUG)
duckdb.sql("select * from unity.default.marksheet limit 10;")

But as soon as you commit something to UC via another engine, like Spark, DuckDB halts:

IOException                               Traceback (most recent call last)
Cell In[4], line 4
      1 import logging
      3 logging.basicConfig(level=logging.DEBUG)
----> 4 duckdb.sql(f"select * from unity.default.hits_2 limit 10;")

IOException: IO Error: Invalid field found while parsing field: type_precision

CatalogException                          Traceback (most recent call last)
Cell In[5], line 4
      1 import logging
      3 logging.basicConfig(level=logging.DEBUG)
----> 4 duckdb.sql(f"select * from unity.default.hits_2 limit 10;")

CatalogException: Catalog Error: Table with name hits_2 does not exist! Did you mean "information_schema.views"?

I spent hours trying to figure it out. Fair enough, the extension is tagged as experimental, and there’s a related GitHub issue, but I expected clearer error messages. 👎

Polars to the rescue

Polars, on the other hand, works flawlessly! Unlike DuckDB, though, you can’t just attach a catalog and query it by name - Polars has no catalog-aware SQL entry point yet. To bridge the gap, we’ll build a thin wrapper around the Polars Unity Catalog client:

from typing import Optional

import polars as pl
import sqlglot


class PolarsClient:
    """
    An interface similar to Spark's and DuckDB's, i.e. exposing sql(), but resolving tables through the UC catalog.
    """
    def __init__(self, unity_catalog_uri: str, memory_limit: Optional[int] = None):
        self.unity_catalog_uri = unity_catalog_uri
        self.catalog = pl.Catalog(unity_catalog_uri, require_https=False)  # last flag - for local testing
        self.memory_limit = memory_limit

    def sql(self, query: str) -> pl.DataFrame:
        # Extract the source tables referenced in the query
        tables = list(sqlglot.parse_one(query).find_all(sqlglot.exp.Table))

        assert len(tables) == 1, "Only one table is supported as a source for now"

        # Lazily scan the UC table, then rewrite the query to reference the local LazyFrame
        df_scanned = [self.catalog.scan_table(str(table.catalog), str(table.db), str(table.this)) for table in tables]
        df_scanned = df_scanned[0]

        var_name = next(name for name, val in locals().items() if val is df_scanned)

        query_for_pl = sqlglot.parse_one(query).from_(var_name).sql()
        return pl.sql(query_for_pl).collect()

See what we did here? We used sqlglot to retrieve the source tables, pre-load them as LazyFrames, and rewrite the table references in the original SQL query. Nasty, but it gets the job done!
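For reference, using the wrapper directly looks something like this (assuming the local UC server from step 1 and the default.hits table we registered in step 2):

client = PolarsClient("http://localhost:8080")

# Runs entirely in-process via Polars, resolving the table through Unity Catalog
df = client.sql("SELECT COUNT(*) AS n FROM unity.default.hits")
print(df)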

from IPython import get_ipython


def setup(unity_catalog_uri: str = "http://localhost:8080", threshold: int = 6_000_000):
    spark = _build_local_spark_client(unity_catalog_uri)
    polars_client = PolarsClient(unity_catalog_uri)
    client = HybridClient(spark_client=spark, polars_client=polars_client, threshold=threshold)

    # non-invasive way to add a magic function
    get_ipython().register_magic_function(
        lambda line, cell=None: client.sql(line or cell),
        magic_kind="line_cell",
        magic_name="sequel",
    )

setup isn’t that interesting - it just registers a new magic function, sequel, that delegates queries to HybridClient. The really interesting part is the engine-selection function.

Using query information to select the best engine

    def _count_heuristic(self, query: str) -> Choice:
        # The decision is driven by the row count of the largest source table
        tables = list(parse_one(query).find_all(exp.Table))
        counts = [
            self.polars_client.sql(f"select count(*) from {table.catalog}.{table.db}.{table.name}").to_series()[0]
            for table in tables
        ]
        top_count = max(counts)

        logger.debug(f"Top source count: {top_count}")

        if top_count <= self.threshold:
            logger.info(f"{Choice.POLARS.value} has been chosen, because the top source count is less than or equal to the threshold: {self.threshold}")
            return Choice.POLARS
        else:
            logger.info(f"{Choice.SPARK.value} has been chosen, because the top source count is greater than the threshold: {self.threshold}")
            return Choice.SPARK

Essentially, we select the most appropriate engine for the query based on a source-table statistic - the row count - and then delegate to it. It’s a simple heuristic, but it illustrates the point.

    choice = self._count_heuristic(query)

    if choice == Choice.POLARS:
        try:
            logger.info(f"Executing query in {Choice.POLARS.value}: {query}")
            # polars - unsupported dialect, duckdb might be closest?
            query = sqlglot.transpile(sql=query, write="duckdb")[0]
            return self.polars_client.sql(query).to_pandas()
        # fixme - too generic exception
        except Exception as e:
            logger.error(f"Error: {e}, falling back to {Choice.SPARK.value}")
            query = sqlglot.transpile(sql=query, write="spark")[0]

        return self.spark_client.sql(query).toPandas()
    else:
        logger.info(f"Executing query in {Choice.SPARK.value}: {query}")
        return self.spark_client.sql(query).toPandas()

We use sqlglot to transpile between dialects, but unfortunately Polars SQL is not a supported dialect yet, so we target DuckDB as the closest match. Finally, if you look closely, we fall back to the default distributed Spark executor whenever we encounter an unforeseen issue - e.g. an unsupported SQL operation, memory problems, etc.
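To make the transpilation step concrete, here is a standalone snippet (the query is just an example) showing the same statement rendered for the two dialects the package targets:

import sqlglot

# Polars SQL has no sqlglot dialect, so duckdb is used as the closest stand-in
query = "SELECT COUNT(*) AS n FROM unity.default.hits"
print(sqlglot.transpile(query, write="duckdb")[0])
print(sqlglot.transpile(query, write="spark")[0])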

Next Step – Databricks

Hope you at least got intrigued by the idea that it’s possible to balance the benefits of single-node and cluster query engines without changing analyst workflows. Next, when I have time, I’ll extend this to Databricks and test cost savings in a more realistic production setting - think a Spark cluster, Delta tables in S3, annoying authentication issues, etc.

Stay tuned!