// Package services tails an nginx access log (JSON, one record per line) with
// an external `tail -f` process and stores every record in the database.
package services

import (
	"bufio"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"os/exec"
	"sync"
	"time"

	"github.com/pkg/errors"
	"jollia.cn/golib/jokode"

	"nginx-tail/domain"
	"nginx-tail/domain/po"
)

var (
	// fileInstanceSize is the size of the tailed file seen by the last stat
	// check; a smaller size on a later check is treated as truncation.
	fileInstanceSize int64

	// msgList buffers parsed log records between the reader and the writer.
	msgList *domain.QueueX

	// lastTableName is the table that was most recently verified/created.
	lastTableName string

	// lastTableNameLocker guards lastTableName. It is initialised here because
	// a nil *sync.RWMutex panics on the first RLock/Lock call.
	lastTableNameLocker = &sync.RWMutex{}

	// writerOnce ensures the database writer goroutine is started only once,
	// no matter how often the tail process is restarted.
	writerOnce sync.Once
)

func init() {
	msgList = domain.NewQueue()
}

// getLastTableName returns the most recently verified table name under the
// read lock.
func getLastTableName() string {
	lastTableNameLocker.RLock()
	defer lastTableNameLocker.RUnlock()
	return lastTableName
}

// TailFile keeps tailing filePath until tailProcessor reports an error.
//
// When tailProcessor returns nil (tail was stopped because the file shrank, or
// the tail process ended), it sleeps and starts a new tail. When it returns an
// error, the error is logged and TailFile returns.
//
// retryInterval is multiplied by time.Second before sleeping, i.e. it is used
// as a bare number of seconds.
// NOTE(review): a caller passing a real duration such as 5*time.Second would
// overflow here — confirm the call sites before changing this.
func TailFile(filePath string, retryInterval time.Duration) {
	for {
		if err := tailProcessor(filePath); err != nil {
			jokode.Errorf("tailProcessor error: %v", err)
			return
		}
		time.Sleep(retryInterval * time.Second)
	}
}

// testFileStat polls the size of filePath every 10 seconds and sends true on
// exitCh once the file is smaller than at the previous check. It returns after
// signalling, or when the file can no longer be stat'ed.
func testFileStat(filePath string, exitCh chan<- bool) {
	// A nil stop channel never becomes ready, so this runs until the file
	// shrinks or stat fails.
	watchFileStat(filePath, exitCh, nil)
}

// watchFileStat implements testFileStat and additionally returns as soon as
// stop is closed, so the watcher does not outlive the tail it belongs to.
func watchFileStat(filePath string, exitCh chan<- bool, stop <-chan struct{}) {
	for {
		fi, err := os.Stat(filePath)
		if err != nil {
			jokode.Warnf("try test file '%s' stat fail", filePath)
			return
		}

		size := fi.Size()
		shrunk := fileInstanceSize > 0 && size < fileInstanceSize
		// Always record the new size. Without this, a watcher started after
		// the restart would compare against the stale, larger size and signal
		// again immediately.
		fileInstanceSize = size

		if shrunk {
			// The file was probably truncated or rotated; ask for a re-tail.
			select {
			case exitCh <- true:
			case <-stop:
			}
			return
		}

		select {
		case <-stop:
			return
		case <-time.After(10 * time.Second):
		}
	}
}

// createTableDynamic creates the table tableName from the po.NginxAccessLog
// model when information_schema does not list a table with that name.
//
// It panics when the lookup or the migration fails.
func createTableDynamic(tableName string) {
	var count int64
	db := GetDb()

	// Bind the table name as a parameter instead of formatting it into the
	// SQL text.
	const query = `select count(1) from information_schema.tables where table_name = ?`
	if err := db.Raw(query, tableName).Scan(&count).Error; err != nil {
		s := fmt.Sprintf("get dynamic table by name '%s' fail, error info: %v", tableName, err)
		jokode.Error(s)
		panic(s)
	}
	if count != 0 {
		return
	}

	if err := db.Table(tableName).AutoMigrate(&po.NginxAccessLog{}); err != nil {
		s := fmt.Sprintf("try create table '%s' fail, error info: %v", tableName, err)
		jokode.Error(s)
		panic(s)
	}
}

