Document Flow Tracking through the Danish Parliamentary System¶
This guide explains how to track documents through the Danish Parliamentary system using the OData API. It covers the document lifecycle, the relationships between documents, actors, and cases, and automated monitoring, so that a legislative process can be followed from document creation to final publication.
Overview¶
Document flow tracking involves monitoring documents as they move through the parliamentary system, from initial creation through various stages of review, processing, and publication. The Danish Parliament API provides rich relationship data through junction tables, enabling comprehensive tracking of:
- Document lifecycle stages and status transitions
- Actor relationships and role changes over time
- Committee workflows and processing milestones
- Cross-reference networks between related documents
- Publication timelines and revision tracking
Document Lifecycle Fundamentals¶
Core Document Entities¶
The document tracking system revolves around several key entities:
graph TD
D[Dokument<br/>Core Document] --> DT[Dokumenttype<br/>28 Types]
D --> DS[Dokumentstatus<br/>Status Lifecycle]
D --> DK[Dokumentkategori<br/>Categories]
D <--> DA[DokumentAktør<br/>25 Role Types]
DA <--> A[Aktør<br/>Politicians/Committees]
D <--> SD[SagDokument<br/>Case Relationships]
SD <--> S[Sag<br/>Legislative Cases]
D <--> STD[SagstrinDokument<br/>Process Steps]
STD <--> ST[Sagstrin<br/>Case Steps]
D <--> F[Fil<br/>Attached Files]
style D fill:#fff3e0
style DA fill:#e1f5fe
style SD fill:#e8f5e8
Document Lifecycle Stages¶
Documents progress through distinct lifecycle stages tracked via multiple timestamp fields:
- Creation: dato - Initial document creation
- Processing: Internal parliamentary processing (tracked via status changes)
- Release: frigivelsesdato - Public release timestamp
- Updates: opdateringsdato - Ongoing modifications and corrections
- Revision: Omtryk entity tracks reprints and corrections
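As a minimal sketch of how these timestamp fields can be read together, the function below maps a Dokument record to a coarse lifecycle stage. The helper name `lifecycle_stage` and the stage labels are illustrative assumptions; the API itself only exposes the timestamp fields.

```python
def lifecycle_stage(doc):
    """Return a coarse lifecycle stage for a Dokument record (a plain dict).

    The stage labels are hypothetical; only the field names dato,
    frigivelsesdato and opdateringsdato come from the API.
    """
    created = doc.get('dato')
    released = doc.get('frigivelsesdato')
    updated = doc.get('opdateringsdato')
    if not created:
        return 'unknown'
    if not released:
        # Created but not yet publicly released
        return 'processing'
    # ISO 8601 timestamps with the same layout compare correctly as text
    if updated and updated > released:
        return 'updated_after_release'
    return 'released'


example = {
    'dato': '2024-01-01T10:00:00',
    'frigivelsesdato': '2024-01-02T10:00:00',
    'opdateringsdato': '2024-01-03T09:00:00',
}
print(lifecycle_stage(example))
```

A classification like this is only as reliable as the timestamps: opdateringsdato changes on any modification, so "updated after release" does not by itself indicate a reprint.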
Document Types and Legislative Journey¶
Key Document Types in Legislative Process¶
The Danish Parliament uses 28 document types, each following specific workflow patterns:
| Type ID | Type Name | Danish | Typical Flow |
|---|---|---|---|
| 1 | Report | Redegørelse | Committee → Publication |
| 3 | Ministerial Statement | Ministerredegørelse | Ministry → Parliament → Publication |
| 4 | Act Document | Aktstykke | Ministry → Committee → Voting → Publication |
| 5 | Inquiry | Forespørgsel | MP → Ministry → Response → Publication |
| 13 | Parliamentary Question | Spørgsmål | MP → Ministry → Answer → Publication |
| 15 | Proposal | Forslag | MP/Committee → Review → Voting → Publication |
Document Type Flow Tracking¶
from datetime import datetime, timedelta

import requests

def track_document_type_flow(document_type_id, days_back=30):
    """Track documents of a specific type through their lifecycle"""
    # Second precision keeps the OData datetime literal simple
    since_date = (datetime.now() - timedelta(days=days_back)).strftime('%Y-%m-%dT%H:%M:%S')
    # Get documents with full relationship context. The role lookup is expanded
    # as well; without it DokumentAktørRolle is missing and every role reads 'Unknown'.
    filter_query = f"typeid eq {document_type_id} and opdateringsdato gt datetime'{since_date}'"
    expand_query = "Dokumenttype,Dokumentstatus,DokumentAktør/Aktør,DokumentAktør/DokumentAktørRolle,SagDokument/Sag"
    url = f"https://oda.ft.dk/api/Dokument?%24filter={filter_query}&%24expand={expand_query}&%24orderby=opdateringsdato desc&%24top=100"
    response = requests.get(url)
    response.raise_for_status()
    documents = response.json()['value']
    flow_analysis = {}
    for doc in documents:
        # "or {}" guards against expanded properties that come back as null
        doc_flow = {
            'id': doc['id'],
            'title': (doc.get('titel') or '')[:100],
            'created': doc['dato'],
            'released': doc.get('frigivelsesdato'),
            'updated': doc['opdateringsdato'],
            'status': (doc.get('Dokumentstatus') or {}).get('status', 'Unknown'),
            'actors': [
                {
                    'name': (rel.get('Aktør') or {}).get('navn', 'Unknown'),
                    'role': (rel.get('DokumentAktørRolle') or {}).get('rolle', 'Unknown')
                }
                for rel in doc.get('DokumentAktør', [])
            ],
            'cases': [(case.get('Sag') or {}).get('titel') for case in doc.get('SagDokument', [])]
        }
        flow_analysis[doc['id']] = doc_flow
    return flow_analysis

# Example: Track parliamentary questions
questions_flow = track_document_type_flow(13, days_back=7)
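The query strings in this guide contain spaces, quotes, and non-ASCII entity names such as DokumentAktør. One way to avoid relying on the HTTP client to escape them is to build the URL with the standard library, as in the sketch below. The helper `build_odata_url` and its parameter names are assumptions made for illustration; only the base URL and the `$filter`, `$expand`, `$orderby`, and `$top` options come from the examples above.

```python
from urllib.parse import quote, urlencode

BASE_URL = "https://oda.ft.dk/api"

def build_odata_url(entity, filter_expr=None, expand=None, orderby=None, top=None):
    """Build a percent-encoded OData query URL (hypothetical helper)."""
    params = {}
    if filter_expr:
        params['$filter'] = filter_expr
    if expand:
        params['$expand'] = expand
    if orderby:
        params['$orderby'] = orderby
    if top is not None:
        params['$top'] = top
    # quote_via=quote encodes spaces as %20 rather than '+'
    query = urlencode(params, quote_via=quote)
    url = f"{BASE_URL}/{quote(entity)}"
    return f"{url}?{query}" if query else url


print(build_odata_url("Dokument", filter_expr="typeid eq 13", top=5))
```

Every reserved character in the option values is encoded, including the slashes and commas inside `$expand`; the server is expected to decode them again.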
Document-Actor Relationships and Role Tracking¶
Understanding Document Roles¶
The DokumentAktør junction table captures 25 different role types that define how actors interact with documents:
Communication Flow Roles¶
- Afsender (Sender) - Document originator
- Til (To) - Primary recipient
- Modtager (Receiver) - Formal recipient
- Kopi til (Copy to) - CC recipient
Parliamentary Process Roles¶
- Forslagsstiller (Proposer) - Legislative proposal author
- Stiller (Submitter) - Document submitter
- Spørger (Questioner) - Parliamentary question author
- Besvaret af (Answered by) - Question responder
- Minister - Ministerial association
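The two role groups listed above can be turned into a small lookup for summarising the roles found on a document. This is a sketch: the category labels and the helper `count_role_categories` are illustrative assumptions, and only the nine role names listed above are mapped, so every other role falls into `other`.

```python
from collections import Counter

# Role names as listed above; the category labels are hypothetical
ROLE_CATEGORIES = {
    'Afsender': 'communication',
    'Til': 'communication',
    'Modtager': 'communication',
    'Kopi til': 'communication',
    'Forslagsstiller': 'process',
    'Stiller': 'process',
    'Spørger': 'process',
    'Besvaret af': 'process',
    'Minister': 'process',
}

def count_role_categories(role_names):
    """Count how many of the given role names fall into each category."""
    return Counter(ROLE_CATEGORIES.get(name, 'other') for name in role_names)


summary = count_role_categories(['Afsender', 'Spørger', 'Minister', 'Ukendt'])
print(dict(summary))
```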
Role Change Tracking¶
import requests

def track_document_role_changes(document_id):
    """Track how actor roles change for a document over time"""
    # Get all document-actor relationships, oldest update first
    filter_query = f"dokumentid eq {document_id}"
    expand_query = "Aktør,DokumentAktørRolle"
    url = f"https://oda.ft.dk/api/DokumentAktør?%24filter={filter_query}&%24expand={expand_query}&%24orderby=opdateringsdato"
    response = requests.get(url)
    response.raise_for_status()
    relationships = response.json()['value']
    role_timeline = []
    for rel in relationships:
        role_timeline.append({
            'timestamp': rel['opdateringsdato'],
            'actor': rel['Aktør']['navn'],
            'role': rel['DokumentAktørRolle']['rolle'],
            'role_id': rel['rolleid']
        })
    return role_timeline

# Example: Track specific document roles
role_changes = track_document_role_changes(12345)
for change in role_changes:
    print(f"{change['timestamp']}: {change['actor']} as {change['role']}")
Multi-Role Actor Analysis¶
async function analyzeActorDocumentRoles(actorId, monthsBack = 6) {
const sinceDate = new Date();
sinceDate.setMonth(sinceDate.getMonth() - monthsBack);
// Trim milliseconds and the trailing "Z": the datetime'...' literal takes no offset
const isoDate = sinceDate.toISOString().slice(0, 19);
// Get all document relationships for this actor
const filter = `aktørid eq ${actorId} and opdateringsdato gt datetime'${isoDate}'`;
// The datetime'...' literal is OData v3 syntax, which has no nested $select
// inside $expand, so the related entities are expanded whole
const expand = 'Dokument,DokumentAktørRolle';
const url = `https://oda.ft.dk/api/DokumentAktør?%24filter=${filter}&%24expand=${expand}&%24orderby=opdateringsdato desc&%24top=1000`;
const response = await fetch(url);
const data = await response.json();
// Analyze role patterns
const roleAnalysis = {};
data.value.forEach(rel => {
const role = rel.DokumentAktørRolle?.rolle || 'Unknown';
if (!roleAnalysis[role]) {
roleAnalysis[role] = {
count: 0,
documents: [],
firstSeen: rel.opdateringsdato,
lastSeen: rel.opdateringsdato
};
}
roleAnalysis[role].count++;
roleAnalysis[role].documents.push({
id: rel.dokumentid,
title: rel.Dokument?.titel?.substring(0, 100),
date: rel.Dokument?.dato,
type: rel.Dokument?.typeid
});
if (rel.opdateringsdato < roleAnalysis[role].firstSeen) {
roleAnalysis[role].firstSeen = rel.opdateringsdato;
}
if (rel.opdateringsdato > roleAnalysis[role].lastSeen) {
roleAnalysis[role].lastSeen = rel.opdateringsdato;
}
});
return roleAnalysis;
}
// Example: Analyze minister's document roles
const ministerRoles = await analyzeActorDocumentRoles(12345);
console.log('Document roles for actor:', ministerRoles);
Publication Timeline Monitoring¶
Document Publishing Milestones¶
Track key publication milestones using timestamp analysis:
from datetime import datetime, timezone

import requests

def parse_api_timestamp(value):
    """Parse an API timestamp; values without an offset are treated as UTC (an assumption)."""
    # Python 3.11+ is more lenient about the number of fractional-second digits
    parsed = datetime.fromisoformat(value.replace('Z', '+00:00'))
    return parsed if parsed.tzinfo else parsed.replace(tzinfo=timezone.utc)

def analyze_publication_timeline(document_ids):
    """Analyze publication timeline for multiple documents"""
    # Get documents with full timestamp information
    document_filter = " or ".join([f"id eq {doc_id}" for doc_id in document_ids])
    select_fields = "id,titel,dato,frigivelsesdato,opdateringsdato,statusid"
    expand_query = "Dokumentstatus"
    url = f"https://oda.ft.dk/api/Dokument?%24filter={document_filter}&%24select={select_fields}&%24expand={expand_query}"
    response = requests.get(url)
    response.raise_for_status()
    documents = response.json()['value']
    timeline_analysis = []
    for doc in documents:
        created = parse_api_timestamp(doc['dato'])
        released = None
        if doc.get('frigivelsesdato'):
            released = parse_api_timestamp(doc['frigivelsesdato'])
        updated = parse_api_timestamp(doc['opdateringsdato'])
        # Processing time in hours, known only once the document is released
        processing_time = None
        if released:
            processing_time = (released - created).total_seconds() / 3600
        timeline_data = {
            'document_id': doc['id'],
            'title': (doc.get('titel') or '')[:100],
            'created': created,
            'released': released,
            'last_updated': updated,
            'status': (doc.get('Dokumentstatus') or {}).get('status', 'Unknown'),
            'processing_hours': processing_time,
            'still_processing': released is None,
            # Both sides are timezone-aware, so the subtraction cannot raise TypeError
            'recent_updates': (datetime.now(timezone.utc) - updated).days < 7
        }
        timeline_analysis.append(timeline_data)
    return sorted(timeline_analysis, key=lambda x: x['created'])

# Example: Analyze recent parliamentary questions.
# get_recent_documents_by_type is not defined in this guide; it stands for any
# helper that returns recent Dokument records of the given type.
recent_questions = get_recent_documents_by_type(13, days_back=14)
question_ids = [q['id'] for q in recent_questions[:20]]
timeline = analyze_publication_timeline(question_ids)
Publication Pattern Detection¶
import statistics
from collections import defaultdict
from datetime import datetime, timedelta

import requests

def detect_publication_patterns(document_type_id, months_back=12):
    """Detect patterns in document publication timing"""
    # A month is approximated as 30 days; second precision for the OData literal
    start_date = (datetime.now() - timedelta(days=months_back * 30)).strftime('%Y-%m-%dT%H:%M:%S')
    filter_query = f"typeid eq {document_type_id} and frigivelsesdato gt datetime'{start_date}' and frigivelsesdato ne null"
    url = f"https://oda.ft.dk/api/Dokument?%24filter={filter_query}&%24select=frigivelsesdato,dato&%24top=1000"
    response = requests.get(url)
    response.raise_for_status()
    documents = response.json()['value']
    patterns = {
        'by_weekday': defaultdict(int),
        'by_hour': defaultdict(int),
        'processing_times': [],
        'monthly_counts': defaultdict(int)
    }
    for doc in documents:
        if not doc.get('dato') or not doc.get('frigivelsesdato'):
            continue  # Skip records with a missing timestamp
        created = datetime.fromisoformat(doc['dato'].replace('Z', '+00:00'))
        released = datetime.fromisoformat(doc['frigivelsesdato'].replace('Z', '+00:00'))
        # Publication day patterns
        patterns['by_weekday'][released.strftime('%A')] += 1
        patterns['by_hour'][released.hour] += 1
        patterns['monthly_counts'][released.strftime('%Y-%m')] += 1
        # Processing time patterns
        processing_hours = (released - created).total_seconds() / 3600
        patterns['processing_times'].append(processing_hours)
    # Calculate statistics
    times = patterns['processing_times']
    if times:
        patterns['avg_processing_hours'] = statistics.mean(times)
        patterns['median_processing_hours'] = statistics.median(times)
    if len(times) >= 2:  # statistics.quantiles needs at least two data points
        quartiles = statistics.quantiles(times, n=4)
        patterns['processing_percentiles'] = {
            '25th': quartiles[0],
            '75th': quartiles[2],
            '95th': statistics.quantiles(times, n=100)[94]
        }
    return patterns

# Example: Analyze ministerial statement patterns
patterns = detect_publication_patterns(3)  # Ministerredegørelse
if patterns['processing_times']:
    print(f"Average processing time: {patterns['avg_processing_hours']:.1f} hours")
    print(f"Most common publication day: {max(patterns['by_weekday'], key=patterns['by_weekday'].get)}")
Document Version Control and Revision Tracking¶
Tracking Document Revisions¶
The Omtryk (Reprint) entity tracks document revisions and corrections:
def track_document_revisions(document_id):
"""Track all revisions for a document"""
# Get document revisions
filter_query = f"dokumentid eq {document_id}"
url = f"https://oda.ft.dk/api/Omtryk?%24filter={filter_query}&%24orderby=dato"
response = requests.get(url)
revisions = response.json()['value']
# Get original document info
doc_url = f"https://oda.ft.dk/api/Dokument({document_id})?%24select=titel,dato,opdateringsdato"
doc_response = requests.get(doc_url)
original_doc = doc_response.json()
revision_history = {
'document_id': document_id,
'original_title': original_doc['titel'],
'original_date': original_doc['dato'],
'last_updated': original_doc['opdateringsdato'],
'revisions': []
}
for revision in revisions:
revision_info = {
'revision_date': revision['dato'],
'reason': revision.get('grund', 'No reason specified'),
'revision_id': revision['id'],
'days_since_original': (
datetime.fromisoformat(revision['dato'].replace('Z', '+00:00')) -
datetime.fromisoformat(original_doc['dato'].replace('Z', '+00:00'))
).days
}
revision_history['revisions'].append(revision_info)
return revision_history
# Example: Check for revised documents
def find_frequently_revised_documents(min_revisions=2):
"""Find documents with multiple revisions"""
# Get documents that have revisions
url = f"https://oda.ft.dk/api/Dokument?%24expand=Omtryk&%24filter=Omtryk/any()&%24top=100"
response = requests.get(url)
documents = response.json()['value']
frequently_revised = []
for doc in documents:
revision_count = len(doc.get('Omtryk', []))
if revision_count >= min_revisions:
frequently_revised.append({
'id': doc['id'],
'title': doc['titel'][:100],
'revision_count': revision_count,
'revisions': doc['Omtryk']
})
return sorted(frequently_revised, key=lambda x: x['revision_count'], reverse=True)
revised_docs = find_frequently_revised_documents()
Change Detection System¶
class DocumentChangeDetector {
constructor() {
this.knownDocuments = new Map();
this.changeLog = [];
}
async checkForChanges(documentIds) {
const changes = [];
// Batch request for current document states
const idsFilter = documentIds.map(id => `id eq ${id}`).join(' or ');
const url = `https://oda.ft.dk/api/Dokument?%24filter=${idsFilter}&%24select=id,titel,opdateringsdato,statusid&%24expand=Dokumentstatus`;
const response = await fetch(url);
const data = await response.json();
for (const doc of data.value) {
const docId = doc.id;
const currentState = {
title: doc.titel,
lastUpdated: doc.opdateringsdato,
status: doc.Dokumentstatus?.status || 'Unknown'
};
if (this.knownDocuments.has(docId)) {
const previousState = this.knownDocuments.get(docId);
// Check for changes
const docChanges = [];
if (currentState.title !== previousState.title) {
docChanges.push({
field: 'title',
old: previousState.title,
new: currentState.title
});
}
if (currentState.lastUpdated !== previousState.lastUpdated) {
docChanges.push({
field: 'lastUpdated',
old: previousState.lastUpdated,
new: currentState.lastUpdated
});
}
if (currentState.status !== previousState.status) {
docChanges.push({
field: 'status',
old: previousState.status,
new: currentState.status
});
}
if (docChanges.length > 0) {
changes.push({
documentId: docId,
timestamp: new Date().toISOString(),
changes: docChanges
});
}
}
// Update known state
this.knownDocuments.set(docId, currentState);
}
// Log changes
this.changeLog.push(...changes);
return changes;
}
getChangeHistory(documentId) {
return this.changeLog.filter(change => change.documentId === documentId);
}
}
// Example usage
const detector = new DocumentChangeDetector();
// Monitor specific documents
const documentsToMonitor = [12345, 12346, 12347];
setInterval(async () => {
const changes = await detector.checkForChanges(documentsToMonitor);
if (changes.length > 0) {
console.log('Document changes detected:', changes);
}
}, 300000); // Check every 5 minutes
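The comparison step of the detector above can also be written as a pure function, which makes it easy to test without calling the API. The Python sketch below assumes each snapshot is a plain dict keyed by API field names; the helper name `diff_document_state` and the default field list are illustrative choices.

```python
def diff_document_state(previous, current, fields=('titel', 'opdateringsdato', 'statusid')):
    """Return one change record per field whose value differs between snapshots."""
    changes = []
    for field in fields:
        old, new = previous.get(field), current.get(field)
        if old != new:
            changes.append({'field': field, 'old': old, 'new': new})
    return changes


before = {'titel': 'Spm. 12', 'opdateringsdato': '2024-01-01T10:00:00', 'statusid': 1}
after = {'titel': 'Spm. 12', 'opdateringsdato': '2024-01-05T08:30:00', 'statusid': 2}
for change in diff_document_state(before, after):
    print(change['field'], change['old'], '->', change['new'])
```

Keeping the diff separate from the fetch also means the same function can compare snapshots loaded from disk, so change history survives a restart of the monitoring process.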
Committee Document Workflows¶
Committee Processing Tracking¶
Track how documents flow through committee processes:
def track_committee_document_workflow(committee_actor_id, days_back=30):
"""Track documents processed by a specific committee"""
since_date = (datetime.now() - timedelta(days=days_back)).isoformat()
# Find committee involvement in documents
filter_query = f"aktørid eq {committee_actor_id} and opdateringsdato gt datetime'{since_date}'"
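# The datetime'...' literal is OData v3 syntax, which has no nested $select/$expand
# inside $expand; a path expands Dokument together with its Dokumenttype
expand_query = "Dokument/Dokumenttype,DokumentAktørRolle"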
url = f"https://oda.ft.dk/api/DokumentAktør?%24filter={filter_query}&%24expand={expand_query}&%24orderby=opdateringsdato desc&%24top=200"
response = requests.get(url)
relationships = response.json()['value']
workflow_analysis = {
'committee_id': committee_actor_id,
'total_documents': len(relationships),
'by_role': defaultdict(list),
'by_document_type': defaultdict(int),
'timeline': []
}
for rel in relationships:
doc = rel['Dokument']
role = rel['DokumentAktørRolle']['rolle']
doc_type = doc.get('Dokumenttype', {}).get('type', 'Unknown')
workflow_entry = {
'document_id': doc['id'],
'title': doc['titel'][:100],
'role': role,
'date': rel['opdateringsdato'],
'document_type': doc_type
}
workflow_analysis['by_role'][role].append(workflow_entry)
workflow_analysis['by_document_type'][doc_type] += 1
workflow_analysis['timeline'].append(workflow_entry)
# Sort timeline by date
workflow_analysis['timeline'].sort(key=lambda x: x['date'], reverse=True)
return workflow_analysis
# Example: Track Environment Committee workflows
env_committee_id = 1234 # Example committee actor ID
committee_workflow = track_committee_document_workflow(env_committee_id)
Cross-Committee Document Analysis¶
def analyze_multi_committee_documents():
"""Find documents involving multiple committees"""
# Get committee actor IDs (assuming committee actors have specific type)
committees_url = "https://oda.ft.dk/api/Aktør?%24filter=typeid eq 6&%24select=id,navn&%24top=100" # Assuming type 6 is committees
committees_response = requests.get(committees_url)
committees = committees_response.json()['value']
committee_ids = [c['id'] for c in committees]
# Find documents with multiple committee involvement
multi_committee_docs = {}
for committee_id in committee_ids:
filter_query = f"aktørid eq {committee_id}"
url = f"https://oda.ft.dk/api/DokumentAktør?%24filter={filter_query}&%24select=dokumentid,rolleid&%24top=1000"
response = requests.get(url)
relationships = response.json()['value']
for rel in relationships:
doc_id = rel['dokumentid']
if doc_id not in multi_committee_docs:
multi_committee_docs[doc_id] = []
multi_committee_docs[doc_id].append({
'committee_id': committee_id,
'role_id': rel['rolleid']
})
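# Filter for documents with 2+ distinct committees; a single committee can
# appear several times on one document under different roles
multi_committee_results = {
doc_id: committees
for doc_id, committees in multi_committee_docs.items()
if len({entry['committee_id'] for entry in committees}) >= 2
}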
return multi_committee_results
multi_committee_docs = analyze_multi_committee_documents()
print(f"Found {len(multi_committee_docs)} documents involving multiple committees")
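The requests above ask for up to 1000 rows at once. OData services commonly cap the page size and return a continuation link instead, so a single request may silently return only part of the result. The sketch below follows such links; it assumes the JSON response carries the link under an `odata.nextLink` key (the OData v3 JSON convention), and the helper `fetch_all` with its injected `get_json` callable is hypothetical.

```python
def fetch_all(url, get_json, max_pages=50):
    """Collect rows from a paged OData response by following continuation links.

    get_json is any callable that takes a URL and returns the decoded JSON
    body, for example: lambda u: requests.get(u).json()
    """
    rows = []
    for _ in range(max_pages):  # Upper bound guards against endless link chains
        page = get_json(url)
        rows.extend(page.get('value', []))
        url = page.get('odata.nextLink')  # Assumed key name
        if not url:
            break
    return rows


# Offline demonstration with two fake pages
fake_pages = {
    'page-1': {'value': [{'dokumentid': 1}, {'dokumentid': 2}], 'odata.nextLink': 'page-2'},
    'page-2': {'value': [{'dokumentid': 3}]},
}
print(len(fetch_all('page-1', fake_pages.__getitem__)))
```

Passing the fetch function in as an argument keeps the paging logic independent of the HTTP library and lets it be exercised without network access.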
Cross-Reference Tracking Between Related Documents¶
Document Relationship Networks¶
Track how documents reference and relate to each other:
def build_document_relationship_network(seed_document_id, depth=2):
"""Build a network of related documents starting from a seed document"""
visited = set()
document_network = {}
queue = [(seed_document_id, 0)]
while queue and depth > 0:
doc_id, current_depth = queue.pop(0)
if doc_id in visited or current_depth >= depth:
continue
visited.add(doc_id)
# Get document info and relationships
doc_url = f"https://oda.ft.dk/api/Dokument({doc_id})?%24select=id,titel,dato,typeid&%24expand=SagDokument/Sag"
doc_response = requests.get(doc_url)
document = doc_response.json()
document_network[doc_id] = {
'title': document['titel'][:100],
'date': document['dato'],
'type_id': document['typeid'],
'related_documents': [],
'cases': []
}
# Find related documents through cases
for case_rel in document.get('SagDokument', []):
case_id = case_rel['sagid']
# Get other documents in the same case
case_docs_url = f"https://oda.ft.dk/api/SagDokument?%24filter=sagid eq {case_id}&%24select=dokumentid&%24top=100"
case_docs_response = requests.get(case_docs_url)
case_documents = case_docs_response.json()['value']
for case_doc in case_documents:
related_doc_id = case_doc['dokumentid']
if related_doc_id != doc_id and related_doc_id not in visited:
document_network[doc_id]['related_documents'].append(related_doc_id)
if current_depth + 1 < depth:
queue.append((related_doc_id, current_depth + 1))
document_network[doc_id]['cases'].append(case_id)
return document_network
# Example: Build network from parliamentary question
network = build_document_relationship_network(12345, depth=3)
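The traversal above is a depth-limited breadth-first search in which two documents are neighbours when they share a case. The sketch below shows the same idea on in-memory data, which is useful for testing the traversal separately from the API calls. The helper `related_within_depth` and the two lookup dicts are illustrative assumptions; in practice they would be filled from SagDokument rows.

```python
from collections import deque

def related_within_depth(doc_to_cases, case_to_docs, seed, max_depth):
    """Return {document_id: distance} for documents reachable from seed.

    One step of distance means "shares at least one case with".
    """
    distance = {seed: 0}
    queue = deque([seed])
    while queue:
        doc_id = queue.popleft()
        if distance[doc_id] >= max_depth:
            continue  # Do not expand beyond the requested depth
        for case_id in doc_to_cases.get(doc_id, []):
            for other in case_to_docs.get(case_id, []):
                if other not in distance:
                    distance[other] = distance[doc_id] + 1
                    queue.append(other)
    return distance


doc_to_cases = {1: [10], 2: [10, 20], 3: [20], 4: [30]}
case_to_docs = {10: [1, 2], 20: [2, 3], 30: [4]}
print(related_within_depth(doc_to_cases, case_to_docs, seed=1, max_depth=2))
```

Recording the distance when a document is first queued means each document is visited once, even when it is linked through several cases.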
Document Citation Analysis¶
def analyze_document_citations(document_ids, search_terms=None):
"""Analyze how documents cite or reference each other"""
citation_patterns = {
'explicit_citations': [],
'keyword_connections': [],
'temporal_clusters': [],
'actor_connections': []
}
# Get documents with full text and relationships
doc_filter = " or ".join([f"id eq {doc_id}" for doc_id in document_ids])
expand_query = "DokumentAktør/Aktør,SagDokument/Sag"
url = f"https://oda.ft.dk/api/Dokument?%24filter={doc_filter}&%24expand={expand_query}&%24top=1000"
response = requests.get(url)
documents = response.json()['value']
# Build citation network
for doc in documents:
doc_analysis = {
'id': doc['id'],
'title': doc['titel'],
'actors': [rel['Aktør']['navn'] for rel in doc.get('DokumentAktør', [])],
'cases': [rel['Sag']['titel'] for rel in doc.get('SagDokument', [])],
'date': doc['dato']
}
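# Look for explicit references to other documents. Only titles are searched,
# and the reply keywords below do not depend on other_doc: a title containing
# one of them is paired with every other document in the batch.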
title_text = doc['titel'].lower()
for other_doc in documents:
if other_doc['id'] != doc['id']:
other_id_str = str(other_doc['id'])
if other_id_str in title_text or any(
keyword in title_text for keyword in ['svar på', 'besvarelse', 'opfølgning']
):
citation_patterns['explicit_citations'].append({
'citing_doc': doc['id'],
'cited_doc': other_doc['id'],
'type': 'explicit_reference'
})
# Find keyword connections if search terms provided
if search_terms:
for term in search_terms:
if term.lower() in title_text:
citation_patterns['keyword_connections'].append({
'document_id': doc['id'],
'keyword': term,
'title_snippet': doc['titel'][:100]
})
return citation_patterns
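# Example: Analyze climate-related document citations.
# get_documents_by_keyword is not defined in this guide; it stands for any
# helper that returns Dokument records whose title matches the keyword.
climate_docs = get_documents_by_keyword('klima', limit=50)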
climate_citations = analyze_document_citations(
[doc['id'] for doc in climate_docs],
search_terms=['klima', 'miljø', 'CO2', 'bæredygtig']
)
Automated Document Flow Monitoring Systems¶
Real-Time Document Monitor¶
from datetime import datetime, timedelta, timezone

import requests

class DocumentFlowMonitor:
def __init__(self, config):
self.config = config
self.last_check = datetime.now(timezone.utc)
self.monitored_documents = set()
self.alerts = []
def add_document_monitoring(self, document_id, alert_conditions=None):
"""Add a document to monitoring with specific alert conditions"""
self.monitored_documents.add(document_id)
if alert_conditions:
self.config.setdefault('alerts', {})[document_id] = alert_conditions
def check_for_updates(self):
"""Check for updates across all monitored documents"""
if not self.monitored_documents:
return []
current_time = datetime.now(timezone.utc)
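# isoformat() on an aware datetime appends "+00:00", which is not valid inside
# a datetime'...' literal and turns into a space in the query string
since_last_check = self.last_check.strftime('%Y-%m-%dT%H:%M:%S')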
# Build filter for monitored documents
doc_filter = " or ".join([f"id eq {doc_id}" for doc_id in self.monitored_documents])
filter_query = f"({doc_filter}) and opdateringsdato gt datetime'{since_last_check}'"
# Get updated documents with relationships
expand_query = "Dokumentstatus,DokumentAktør/Aktør,DokumentAktør/DokumentAktørRolle"
url = f"https://oda.ft.dk/api/Dokument?%24filter={filter_query}&%24expand={expand_query}&%24top=1000"
response = requests.get(url)
updated_documents = response.json()['value']
alerts = []
for doc in updated_documents:
doc_id = doc['id']
# Create update notification
update_info = {
'document_id': doc_id,
'title': doc['titel'][:100],
'timestamp': doc['opdateringsdato'],
'status': doc.get('Dokumentstatus', {}).get('status', 'Unknown'),
'changes_detected': []
}
# Check for specific alert conditions
alert_conditions = self.config.get('alerts', {}).get(doc_id, {})
# Status change alerts
if alert_conditions.get('status_changes'):
current_status = doc.get('Dokumentstatus', {}).get('status')
if current_status in alert_conditions['status_changes']:
update_info['changes_detected'].append(f"Status changed to: {current_status}")
# Actor role alerts
if alert_conditions.get('role_changes'):
for role_rel in doc.get('DokumentAktør', []):
role = role_rel.get('DokumentAktørRolle', {}).get('rolle')
if role in alert_conditions['role_changes']:
actor_name = role_rel.get('Aktør', {}).get('navn', 'Unknown')
update_info['changes_detected'].append(f"New role: {actor_name} as {role}")
alerts.append(update_info)
self.last_check = current_time
self.alerts.extend(alerts)
return alerts
def get_alert_history(self, hours_back=24):
"""Get alert history for specified time period"""
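cutoff_time = datetime.now(timezone.utc) - timedelta(hours=hours_back)
# API timestamps may lack an offset; they are treated as UTC here (an assumption)
# so that they can be compared with the timezone-aware cutoff
return [
alert for alert in self.alerts
if datetime.fromisoformat(alert['timestamp'].replace('Z', '+00:00')).replace(tzinfo=timezone.utc) >= cutoff_time
]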
# Example: Set up monitoring system
monitor_config = {
'alerts': {
12345: {
'status_changes': ['Published', 'Withdrawn', 'Revised'],
'role_changes': ['Besvaret af', 'Minister']
}
}
}
monitor = DocumentFlowMonitor(monitor_config)
monitor.add_document_monitoring(12345)
# Check for updates every 5 minutes
import schedule
import time
schedule.every(5).minutes.do(lambda: monitor.check_for_updates())
while True:
schedule.run_pending()
time.sleep(60)
Batch Document Processing Pipeline¶
class DocumentFlowPipeline {
constructor() {
this.processors = [];
this.results = [];
}
addProcessor(name, processorFunction) {
this.processors.push({ name, process: processorFunction });
}
async processDocuments(documentIds, batchSize = 50) {
const results = [];
// Process documents in batches to avoid API limits
for (let i = 0; i < documentIds.length; i += batchSize) {
const batch = documentIds.slice(i, i + batchSize);
const batchResults = await this.processBatch(batch);
results.push(...batchResults);
}
return results;
}
async processBatch(documentIds) {
// Fetch document data for batch
const idsFilter = documentIds.map(id => `id eq ${id}`).join(' or ');
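// DokumentAktørRolle is expanded as well, because the actorNetwork processor reads role names
const url = `https://oda.ft.dk/api/Dokument?%24filter=${idsFilter}&%24expand=Dokumenttype,Dokumentstatus,DokumentAktør/Aktør,DokumentAktør/DokumentAktørRolle,SagDokument/Sag&%24top=${documentIds.length}`;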
const response = await fetch(url);
const data = await response.json();
const batchResults = [];
for (const document of data.value) {
const docResult = {
id: document.id,
processingResults: {}
};
// Run all processors on this document
for (const processor of this.processors) {
try {
const result = await processor.process(document);
docResult.processingResults[processor.name] = result;
} catch (error) {
docResult.processingResults[processor.name] = { error: error.message };
}
}
batchResults.push(docResult);
}
return batchResults;
}
}
// Example processors
const pipeline = new DocumentFlowPipeline();
// Add document classification processor
pipeline.addProcessor('classification', (document) => {
return {
type: document.Dokumenttype?.type || 'Unknown',
status: document.Dokumentstatus?.status || 'Unknown',
hasActors: document.DokumentAktør?.length > 0,
hasCases: document.SagDokument?.length > 0
};
});
// Add timeline processor
pipeline.addProcessor('timeline', (document) => {
const created = new Date(document.dato);
const updated = new Date(document.opdateringsdato);
const released = document.frigivelsesdato ? new Date(document.frigivelsesdato) : null;
return {
ageInDays: Math.floor((new Date() - created) / (1000 * 60 * 60 * 24)),
processingTime: released ? Math.floor((released - created) / (1000 * 60 * 60)) : null,
recentlyUpdated: (new Date() - updated) < (7 * 24 * 60 * 60 * 1000)
};
});
// Add actor network processor
pipeline.addProcessor('actorNetwork', (document) => {
const actors = document.DokumentAktør || [];
const roles = {};
actors.forEach(rel => {
const actorName = rel.Aktør?.navn || 'Unknown';
const roleName = rel.DokumentAktørRolle?.rolle || 'Unknown';
if (!roles[roleName]) roles[roleName] = [];
roles[roleName].push(actorName);
});
return {
totalActors: actors.length,
roles: roles,
hasMinister: actors.some(rel => rel.rolleid === 5),
hasCommittee: actors.some(rel => rel.Aktør?.typeid === 6)
};
});
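// Process a batch of documents.
// getRecentDocuments is not defined in this guide; it stands for any helper
// that resolves to an array of recent Dokument records.
const recentDocuments = await getRecentDocuments(100);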
const documentIds = recentDocuments.map(doc => doc.id);
const results = await pipeline.processDocuments(documentIds);
console.log('Processed', results.length, 'documents');
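The batching step of the pipeline above keeps each `$filter` expression, and therefore each URL, to a manageable length. A Python sketch of the same idea follows; the generator name `id_filter_batches` is an illustrative assumption, and the default batch size of 50 simply mirrors the JavaScript example.

```python
def id_filter_batches(document_ids, batch_size=50):
    """Yield one OData $filter expression per batch of document IDs."""
    for start in range(0, len(document_ids), batch_size):
        batch = document_ids[start:start + batch_size]
        yield " or ".join(f"id eq {doc_id}" for doc_id in batch)


for filter_expr in id_filter_batches([101, 102, 103], batch_size=2):
    print(filter_expr)
```

Each yielded expression can be passed as the `$filter` value of a separate request, and the results concatenated afterwards.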
Document Status Change Detection and Alerts¶
Status Transition Monitoring¶
import time
from collections import defaultdict
from datetime import datetime

import requests

class DocumentStatusMonitor:
def __init__(self):
self.status_history = defaultdict(list)
self.transition_rules = {}
def add_transition_rule(self, from_status, to_status, alert_callback):
"""Add a rule to trigger alerts on specific status transitions"""
key = f"{from_status} -> {to_status}"
self.transition_rules[key] = alert_callback
def monitor_status_changes(self, document_ids, check_interval_minutes=15):
"""Monitor documents for status changes"""
while True:
try:
# Get current status for all documents
doc_filter = " or ".join([f"id eq {doc_id}" for doc_id in document_ids])
url = f"https://oda.ft.dk/api/Dokument?%24filter={doc_filter}&%24expand=Dokumentstatus&%24select=id,titel,statusid&%24top={len(document_ids)}"
response = requests.get(url)
documents = response.json()['value']
current_time = datetime.now()
for doc in documents:
doc_id = doc['id']
current_status = doc.get('Dokumentstatus', {}).get('status', 'Unknown')
# Check for status changes
if doc_id in self.status_history:
previous_status = self.status_history[doc_id][-1]['status']
if current_status != previous_status:
# Status changed - trigger alerts
transition_key = f"{previous_status} -> {current_status}"
# Log the change
self.status_history[doc_id].append({
'status': current_status,
'timestamp': current_time,
'previous_status': previous_status
})
# Check for matching transition rules
if transition_key in self.transition_rules:
self.transition_rules[transition_key](doc, previous_status, current_status)
# Generic status change alert
if 'any_change' in self.transition_rules:
self.transition_rules['any_change'](doc, previous_status, current_status)
else:
# First time seeing this document
self.status_history[doc_id].append({
'status': current_status,
'timestamp': current_time,
'previous_status': None
})
# Wait before next check
time.sleep(check_interval_minutes * 60)
except Exception as e:
print(f"Error in status monitoring: {e}")
time.sleep(60) # Wait a minute before retrying
def get_status_history(self, document_id):
"""Get complete status history for a document"""
return self.status_history.get(document_id, [])
# Example: Set up status monitoring with alerts
def alert_on_publication(document, old_status, new_status):
print(f"📢 Document {document['id']} published: {document['titel'][:100]}")
print(f" Status: {old_status} → {new_status}")
def alert_on_withdrawal(document, old_status, new_status):
print(f"🔴 Document {document['id']} withdrawn: {document['titel'][:100]}")
print(f" Status: {old_status} → {new_status}")
def alert_any_change(document, old_status, new_status):
print(f"📝 Status change for document {document['id']}: {old_status} → {new_status}")
monitor = DocumentStatusMonitor()
monitor.add_transition_rule("Draft", "Published", alert_on_publication)
monitor.add_transition_rule("Published", "Withdrawn", alert_on_withdrawal)
# The catch-all rule is looked up under the plain key 'any_change', so it is
# registered directly; add_transition_rule requires (from_status, to_status, callback)
monitor.transition_rules['any_change'] = alert_any_change
# Start monitoring (this would run in a background thread)
documents_to_monitor = [12345, 12346, 12347]
# monitor.monitor_status_changes(documents_to_monitor, check_interval_minutes=10)
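One way to run such polling in a background thread is sketched below. It assumes the body of the monitoring loop has been factored into a function that performs a single check; `run_periodically` and `check_once` are hypothetical names, not part of the class above. A `threading.Event` replaces `time.sleep` so that the loop can be stopped promptly.

```python
import threading

def run_periodically(task, interval_seconds, stop_event):
    """Call task() repeatedly until stop_event is set."""
    while not stop_event.is_set():
        task()
        # wait() returns early when the event is set, unlike time.sleep()
        stop_event.wait(interval_seconds)


# Demonstration with a stand-in task that stops itself after three runs
calls = []
stop = threading.Event()

def check_once():
    calls.append(len(calls) + 1)
    if len(calls) == 3:
        stop.set()

worker = threading.Thread(target=run_periodically, args=(check_once, 0.01, stop), daemon=True)
worker.start()
worker.join(timeout=2)
print(calls)
```

Marking the thread as a daemon lets the main program exit without waiting for it; in a long-running service, calling `stop.set()` during shutdown gives the loop a chance to finish its current check first.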
Advanced Alert System¶
class DocumentAlertSystem {
constructor() {
this.subscribers = new Map();
this.alertHistory = [];
this.isRunning = false;
}
subscribe(alertType, callback) {
if (!this.subscribers.has(alertType)) {
this.subscribers.set(alertType, []);
}
this.subscribers.get(alertType).push(callback);
}
async triggerAlert(alertType, data) {
const alert = {
type: alertType,
timestamp: new Date().toISOString(),
data: data,
id: Date.now() + Math.random()
};
this.alertHistory.push(alert);
// Notify subscribers
const callbacks = this.subscribers.get(alertType) || [];
for (const callback of callbacks) {
try {
await callback(alert);
} catch (error) {
console.error(`Alert callback error:`, error);
}
}
// Also notify wildcard subscribers
const wildcardCallbacks = this.subscribers.get('*') || [];
for (const callback of wildcardCallbacks) {
try {
await callback(alert);
} catch (error) {
console.error(`Wildcard alert callback error:`, error);
}
}
}
async monitorDocumentFlow(documentIds, conditions = {}) {
this.isRunning = true;
let lastCheck = new Date();
while (this.isRunning) {
try {
// Check for document updates
const updates = await this.checkDocumentUpdates(documentIds, lastCheck);
for (const update of updates) {
// Check various alert conditions
await this.processDocumentUpdate(update, conditions);
}
lastCheck = new Date();
// Wait 5 minutes before next check
await new Promise(resolve => setTimeout(resolve, 5 * 60 * 1000));
} catch (error) {
console.error('Document monitoring error:', error);
await new Promise(resolve => setTimeout(resolve, 60 * 1000));
}
}
}
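async checkDocumentUpdates(documentIds, since) {
// OData v3 datetime'...' literals are defined without a timezone suffix, so drop the trailing 'Z'
const sinceISO = since.toISOString().replace(/Z$/, '');
const idsFilter = documentIds.map(id => `id eq ${id}`).join(' or ');
// Encode the filter so spaces, quotes, and parentheses survive as query text
const filter = encodeURIComponent(`(${idsFilter}) and opdateringsdato gt datetime'${sinceISO}'`);
const url = `https://oda.ft.dk/api/Dokument?%24filter=${filter}&%24expand=Dokumentstatus,DokumentAktør/Aktør,DokumentAktør/DokumentAktørRolle,SagDokument/Sag&%24orderby=opdateringsdato desc&%24top=1000`;
const response = await fetch(url);
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
const data = await response.json();
return data.value || [];
}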
async processDocumentUpdate(document, conditions) {
const docData = {
id: document.id,
title: document.titel || '', // Default to an empty string so later string operations cannot fail on null
status: document.Dokumentstatus?.status || 'Unknown',
lastUpdated: document.opdateringsdato,
actors: document.DokumentAktør?.map(rel => ({
name: rel.Aktør?.navn,
role: rel.DokumentAktørRolle?.rolle
})) || [],
cases: document.SagDokument?.map(rel => rel.Sag?.titel) || []
};
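// Status alerts: fire on every update while the document is in a watched status
// (the previous status is not tracked here, so this is not a true transition check)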
if (conditions.statusAlerts) {
for (const statusToWatch of conditions.statusAlerts) {
if (docData.status === statusToWatch) {
await this.triggerAlert('status_change', {
document: docData,
newStatus: statusToWatch,
message: `Document status changed to: ${statusToWatch}`
});
}
}
}
// Actor role alerts
if (conditions.actorRoleAlerts) {
for (const actor of docData.actors) {
if (conditions.actorRoleAlerts.includes(actor.role)) {
await this.triggerAlert('actor_role', {
document: docData,
actor: actor,
message: `New actor role: ${actor.name} as ${actor.role}`
});
}
}
}
// Keyword alerts
if (conditions.keywordAlerts) {
const titleLower = docData.title.toLowerCase();
for (const keyword of conditions.keywordAlerts) {
if (titleLower.includes(keyword.toLowerCase())) {
await this.triggerAlert('keyword_match', {
document: docData,
keyword: keyword,
message: `Document contains keyword: ${keyword}`
});
}
}
}
// Always trigger general update alert
await this.triggerAlert('document_updated', {
document: docData,
message: 'Document was updated'
});
}
stop() {
this.isRunning = false;
}
getAlertHistory(alertType = null, hours = 24) {
const cutoff = new Date(Date.now() - (hours * 60 * 60 * 1000));
return this.alertHistory
.filter(alert => new Date(alert.timestamp) >= cutoff)
.filter(alert => !alertType || alert.type === alertType)
.sort((a, b) => new Date(b.timestamp) - new Date(a.timestamp));
}
}
// Example: Set up comprehensive alert system
const alertSystem = new DocumentAlertSystem();
// Subscribe to different alert types
alertSystem.subscribe('status_change', async (alert) => {
console.log(`📊 Status Alert: ${alert.data.message}`);
console.log(` Document: ${alert.data.document.title.substring(0, 100)}`);
});
alertSystem.subscribe('actor_role', async (alert) => {
console.log(`👤 Actor Alert: ${alert.data.message}`);
console.log(` Document: ${alert.data.document.title.substring(0, 100)}`);
});
alertSystem.subscribe('keyword_match', async (alert) => {
console.log(`🔍 Keyword Alert: ${alert.data.message}`);
console.log(` Document: ${alert.data.document.title.substring(0, 100)}`);
});
// Generic alert subscriber
alertSystem.subscribe('*', async (alert) => {
// Log all alerts to a database or file
console.log(`[${alert.timestamp}] ${alert.type}: ${alert.data.message}`);
});
// Start monitoring with specific conditions
const monitoringConditions = {
statusAlerts: ['Published', 'Withdrawn', 'Under Review'],
actorRoleAlerts: ['Minister', 'Besvaret af', 'Forslagsstiller'],
keywordAlerts: ['klima', 'budget', 'sundhed']
};
const documentsToMonitor = [12345, 12346, 12347];
// Start monitoring (would typically run this in background)
// alertSystem.monitorDocumentFlow(documentsToMonitor, monitoringConditions);
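The update check above depends on an OData `datetime'...'` literal inside the `$filter` expression. As a sketch in Python, the literal and the combined filter could be built as follows; `odata_datetime_literal` and `updated_since_filter` are hypothetical helper names, and whether the service interprets the value as UTC or as local time is an assumption to verify against real responses.

```python
from datetime import datetime, timezone
from typing import Iterable


def odata_datetime_literal(moment: datetime) -> str:
    """Format a datetime as an OData v3 style datetime'...' literal (no offset)."""
    if moment.tzinfo is not None:
        # Normalize aware values to UTC, then drop the offset (assumption: UTC is wanted)
        moment = moment.astimezone(timezone.utc).replace(tzinfo=None)
    stamp = moment.strftime("%Y-%m-%dT%H:%M:%S")
    return f"datetime'{stamp}'"


def updated_since_filter(doc_ids: Iterable[int], since: datetime) -> str:
    """Build an unencoded $filter for 'these ids, updated after `since`'."""
    ids = " or ".join(f"id eq {doc_id}" for doc_id in doc_ids)
    return f"({ids}) and opdateringsdato gt {odata_datetime_literal(since)}"
```

The returned string is not URL-encoded; it still needs percent-encoding before it is placed in a request URL.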
Performance Optimization for Bulk Document Tracking¶
Efficient Batch Processing Strategies¶
import asyncio
import time
import aiohttp
class BulkDocumentTracker:
def __init__(self, max_concurrent=5, batch_size=100):
self.max_concurrent = max_concurrent
self.batch_size = batch_size
self.rate_limiter = RateLimiter(max_calls=100, time_window=60) # 100 calls per minute
async def track_document_collection(self, document_ids, tracking_options):
"""Efficiently track a large collection of documents"""
# Split into batches for processing
batches = [
document_ids[i:i + self.batch_size]
for i in range(0, len(document_ids), self.batch_size)
]
results = []
semaphore = asyncio.Semaphore(self.max_concurrent)
async def process_batch(batch):
async with semaphore:
await self.rate_limiter.wait()
return await self.track_document_batch(batch, tracking_options)
# Process batches concurrently
batch_tasks = [process_batch(batch) for batch in batches]
batch_results = await asyncio.gather(*batch_tasks)
# Flatten results
for batch_result in batch_results:
results.extend(batch_result)
return results
async def track_document_batch(self, document_ids, options):
"""Track a batch of documents with optimized queries"""
# Build efficient filter query
doc_filter = " or ".join([f"id eq {doc_id}" for doc_id in document_ids])
# Optimize expansions based on tracking options
expansions = []
if options.get('include_status'):
expansions.append("Dokumentstatus")
if options.get('include_actors'):
expansions.append("DokumentAktør/Aktør")
if options.get('include_actor_roles'):
expansions.append("DokumentAktør/DokumentAktørRolle")
if options.get('include_cases'):
expansions.append("SagDokument/Sag")
expand_query = ",".join(expansions) if expansions else None
# Build select fields for efficiency
select_fields = ["id", "titel", "dato", "opdateringsdato", "typeid"]
if options.get('include_release_date'):
select_fields.append("frigivelsesdato")
if options.get('include_content_fields'):
select_fields.extend(["spørgsmålstitel", "procedurenummer"])
select_query = ",".join(select_fields)
# Construct optimized URL
url_params = [
f"%24filter={doc_filter}",
f"%24select={select_query}",
f"%24top={len(document_ids)}"
]
if expand_query:
url_params.append(f"%24expand={expand_query}")
url = f"https://oda.ft.dk/api/Dokument?{'&'.join(url_params)}"
# Make request with retry logic
max_retries = 3
for attempt in range(max_retries):
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=60)) as response:
if response.status == 200:
data = await response.json()
return self.process_batch_results(data['value'], options)
else:
print(f"HTTP {response.status} for batch, attempt {attempt + 1}")
except Exception as e:
print(f"Error processing batch (attempt {attempt + 1}): {e}")
if attempt < max_retries - 1:
await asyncio.sleep(2 ** attempt) # Exponential backoff
else:
raise
return []
def process_batch_results(self, documents, options):
"""Process and optimize batch results"""
processed_docs = []
for doc in documents:
processed_doc = {
'id': doc['id'],
'title': doc['titel'][:150], # Truncate for memory efficiency
'created': doc['dato'],
'updated': doc['opdateringsdato'],
'type_id': doc['typeid']
}
# Add optional fields based on tracking options
if options.get('include_status') and 'Dokumentstatus' in doc:
processed_doc['status'] = doc['Dokumentstatus'].get('status')
if options.get('include_actors') and 'DokumentAktør' in doc:
processed_doc['actors'] = [
{
'name': rel.get('Aktør', {}).get('navn'),
'role': rel.get('DokumentAktørRolle', {}).get('rolle') if options.get('include_actor_roles') else None,
'role_id': rel.get('rolleid')
}
for rel in doc['DokumentAktør']
]
if options.get('include_cases') and 'SagDokument' in doc:
processed_doc['cases'] = [
rel['Sag']['titel'][:100] for rel in doc['SagDokument']
]
processed_docs.append(processed_doc)
return processed_docs
# Rate limiter utility
class RateLimiter:
def __init__(self, max_calls, time_window):
self.max_calls = max_calls
self.time_window = time_window
self.calls = []
async def wait(self):
now = time.time()
# Remove old calls outside the time window
self.calls = [call_time for call_time in self.calls if now - call_time < self.time_window]
if len(self.calls) >= self.max_calls:
sleep_time = self.time_window - (now - self.calls[0])
if sleep_time > 0:
await asyncio.sleep(sleep_time)
# Record when the call actually proceeds, not the time measured before sleeping
self.calls.append(time.time())
# Example: Bulk tracking of parliamentary questions
async def track_parliamentary_questions_bulk():
# Get recent parliamentary questions
recent_questions_url = "https://oda.ft.dk/api/Dokument?%24filter=typeid eq 13 and opdateringsdato gt datetime'2025-08-01T00:00:00'&%24select=id&%24top=1000"
async with aiohttp.ClientSession() as session:
async with session.get(recent_questions_url) as response:
data = await response.json()
question_ids = [doc['id'] for doc in data['value']]
# Set up tracking options
tracking_options = {
'include_status': True,
'include_actors': True,
'include_actor_roles': True,
'include_cases': True,
'include_release_date': True
}
# Create bulk tracker and process
tracker = BulkDocumentTracker(max_concurrent=3, batch_size=50)
results = await tracker.track_document_collection(question_ids, tracking_options)
print(f"Tracked {len(results)} parliamentary questions")
return results
# Run bulk tracking
# results = asyncio.run(track_parliamentary_questions_bulk())
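The batch URLs above are assembled by string concatenation, which leaves spaces and Danish characters unencoded unless the HTTP client encodes them. A minimal sketch of explicit chunking and percent-encoding with the standard library is shown here; `chunked` and `build_batch_url` are hypothetical helpers, not part of `BulkDocumentTracker`.

```python
from typing import List, Sequence
from urllib.parse import quote

BASE_URL = "https://oda.ft.dk/api/Dokument"


def chunked(items: Sequence[int], size: int) -> List[List[int]]:
    """Split a sequence of ids into consecutive batches of at most `size`."""
    return [list(items[i:i + size]) for i in range(0, len(items), size)]


def build_batch_url(doc_ids: Sequence[int], select: Sequence[str]) -> str:
    """Build a percent-encoded query URL for one batch of document ids."""
    doc_filter = " or ".join(f"id eq {doc_id}" for doc_id in doc_ids)
    params = [
        ("$filter", doc_filter),
        ("$select", ",".join(select)),
        ("$top", str(len(doc_ids))),
    ]
    # Encode keys fully ('$' becomes '%24'); keep commas readable in values
    query = "&".join(f"{quote(k, safe='')}={quote(v, safe=',')}" for k, v in params)
    return f"{BASE_URL}?{query}"
```

Smaller batches keep each `id eq ... or ...` filter, and therefore each URL, short; the right batch size depends on the URL length the server accepts, which is not stated in this guide.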
Memory-Efficient Document Streaming¶
import requests
from datetime import datetime, timezone
def stream_document_analysis(filter_query, analysis_function, batch_size=100):
"""Stream document analysis to handle large datasets efficiently"""
offset = 0
total_processed = 0
while True:
# Get batch with pagination
url = f"https://oda.ft.dk/api/Dokument?%24filter={filter_query}&%24skip={offset}&%24top={batch_size}&%24orderby=id"
response = requests.get(url, timeout=60)
response.raise_for_status() # Fail loudly instead of trying to parse an error page
batch_data = response.json()['value']
if not batch_data:
break # No more data
# Process batch and yield results
for document in batch_data:
try:
result = analysis_function(document)
if result: # Only yield non-None results
yield result
total_processed += 1
except Exception as e:
print(f"Error processing document {document.get('id')}: {e}")
offset += batch_size
# Optional: Add progress reporting
if total_processed % 1000 == 0:
print(f"Processed {total_processed} documents...")
# Example: Memory-efficient analysis of all documents
def analyze_document_metrics(document):
"""Analyze key metrics for a document"""
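created = datetime.fromisoformat(document['dato'].replace('Z', '+00:00'))
updated = datetime.fromisoformat(document['opdateringsdato'].replace('Z', '+00:00'))
# Timestamps without an offset are treated as UTC here (an assumption), so the
# subtractions below never mix naive and timezone-aware datetimes
created = created if created.tzinfo else created.replace(tzinfo=timezone.utc)
updated = updated if updated.tzinfo else updated.replace(tzinfo=timezone.utc)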
return {
'id': document['id'],
'age_days': (datetime.now(timezone.utc) - created).days,
'last_update_days': (datetime.now(timezone.utc) - updated).days,
'title_length': len(document.get('titel') or ''), # 'titel' may be present but null
'has_question': bool(document.get('spørgsmålstitel')),
'type_id': document.get('typeid')
}
# Stream analysis results
filter_all_docs = "id gt 0" # Simple filter to get all documents
metrics_generator = stream_document_analysis(filter_all_docs, analyze_document_metrics)
# Process results in batches to avoid memory issues
batch_results = []
for i, metrics in enumerate(metrics_generator):
batch_results.append(metrics)
if len(batch_results) >= 1000:
# Process accumulated batch (save to database, analyze, etc.);
# process_metrics_batch is a placeholder for a function you supply
process_metrics_batch(batch_results)
batch_results = [] # Clear for next batch
if i >= 50000: # Limit for demo purposes
break
# Process final batch
if batch_results:
process_metrics_batch(batch_results)
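The streaming function above pages with `$skip`, which re-counts rows on every request. An alternative worth considering is keyset paging, where each request asks for records with an `id` greater than the last one seen. The sketch below assumes a caller-supplied `fetch_page(last_id, page_size)` function (hypothetical) that would issue a query such as `$filter=id gt {last_id}` with `$orderby=id` and `$top={page_size}`.

```python
from typing import Callable, Dict, Iterator, List

Page = List[Dict]


def iterate_by_id(fetch_page: Callable[[int, int], Page],
                  page_size: int = 100) -> Iterator[Dict]:
    """Yield records in ascending id order, resuming after the last id seen."""
    last_id = 0
    while True:
        page = fetch_page(last_id, page_size)
        if not page:
            return  # No more data
        for record in page:
            yield record
        last_id = page[-1]["id"]
        if len(page) < page_size:
            return  # A short page means the end was reached


# In-memory stand-in for the API, used only to demonstrate the loop
_SAMPLE = [{"id": i} for i in (1, 2, 4, 7, 8, 9, 12)]


def fake_fetch_page(last_id: int, page_size: int) -> Page:
    """Return up to `page_size` sample records with id greater than `last_id`."""
    return [r for r in _SAMPLE if r["id"] > last_id][:page_size]
```

Because the position is carried by the last `id` rather than by an offset, records inserted or removed during a long run do not shift later pages.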
Optimized Multi-Entity Tracking¶
class OptimizedMultiEntityTracker {
constructor() {
this.cache = new Map();
this.batchQueue = new Map();
this.processingInterval = null;
}
startBatchProcessing(intervalMs = 5000) {
this.processingInterval = setInterval(() => {
this.processBatches();
}, intervalMs);
}
stopBatchProcessing() {
if (this.processingInterval) {
clearInterval(this.processingInterval);
this.processingInterval = null;
}
}
addToTrackingQueue(entityType, entityId, options = {}) {
if (!this.batchQueue.has(entityType)) {
this.batchQueue.set(entityType, new Map());
}
this.batchQueue.get(entityType).set(entityId, {
id: entityId,
options: options,
addedAt: Date.now()
});
}
async processBatches() {
for (const [entityType, entities] of this.batchQueue.entries()) {
if (entities.size === 0) continue;
const entityIds = Array.from(entities.keys());
const batchOptions = Array.from(entities.values())[0].options; // Use first item's options
try {
await this.processBatch(entityType, entityIds, batchOptions);
entityIds.forEach(id => entities.delete(id)); // Remove only the IDs just processed, keeping anything queued meanwhile
} catch (error) {
console.error(`Error processing ${entityType} batch:`, error);
}
}
}
async processBatch(entityType, entityIds, options) {
const batchSize = 100;
// Process in smaller batches if needed
for (let i = 0; i < entityIds.length; i += batchSize) {
const batch = entityIds.slice(i, i + batchSize);
await this.processEntityBatch(entityType, batch, options);
}
}
async processEntityBatch(entityType, entityIds, options) {
const idsFilter = entityIds.map(id => `id eq ${id}`).join(' or ');
const baseUrl = `https://oda.ft.dk/api/${entityType}`;
// Build query parameters. URLSearchParams percent-encodes keys and values itself,
// so pass the literal '$' names (a pre-encoded '%24' key would become '%2524')
const params = new URLSearchParams({
'$filter': idsFilter,
'$top': entityIds.length.toString()
});
// Add expansions based on entity type and options
const expansions = this.getOptimalExpansions(entityType, options);
if (expansions.length > 0) {
params.append('$expand', expansions.join(','));
}
// Add field selection for efficiency
const selectFields = this.getSelectFields(entityType, options);
if (selectFields.length > 0) {
params.append('$select', selectFields.join(','));
}
const url = `${baseUrl}?${params}`;
try {
const response = await fetch(url);
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
const data = await response.json();
// Cache results for future use
for (const entity of data.value) {
const cacheKey = `${entityType}:${entity.id}`;
this.cache.set(cacheKey, {
data: entity,
cachedAt: Date.now()
});
}
// Process results if callback provided
if (options.onResults) {
options.onResults(entityType, data.value);
}
} catch (error) {
console.error(`Failed to fetch ${entityType} batch:`, error);
}
}
getOptimalExpansions(entityType, options) {
const expansions = [];
switch (entityType) {
case 'Dokument':
if (options.includeActors) expansions.push('DokumentAktør/Aktør');
if (options.includeActorRoles) expansions.push('DokumentAktør/DokumentAktørRolle');
if (options.includeStatus) expansions.push('Dokumentstatus');
if (options.includeCases) expansions.push('SagDokument/Sag');
if (options.includeFiles) expansions.push('Fil');
break;
case 'Sag':
if (options.includeActors) expansions.push('SagAktør/Aktør');
if (options.includeDocuments) expansions.push('SagDokument/Dokument');
if (options.includeSteps) expansions.push('Sagstrin');
break;
case 'Aktør':
if (options.includeDocuments) expansions.push('DokumentAktør/Dokument');
if (options.includeCases) expansions.push('SagAktør/Sag');
break;
}
return expansions;
}
getSelectFields(entityType, options) {
const commonFields = ['id', 'opdateringsdato'];
switch (entityType) {
case 'Dokument':
return [
...commonFields,
'titel', 'dato', 'typeid',
...(options.includeContent ? ['spørgsmålstitel', 'procedurenummer'] : []),
...(options.includeReleaseDate ? ['frigivelsesdato'] : [])
];
case 'Sag':
return [
...commonFields,
'titel', 'oprettelsdato', 'typeid'
];
case 'Aktør':
return [
...commonFields,
'navn', 'typeid'
];
default:
return commonFields;
}
}
getCachedEntity(entityType, entityId) {
const cacheKey = `${entityType}:${entityId}`;
const cached = this.cache.get(cacheKey);
if (cached && (Date.now() - cached.cachedAt) < 300000) { // 5 minute cache
return cached.data;
}
return null;
}
clearCache() {
this.cache.clear();
}
}
// Example: Optimized tracking of related entities
const tracker = new OptimizedMultiEntityTracker();
// Set up result processors
const documentProcessor = (entityType, entities) => {
console.log(`Processed ${entities.length} documents`);
entities.forEach(doc => {
// Process document data
if (doc.DokumentAktør && doc.DokumentAktør.length > 0) {
console.log(`Document ${doc.id} has ${doc.DokumentAktør.length} actor relationships`);
}
});
};
// Start batch processing
tracker.startBatchProcessing(3000); // Process every 3 seconds
// Add documents to tracking queue
const documentsToTrack = [12345, 12346, 12347, 12348];
documentsToTrack.forEach(docId => {
tracker.addToTrackingQueue('Dokument', docId, {
includeActors: true,
includeActorRoles: true,
includeStatus: true,
includeCases: true,
onResults: documentProcessor
});
});
// Add cases to tracking
const casesToTrack = [5001, 5002, 5003];
casesToTrack.forEach(caseId => {
tracker.addToTrackingQueue('Sag', caseId, {
includeDocuments: true,
includeActors: true,
onResults: (entityType, entities) => {
console.log(`Processed ${entities.length} cases`);
}
});
});
// Clean up after processing
setTimeout(() => {
tracker.stopBatchProcessing();
tracker.clearCache();
}, 60000);
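The tracker's cache above ignores entries older than five minutes but never removes them, so it can grow for as long as the process runs. A minimal Python sketch of a time-to-live cache that evicts expired entries on read follows; `TTLCache` is a hypothetical class, and the injectable `clock` parameter exists only to make the expiry behavior easy to test.

```python
import time
from typing import Any, Callable, Dict, Hashable, Optional, Tuple


class TTLCache:
    """Hypothetical cache whose entries expire after `ttl_seconds`."""

    def __init__(self, ttl_seconds: float = 300.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.ttl_seconds = ttl_seconds
        self.clock = clock
        self._entries: Dict[Hashable, Tuple[float, Any]] = {}

    def set(self, key: Hashable, value: Any) -> None:
        """Store a value together with the time it was cached."""
        self._entries[key] = (self.clock(), value)

    def get(self, key: Hashable) -> Optional[Any]:
        """Return the cached value, or None if it is missing or expired."""
        entry = self._entries.get(key)
        if entry is None:
            return None
        cached_at, value = entry
        if self.clock() - cached_at >= self.ttl_seconds:
            del self._entries[key]  # Evict so stale entries do not accumulate
            return None
        return value

    def __len__(self) -> int:
        return len(self._entries)
```

Keys such as `("Dokument", 12345)` mirror the `entityType:id` cache keys used by the tracker.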
Conclusion¶
Tracking document flow through the Danish Parliamentary system gives insight into legislative processes and actor relationships. The relationship data exposed through junction tables, combined with the timestamp fields on each document, supports monitoring systems that can:
- Track document lifecycle from creation to publication
- Monitor actor role changes and communication patterns
- Detect status transitions and process milestones
- Build networks of related documents and cross-references
- Provide near-real-time alerts on important changes by polling for updates
- Optimize performance for large-scale document analysis
The examples and patterns in this guide scale from monitoring individual documents to analyzing parliamentary processes in bulk, supporting transparency and democratic oversight.