暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Go实现海量日志收集系统

IT大咖说 2021-01-16
1653

再次整理了一下这个日志收集系统的框,如下图

这次要实现的代码的整体逻辑为:

完整代码地址为: https://github.com/pythonsite/logagent

etcd介绍

高可用的分布式key-value存储,可以用于配置共享和服务发现

类似的项目:zookeeper和consul

开发语言:go

接口:提供restful的接口,使用简单

实现算法:基于raft算法的强一致性,高可用的服务存储目录

etcd的应用场景:

  • 服务发现和服务注册

  • 配置中心(我们实现的日志收集客户端需要用到)

  • 分布式锁

  • master选举

官网对etcd的有一个非常简明的介绍:

etcd搭建:
下载地址:https://github.com/coreos/etcd/releases/
根据自己的环境下载对应的版本然后启动起来就可以了

启动之后可以通过如下命令验证一下:

[root@localhost etcd-v3.2.18-linux-amd64]# ./etcdctl set name zhaofan zhaofan[root@localhost etcd-v3.2.18-linux-amd64]# ./etcdctl get namezhaofan[root@localhost etcd-v3.2.18-linux-amd64]#
复制

context 介绍和使用

其实这个东西翻译过来就是上下文管理,那么context的作用是做什么,主要有如下两个作用:

  • 控制goroutine的超时

  • 保存上下文数据

通过下面一个简单的例子进行理解:


