Daily Update Monitoring¶
Implement systematic daily monitoring of Danish Parliamentary activity using batch processing strategies, scheduled updates, and automated reporting systems. This guide covers comprehensive approaches to track parliamentary changes, generate daily briefings, and maintain continuous oversight of legislative activity.
Overview¶
Daily update monitoring provides structured, reliable tracking of parliamentary activity through:
- Scheduled Batch Processing: Automated data collection at optimal times
- Activity Pattern Analysis: Understanding parliamentary working rhythms
- Daily Briefing Generation: Automated summaries of parliamentary activity
- Trend Detection: Identifying patterns in legislative activity
- Error Recovery: Robust handling of failed updates and data gaps
Parliamentary Activity Patterns¶
Danish Parliamentary Schedule¶
Understanding when parliamentary activity occurs helps optimize monitoring schedules:
import datetime
from dataclasses import dataclass
from typing import List, Dict
@dataclass
class ParliamentarySchedule:
    """Danish Parliamentary working patterns.

    NOTE(review): none of the attributes below carry type annotations, so
    @dataclass generates no fields or __init__ parameters for them -- they
    behave as shared class-level constants. Confirm this is intentional.
    """
    # Regular session periods (September-June)
    session_start_month = 9  # September
    session_end_month = 6  # June
    # Weekly patterns (0 = Monday ... 6 = Sunday, matching date.weekday())
    working_days = [0, 1, 2, 3, 4]  # Monday-Friday
    committee_days = [1, 2, 3]  # Tuesday-Thursday primary
    plenary_days = [2, 3, 4]  # Wednesday-Friday common
    # Daily activity periods (hour of day, 24h clock)
    morning_activity_start = 8  # 08:00
    afternoon_activity_start = 14  # 14:00
    evening_activity_end = 22  # 22:00
    # Recess periods
    summer_recess = (7, 8)  # July-August (month numbers)
    # Presumably (start_month, start_day, end_month, end_day) -- verify
    # against consumers before relying on this layout.
    winter_recess = (12, 24, 12, 31)  # Christmas/New Year
    easter_recess_days = 7  # Around Easter (dates vary by year)
def is_parliamentary_session_active(date: datetime.date) -> bool:
    """Check if parliament is likely in active session.

    Active means: outside the July-August summer recess and on a
    weekday (Monday-Friday).
    """
    during_summer_recess = date.month in (7, 8)
    on_weekday = date.weekday() < 5  # Monday-Friday
    return not during_summer_recess and on_weekday
def get_expected_activity_level(date: datetime.date, hour: int) -> str:
    """Predict activity level for given date and hour.

    Levels, from busiest to quietest: "high", "moderate", "low", "minimal".
    """
    # Outside an active session nothing meaningful happens.
    if not is_parliamentary_session_active(date):
        return "minimal"
    day = date.weekday()
    if day in (1, 2, 3) and 10 <= hour <= 20:  # Tue-Thu core hours
        return "high"
    if day in (0, 4) and 10 <= hour <= 18:  # Mon/Fri core hours
        return "moderate"
    if 8 <= hour <= 22:  # Extended working hours
        return "low"
    return "minimal"
Activity Detection Patterns¶
async def analyze_daily_activity_patterns(days_back: int = 30) -> Dict:
    """Analyze recent activity patterns to optimize monitoring.

    Args:
        days_back: How many past days to include in the analysis.

    Returns:
        Dict with hourly and per-weekday update distributions plus a list
        of detected peak periods.

    NOTE(review): depends on a module-level ``get_daily_updates`` coroutine
    that is not defined here, and on ``datetime``/``timedelta`` from a
    ``from datetime import ...`` statement that appears later in this
    document -- confirm both before running.
    """
    patterns = {
        'hourly_distribution': {},
        'daily_distribution': {},
        'update_frequency': {},  # note: never written by this function
        'peak_periods': []
    }
    start_date = datetime.now() - timedelta(days=days_back)
    # Analyze update patterns by hour
    for day in range(days_back):
        check_date = start_date + timedelta(days=day)
        # Get updates for this day
        updates = await get_daily_updates(check_date)
        # Group by hour (assumes 'opdateringsdato' is a datetime object,
        # not a string -- TODO confirm against get_daily_updates).
        for update in updates:
            hour = update['opdateringsdato'].hour
            patterns['hourly_distribution'][hour] = patterns['hourly_distribution'].get(hour, 0) + 1
        # Daily totals, keyed by weekday name ('Monday', ...)
        weekday = check_date.strftime('%A')
        patterns['daily_distribution'][weekday] = patterns['daily_distribution'].get(weekday, 0) + len(updates)
    # Identify peak periods
    if patterns['hourly_distribution']:
        peak_hour = max(patterns['hourly_distribution'], key=patterns['hourly_distribution'].get)
        patterns['peak_periods'].append(f"Peak hour: {peak_hour}:00-{peak_hour+1}:00")
    return patterns
Daily Batch Processing Architecture¶
Scheduled Monitoring Framework¶
import asyncio
import schedule
import logging
from datetime import datetime, timedelta
from typing import Optional, Dict, List
from dataclasses import dataclass, asdict
import json
@dataclass
class DailyUpdateResult:
    """Results from daily update monitoring.

    One instance summarizes a single day's monitoring run.
    """
    date: str  # ISO-8601 date string (from date.isoformat())
    total_updates: int  # sum across cases, documents and actors; -1 marks total failure
    new_cases: int
    updated_cases: int
    new_documents: int
    new_actors: int
    processing_time_seconds: float  # wall-clock duration of the run
    errors: List[str]  # human-readable error descriptions; empty on success
    peak_activity_hour: Optional[int] = None  # hour (0-23) with most updates, if known
