Building an IoT Platform Using Modern Data Stack — Part 2 (FastAPI + Streamlit)

Niño Francisco Liwa · Datamatiks
6 min read · Oct 30, 2023

Accessing data using FastAPI and Streamlit

After setting up our infrastructure and data pipeline with DuckDB and Prefect in Part 1 of this series, we can now access and consume our sensor data using a modern API framework and visualization tool.

For our API we will use FastAPI, a modern Python framework for building APIs. For visualization, we will learn how to use and run Streamlit, a modern open-source tool for building data apps and dashboards, also in Python.

A. Building the Data API

Like any other Python package, we first need to install FastAPI using pip or any package manager of your choice.

pip install fastapi
pip install "uvicorn[standard]"

A sample FastAPI app looks something like the one below. You can see more in the FastAPI documentation.

from typing import Union

from fastapi import FastAPI

app = FastAPI()


@app.get("/")
def read_root():
    return {"Hello": "World"}


@app.get("/items/{item_id}")
def read_item(item_id: int, q: Union[str, None] = None):
    return {"item_id": item_id, "q": q}

To execute this, save the code above in a file named main.py, then run the command below:

uvicorn main:app --reload

INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO: Started reloader process [28720]
INFO: Started server process [28722]
INFO: Waiting for application startup.
INFO: Application startup complete.
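Once the log shows that startup is complete, we can hit the two sample routes from another terminal. Here is a quick check with curl (the item ID and query string are just example values):

curl http://127.0.0.1:8000/
# {"Hello":"World"}

curl "http://127.0.0.1:8000/items/5?q=somequery"
# {"item_id":5,"q":"somequery"}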

With that out of the way, we can follow the same pattern to create a basic API for our sensor data. Below is the basic API I have created to access the data in our DuckDB database.

#
# main.py
#

from typing import Union
import io

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

from .data import Database

app = FastAPI()
db = Database()


@app.get("/")
def read_root():
    return {"Hello": "World"}


@app.get("/items/{item_id}")
def read_item(item_id: int, q: Union[str, None] = None):
    return {"item_id": item_id, "q": q}


@app.get("/api/v1/daily_avg", response_class=StreamingResponse)
def get_daily_avg():
    avg_df = db.get_daily_avg()
    avg_df = avg_df.reset_index()

    return _to_csv(avg_df, 'daily_avg')


@app.get("/api/v1/daily_max", response_class=StreamingResponse)
def get_daily_max():
    max_df = db.get_daily_max()
    max_df = max_df.reset_index()

    return _to_csv(max_df, 'daily_max')


@app.get("/api/v1/daily_min", response_class=StreamingResponse)
def get_daily_min():
    min_df = db.get_daily_min()
    min_df = min_df.reset_index()

    return _to_csv(min_df, 'daily_min')


@app.get("/api/v1/metadata", response_class=StreamingResponse)
def get_metadata():
    mdf = db.get_gauge_metadata()
    return _to_csv(mdf, 'metadata')


@app.get("/api/v1/sensor_metrics", response_class=StreamingResponse)
def get_sensor_metrics():
    print("api/v1/sensor_metrics")
    gdf = db.get_gauge_metrics()
    gdf = gdf.reset_index()

    return _to_csv(gdf, 'metrics')


def _to_csv(df, fname):
    # Serialize the DataFrame to CSV in memory and stream it back as a download
    stream = io.StringIO()
    df.to_csv(stream, index=False)
    response = StreamingResponse(
        iter([stream.getvalue()]), media_type="text/csv")
    response.headers["Content-Disposition"] = f"attachment; filename={fname}.csv"

    return response
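Since every endpoint streams plain CSV, any HTTP client can consume it. As a quick sketch (assuming the API is running locally on port 8000; adjust the URL to your setup), pandas can read an endpoint straight into a DataFrame:

import pandas as pd

# Read the metadata endpoint directly into a DataFrame
metadata = pd.read_csv("http://127.0.0.1:8000/api/v1/metadata")
print(metadata.head())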

I also created a separate file, data.py, to encapsulate access to our DuckDB database.

#
# data.py
#
import duckdb


def get_conn(db=':memory:', is_shared=False):
    return duckdb.connect(database=db, read_only=is_shared)


class Database():

    def __init__(self):
        self.conn = get_conn('db/datahack.duckdb')
        self._metrics_df = self.__init_metrics_df(self.conn)
        self._attributes = self.__get_attributes()

    def __init_metrics_df(self, conn):
        # Load the raw sensor readings and index them by timestamp
        metrics_df = conn.sql('SELECT * EXCLUDE(measured_day, measured_hour, measured_minute) FROM ingest_gauge_metrics').df()
        metrics_df = metrics_df.set_index("measured_datetime")
        metrics_df = metrics_df.astype('Float64')

        conn.close()
        return metrics_df

    def __get_attributes(self):
        # Sensor metadata (location, type, unit, coordinates)
        conn = get_conn('db/datahack.duckdb')
        attrs_df = conn.sql('SELECT * FROM gauge_sensors_metadata').df()
        conn.close()
        return attrs_df

    @property
    def metrics_df(self):
        return self._metrics_df

    @property
    def attributes(self):
        return self._attributes

    def get_gauge_metrics(self):
        print('Get metrics')
        return self.metrics_df

    def get_gauge_metadata(self):
        print('Get metadata')
        return self.attributes

    def get_daily_avg(self):
        print('Daily Avg')
        daily_mean = self.metrics_df.resample("D").mean()
        print(daily_mean)

        return daily_mean

    def get_daily_max(self):
        print('Daily Max')
        daily_max = self.metrics_df.resample("D").max()
        print(daily_max)

        return daily_max

    def get_daily_min(self):
        print('Daily Min')
        daily_min = self.metrics_df.resample("D").min()
        print(daily_min)

        return daily_min
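One detail worth noting: resample("D") only works because measured_datetime ends up as a DatetimeIndex. If the column ever comes back from the query as plain strings, it needs a pd.to_datetime conversion before set_index. Here is a small sketch with made-up readings, just to illustrate the idea:

import pandas as pd

# Tiny illustration with fake data: resample("D") requires a DatetimeIndex
metrics_df = pd.DataFrame({
    "measured_datetime": ["2023-10-01 00:15:00", "2023-10-01 12:30:00", "2023-10-02 06:00:00"],
    "sensor_a": [1.0, 3.0, 5.0],
})
metrics_df["measured_datetime"] = pd.to_datetime(metrics_df["measured_datetime"])
metrics_df = metrics_df.set_index("measured_datetime")

print(metrics_df.resample("D").mean())  # one row per calendar day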

To test the API, we simply run this command:

uvicorn src.api.main:app --host 0.0.0.0 --proxy-headers --reload

Now, opening the URL http://0.0.0.0:8000/docs will show us the auto-generated documentation as well as an option to test each endpoint, as shown below:

To test the metadata endpoint, we just click on its name in the list, expand it, and click Execute.
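Outside the browser, the same endpoint can be checked with curl (assuming the server is reachable on the default local port) and the response saved as a CSV file:

curl -o metadata.csv http://127.0.0.1:8000/api/v1/metadata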

B. Displaying the data using Streamlit

Finally, once our API is up, we can invoke it from inside a Streamlit app and build some cool visualizations.

To run a Streamlit app, we first need to install Streamlit like any other Python package:

pip install streamlit

# test the installation from the command line
streamlit hello

# test with a script file
streamlit run hello.py
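The hello.py in the second command can be any Streamlit script. A minimal, hypothetical one just to verify the installation could look like this:

# hello.py
import streamlit as st

st.title("Hello Streamlit")
st.write("If you can see this in the browser, Streamlit is working.")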

To learn more about Streamlit you can check out its documentation for advanced visualization.

Below is a simple dashboard that I forked from the DataProfessor GitHub repo and then modified for my use case.

# weatherboard_app.py

import streamlit as st
import pandas as pd
import plost
import altair as alt
import math
import os

from dotenv import load_dotenv
from urllib.request import urlopen
import json
import folium
from streamlit_folium import st_folium, folium_static
from streamlit_plotly_events import plotly_events
import plotly.figure_factory as ff
import plotly.express as px
import numpy as np
from datetime import datetime

load_dotenv()

#API_URL = 'http://0.0.0.0:8000'
API_URL = 'http://datahack-api-1:8000/api/v1'
st.set_page_config(layout='wide', initial_sidebar_state='expanded')

dt_now = datetime.now()
dt_day = dt_now.strftime('%A')
_, colT2 = st.columns([3, 7])
with colT2:
    st.title('Brisbane Weatherboard')
    st.write(f"### {dt_day}, {dt_now}")

with open('style.css') as f:
    st.markdown(f'<style>{f.read()}</style>', unsafe_allow_html=True)

st.sidebar.header('Weatherboard `v.1.0`')

st.sidebar.subheader('Map parameter')
time_hist_color = st.sidebar.selectbox('Display by sensor', ('Rainfall', 'Stream'))

st.sidebar.subheader('Heat map parameter')
time_hist_color = st.sidebar.selectbox('Color by', ('min_value', 'max_value'))

#st.sidebar.subheader('Donut chart parameter')
#donut_theta = st.sidebar.selectbox('Select data', ('Active', 'Inactive'))

st.sidebar.subheader('Line chart parameters')
plot_data = st.sidebar.multiselect('Select data', ['avg_value', 'max_value','min_value'], ['avg_value', 'min_value','max_value'])
plot_height = st.sidebar.slider('Specify plot height', 0,2,1)

st.sidebar.markdown('''
---
Created with ❤️ by [Data Professor](https://youtube.com/dataprofessor/).\n\n

Modified with passion by Datamatiks.
''')

# Metrics row
st.markdown('### Weather Metrics')
col1, col2, col3 = st.columns(3)

# Retrieve other weather data from a public API
response = urlopen("https://wttr.in/Brisbane?format=j1")
json_data = response.read().decode('utf-8', 'replace')
d = json.loads(json_data)

df_brissy = pd.json_normalize(d['current_condition'])
temp = df_brissy['temp_C'].values[0]
wind = df_brissy['windspeedKmph'].values[0]
humid = df_brissy['humidity'].values[0]
col1.metric("Temperature", f"{temp} °C", "1.2 °C")
col2.metric("Wind", f"{wind} kph", "-8%")
col3.metric("Humidity", f"{humid}%", "4%")

# Retrieve metadata
metadata = pd.read_csv(
    f'{API_URL}/metadata',
    header=0,
    names=['sensor_id',
           'location_id',
           'location_name',
           'sensor_type',
           'unit',
           'latitude',
           'longitude']
)

# Retrieve daily metrics
sensors_df = pd.read_csv(f'{API_URL}/sensor_metrics', header=0)
avg_df = pd.read_csv(f'{API_URL}/daily_avg', header=0)
max_df = pd.read_csv(f'{API_URL}/daily_max', header=0)
min_df = pd.read_csv(f'{API_URL}/daily_min', header=0)

# Reshape from wide (one column per sensor) to long (one row per sensor/day)
new_avg_df = avg_df.melt(id_vars=["measured_datetime"],
                         var_name="sensor_id",
                         value_name="avg_value")

new_max_df = max_df.melt(id_vars=["measured_datetime"],
                         var_name="sensor_id",
                         value_name="max_value")

new_min_df = min_df.melt(id_vars=["measured_datetime"],
                         var_name="sensor_id",
                         value_name="min_value")

# Merge all metrics
merge_keys = ['sensor_id', 'measured_datetime']
daily_metrics_df = new_avg_df.merge(new_max_df, on=merge_keys).merge(new_min_df, on=merge_keys).set_index('sensor_id')

# Enrich metrics with metadata
joined_df = daily_metrics_df.join(metadata.set_index('sensor_id')).reset_index()


date_now_str = str(dt_now.date())
df_filtered = joined_df.loc[(joined_df['measured_datetime'] == date_now_str)]
df_active = df_filtered[df_filtered['avg_value'] > 0].groupby('sensor_id').apply(len)
df_inactive = df_filtered[df_filtered['avg_value'] == 0].groupby('sensor_id').apply(len)
df_unknown = df_filtered[df_filtered['avg_value'].isnull()].groupby('sensor_id').apply(len)

gauge_status_df = pd.DataFrame(
    {'status': ['Active', 'Inactive', 'Unknown'],
     'count': [len(df_active.index), len(df_inactive.index), len(df_unknown.index)]
     }
)

# Map (Plotly version)
st.markdown('### Rainfall and Stream AHD Height Sensors Map')
px.set_mapbox_access_token(os.environ['MAPBOX_TOKEN'])

df = joined_df.fillna(0)
fig = px.scatter_mapbox(df, lat="latitude", lon="longitude", hover_name="location_name",
                        hover_data=["sensor_type", "min_value"], color="avg_value", size="max_value",
                        color_continuous_scale=px.colors.cyclical.IceFire, size_max=50, zoom=10,
                        width=1500, height=800)
st.plotly_chart(fig)

# Heatmap
c1, c2 = st.columns((7, 3))
with c1:
    st.markdown('### Heatmap')
    plost.time_hist(
        data=joined_df,
        date='measured_datetime',
        x_unit='hours',
        y_unit='day',
        color=time_hist_color,
        aggregate='median',
        legend=None,
        height=345,
        use_container_width=True)
with c2:
    st.markdown('### Sensors status')
    plost.donut_chart(
        data=gauge_status_df,
        theta='count',
        color='status',
        legend='bottom',
        use_container_width=True)

# Row C
st.markdown('### Line chart')
grouped = joined_df.groupby('sensor_type')
for key, group in grouped:
    st.markdown(f"*** {key}")
    #st.write(group)
    df = group.astype({'avg_value': 'float', 'max_value': 'float', 'min_value': 'float'})
    plost.line_chart(df,
                     x='measured_datetime',
                     y=('min_value', 'max_value', 'avg_value'),
                     color='sensor_id',
                     height=plot_height,
                     #pan_zoom='minimap'
                     )

st.write(joined_df.reset_index().sort_values(by='measured_datetime', ascending=False))

We then run the app like this:

streamlit run src/web/weatherboard_app.py

It will then launch the dashboard shown below.

There we have it: we can now access our DuckDB sensor data through FastAPI and display it in a Streamlit dashboard. I also added some current weather information from a public weather API, https://wttr.in/.

Finally, in the last part of this series, we will discuss how to package all of these components together using Docker and docker-compose and then deploy them to the Azure cloud. So, watch this space.

Happy reading!

UPDATE: GitHub repo: https://github.com/datamatiks/iot-platform-modern-data-stack-poc


Niño Francisco Liwa
Datamatiks

I am a Data Engineer with an interest in building software products, IoT, startups, crypto and entrepreneurship.