Files
2025-08-12 23:26:31 -05:00

373 lines
13 KiB
Python

#!/usr/bin/env python3
"""
RSS to MQTT Push Notification Script
Reads RSS feeds from config and sends new items as push notifications via MQTT
"""
import json
import hashlib
import time
import logging
import re
from typing import Set, Dict, Any, Optional
from pathlib import Path
from html import unescape
from html.parser import HTMLParser
import toml
import feedparser
import paho.mqtt.client as mqtt
class HTMLTextExtractor(HTMLParser):
    """HTML parser that collects the human-readable text of a fragment.

    - Content of ``<script>``/``<style>`` elements is discarded.
    - Tags not listed in ``INLINE_TAGS`` (``<p>``, ``<br>``, ``<li>``, ...)
      act as word separators so adjacent blocks do not run together.
    - Inline tags (``<a>``, ``<b>``, ...) add no space, so ``<a>x</a>.``
      yields ``x.`` rather than ``x .``.
    """

    # Elements whose content is never shown as text.
    SKIP_TAGS = frozenset({"script", "style"})
    # Elements rendered inline: they do not separate words.
    INLINE_TAGS = frozenset({
        "a", "abbr", "b", "bdi", "bdo", "cite", "code", "data", "dfn", "em",
        "font", "i", "kbd", "mark", "q", "s", "samp", "small", "span",
        "strike", "strong", "sub", "sup", "time", "tt", "u", "var",
    })

    def __init__(self):
        super().__init__()
        self.text_content = []  # raw text fragments plus separator spaces
        self._skip_depth = 0  # > 0 while inside a SKIP_TAGS element

    def handle_starttag(self, tag, attrs):
        if tag in self.SKIP_TAGS:
            self._skip_depth += 1
        elif tag not in self.INLINE_TAGS:
            self.text_content.append(" ")

    def handle_endtag(self, tag):
        if tag in self.SKIP_TAGS:
            if self._skip_depth:
                self._skip_depth -= 1
        elif tag not in self.INLINE_TAGS:
            self.text_content.append(" ")

    def handle_data(self, data):
        if not self._skip_depth:
            self.text_content.append(data)

    def get_text(self):
        """Return the collected text with whitespace runs collapsed to one space."""
        return " ".join("".join(self.text_content).split())


def clean_html_text(html_content: str) -> str:
    """Convert an HTML (or plain-text) fragment to one line of plain text.

    Entities are decoded *before* parsing, so entity-escaped markup such as
    ``&lt;p&gt;`` is stripped like real tags. As a side effect, escaped
    ``&lt;tag&gt;`` text inside otherwise plain text is removed as well.

    Returns "" for empty or None input.
    """
    if not html_content:
        return ""
    text = unescape(html_content)
    parser = HTMLTextExtractor()
    try:
        parser.feed(text)
        # close() flushes text the parser may still be buffering: with
        # convert_charrefs=True, trailing text containing a bare '&'
        # (e.g. "AT&T") is held back waiting for more input and would
        # otherwise be lost.
        parser.close()
        clean_text = parser.get_text()
    except Exception:
        # Fallback: strip anything that looks like a tag.
        clean_text = re.sub(r'<[^<]+?>', '', text)
    # Collapse all whitespace (including newlines) to single spaces.
    return re.sub(r'\s+', ' ', clean_text).strip()