class DailyUpdateMonitor:
    """Comprehensive daily update monitoring system.

    Queries the oda.ft.dk OData API for cases (Sag), documents (Dokument)
    and actors (Aktør) updated on a given date and aggregates counts into
    a DailyUpdateResult.

    NOTE(review): this class awaits self._api_request,
    self._analyze_daily_activity and self._generate_daily_report, none of
    which are defined here -- presumably supplied by a subclass or mixin;
    confirm before use.
    """

    def __init__(self, api_base_url: str = "https://oda.ft.dk/api"):
        self.api_base_url = api_base_url
        self.logger = self._setup_logging()
        # In-memory record of every run's result (not persisted).
        self.results_history: List[DailyUpdateResult] = []

    def _setup_logging(self) -> logging.Logger:
        """Configure logging for daily monitoring"""
        logger = logging.getLogger('daily_monitor')
        logger.setLevel(logging.INFO)
        # File handler for daily logs; one log file per calendar month.
        file_handler = logging.FileHandler(
            f'daily_monitor_{datetime.now().strftime("%Y%m")}.log'
        )
        file_handler.setFormatter(
            logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        )
        logger.addHandler(file_handler)
        return logger

    async def run_daily_monitoring(self, target_date: Optional[datetime] = None) -> DailyUpdateResult:
        """Execute comprehensive daily monitoring.

        Args:
            target_date: Date to monitor; defaults to today.

        Returns:
            DailyUpdateResult summarizing the day's activity. On failure a
            zeroed result is returned whose ``errors`` list describes the
            problem (the method never raises).
        """
        start_time = datetime.now()
        target_date = target_date or datetime.now().date()
        self.logger.info(f"Starting daily monitoring for {target_date}")
        errors = []
        total_updates = 0
        try:
            # Monitor different entity types
            case_updates = await self._monitor_case_updates(target_date)
            document_updates = await self._monitor_document_updates(target_date)
            actor_updates = await self._monitor_actor_updates(target_date)
            # Analyze activity patterns
            # NOTE(review): _analyze_daily_activity is not defined in this class.
            activity_analysis = await self._analyze_daily_activity(target_date)
            total_updates = (
                case_updates['total'] +
                document_updates['total'] +
                actor_updates['total']
            )
            # Create result summary
            result = DailyUpdateResult(
                date=target_date.isoformat(),
                total_updates=total_updates,
                new_cases=case_updates['new'],
                updated_cases=case_updates['updated'],
                new_documents=document_updates['new'],
                new_actors=actor_updates['new'],
                processing_time_seconds=(datetime.now() - start_time).total_seconds(),
                errors=errors,
                peak_activity_hour=activity_analysis.get('peak_hour')
            )
            self.results_history.append(result)
            # Generate daily report
            # NOTE(review): _generate_daily_report is not defined in this class.
            await self._generate_daily_report(result)
            self.logger.info(f"Daily monitoring completed: {total_updates} updates processed")
            return result
        except Exception as e:
            error_msg = f"Daily monitoring failed: {str(e)}"
            self.logger.error(error_msg)
            errors.append(error_msg)
            # Return error result
            return DailyUpdateResult(
                date=target_date.isoformat(),
                total_updates=0,
                new_cases=0,
                updated_cases=0,
                new_documents=0,
                new_actors=0,
                processing_time_seconds=(datetime.now() - start_time).total_seconds(),
                errors=errors
            )

    async def _monitor_case_updates(self, target_date: datetime.date) -> Dict:
        """Monitor case updates for specific date.

        Returns a dict with 'total', 'new', 'updated' counts and the raw
        'details' rows; zeroed/empty on API failure.
        """
        # Build a [start, end) window covering the whole target day.
        start_datetime = datetime.combine(target_date, datetime.min.time())
        end_datetime = start_datetime + timedelta(days=1)
        # Query for cases updated on target date
        filter_query = (
            f"opdateringsdato ge datetime'{start_datetime.isoformat()}' "
            f"and opdateringsdato lt datetime'{end_datetime.isoformat()}'"
        )
        try:
            response = await self._api_request(
                "Sag",
                filter_query=filter_query,
                select="id,titel,opdateringsdato,statusid,typeid",
                orderby="opdateringsdato desc"
            )
            # Categorize updates
            new_cases = []
            updated_cases = []
            for case in response['value']:
                # Simple heuristic: consider cases from same day as "new"
                case_date = datetime.fromisoformat(case['opdateringsdato'].replace('Z', '+00:00'))
                if case_date.date() == target_date:
                    # Further analysis could check if truly new vs updated
                    updated_cases.append(case)
            # NOTE(review): new_cases is never populated, so 'new' is always
            # 0 and every same-day row counts as 'updated' -- confirm this
            # is the intended heuristic.
            return {
                'total': len(response['value']),
                'new': len(new_cases),
                'updated': len(updated_cases),
                'details': response['value']
            }
        except Exception as e:
            self.logger.error(f"Failed to monitor case updates: {e}")
            return {'total': 0, 'new': 0, 'updated': 0, 'details': []}

    async def _monitor_document_updates(self, target_date: datetime.date) -> Dict:
        """Monitor document updates for specific date.

        Returns a dict with 'total'/'new' counts and raw 'details'; every
        row is counted as new (simplified), zeroed/empty on API failure.
        """
        # [start, end) window covering the whole target day.
        start_datetime = datetime.combine(target_date, datetime.min.time())
        end_datetime = start_datetime + timedelta(days=1)
        filter_query = (
            f"opdateringsdato ge datetime'{start_datetime.isoformat()}' "
            f"and opdateringsdato lt datetime'{end_datetime.isoformat()}'"
        )
        try:
            response = await self._api_request(
                "Dokument",
                filter_query=filter_query,
                select="id,titel,opdateringsdato,typeid",
                orderby="opdateringsdato desc"
            )
            return {
                'total': len(response['value']),
                'new': len(response['value']),  # Simplified
                'details': response['value']
            }
        except Exception as e:
            self.logger.error(f"Failed to monitor document updates: {e}")
            return {'total': 0, 'new': 0, 'details': []}

    async def _monitor_actor_updates(self, target_date: datetime.date) -> Dict:
        """Monitor actor updates for specific date.

        Same shape as _monitor_document_updates, against the Aktør entity.
        """
        # [start, end) window covering the whole target day.
        start_datetime = datetime.combine(target_date, datetime.min.time())
        end_datetime = start_datetime + timedelta(days=1)
        filter_query = (
            f"opdateringsdato ge datetime'{start_datetime.isoformat()}' "
            f"and opdateringsdato lt datetime'{end_datetime.isoformat()}'"
        )
        try:
            response = await self._api_request(
                "Aktør",
                filter_query=filter_query,
                select="id,navn,opdateringsdato,typeid",
                orderby="opdateringsdato desc"
            )
            return {
                'total': len(response['value']),
                'new': len(response['value']),  # Simplified
                'details': response['value']
            }
        except Exception as e:
            self.logger.error(f"Failed to monitor actor updates: {e}")
            return {'total': 0, 'new': 0, 'details': []}
