暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

golang源码分析:推荐系统gorse(part I)

con      gorse使纯go实现的一套分布式推荐系统解决方案:

https://github.com/zhenghaoz/gorse,包含了master,worker和server三个部分。

         整套模型中包含了三个实体:用户、物料、反馈

    type User struct {
    UserId string
    }

    用户通过一个ID来唯一标示

      type Item struct {
      ItemId string
      Timestamp time.Time
      Labels []string
      }

      物料包含一个时间戳,通过它来判断物料是否过期,Lables是物料的特征

        type Feedback struct {
        FeedbackType string
        UserId string
        ItemId string
        Timestamp time.Time
        }

        反馈的类型可以是正向、负向或者中性( positive (like), negative (dislike) or neutural (read))

              gorse做的事情就是把合适的物料推荐给合适的人:推荐数据包括两部分:个性化推荐和非个性化推荐(popular/latest/similar)。

               三个节点的分工非常明确:

               master节点负责用所有的物料数据和用户数据以及反馈数据来拆分训练集和测试集,然后训练模型,模型分为两大类:排序和点击预测;训练完毕后通过模型搜索来获取最优模型和参数。同时非个性化推荐也是master节点完成的。推荐结果写入缓存,缓存是用list来维护的,会定期过滤过期的数据。

              worker节点 ,通过grpc从master拉取模型数据,还有用户列表,定时分批对每一个用户来计算推荐数据,存入缓存。

              server节点提供,提供restful的api接口和swagger文档,前端可以通过接口从缓存中获取当前用户的推荐数据,或者对推荐的数据做出反馈。

              以上就是gorse的架构和核心数据,下面结合源码来进行分析。源码目录如下:

          .deepsource.toml //放置一些配置项,熟悉整个代码后改配置可以快速配出一个系统
          LICENSE
          README.md
          assets
          base
          bin
          cmd 三个节点的入口文件
          codecov.yml
          config
          docker //三个节点会编译成三个镜像,这里放置了他们的Dockerfile
          //以及docker-compose.yml,通过docker compose up 可以启动服务
          floats
          go.mod
          go.sum
          master master节点的实现
          misc
          model 模型相关的实现
          protocol
          server server 节点的实现
          storage  存储的实现,主要是mysqlredis
          worker worker节点的实现

          master

          master 节点的核心逻辑主要包含下面几步

            1,LoadLocalCache //通过缓存的数据初始化模型
            2,初始化缓存和持久化存储(redis、mysql)
            m.DataClient, err = data.Open(m.GorseConfig.Database.DataStore)
            m.CacheClient, err = cache.Open(m.GorseConfig.Database.CacheStore)
            3,err = m.loadRankingDataset()//加载排序用的数据集
            m.rankingTrainSet, m.rankingTestSet = rankingDataset.Split(0, 0)
            4,err = m.loadClickDataset()//加载点击预测数据集
            m.clickTrainSet, m.clickTestSet = clickDataset.Split(0.2, 0)
            5go m.StartHttpServer()//启动一个httpserver主要用户数据的批量导入
            http.Handle("/", http.FileServer(&SinglePageAppFileSystem{statikFS}))
            http.HandleFunc("/api/bulk/items", m.importExportItems)
            http.HandleFunc("/api/bulk/feedback", m.importExportFeedback)
            6go m.FitLoop()//进入训练模型的大循环
            7go m.SearchLoop()//进入最佳模型参数筛选大循环
            8go m.AnalyzeLoop()//进行全局非个性化推荐,存入缓存
            9,启动grpc服务
            protocol.RegisterMasterServer(grpcServer, m)
              if err = grpcServer.Serve(lis); err != nil {

            1,训练模型的大循环也分为三个部分

              1,加载数据集
                m.hasFeedbackInserted()
              m.loadRankingDataset()
                m.loadClickDataset()
               2,训练排序模型、计算评分、存储
              m.fitRankingModelAndNonPersonalized(lastNumRankingUsers, lastNumRankingItems, lastNumRankingFeedback)
                
                score := rankingModel.Fit(m.rankingTrainSet, m.rankingTestSet, ranking.NewFitConfig().SetJobs(m.GorseConfig.Master.FitJobs))
                err := m.DataClient.InsertMeasurement(data.Measurement{Name: RankingTop10NDCG, Value: score.NDCG, Timestamp: time.Now()});
               3,训练点击预测模型、计算评分、存储
              m.fitClickModel(lastNumClickUsers, lastNumClickItems, lastNumClickFeedback)
                
                bestClickModel, bestClickScore := m.clickModelSearcher.GetBestModel()
                score := clickModel.Fit(m.clickTrainSet, m.clickTestSet, click.NewFitConfig().SetJobs(m.GorseConfig.Master.FitJobs))
                m.DataClient.InsertMeasurement(data.Measurement{Name: ClickPrecision, Value: score.Precision, Timestamp: time.Now()})

              其中排序模型包含三部分的内容

                  // collect similar items
                  m.similar(m.rankingItems, m.rankingFullSet, model.SimilarityDot)
                  // collect popular items
                  m.popItem(m.rankingItems, m.rankingFeedbacks)
                  // collect latest items
                   m.latest(m.rankingItems)

                2,最佳模型参数筛选大循环

                主要包括两部分的筛选:排序模型和点击预测模型:

                   1,排序模型
                  m.searchRankingModel(lastNumRankingUsers, lastNumRankingItems, lastNumRankingFeedbacks)
                   err = m.rankingModelSearcher.Fit(m.rankingTrainSet, m.rankingTestSet)
                   2,点击预测模型    
                   m.searchClickModel(lastNumClickUsers, lastNumClickItems, lastNumClickFeedbacks)
                   err = m.clickModelSearcher.Fit(m.clickTrainSet, m.clickTestSet)

                  排序模型具体有多种

                    models := []string{"bpr", "ccd", "knn"}
                    r := RandomSearchCV(m, trainSet, valSet, m.GetParamsGrid(), searcher.numTrials, 0,NewFitConfig().SetJobs(searcher.numJobs))
                    score := estimator.Fit(trainSet, testSet, fitConfig)  

                    点击预测模型参数搜索也分两步:搜索、计算评分

                       RandomSearchCV(fm, trainSet, valSet, grid, searcher.numTrials*2, 0,NewFitConfig().SetJobs(searcher.numJobs))
                       score := estimator.Fit(trainSet, testSet, fitConfig)

                      3,全局非个性化推荐,存入缓存

                         m.analyze()
                         clickThroughRates, err := m.DataClient.GetMeasurements(ClickThroughRate, 30)
                         m.DataClient.GetClickThroughRate(date, m.GorseConfig.Database.ClickFeedbackTypes, m.GorseConfig.Database.ReadFeedbackType)
                         m.DataClient.InsertMeasurement(data.Measurement{
                        Name: ClickThroughRate,
                        Timestamp: date,
                        Value: float32(clickThroughRate),
                        })

                        worker

                        个性化推荐的逻辑在worker节点上,运用master节点训练的模型数据,来做排序推荐,主要分为下面几步:

                          1,加载上次结果的缓存数据
                          state, err := LoadLocalCache(filepath.Join(os.TempDir(), "gorse-worker"))
                          2,获取master的grpc连接
                          w.masterClient = protocol.NewMasterClient(conn)
                          3,同步数据
                          go w.Sync()
                          4,拉取用户和模型相关的数据
                          go w.Pull()
                          主要包括三个部分:用户数据、排序模型、点击预测模型
                          w.masterClient.GetUserIndex
                          w.masterClient.GetRankingModel
                          w.masterClient.GetClickModel
                          5,启动系统监控
                          go w.ServeMetrics()
                          http.Handle("/metrics", promhttp.Handler())
                          6,进行个性化推荐
                          w.Recommend(w.rankingModel, workingUsers)

                          1,同步的数据分三类

                              1,从master节点获取的元数据
                            w.masterClient.GetMeta
                            2,持久化存储里的数据
                              w.dataClient, err = data.Open(w.cfg.Database.DataStore)
                              3,cache里的数据
                            w.cacheClient, err = cache.Open(w.cfg.Database.CacheStore)

                            2,个性化推荐是根据系统的处理能力和用户数量进行分批处理的,处理过程如下

                              1,加载所有物料
                              itemIds := m.GetItemIndex().GetNames()
                               2,加载用户推荐的历史数据
                              historyItems, err := loadUserHistoricalItems(w.dataClient, userId)
                               3,加载用户正反馈的数据
                              favoredItems, err := loadUserHistoricalItems(w.dataClient, userId, w.cfg.Database.PositiveFeedbackType...)
                                4,获取候选物料和评分
                              candidateItems, candidateScores := recItems.PopAll()
                              5,加载最新数据
                              latestItems, err := w.cacheClient.GetScores(cache.LatestItems, "", 0, w.cfg.Recommend.ExploreLatestNum-1)
                              6,通过点击预测模型排序
                              result, err = w.rankByClickTroughRate(userId, candidateItems)
                              7,随机排序
                              result = w.randomInsertLatestItem(candidateItems, candidateScores)
                              8,缓存数据
                              w.cacheClient.SetScores(cache.RecommendItems, userId, result)
                              w.cacheClient.SetString(cache.LastUpdateRecommendTime, userId, base.Now())

                              server

                              其它两个节点主要是离线的,server节点提供在线服务,分为三步

                                1,加载缓存数据
                                state, err := LoadLocalCache(filepath.Join(os.TempDir(), "gorse-server"))
                                2,获取主节点的连接
                                s.masterClient = protocol.NewMasterClient(conn)
                                3,同步数据主要是元数据
                                go s.Sync()
                                s.masterClient.GetMeta
                                4,提供http服务
                                s.StartHttpServer()

                                提供http服务包括三个部分

                                  1,web服务
                                  s.CreateWebService()
                                  2,接口文档
                                  specConfig := restfulspec.Config{
                                  WebServices: restful.RegisteredWebServices(),
                                  APIPath: "/apidocs.json",
                                  }
                                   3,监控
                                   http.Handle("/metrics", promhttp.Handler())

                                  以上就是三个节点的相关核心源码,下面介绍下排序模型和点击预测模型

                                  1,排序模型

                                  gorse实现了4个排序模型

                                  A,als:Alternative -Least-Squares

                                  交替最小二乘法

                                      它通过最小化误差的平方和寻找数据的最佳函数匹配。利用最小二乘法可以简便地求得未知的数据,并使得这些求得的数据与实际数据之间误差的平方和为最小。最小二乘法可用于曲线拟合。

                                  B,bpr:Bayesian Personalized Ranking

                                  基于贝叶斯后验优化的个性化排序算法

                                       BPR算法将用户对物品的评分(显示反馈“1”,隐式反馈“0”)处理为一个pair对的集合<i,j>,其中i为评分为1的物品,j为评分为0的物品。假设某用户有M个“1”的评分,N个“0”的评分,则该用户共有M*N个pair对。

                                     这样数据集就由三元组 <u,i,j>表示,该三元组的物理含义为:相对于物品“j”,用户“u”更喜欢物品“i”。

                                  C,ccd:Cyclic Coordinate Descent (CCD)

                                  是一个启发式的迭代搜索算法,

                                  D,knn:k-Nearest Neighbor

                                  K最近邻分类算法

                                      如果一个样本在特征空间中的k个最相似(即特征空间中最邻近)的样本中的

                                  大多数属于某一个类别,则该样本也属于这个类别

                                  2,点击预测模型

                                  Classification

                                  Regression 

                                  分离器:支持k-fold、 比率ratio 、 leave-one-out分离数据集

                                  模型:推荐模型基于协同过滤算法,包括矩阵分解、基于临接的方法、Slope One、Co-Clustering2

                                  评估:可使用RMSE、 MAE来评分,包括准确率Precision、召回率Recall、归一化折损累积增益NDCG、MAP、MRR、AUC

                                  参数搜寻:使用方式网格搜索grid search 或 随机搜索random search寻找最佳超参数

                                  持久化:保存模型或加载模型



                                  文章转载自golang算法架构leetcode技术php,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                  评论