done, for now

This commit is contained in:
2025-08-12 17:39:37 -05:00
commit 95df8adc12
4 changed files with 2587 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
/target
config.toml

2382
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

15
Cargo.toml Normal file
View File

@@ -0,0 +1,15 @@
[package]
name = "notification-pusher-rust"
version = "0.1.0"
edition = "2021"
[dependencies]
rumqttc = "0.19" # MQTT client
tokio = { version = "1", features = ["full"] } # async runtime
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
toml = "0.7"
reqwest = { version = "0.12.23", features = ["json", "rustls-tls"] }
log = "0.4"
tokio-stream = "0.1"
simple_logger = "5.0.0"

188
src/main.rs Normal file
View File

@@ -0,0 +1,188 @@
use rumqttc::{AsyncClient, Event, MqttOptions, QoS};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::time::{Duration, Instant};
use std::{fs, process, thread};
use tokio::time::timeout;
use tokio_stream::StreamExt;
#[derive(Deserialize)]
struct ConfigBroker {
address: String,
client_id: String,
topic: String,
}
#[derive(Deserialize)]
struct ConfigAuth {
username: Option<String>,
password: Option<String>,
}
#[derive(Deserialize)]
struct Config {
broker: ConfigBroker,
auth: ConfigAuth,
}
#[derive(Serialize, Deserialize, Debug)]
struct Notification {
title: String,
body: String,
topic: String,
}
#[tokio::main]
async fn main() {
simple_logger::init().unwrap();
// Load config.toml
let config_content = fs::read_to_string("config.toml")
.expect("Failed to read config.toml");
let config: Config = toml::from_str(&config_content)
.expect("Failed to parse config.toml");
if config.broker.address.is_empty()
|| config.broker.client_id.is_empty()
|| config.broker.topic.is_empty()
{
log::error!("Missing broker configuration in config.toml");
process::exit(1);
}
let mut mqttoptions = MqttOptions::new(
format!("{}-0", config.broker.client_id),
&config.broker.address,
1883,
);
mqttoptions.set_clean_session(true);
if let Some(username) = &config.auth.username {
mqttoptions.set_credentials(username, config.auth.password.as_deref().unwrap_or(""));
}
let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
log::info!(
"Connecting to broker at {} with client ID '{}'",
&config.broker.address,
&config.broker.client_id
);
// Connect is implicit when eventloop.poll() is called for the first time
// Subscribe to topic
client
.subscribe(&config.broker.topic, QoS::AtLeastOnce)
.await
.expect("Failed to subscribe");
log::info!("Subscribed to topic '{}'", &config.broker.topic);
// Publish test message
let test_message = format!("Hello from {}", config.broker.client_id);
client
.publish(
&config.broker.topic,
QoS::AtLeastOnce,
false,
test_message.as_bytes(),
)
.await
.expect("Failed to publish message");
log::info!("Published message: {}", test_message);
// Wait for one message
log::info!("Waiting for incoming message...");
let mut received_message = None;
// We run event loop with timeout to consume one message
let timeout_duration = Duration::from_secs(5);
let start = Instant::now();
while start.elapsed() < timeout_duration {
match timeout(Duration::from_millis(100), eventloop.poll()).await {
Ok(Ok(event)) => {
if let Event::Incoming(rumqttc::Packet::Publish(publish)) = event {
let payload = String::from_utf8_lossy(&publish.payload);
log::info!(
"Received message on topic '{}': {}",
publish.topic,
payload
);
received_message = Some(payload.to_string());
break;
}
}
Ok(Err(e)) => {
log::warn!("MQTT eventloop error: {}", e);
}
Err(_) => {
// timeout on poll, loop again
}
}
}
if received_message.is_none() {
log::warn!("No message received.");
}
// Start notification daemon (listen indefinitely)
log::info!("Starting notification daemon");
start_notification_daemon(client, eventloop, config.broker.topic).await;
}
async fn start_notification_daemon(
client: AsyncClient,
mut eventloop: rumqttc::EventLoop,
topic: String,
) {
client
.subscribe(&topic, QoS::AtLeastOnce)
.await
.expect("Failed to subscribe in daemon");
loop {
match eventloop.poll().await {
Ok(rumqttc::Event::Incoming(rumqttc::Packet::Publish(publish))) => {
let payload = String::from_utf8_lossy(&publish.payload);
log::debug!("Received MQTT message: {} on topic {}", payload, publish.topic);
match serde_json::from_str::<Notification>(&payload) {
Ok(notif) => {
// Send to ntfy.sh
let url = format!("https://ntfy.sh/{}", notif.topic);
let client = reqwest::Client::new();
match client
.post(&url)
.header("Title", &notif.title)
.body(notif.body)
.send()
.await
{
Ok(resp) => {
log::debug!("ntfy.sh status code: {}", resp.status());
}
Err(e) => {
log::warn!("Failed to send notification: {}", e);
}
}
}
Err(e) => {
log::warn!("Failed to parse JSON message: {}", e);
}
}
}
Ok(_) => {}
Err(e) => {
log::warn!("MQTT eventloop error in daemon: {}", e);
// Optionally reconnect here if needed
}
}
}
}