Morning Briefing Generation¶
Daily Summary Reports¶
class DailyBriefingGenerator:
    """Generate comprehensive daily briefings of parliamentary activity.

    Renders Jinja2 templates over yesterday's DailyUpdateResult plus
    weekly context, producing email/JSON/markdown outputs.
    """

    def __init__(self, monitor: DailyUpdateMonitor):
        self.monitor = monitor
        # One template per output channel, loaded once.
        # NOTE(review): _load_json_template is referenced here but not
        # defined in this class -- confirm it exists elsewhere.
        self.templates = {
            'email': self._load_email_template(),
            'json': self._load_json_template(),
            'markdown': self._load_markdown_template()
        }

    async def generate_morning_briefing(self, date: datetime.date) -> Dict[str, str]:
        """Generate morning briefing in multiple formats.

        Returns a mapping of format name ('email', 'json', 'markdown') to
        the rendered briefing text.
        """
        # Get yesterday's results
        yesterday_result = await self.monitor.run_daily_monitoring(date - timedelta(days=1))
        # Get weekly context
        weekly_context = await self._get_weekly_context(date)
        # Generate briefing content
        # NOTE(review): _predict_today_activity is not defined in this class.
        briefing_data = {
            'date': date.isoformat(),
            'yesterday': yesterday_result,
            'weekly_context': weekly_context,
            'key_highlights': await self._extract_key_highlights(yesterday_result),
            'upcoming_focus': await self._predict_today_activity(date)
        }
        # Generate in different formats
        formats = {}
        for format_name, template in self.templates.items():
            formats[format_name] = self._render_template(template, briefing_data)
        return formats

    async def _extract_key_highlights(self, result: DailyUpdateResult) -> List[str]:
        """Extract key highlights from daily results.

        Thresholds (50 updates, 10 new cases) are hard-coded heuristics.
        """
        highlights = []
        # High activity indicator
        if result.total_updates > 50:
            highlights.append(f"High parliamentary activity: {result.total_updates} total updates")
        # New legislation
        if result.new_cases > 10:
            highlights.append(f"Significant new legislation: {result.new_cases} new cases")
        # Peak activity timing (note: hour 0 is falsy and thus skipped)
        if result.peak_activity_hour:
            highlights.append(f"Peak activity occurred at {result.peak_activity_hour}:00")
        # Error conditions
        if result.errors:
            highlights.append(f"Monitoring issues detected: {len(result.errors)} errors")
        return highlights

    async def _get_weekly_context(self, date: datetime.date) -> Dict:
        """Get weekly activity context.

        Sums stored daily metrics from Monday of the current week up to
        and including ``date``.
        """
        # Monday of the week containing `date`.
        week_start = date - timedelta(days=date.weekday())
        weekly_totals = {
            'total_updates': 0,
            'total_cases': 0,
            'total_documents': 0,
            'active_days': 0
        }
        for day in range(7):
            day_date = week_start + timedelta(days=day)
            if day_date <= date:
                # Get historical data for completed days
                # NOTE(review): _get_historical_daily_data is not defined in
                # this class -- confirm it exists elsewhere.
                daily_data = await self._get_historical_daily_data(day_date)
                if daily_data:
                    weekly_totals['total_updates'] += daily_data.get('total_updates', 0)
                    weekly_totals['total_cases'] += daily_data.get('new_cases', 0)
                    weekly_totals['total_documents'] += daily_data.get('new_documents', 0)
                    if daily_data.get('total_updates', 0) > 0:
                        weekly_totals['active_days'] += 1
        return weekly_totals

    def _render_template(self, template: str, data: Dict) -> str:
        """Render briefing template with data"""
        # Local import keeps jinja2 an optional dependency.
        import jinja2
        jinja_template = jinja2.Template(template)
        return jinja_template.render(**data)

    def _load_email_template(self) -> str:
        """Email briefing template.

        NOTE(review): the '"' characters starting the list lines below look
        like mis-encoded bullet characters (•) -- confirm source encoding
        before fixing, as the bytes are part of the rendered output.
        """
        return """
Subject: Parliamentary Daily Briefing - {{ date }}
Daily Parliamentary Activity Summary
====================================
Date: {{ date }}
Processing Time: {{ yesterday.processing_time_seconds|round(1) }}s
YESTERDAY'S ACTIVITY
-------------------
" Total Updates: {{ yesterday.total_updates }}
" New Cases: {{ yesterday.new_cases }}
" Updated Cases: {{ yesterday.updated_cases }}
" New Documents: {{ yesterday.new_documents }}
" New Actors: {{ yesterday.new_actors }}
{% if yesterday.peak_activity_hour %}
Peak Activity: {{ yesterday.peak_activity_hour }}:00-{{ yesterday.peak_activity_hour + 1 }}:00
{% endif %}
WEEKLY CONTEXT
--------------
" Week Total Updates: {{ weekly_context.total_updates }}
" Active Days This Week: {{ weekly_context.active_days }}/{{ ((date.weekday() + 1) if date.weekday() < 6 else 7) }}
" Average Daily Activity: {{ (weekly_context.total_updates / weekly_context.active_days)|round(1) if weekly_context.active_days > 0 else 0 }}
KEY HIGHLIGHTS
--------------
{% for highlight in key_highlights %}
" {{ highlight }}
{% endfor %}
{% if yesterday.errors %}
MONITORING ALERTS
-----------------
{% for error in yesterday.errors %}
" {{ error }}
{% endfor %}
{% endif %}
TODAY'S EXPECTED ACTIVITY
-------------------------
{{ upcoming_focus.description }}
---
Generated by Parliamentary Monitoring System
{{ date }} {{ "07:00" }} CET
"""

    def _load_markdown_template(self) -> str:
        """Markdown briefing template for documentation/reports.

        NOTE(review): the '"now"|strftime' expression relies on a custom
        Jinja filter; plain jinja2.Template provides no strftime filter --
        confirm the rendering environment registers one.
        """
        return """
# Parliamentary Daily Briefing
**Date:** {{ date }}
**Generated:** {{ "now"|strftime("%Y-%m-%d %H:%M") }} CET
## Executive Summary
{{ key_highlights|join(' " ') }}
## Yesterday's Activity Metrics
| Metric | Count |
|--------|--------|
| Total Updates | {{ yesterday.total_updates }} |
| New Cases | {{ yesterday.new_cases }} |
| Updated Cases | {{ yesterday.updated_cases }} |
| New Documents | {{ yesterday.new_documents }} |
| New Actors | {{ yesterday.new_actors }} |
{% if yesterday.peak_activity_hour %}
**Peak Activity Period:** {{ yesterday.peak_activity_hour }}:00-{{ yesterday.peak_activity_hour + 1 }}:00
{% endif %}
## Weekly Context
- **Total Week Activity:** {{ weekly_context.total_updates }} updates
- **Active Days:** {{ weekly_context.active_days }} days
- **Daily Average:** {{ (weekly_context.total_updates / weekly_context.active_days)|round(1) if weekly_context.active_days > 0 else 0 }} updates/day
## Today's Outlook
{{ upcoming_focus.description }}
{% if yesterday.errors %}
## System Alerts
{% for error in yesterday.errors %}
- {{ error }}
{% endfor %}
{% endif %}
---
*Automated report from Parliamentary Monitoring System*
"""
Activity Level Analysis¶
Trend Detection and Patterns¶
class ActivityAnalyzer:
    """Analyze parliamentary activity trends and patterns.

    Works over a list of DailyUpdateResult records; annotations referring
    to that type are quoted (lazy) so this class also loads when the type
    is defined elsewhere in the module.
    """

    def __init__(self):
        # Cached per-day results; refreshed on demand by detect_activity_trends.
        self.historical_data: "List[DailyUpdateResult]" = []

    async def detect_activity_trends(self, days_back: int = 30) -> Dict:
        """Detect trends in parliamentary activity.

        Args:
            days_back: Number of most recent days to analyze.

        Returns:
            Dict with per-metric trend summaries, recurring activity
            patterns, detected anomalies and forecasts.

        NOTE(review): _load_historical_data and _generate_forecasts are not
        defined in this class -- they must be supplied elsewhere; confirm.
        """
        if len(self.historical_data) < days_back:
            await self._load_historical_data(days_back)
        recent_data = self.historical_data[-days_back:]
        trends = {
            'overall_trend': self._calculate_trend(recent_data, 'total_updates'),
            'case_trend': self._calculate_trend(recent_data, 'new_cases'),
            'document_trend': self._calculate_trend(recent_data, 'new_documents'),
            'activity_patterns': self._identify_patterns(recent_data),
            'anomalies': self._detect_anomalies(recent_data),
            'forecasts': await self._generate_forecasts(recent_data)
        }
        return trends

    def _calculate_trend(self, data: "List[DailyUpdateResult]", metric: str) -> Dict:
        """Calculate trend for a specific metric.

        Returns:
            Dict with 'direction' ('increasing'/'decreasing'/'stable'),
            a 0-1 'strength' score, the raw 'slope', the latest value and
            the mean; or an 'insufficient_data' marker for <2 points.
        """
        values = [getattr(result, metric) for result in data]
        if len(values) < 2:
            return {'direction': 'insufficient_data', 'strength': 0}
        # Simple linear trend over day index 0..n-1.
        x = list(range(len(values)))
        slope = self._calculate_slope(x, values)
        direction = 'increasing' if slope > 0.1 else 'decreasing' if slope < -0.1 else 'stable'
        # Normalize by the metric's peak so strength is comparable across metrics.
        strength = abs(slope) / max(values) if max(values) > 0 else 0
        return {
            'direction': direction,
            'strength': min(strength, 1.0),
            'slope': slope,
            'current_value': values[-1],
            'average': sum(values) / len(values)
        }

    def _identify_patterns(self, data: "List[DailyUpdateResult]") -> Dict:
        """Identify recurring patterns in activity.

        Returns weekday averages/peaks, a histogram of peak hours, and
        days more than one standard deviation below/above the mean.
        """
        patterns = {
            'weekly_patterns': {},
            'peak_hours': {},
            'quiet_periods': [],
            'high_activity_periods': []
        }
        # Guard: the original divided by len(data) unconditionally below,
        # which raised ZeroDivisionError on an empty input.
        if not data:
            return patterns
        # Group daily totals by weekday name ('Monday', ...).
        weekly_activity = {}
        for result in data:
            date = datetime.fromisoformat(result.date)
            weekday = date.strftime('%A')
            weekly_activity.setdefault(weekday, []).append(result.total_updates)
        # Per-weekday averages and peaks.
        for weekday, activities in weekly_activity.items():
            patterns['weekly_patterns'][weekday] = {
                'average': sum(activities) / len(activities),
                'peak': max(activities),
                'days_count': len(activities)
            }
        # Count how often each hour was the day's peak (hour 0 is falsy
        # and therefore skipped -- preserved from the original heuristic).
        peak_hours = {}
        for result in data:
            if result.peak_activity_hour:
                hour = result.peak_activity_hour
                peak_hours[hour] = peak_hours.get(hour, 0) + 1
        patterns['peak_hours'] = dict(sorted(peak_hours.items(), key=lambda x: x[1], reverse=True))
        # Flag days more than one standard deviation from the mean.
        total_updates = [result.total_updates for result in data]
        avg_activity = sum(total_updates) / len(total_updates)
        std_dev = (sum((x - avg_activity) ** 2 for x in total_updates) / len(total_updates)) ** 0.5
        for result in data:
            if result.total_updates < avg_activity - std_dev:
                patterns['quiet_periods'].append({
                    'date': result.date,
                    'activity': result.total_updates,
                    'deviation': avg_activity - result.total_updates
                })
            elif result.total_updates > avg_activity + std_dev:
                patterns['high_activity_periods'].append({
                    'date': result.date,
                    'activity': result.total_updates,
                    'deviation': result.total_updates - avg_activity
                })
        return patterns

    def _detect_anomalies(self, data: "List[DailyUpdateResult]") -> List[Dict]:
        """Detect anomalous activity patterns.

        Compares each day against the mean/std of the preceding 7 days and
        flags deviations larger than two standard deviations.
        """
        anomalies = []
        if len(data) < 7:
            return anomalies
        # Calculate rolling statistics and detect outliers.
        window_size = 7
        for i in range(window_size, len(data)):
            current = data[i]
            recent_window = data[i - window_size:i]
            window_avg = sum(r.total_updates for r in recent_window) / window_size
            window_std = (sum((r.total_updates - window_avg) ** 2 for r in recent_window) / window_size) ** 0.5
            deviation = abs(current.total_updates - window_avg)
            # Anomaly: >2 standard deviations from the recent average.
            # With a zero-variance window, any nonzero deviation qualifies.
            if deviation > 2 * window_std:
                anomaly_type = 'spike' if current.total_updates > window_avg else 'drop'
                # BUG FIX: the original divided by window_std unconditionally,
                # raising ZeroDivisionError whenever the previous week's
                # values were all identical. Report infinite significance
                # for that degenerate case instead.
                significance = deviation / window_std if window_std > 0 else float('inf')
                anomalies.append({
                    'date': current.date,
                    'type': anomaly_type,
                    'value': current.total_updates,
                    'expected': window_avg,
                    'deviation': deviation,
                    'significance': significance
                })
        return anomalies

    def _calculate_slope(self, x: List[float], y: List[float]) -> float:
        """Calculate the ordinary least-squares slope of y over x.

        Returns 0.0 for degenerate input (fewer than 2 points, or zero
        variance in x) instead of raising ZeroDivisionError.
        """
        n = len(x)
        sum_x = sum(x)
        sum_y = sum(y)
        sum_xy = sum(x[i] * y[i] for i in range(n))
        sum_x2 = sum(x[i] ** 2 for i in range(n))
        denominator = n * sum_x2 - sum_x ** 2
        if denominator == 0:
            return 0.0
        return (n * sum_xy - sum_x * sum_y) / denominator
