Skip to content

Commit 977e2c7

Browse files
committed
[agent] add publish test example, make it runnable with test
1 parent 83a4020 commit 977e2c7

File tree

8 files changed

+125
-58
lines changed

8 files changed

+125
-58
lines changed

README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,12 @@ Install [Rust 1.70+](https://www.rust-lang.org/),
4545
[Node v18](https://nodejs.org/), [NPM v9](https://www.npmjs.com/), and
4646
[mprocs](https://github.com/pvolok/mprocs). Then, run
4747
```shell
48-
cd web && npm install && cd ../
48+
cd web && npm ci && cd ../
49+
cd develop && ./run_mqtt.sh && cd ../
4950
mprocs
51+
52+
# this is check MQTT agent if is OK
53+
cd agent && cargo run --example publish_command.rs
5054
```
5155

5256
## 限制

agent/examples/publish_command.rs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
use std::path::PathBuf;
2+
use rumqttc::v5::{AsyncClient, MqttOptions, Incoming};
3+
use rumqttc::v5::mqttbytes::QoS;
4+
use tracing_subscriber::EnvFilter;
5+
use mproxy::message::{RequestMessage, ResponseMessage};
6+
7+
#[tokio::main]
8+
async fn main() -> anyhow::Result<()> {
9+
10+
let command = "ls";
11+
let config_path:PathBuf = "./config.yml".parse().unwrap();
12+
13+
let is_atty = atty::is(atty::Stream::Stdout);
14+
tracing_subscriber::fmt().with_env_filter(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::from("debug")),)
15+
.with_ansi(is_atty).init();
16+
17+
let config = mproxy::config::Config::new(Some(config_path)).unwrap();
18+
19+
let mqtt_url = format!("{}?client_id={}", &config.server, "test_web");
20+
let options = MqttOptions::parse_url(&mqtt_url).unwrap();
21+
let (client, mut eventloop) = AsyncClient::new(options,20);
22+
23+
let topic = config.get_response_command_topic();
24+
client.subscribe(topic, QoS::ExactlyOnce).await?;
25+
26+
tokio::spawn(async move {
27+
loop {
28+
let event = eventloop.poll().await;
29+
match event {
30+
Ok(event) => {
31+
match event {
32+
rumqttc::v5::Event::Incoming(Incoming::Publish(data)) => {
33+
let resp: ResponseMessage = serde_json::from_slice(data.payload.as_ref()).unwrap();
34+
35+
match resp {
36+
ResponseMessage::Ok {data, seq,..} => println!("recv:{data}, seq: {seq}"),
37+
ResponseMessage::Err {message,..} => eprintln!("{message}"),
38+
}
39+
}
40+
_ => ()
41+
}
42+
},
43+
Err(err) => eprintln!("{:?}", err),
44+
};
45+
}
46+
});
47+
let command = RequestMessage::Cmd{command: command.to_string(), request_id: "test_request_id".to_string()};
48+
let command = serde_json::to_vec(&command).unwrap();
49+
client.publish(config.get_command_topic(), QoS::ExactlyOnce, false, command).await?;
50+
tokio::signal::ctrl_c().await?;
51+
52+
Ok(())
53+
}

agent/src/connection.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use tokio::sync::broadcast;
77
use tracing::{debug, error, info, warn};
88

99
use crate::config::Config;
10-
use crate::handler::{RequestMessage, ResponseMessage};
10+
use crate::message::{RequestMessage, ResponseMessage};
1111

1212
#[derive(Clone, Debug)]
1313
pub struct Connection {

agent/src/handler.rs

Lines changed: 12 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,9 @@
11
use std::io::{BufRead, BufReader};
2-
use serde::{Deserialize, Serialize};
32
use tokio::sync::broadcast;
43
use tracing::{info};
54
use crate::connection::Connection;
6-
use std::process::Command;
7-
8-
9-
#[derive(Serialize, Deserialize, Debug, Clone)]
10-
#[serde(tag = "type")]
11-
pub enum RequestMessage {
12-
Cmd {
13-
command: String,
14-
request_id: String,
15-
}
16-
}
17-
18-
#[derive(Serialize, Deserialize, Debug, Clone)]
19-
#[serde(tag = "type")]
20-
pub enum ResponseMessage {
21-
Ok{ request_id:String, seq:u32, data:String, pid:u32},
22-
Err{ request_id:String, message:String}
23-
}
5+
use std::process::{Command, Stdio};
6+
use crate::message::{RequestMessage, ResponseMessage};
247

258

269
type Receiver = broadcast::Receiver<(String,RequestMessage)>;
@@ -44,7 +27,9 @@ impl Handler {
4427
match cmd {
4528
RequestMessage::Cmd { command,request_id } => {
4629
let mut seq:u32 = 1;
47-
match Command::new(&command).spawn() {
30+
let mut command = Command::new(&command);
31+
command.stdout(Stdio::piped()).stderr(Stdio::piped());
32+
match command.spawn() {
4833
Ok(mut child) => {
4934
let pid = child.id();
5035
if let Some(stdout) = child.stdout.take() {
@@ -70,7 +55,7 @@ impl Handler {
7055
#[cfg(test)]
7156
mod test {
7257
use std::io::{BufRead, BufReader};
73-
use std::process::Command;
58+
use std::process::{Command, Stdio};
7459
use std::time::Duration;
7560

7661
use super::RequestMessage;
@@ -85,15 +70,19 @@ mod test {
8570

8671
#[test]
8772
pub fn parse() {
88-
let mut child = Command::new("top")
73+
let mut cmd = Command::new("ls");
74+
cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
75+
let mut child = cmd
8976
.spawn().unwrap();
77+
78+
9079
if let Some(stdout) = child.stdout.take() {
9180
let reader = BufReader::new(stdout);
9281
reader.lines().filter_map(|line| line.ok()).for_each(|line| println!("{}", line));
9382
}
9483

9584
println!("pid:{}",child.id());
96-
std::thread::sleep(Duration::from_secs(10));
85+
//std::thread::sleep(Duration::from_secs(10));
9786
child.kill().unwrap()
9887

9988
}

agent/src/lib.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
pub mod config;
2+
pub mod message;
3+
mod connection;
4+
mod handler;
5+
6+
use std::sync::Arc;
7+
use tokio::sync::broadcast;
8+
use crate::connection::Connection;
9+
use crate::handler::Handler;
10+
use crate::message::RequestMessage;
11+
12+
pub async fn run(shutdown_rx:broadcast::Receiver<bool>, config: config::Config) -> anyhow::Result<()>{
13+
14+
let config = Arc::new(config);
15+
//todo: 优化 Message clone 问题。 或是替换队列
16+
let (cmd_tx, cmd_rx) = broadcast::channel::<(String, RequestMessage)>(5);
17+
18+
let (connection, event_loop) = Connection::connect(config.clone()).await?;
19+
let mut handler = Handler::new(cmd_rx, connection, config.get_response_command_topic());
20+
let shutdown_rx_1 = shutdown_rx.resubscribe();
21+
22+
tokio::spawn(async move {
23+
handler.run(shutdown_rx_1).await;
24+
});
25+
Connection::loop_event(event_loop, shutdown_rx, cmd_tx).await;
26+
Ok(())
27+
}

agent/src/main.rs

Lines changed: 5 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,13 @@
1-
mod config;
2-
mod connection;
3-
mod handler;
1+
42

53
use anyhow::Context;
64
use clap::{Parser, ArgGroup};
75
use std::path::PathBuf;
8-
use std::sync::Arc;
96
use tokio::{signal, sync::broadcast};
107
use tracing_subscriber::EnvFilter;
11-
use crate::connection::Connection;
12-
use crate::handler::{Handler, RequestMessage};
138

149
#[derive(Parser, Debug)]
15-
#[clap(name = crate::config::APP_NAME, version = env!("CARGO_PKG_VERSION"))]
10+
#[clap(name = mproxy::config::APP_NAME, version = env!("CARGO_PKG_VERSION"))]
1611
#[clap(group(ArgGroup::new("cmds").required(true).args(&["CONFIG"]),))]
1712
struct Cli {
1813
#[clap(value_parser, name = "CONFIG")]
@@ -43,38 +38,18 @@ async fn main() -> anyhow::Result<()> {
4338
tracing_subscriber::fmt().with_env_filter(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::from("info")),)
4439
.with_ansi(is_atty).init();
4540

46-
let config = config::Config::new(ops.config_path).with_context(|| "Config can not load")?;
41+
let config = mproxy::config::Config::new(ops.config_path).with_context(|| "Config can not load")?;
4742

48-
run(shutdown_rx, config).await?;
43+
mproxy::run(shutdown_rx, config).await?;
4944
Ok(())
5045
}
5146

5247

53-
async fn run(shutdown_rx:broadcast::Receiver<bool>, config: config::Config) -> anyhow::Result<()>{
54-
55-
let config = Arc::new(config);
56-
//todo: 优化 Message clone 问题。 或是替换队列
57-
let (cmd_tx, cmd_rx) = broadcast::channel::<(String, RequestMessage)>(5);
58-
59-
let (connection, event_loop) = Connection::connect(config.clone()).await?;
60-
let mut handler = Handler::new(cmd_rx, connection, config.get_response_command_topic());
61-
let shutdown_rx_1 = shutdown_rx.resubscribe();
62-
63-
tokio::spawn(async move {
64-
handler.run(shutdown_rx_1).await;
65-
});
66-
Connection::loop_event(event_loop, shutdown_rx, cmd_tx).await;
67-
Ok(())
68-
69-
70-
71-
}
72-
7348
#[cfg(test)]
7449
mod test {
7550
use clap::Parser;
7651
use crate::Cli;
77-
use crate::config::APP_NAME;
52+
use mproxy::config::APP_NAME;
7853

7954
#[test]
8055
fn parse_cli() {

agent/src/message.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
use serde::{Deserialize, Serialize};
2+
3+
#[derive(Serialize, Deserialize, Debug, Clone)]
4+
#[serde(tag = "type")]
5+
pub enum RequestMessage {
6+
Cmd {
7+
command: String,
8+
request_id: String,
9+
}
10+
}
11+
12+
#[derive(Serialize, Deserialize, Debug, Clone)]
13+
#[serde(tag = "type")]
14+
pub enum ResponseMessage {
15+
Ok{ request_id:String, seq:u32, data:String, pid:u32},
16+
Err{ request_id:String, message:String}
17+
}

mprocs.yaml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
procs:
2-
mqtt:
3-
shell: cd develop && ./run_mqtt.sh
2+
#mqtt:
3+
# shell: cd develop && ./run_mqtt.sh
44
mproxy:
5-
shell: cd agent && cargo run -- config.toml
5+
env:
6+
RUST_LOG: debug
7+
shell: cd agent && cargo run -- $(pwd)/config.yml
68
web:
79
shell: cd web && npm run dev
810
stop: SIGKILL

0 commit comments

Comments
 (0)