We've been thinking a lot about latency, streaming and (near) real-time analytics lately. At Definite, we deal with a lot of data pipelines. In most cases (e.g. ingesting Stripe data), our customers are fine with batch processing (e.g. every hour). But as we've grown, we've seen more and more need for near real-time pipelines (e.g. ingesting events or CDC from Postgres).
But before I go any further: while writing this, I came across this video, and you really need to watch it. I'll see you back here in ~8 minutes.
Ok, quack to business.
DuckDB is amazing for "single player" analytics, but it has a couple of serious limitations that keep it out of more traditional data workflows. The big one for our purposes: a DuckDB database file can only be opened for writing by one process at a time, so you can't have one process loading data while another runs queries against it.
This is fine for batch processing, but it's a problem if you want to stream data continuously while running analytics.
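If you want to see the limitation for yourself, try opening the same database file from two separate processes (a minimal sketch; the file name is just an example):

import duckdb

# Terminal 1: grab a read-write connection and hold it open
conn = duckdb.connect("demo.db")

# Terminal 2: a second process trying to open the same file for writing
# typically fails with an IO error about a conflicting lock on the file
conn2 = duckdb.connect("demo.db")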
There are several ways to tackle the problem, but one approach I really like leverages Arrow Flight. Duck Takes Flight is about 200 lines of Python that pairs Flight with DuckDB and neatly sidesteps DuckDB's concurrency limitations.
Here's what the Flight server looks like:
import duckdb
import pyarrow as pa
import pyarrow.flight as flight


class DuckDBFlightServer(flight.FlightServerBase):
    def __init__(self, location="grpc://localhost:8815", db_path="duck_flight.db"):
        super().__init__(location)
        self.db_path = db_path
        self.conn = duckdb.connect(db_path)

    def do_get(self, context, ticket):
        """Run the SQL carried in the ticket and stream the result back."""
        query = ticket.ticket.decode("utf-8")
        result_table = self.conn.execute(query).fetch_arrow_table()
        batches = result_table.to_batches(max_chunksize=1024)
        return flight.RecordBatchStream(pa.Table.from_batches(batches))

    def do_put(self, context, descriptor, reader, writer):
        """Read the uploaded Arrow stream and append it to the target table."""
        table = reader.read_all()
        table_name = descriptor.path[0].decode("utf-8")
        # Re-chunk so DuckDB receives consistently aligned batches
        batches = table.to_batches(max_chunksize=1024)
        aligned_table = pa.Table.from_batches(batches)
        self.conn.register("temp_table", aligned_table)
        self.conn.execute(f"INSERT INTO {table_name} SELECT * FROM temp_table")
That's it. The server just sits there accepting data and queries.
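The class above is the interesting part; to actually run it, you instantiate it and call serve(). The table bootstrap below is my assumption so the loader has something to insert into (the repo may create the table elsewhere):

if __name__ == "__main__":
    server = DuckDBFlightServer()
    # Assumed table name and schema, matching the loader's columns below
    server.conn.execute(
        "CREATE TABLE IF NOT EXISTS flight_data ("
        "batch_id INTEGER, timestamp VARCHAR, value DOUBLE, category VARCHAR)"
    )
    print("DuckDB Flight server listening on grpc://localhost:8815")
    server.serve()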
The load_data.py script generates some random data and sends it to our Flight server:
import random
from datetime import datetime

import pyarrow as pa


def generate_batch(batch_id):
    """Build a small Arrow table of random rows tagged with the batch id."""
    num_rows = 1_000
    data = {
        "batch_id": [batch_id] * num_rows,
        "timestamp": [datetime.now().isoformat()] * num_rows,
        "value": [random.uniform(0, 100) for _ in range(num_rows)],
        "category": [random.choice(["A", "B", "C", "D"]) for _ in range(num_rows)],
    }
    return pa.Table.from_pydict(data)
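The sending half isn't shown above; roughly, the loader opens a Flight client and pushes each table with do_put. The URL and table name here mirror the server sketch and are assumptions, not the exact script:

import pyarrow.flight as flight

client = flight.connect("grpc://localhost:8815")

def upload_batch(table, table_name="flight_data"):
    # The descriptor path carries the target table name to the server's do_put
    descriptor = flight.FlightDescriptor.for_path(table_name)
    writer, _ = client.do_put(descriptor, table.schema)
    writer.write_table(table)
    writer.close()

upload_batch(generate_batch(batch_id=1))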
You can run multiple instances of this loader simultaneously. Try starting a few copies and watch them feed data in parallel. In contrast, vanilla DuckDB will throw an error as soon as you attempt to connect a second writer.
And here's how you query it:
def execute_query(client, query):
    """Send a SQL string to the Flight server and return the result as an Arrow table."""
    try:
        # The query travels as the Flight ticket; the server decodes and runs it
        ticket = flight.Ticket(query.encode("utf-8"))
        reader = client.do_get(ticket)
        result = reader.read_all()
        return result
    except Exception as e:
        print(f"Query error: {str(e)}")
        return None
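Calling it is just a matter of connecting a client and handing it SQL. The table name below matches the loader sketch and is an assumption:

import pyarrow.flight as flight

client = flight.connect("grpc://localhost:8815")
result = execute_query(
    client,
    "SELECT category, COUNT(*) AS n, AVG(value) AS avg_value FROM flight_data GROUP BY category",
)
if result is not None:
    print(result.to_pandas())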
The magic here is that these queries work while data is being loaded. You can have multiple loaders feeding data in and multiple clients running queries, all at the same time. That's not something DuckDB can do on its own.
If you want to try it:
pip install duckdb pyarrow
python duckdb_flight_server.py
python load_data.py
python query_data.py
That's all it takes. No clusters or dependency hell (you only need duckdb and pyarrow). And unlike vanilla DuckDB, you can run as many writers and readers as you want.
If you're building something that needs to move data around quickly and query it on the fly - especially if you need concurrent access that DuckDB doesn't provide - this approach is worth exploring. It might be all you need. The full code is available on GitHub.
And if you're looking for a complete analytics solution that handles everything from ETL to warehousing to BI, check out what we're building at Definite. We're taking this same philosophy - making powerful data tools simple and accessible - and applying it to the entire analytics stack.
Get the new standard in analytics. Sign up below or get in touch and we’ll set you up in under 30 minutes.