#!/usr/bin/env python3
"""
RSS to MQTT Push Notification Script
Reads RSS feeds from config and sends new items as push notifications via MQTT
"""

import argparse
import hashlib
import json
import logging
import re
import sys
import time
from html import unescape
from html.parser import HTMLParser
from pathlib import Path
from typing import Any, Dict, Optional, Set

import feedparser
import paho.mqtt.client as mqtt
import toml

# Compiled once at import time; both are applied to every entry processed.
_TAG_FALLBACK_RE = re.compile(r'<[^<]+?>')
_TAG_RE = re.compile(r'<[^>]+>')
_WHITESPACE_RE = re.compile(r'\s+')


class HTMLTextExtractor(HTMLParser):
    """Simple HTML parser that collects text content and discards markup."""

    def __init__(self):
        super().__init__()
        self.text_content = []

    def handle_data(self, data):
        """Record one run of character data, stripped of surrounding whitespace."""
        self.text_content.append(data.strip())

    def get_text(self):
        """Return all recorded text runs joined by single spaces."""
        return ' '.join(self.text_content).strip()


def clean_html_text(html_content: str) -> str:
    """Clean HTML content and extract plain text.

    Args:
        html_content: Raw (possibly entity-escaped) HTML fragment.

    Returns:
        The text with tags removed and whitespace runs collapsed to single
        spaces, or "" when the input is empty.
    """
    if not html_content:
        return ""

    # First unescape HTML entities so escaped markup is treated as markup.
    text = unescape(html_content)

    parser = HTMLTextExtractor()
    try:
        parser.feed(text)
        # close() flushes text the parser is still buffering. Without it,
        # trailing text containing an unterminated "&" (e.g. "R&D") is held
        # back waiting for more input and never reaches handle_data().
        parser.close()
        clean_text = parser.get_text()
    except Exception:
        # Fallback: use regex to strip HTML tags
        clean_text = _TAG_FALLBACK_RE.sub('', text)

    # Clean up whitespace
    return _WHITESPACE_RE.sub(' ', clean_text).strip()


