mod_installer/
weidu_parser.rs

1use std::{
2    sync::{
3        Arc,
4        atomic::{AtomicUsize, Ordering},
5        mpsc::{Receiver, Sender, TryRecvError},
6    },
7    thread,
8};
9
10use config::{args::Options, parser_config::ParserConfig, state::State};
11
12use crate::utils::sleep;
13
/// Phase of the question-detection state machine driven by `parse_raw_output`.
///
/// Variants are listed in the order the parser normally moves through them.
#[derive(Debug)]
enum ParserState {
    /// No question is pending; each incoming line is checked for a question marker.
    LookingForInterestingOutput,
    /// A question has started and incoming lines are being appended to it.
    CollectingQuestion,
    /// Output went quiet while collecting; the question is sent on the next empty
    /// poll unless another line arrives first.
    WaitingForMoreQuestionContent,
}
20
21pub(crate) fn parse_raw_output(
22    options: &Options,
23    sender: Sender<State>,
24    receiver: Receiver<String>,
25    parser_config: Arc<ParserConfig>,
26    wait_count: Arc<AtomicUsize>,
27) {
28    let mut current_state = ParserState::LookingForInterestingOutput;
29    let mut buffer = vec![];
30    let mut question = vec![];
31    let mut grace_ticks: usize = 3;
32    if let Err(err) = sender.send(State::InProgress) {
33        log::error!("Failed to send process start event, {err}");
34        return;
35    }
36    let options = options.clone();
37    thread::spawn(move || {
38        loop {
39            match receiver.try_recv() {
40                Ok(string) => {
41                    log::info!("{string}");
42                    let installer_state = parser_config.detect_weidu_finished_state(&string);
43                    if installer_state != State::InProgress
44                        && let Err(err) = sender.send(installer_state)
45                    {
46                        log::error!("Failed to send process error event. {err}");
47                        return;
48                    }
49                    buffer.push(string.clone());
50                    match current_state {
51                        ParserState::CollectingQuestion
52                        | ParserState::WaitingForMoreQuestionContent => {
53                            if parser_config.useful_status_words.contains(&string) {
54                                log::debug!(
55                                    "Weidu seems to know an answer for the last question, ignoring it"
56                                );
57                                current_state = ParserState::LookingForInterestingOutput;
58                                question.clear();
59                            } else {
60                                log::debug!("Appending line '{string}' to user question");
61                                question.push(string);
62                                current_state = ParserState::CollectingQuestion;
63                            }
64                        }
65                        ParserState::LookingForInterestingOutput => {
66                            if parser_config.string_looks_like_question(&string) {
67                                log::debug!(
68                                    "Changing parser state to '{:?}' due to line {}",
69                                    ParserState::CollectingQuestion,
70                                    string
71                                );
72                                current_state = ParserState::CollectingQuestion;
73                                let min_index = buffer.len().saturating_sub(options.lookback);
74                                for history in buffer.get(min_index..).unwrap_or_default() {
75                                    question.push(history.clone());
76                                }
77                            }
78                        }
79                    }
80                }
81                Err(TryRecvError::Empty) => match current_state {
82                    ParserState::CollectingQuestion if grace_ticks > 0 => {
83                        log::debug!("Collecting question, with grace of {grace_ticks} remaining");
84                        sleep(options.tick);
85                        grace_ticks -= 1;
86                    }
87                    ParserState::CollectingQuestion => {
88                        log::debug!(
89                            "Changing parser state to '{:?}'",
90                            ParserState::WaitingForMoreQuestionContent
91                        );
92                        current_state = ParserState::WaitingForMoreQuestionContent;
93                        grace_ticks = 3;
94                    }
95                    ParserState::WaitingForMoreQuestionContent => {
96                        log::debug!("No new weidu output, sending question to user");
97                        if let Err(err) = sender.send(State::RequiresInput {
98                            question: question.join(""),
99                        }) {
100                            log::error!("Failed to send question: {err}");
101                            return;
102                        }
103                        current_state = ParserState::LookingForInterestingOutput;
104                        question.clear();
105                        continue;
106                    }
107                    _ if wait_count.load(Ordering::Relaxed) >= options.timeout => {
108                        if let Err(err) = sender.send(State::TimedOut) {
109                            log::error!("Could send timeout error: {}", err);
110                            return;
111                        }
112                    }
113                    _ => {}
114                },
115                Err(TryRecvError::Disconnected) => {
116                    if let Err(err) = sender.send(State::Completed) {
117                        log::error!("Failed to send process end event {err}");
118                    }
119                    return;
120                }
121            }
122        }
123    });
124}