Weekend and Recess Period Handling¶
Adaptive Monitoring Strategies¶
class AdaptiveMonitoringScheduler:
    """Adaptive scheduling based on parliamentary calendar.

    Chooses a monitoring intensity ('normal'/'reduced'/'minimal') from the
    recess calendar, weekends and the session state.
    """

    def __init__(self):
        self.recess_periods = self._load_recess_calendar()
        # Weekend monitoring intensity: reduced, minimal, or normal.
        self.weekend_strategy = "reduced"

    def _load_recess_calendar(self) -> List[Dict]:
        """Load parliamentary recess calendar.

        Entries use (month, day) tuples for 'start'/'end'. The Easter
        entry has variable dates and records only a duration, so it is
        skipped by _check_recess_period.
        """
        return [
            {
                'name': 'Summer Recess',
                'start': (7, 1),  # July 1
                'end': (8, 31),  # August 31
                'monitoring_level': 'minimal'
            },
            {
                'name': 'Christmas Recess',
                'start': (12, 20),  # December 20
                'end': (1, 6),  # January 6
                'monitoring_level': 'reduced'
            },
            {
                'name': 'Easter Recess',
                'duration_days': 7,  # Variable dates, 7 days around Easter
                'monitoring_level': 'reduced'
            }
        ]

    def get_monitoring_strategy(self, date: datetime.date) -> str:
        """Determine monitoring strategy for given date.

        Precedence: recess periods, then weekends, then session state.
        """
        # Check if in recess period
        recess_level = self._check_recess_period(date)
        if recess_level:
            return recess_level
        # Check weekend
        if date.weekday() >= 5:  # Saturday, Sunday
            return self.weekend_strategy
        # Check session activity level
        # (is_parliamentary_session_active is defined earlier in this module)
        if is_parliamentary_session_active(date):
            return "normal"
        else:
            return "reduced"

    def _check_recess_period(self, date: datetime.date) -> Optional[str]:
        """Check if date falls in recess period.

        Returns:
            The recess 'monitoring_level', or None outside all recesses.
        """
        for recess in self.recess_periods:
            # Entries without fixed dates (Easter) are skipped.
            if 'start' in recess and 'end' in recess:
                start_month, start_day = recess['start']
                end_month, end_day = recess['end']
                if start_month > end_month:  # Crosses year boundary (Christmas)
                    if (date.month == start_month and date.day >= start_day) or \
                       (date.month == end_month and date.day <= end_day):
                        return recess['monitoring_level']
                else:
                    # BUG FIX: the original required date.month to equal BOTH
                    # start_month and end_month, which is impossible for a
                    # multi-month recess (summer: July 1 - August 31), so the
                    # summer recess never matched. Lexicographic (month, day)
                    # comparison covers the whole span correctly.
                    if (start_month, start_day) <= (date.month, date.day) <= (end_month, end_day):
                        return recess['monitoring_level']
        return None

    def schedule_monitoring_jobs(self) -> Dict[str, List[str]]:
        """Create monitoring schedule based on current strategy.

        Returns:
            Dict with the chosen 'strategy' and the list of HH:MM 'times'
            at which monitoring should run today.
        """
        today = datetime.now().date()
        strategy = self.get_monitoring_strategy(today)
        schedules = {
            'normal': [
                "07:00",  # Morning briefing
                "12:00",  # Midday check
                "17:00",  # Afternoon summary
                "21:00"  # Evening wrap-up
            ],
            'reduced': [
                "08:00",  # Morning briefing
                "18:00"  # Evening summary
            ],
            'minimal': [
                "09:00"  # Single daily check
            ]
        }
        return {
            'strategy': strategy,
            'times': schedules.get(strategy, schedules['minimal'])
        }