// writeToDb drains msgList forever and saves every *po.NginxAccessLog record.
// The target table is checked/created whenever a record's table name differs
// from the last one handled.
func writeToDb() {
	for {
		db := GetDb()
		if db == nil {
			// Database not available yet: wait and try again rather than
			// falling through and using a nil handle.
			time.Sleep(2 * time.Second)
			continue
		}

		obj := msgList.Pop()
		if obj == nil {
			time.Sleep(10 * time.Millisecond)
			continue
		}

		if x, ok := obj.(*po.NginxAccessLog); ok {
			x.FixFields()
			if tableName := x.GetTableName(); tableName != getLastTableName() {
				createTableDynamic(tableName)
				// Remember the table so the existence check is not repeated
				// for every single record.
				lastTableNameLocker.Lock()
				lastTableName = tableName
				lastTableNameLocker.Unlock()
			}
			if err := db.Save(x).Error; err != nil {
				jokode.Errorf("try save to db, error info: %v", err)
			}
		}
		time.Sleep(5 * time.Millisecond)
	}
}

// readFullLine reads one complete line from r without its line terminator.
// bufio.Reader.ReadLine returns lines longer than the buffer in several
// pieces; the pieces are joined here so a long record is not split up.
func readFullLine(r *bufio.Reader) ([]byte, error) {
	chunk, isPrefix, err := r.ReadLine()
	if err != nil || !isPrefix {
		return chunk, err
	}

	// The slice returned by ReadLine is only valid until the next read, so
	// the first piece has to be copied before reading on.
	line := append([]byte(nil), chunk...)
	for isPrefix {
		chunk, isPrefix, err = r.ReadLine()
		if err != nil {
			return nil, err
		}
		line = append(line, chunk...)
	}
	return line, nil
}

// tailProcessor runs `tail -f filePath`, decodes every output line as a JSON
// po.NginxAccessLog and pushes it onto msgList.
//
// It returns nil when the tail ended and should be started again (the file
// shrank, or the tail process closed its output), and a non-nil error when
// tail could not be started or its output could not be read.
func tailProcessor(filePath string) error {
	jokode.Infof("try tail file '%s'", filePath)

	tail := exec.Command("tail", "-f", filePath)
	stdout, err := tail.StdoutPipe()
	if err != nil {
		return errors.Wrap(err, "can not get std out pipe")
	}
	if err := tail.Start(); err != nil {
		return errors.Wrap(err, "can not start tail")
	}

	// One writer serves all tail restarts.
	writerOnce.Do(func() { go writeToDb() })

	chExit := make(chan bool, 1)
	done := make(chan struct{})
	defer func() {
		close(done)
		// Stop and reap the child so restarts do not pile up tail processes.
		// Both errors are expected when the process is already gone or was
		// killed, so they are deliberately ignored. Wait also closes the
		// stdout pipe.
		_ = tail.Process.Kill()
		_ = tail.Wait()
	}()

	go watchFileStat(filePath, chExit, done)
	go func() {
		select {
		case <-chExit:
			jokode.Warn("receive exit signal")
			// Killing tail closes its stdout, which unblocks the read loop
			// below with io.EOF even when no new log lines arrive.
			if err := tail.Process.Kill(); err != nil {
				jokode.Warnf("try kill tail process fail, error info: %v", err)
			}
		case <-done:
		}
	}()

	reader := bufio.NewReader(stdout)
	for {
		line, err := readFullLine(reader)
		if err == io.EOF {
			// tail closed its output (killed above or exited on its own).
			return nil
		}
		if err != nil {
			return errors.Wrap(err, "try read stdout fail")
		}
		if len(line) == 0 {
			// Blank line: nothing to decode.
			continue
		}

		arg := &po.NginxAccessLog{}
		if err := json.Unmarshal(line, arg); err != nil {
			jokode.Errorf("try unmarshal '%s' fail, error info: %v", string(line), err)
			continue
		}
		msgList.Push(arg)
	}
}