原文地址:Using SingleStore as a Time Series Database
原文作者:Akmal Chaudhri
通过使用Kaggle的S&P 500stock data(释义见下面划线部分),来探索SingleStore对时间序列数据库的支持。我们将构建一个快速仪表板来使candlestick charts(释义见下面划线部分)可视化。
S&P 500stock data:S&P是标准普尔公司的英文简写,标准普尔为投资者提供信用评级、独立分析研究、投资咨询等服务。这个数据集就是来自标准普尔500指数的500家样本股从2013年2月到2018年2月五年的部分股票信息,分为所有公司的股票数据和分开的个体股票数据,都包含7个字段,分别为data(日期),open(开盘价),high(最高),low(最低),close(收盘价),volume(成交量),name(股票名)。详细概念可参考此链接:S&P 500 stock data
candlestick charts:蜡烛图,通常叫K线图,股市及期货市场中的一种图表。
摘要
SingleStore是一款多功能的数据库系统。它基于相关技术,支持多种模式,如键值、JSON、全文搜索、地理空间和时间序列。
本文将通过Kaggle中的S&P500股票数据集来探讨SingleStore对时间序列数据的支持。我们还将构建一个快速仪表板,以使用Streamlit可视化蜡烛图。
本文中使用的SQL脚本、Python代码和笔记本文件可在GitHub上找到。笔记本文件在DBC、HTML和iPython格式中是可用的。
介绍
自从关系型数据库技术出现以来,许多管理数据的新要求不断涌现。Martin Fowler等知名人士提出了Polyglot Persistence(多语言持久化)作为管理各种数据和数据处理要求的一种解决方案,如图1所示。
图1 Polyglot Persistence.
复制
然而,Polyglot Persistence是有成本的,并引起了争议,例如:
在一篇关于经常引用polyglot persistence的文章中,Martin Fowler为一个假设的零售商设计了一个Web应用程序,该应用程序将Riak,Neo4j,MongoDB,Cassandra和RDBMS用于不同的数据集。不难想象,这造成了零售商的DevOps(Development和Operations的组合词,这里根据语境译为软件开发、IT运维技术工程师)会成群结队地辞职。
— — Stephen Pimentel
此外:
根据历史经验来看,如果你试图采用其中六种[技术],你至少需要18名员工来操作存储方面 - 比如六种存储技术。这是不可估量的,而且太昂贵了。
— — — Dave McCrory
近年来,也有一些关于使用微服务来实现Polyglot Persistence(多语言持久化)架构的建议。但是,SingleStore可以通过在单个多模型数据库系统中支持不同的数据类型和处理要求来提供更简单的解决方案。这提供了许多很多便利之处,例如,更低的TCO(Total Cost of Ownership,总拥有成本),可以减少开发人员学习多种产品的负担,没有集成难题等等。在一系列文章中,我们将更详细地讨论SingleStore的多模型功能。让我们从时序数据开始吧。
首先,我们需要在SingleStore网站上创建一个免费的托管服务帐户,并在Databricks网站上创建一个免费的CE社区版 帐户。在撰写本文时,SingleStore的托管服务帐户附带$500的信用值。这对于本文中描述的案例研究来说绰绰有余。对于Databricks社区版,我们需要注册免费帐户而不是试用版。使用Spark是因为,在之前的一篇文章中,我们注意到Spark非常适合使用SingleStore的ETL。
如果您在Kaggle没有帐户,请创建一个帐户并下载all_stocks_5yr.csv文件。Kaggle网站显示,此文件的大小为29.58MB。数据集由以下字段组成:
- date,日期:从 2013年2月8日至2018年2月7日的五年每日期间。无缺省值。
- open,开盘价:开盘价。11个缺失值。
- high,高:价格高。缺少8个值。
- low,低:价格低。缺少 8 个值。
- close,收盘价:收盘价。无缺省值。
- volume,交易量:交易的股票总数。无缺省值。
- name,名称: 交易品种。505 个唯一值。无缺省值。
为了方便我们的初步探索,我们可以选择日期、收盘价、名称。
配置Databricks社区版
上一篇文章提供了有关如何配置Databricks社区版以用于SingleStore的详细说明。我们可以将这些确切的说明用于此用例。
上传CSV文件
我们需要将CSV文件上传到Databricks社区版环境。上一篇文章对怎样上传CSV文件有详细介绍。我们可以对这个用例进行精确说明。
创建数据库表
在我们的SingleStore托管服务帐户中,使用SQL编辑器创建一个新数据库。命名为timeseries_db,创建语句如下:
CREATE DATABASE IF NOT EXISTS timeseries_db;
复制
在timeseries_db数据库中创建表,语句如下:
USE timeseries_db; CREATE ROWSTORE TABLE IF NOT EXISTS tick ( ts DATETIME SERIES TIMESTAMP, symbol VARCHAR(5), price NUMERIC(18, 4), KEY(ts) );
复制
每行都有一个名为ts的时间值属性。因为在此示例中,我们不使用小数秒,所以用DATETIME而不是DATETIME(6)。SERIES TIMESTAMP指定表中的列作为默认时间戳。我们将在ts上创建一个KEY,以便能够有效地筛选值的范围。
在Databricks上新建Python脚本
现在让我们在Databricks社区版上新建一个Python脚本。我们称之为时序数据加载器。然后将新建脚本附加到Spark集群。
在新的代码块中,让我们添加以下内容:
from pyspark.sql.types import * tick_schema = StructType([ StructField("ts", TimestampType(), True), StructField("open", DoubleType(), True), StructField("high", DoubleType(), True), StructField("low", DoubleType(), True), StructField("price", DoubleType(), True), StructField("volume", IntegerType(), True), StructField("symbol", StringType(), True) ])
复制
此模式可确保我们拥有正确的列类型。
我们将在接下来的代码块中创建一个新的数据框架,如下:
tick_df = spark.read.csv("/FileStore/all_stocks_5yr.csv", header = True, schema = tick_schema)
复制
此段的作用是读取CSV文件并创建一个名为tick_df的数据框架。还告诉Spark有一个标题行,并要求它使用以前定义的模式。
在以下代码中,让我们获取行数:
tick_df.count()
复制
执行完此操作,我们得到的返回值是619040。
基于初始分析,决定删除这些列,如下所示:
tick_df = tick_df.drop("open", "high", "low", "volume")
复制
并对数据进行排序:
tick_df = tick_df.sort("ts", "symbol")
复制
使用如下语句,对Dataframe的结构进行查看:
tick_df.show(10)
复制
输出如下:
+-------------------+-------+------+ | ts| price|symbol| +-------------------+-------+------+ |2013-02-08 00:00:00| 45.08| A| |2013-02-08 00:00:00| 14.75| AAL| |2013-02-08 00:00:00| 78.9| AAP| |2013-02-08 00:00:00|67.8542| AAPL| |2013-02-08 00:00:00| 36.25| ABBV| |2013-02-08 00:00:00| 46.89| ABC| |2013-02-08 00:00:00| 34.41| ABT| |2013-02-08 00:00:00| 73.31| ACN| |2013-02-08 00:00:00| 39.12| ADBE| |2013-02-08 00:00:00| 45.7| ADI| +-------------------+-------+------+ only showing top 10 rows
复制
现在,将Dataframe写入SingleStore。我们可以添加以下内容:
%run ./Setup
复制
在Setup脚本中,我们需要确保已为SingleStore托管服务集群添加了服务器地址和密码。
在如下代码中,我们将为SingleStore Spark Connector设置部分参数,如下所示:
spark.conf.set("spark.datasource.singlestore.ddlEndpoint", cluster) spark.conf.set("spark.datasource.singlestore.user", "admin") spark.conf.set("spark.datasource.singlestore.password", password) spark.conf.set("spark.datasource.singlestore.disablePushdown", "false")
复制
最后, 我们将使用此Spark Connector把Dataframe写入到SingleStore中:
(tick_df.write .format("singlestore") .option("loadDataCompression", "LZ4") .mode("ignore") .save("timeseries_db.tick"))
复制
以上操作是将Dataframe写入timeseries_db数据库中的tick表。我们可以检查此表是否已成功从SingleStore中填充。
示例查询
现在已经构建了我们的系统,可以在上面运行一些查询。SingleStore支持一系列用于处理时序数据的有用函数。让我们看一些例子。
Average Aggregate(平均聚合函数)
下面的查询说明了如何在表中计算简单的平均总体时间序列值。
SELECT symbol, AVG(price) FROM tick GROUP BY symbol ORDER BY symbol;
复制
输出如下:
+--------+---------------+ | symbol | AVG(price) | +--------+---------------+ | A | 49.20202542 | | AAL | 38.39325226 | | AAP | 132.43346307 | | AAPL | 109.06669849 | | ABBV | 60.86444003 | ... ...
复制
Time Bucketing函数
Time Bucketing函数可以按固定的时间间隔聚合和分组不同时间序列的数据。单一存储支持多种函数:
- FIRST:聚合函数,它返回一组输入值的第一个值,该值定义为与最小时间关联的值。。该文档包含其他详细信息和示例。
- LAST:聚合函数,它返回一组输入值的最后一个值,定义为与最长时间关联的值。与最大时间戳关联的值。该文档包含其他详细信息和示例。
- TIME_BUCKET:聚合函数,它将时间规范化为最接近的存储桶开始时间。该文档包含其他详细信息和示例。
例如,我们可以用TIME_BUCKET来查找以5天为一个间隔分组的平均时间序列值,操作如下:
SELECT symbol, TIME_BUCKET("5d", ts), AVG(price) FROM tick WHERE symbol = "AAPL" GROUP BY 1, 2 ORDER BY 1, 2;
复制
输出如下:
+--------+-----------------------+--------------+ | symbol | TIME_BUCKET("5d", ts) | AVG(price) | +--------+-----------------------+--------------+ | AAPL | 2013-02-08 00:00:00.0 | 67.75280000 | | AAPL | 2013-02-13 00:00:00.0 | 66.36943333 | | AAPL | 2013-02-18 00:00:00.0 | 64.48960000 | | AAPL | 2013-02-23 00:00:00.0 | 63.63516667 | | AAPL | 2013-02-28 00:00:00.0 | 61.51996667 | ... ... ...
复制
我们还可以套用这些函数来创建蜡烛图,图表中显示股票随时间推移的高价、最低价、开盘价和收盘价,按五天时段进行存储,如下所示:
SELECT TIME_BUCKET("5d") AS ts, symbol, MIN(price) AS low, MAX(price) AS high, FIRST(price) AS open, LAST(price) AS close FROM tick WHERE symbol = "AAPL" GROUP BY 2, 1 ORDER BY 2, 1;
复制
输出如下:
+------------+--------+----------+----------+----------+----------+ | ts | symbol | low | high | open | close | +------------+--------+----------+----------+----------+----------+ | 2013-02-08 | AAPL | 66.8428 | 68.5614 | 67.8542 | 66.8428 | | 2013-02-13 | AAPL | 65.7371 | 66.7156 | 66.7156 | 65.7371 | | 2013-02-18 | AAPL | 63.7228 | 65.7128 | 65.7128 | 64.4014 | | 2013-02-23 | AAPL | 63.2571 | 64.1385 | 63.2571 | 63.5099 | | 2013-02-28 | AAPL | 60.0071 | 63.0571 | 63.0571 | 60.0071 | ... ... ... ... ... ...
复制
Smoothing(平滑)
我们可以使用AVG函数将时序数据用作窗口聚合。下面是一个示例,我们查看价格和过去三个点变动的价格平均线:
SELECT symbol, ts, price, AVG(price) OVER (ORDER BY ts ROWS BETWEEN 3 PRECEDING AND CURRENT ROW) AS smoothed_price FROM tick WHERE symbol = "AAPL";
复制
输出如下:
+--------+-----------------------+----------+----------------+ | symbol | ts | price | smoothed_price | +--------+-----------------------+----------+----------------+ | AAPL | 2013-02-08 00:00:00.0 | 67.8542 | 67.85420000 | | AAPL | 2013-02-11 00:00:00.0 | 68.5614 | 68.20780000 | | AAPL | 2013-02-12 00:00:00.0 | 66.8428 | 67.75280000 | | AAPL | 2013-02-13 00:00:00.0 | 66.7156 | 67.49350000 | | AAPL | 2013-02-14 00:00:00.0 | 66.6556 | 67.19385000 | ... ... ... ...
复制
AS OF
查找当前AS OF时间点的表中的行也是常见的时序要求。这可以使用ORDER BY和LIMIT轻松实现。示例如下:
SELECT * FROM tick WHERE ts <= "2021-10-11 00:00:00" AND symbol = "AAPL" ORDER BY ts DESC LIMIT 1;
复制
输出如下:
+-----------------------+--------+----------+ | ts | symbol | price | +-----------------------+--------+----------+ | 2018-02-07 00:00:00.0 | AAPL | 159.5400 | +-----------------------+--------+----------+
复制
Interpolation(插值)
时序数据可能存在间隙。我们可以插入缺失点。SingleStore文档提供了一个示例存储过程,在处理价格变动数据时,可以使用该存储过程来实现此目的。
Bonus: Streamlit Visualization(流光可视化)
早些时候,提到了蜡烛图,能以图形而不是表格格式查看这些图表,那就太好了。我们可以通过Streamlit轻松做到这一点。上一篇文章展示了我们可以轻松地将Streamlit连接到SingleStore。
Install the Required Software(安装所需的软件)
streamlit pandas plotly pymysql
复制
这些可以在GitHub上的requirements.txt文件中找到。按如下所示运行该文件:
pip install -r requirements.txt
复制
应用示例
以下是streamlit_app.py的完整代码:
# streamlit_app.py import streamlit as st import pandas as pd import plotly.graph_objects as go import pymysql # Initialize connection. def init_connection(): return pymysql.connect(**st.secrets["singlestore"]) conn = init_connection() symbol = st.sidebar.text_input("Symbol", value = "AAPL", max_chars = None, key = None, type = "default") num_days = st.sidebar.slider("Number of days", 2, 30, 5) # Perform query. data = pd.read_sql(""" SELECT TIME_BUCKET(%s) AS day, symbol, MIN(price) AS low, MAX(price) AS high, FIRST(price) AS open, LAST(price) AS close FROM tick WHERE symbol = %s GROUP BY 2, 1 ORDER BY 2, 1; """, conn, params = (str(num_days) + "d", symbol.upper())) st.subheader(symbol.upper()) fig = go.Figure(data = [go.Candlestick( x = data["day"], open = data["open"], high = data["high"], low = data["low"], close = data["close"], name = symbol, )]) fig.update_xaxes(type = "category") fig.update_layout(height = 700) st.plotly_chart(fig, use_container_width = True) st.write(data)
复制
Create Secrets file(创建机密文件)
我们的本地Streamlit应用程序将从应用程序根目录中的读取此文件:streamlit/secrets.toml。因此需要创建此文件,内容如下:
# .streamlit/secrets.toml [singlestore] host = "<TO DO>" port = 3306 database = "timeseries_db" user = "admin" password = "<TO DO>"
复制
创建集群时,应将TO DO的主机和密码替换为从SingleStore托管服务获取的值。
运行代码
我们可以按如下方式运行Streamlit应用程序:
streamlit run streamlit_app.py
复制
Web浏览器中的输出应如图2所示。
图2. Streamlit.
在网页上,我们可以在文本框中输入新的股票代码,然后使用滚动条更改TIME_BUCKET的天数。可以随意试验代码以满足您的需求。
小结
本文展示了 SingleStore 是处理时序数据的强大解决方案。使用SQL和内置函数,我们可以实现目标。SingleStore通过添加FIRST、LAST和TIME_BUCKET扩展了对时序的支持。
致谢
感谢John Pickford博士对时间序列数据集的建议和指导。
也万分感激Part-Time Larry 在 Streamlit—Building Financial Dashboards with Python和GitHub上的代码及精彩视频,以激发本文中对Streamlit Visualization的灵感。