class WeekendMonitoring:
    """Specialized weekend monitoring with reduced overhead.

    Checks only urgent and emergency activity; document and actor
    monitoring are skipped entirely on weekends.
    """

    async def run_weekend_monitoring(self, date: datetime.date) -> DailyUpdateResult:
        """Lightweight weekend monitoring.

        Returns a DailyUpdateResult covering only urgent/emergency
        activity; on failure, a zeroed result with the error recorded.
        """
        started_at = datetime.now()
        try:
            urgent = await self._check_urgent_updates(date)
            emergencies = await self._check_emergency_cases(date)
            update_count = len(urgent) + len(emergencies)
            summary = DailyUpdateResult(
                date=date.isoformat(),
                total_updates=update_count,
                new_cases=len(emergencies),
                updated_cases=len(urgent),
                new_documents=0,  # Skip detailed document monitoring
                new_actors=0,  # Skip detailed actor monitoring
                processing_time_seconds=(datetime.now() - started_at).total_seconds(),
                errors=[]
            )
            # Only raise an alert when something actually happened.
            if update_count > 0:
                await self._generate_weekend_alert(summary, urgent, emergencies)
            return summary
        except Exception as e:
            return DailyUpdateResult(
                date=date.isoformat(),
                total_updates=0,
                new_cases=0,
                updated_cases=0,
                new_documents=0,
                new_actors=0,
                processing_time_seconds=(datetime.now() - started_at).total_seconds(),
                errors=[f"Weekend monitoring failed: {str(e)}"]
            )

    async def _check_urgent_updates(self, date: datetime.date) -> List[Dict]:
        """Check for urgent weekend updates.

        Builds an OData filter over urgency keywords in case titles; the
        actual API call is not implemented yet.
        """
        urgent_filter = (
            "substringof('akut',titel) or substringof('hastebehandling',titel) or "
            "substringof('øjeblikkeligt',titel)"
        )
        # Implementation would query API for urgent cases
        return []

    async def _check_emergency_cases(self, date: datetime.date) -> List[Dict]:
        """Check for emergency legislative cases.

        Builds an OData filter over emergency-procedure keywords; the
        actual API call is not implemented yet.
        """
        emergency_filter = "substringof('nødlov',titel) or substringof('hastesag',titel)"
        # Implementation would query API for emergency cases
        return []