class RSSNotificationPusher:
    """Poll RSS feeds and publish new entries as JSON notifications over MQTT.

    Settings come from a TOML file (``create_sample_config`` shows the
    layout). IDs of already-notified entries are held in memory only, so
    after a restart the first ``max_items_per_feed`` entries of every feed
    are reported again.
    """

    def __init__(self, config_path: str = "config.toml"):
        """Load the config, set up logging and connect to the MQTT broker.

        Args:
            config_path: path of the TOML config file.

        Raises:
            FileNotFoundError: the config file is missing (a sample config
                is written to *config_path* first).
            Exception: the config cannot be parsed, or MQTT setup failed.
        """
        self.config_path = config_path
        self.config = self.load_config()
        # feed URL -> IDs (see generate_item_id) already returned by
        # parse_rss_feed during this process's lifetime.
        self.seen_items: Dict[str, Set[str]] = {}
        self.mqtt_client = None
        self.setup_logging()
        self.setup_mqtt()

    def setup_logging(self):
        """Log to ``rss_notifier.log`` (working directory) and to stderr.

        ``logging.basicConfig`` does nothing when the root logger already
        has handlers, so only the first configuration in the process applies.
        """
        # basicConfig() sets the root level; keep a more verbose level that
        # was chosen before construction (e.g. DEBUG) instead of resetting it.
        current = logging.getLogger().level or logging.INFO
        logging.basicConfig(
            level=min(current, logging.INFO),
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('rss_notifier.log'),
                logging.StreamHandler(),
            ],
        )
        self.logger = logging.getLogger(__name__)

    def load_config(self) -> Dict[str, Any]:
        """Read and return the TOML config at ``self.config_path``.

        Raises:
            FileNotFoundError: the file is missing; a sample config is
                written to the same path before raising.
            Exception: any other read/parse error, chained to its cause.
        """
        try:
            with open(self.config_path, 'r', encoding='utf-8') as f:
                return toml.load(f)
        except FileNotFoundError:
            self.create_sample_config()
            raise FileNotFoundError(
                f"Config file {self.config_path} not found. Sample created."
            ) from None
        except Exception as e:
            raise Exception(f"Error loading config: {e}") from e

    def create_sample_config(self):
        """Write an example config to ``self.config_path``.

        The inline comments below are for readers of this code only;
        ``toml.dump`` writes just the keys and values.
        """
        sample_config = {
            "broker": {
                "address": "proliant.lan",
                "port": 1883,
                "client_id": "notification-pusher-client",
                "topic": "notification-pusher",  # MQTT topic to publish to
            },
            "auth": {
                "username": "mqttuser",
                "password": "XXXXXXXXX",
            },
            "notifications": {
                "topic": "rss-feeds",  # ntfy.sh topic for notifications
                "max_body_length": 150,  # Maximum length for notification body
            },
            "rss": {
                "feeds": [
                    "https://feeds.feedburner.com/TechCrunch",
                    "https://rss.cnn.com/rss/edition.rss",
                    "https://feeds.bbci.co.uk/news/rss.xml",
                ],
                "check_interval": 300,  # seconds
                "max_items_per_feed": 5,  # Only check top N items per feed
            },
        }
        with open(self.config_path, 'w', encoding='utf-8') as f:
            toml.dump(sample_config, f)
        print(f"Sample config created at {self.config_path}")

    def setup_mqtt(self):
        """Create the MQTT client, connect, and start its network thread.

        Reads ``broker.address`` and ``broker.client_id`` (required) and the
        optional ``broker.port`` (default 1883) and ``broker.keepalive``
        (default 60 s). Credentials from ``[auth]`` are applied when that
        table exists. Any exception is logged and re-raised.
        """
        try:
            broker = self.config["broker"]
            self.mqtt_client = mqtt.Client(client_id=broker["client_id"])
            if "auth" in self.config:
                self.mqtt_client.username_pw_set(
                    self.config["auth"]["username"],
                    self.config["auth"]["password"],
                )
            self.mqtt_client.on_connect = self.on_mqtt_connect
            self.mqtt_client.on_disconnect = self.on_mqtt_disconnect
            self.mqtt_client.on_publish = self.on_mqtt_publish
            self.mqtt_client.connect(
                broker["address"],
                int(broker.get("port", 1883)),
                int(broker.get("keepalive", 60)),
            )
            self.mqtt_client.loop_start()
        except Exception as e:
            self.logger.error(f"MQTT setup error: {e}")
            raise

    def on_mqtt_connect(self, client, userdata, flags, rc):
        """paho on_connect callback: log success (rc == 0) or the failure code."""
        if rc == 0:
            self.logger.info("Connected to MQTT broker")
        else:
            self.logger.error(f"Failed to connect to MQTT broker: {rc}")

    def on_mqtt_disconnect(self, client, userdata, rc):
        """paho on_disconnect callback; rc == 0 means a requested disconnect."""
        if rc == 0:
            self.logger.info("Disconnected from MQTT broker")
        else:
            self.logger.warning(f"Disconnected from MQTT broker unexpectedly (rc={rc})")

    def on_mqtt_publish(self, client, userdata, mid):
        """paho on_publish callback: debug-log the message id."""
        self.logger.debug(f"Message published: {mid}")

    def generate_item_id(self, feed_url: str, item: Dict[str, Any]) -> str:
        """Return an MD5 hex ID for *item*, namespaced by *feed_url*.

        Uses the entry's ``id`` (feedparser's guid/id field) if present,
        else its ``link``, else title + published date. Keying on a stable
        identifier means an edited title does not re-trigger a notification.
        """
        identifier = item.get('id') or item.get('link')
        if not identifier:
            identifier = item.get('title', '') + item.get('published', '')
        return hashlib.md5((feed_url + identifier).encode()).hexdigest()

    def parse_rss_feed(self, feed_url: str, feed: Any = None) -> list:
        """Return not-yet-seen entries among the first ``max_items_per_feed``.

        Args:
            feed_url: URL of the feed; also namespaces the seen-item IDs.
            feed: an already-parsed ``feedparser`` result for *feed_url*.
                When omitted, the feed is fetched and parsed here.

        Returns:
            New entries in feed order. They are marked as seen immediately,
            so they are not returned again even if sending them later fails.
            Returns ``[]`` on any error (logged).
        """
        try:
            if feed is None:
                self.logger.info(f"Parsing feed: {feed_url}")
                feed = feedparser.parse(feed_url)
            if feed.bozo:
                self.logger.warning(f"Feed parsing issues for {feed_url}: {feed.bozo_exception}")

            seen = self.seen_items.setdefault(feed_url, set())
            max_items = self.config.get("rss", {}).get("max_items_per_feed", 5)
            recent_entries = feed.entries[:max_items]
            self.logger.debug(f"Processing top {len(recent_entries)} entries from {feed_url}")

            new_items = []
            for entry in recent_entries:
                item_id = self.generate_item_id(feed_url, entry)
                if item_id not in seen:
                    seen.add(item_id)
                    new_items.append(entry)
            self.logger.info(f"Found {len(new_items)} new items in {feed_url} (checked top {max_items})")
            return new_items
        except Exception as e:
            self.logger.error(f"Error parsing feed {feed_url}: {e}")
            return []

    def extract_clean_description(self, entry: Dict[str, Any]) -> str:
        """Return the entry's description as plain text.

        Tries ``summary``, ``description``, the first ``content`` value and
        ``subtitle`` in that order and uses the first non-blank one.
        Returns "No description available" when nothing usable is found.
        """
        content = entry.get('content')
        content_candidates = [
            entry.get('summary', ''),
            entry.get('description', ''),
            content[0].get('value', '') if content else '',
            entry.get('subtitle', ''),
        ]
        raw_content = next((c for c in content_candidates if c and c.strip()), "")
        if not raw_content:
            return "No description available"

        clean_text = clean_html_text(raw_content)
        # If cleaning left nothing, retry with a plain tag-stripping regex.
        if not clean_text:
            clean_text = re.sub(r'<[^>]+>', '', raw_content)
            clean_text = re.sub(r'\s+', ' ', clean_text).strip()
        return clean_text or "No description available"

    def create_notification(self, entry: Dict[str, Any], feed_title: str = "") -> Dict[str, Any]:
        """Build the notification dict: ``title``, ``body``, ``topic``, optional ``link``.

        The title is HTML-cleaned and prefixed with ``[feed_title]`` when
        one is given. The body is the cleaned description, truncated to at
        most ``notifications.max_body_length`` characters (default 150),
        with the ``...`` marker counted in that length.
        """
        raw_title = (entry.get('title') or '').strip()
        # Fall back to the raw text if cleaning empties it, and to a
        # placeholder when the entry has no usable title at all.
        title = clean_html_text(raw_title) or raw_title or 'No Title'
        if feed_title:
            title = f"[{feed_title}] {title}"

        body = self.extract_clean_description(entry)
        notifications_cfg = self.config.get("notifications", {})
        max_length = notifications_cfg.get("max_body_length", 150)
        if len(body) > max_length:
            if max_length > 3:
                body = body[:max_length - 3] + "..."
            else:
                # No room for the ellipsis; just cut to the limit.
                body = body[:max(max_length, 0)]

        # Shape matches the Rust struct on the consuming side.
        notification = {
            "title": title,
            "body": body,
            "topic": notifications_cfg.get("topic", "rss-feeds"),  # ntfy.sh topic
        }
        link = (entry.get('link') or '').strip()
        if link:
            notification["link"] = link
        return notification

    def send_notification(self, notification: Dict[str, Any]):
        """Publish *notification* as JSON to ``broker.topic`` with QoS 1.

        Only the immediate result of ``publish`` is checked; delivery itself
        happens on the client's network thread. Errors are logged, not raised.
        """
        try:
            payload = json.dumps(notification, ensure_ascii=False)
            result = self.mqtt_client.publish(
                self.config["broker"]["topic"],
                payload,
                qos=1,
            )
            if result.rc == mqtt.MQTT_ERR_SUCCESS:
                self.logger.info(f"Notification sent: {notification['title'][:50]}...")
                self.logger.debug(f"Full notification: {notification}")
            else:
                self.logger.error(
                    f"Failed to send notification: {result.rc} ({mqtt.error_string(result.rc)})"
                )
        except Exception as e:
            self.logger.error(f"Error sending notification: {e}")

    def process_feeds(self):
        """Fetch every configured feed once and notify about its new entries.

        Each feed is downloaded a single time per call; the parsed result is
        reused for both the feed title and new-entry detection. A failure in
        one feed is logged and does not stop the others.
        """
        feeds = self.config.get("rss", {}).get("feeds", [])
        if not feeds:
            self.logger.warning("No RSS feeds configured")
            return

        for feed_url in feeds:
            try:
                self.logger.info(f"Parsing feed: {feed_url}")
                feed_data = feedparser.parse(feed_url)
                feed_title = clean_html_text(feed_data.feed.get('title', '')) or 'RSS Feed'
                for item in self.parse_rss_feed(feed_url, feed=feed_data):
                    self.send_notification(self.create_notification(item, feed_title))
                    # Small pause between notifications to avoid overwhelming
                    # downstream consumers.
                    time.sleep(0.5)
            except Exception as e:
                self.logger.error(f"Error processing feed {feed_url}: {e}")

    def run_once(self):
        """Run a single check of all feeds."""
        self.logger.info("Starting RSS notification check")
        self.process_feeds()
        self.logger.info("RSS notification check completed")

    def run_continuously(self):
        """Check all feeds every ``rss.check_interval`` seconds (default 300).

        Stops on Ctrl-C. An unexpected exception is logged with its
        traceback and re-raised. Cleanup runs in every case.
        """
        check_interval = self.config.get("rss", {}).get("check_interval", 300)
        self.logger.info(f"Starting continuous RSS monitoring (interval: {check_interval}s)")
        try:
            while True:
                self.run_once()
                time.sleep(check_interval)
        except KeyboardInterrupt:
            self.logger.info("Shutting down...")
        except Exception as e:
            # Propagate so the caller (and the exit status) sees the failure.
            self.logger.exception(f"Unexpected error: {e}")
            raise
        finally:
            self.cleanup()

    def cleanup(self):
        """Disconnect from the broker and stop the MQTT network thread."""
        if self.mqtt_client:
            # disconnect() first so the still-running network thread can send
            # the DISCONNECT packet; loop_stop() then joins that thread.
            self.mqtt_client.disconnect()
            self.mqtt_client.loop_stop()
        self.logger.info("Cleanup completed")
