mod_installer/
weidu_parser.rs1use std::{
2 sync::{
3 Arc,
4 atomic::{AtomicUsize, Ordering},
5 mpsc::{Receiver, Sender, TryRecvError},
6 },
7 thread,
8};
9
10use config::{args::Options, parser_config::ParserConfig, state::State};
11
12use crate::utils::sleep;
13
/// Phases of the state machine that drives WeiDU output parsing.
#[derive(Debug)]
enum ParserState {
    /// No question is pending; each incoming line is checked to see whether it
    /// looks like the start of a question.
    LookingForInterestingOutput,
    /// A question has been detected and further output lines are appended to it.
    CollectingQuestion,
    /// Output went quiet while a question was being collected; if nothing else
    /// arrives, the pending question is handed to the user.
    WaitingForMoreQuestionContent,
}
20
21pub(crate) fn parse_raw_output(
22 options: &Options,
23 sender: Sender<State>,
24 receiver: Receiver<String>,
25 parser_config: Arc<ParserConfig>,
26 wait_count: Arc<AtomicUsize>,
27) {
28 let mut current_state = ParserState::LookingForInterestingOutput;
29 let mut buffer = vec![];
30 let mut question = vec![];
31 let mut grace_ticks: usize = 3;
32 if let Err(err) = sender.send(State::InProgress) {
33 log::error!("Failed to send process start event, {err}");
34 return;
35 }
36 let options = options.clone();
37 thread::spawn(move || {
38 loop {
39 match receiver.try_recv() {
40 Ok(string) => {
41 log::info!("{string}");
42 let installer_state = parser_config.detect_weidu_finished_state(&string);
43 if installer_state != State::InProgress
44 && let Err(err) = sender.send(installer_state)
45 {
46 log::error!("Failed to send process error event. {err}");
47 return;
48 }
49 buffer.push(string.clone());
50 match current_state {
51 ParserState::CollectingQuestion
52 | ParserState::WaitingForMoreQuestionContent => {
53 if parser_config.useful_status_words.contains(&string) {
54 log::debug!(
55 "Weidu seems to know an answer for the last question, ignoring it"
56 );
57 current_state = ParserState::LookingForInterestingOutput;
58 question.clear();
59 } else {
60 log::debug!("Appending line '{string}' to user question");
61 question.push(string);
62 current_state = ParserState::CollectingQuestion;
63 }
64 }
65 ParserState::LookingForInterestingOutput => {
66 if parser_config.string_looks_like_question(&string) {
67 log::debug!(
68 "Changing parser state to '{:?}' due to line {}",
69 ParserState::CollectingQuestion,
70 string
71 );
72 current_state = ParserState::CollectingQuestion;
73 let min_index = buffer.len().saturating_sub(options.lookback);
74 for history in buffer.get(min_index..).unwrap_or_default() {
75 question.push(history.clone());
76 }
77 }
78 }
79 }
80 }
81 Err(TryRecvError::Empty) => match current_state {
82 ParserState::CollectingQuestion if grace_ticks > 0 => {
83 log::debug!("Collecting question, with grace of {grace_ticks} remaining");
84 sleep(options.tick);
85 grace_ticks -= 1;
86 }
87 ParserState::CollectingQuestion => {
88 log::debug!(
89 "Changing parser state to '{:?}'",
90 ParserState::WaitingForMoreQuestionContent
91 );
92 current_state = ParserState::WaitingForMoreQuestionContent;
93 grace_ticks = 3;
94 }
95 ParserState::WaitingForMoreQuestionContent => {
96 log::debug!("No new weidu output, sending question to user");
97 if let Err(err) = sender.send(State::RequiresInput {
98 question: question.join(""),
99 }) {
100 log::error!("Failed to send question: {err}");
101 return;
102 }
103 current_state = ParserState::LookingForInterestingOutput;
104 question.clear();
105 continue;
106 }
107 _ if wait_count.load(Ordering::Relaxed) >= options.timeout => {
108 if let Err(err) = sender.send(State::TimedOut) {
109 log::error!("Could send timeout error: {}", err);
110 return;
111 }
112 }
113 _ => {}
114 },
115 Err(TryRecvError::Disconnected) => {
116 if let Err(err) = sender.send(State::Completed) {
117 log::error!("Failed to send process end event {err}");
118 }
119 return;
120 }
121 }
122 }
123 });
124}