处于互联网大潮中,企业在技术的演练和业务的变幻莫测中,对于技术架构来说是一个比较大的挑战,深入后端,我们会发现各家都会自有一些比较适合自己的服务体系,但基本大同小异,比如在针对高并发、低延迟、复杂的业务层、海量数据,采用的优化架构方案包含应用服务与数据服务的分离,内存缓存,服务器集群,数据库读写分离,反向代理加CDN加速,分布式(文件、数据库)系统,使用NoSQL技术和搜索技术,拆分产品线,业务端分布式服务(订单、会员、搜索、管理等)。
简单介绍下数据存储产品
企业级搜索引擎
Elasticsearch(https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html) 基于Lucene的搜索引擎,支持多种语言开发使用,稳定可靠快速
Slor(http://lucene.apache.org/solr/)基于Lucene的搜索引擎,支持全文检索、命中标示、动态聚类、数据库集成、灵活缓存和富文本处理
中小型网站系统,包括日志系统,数据仓库和嵌入式系统
Oracle(https://www.oracle.com/database/)开放性、可伸缩性、安全性高、性能高等,另外Oracle新推出的 Autonomous Data Warehouse 新一代“自动驾驶”的自治型数据库技术打造,将机器学习与数据仓库/数据集市结合在一起,无需人工干预和停机就能自动调优、修补、升级、监视和保护数据
MySQL(https://www.mysql.com/)性能高、低成本、可靠性好、多种API支持、支持多线程、优化的SQL查询算法、可单独(客户端、服务器)使用和嵌入系统多用途等
PostgreSQL(https://www.postgresql.org/)可编程性、可自定义对象、可继承性、丰富的索引、丰富的数据类型等
Greenplum(https://greenplum.org/)基于PostgreSQL、MPP等
SqlServer(https://docs.microsoft.com/en-us/sql/?view=sql-server-ver15)图形化管理工具、内建式分析处理、集中式管理等
DB2(https://www.ibm.com/analytics/db2)适用于海量数据、数据分级技术等
其他数据库 informix sybase Teradata等
文件系统
MongoDB(https://www.mongodb.com/)
Hadoop生态圈产品
Hive、HBase等
内存数据库
redis 等
异构同步小案例
在建设数据仓库的过程中,解决数据孤岛,需要将数据同步至同一个数据存储系统中,最近一个项目正好写了一个异构数据库同步脚本,简单介绍下
多线程、可配置多种数据库进行同步、通过配置表的方法来同步表,独立进行,一个作业失败,其他不受牵连、可增量、可全量、可进行procedure等。
脚本如下:
import queue
import threading
import time
import contextlib
import pymysql as mdb
from datetime import date, datetime
import datetime as dt
import os
StopEvent = object()
class ThreadPool(object):
def __init__(self, max_num):
self.q = queue.Queue()
self.max_num = max_num
self.terminal = False
self.generate_list = []
self.free_list = []
def run(self, func, args, callback=None):
if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
self.generate_thread() #创建线程
w = (func, args, callback,) #把参数封装成元祖
self.q.put(w) #添加到任务队列
def generate_thread(self):
t = threading.Thread(target=self.call)
t.start()
def call(self):
current_thread = threading.currentThread
self.generate_list.append(current_thread)
event = self.q.get()
while event != StopEvent:
if self.q.qsize()==0 :
self.q.put(StopEvent)
else:
func, arguments, callback = event
try:
result = func(*arguments)
status = True
except Exception as e:
status = False
result = e #结果为错误信息
if callback is not None: #是否存在回调函数
try:
callback(status, result) #执行回调函数
except Exception as e:
pass
if self.terminal:
event = StopEvent #等于全局变量,表示停止信号
else:
self.free_list.append(current_thread) #执行完毕任务,添加到闲置列表
if self.q.qsize()>0 :
event = self.q.get() #获取任务
self.free_list.remove(current_thread)
else:
try:
self.generate_list.remove(current_thread)
except Exception as e:
pass
'''with self.worker_state(self.free_list, current_thread):
print(self.q.qsize())
if not self.q.empty():
event = self.q.get()
else:
self.q.empty
'''
else:
try:
self.generate_list.remove(current_thread)
except Exception as e:
pass
def close(self):
num = len(self.generate_list)
while num:
self.q.put(StopEvent)
num -= 1
def terminate(self): #终止线程(清空队列)
self.terminal = True
while self.generate_list: #如果有已经创建线程存活
self.q.put(StopEvent) #有几个线程就发几个终止信号
self.q.empty()
@contextlib.contextmanager
def worker_state(self, state_list, worker_thread):
state_list.append(worker_thread)
try:
yield
finally:
state_list.remove(worker_thread)
def getTimes():
today=datetime.now()
hours=dt.timedelta(hours=6)
gettimes=today-hours
return gettimes.strftime("%Y-%m-%d %H:%M:%S")
def changedatetime(tuples):
res=''
for t1 in tuples:
datas=''
for t2 in t1:
if type(t2)==datetime or type(t2)==date:
datas=datas+" '%s'," %(t2.strftime("%Y-%m-%d %H:%M:%S"))
elif t2==None:
datas=datas+' %s,' %('Null')
elif type(t2)==str:
if '\\' in t2:
t2=t2.replace('\\','\\\\')
if "'" in t2:
t2=t2.replace("'","\\'")
if '"' in t2:
t2=t2.replace('"','\\"')
datas=datas+'"%s",' %(t2)
elif type(t2)==int:
datas=datas+' %d,' %(t2)
else :
datas=datas+' %s,' %( t2 )
res=res+"(%s)," %(datas[0:-1])
return res[0:-1]
def work(item):
datetime_column_name= item[2].split(',')[-1]
config=dict()
for it in item[5].split(',') :
keys,values= it.split('=')
if values.isdigit():
config[keys]=eval(values)
else:
config[keys]=values
source_db = mdb.connect(**config)
source_cur = mdb.cursors.SSCursor(source_db)
target_db = mdb.connect(host='', port=3306, user='', passwd='', charset='')
target_cur = target_db.cursor()
try:
# 定义查询语句
print('所运行的代码:', 'select %s from %s where %s >="%s" ; ' %(item[2],item[1],datetime_column_name,item[3].strftime("%Y-%m-%d %H:%M:%S") ))
source_cur.execute('select %s from %s where %s >="%s" ' %(item[2],item[1],datetime_column_name, item[3].strftime("%Y-%m-%d %H:%M:%S") ))
i=2000
while True:
getstgdata=source_cur.fetchmany(i)
if not getstgdata:break
else:
sql = 'REPLACE into %s ( %s) values %s' %(item[6],item[2],changedatetime(getstgdata))
target_cur.execute(sql)
target_db.commit()
update_sql="update dataware.synchtodate set start_date =(select ifnull((select DATE_ADD(%s,INTERVAL -3 HOUR) from %s where id=(select max(id) from %s)),CURRENT_DATE)) where type='table' and target_table='%s'" %(datetime_column_name,item[6],item[6],item[6])
target_cur.execute(update_sql)
target_db.commit()
target_db.close()
source_db.close()
except BaseException as e:
print('错误信息:',item[6],e)
# 提交到数据库
target_db.commit()
# 关闭数据库连接
target_db.close()
source_db.close()
def procedure(item,cur):
try:
p_sql='call %s;' %(item)
print('运行存储过程',p_sql)
cur.execute(p_sql)
except BaseException as e:
print('错误信息:',e)
if __name__=='__main__':
# 目标地址的数据连接
config_db = mdb.connect(host='', port=3306, user='', passwd='', db='dataware', charset='utf8')
config_cur = config_db.cursor()
config_cur.execute("select * from dataware.synchtodate where type='table' and status=1 ")
try:
pool = ThreadPool(10)
for item in config_cur.fetchall():
pool.run(func=work, args=(item,))
while True:
if pool.q.qsize()==0 and len(pool.generate_list)==0 :
pool.terminate()
break
time.sleep(5)
pool = ThreadPool(10)
#运行存储过程
config_cur.execute("select source_table,status from dataware.synchtodate where type='procedure' and status>=1 order by status ")
cunt=1
for item in config_cur.fetchall():
if item[1]==cunt:
cunt=item[1]
pool.run(func=procedure, args=(item[0],config_cur,))
else:
while True:
if pool.q.qsize()==0 and len(pool.generate_list)==0 :
cunt=item[1]
pool.run(func=procedure, args=(item[0],config_cur,))
break
time.sleep(5)
while True:
if pool.q.qsize()==0 and len(pool.generate_list)==0 :
pool.terminate()
break
time.sleep(5)
config_db.commit()
config_db.close()
os._exit(0)
except BaseException as e:
pass
值得注意和探索的是,时间戳的位置、增量和全量同步用到的SQL,或者其他方式实现。




