暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

向量检索与实时数仓强强融合,StoneData助客户实现高性能、低成本的非结构化数据分析

石原子科技 2023-12-07
68

石原子实时数仓系统(简称 StoneData)是杭州石原子科技有限公司(StoneAtom)自主设计、研发的企业级实时数据仓库产品,支持对海量数据进行批量处理和实时分析,可实现对千亿级数据的秒级到毫秒级多维分析透视和业务探索。


StoneData 具备多模数据处理能力,除了在结构化数据处理场景可提供更优的性能外,还可以处理半结构化数据、非结构化数据、时序数据和地理信息数据。StoneData 支持传统 MPP 运算和存算分离两种模式,能够灵活应对不同类型的海量数据的存储、加工和实时分析场景的需要。StoneData 是高度可扩展的弹性系统,具备云原生特性,能够在公有云环境提供快速交付、随需可用的服务。StoneData 在其底层被设计为云中立产品,可以支持多云部署和多云互通。

随着 AGI 的进一步发展潮流,向量数据库的重要性愈发凸显,其中,向量检索作为向量数据库的底层技术要求之一,也被越来越多的大数据产品接入,StoneData 也实现了向量计算与实时数仓的融合,本文主要分享我们在非结构化分析场景下融合向量检索的思考与实践(文末有相关案例视频演示~)。


实现原理


StoneData 旨在帮助用户更灵活方便的使用向量检索的能力,实现对非结构化数据的近似检索分析。其实现原理是通过AI算法提取非结构化数据的特征,然后利用特征向量唯一标识非结构化数据,向量间的距离用于衡量非结构化数据之间的相似度。通过SIMD指令加速、高效索引算法、混合检索CBO策略以及低成本存储技术,帮助您实现高性能、低成本的非结构化数据近似查询和分析。


典型应用场景


通过StoneData您可以轻松搭建各种AI应用场景,例如:
  • 图像相似性搜索通过图片检索近似图片;
  • 视频相似性搜索:通过输入视频中的截图查找目标视频;
  • 音频相似性搜索:快速检索大量音频数据,如语音、音乐、音效、动物/自然音,快速检索相似声音;
  • 推荐系统:根据用户画像的信息找出与之匹配的目标产品;
  • 问答系统:交互式数字问答聊天机器人,基于知识库自动回答客户的问题;
  • 文本搜索引擎:通过将关键字与文本数据库进行比较,帮助用户找到他们正在寻找的信息。

典型架构

通过 StoneData 产品搭建向量分析的典型架构如下:

为什么选择 StoneData?


  • 架构简单、功能强大,使用 StoneData 即可完成关系型-非关系型数据的融合分析,无需引入额外的技术组件。 例如,可以检索与输入图片中的连衣裙相似度最高、价格在100元到200元之间且上架时间在最近1个月以内的产品。
  • 支持标准SQL,兼容MySQL方言,简化开发流程,用户使用简单的 SQL 指令即可完成向量数据检索;
  • 支持数据实时更新,传统的向量分析系统中数据只能按照T+1更新,不支持数据实时写入。StoneData 支持数据实时更新和查询;
  • 高可用与高扩展性,支持3副本存储、支持高可用集群,即便少数节点出现故障也不会收到影响;
  • 高性能,支持 HNSW 索引算法,实现毫秒级向量信息检索和10k+的QPS。


支持的度量和索引


在 StoneData 中,相似度读量是用来评估两个向量之间的相似性。选择一个好的距离度量方法可以显著提高分类和聚类的性能。根据输入数据的形式,选择特定的相似度度量方法可以获得最优的性能。StoneData 支持如下度量方式:
  • 欧氏距离(L2)该指标通常用于计算机视觉领域(CV)
  • 内积(IP)该指标通常用于自然语言处理领域(NLP)
  • 余弦夹角(COSINE)该指标适用与 CV/NLP/LLM 等
