373 lines
13 KiB
Python
373 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
RSS to MQTT Push Notification Script

Reads the RSS feeds listed in the config file and publishes each new item as a
push notification via MQTT.
|
|
"""
|
|
|
|
import json
|
|
import hashlib
|
|
import time
|
|
import logging
|
|
import re
|
|
from typing import Set, Dict, Any, Optional
|
|
from pathlib import Path
|
|
from html import unescape
|
|
from html.parser import HTMLParser
|
|
|
|
import toml
|
|
import feedparser
|
|
import paho.mqtt.client as mqtt
|
|
|
|
|
|
class HTMLTextExtractor(HTMLParser):
    """Collect the human-readable text of an HTML fragment.

    Every text node is stripped and stored in ``text_content``. Character
    data inside ``<script>`` and ``<style>`` elements is ignored, because it
    is code rather than text a reader should see.
    """

    # Elements whose character data must never reach the extracted text.
    _IGNORED_TAGS = frozenset({"script", "style"})

    def __init__(self):
        super().__init__()
        self.text_content = []
        # Number of ignored elements currently open; > 0 means "drop data".
        self._ignored_depth = 0

    def handle_starttag(self, tag, attrs):
        """Start dropping data when an ignored element opens."""
        if tag in self._IGNORED_TAGS:
            self._ignored_depth += 1

    def handle_endtag(self, tag):
        """Resume collecting data when an ignored element closes."""
        # Guard against a stray closing tag driving the counter negative.
        if tag in self._IGNORED_TAGS and self._ignored_depth:
            self._ignored_depth -= 1

    def handle_data(self, data):
        """Store one stripped text node, skipping ignored and empty ones."""
        if self._ignored_depth:
            return
        chunk = data.strip()
        # Whitespace-only nodes would only add doubled spaces when joined.
        if chunk:
            self.text_content.append(chunk)

    def get_text(self):
        """Return all collected text nodes joined by single spaces."""
        return ' '.join(self.text_content).strip()
|
|
|
|
|
|
def clean_html_text(html_content: str) -> str:
    """Convert an HTML fragment to plain, whitespace-normalised text.

    Args:
        html_content: Raw HTML (or plain text); may be empty or ``None``.

    Returns:
        The text with entities unescaped, tags removed and every run of
        whitespace collapsed to one space. Empty input yields ``""``.
    """
    if not html_content:
        return ""

    # Unescape first so that markup which arrives entity-escaped
    # (e.g. "&lt;p&gt;") is parsed as markup and stripped as well.
    text = unescape(html_content)

    parser = HTMLTextExtractor()
    try:
        parser.feed(text)
        # feed() may keep trailing input buffered (for instance text ending
        # in something that looks like an unfinished character reference,
        # such as "AT&T"); close() forces that remainder to be processed so
        # it is not silently lost.
        parser.close()
        clean_text = parser.get_text()
    except Exception:
        # Best-effort fallback: strip anything that looks like a tag.
        clean_text = re.sub('<[^<]+?>', '', text)

    # Collapse newlines, tabs and repeated spaces into single spaces.
    return re.sub(r'\s+', ' ', clean_text).strip()
|
|
|
|
|
|
class RSSNotificationPusher:
    """Poll RSS feeds and publish each new entry as a JSON message over MQTT.

    Configuration is read from a TOML file with the sections ``broker``,
    ``auth`` (optional), ``notifications`` and ``rss``. Entries that were
    already announced are remembered in memory only (``seen_items``), keyed
    by feed URL, so the history is lost when the process exits.
    """

    def __init__(self, config_path: str = "config.toml"):
        """Load the configuration, configure logging and connect to MQTT.

        Args:
            config_path: Path of the TOML configuration file.

        Raises:
            FileNotFoundError: The config file is missing (a sample is
                written to ``config_path`` first).
            Exception: The config cannot be parsed or MQTT setup fails.
        """
        self.config_path = config_path
        self.config = self.load_config()
        self.seen_items: Dict[str, Set[str]] = {}
        self.mqtt_client = None
        self.setup_logging()
        self.setup_mqtt()

    def setup_logging(self):
        """Configure root logging to ``rss_notifier.log`` and the console."""
        root_level = logging.getLogger().level
        # basicConfig() would reset the root level to INFO. Keep a more
        # verbose level (e.g. DEBUG from a --debug flag) that was selected
        # before this object was constructed.
        if logging.NOTSET < root_level < logging.INFO:
            level = root_level
        else:
            level = logging.INFO

        logging.basicConfig(
            level=level,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('rss_notifier.log'),
                logging.StreamHandler(),
            ],
        )
        self.logger = logging.getLogger(__name__)

    def load_config(self) -> Dict[str, Any]:
        """Read and return the TOML configuration.

        Raises:
            FileNotFoundError: The file does not exist; a sample config is
                created at ``config_path`` before raising.
            RuntimeError: Any other failure while reading or parsing; the
                original error is chained as ``__cause__``.
        """
        try:
            with open(self.config_path, 'r', encoding='utf-8') as f:
                return toml.load(f)
        except FileNotFoundError:
            self.create_sample_config()
            raise FileNotFoundError(f"Config file {self.config_path} not found. Sample created.")
        except Exception as e:
            raise RuntimeError(f"Error loading config: {e}") from e

    def create_sample_config(self):
        """Write a sample configuration file to ``config_path``."""
        sample_config = {
            "broker": {
                "address": "proliant.lan",
                "client_id": "notification-pusher-client",
                "topic": "notification-pusher",  # MQTT topic
            },
            "auth": {
                "username": "mqttuser",
                "password": "XXXXXXXXX",
            },
            "notifications": {
                "topic": "rss-feeds",  # ntfy.sh topic for notifications
                "max_body_length": 150,  # Maximum length for notification body
            },
            "rss": {
                "feeds": [
                    "https://feeds.feedburner.com/TechCrunch",
                    "https://rss.cnn.com/rss/edition.rss",
                    "https://feeds.bbci.co.uk/news/rss.xml",
                ],
                "check_interval": 300,  # seconds
                "max_items_per_feed": 5,  # Only check top N items per feed
            },
        }

        with open(self.config_path, 'w', encoding='utf-8') as f:
            toml.dump(sample_config, f)
        # Logging is not configured yet when this runs from load_config().
        print(f"Sample config created at {self.config_path}")

    def setup_mqtt(self):
        """Create the MQTT client, connect to the broker and start its loop.

        The broker port comes from the optional ``broker.port`` config key
        and defaults to 1883.

        Raises:
            Exception: Re-raised after logging if any setup step fails.
        """
        try:
            broker = self.config["broker"]
            client_id = broker["client_id"]

            # paho-mqtt 2.x takes the callback API version as its first
            # argument; the callbacks below use the version-1 signatures.
            # Older releases do not define CallbackAPIVersion at all.
            api_versions = getattr(mqtt, "CallbackAPIVersion", None)
            if api_versions is not None:
                self.mqtt_client = mqtt.Client(api_versions.VERSION1, client_id=client_id)
            else:
                self.mqtt_client = mqtt.Client(client_id=client_id)

            # Set credentials if provided
            if "auth" in self.config:
                auth = self.config["auth"]
                self.mqtt_client.username_pw_set(auth["username"], auth.get("password"))

            self.mqtt_client.on_connect = self.on_mqtt_connect
            self.mqtt_client.on_disconnect = self.on_mqtt_disconnect
            self.mqtt_client.on_publish = self.on_mqtt_publish

            self.mqtt_client.connect(broker["address"], broker.get("port", 1883), 60)
            self.mqtt_client.loop_start()

        except Exception as e:
            self.logger.error("MQTT setup error: %s", e)
            raise

    def on_mqtt_connect(self, client, userdata, flags, rc):
        """MQTT connection callback: log success or the failure code."""
        if rc == 0:
            self.logger.info("Connected to MQTT broker")
        else:
            self.logger.error("Failed to connect to MQTT broker: %s", rc)

    def on_mqtt_disconnect(self, client, userdata, rc):
        """MQTT disconnection callback."""
        self.logger.warning("Disconnected from MQTT broker")

    def on_mqtt_publish(self, client, userdata, mid):
        """MQTT publish callback."""
        self.logger.debug("Message published: %s", mid)

    def generate_item_id(self, feed_url: str, item: Dict[str, Any]) -> str:
        """Return a stable hex digest identifying ``item`` within a feed.

        The digest is the MD5 of the feed URL followed by the entry's link,
        title and published date. Missing or ``None`` fields count as empty
        strings. MD5 serves only as a fingerprint here, not for security.
        """
        fields = (item.get(key) or '' for key in ('link', 'title', 'published'))
        identifier = ''.join(str(field) for field in fields)
        return hashlib.md5((feed_url + identifier).encode()).hexdigest()

    def parse_rss_feed(self, feed_url: str, feed: Optional[Any] = None) -> list:
        """Return the entries of a feed that have not been seen before.

        Only the first ``rss.max_items_per_feed`` entries (default 5) are
        examined. Every entry returned is recorded in ``seen_items``, so a
        later call will not return it again.

        Args:
            feed_url: URL of the feed; also the key into ``seen_items``.
            feed: Optional already-parsed feed object exposing ``bozo`` and
                ``entries``. When omitted the feed is fetched and parsed
                here, so a caller holding a parsed feed avoids a second
                download.

        Returns:
            The list of new entries, or ``[]`` if anything goes wrong.
        """
        try:
            self.logger.info("Parsing feed: %s", feed_url)
            if feed is None:
                feed = feedparser.parse(feed_url)

            if feed.bozo:
                self.logger.warning("Feed parsing issues for %s: %s", feed_url, feed.bozo_exception)

            seen = self.seen_items.setdefault(feed_url, set())

            # Get max items to check from config (default 5)
            max_items = self.config.get("rss", {}).get("max_items_per_feed", 5)
            recent_entries = feed.entries[:max_items]
            self.logger.debug("Processing top %s entries from %s", len(recent_entries), feed_url)

            new_items = []
            for entry in recent_entries:
                item_id = self.generate_item_id(feed_url, entry)
                if item_id not in seen:
                    seen.add(item_id)
                    new_items.append(entry)

            self.logger.info(
                "Found %s new items in %s (checked top %s)", len(new_items), feed_url, max_items
            )
            return new_items

        except Exception as e:
            self.logger.error("Error parsing feed %s: %s", feed_url, e)
            return []

    def extract_clean_description(self, entry: Dict[str, Any]) -> str:
        """Return the entry's description as plain text.

        The first non-blank value among ``summary``, ``description``, the
        first ``content`` element's ``value`` and ``subtitle`` is used.
        Returns ``"No description available"`` when none yields text.
        """
        content = entry.get('content')
        content_candidates = [
            entry.get('summary', ''),
            entry.get('description', ''),
            content[0].get('value', '') if content else '',
            entry.get('subtitle', ''),
        ]

        # Find the first non-empty content
        raw_content = next(
            (candidate for candidate in content_candidates if candidate and candidate.strip()),
            "",
        )
        if not raw_content:
            return "No description available"

        clean_text = clean_html_text(raw_content)

        # If still empty after cleaning, try original with basic cleanup
        if not clean_text:
            clean_text = re.sub(r'<[^>]+>', '', raw_content)
            clean_text = re.sub(r'\s+', ' ', clean_text).strip()

        return clean_text or "No description available"

    def create_notification(self, entry: Dict[str, Any], feed_title: str = "") -> Dict[str, Any]:
        """Build the notification payload for one RSS entry.

        Args:
            entry: Feed entry (mapping with ``title``, ``link``, ...).
            feed_title: Optional feed name, prefixed to the title in
                square brackets.

        Returns:
            A dict with ``title``, ``body`` and ``topic`` keys, plus
            ``link`` when the entry has a non-blank one. ``body`` is cut to
            ``notifications.max_body_length`` characters (default 150).
        """
        # A missing, None or empty title falls back to the placeholder.
        title = (entry.get('title') or 'No Title').strip()
        title = clean_html_text(title) or title  # Clean any HTML in title too

        if feed_title:
            title = f"[{feed_title}] {title}"

        body = self.extract_clean_description(entry)

        notification_settings = self.config.get("notifications", {})
        max_length = notification_settings.get("max_body_length", 150)
        if len(body) > max_length:
            if max_length > 3:
                body = body[:max_length - 3] + "..."
            else:
                # No room for an ellipsis; a negative slice bound would
                # otherwise cut from the wrong end of the string.
                body = body[:max(max_length, 0)]

        # Notification object in the shape the consumer expects (per the
        # original note, a Rust struct); "topic" is the ntfy.sh topic.
        notification = {
            "title": title,
            "body": body,
            "topic": notification_settings.get("topic", "rss-feeds"),
        }

        link = (entry.get('link') or '').strip()
        if link:
            notification["link"] = link

        return notification

    def send_notification(self, notification: Dict[str, Any]):
        """Publish ``notification`` as JSON on the broker topic with QoS 1.

        Failures are logged and not raised. A short sleep follows each
        successful publish call to avoid flooding the broker.
        """
        try:
            payload = json.dumps(notification, ensure_ascii=False)
            result = self.mqtt_client.publish(
                self.config["broker"]["topic"],
                payload,
                qos=1,
            )

            if result.rc == mqtt.MQTT_ERR_SUCCESS:
                self.logger.info("Notification sent: %s...", notification['title'][:50])
                self.logger.debug("Full notification: %s", notification)
            else:
                self.logger.error("Failed to send notification: %s", result.rc)

            # Small delay to avoid overwhelming
            time.sleep(0.5)

        except Exception as e:
            self.logger.error("Error sending notification: %s", e)

    def process_feeds(self):
        """Check every configured feed and send notifications for new items.

        An error in one feed is logged and does not stop the others.
        """
        feeds = self.config.get("rss", {}).get("feeds", [])

        if not feeds:
            self.logger.warning("No RSS feeds configured")
            return

        for feed_url in feeds:
            try:
                # Fetch once and reuse the result for both the feed title
                # and the new-item scan, rather than downloading twice.
                feed_data = feedparser.parse(feed_url)
                feed_title = clean_html_text(feed_data.feed.get('title', '')) or 'RSS Feed'

                new_items = self.parse_rss_feed(feed_url, feed=feed_data)

                for item in new_items:
                    notification = self.create_notification(item, feed_title)
                    self.send_notification(notification)

            except Exception as e:
                self.logger.error("Error processing feed %s: %s", feed_url, e)

    def run_once(self):
        """Run a single check over all feeds."""
        self.logger.info("Starting RSS notification check")
        self.process_feeds()
        self.logger.info("RSS notification check completed")

    def run_continuously(self):
        """Check feeds every ``rss.check_interval`` seconds (default 300).

        Runs until interrupted with Ctrl+C or an unexpected error occurs;
        ``cleanup()`` is called in either case.
        """
        check_interval = self.config.get("rss", {}).get("check_interval", 300)
        self.logger.info("Starting continuous RSS monitoring (interval: %ss)", check_interval)

        try:
            while True:
                self.run_once()
                time.sleep(check_interval)

        except KeyboardInterrupt:
            self.logger.info("Shutting down...")
        except Exception as e:
            self.logger.error("Unexpected error: %s", e)
        finally:
            self.cleanup()

    def cleanup(self):
        """Stop the MQTT network loop and disconnect from the broker."""
        if self.mqtt_client:
            self.mqtt_client.loop_stop()
            self.mqtt_client.disconnect()
        self.logger.info("Cleanup completed")
|
|
|
|
|
|
def main():
    """Command-line entry point.

    Options:
        --config PATH  Config file path (default ``config.toml``).
        --once         Run a single check and exit instead of looping.
        --debug        Enable debug logging.

    Returns:
        0 on success, 1 if an error was raised (it is printed first).
    """
    import argparse

    parser = argparse.ArgumentParser(description='RSS to MQTT Push Notification Script')
    parser.add_argument('--config', default='config.toml', help='Config file path')
    parser.add_argument('--once', action='store_true', help='Run once and exit')
    parser.add_argument('--debug', action='store_true', help='Enable debug logging')
    args = parser.parse_args()

    try:
        pusher = RSSNotificationPusher(args.config)

        # Apply --debug only after construction: the constructor configures
        # logging via basicConfig(level=INFO), which would override a level
        # set on the root logger beforehand.
        if args.debug:
            logging.getLogger().setLevel(logging.DEBUG)

        if args.once:
            try:
                pusher.run_once()
            finally:
                # run_continuously() cleans up by itself; the single-shot
                # path must stop the MQTT loop and disconnect explicitly.
                pusher.cleanup()
        else:
            pusher.run_continuously()

    except Exception as e:
        print(f"Error: {e}")
        return 1

    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # The builtin exit() is a helper injected by the ``site`` module for
    # interactive sessions and may be missing (e.g. ``python -S``); raising
    # SystemExit directly always works and needs no import.
    raise SystemExit(main())
|