123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165 |
- package services
- import (
- "bufio"
- "encoding/json"
- "fmt"
- "github.com/pkg/errors"
- "io"
- "jollia.cn/golib/jokode"
- "ngcat/domain"
- "ngcat/domain/po"
- "os"
- "os/exec"
- "sync"
- "time"
- )
- var (
- fileInstanceSize int64
- msgList *domain.QueueX
- lastTableName string
- lastTableNameLocker *sync.RWMutex
- )
- func init() {
- msgList = domain.NewQueue()
- }
- func getLastTableName() string {
- lastTableNameLocker.RLock()
- defer lastTableNameLocker.RUnlock()
- return lastTableName
- }
- func TailFile(filePath string, retryInterval time.Duration) {
- var err error = nil
- for {
- err = tailProcessor(filePath)
- if err != nil {
- jokode.Errorf("tailProcessor error: %v", err)
- return
- }
- time.Sleep(retryInterval * time.Second)
- }
- }
- func testFileStat(filePath string, exitCh chan<- bool) {
- for {
- if fi, err := os.Stat(filePath); err != nil {
- jokode.Warnf("try test file '%s' stat fail", filePath)
- return
- } else {
- fs := fi.Size()
- if fileInstanceSize > 0 && fs < fileInstanceSize {
- // 可能发生了截断或者绕接 需要触发重新监听
- exitCh <- true
- break
- }
- fileInstanceSize = fs
- }
- time.Sleep(10 * time.Second)
- }
- }
- func createTableDynamic(tableName string) {
- var count int64
- db := GetDb()
- if err := db.Raw(fmt.Sprintf(`select count(1) from information_schema.tables where table_name='%s'`, tableName)).Scan(&count).Error; err != nil {
- s := fmt.Sprintf("get dynamic table by name '%s' fail, error info: %v", tableName, err)
- jokode.Error(s)
- panic(s)
- }
- if count == 0 {
- if err := db.Table(tableName).AutoMigrate(&po.NginxAccessLog{}); err != nil {
- s := fmt.Sprintf("try create table '%s' fail, error info: %v", tableName, err)
- jokode.Error(s)
- panic(s)
- }
- }
- }
- func writeToDb() {
- for {
- db := GetDb()
- if db == nil {
- time.Sleep(2 * time.Second)
- }
- obj := msgList.Pop()
- if obj == nil {
- time.Sleep(10 * time.Millisecond)
- continue
- }
- if x, ok := obj.(*po.NginxAccessLog); ok {
- x.FixFields()
- if tableName := x.GetTableName(); tableName != getLastTableName() {
- createTableDynamic(tableName)
- }
- if err := GetDb().Save(x).Error; err != nil {
- jokode.Errorf("try save to db, error info: %v", err)
- }
- }
- time.Sleep(5 * time.Millisecond)
- }
- }
- func tailProcessor(filePath string) error {
- var stdout io.ReadCloser
- defer func() {
- if stdout != nil {
- if err := stdout.Close(); err != nil {
- jokode.Warnf("try close stdout pipe fail, error info: %v", err)
- }
- }
- }()
- chExit := make(chan bool, 1)
- tail := exec.Command("tail", "-f", filePath)
- jokode.Infof("try tail file '%s'", filePath)
- if so, err := tail.StdoutPipe(); err != nil {
- return errors.Wrap(err, "can not get std out pipe")
- } else {
- stdout = so
- }
- go testFileStat(filePath, chExit)
- go writeToDb()
- if err := tail.Start(); err != nil {
- return errors.Wrap(err, "can not start tail")
- }
- reader := bufio.NewReader(stdout)
- for {
- select {
- case <-chExit:
- jokode.Warn("receive exit signal")
- return nil
- default:
- if line, _, err := reader.ReadLine(); err != nil {
- jokode.Errorf("try read stdout fail, error info: %v", err)
- break
- } else if len(line) == 0 {
- // 没有读取到数据, 有可能 tail 进程已经推出
- break
- } else {
- arg := &po.NginxAccessLog{}
- if err = json.Unmarshal(line, arg); err != nil {
- jokode.Errorf("try unmarshal '%s' fail, error info: %v", string(line), err)
- continue
- }
- msgList.Push(arg)
- }
- }
- }
- }
|