Performance Optimization¶
Batch Processing Optimization¶
class OptimizedBatchProcessor:
    """High-performance batch processing for daily updates.

    Async context manager that fans out per-entity OData queries
    concurrently (bounded by a semaphore) and paginates each entity
    with $skip/$top.
    """

    def __init__(self, max_concurrent_requests: int = 5):
        self.max_concurrent_requests = max_concurrent_requests
        # aiohttp session; created in __aenter__, closed in __aexit__.
        self.session = None
        # Caps in-flight API requests across all concurrent tasks.
        self.request_semaphore = asyncio.Semaphore(max_concurrent_requests)

    async def __aenter__(self):
        # Local import keeps aiohttp out of module import time.
        import aiohttp
        self.session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=30),
            connector=aiohttp.TCPConnector(limit=100)
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def process_daily_batch(self, date: datetime.date) -> Dict:
        """Optimized batch processing for daily updates.

        Returns:
            Dict with 'cases'/'documents'/'actors' per-entity results plus
            an 'errors' list; a failed task is replaced by an empty result
            rather than aborting the whole batch.
        """
        # Parallel processing tasks
        tasks = [
            self._batch_process_entity('Sag', date, ['id', 'titel', 'opdateringsdato']),
            self._batch_process_entity('Dokument', date, ['id', 'titel', 'opdateringsdato']),
            self._batch_process_entity('Aktør', date, ['id', 'navn', 'opdateringsdato']),
        ]
        # Run tasks concurrently; exceptions are returned, not raised.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Combine results
        combined_result = {
            'cases': results[0] if not isinstance(results[0], Exception) else {'total': 0, 'items': []},
            'documents': results[1] if not isinstance(results[1], Exception) else {'total': 0, 'items': []},
            'actors': results[2] if not isinstance(results[2], Exception) else {'total': 0, 'items': []},
            'errors': [str(r) for r in results if isinstance(r, Exception)]
        }
        return combined_result

    async def _batch_process_entity(self, entity_name: str, date: datetime.date,
                                    select_fields: List[str]) -> Dict:
        """Process single entity type with pagination.

        Pages through all rows updated on ``date`` in batches of 100;
        stops early (keeping partial results) if a page request fails.
        """
        all_items = []
        skip = 0
        batch_size = 100
        # [start, end) window covering the whole target day.
        start_datetime = datetime.combine(date, datetime.min.time())
        end_datetime = start_datetime + timedelta(days=1)
        filter_query = (
            f"opdateringsdato ge datetime'{start_datetime.isoformat()}' "
            f"and opdateringsdato lt datetime'{end_datetime.isoformat()}'"
        )
        while True:
            try:
                # Semaphore bounds concurrency across parallel entity tasks.
                async with self.request_semaphore:
                    batch = await self._api_request(
                        entity_name,
                        filter_query=filter_query,
                        select=','.join(select_fields),
                        skip=skip,
                        top=batch_size,
                        orderby="opdateringsdato desc"
                    )
                if not batch.get('value') or len(batch['value']) == 0:
                    break
                all_items.extend(batch['value'])
                # Check if we got less than batch_size (last page)
                if len(batch['value']) < batch_size:
                    break
                skip += batch_size
                # Rate limiting
                await asyncio.sleep(0.1)
            except Exception as e:
                # Log error but continue processing
                print(f"Error processing {entity_name} batch at skip={skip}: {e}")
                break
        return {
            'total': len(all_items),
            'items': all_items
        }

    async def _api_request(self, endpoint: str, **params) -> Dict:
        """Make optimized API request.

        Translates keyword params into OData query options ($filter,
        $select, $orderby, $expand, $skip, $top) and returns the parsed
        JSON body. Raises on HTTP error status via raise_for_status.
        """
        url = f"https://oda.ft.dk/api/{endpoint}"
        # Build query parameters
        query_params = {}
        for key, value in params.items():
            if value is not None:
                if key == 'filter_query':
                    query_params['$filter'] = value
                elif key in ['select', 'orderby', 'expand']:
                    query_params[f'${key}'] = value
                elif key in ['skip', 'top']:
                    query_params[f'${key}'] = str(value)
        async with self.session.get(url, params=query_params) as response:
            response.raise_for_status()
            return await response.json()
Error Handling and Recovery¶
Robust Error Management¶
class RobustMonitoringSystem:
    """Monitoring system with comprehensive error handling and recovery.

    NOTE(review): APIException / NetworkException and the
    _full_monitoring / _reduced_monitoring / _minimal_monitoring /
    _handle_network_error / _handle_unexpected_error helpers are
    referenced below but not defined here -- confirm they are provided
    elsewhere before using this class.
    """

    def __init__(self):
        # Retry policy: delay = base_delay * backoff_factor ** attempt.
        self.retry_config = {
            'max_retries': 3,
            'backoff_factor': 2,
            'base_delay': 1
        }
        # Named fallback strategies per failure class. Note: this mapping
        # is not read anywhere in this class -- documentation only.
        self.fallback_strategies = {
            'api_failure': 'use_cached_data',
            'network_timeout': 'retry_with_reduced_scope',
            'rate_limit': 'exponential_backoff'
        }

    async def robust_daily_monitoring(self, date: datetime.date) -> DailyUpdateResult:
        """Daily monitoring with comprehensive error recovery.

        Retries up to max_retries times with exponential backoff and a
        progressively reduced monitoring scope; returns an emergency
        result (total_updates == -1) if every attempt fails.
        """
        for attempt in range(self.retry_config['max_retries']):
            try:
                return await self._attempt_monitoring(date, attempt)
            except APIException as e:
                await self._handle_api_error(e, attempt)
            except NetworkException as e:
                await self._handle_network_error(e, attempt)
            except Exception as e:
                await self._handle_unexpected_error(e, attempt)
            # Back off before the next attempt (skipped after the last one).
            if attempt < self.retry_config['max_retries'] - 1:
                delay = self._calculate_backoff_delay(attempt)
                await asyncio.sleep(delay)
        # All retries failed, return minimal error result
        return await self._create_emergency_result(date)

    async def _attempt_monitoring(self, date: datetime.date, attempt: int) -> DailyUpdateResult:
        """Attempt daily monitoring with progressive scope reduction."""
        if attempt == 0:
            # Full monitoring
            return await self._full_monitoring(date)
        elif attempt == 1:
            # Reduced scope monitoring
            return await self._reduced_monitoring(date)
        else:
            # Minimal monitoring
            return await self._minimal_monitoring(date)

    async def _handle_api_error(self, error: Exception, attempt: int):
        """Handle API-specific errors.

        Rate-limit errors trigger a long exponential sleep; presumed
        transient server errors are ignored; anything else is logged.
        """
        if "rate limit" in str(error).lower():
            # Implement exponential backoff for rate limiting
            delay = (2 ** attempt) * 60  # 1, 2, 4 minutes
            await asyncio.sleep(delay)
        elif "server error" in str(error).lower():
            # Server errors might be temporary
            pass
        else:
            # Log unexpected API errors
            logging.error(f"API error on attempt {attempt}: {error}")

    async def _create_emergency_result(self, date: datetime.date) -> DailyUpdateResult:
        """Create emergency result when all monitoring attempts fail."""
        return DailyUpdateResult(
            date=date.isoformat(),
            total_updates=-1,  # Indicates monitoring failure
            new_cases=0,
            updated_cases=0,
            new_documents=0,
            new_actors=0,
            processing_time_seconds=0,
            errors=["Complete monitoring failure - all retry attempts exhausted"]
        )

    def _calculate_backoff_delay(self, attempt: int) -> float:
        """Calculate exponential backoff delay in seconds."""
        base_delay = self.retry_config['base_delay']
        backoff_factor = self.retry_config['backoff_factor']
        return base_delay * (backoff_factor ** attempt)