索引是数据的组织单位。在搜索或查询插入的实体之前,必须声明索引类型和相似度度量。如果您未指定索引类型,则 StoneData 将默认使用暴力搜索。
  • HNSWHNSW是一种基于图形的索引,最适合于对搜索效率有很高需求的场景
  • FLATFLAT最适合于在小规模,百万级数据集上寻求完全准确和精确的搜索结果的场景


案例介绍(一)

使用 StoneData 实现图片检索引擎-以图搜图


本章主要介绍如何使用 StoneData 构建图片检索服务。依赖的第三方软件包括:

  • YOLOv3

  • ResNet-50

  • ToWhee

像百度这样的主要搜索引擎已经为用户提供了按图像搜索的选项。此外,电子商务平台已经意识到了这种功能为在线购物者带来的好处,淘宝将图像搜索整合到了其智能手机应用中。




本演示使用图像特征提取模型 ResNet50 和 StoneData 构建一个可以执行反向图像搜索的系统。



系统架构图


数据源及初始化配置


该演示使用PASCAL VOC图像集,其中包含20个类别的17125张图像,包含人像、动物、交通工具、家居等类别。


图片数据集大小: ~ 2 GB.下载地址: 

https://drive.google.com/file/d/1n_370-5Stk4t0uDV1QqvYkcvyV8rbw0O/view?usp=sharing


