暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

数据中心建设-X01数据存储与同步

牛仔纹理 2021-04-28
464

处于互联网大潮中,企业在技术的演练和业务的变幻莫测中,对于技术架构来说是一个比较大的挑战,深入后端,我们会发现各家都会自有一些比较适合自己的服务体系,但基本大同小异,比如在针对高并发、低延迟、复杂的业务层、海量数据,采用的优化架构方案包含应用服务与数据服务的分离,内存缓存,服务器集群,数据库读写分离,反向代理加CDN加速,分布式(文件、数据库)系统,使用NoSQL技术和搜索技术,拆分产品线,业务端分布式服务(订单、会员、搜索、管理等)。

简单介绍下数据存储产品

企业级搜索引擎

Elasticsearch(https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html) 基于Lucene的搜索引擎,支持多种语言开发使用,稳定可靠快速

Slor(http://lucene.apache.org/solr/)基于Lucene的搜索引擎,支持全文检索、命中标示、动态聚类、数据库集成、灵活缓存和富文本处理

中小型网站系统,包括日志系统,数据仓库和嵌入式系统

Oracle(https://www.oracle.com/database/)开放性、可伸缩性、安全性高、性能高等,另外Oracle新推出的 Autonomous Data Warehouse 新一代“自动驾驶”的自治型数据库技术打造,将机器学习与数据仓库/数据集市结合在一起,无需人工干预和停机就能自动调优、修补、升级、监视和保护数据

MySQL(https://www.mysql.com/)性能高、低成本、可靠性好、多种API支持、支持多线程、优化的SQL查询算法、可单独(客户端、服务器)使用和嵌入系统多用途等

PostgreSQL(https://www.postgresql.org/)可编程性、可自定义对象、可继承性、丰富的索引、丰富的数据类型等

Greenplum(https://greenplum.org/)基于PostgreSQL、MPP等

SqlServer(https://docs.microsoft.com/en-us/sql/?view=sql-server-ver15)图形化管理工具、内建式分析处理、集中式管理等

DB2(https://www.ibm.com/analytics/db2)适用于海量数据、数据分级技术等

其他数据库  informix sybase Teradata等

文件系统

MongoDB(https://www.mongodb.com/

Hadoop生态圈产品

Hive、HBase等

内存数据库

redis 等

异构同步小案例

在建设数据仓库的过程中,解决数据孤岛,需要将数据同步至同一个数据存储系统中,最近一个项目正好写了一个异构数据库同步脚本,简单介绍下

多线程、可配置多种数据库进行同步、通过配置表的方法来同步表,独立进行,一个作业失败,其他不受牵连、可增量、可全量、可进行procedure等。

脚本如下:

import queue
import threading
import time
import contextlib
import pymysql as mdb
from datetime import date, datetime
import datetime as dt
import os


StopEvent = object()


class ThreadPool(object):

   def __init__(self, max_num):
       self.q = queue.Queue()  
       self.max_num = max_num  

       self.terminal = False  
       self.generate_list = []  
       self.free_list = []

   def run(self, func, args, callback=None):
       

       if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
           self.generate_thread()  #创建线程
       w = (func, args, callback,)  #把参数封装成元祖
       self.q.put(w)  #添加到任务队列

   def generate_thread(self):
       
       t = threading.Thread(target=self.call)
       t.start()

   def call(self):
       
       current_thread = threading.currentThread  
       self.generate_list.append(current_thread)  
       event = self.q.get()  
       while event != StopEvent:  
           if  self.q.qsize()==0 :
               self.q.put(StopEvent)
           else:
               func, arguments, callback = event  
               try:
                   result = func(*arguments)  
                   status = True  
               except Exception as e:
                   status = False
                   result = e  #结果为错误信息
   
               if callback is not None:  #是否存在回调函数
                   try:
                       callback(status, result)  #执行回调函数
                   except Exception as e:
                       pass
   
               if self.terminal:
                   event = StopEvent  #等于全局变量,表示停止信号       
               else:
                    self.free_list.append(current_thread)  #执行完毕任务,添加到闲置列表
                    if self.q.qsize()>0 :
                        event = self.q.get()  #获取任务
                        self.free_list.remove(current_thread)  
                    else:
                        try:
                            self.generate_list.remove(current_thread)
                        except Exception as e:
                            pass
           '''with self.worker_state(self.free_list, current_thread):
                  print(self.q.qsize())
                  if not self.q.empty():
                      event = self.q.get()
                  else:
                        self.q.empty
          '''      

       else:
           try:
               self.generate_list.remove(current_thread)  
           except Exception as e:
               pass

   def close(self):  
       num = len(self.generate_list)  
       while num:
           self.q.put(StopEvent)  
           num -= 1


   def terminate(self):   #终止线程(清空队列)

       self.terminal = True  

       while self.generate_list:  #如果有已经创建线程存活
           self.q.put(StopEvent)  #有几个线程就发几个终止信号
       self.q.empty()  
                     
           
   @contextlib.contextmanager
   def worker_state(self, state_list, worker_thread):
       state_list.append(worker_thread)
       try:
           yield
       finally:
           state_list.remove(worker_thread)


def getTimes():
   today=datetime.now()
   hours=dt.timedelta(hours=6)
   gettimes=today-hours  
   return gettimes.strftime("%Y-%m-%d %H:%M:%S")  

def changedatetime(tuples):
   res=''
   for t1  in tuples:
       datas=''
       for  t2  in t1:
           if type(t2)==datetime or type(t2)==date:
               datas=datas+" '%s'," %(t2.strftime("%Y-%m-%d %H:%M:%S"))
           elif t2==None:
                   datas=datas+' %s,' %('Null')
           elif type(t2)==str:
               if '\\' in t2:
                   t2=t2.replace('\\','\\\\')                  
               if "'" in t2:
                   t2=t2.replace("'","\\'")
               if '"' in t2:
                   t2=t2.replace('"','\\"')
                 
               datas=datas+'"%s",' %(t2)                
           elif type(t2)==int:
                   datas=datas+' %d,' %(t2)
           else :
               datas=datas+' %s,' %( t2 )
       res=res+"(%s)," %(datas[0:-1])
   return res[0:-1]  



def work(item):
       datetime_column_name= item[2].split(',')[-1]
       config=dict()
       for  it in item[5].split(',') :
           keys,values= it.split('=')
           if values.isdigit():
               config[keys]=eval(values)
           else:
               config[keys]=values
       source_db = mdb.connect(**config)
       source_cur = mdb.cursors.SSCursor(source_db)
       target_db = mdb.connect(host='', port=3306, user='', passwd='', charset='')
       target_cur = target_db.cursor()
       try:
           # 定义查询语句
           print('所运行的代码:', 'select %s from %s where %s >="%s" ; ' %(item[2],item[1],datetime_column_name,item[3].strftime("%Y-%m-%d %H:%M:%S") ))
           source_cur.execute('select %s from %s where %s >="%s" ' %(item[2],item[1],datetime_column_name, item[3].strftime("%Y-%m-%d %H:%M:%S")   ))
           i=2000
           while True:
               getstgdata=source_cur.fetchmany(i)
               if not getstgdata:break
               else:
                   sql = 'REPLACE into %s ( %s) values %s' %(item[6],item[2],changedatetime(getstgdata))
                   target_cur.execute(sql)
                   target_db.commit()
           update_sql="update dataware.synchtodate set start_date =(select ifnull((select DATE_ADD(%s,INTERVAL -3 HOUR) from %s where id=(select max(id) from %s)),CURRENT_DATE))   where type='table' and target_table='%s'" %(datetime_column_name,item[6],item[6],item[6])
           target_cur.execute(update_sql)
           target_db.commit()
           target_db.close()
           source_db.close()
       except BaseException as e:
           print('错误信息:',item[6],e)
           # 提交到数据库
           target_db.commit()
           # 关闭数据库连接
           target_db.close()
           source_db.close()

def procedure(item,cur):
       try:
           p_sql='call %s;' %(item)
           print('运行存储过程',p_sql)
           cur.execute(p_sql)
           
       except BaseException as e:
           print('错误信息:',e)      
       
if __name__=='__main__':    
   # 目标地址的数据连接
   config_db = mdb.connect(host='', port=3306, user='', passwd='', db='dataware', charset='utf8')
   config_cur = config_db.cursor()
 
   config_cur.execute("select * from dataware.synchtodate where type='table' and status=1 ")  
   try:
       
       pool = ThreadPool(10)
       for item in config_cur.fetchall():
           pool.run(func=work, args=(item,))
   
       while  True:
           if  pool.q.qsize()==0 and  len(pool.generate_list)==0 :  
               pool.terminate()
               break
           time.sleep(5)
               
       pool = ThreadPool(10)            
       #运行存储过程
       config_cur.execute("select source_table,status from dataware.synchtodate where type='procedure' and status>=1 order by status ")  
       cunt=1
       for item in config_cur.fetchall():
           if item[1]==cunt:
               cunt=item[1]
               pool.run(func=procedure, args=(item[0],config_cur,))
           else:
               while  True:
                   if  pool.q.qsize()==0 and  len(pool.generate_list)==0 :
                       cunt=item[1]
                       pool.run(func=procedure, args=(item[0],config_cur,))
                       break
                   time.sleep(5)
       while  True:
           if  pool.q.qsize()==0 and  len(pool.generate_list)==0 :  
               pool.terminate()
               break
           time.sleep(5)
       config_db.commit()
       config_db.close()
       os._exit(0)      
   except BaseException as e:
       pass

值得注意和探索的是,时间戳的位置、增量和全量同步用到的SQL,或者其他方式实现。


文章转载自牛仔纹理,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论