def main():
    """Command-line entry point; returns the process exit status.

    Options: ``--config PATH`` (default ``config.toml``), ``--once`` to run a
    single check and exit, ``--debug`` for DEBUG-level logging.
    Returns 0 on success, 1 if any exception escaped (message printed).
    """
    import argparse
    parser = argparse.ArgumentParser(description='RSS to MQTT Push Notification Script')
    parser.add_argument('--config', default='config.toml', help='Config file path')
    parser.add_argument('--once', action='store_true', help='Run once and exit')
    parser.add_argument('--debug', action='store_true', help='Enable debug logging')
    args = parser.parse_args()

    try:
        pusher = RSSNotificationPusher(args.config)
        # Set after construction so the pusher's logging.basicConfig() call
        # cannot reset the root level afterwards.
        if args.debug:
            logging.getLogger().setLevel(logging.DEBUG)
        if args.once:
            try:
                pusher.run_once()
            finally:
                # run_continuously() cleans up on its own; the single-run
                # path must close the MQTT connection explicitly.
                pusher.cleanup()
        else:
            pusher.run_continuously()
    except Exception as e:
        print(f"Error: {e}")
        return 1
    return 0
if __name__ == "__main__":
    # SystemExit gives sys.exit() semantics without relying on the
    # site-injected exit() builtin, which is intended for interactive use.
    raise SystemExit(main())