使用 ResNet-50 进行图像特征提取,然后将向量与图片地址存储在 StoneData 中。



    @app.post('/img/load')
    async def load_images(item: Item):
    # Insert all the image under the file path to stonedata
    try:
    total_num = do_load(item.Table, item.File, MODEL, StoneData_CLI)
    LOGGER.info(f"Successfully loaded data, total count: {total_num}")
    return "Successfully loaded data!"
    except Exception as e:
    LOGGER.error(e)
    return {'status': False, 'msg': e}, 400


    # Import vectors to StoneData
    def do_load( image_dir: str, model: Resnet50, stonedata_client: StoneDataHelper):
    stonedata_client.stoneatom_create_table()
    vectors, names = extract_features(image_dir, model)
    lens = 0
    for vector, name in zip(vectors, names):
    stonedata_client.stoneatom_insert_table( vector,bytes.decode(name))
    lens = lens + 1
    return len(lens)


    # Get the vector of images
    def extract_features(img_dir, model):
    try:
    cache = Cache('./tmp')
    feats = []
    names = []
    img_list = get_imgs(img_dir)
    total = len(img_list)
    cache['total'] = total
    for i, img_path in enumerate(img_list):
    try:
    norm_feat = model.resnet50_extract_feat(img_path)
    feats.append(norm_feat)
    names.append(img_path.encode())
    cache['current'] = i + 1
    LOGGER.info(f"Extracting feature from image No. {i + 1} , {total} images in total")
    except Exception as e:
    LOGGER.error(f"Error with extracting feature from image {e}")
    continue
    return feats, names
    except Exception as e:
    LOGGER.error(f"Error with extracting feature from image {e}")
            sys.exit(1)

    每当您上传新图像到图像搜索服务时,它将被转换为新向量,并与以前存储在 StoneData 中的向量进行比较。 然后, StoneData 返回最相似向量的ID,您可以在文件系统中查询相应的图像。以下是演示代码:


      @app.post('/img/search')
      async def search_images(image: UploadFile = File(...), topk: int = Form(TOP_K)):
      # Search the upload image in StoneData
      try:
      # Save the upload image to server.
      content = await image.read()
      img_path = os.path.join(UPLOAD_PATH, image.filename)
      with open(img_path, "wb+") as f:
      f.write(content)
      items = do_search(img_path, topk, MODEL, StoneData_CLI)
      paths = [x[1] for x in items]
      distances = [x[2] for x in items]
      res = dict(zip(paths, distances))
      res_items = sorted(res.items(), key=lambda item: item[1])
      LOGGER.info("Successfully searched similar images!")
      return res_items
      except Exception as e:
      LOGGER.error(e)
      return {'status': False, 'msg': e}, 400




      def do_search(img_path: str, top_k: int, model: Resnet50, stonedata_cli: StoneDataHelper):
      try:
      # verctor for feat
      feat = model.resnet50_extract_feat(img_path)
      items = stonedata_cli.search_vectors(feat, top_k)
      return items
      except Exception as e:
      LOGGER.error(f"Error with search : {e}")
      sys.exit(1)


      def search_vectors(self, vectors, top_k):
      st = "[" + ','.join(str(x) for x in vectors) + "]"
      try:
      sql = "select id ,image_path, distance from (select id, l2_distance(data, '"+ st +"') as distance, image_path from "+ DEFAULT_TABLE +") order by distance limit "+ str(top_k)
      self.cursor.execute(sql)
      results = self.cursor.fetchall()
      LOGGER.debug(f"Successfully search in StoneData: {results}")
      return results
      except Exception as e:
      LOGGER.error(f"Failed to search StoneData in Milvus: {e}")
              sys.exit(1)

      案例介绍(二)

      使用 StoneData 实现文字检索引擎 - 知识库


      文本搜索是指根据文本的内容(例如关键词、语义等)来搜索的服务,是搜索引擎非常重要的一部分。 本示例使用 Stonedata 和 Bert 构建文本搜索引擎,使用 Bert 将文本转换为向量并存储在 Stonedata 中,然后结合搜索与输入文本相似的文本。


      本演示使用文本特征提取模型 Bert 和 StoneData 构建一个可以执行反向文字搜索的系统。


      系统架构图



      数据源及初始化配置


      该演示数据是一份包含了文字标题和内容的 csv 数据,用来做数据向量化转化。

      下载地址:
      https://doc-test.stoneatom.com/assets/files/example-1342431b2f03861821145707705bb997.csv

      使用 Bert 进行特征提取,然后将向量与元数据地址存储在 StoneData 中。


        # 模型初始化
        def __init__(self):
        if not os.path.exists(MODEL_PATH):
        os.makedirs(MODEL_PATH)
        if not os.path.exists("./paraphrase-mpnet-base-v2.zip"):
        url = 'https://public.ukp.informatik.tu-darmstadt.de/reimers/sentence-transformers/v0.2/paraphrase-mpnet-base-v2.zip'
        gdown.download(url)
        with zipfile.ZipFile('paraphrase-mpnet-base-v2.zip', 'r') as zip_ref:
        zip_ref.extractall(MODEL_PATH)
        self.model = SentenceTransformer(MODEL_PATH)


        def sentence_encode(self, data):
        embedding = self.model.encode(data)
        sentence_embeddings = normalize(embedding)
        return sentence_embeddings.tolist()




        # 上传 example.csv ,预处理 数据
        @app.post('/load')
        async def load_text(file: UploadFile = File(...), table_name: str = None):
        try:
        text = await file.read()
        fname = file.filename
        dirs = "data"
        if not os.path.exists(dirs):
        os.makedirs(dirs)
        fname_path = os.path.join(os.getcwd(), os.path.join(dirs, fname))
        with open(fname_path, 'wb') as f:
        f.write(text)
        except Exception :
        return {'status': False, 'msg': 'Failed to load data.'}
        try:
        total_num = do_load(table_name, fname_path,MODEL ,STONEDATA_CLI)
        LOGGER.info(f"Successfully loaded data, total count: {total_num}")
        return "Successfully loaded data!"
        except Exception as e:
        LOGGER.error(e)
        return {'status': False, 'msg': e}, 400


        def do_load(table_name, file_dir, model, STONEDATA_CLI):
        if not table_name:
        table_name = DEFAULT_TABLE
        STONEDATA_CLI.create_table()
        title_data, text_data, sentence_embeddings = extract_features(file_dir, model)
        lens = 0
        for title, text, vector in zip(title_data, text_data, sentence_embeddings):
        st = "[" + ','.join(str(x) for x in vector) + "]"
        sql = "insert into " + table_name + " (title,contents,vector) values ('" + title + "','" + text + "','"+ st +"');"
        try:
        STONEDATA_CLI.cursor.execute(sql)
        STONEDATA_CLI.conn.commit()
        LOGGER.debug(
        f"Insert vectors to StoneData in table: {table_name} with {len(vector)} rows")
        except Exception as e:
        LOGGER.error(f"StoneData ERROR: {e} with sql: {sql}")
        sys.exit(1)
        lens = lens + 1
        return len(lens)

        每当搜索文字时,它将被转换为新向量,并与以前存储在 StoneData 中的向量进行比较。然后,StoneData 返回最相似向量的ID并返回内容。


          @app.get('/search')
          async def do_search_api(query_sentence: str = None):
          try:
          items = STONEDATA_CLI.search_vectors(query_sentence,MODEL, STONEDATA_CLI)
          res=[]
          for p, d, in items:
          dicts = {'title': p, 'content':d}
          res+=[dicts]
          LOGGER.info("Successfully searched similar text!")
          return res
          except Exception as e:
          LOGGER.error(e)
          return {'status': False, 'msg': e}, 400


          def search_vectors(self,query_sentence,MODEL, STONEDATA_CLI,):
          # 转化为新向量
          vectors = MODEL.sentence_encode([query_sentence])
          st = ','.join(str(x) for x in vectors)
          LOGGER.debug(f"Successfully search in sql: {st}")
          try:
          sql = "select title, contents from (select id, l2_distance(vector, '"+ st +"') as distance, title, contents from "+ DEFAULT_TABLE +") order by distance limit 10"
          LOGGER.debug(f"Successfully search in sql: {sql}")
          STONEDATA_CLI.cursor.execute(sql)
          results = STONEDATA_CLI.cursor.fetchall()
          LOGGER.debug(f"Successfully search in StoneData: {results}")
          return results
          except Exception as e:
          LOGGER.error(f"Failed to search StoneData in StoneData: {e}")
          sys.exit(1)


          以上便是全部的分享,如果您需要试用最新融合了向量检索的 StoneData,欢迎联系我们小助手,进行试用!


          关于石原子科技
          石原子科技成立于 2021 年10 月,拥有国内顶级的数据库人才与专家,创始成员源于阿里云 PolarDB、AnalyticDB,腾讯云 TDSQL,华为云 GaussDB,Oracle 等国内外知名数据库团队,专注于一体化 MySQL 实时 HTAP 数据库和离在线一体化实时数据仓库的研发与应用,依托云中立的数据技术进行产品设计,致力于为客户提供大规模、高性能、低成本的一站式实时数据分析服务。

          石原子科技坚持精细布局、自主创新的产品研发路线,打造了两款标杆信创产品:
          业内首个单机内核开源、行列混存+内存计算架构的一体化 MySQL HTAP 数据库 StoneDB

          该产品对标 Oracle HeatWave,使用 MySQL 的用户,通过 StoneDB 可以实现 TP+AP 混合负载,分析性能显著提升 10-100 倍以上,不需要进行数据迁移,也无需与其他 AP 系统集成,弥补 MySQL 分析领域的空白,通过 AP 增强到自主可控的 TP,瞄准大量 MySQL 信创升级 + 替代市场。


          基于全场景的新一代高性能、低成本的离在线一体化实时数仓 StoneData
          高度兼容 MySQL 语法,毫秒级更新,亚秒级查询,满足准实时和实时分析需求,一体化架构将实时和离线融合,减少数据冗余和移动,具有简化技术栈架构的能力;实现业务与技术解耦,支持自助式分析和敏捷分析;无论是数据湖中的非结构化或半结构化数据,还是数据库中的结构化数据,都可使用 StoneData 构建企业的数据分析平台,同时完成高吞吐离线处理和高性能在线分析,实现降本增效。


          公司成立至今,已积累了上千位用户,种子客户达 300 多家,取得 30+ 项软件著作权,成功申请并获准通过了 16+ 项技术专利,分别获评杭州市创新型中小企业、浙江省科技型中小企业、国家级科技型中小企业,产品通过公安部三所自主原创性认证,全面满足信创和等保要求。


          文章转载自石原子科技,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

          评论