should be done now
This commit is contained in:
372
main.py
Normal file
372
main.py
Normal file
@@ -0,0 +1,372 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
RSS to MQTT Push Notification Script
|
||||
Reads RSS feeds from config and sends new items as push notifications via MQTT
|
||||
"""
|
||||
|
||||
import json
|
||||
import hashlib
|
||||
import time
|
||||
import logging
|
||||
import re
|
||||
from typing import Set, Dict, Any, Optional
|
||||
from pathlib import Path
|
||||
from html import unescape
|
||||
from html.parser import HTMLParser
|
||||
|
||||
import toml
|
||||
import feedparser
|
||||
import paho.mqtt.client as mqtt
|
||||
|
||||
|
||||
class HTMLTextExtractor(HTMLParser):
|
||||
"""Simple HTML parser to extract text content"""
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.text_content = []
|
||||
|
||||
def handle_data(self, data):
|
||||
self.text_content.append(data.strip())
|
||||
|
||||
def get_text(self):
|
||||
return ' '.join(self.text_content).strip()
|
||||
|
||||
|
||||
def clean_html_text(html_content: str) -> str:
|
||||
"""Clean HTML content and extract plain text"""
|
||||
if not html_content:
|
||||
return ""
|
||||
|
||||
# First unescape HTML entities
|
||||
text = unescape(html_content)
|
||||
|
||||
# Use HTML parser to extract text
|
||||
parser = HTMLTextExtractor()
|
||||
try:
|
||||
parser.feed(text)
|
||||
clean_text = parser.get_text()
|
||||
except Exception:
|
||||
# Fallback: use regex to strip HTML tags
|
||||
clean_text = re.sub('<[^<]+?>', '', text)
|
||||
|
||||
# Clean up whitespace
|
||||
clean_text = re.sub(r'\s+', ' ', clean_text).strip()
|
||||
|
||||
return clean_text
|
||||
|
||||
|
||||
class RSSNotificationPusher:
|
||||
def __init__(self, config_path: str = "config.toml"):
|
||||
self.config_path = config_path
|
||||
self.config = self.load_config()
|
||||
self.seen_items: Dict[str, Set[str]] = {}
|
||||
self.mqtt_client = None
|
||||
self.setup_logging()
|
||||
self.setup_mqtt()
|
||||
|
||||
def setup_logging(self):
|
||||
"""Setup logging configuration"""
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||||
handlers=[
|
||||
logging.FileHandler('rss_notifier.log'),
|
||||
logging.StreamHandler()
|
||||
]
|
||||
)
|
||||
self.logger = logging.getLogger(__name__)
|
||||
|
||||
def load_config(self) -> Dict[str, Any]:
|
||||
"""Load configuration from TOML file"""
|
||||
try:
|
||||
with open(self.config_path, 'r') as f:
|
||||
config = toml.load(f)
|
||||
return config
|
||||
except FileNotFoundError:
|
||||
self.create_sample_config()
|
||||
raise FileNotFoundError(f"Config file {self.config_path} not found. Sample created.")
|
||||
except Exception as e:
|
||||
raise Exception(f"Error loading config: {e}")
|
||||
|
||||
def create_sample_config(self):
|
||||
"""Create a sample config file"""
|
||||
sample_config = {
|
||||
"broker": {
|
||||
"address": "proliant.lan",
|
||||
"client_id": "notification-pusher-client",
|
||||
"topic": "notification-pusher" # MQTT topic
|
||||
},
|
||||
"auth": {
|
||||
"username": "mqttuser",
|
||||
"password": "XXXXXXXXX"
|
||||
},
|
||||
"notifications": {
|
||||
"topic": "rss-feeds", # ntfy.sh topic for notifications
|
||||
"max_body_length": 150 # Maximum length for notification body
|
||||
},
|
||||
"rss": {
|
||||
"feeds": [
|
||||
"https://feeds.feedburner.com/TechCrunch",
|
||||
"https://rss.cnn.com/rss/edition.rss",
|
||||
"https://feeds.bbci.co.uk/news/rss.xml"
|
||||
],
|
||||
"check_interval": 300, # seconds
|
||||
"max_items_per_feed": 5 # Only check top N items per feed
|
||||
}
|
||||
}
|
||||
|
||||
with open(self.config_path, 'w') as f:
|
||||
toml.dump(sample_config, f)
|
||||
print(f"Sample config created at {self.config_path}")
|
||||
|
||||
def setup_mqtt(self):
|
||||
"""Setup MQTT client connection"""
|
||||
try:
|
||||
self.mqtt_client = mqtt.Client(client_id=self.config["broker"]["client_id"])
|
||||
|
||||
# Set credentials if provided
|
||||
if "auth" in self.config:
|
||||
self.mqtt_client.username_pw_set(
|
||||
self.config["auth"]["username"],
|
||||
self.config["auth"]["password"]
|
||||
)
|
||||
|
||||
# Setup callbacks
|
||||
self.mqtt_client.on_connect = self.on_mqtt_connect
|
||||
self.mqtt_client.on_disconnect = self.on_mqtt_disconnect
|
||||
self.mqtt_client.on_publish = self.on_mqtt_publish
|
||||
|
||||
# Connect to broker
|
||||
self.mqtt_client.connect(self.config["broker"]["address"], 1883, 60)
|
||||
self.mqtt_client.loop_start()
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"MQTT setup error: {e}")
|
||||
raise
|
||||
|
||||
def on_mqtt_connect(self, client, userdata, flags, rc):
|
||||
"""MQTT connection callback"""
|
||||
if rc == 0:
|
||||
self.logger.info("Connected to MQTT broker")
|
||||
else:
|
||||
self.logger.error(f"Failed to connect to MQTT broker: {rc}")
|
||||
|
||||
def on_mqtt_disconnect(self, client, userdata, rc):
|
||||
"""MQTT disconnection callback"""
|
||||
self.logger.warning("Disconnected from MQTT broker")
|
||||
|
||||
def on_mqtt_publish(self, client, userdata, mid):
|
||||
"""MQTT publish callback"""
|
||||
self.logger.debug(f"Message published: {mid}")
|
||||
|
||||
def generate_item_id(self, feed_url: str, item: Dict[str, Any]) -> str:
|
||||
"""Generate unique ID for RSS item"""
|
||||
# Use link if available, otherwise use title + published date
|
||||
identifier = item.get('link', '') + item.get('title', '') + item.get('published', '')
|
||||
return hashlib.md5((feed_url + identifier).encode()).hexdigest()
|
||||
|
||||
def parse_rss_feed(self, feed_url: str) -> list:
|
||||
"""Parse RSS feed and return new items (limited to most recent items)"""
|
||||
try:
|
||||
self.logger.info(f"Parsing feed: {feed_url}")
|
||||
feed = feedparser.parse(feed_url)
|
||||
|
||||
if feed.bozo:
|
||||
self.logger.warning(f"Feed parsing issues for {feed_url}: {feed.bozo_exception}")
|
||||
|
||||
# Initialize seen items set for this feed if not exists
|
||||
if feed_url not in self.seen_items:
|
||||
self.seen_items[feed_url] = set()
|
||||
|
||||
# Get max items to check from config (default 5)
|
||||
max_items = self.config.get("rss", {}).get("max_items_per_feed", 5)
|
||||
recent_entries = feed.entries[:max_items]
|
||||
self.logger.debug(f"Processing top {len(recent_entries)} entries from {feed_url}")
|
||||
|
||||
new_items = []
|
||||
for entry in recent_entries:
|
||||
item_id = self.generate_item_id(feed_url, entry)
|
||||
|
||||
if item_id not in self.seen_items[feed_url]:
|
||||
self.seen_items[feed_url].add(item_id)
|
||||
new_items.append(entry)
|
||||
|
||||
self.logger.info(f"Found {len(new_items)} new items in {feed_url} (checked top {max_items})")
|
||||
return new_items
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error parsing feed {feed_url}: {e}")
|
||||
return []
|
||||
|
||||
def extract_clean_description(self, entry: Dict[str, Any]) -> str:
|
||||
"""Extract and clean description/summary from RSS entry"""
|
||||
# Try different fields for content
|
||||
content_candidates = [
|
||||
entry.get('summary', ''),
|
||||
entry.get('description', ''),
|
||||
entry.get('content', [{}])[0].get('value', '') if entry.get('content') else '',
|
||||
entry.get('subtitle', '')
|
||||
]
|
||||
|
||||
# Find the first non-empty content
|
||||
raw_content = ""
|
||||
for candidate in content_candidates:
|
||||
if candidate and candidate.strip():
|
||||
raw_content = candidate
|
||||
break
|
||||
|
||||
if not raw_content:
|
||||
return "No description available"
|
||||
|
||||
# Clean HTML and extract text
|
||||
clean_text = clean_html_text(raw_content)
|
||||
|
||||
# If still empty after cleaning, try original with basic cleanup
|
||||
if not clean_text:
|
||||
clean_text = re.sub(r'<[^>]+>', '', raw_content)
|
||||
clean_text = re.sub(r'\s+', ' ', clean_text).strip()
|
||||
|
||||
return clean_text or "No description available"
|
||||
|
||||
def create_notification(self, entry: Dict[str, Any], feed_title: str = "") -> Dict[str, Any]:
|
||||
"""Create notification object from RSS entry"""
|
||||
# Extract and clean title
|
||||
title = entry.get('title', 'No Title').strip()
|
||||
title = clean_html_text(title) or title # Clean any HTML in title too
|
||||
|
||||
if feed_title:
|
||||
title = f"[{feed_title}] {title}"
|
||||
|
||||
# Extract and clean description
|
||||
body = self.extract_clean_description(entry)
|
||||
|
||||
# Truncate body if too long
|
||||
max_length = self.config.get("notifications", {}).get("max_body_length", 150)
|
||||
if len(body) > max_length:
|
||||
body = body[:max_length-3] + "..."
|
||||
|
||||
# Get notification topic from config (for ntfy.sh)
|
||||
notification_topic = self.config.get("notifications", {}).get("topic", "rss-feeds")
|
||||
|
||||
# Create notification object matching the Rust struct format
|
||||
notification = {
|
||||
"title": title,
|
||||
"body": body,
|
||||
"topic": notification_topic # This is the ntfy.sh topic
|
||||
}
|
||||
|
||||
# Add link if available
|
||||
if 'link' in entry and entry['link'].strip():
|
||||
notification["link"] = entry['link'].strip()
|
||||
|
||||
return notification
|
||||
|
||||
def send_notification(self, notification: Dict[str, Any]):
|
||||
"""Send notification via MQTT"""
|
||||
try:
|
||||
payload = json.dumps(notification, ensure_ascii=False)
|
||||
result = self.mqtt_client.publish(
|
||||
self.config["broker"]["topic"],
|
||||
payload,
|
||||
qos=1
|
||||
)
|
||||
|
||||
if result.rc == mqtt.MQTT_ERR_SUCCESS:
|
||||
self.logger.info(f"Notification sent: {notification['title'][:50]}...")
|
||||
self.logger.debug(f"Full notification: {notification}")
|
||||
else:
|
||||
self.logger.error(f"Failed to send notification: {result.rc}")
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error sending notification: {e}")
|
||||
|
||||
def process_feeds(self):
|
||||
"""Process all RSS feeds and send notifications for new items"""
|
||||
feeds = self.config.get("rss", {}).get("feeds", [])
|
||||
|
||||
if not feeds:
|
||||
self.logger.warning("No RSS feeds configured")
|
||||
return
|
||||
|
||||
for feed_url in feeds:
|
||||
try:
|
||||
# Parse feed
|
||||
feed_data = feedparser.parse(feed_url)
|
||||
feed_title = clean_html_text(feed_data.feed.get('title', '')) or 'RSS Feed'
|
||||
|
||||
# Get new items
|
||||
new_items = self.parse_rss_feed(feed_url)
|
||||
|
||||
# Send notifications for new items
|
||||
for item in new_items:
|
||||
notification = self.create_notification(item, feed_title)
|
||||
self.send_notification(notification)
|
||||
|
||||
# Small delay to avoid overwhelming
|
||||
time.sleep(0.5)
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error processing feed {feed_url}: {e}")
|
||||
|
||||
def run_once(self):
|
||||
"""Run the notification process once"""
|
||||
self.logger.info("Starting RSS notification check")
|
||||
self.process_feeds()
|
||||
self.logger.info("RSS notification check completed")
|
||||
|
||||
def run_continuously(self):
|
||||
"""Run the notification process continuously"""
|
||||
check_interval = self.config.get("rss", {}).get("check_interval", 300)
|
||||
self.logger.info(f"Starting continuous RSS monitoring (interval: {check_interval}s)")
|
||||
|
||||
try:
|
||||
while True:
|
||||
self.run_once()
|
||||
time.sleep(check_interval)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
self.logger.info("Shutting down...")
|
||||
except Exception as e:
|
||||
self.logger.error(f"Unexpected error: {e}")
|
||||
finally:
|
||||
self.cleanup()
|
||||
|
||||
def cleanup(self):
|
||||
"""Cleanup resources"""
|
||||
if self.mqtt_client:
|
||||
self.mqtt_client.loop_stop()
|
||||
self.mqtt_client.disconnect()
|
||||
self.logger.info("Cleanup completed")
|
||||
|
||||
|
||||
def main():
|
||||
"""Main entry point"""
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser(description='RSS to MQTT Push Notification Script')
|
||||
parser.add_argument('--config', default='config.toml', help='Config file path')
|
||||
parser.add_argument('--once', action='store_true', help='Run once and exit')
|
||||
parser.add_argument('--debug', action='store_true', help='Enable debug logging')
|
||||
args = parser.parse_args()
|
||||
|
||||
# Set debug level if requested
|
||||
if args.debug:
|
||||
logging.getLogger().setLevel(logging.DEBUG)
|
||||
|
||||
try:
|
||||
pusher = RSSNotificationPusher(args.config)
|
||||
|
||||
if args.once:
|
||||
pusher.run_once()
|
||||
else:
|
||||
pusher.run_continuously()
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error: {e}")
|
||||
return 1
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
exit(main())
|
||||
Reference in New Issue
Block a user