1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| func (c *Consumer) GetStartEndOffsets(startTime, endTime int64) (startOffset, endOffset int64) { client, err := sarama.NewClient([]string{brokerList}, nil) helpers.CheckError(err)
nowstamp := time.Now().Unix()
defer client.Close()
startOffset, err = client.GetOffset(topic, 0, startTime*1000) if err != nil { startOffset, err = client.GetOffset(topic, 0, sarama.OffsetOldest) if err != nil { panic(err) } fmt.Println("get offset by time error, then get first offset:", startOffset) }
lastTime := endTime for { lastTime = lastTime + 43200 endOffset, err = client.GetOffset(topic, 0, lastTime*1000) if err != nil || lastTime >= nowstamp { endOffset, err = client.GetOffset(topic, 0, sarama.OffsetNewest) if err != nil { panic(err) } fmt.Println("get offset by time error, then get last offset:", endOffset)
endOffset = endOffset - 1 break }
cmdstr := fmt.Sprintf(`/data/plattech/kafka_2.9.2-0.8.1.1/bin/kafka-simple-consumer-shell.sh --broker-list %s --topic game_server --partition 0 --offset %d --max-messages 1`, brokerList, endOffset) fmt.Println(cmdstr) out, err := exec.Command("/bin/sh", "-c", cmdstr).Output() if err != nil { continue }
dec := json.NewDecoder(strings.NewReader(string(out)))
var msg map[string]interface{} dec.UseNumber() if err := dec.Decode(&msg); err != nil { log.Fatal(err) }
tm, err := msg["time"].(json.Number).Int64() if err != nil { log.Fatal(err) }
if tm >= endTime { break }
}
return startOffset, endOffset }
|