Python工具箱系列(三十三)
Timescaledb
在物联网时代,出现了大量以时间为中心海量产生的传感器数据,称为时序数据。这类数据的特点是:
-
数据记录总有一个时间戳。
-
数据几乎总是追加,不更新也不删除。
-
大量使用近期的数据。很少更新或者回填时间间隔的缺失数据。
-
与时间间隔频率关系不大。但累积的数据量大,可能会有峰值。
-
对这类数据有多种聚合查询的需求,并且越快越好。例如,截止到目前为止,最大值/最小值/平均值是多少,数据流速是多少等。
为此,IT界兴起了时序数据库。TimeScaleDB是其中的佼佼者,截止到2022年7月,它的排名在第5名,值得使用。由于TimeScaleDB是postgresql的一个插件,因此非常便于安装与使用。同时,它也是一个开源的时间序列数据库,为快速获取和复杂查询进行了优化。此外,它也是多模型设计,在体现与时序数据相关的特性外,它执行的是“完整的SQL”,程序员很容易使用与管理它。
它的安装不复杂。使用以下命令在ubuntu bionic下安装单机版本。
apt install -y gnupg postgresql-common apt-transport-https lsb-release wget /usr/share/postgresql-common/pgdg/apt.postgresql.org.sh echo "deb https://packagecloud.io/timescale/timescaledb/ubuntu/ $(lsb_release -c -s) main" > /etc/apt/sources.list.d/timescaledb.list wget --quiet -O - https://packagecloud.io/timescale/timescaledb/gpgkey | apt-key add - apt update apt install -y timescaledb-2-postgresql-14 # 做一个调整 timescaledb-tune --quiet --yes # 重新启动数据库服务 systemctl restart postgresql # 以postgres用户启动命令行 su postgres -c psql # 在psql命令行环境中。输入以下命令,从而关联到timescaledb这个扩展上。 CREATE database sensordb; \c sensordb CREATE EXTENSION IF NOT EXISTS timescaledb; \q # 再次连接 su postgres -c 'psql -d sensordb' # 显示扩展列表(extensions) \dx
一、创建时序相关得表
在sensordb下创建测试用的表,这个创建的过程有些特殊。相关命令如下:
# 传感器表,传统的表 CREATE TABLE sensors( id SERIAL PRIMARY KEY, type VARCHAR(50), location VARCHAR(50) ); # 传感器数据库,这个将转换成为超表 CREATE TABLE sensor_data ( time TIMESTAMP NOT NULL, sensor_id INTEGER, pm25 DOUBLE PRECISION, temperature DOUBLE PRECISION, FOREIGN KEY (sensor_id) REFERENCES sensors (id) ); # CREATE EXTENSION IF NOT EXISTS timescaledb; # 转换为超表 SELECT create_hypertable('sensor_data', 'time'); # 生成4个传感器 INSERT INTO sensors (type, location) VALUES ('a','地板'), ('a', '天花板'), ('b','地板'), ('b', '天花板'); # 测试一下。 select * from sensors;
接下来,使用python连接时序数据库,并且模拟相关的数据插入到表中。
二、使用Python模拟数据
import psycopg2 import random import datetime # 事先创建后数据库demodb demodb = psycopg2.connect(database="sensordb", user="postgres", password="88488848", host="172.17.2.151", port="5432") democur = demodb.cursor() currenttime = datetime.datetime.now() # 插入模拟出来的数据。 for _ in range(100000): currenttime = currenttime+datetime.timedelta(seconds=1) for id in range(1,5,1): pm25 = random.uniform(0, 300) temp = random.uniform(0, 40) insertsql = f'''insert into sensor_data(sensor_id,pm25,temperature,time) values({id},{pm25},{temp},'{currenttime}')''' democur.execute(insertsql) demodb.commit() democur.close() demodb.close()
这里插入10万秒的数据,相当于100000/86400=1.15(天)的数据。在插入数据的同时,就可以同时在数据库中进行按30分钟的分桶查询,这是时序数据库的一个特殊功能。
# su postgres -c 'psql -d sensordb' SELECT time_bucket('30 minutes', time) AS period, AVG(temperature) AS avg_temp, last(temperature, time) AS last_temp, AVG(pm25) AS avg_pm25 FROM sensor_data GROUP BY period; SELECT time_bucket('60 minutes', time) AS period, AVG(temperature) AS avg_temp, last(temperature, time) AS last_temp, AVG(pm25) AS avg_pm25 FROM sensor_data GROUP BY period;
此时,按30分钟时间窗口聚会的数据查询效果如下图所示:
可以看出,TimeScaleDB已经将数据按30分钟来聚合分析。当然,改成任意时间也是可以的,例如,可以改成5秒分析一次也可以,生成结果的时间也非常快。