class RSSNotificationPusher:
    """Poll configured RSS feeds and publish unseen entries as JSON over MQTT."""

    def __init__(self, config_path: str = "config.toml",
                 log_level: int = logging.INFO):
        """Load the config, configure logging and connect to the MQTT broker.

        Args:
            config_path: Path of the TOML configuration file.
            log_level: Level passed to the logging setup (default INFO).

        Raises:
            FileNotFoundError: The config file is missing (a sample is written).
            Exception: Config parsing or MQTT setup failed.
        """
        self.config_path = config_path
        self.config = self.load_config()
        # feed URL -> IDs of entries already notified. Held in memory only, so
        # it starts empty on every run.
        self.seen_items: Dict[str, Set[str]] = {}
        self.mqtt_client: Optional[mqtt.Client] = None
        self.setup_logging(log_level)
        self.setup_mqtt()

    def setup_logging(self, level: int = logging.INFO):
        """Setup logging to both 'rss_notifier.log' and the console.

        Args:
            level: Root logger level to request from logging.basicConfig.
        """
        logging.basicConfig(
            level=level,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('rss_notifier.log'),
                logging.StreamHandler(),
            ],
        )
        self.logger = logging.getLogger(__name__)

    def load_config(self) -> Dict[str, Any]:
        """Load configuration from the TOML file at ``self.config_path``.

        Returns:
            The parsed configuration.

        Raises:
            FileNotFoundError: The file does not exist; a sample config is
                written to the same path before raising.
            RuntimeError: Any other failure while reading or parsing.
        """
        try:
            with open(self.config_path, 'r', encoding='utf-8') as f:
                return toml.load(f)
        except FileNotFoundError:
            self.create_sample_config()
            raise FileNotFoundError(
                f"Config file {self.config_path} not found. Sample created."
            ) from None
        except Exception as e:
            raise RuntimeError(f"Error loading config: {e}") from e

    def create_sample_config(self):
        """Create a sample config file at ``self.config_path``."""
        sample_config = {
            "broker": {
                "address": "proliant.lan",
                "client_id": "notification-pusher-client",
                "topic": "notification-pusher",  # MQTT topic
            },
            "auth": {
                "username": "mqttuser",
                "password": "XXXXXXXXX",
            },
            "notifications": {
                "topic": "rss-feeds",  # ntfy.sh topic for notifications
                "max_body_length": 150,  # Maximum length for notification body
            },
            "rss": {
                "feeds": [
                    "https://feeds.feedburner.com/TechCrunch",
                    "https://rss.cnn.com/rss/edition.rss",
                    "https://feeds.bbci.co.uk/news/rss.xml",
                ],
                "check_interval": 300,  # seconds
                "max_items_per_feed": 5,  # Only check top N items per feed
            },
        }

        with open(self.config_path, 'w', encoding='utf-8') as f:
            toml.dump(sample_config, f)

        print(f"Sample config created at {self.config_path}")

    def setup_mqtt(self):
        """Create the MQTT client, connect to the broker and start its loop.

        Reads ``broker.address`` and ``broker.client_id`` from the config, an
        optional ``broker.port`` (default 1883) and optional ``auth``
        credentials.

        Raises:
            Exception: Whatever the client raised; it is logged and re-raised.
        """
        try:
            broker = self.config["broker"]
            client_id = broker["client_id"]

            # paho-mqtt 2.x takes a callback API version as first argument;
            # 1.x has no such enum. The callbacks below use the v1 signatures.
            if hasattr(mqtt, "CallbackAPIVersion"):
                self.mqtt_client = mqtt.Client(
                    mqtt.CallbackAPIVersion.VERSION1, client_id=client_id
                )
            else:
                self.mqtt_client = mqtt.Client(client_id=client_id)

            # Set credentials if provided
            if "auth" in self.config:
                self.mqtt_client.username_pw_set(
                    self.config["auth"]["username"],
                    self.config["auth"]["password"],
                )

            # Setup callbacks
            self.mqtt_client.on_connect = self.on_mqtt_connect
            self.mqtt_client.on_disconnect = self.on_mqtt_disconnect
            self.mqtt_client.on_publish = self.on_mqtt_publish

            # Connect to broker (keepalive 60 s) and run the network loop in a
            # background thread.
            port = int(broker.get("port", 1883))
            self.mqtt_client.connect(broker["address"], port, 60)
            self.mqtt_client.loop_start()

        except Exception as e:
            self.logger.error("MQTT setup error: %s", e)
            raise

    def on_mqtt_connect(self, client, userdata, flags, rc):
        """MQTT connection callback; ``rc == 0`` means the broker accepted us."""
        if rc == 0:
            self.logger.info("Connected to MQTT broker")
        else:
            self.logger.error("Failed to connect to MQTT broker: %s", rc)

    def on_mqtt_disconnect(self, client, userdata, rc):
        """MQTT disconnection callback; a non-zero ``rc`` is unexpected."""
        if rc == 0:
            self.logger.info("Disconnected from MQTT broker")
        else:
            self.logger.warning("Disconnected from MQTT broker (rc=%s)", rc)

    def on_mqtt_publish(self, client, userdata, mid):
        """MQTT publish callback"""
        self.logger.debug("Message published: %s", mid)

    def generate_item_id(self, feed_url: str, item: Dict[str, Any]) -> str:
        """Generate a unique ID for an RSS item.

        The ID is the MD5 hex digest of the feed URL followed by the item's
        link, title and published date (missing fields count as ""). MD5 is
        used only as a fingerprint here, not for security.
        """
        identifier = (
            item.get('link', '') + item.get('title', '') + item.get('published', '')
        )
        return hashlib.md5((feed_url + identifier).encode()).hexdigest()

    def parse_rss_feed(self, feed_url: str, feed: Any = None) -> list:
        """Return the not-yet-seen entries among a feed's most recent items.

        Args:
            feed_url: URL of the feed; also the key into ``self.seen_items``.
            feed: Optional result of ``feedparser.parse(feed_url)``. When
                omitted the feed is fetched and parsed here.

        Returns:
            New entries in feed order, or [] if anything goes wrong. Returned
            entries are recorded as seen. ``seen_items`` starts empty, so the
            first call for a feed reports every entry it checks.
        """
        try:
            if feed is None:
                self.logger.info("Parsing feed: %s", feed_url)
                feed = feedparser.parse(feed_url)

            if feed.bozo:
                self.logger.warning(
                    "Feed parsing issues for %s: %s", feed_url, feed.bozo_exception
                )

            # Initialize seen items set for this feed if not exists
            seen = self.seen_items.setdefault(feed_url, set())

            # Get max items to check from config (default 5)
            max_items = self.config.get("rss", {}).get("max_items_per_feed", 5)
            recent_entries = feed.entries[:max_items]

            self.logger.debug(
                "Processing top %s entries from %s", len(recent_entries), feed_url
            )

            new_items = []
            for entry in recent_entries:
                item_id = self.generate_item_id(feed_url, entry)
                if item_id not in seen:
                    seen.add(item_id)
                    new_items.append(entry)

            self.logger.info(
                "Found %s new items in %s (checked top %s)",
                len(new_items), feed_url, max_items,
            )
            return new_items

        except Exception as e:
            self.logger.error("Error parsing feed %s: %s", feed_url, e)
            return []

    def extract_clean_description(self, entry: Dict[str, Any]) -> str:
        """Extract and clean description/summary from an RSS entry.

        Tries ``summary``, ``description``, the first ``content`` value and
        ``subtitle`` in that order and uses the first non-blank one.

        Returns:
            Plain text, or "No description available" when nothing usable
            is found.
        """
        content_candidates = [
            entry.get('summary', ''),
            entry.get('description', ''),
            entry.get('content', [{}])[0].get('value', '') if entry.get('content') else '',
            entry.get('subtitle', ''),
        ]

        # Find the first non-empty content
        raw_content = next(
            (c for c in content_candidates if c and c.strip()), ""
        )
        if not raw_content:
            return "No description available"

        # Clean HTML and extract text
        clean_text = clean_html_text(raw_content)

        # If still empty after cleaning, try original with basic cleanup
        if not clean_text:
            clean_text = _TAG_RE.sub('', raw_content)
            clean_text = _WHITESPACE_RE.sub(' ', clean_text).strip()

        return clean_text or "No description available"

    def create_notification(self, entry: Dict[str, Any], feed_title: str = "") -> Dict[str, Any]:
        """Create the notification payload for one RSS entry.

        Args:
            entry: Parsed feed entry.
            feed_title: Optional feed name, prefixed to the title in brackets.

        Returns:
            Dict with ``title``, ``body`` and ``topic``, plus ``link`` when
            the entry has a non-blank one.
        """
        # Fall back to 'No Title' for a missing *or* blank title.
        raw_title = (entry.get('title') or '').strip()
        title = clean_html_text(raw_title) or raw_title or 'No Title'
        if feed_title:
            title = f"[{feed_title}] {title}"

        body = self.extract_clean_description(entry)

        # Truncate body if too long
        notifications_cfg = self.config.get("notifications", {})
        max_length = notifications_cfg.get("max_body_length", 150)
        if len(body) > max_length:
            if max_length >= 3:
                body = body[:max_length - 3] + "..."
            else:
                # No room for an ellipsis; a negative slice bound would keep
                # the wrong part of the text, so hard-cut instead.
                body = body[:max(max_length, 0)]

        # Create notification object matching the Rust struct format
        notification = {
            "title": title,
            "body": body,
            # This is the ntfy.sh topic
            "topic": notifications_cfg.get("topic", "rss-feeds"),
        }

        # Add link if available
        link = (entry.get('link') or '').strip()
        if link:
            notification["link"] = link

        return notification

    def send_notification(self, notification: Dict[str, Any]):
        """Publish a notification as JSON to the configured MQTT topic (QoS 1).

        Failures are logged, never raised.
        """
        try:
            payload = json.dumps(notification, ensure_ascii=False)
            result = self.mqtt_client.publish(
                self.config["broker"]["topic"],
                payload,
                qos=1,
            )

            if result.rc == mqtt.MQTT_ERR_SUCCESS:
                self.logger.info("Notification sent: %s...", notification['title'][:50])
                self.logger.debug("Full notification: %s", notification)
            else:
                self.logger.error("Failed to send notification: %s", result.rc)

        except Exception as e:
            self.logger.error("Error sending notification: %s", e)

    def process_feeds(self):
        """Process all RSS feeds and send notifications for new items."""
        feeds = self.config.get("rss", {}).get("feeds", [])

        if not feeds:
            self.logger.warning("No RSS feeds configured")
            return

        for feed_url in feeds:
            try:
                # Fetch and parse once; the same result supplies both the feed
                # title and the entries.
                self.logger.info("Parsing feed: %s", feed_url)
                feed_data = feedparser.parse(feed_url)
                feed_title = clean_html_text(feed_data.feed.get('title', '')) or 'RSS Feed'

                new_items = self.parse_rss_feed(feed_url, feed_data)

                for item in new_items:
                    notification = self.create_notification(item, feed_title)
                    self.send_notification(notification)

                    # Small delay to avoid overwhelming
                    time.sleep(0.5)

            except Exception as e:
                self.logger.error("Error processing feed %s: %s", feed_url, e)

    def run_once(self):
        """Run the notification process once."""
        self.logger.info("Starting RSS notification check")
        self.process_feeds()
        self.logger.info("RSS notification check completed")

    def run_continuously(self):
        """Run the notification process every ``rss.check_interval`` seconds.

        Runs until interrupted (Ctrl+C) or an unexpected error occurs, then
        releases the MQTT connection.
        """
        check_interval = self.config.get("rss", {}).get("check_interval", 300)

        self.logger.info(
            "Starting continuous RSS monitoring (interval: %ss)", check_interval
        )

        try:
            while True:
                self.run_once()
                time.sleep(check_interval)
        except KeyboardInterrupt:
            self.logger.info("Shutting down...")
        except Exception as e:
            self.logger.error("Unexpected error: %s", e)
        finally:
            self.cleanup()

    def cleanup(self):
        """Stop the MQTT network loop and disconnect from the broker."""
        if self.mqtt_client:
            self.mqtt_client.loop_stop()
            self.mqtt_client.disconnect()
        self.logger.info("Cleanup completed")


def main():
    """Main entry point. Returns the process exit code (0 ok, 1 on error)."""
    parser = argparse.ArgumentParser(description='RSS to MQTT Push Notification Script')
    parser.add_argument('--config', default='config.toml', help='Config file path')
    parser.add_argument('--once', action='store_true', help='Run once and exit')
    parser.add_argument('--debug', action='store_true', help='Enable debug logging')

    args = parser.parse_args()

    # The level is handed to the pusher because its logging setup would
    # override a level set on the root logger beforehand.
    log_level = logging.DEBUG if args.debug else logging.INFO

    try:
        pusher = RSSNotificationPusher(args.config, log_level=log_level)

        if args.once:
            try:
                pusher.run_once()
            finally:
                # run_continuously() cleans up itself; the one-shot path must
                # release the MQTT connection here.
                pusher.cleanup()
        else:
            pusher.run_continuously()

    except Exception as e:
        print(f"Error: {e}")
        return 1

    return 0


if __name__ == "__main__":
    sys.exit(main())