tail_service.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. package services
  2. import (
  3. "bufio"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/pkg/errors"
  7. "io"
  8. "jollia.cn/golib/jokode"
  9. "ngcat/domain"
  10. "ngcat/domain/po"
  11. "os"
  12. "os/exec"
  13. "sync"
  14. "time"
  15. )
  16. var (
  17. fileInstanceSize int64
  18. msgList *domain.QueueX
  19. lastTableName string
  20. lastTableNameLocker *sync.RWMutex
  21. )
  22. func init() {
  23. msgList = domain.NewQueue()
  24. }
  25. func getLastTableName() string {
  26. lastTableNameLocker.RLock()
  27. defer lastTableNameLocker.RUnlock()
  28. return lastTableName
  29. }
  30. func TailFile(filePath string, retryInterval time.Duration) {
  31. var err error = nil
  32. for {
  33. err = tailProcessor(filePath)
  34. if err != nil {
  35. jokode.Errorf("tailProcessor error: %v", err)
  36. return
  37. }
  38. time.Sleep(retryInterval * time.Second)
  39. }
  40. }
  41. func testFileStat(filePath string, exitCh chan<- bool) {
  42. for {
  43. if fi, err := os.Stat(filePath); err != nil {
  44. jokode.Warnf("try test file '%s' stat fail", filePath)
  45. return
  46. } else {
  47. fs := fi.Size()
  48. if fileInstanceSize > 0 && fs < fileInstanceSize {
  49. // 可能发生了截断或者绕接 需要触发重新监听
  50. exitCh <- true
  51. break
  52. }
  53. fileInstanceSize = fs
  54. }
  55. time.Sleep(10 * time.Second)
  56. }
  57. }
  58. func createTableDynamic(tableName string) {
  59. var count int64
  60. db := GetDb()
  61. if err := db.Raw(fmt.Sprintf(`select count(1) from information_schema.tables where table_name='%s'`, tableName)).Scan(&count).Error; err != nil {
  62. s := fmt.Sprintf("get dynamic table by name '%s' fail, error info: %v", tableName, err)
  63. jokode.Error(s)
  64. panic(s)
  65. }
  66. if count == 0 {
  67. if err := db.Table(tableName).AutoMigrate(&po.NginxAccessLog{}); err != nil {
  68. s := fmt.Sprintf("try create table '%s' fail, error info: %v", tableName, err)
  69. jokode.Error(s)
  70. panic(s)
  71. }
  72. }
  73. }
  74. func writeToDb() {
  75. for {
  76. db := GetDb()
  77. if db == nil {
  78. time.Sleep(2 * time.Second)
  79. }
  80. obj := msgList.Pop()
  81. if obj == nil {
  82. time.Sleep(10 * time.Millisecond)
  83. continue
  84. }
  85. if x, ok := obj.(*po.NginxAccessLog); ok {
  86. x.FixFields()
  87. if tableName := x.GetTableName(); tableName != getLastTableName() {
  88. createTableDynamic(tableName)
  89. }
  90. if err := GetDb().Save(x).Error; err != nil {
  91. jokode.Errorf("try save to db, error info: %v", err)
  92. }
  93. }
  94. time.Sleep(5 * time.Millisecond)
  95. }
  96. }
  97. func tailProcessor(filePath string) error {
  98. var stdout io.ReadCloser
  99. defer func() {
  100. if stdout != nil {
  101. if err := stdout.Close(); err != nil {
  102. jokode.Warnf("try close stdout pipe fail, error info: %v", err)
  103. }
  104. }
  105. }()
  106. chExit := make(chan bool, 1)
  107. tail := exec.Command("tail", "-f", filePath)
  108. jokode.Infof("try tail file '%s'", filePath)
  109. if so, err := tail.StdoutPipe(); err != nil {
  110. return errors.Wrap(err, "can not get std out pipe")
  111. } else {
  112. stdout = so
  113. }
  114. go testFileStat(filePath, chExit)
  115. go writeToDb()
  116. if err := tail.Start(); err != nil {
  117. return errors.Wrap(err, "can not start tail")
  118. }
  119. reader := bufio.NewReader(stdout)
  120. for {
  121. select {
  122. case <-chExit:
  123. jokode.Warn("receive exit signal")
  124. return nil
  125. default:
  126. if line, _, err := reader.ReadLine(); err != nil {
  127. jokode.Errorf("try read stdout fail, error info: %v", err)
  128. break
  129. } else if len(line) == 0 {
  130. // 没有读取到数据, 有可能 tail 进程已经推出
  131. break
  132. } else {
  133. arg := &po.NginxAccessLog{}
  134. if err = json.Unmarshal(line, arg); err != nil {
  135. jokode.Errorf("try unmarshal '%s' fail, error info: %v", string(line), err)
  136. continue
  137. }
  138. msgList.Push(arg)
  139. }
  140. }
  141. }
  142. }