server/worker_task: fix 'unknown' status for some big task logs

when trying to parse the task status, we seek 8k from the end
which may be into the middle of a line, so the datetime parsing
can fail (when the log message contains ': ')

This patch does a fast search for the last line, and avoid the
'lines' iterator.
This commit is contained in:
Dietmar Maurer 2020-09-04 10:41:13 +02:00
parent cd6ddb5a69
commit 56b666458c
1 changed files with 24 additions and 17 deletions

View File

@ -1,6 +1,6 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::fs::File; use std::fs::File;
use std::io::{BufRead, BufReader}; use std::io::{Read, BufRead, BufReader};
use std::panic::UnwindSafe; use std::panic::UnwindSafe;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
@ -195,8 +195,8 @@ pub fn create_task_log_dirs() -> Result<(), Error> {
/// If there is not a single line with at valid datetime, we assume the /// If there is not a single line with at valid datetime, we assume the
/// starttime to be the endtime /// starttime to be the endtime
pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> { pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
let mut endtime = upid.starttime;
let mut status = TaskState::Unknown { endtime }; let mut status = TaskState::Unknown { endtime: upid.starttime };
let path = upid.log_path(); let path = upid.log_path();
@ -207,22 +207,29 @@ pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
use std::io::SeekFrom; use std::io::SeekFrom;
let _ = file.seek(SeekFrom::End(-8192)); // ignore errors let _ = file.seek(SeekFrom::End(-8192)); // ignore errors
let reader = BufReader::new(file); let mut data = Vec::with_capacity(8192);
file.read_to_end(&mut data)?;
for line in reader.lines() { let last_line = {
let line = line?; let mut start = 0;
for pos in data.len()-1..=0 {
let mut iter = line.splitn(2, ": "); if data[pos] == b'\n' {
if let Some(time_str) = iter.next() { start = pos + 1;
endtime = chrono::DateTime::parse_from_rfc3339(time_str) break;
.map_err(|err| format_err!("cannot parse '{}': {}", time_str, err))?
.timestamp();
} else {
continue;
} }
match iter.next().and_then(|rest| rest.strip_prefix("TASK ")) { }
None => continue, &data[start..]
Some(rest) => { };
let last_line = std::str::from_utf8(last_line)
.map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?;
let mut iter = last_line.splitn(2, ": ");
if let Some(time_str) = iter.next() {
if let Ok(endtime) = chrono::DateTime::parse_from_rfc3339(time_str) {
let endtime = endtime.timestamp();
if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) { if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) {
status = state; status = state;
} }