package mainimport (    "fmt"    "time"    "net/http"    "context"    "io/ioutil")type Result struct{    r *http.Response    err error}func process(){    ctx,cancel := context.WithTimeout(context.Background(),2*time.Second)    defer cancel()    tr := &http.Transport{}    client := &http.Client{Transport:tr}    c := make(chan Result,1)    req,err := http.NewRequest("GET","http://www.google.com",nil)    if err != nil{        fmt.Println("http request failed,err:",err)        return    }    // 如果请求成功了会将数据存入到管道中    go func(){        resp,err := client.Do(req)        pack := Result{resp,err}        c <- pack    }()    select{    case <- ctx.Done():        tr.CancelRequest(req)        fmt.Println("timeout!")    case res := <-c:        defer res.r.Body.Close()        out,_:= ioutil.ReadAll(res.r.Body)        fmt.Printf("server response:%s",out)    }    return}func main() {    process()}
复制

写一个通过context保存上下文,代码例子如:

package mainimport (    "github.com/Go-zh/net/context"    "fmt")func add(ctx context.Context,a,b int) int {    traceId := ctx.Value("trace_id").(string)    fmt.Printf("trace_id:%v\n",traceId)    return a+b}func calc(ctx context.Context,a, b int) int{    traceId := ctx.Value("trace_id").(string)    fmt.Printf("trace_id:%v\n",traceId)    //再将ctx传入到add中    return add(ctx,a,b)}func main() {    //将ctx传递到calc中    ctx := context.WithValue(context.Background(),"trace_id","123456")    calc(ctx,20,30)}
复制

结合etcd和context使用

关于通过go连接etcd的简单例子:(这里有个小问题需要注意就是etcd的启动方式,默认启动可能会连接不上,尤其你是在虚拟你安装,所以需要通过如下命令启动:
./etcd --listen-client-urls http://0.0.0.0:2371 --advertise-client-urls http://0.0.0.0:2371 --listen-peer-urls http://0.0.0.0:2381
)

package mainimport (    etcd_client "github.com/coreos/etcd/clientv3"    "time"    "fmt")func main() {    cli, err := etcd_client.New(etcd_client.Config{        Endpoints:[]string{"192.168.0.118:2371"},        DialTimeout:5*time.Second,    })    if err != nil{        fmt.Println("connect failed,err:",err)        return    }    fmt.Println("connect success")    defer cli.Close()}
复制

下面一个例子是通过连接etcd,存值并取值

package mainimport (    "github.com/coreos/etcd/clientv3"    "time"    "fmt"    "context")func main() {    cli,err := clientv3.New(clientv3.Config{        Endpoints:[]string{"192.168.0.118:2371"},        DialTimeout:5*time.Second,    })    if err != nil{        fmt.Println("connect failed,err:",err)        return    }    fmt.Println("connect succ")    defer cli.Close()    ctx,cancel := context.WithTimeout(context.Background(),time.Second)    _,err = cli.Put(ctx,"logagent/conf/","sample_value")    cancel()    if err != nil{        fmt.Println("put failed,err",err)        return    }    ctx, cancel = context.WithTimeout(context.Background(),time.Second)    resp,err := cli.Get(ctx,"logagent/conf/")    cancel()    if err != nil{        fmt.Println("get failed,err:",err)        return    }    for _,ev := range resp.Kvs{        fmt.Printf("%s:%s\n",ev.Key,ev.Value)    }}
复制

关于context官网也有一个例子非常有用,用于控制开启的goroutine的退出,代码如下:


package mainimport (    "context"    "fmt")func main() {    // gen generates integers in a separate goroutine and    // sends them to the returned channel.    // The callers of gen need to cancel the context once    // they are done consuming generated integers not to leak    // the internal goroutine started by gen.    gen := func(ctx context.Context) <-chan int {        dst := make(chan int)        n := 1        go func() {            for {                select {                case <-ctx.Done():                    return // returning not to leak the goroutine                case dst <- n:                    n++                }            }        }()        return dst    }    ctx, cancel := context.WithCancel(context.Background())    defer cancel() // cancel when we are finished consuming integers    for n := range gen(ctx) {        fmt.Println(n)        if n == 5 {            break        }    }}
复制

关于官网文档中的WithDeadline演示的代码例子:

package mainimport (    "context"    "fmt"    "time")func main() {    d := time.Now().Add(50 * time.Millisecond)    ctx, cancel := context.WithDeadline(context.Background(), d)     Even though ctx will be expired, it is good practice to call its     cancelation function in any case. Failure to do so may keep the     context and its parent alive longer than necessary.    defer cancel()    select {    case <-time.After(1 * time.Second):        fmt.Println("overslept")    case <-ctx.Done():        fmt.Println(ctx.Err())    }}
复制

通过上面的代码有了一个基本的使用,那么如果我们通过etcd来做配置管理,如果配置更改之后,我们如何通知对应的服务器配置更改,通过下面例子演示:

package mainimport (    "github.com/coreos/etcd/clientv3"    "time"    "fmt"    "context")func main() {    cli,err := clientv3.New(clientv3.Config{        Endpoints:[]string{"192.168.0.118:2371"},        DialTimeout:5*time.Second,    })    if err != nil {        fmt.Println("connect failed,err:",err)        return    }    defer cli.Close()    // 这里会阻塞    rch := cli.Watch(context.Background(),"logagent/conf/")    for wresp := range rch{        for _,ev := range wresp.Events{            fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)        }    }}
复制

实现一个kafka的消费者代码的简单例子:

package mainimport (    "github.com/Shopify/sarama"    "strings"    "fmt"    "time")func main() {    consumer,err := sarama.NewConsumer(strings.Split("192.168.0.118:9092",","),nil)    if err != nil{        fmt.Println("failed to start consumer:",err)        return    }    partitionList,err := consumer.Partitions("nginx_log")    if err != nil {        fmt.Println("Failed to get the list of partitions:",err)        return    }    fmt.Println(partitionList)    for partition := range partitionList{        pc,err := consumer.ConsumePartition("nginx_log",int32(partition),sarama.OffsetNewest)        if err != nil {            fmt.Printf("failed to start consumer for partition %d:%s\n",partition,err)            return        }        defer pc.AsyncClose()        go func(partitionConsumer sarama.PartitionConsumer){            for msg := range pc.Messages(){                fmt.Printf("partition:%d Offset:%d Key:%s Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))            }        }(pc)    }    time.Sleep(time.Hour)    consumer.Close()}
复制

但是上面的代码并不是最佳代码,因为我们最后是通过time.sleep等待goroutine的执行,我们可以更改为通过sync.WaitGroup方式实现

package mainimport (    "github.com/Shopify/sarama"    "strings"    "fmt"    "sync")var (    wg sync.WaitGroup)func main() {    consumer,err := sarama.NewConsumer(strings.Split("192.168.0.118:9092",","),nil)    if err != nil{        fmt.Println("failed to start consumer:",err)        return    }    partitionList,err := consumer.Partitions("nginx_log")    if err != nil {        fmt.Println("Failed to get the list of partitions:",err)        return    }    fmt.Println(partitionList)    for partition := range partitionList{        pc,err := consumer.ConsumePartition("nginx_log",int32(partition),sarama.OffsetNewest)        if err != nil {            fmt.Printf("failed to start consumer for partition %d:%s\n",partition,err)            return        }        defer pc.AsyncClose()        go func(partitionConsumer sarama.PartitionConsumer){            wg.Add(1)            for msg := range partitionConsumer.Messages(){                fmt.Printf("partition:%d Offset:%d Key:%s Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))            }            wg.Done()        }(pc)    }    //time.Sleep(time.Hour)    wg.Wait()    consumer.Close()}
复制

将客户端需要收集的日志信息放到etcd中

关于etcd处理的代码为:

package mainimport (    "github.com/coreos/etcd/clientv3"    "time"    "github.com/astaxie/beego/logs"    "context"    "fmt")var Client *clientv3.Clientvar logConfChan chan string// 初始化etcdfunc initEtcd(addr []string,keyfmt string,timeout time.Duration)(err error){    var keys []string    for _,ip := range ipArrays{        //keyfmt = logagent/%s/log_config        keys = append(keys,fmt.Sprintf(keyfmt,ip))    }    logConfChan = make(chan string,10)    logs.Debug("etcd watch key:%v timeout:%v", keys, timeout)    Client,err = clientv3.New(clientv3.Config{        Endpoints:addr,        DialTimeout: timeout,    })    if err != nil{        logs.Error("connect failed,err:%v",err)        return    }    logs.Debug("init etcd success")    waitGroup.Add(1)    for _, key := range keys{        ctx,cancel := context.WithTimeout(context.Background(),2*time.Second)        // 从etcd中获取要收集日志的信息        resp,err := Client.Get(ctx,key)        cancel()        if err != nil {            logs.Warn("get key %s failed,err:%v",key,err)            continue        }        for _, ev := range resp.Kvs{            logs.Debug("%q : %q\n",  ev.Key, ev.Value)            logConfChan <- string(ev.Value)        }    }    go WatchEtcd(keys)    return}func WatchEtcd(keys []string){    // 这里用于检测当需要收集的日志信息更改时及时更新    var watchChans []clientv3.WatchChan    for _,key := range keys{        rch := Client.Watch(context.Background(),key)        watchChans = append(watchChans,rch)    }    for {        for _,watchC := range watchChans{            select{            case wresp := <-watchC:                for _,ev:= range wresp.Events{                    logs.Debug("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)                    logConfChan <- string(ev.Kv.Value)                }            default:            }        }        time.Sleep(time.Second)    }    waitGroup.Done()}func GetLogConf()chan string{    return logConfChan}
复制

同样的这里增加对了限速的处理,毕竟日志收集程序不能影响了当前业务的性能,所以增加了limit.go用于限制速度:

package mainimport (    "time"    "sync/atomic"    "github.com/astaxie/beego/logs")type SecondLimit struct {    unixSecond int64    curCount int32    limit int32}func NewSecondLimit(limit int32) *SecondLimit {    secLimit := &SecondLimit{        unixSecond:time.Now().Unix(),        curCount:0,        limit:limit,    }    return secLimit}func (s *SecondLimit) Add(count int) {    sec := time.Now().Unix()    if sec == s.unixSecond {        atomic.AddInt32(&s.curCount,int32(count))        return    }    atomic.StoreInt64(&s.unixSecond,sec)    atomic.StoreInt32(&s.curCount, int32(count))}func (s *SecondLimit) Wait()bool {    for {        sec := time.Now().Unix()        if (sec == atomic.LoadInt64(&s.unixSecond)) && s.curCount == s.limit {            time.Sleep(time.Microsecond)            logs.Debug("limit is running,limit:%d s.curCount:%d",s.limit,s.curCount)            continue        }        if sec != atomic.LoadInt64(&s.unixSecond) {            atomic.StoreInt64(&s.unixSecond,sec)            atomic.StoreInt32(&s.curCount,0)        }        logs.Debug("limit is exited")        return false    }
}

复制

小结

这次基本实现了日志收集的前半段的处理,后面将把日志扔到es中,并最终在页面上呈现





来源:

https://www.toutiao.com/a6916833750924018179/

“IT大咖说”欢迎广大技术人员投稿,投稿邮箱:aliang@itdks.com



来都来了,走啥走,留个言呗~




 IT大咖说  |  关于版权 

由“IT大咖说(ID:itdakashuo)”原创的文章,转载时请注明作者、出处及微信公众号。投稿、约稿、转载请加微信:ITDKS10(备注:投稿),茉莉小姐姐会及时与您联系!

感谢您对IT大咖说的热心支持!



相关推荐


推荐文章


文章转载自IT大咖说,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论