class MonitoringHealthCheck:
    """Health monitoring for the monitoring system itself.

    NOTE(review): relies on module-level ``time`` and ``aiohttp`` imports
    and a ``_check_system_resources`` method, none of which are visible
    here -- confirm they exist elsewhere before use.
    """

    def __init__(self):
        # Rolling health metrics; only initialized here, never updated
        # by this class.
        self.health_metrics = {
            'last_successful_run': None,
            'consecutive_failures': 0,
            'average_response_time': 0,
            'error_rate': 0
        }

    async def perform_health_check(self) -> Dict:
        """Comprehensive health check of monitoring system.

        Returns:
            Dict with overall 'status' ('healthy'/'degraded'/'critical'),
            per-check results under 'checks', and human-readable 'alerts'.
        """
        health_status = {
            'status': 'healthy',
            'checks': {},
            'alerts': []
        }
        # Check API connectivity
        api_status = await self._check_api_connectivity()
        health_status['checks']['api_connectivity'] = api_status
        if not api_status['healthy']:
            health_status['status'] = 'degraded'
            health_status['alerts'].append('API connectivity issues detected')
        # Check data freshness
        data_freshness = await self._check_data_freshness()
        health_status['checks']['data_freshness'] = data_freshness
        if not data_freshness['healthy']:
            health_status['status'] = 'degraded'
            health_status['alerts'].append('Data freshness issues detected')
        # Check system resources
        # NOTE(review): _check_system_resources is not defined in this class.
        resource_status = self._check_system_resources()
        health_status['checks']['system_resources'] = resource_status
        if not resource_status['healthy']:
            # Resource problems escalate the overall status to critical.
            health_status['status'] = 'critical'
            health_status['alerts'].append('System resource constraints detected')
        return health_status

    async def _check_api_connectivity(self) -> Dict:
        """Check API endpoint connectivity and response times.

        Returns a dict with 'healthy', 'response_time' and either
        'status_code' (on response) or 'error' (on failure).
        """
        try:
            start_time = time.time()
            # Simple connectivity test
            async with aiohttp.ClientSession() as session:
                async with session.get('https://oda.ft.dk/api/Sag?$top=1') as response:
                    response_time = time.time() - start_time
                    return {
                        'healthy': response.status == 200,
                        'response_time': response_time,
                        'status_code': response.status
                    }
        except Exception as e:
            return {
                'healthy': False,
                'error': str(e),
                'response_time': None
            }

    async def _check_data_freshness(self) -> Dict:
        """Check if data is being updated regularly.

        Fetches the most recent 'opdateringsdato' and reports healthy when
        the latest update is less than 24 hours old.
        """
        try:
            # Check most recent update timestamp
            async with aiohttp.ClientSession() as session:
                url = 'https://oda.ft.dk/api/Sag'
                params = {
                    '$top': '1',
                    '$orderby': 'opdateringsdato desc',
                    '$select': 'opdateringsdato'
                }
                async with session.get(url, params=params) as response:
                    if response.status == 200:
                        data = await response.json()
                        if data.get('value'):
                            # Normalize trailing 'Z' to an explicit UTC offset
                            # so fromisoformat accepts it.
                            last_update = datetime.fromisoformat(
                                data['value'][0]['opdateringsdato'].replace('Z', '+00:00')
                            )
                            # Check if data is recent (within last 24 hours)
                            hours_since_update = (datetime.now(last_update.tzinfo) - last_update).total_seconds() / 3600
                            return {
                                'healthy': hours_since_update < 24,
                                'last_update': last_update.isoformat(),
                                'hours_since_update': hours_since_update
                            }
                    return {'healthy': False, 'error': 'No data available'}
        except Exception as e:
            return {'healthy': False, 'error': str(e)}
Integration with Email and Notification Systems¶
Multi-Channel Notification System¶
import asyncio
import json
import logging
import smtplib
from datetime import datetime
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
class NotificationSystem:
    """Multi-channel notification dispatcher for parliamentary monitoring.

    Channel settings are normalized from a nested ``config`` dict at
    construction time.  NOTE(review): only the email setup helper is shown
    in this guide — ``_setup_slack``/``_setup_webhook``/``_setup_sms`` and
    the Slack/webhook/SMS senders (``_send_slack_briefing``,
    ``_send_webhook_notification``, ``_send_channel_message``) are assumed
    to be implemented alongside this class.
    """

    def __init__(self, config: Dict):
        self.config = config
        # Normalized per-channel settings, keyed by channel name.
        self.channels = {
            'email': self._setup_email(),
            'slack': self._setup_slack(),
            'webhook': self._setup_webhook(),
            'sms': self._setup_sms(),
        }

    def _setup_email(self) -> Dict:
        """Normalize email channel settings, applying safe defaults."""
        email_cfg = self.config.get('email', {})  # hoist the repeated lookup
        return {
            'enabled': email_cfg.get('enabled', False),
            'smtp_server': email_cfg.get('smtp_server', 'smtp.gmail.com'),
            'smtp_port': email_cfg.get('smtp_port', 587),
            'username': email_cfg.get('username'),
            'password': email_cfg.get('password'),
            'recipients': email_cfg.get('recipients', []),
        }

    async def send_daily_briefing(self, briefing_data: Dict, formats: Dict[str, str]):
        """Send the daily briefing over every enabled channel.

        Args:
            briefing_data: Structured briefing payload (must contain 'date').
            formats: Channel-specific rendered content, e.g. {'email': text}.

        Returns:
            List of (channel_name, result_dict) tuples, one per channel tried.
        """
        notifications_sent = []
        if self.channels['email']['enabled']:
            email_result = await self._send_email_briefing(
                briefing_data, formats.get('email', ''))
            notifications_sent.append(('email', email_result))
        if self.channels['slack']['enabled']:
            slack_result = await self._send_slack_briefing(briefing_data)
            notifications_sent.append(('slack', slack_result))
        if self.channels['webhook']['enabled']:
            webhook_result = await self._send_webhook_notification(briefing_data)
            notifications_sent.append(('webhook', webhook_result))
        return notifications_sent

    async def _send_email_briefing(self, briefing_data: Dict, email_content: str) -> Dict:
        """Send the briefing email (plain text + HTML + JSON attachment).

        Returns:
            {'success': True, 'sent_count': n} on success,
            {'success': False, 'error': msg} on any failure.
        """
        try:
            email_cfg = self.channels['email']
            msg = MIMEMultipart()
            msg['From'] = email_cfg['username']
            msg['Subject'] = f"Parliamentary Daily Briefing - {briefing_data['date']}"
            # Attach both plain-text and HTML renderings of the briefing.
            html_content = self._convert_to_html(email_content)
            msg.attach(MIMEText(email_content, 'plain', 'utf-8'))
            msg.attach(MIMEText(html_content, 'html', 'utf-8'))
            # Attach the raw JSON payload for recipients who post-process it.
            json_attachment = MIMEBase('application', 'json')
            json_attachment.set_payload(
                json.dumps(briefing_data, indent=2, ensure_ascii=False).encode('utf-8'))
            encoders.encode_base64(json_attachment)
            json_attachment.add_header(
                'Content-Disposition',
                f'attachment; filename="parliamentary_data_{briefing_data["date"]}.json"'
            )
            msg.attach(json_attachment)
            sent_count = 0
            # The context manager guarantees QUIT/close even when login or a
            # send raises — the original leaked the connection on exception.
            with smtplib.SMTP(email_cfg['smtp_server'], email_cfg['smtp_port']) as server:
                server.starttls()
                server.login(email_cfg['username'], email_cfg['password'])
                for recipient in email_cfg['recipients']:
                    msg['To'] = recipient
                    server.send_message(msg)
                    sent_count += 1
                    del msg['To']  # reset header before the next recipient
            return {'success': True, 'sent_count': sent_count}
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def _convert_to_html(self, text_content: str) -> str:
        """Wrap the plain-text briefing in a styled HTML document."""
        html_content = text_content.replace('\n', '<br>')
        html_template = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Parliamentary Daily Briefing</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 40px; }}
h1, h2 {{ color: #d32f2f; }}
.highlight {{ background-color: #fff3e0; padding: 10px; border-left: 4px solid #ff9800; }}
.metrics {{ background-color: #f5f5f5; padding: 15px; border-radius: 5px; }}
table {{ border-collapse: collapse; width: 100%; }}
th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
th {{ background-color: #f2f2f2; }}
</style>
</head>
<body>
<pre style="white-space: pre-wrap; font-family: inherit;">{html_content}</pre>
</body>
</html>
"""
        return html_template

    async def send_emergency_alert(self, alert_data: Dict):
        """Send urgent alerts through the fastest configured channels.

        Failures on one channel are logged and do not prevent delivery
        attempts on the remaining channels.
        """
        urgent_channels = ['sms', 'slack']  # fastest notification channels
        # Banner emoji was mojibake ("=¨") in the original source.
        alert_message = f"""
🚨 PARLIAMENTARY MONITORING ALERT
Time: {datetime.now().strftime('%Y-%m-%d %H:%M')}
Type: {alert_data.get('type', 'Unknown')}
Severity: {alert_data.get('severity', 'High')}
Details: {alert_data.get('message', 'No details available')}
Immediate action may be required.
"""
        for channel in urgent_channels:
            if self.channels[channel]['enabled']:
                try:
                    await self._send_channel_message(channel, alert_message)
                except Exception as e:
                    # Log and continue with the remaining channels.
                    logging.error(f"Failed to send alert via {channel}: {e}")
# Usage Example
async def setup_daily_monitoring_system():
    """Complete setup example for the daily monitoring system.

    Wires together monitor, briefing generator, notification system and
    scheduler, registers the daily/weekend jobs, then loops forever
    polling the `schedule` library once a minute.
    """
    # Configuration
    config = {
        'email': {
            'enabled': True,
            'smtp_server': 'smtp.gmail.com',
            'smtp_port': 587,
            'username': 'monitoring@example.com',
            'password': 'app_password',
            'recipients': ['analyst@example.com', 'manager@example.com']
        },
        'monitoring': {
            'max_concurrent_requests': 5,
            'retry_attempts': 3,
            'weekend_strategy': 'reduced'
        }
    }
    # Initialize systems
    monitor = DailyUpdateMonitor()
    briefing_generator = DailyBriefingGenerator(monitor)
    notification_system = NotificationSystem(config)
    scheduler = AdaptiveMonitoringScheduler()

    # asyncio keeps only a *weak* reference to tasks, so a fire-and-forget
    # create_task() result can be garbage-collected mid-execution.  Hold a
    # strong reference until each task completes.
    background_tasks = set()

    def _spawn(coro):
        """Schedule a coroutine and retain the task until it finishes."""
        task = asyncio.create_task(coro)
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)

    # Daily morning briefing
    schedule.every().day.at("07:00").do(
        lambda: _spawn(run_morning_briefing(
            monitor, briefing_generator, notification_system
        ))
    )
    # Reduced weekend monitoring
    schedule.every().saturday.at("09:00").do(
        lambda: _spawn(run_weekend_monitoring(monitor))
    )
    schedule.every().sunday.at("09:00").do(
        lambda: _spawn(run_weekend_monitoring(monitor))
    )

    # Run scheduler forever, checking for due jobs once a minute.
    while True:
        schedule.run_pending()
        await asyncio.sleep(60)
async def run_morning_briefing(monitor, briefing_generator, notification_system):
    """Execute the morning briefing routine.

    Generates today's briefing and pushes it through the notification
    system.  On failure an emergency alert is attempted; the alert itself
    is guarded so a broken notification channel cannot mask (or re-raise
    over) the original error.

    Args:
        monitor: Monitoring component (unused directly here; kept for
            interface symmetry with the scheduler wiring).
        briefing_generator: Exposes ``generate_morning_briefing(date)``.
        notification_system: Exposes ``send_daily_briefing`` and
            ``send_emergency_alert``.
    """
    try:
        today = datetime.now().date()
        briefing_formats = await briefing_generator.generate_morning_briefing(today)
        briefing_data = {
            'date': today.isoformat(),
            'generated_at': datetime.now().isoformat(),
        }
        await notification_system.send_daily_briefing(briefing_data, briefing_formats)
        logging.info(f"Morning briefing sent successfully for {today}")
    except Exception as e:
        # logging.exception records the full traceback for diagnosis.
        logging.exception(f"Morning briefing failed: {e}")
        try:
            await notification_system.send_emergency_alert({
                'type': 'morning_briefing_failure',
                'severity': 'High',
                'message': f"Morning briefing failed: {str(e)}",
            })
        except Exception as alert_error:
            # Never let the alert path raise past the briefing routine.
            logging.error(f"Emergency alert also failed: {alert_error}")
Summary¶
This comprehensive daily update monitoring system provides:
- Intelligent Scheduling: Adaptive monitoring based on parliamentary calendar and activity patterns
- Robust Error Handling: Multi-level retry strategies and fallback mechanisms
- Performance Optimization: Concurrent processing and efficient API usage
- Comprehensive Reporting: Daily briefings in multiple formats with trend analysis
- Multi-Channel Notifications: Email, Slack, SMS, and webhook integration
- Weekend/Recess Adaptation: Scaled monitoring during low-activity periods
- Health Monitoring: Self-monitoring capabilities with alerts
- Data Analysis: Trend detection and anomaly identification
The system is designed to provide reliable, continuous monitoring of Danish Parliamentary activity while adapting to the natural rhythms of parliamentary work and optimizing resource usage during different activity periods.