Pagination Strategies¶
Efficient pagination is crucial for accessing the Danish Parliamentary OData API's large datasets, including 96,538+ cases, 18,139+ political actors, and extensive document collections. This guide provides comprehensive strategies for implementing high-performance pagination patterns.
API Pagination Fundamentals¶
Core Limitations¶
The OData API enforces specific pagination constraints that directly impact your implementation strategy:
- Maximum Records per Request: 100 records (hard limit)
- Default Page Size: 100 records when no `$top` is specified
- Total Dataset Access: Full datasets accessible via pagination
- No Rate Limits: The API supports concurrent paginated requests
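Given the 100-record cap, the number of requests needed for a full sweep is easy to estimate. A quick sketch (dataset sizes taken from the figures above; `pages_needed` is an illustrative helper, not part of the API):

```python
import math

MAX_PAGE_SIZE = 100  # hard limit enforced by the API

def pages_needed(total_records: int, page_size: int = MAX_PAGE_SIZE) -> int:
    """Number of paginated requests required to sweep a dataset."""
    page_size = min(page_size, MAX_PAGE_SIZE)  # requests above the cap return at most 100
    return math.ceil(total_records / page_size)

print(pages_needed(96_538))  # full case dataset -> 966 requests
print(pages_needed(18_139))  # political actors  -> 182 requests
```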
Critical URL Encoding Requirements¶
All OData parameters must use URL encoding (%24 instead of $):
```bash
# Correct - URL encoded parameters
curl "https://oda.ft.dk/api/Sag?%24top=50&%24skip=100"

# Incorrect - unencoded $ parameters will be silently ignored
curl "https://oda.ft.dk/api/Sag?\$top=50&\$skip=100"
```
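If you build URLs with a standard HTTP client rather than by hand, this encoding is handled for you. For example, Python's `urllib.parse.urlencode` (which the `params` argument of `requests` uses internally) percent-encodes `$` automatically:

```python
from urllib.parse import urlencode

params = {'$top': '50', '$skip': '100'}
query = urlencode(params)  # percent-encodes '$' as '%24'
url = f"https://oda.ft.dk/api/Sag?{query}"
print(url)
# https://oda.ft.dk/api/Sag?%24top=50&%24skip=100
```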
Skip-Based Pagination¶
Skip-based pagination uses $skip and $top parameters for offset-based data retrieval. This is the primary pagination method supported by the API.
Basic Implementation¶
```python
import requests
from typing import Dict, Optional, Generator

class ParliamentaryAPI:
    BASE_URL = "https://oda.ft.dk/api"
    MAX_PAGE_SIZE = 100

    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Parliamentary-Data-Client/1.0',
            'Accept': 'application/json'
        })

    def paginate_entity(self, entity_name: str, page_size: int = 100,
                        max_records: Optional[int] = None,
                        filter_expr: Optional[str] = None,
                        select_fields: Optional[str] = None,
                        expand_relations: Optional[str] = None,
                        order_by: Optional[str] = None) -> Generator[Dict, None, None]:
        """
        Paginate through all records of a specific entity.

        Args:
            entity_name: API entity name (e.g., 'Sag', 'Aktør')
            page_size: Records per page (1-100)
            max_records: Maximum total records to retrieve
            filter_expr: OData $filter expression
            select_fields: OData $select fields
            expand_relations: OData $expand relations
            order_by: OData $orderby expression

        Yields:
            Individual record dictionaries
        """
        page_size = min(page_size, self.MAX_PAGE_SIZE)
        skip = 0
        total_retrieved = 0

        while True:
            # Build query parameters (requests URL-encodes '$' as '%24' automatically)
            params = {
                '$top': str(page_size),
                '$skip': str(skip)
            }
            if filter_expr:
                params['$filter'] = filter_expr
            if select_fields:
                params['$select'] = select_fields
            if expand_relations:
                params['$expand'] = expand_relations
            if order_by:
                params['$orderby'] = order_by

            # Make request
            url = f"{self.BASE_URL}/{entity_name}"
            response = self.session.get(url, params=params)
            if response.status_code != 200:
                raise Exception(f"API error: {response.status_code} - {response.text}")

            data = response.json()
            records = data.get('value', [])

            # Stop if no more records
            if not records:
                break

            # Yield individual records
            for record in records:
                if max_records and total_retrieved >= max_records:
                    return
                yield record
                total_retrieved += 1

            # If we got fewer records than requested, we've reached the end
            if len(records) < page_size:
                break
            skip += page_size

# Example usage: Get all cases with climate-related topics
api = ParliamentaryAPI()
climate_cases = []

for case in api.paginate_entity(
    entity_name='Sag',
    filter_expr="substringof('klima', titel) or substringof('miljø', titel)",
    select_fields='id,titel,opdateringsdato,resume',
    max_records=500
):
    climate_cases.append(case)
    print(f"Retrieved case {case['id']}: {case['titel'][:50]}...")

print(f"Total climate cases retrieved: {len(climate_cases)}")
```
JavaScript/TypeScript Implementation¶
```javascript
class ParliamentaryAPI {
  constructor() {
    this.baseUrl = 'https://oda.ft.dk/api';
    this.maxPageSize = 100;
  }

  async* paginateEntity(entityName, options = {}) {
    const {
      pageSize = 100,
      maxRecords = null,
      filter = null,
      select = null,
      expand = null,
      orderBy = null
    } = options;

    const actualPageSize = Math.min(pageSize, this.maxPageSize);
    let skip = 0;
    let totalRetrieved = 0;

    while (true) {
      const params = new URLSearchParams({
        '$top': actualPageSize.toString(),
        '$skip': skip.toString()
      });
      if (filter) params.append('$filter', filter);
      if (select) params.append('$select', select);
      if (expand) params.append('$expand', expand);
      if (orderBy) params.append('$orderby', orderBy);

      const url = `${this.baseUrl}/${entityName}?${params}`;
      try {
        const response = await fetch(url);
        if (!response.ok) {
          throw new Error(`API error: ${response.status} - ${await response.text()}`);
        }

        const data = await response.json();
        const records = data.value || [];
        if (records.length === 0) break;

        for (const record of records) {
          if (maxRecords && totalRetrieved >= maxRecords) return;
          yield record;
          totalRetrieved++;
        }

        if (records.length < actualPageSize) break;
        skip += actualPageSize;
      } catch (error) {
        console.error(`Pagination error at skip=${skip}:`, error);
        throw error;
      }
    }
  }

  // Collect all records into an array
  async getAllRecords(entityName, options = {}) {
    const records = [];
    for await (const record of this.paginateEntity(entityName, options)) {
      records.push(record);
    }
    return records;
  }

  // Get total count without retrieving all records
  async getTotalCount(entityName, filter = null) {
    const params = new URLSearchParams({
      '$top': '1',
      '$inlinecount': 'allpages'
    });
    if (filter) params.append('$filter', filter);

    const url = `${this.baseUrl}/${entityName}?${params}`;
    const response = await fetch(url);
    if (!response.ok) {
      throw new Error(`API error: ${response.status}`);
    }
    const data = await response.json();
    return parseInt(data['odata.count'], 10) || 0;
  }
}

// Example usage
async function analyzeVotingPatterns() {
  const api = new ParliamentaryAPI();

  // Get total count first for progress tracking
  const totalVotes = await api.getTotalCount('Afstemning');
  console.log(`Processing ${totalVotes} voting records...`);

  let processed = 0;
  const votingPatterns = new Map();

  for await (const vote of api.paginateEntity('Afstemning', {
    expand: 'Stemme/Aktør',
    orderBy: 'opdateringsdato desc'
  })) {
    // Process vote data
    processed++;
    if (processed % 100 === 0) {
      console.log(`Progress: ${processed}/${totalVotes} (${(processed / totalVotes * 100).toFixed(1)}%)`);
    }
  }
  return votingPatterns;
}
```
Parallel Pagination¶
For large datasets, parallel pagination can significantly improve performance by processing multiple page ranges simultaneously.
Python Parallel Implementation¶
```python
import asyncio
import math
import time
from typing import List, Dict, Optional

import aiohttp

class ParallelParliamentaryAPI:
    def __init__(self, max_workers: int = 5):
        self.base_url = "https://oda.ft.dk/api"
        self.max_page_size = 100
        self.max_workers = max_workers

    async def get_page_async(self, session: aiohttp.ClientSession,
                             entity_name: str, skip: int, top: int,
                             **kwargs) -> List[Dict]:
        """Fetch a single page asynchronously."""
        params = {
            '$top': str(top),
            '$skip': str(skip)
        }
        params.update(kwargs)
        url = f"{self.base_url}/{entity_name}"
        async with session.get(url, params=params) as response:
            if response.status != 200:
                raise Exception(f"API error: {response.status}")
            data = await response.json()
            return data.get('value', [])

    async def get_total_count(self, entity_name: str,
                              filter_expr: Optional[str] = None) -> int:
        """Get total record count."""
        params = {
            '$top': '1',
            '$inlinecount': 'allpages'
        }
        if filter_expr:
            params['$filter'] = filter_expr
        async with aiohttp.ClientSession() as session:
            url = f"{self.base_url}/{entity_name}"
            async with session.get(url, params=params) as response:
                if response.status != 200:
                    raise Exception(f"API error: {response.status}")
                data = await response.json()
                return int(data.get('odata.count', 0))

    async def paginate_parallel(self, entity_name: str,
                                total_records: Optional[int] = None,
                                page_size: int = 100,
                                max_records: Optional[int] = None,
                                **kwargs) -> List[Dict]:
        """
        Paginate using parallel requests for maximum performance.

        Args:
            entity_name: API entity name
            total_records: Known total count (saves an API call)
            page_size: Records per page
            max_records: Maximum records to retrieve
            **kwargs: Additional OData parameters

        Returns:
            List of all retrieved records
        """
        # Get total count if not provided
        if total_records is None:
            total_records = await self.get_total_count(entity_name,
                                                       kwargs.get('$filter'))
        if max_records:
            total_records = min(total_records, max_records)

        # Calculate page ranges
        actual_page_size = min(page_size, self.max_page_size)
        total_pages = math.ceil(total_records / actual_page_size)

        # Create semaphore to limit concurrent requests
        semaphore = asyncio.Semaphore(self.max_workers)

        async def fetch_with_semaphore(session, skip):
            async with semaphore:
                return await self.get_page_async(
                    session, entity_name, skip, actual_page_size, **kwargs
                )

        # Execute parallel requests
        all_records = []
        async with aiohttp.ClientSession() as session:
            tasks = []
            for page in range(total_pages):
                skip = page * actual_page_size
                if skip >= total_records:
                    break
                tasks.append(fetch_with_semaphore(session, skip))

            # Collect results in page order
            for page_records in await asyncio.gather(*tasks):
                all_records.extend(page_records)
                if len(all_records) >= total_records:
                    break

        # Ensure we don't exceed max_records
        if max_records and len(all_records) > max_records:
            all_records = all_records[:max_records]
        return all_records

# Example usage
async def analyze_all_legislation():
    api = ParallelParliamentaryAPI(max_workers=8)

    # Get all legislation cases updated since 2020 in parallel
    start_time = time.time()
    all_cases = await api.paginate_parallel(
        entity_name='Sag',
        page_size=100,
        **{
            '$filter': "year(opdateringsdato) ge 2020",
            '$select': 'id,titel,opdateringsdato,resume',
            '$orderby': 'opdateringsdato desc'
        }
    )
    elapsed = time.time() - start_time
    print(f"Retrieved {len(all_cases)} cases in {elapsed:.2f} seconds")
    return all_cases

# Run the parallel analysis
if __name__ == "__main__":
    asyncio.run(analyze_all_legislation())
```
Memory-Efficient Streaming¶
For processing very large datasets (like all 96,538+ cases), memory-efficient streaming prevents memory exhaustion while maintaining performance.
Streaming with Progress Tracking¶
```python
import requests
from typing import Optional

class StreamingParliamentaryAPI:
    def __init__(self, batch_size: int = 1000):
        self.base_url = "https://oda.ft.dk/api"
        self.max_page_size = 100
        self.batch_size = batch_size  # Process in batches to manage memory

    def stream_with_progress(self, entity_name: str,
                             processor_func,
                             progress_callback=None,
                             **kwargs):
        """
        Stream records with progress tracking and batch processing.

        Args:
            entity_name: API entity name
            processor_func: Function to process each record
            progress_callback: Function called with (current, total, percent)
            **kwargs: OData parameters
        """
        # Get total count for progress tracking
        total_count = self._get_total_count(entity_name, kwargs.get('$filter'))
        if progress_callback:
            progress_callback(0, total_count, 0.0)

        skip = 0
        processed = 0
        batch_records = []

        while skip < total_count:
            # Fetch page
            params = {
                '$top': str(self.max_page_size),
                '$skip': str(skip),
                **kwargs
            }
            response = requests.get(f"{self.base_url}/{entity_name}", params=params)
            if response.status_code != 200:
                raise Exception(f"API error: {response.status_code}")

            records = response.json().get('value', [])
            if not records:
                break

            # Add to current batch
            batch_records.extend(records)
            processed += len(records)

            # Process batch when it reaches batch_size or we're done
            if len(batch_records) >= self.batch_size or processed >= total_count:
                for record in batch_records:
                    processor_func(record)
                # Clear batch to free memory
                batch_records.clear()

            # Update progress
            if progress_callback:
                percent = (processed / total_count) * 100
                progress_callback(processed, total_count, percent)

            skip += len(records)
            # If we got fewer records than requested, we're done
            if len(records) < self.max_page_size:
                break

    def _get_total_count(self, entity_name: str, filter_expr: Optional[str] = None) -> int:
        """Get total record count."""
        params = {'$top': '1', '$inlinecount': 'allpages'}
        if filter_expr:
            params['$filter'] = filter_expr
        response = requests.get(f"{self.base_url}/{entity_name}", params=params)
        if response.status_code != 200:
            raise Exception(f"API error: {response.status_code}")
        return int(response.json().get('odata.count', 0))

# Example: Process all documents with memory efficiency
def process_all_documents():
    api = StreamingParliamentaryAPI(batch_size=500)

    # Statistics collection
    stats = {
        'total_processed': 0,
        'document_types': {},
        'yearly_counts': {}
    }

    def document_processor(document):
        """Process individual document record."""
        stats['total_processed'] += 1

        # Count document types
        doc_type = document.get('typeid', 'unknown')
        stats['document_types'][doc_type] = stats['document_types'].get(doc_type, 0) + 1

        # Count by year
        update_date = document.get('opdateringsdato', '')
        if update_date:
            year = update_date[:4] if len(update_date) >= 4 else 'unknown'
            stats['yearly_counts'][year] = stats['yearly_counts'].get(year, 0) + 1

    def progress_tracker(current, total, percent):
        """Track processing progress."""
        print(f"\rProcessing: {current:,}/{total:,} documents ({percent:.1f}%)", end='', flush=True)

    print("Starting document analysis...")
    api.stream_with_progress(
        entity_name='Dokument',
        processor_func=document_processor,
        progress_callback=progress_tracker,
        **{
            '$select': 'id,titel,typeid,opdateringsdato',
            '$orderby': 'opdateringsdato desc'
        }
    )

    print("\nDocument analysis complete!")
    print(f"Total documents processed: {stats['total_processed']:,}")
    print("Document types:", dict(list(stats['document_types'].items())[:5]))
    print("Recent years:", dict(list(stats['yearly_counts'].items())[:5]))

if __name__ == "__main__":
    process_all_documents()
```
Cursor-Based Pagination Alternative¶
While the API doesn't natively support cursor-based pagination, you can simulate it with an ordered timestamp field, which keeps traversal of large datasets consistent even while the underlying data changes.
Timestamp-Based Cursor Pattern¶
```python
import requests
from typing import Optional

class CursorParliamentaryAPI:
    def __init__(self):
        self.base_url = "https://oda.ft.dk/api"
        self.max_page_size = 100

    def paginate_by_timestamp(self, entity_name: str,
                              timestamp_field: str = 'opdateringsdato',
                              start_date: Optional[str] = None,
                              end_date: Optional[str] = None,
                              page_size: int = 100,
                              **kwargs):
        """
        Paginate using a timestamp cursor for consistent results even as data changes.

        Note: because the cursor advances with a strict 'gt' comparison, records
        that share the exact timestamp of the last record on a page can be skipped.
        Prefer a high-resolution timestamp field, or fall back to skip-based
        pagination when exact completeness matters.

        Args:
            entity_name: API entity name
            timestamp_field: Field to use as cursor (opdateringsdato, dato, etc.)
            start_date: Starting timestamp (ISO format)
            end_date: Ending timestamp (ISO format)
            page_size: Records per page
            **kwargs: Additional OData parameters

        Yields:
            Individual records in timestamp order
        """
        page_size = min(page_size, self.max_page_size)

        # Build base filter
        filters = []
        if start_date:
            filters.append(f"{timestamp_field} ge datetime'{start_date}'")
        if end_date:
            filters.append(f"{timestamp_field} le datetime'{end_date}'")

        # Add any additional filters
        if '$filter' in kwargs:
            filters.append(kwargs.pop('$filter'))

        base_filter = ' and '.join(filters) if filters else None
        # Start with no cursor so records exactly at start_date are included
        # (the 'ge start_date' clause in the base filter covers the first page)
        last_timestamp = None

        while True:
            # Build current page filter
            current_filters = []
            if base_filter:
                current_filters.append(f"({base_filter})")
            if last_timestamp:
                current_filters.append(f"{timestamp_field} gt datetime'{last_timestamp}'")
            current_filter = ' and '.join(current_filters) if current_filters else None

            # Build request parameters
            params = {
                '$top': str(page_size),
                '$orderby': f"{timestamp_field} asc",
                **kwargs
            }
            if current_filter:
                params['$filter'] = current_filter

            # Make request
            response = requests.get(f"{self.base_url}/{entity_name}", params=params)
            if response.status_code != 200:
                raise Exception(f"API error: {response.status_code}")

            records = response.json().get('value', [])
            if not records:
                break

            # Yield records and advance the cursor
            for record in records:
                yield record
                last_timestamp = record.get(timestamp_field)

            # If we got fewer records than requested, we're done
            if len(records) < page_size:
                break

# Example: Process recent legislation with temporal cursor
def analyze_recent_legislation():
    api = CursorParliamentaryAPI()

    # Get all cases from 2024 onwards
    start_date = "2024-01-01T00:00:00"
    recent_cases = []

    for case in api.paginate_by_timestamp(
        entity_name='Sag',
        start_date=start_date,
        **{
            '$select': 'id,titel,opdateringsdato',
            '$filter': "substringof('lov', titel)"  # Only legislation
        }
    ):
        recent_cases.append(case)
        print(f"Found case: {case['titel'][:60]}... ({case['opdateringsdato']})")

    print(f"Total recent legislation found: {len(recent_cases)}")
    return recent_cases
```
Error Handling and Recovery¶
Robust pagination requires comprehensive error handling to deal with network issues, rate limits, and data inconsistencies.
Resilient Pagination Implementation¶
```python
import logging
import time
from typing import List, Dict, Optional, Callable

import requests

class ResilientParliamentaryAPI:
    def __init__(self, max_retries: int = 3, retry_delay: float = 1.0):
        self.base_url = "https://oda.ft.dk/api"
        self.max_page_size = 100
        self.max_retries = max_retries
        self.retry_delay = retry_delay

        # Setup logging
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def paginate_with_recovery(self, entity_name: str,
                               recovery_callback: Optional[Callable] = None,
                               start_skip: int = 0,
                               **kwargs):
        """
        Paginate with automatic error recovery and progress persistence.

        Args:
            entity_name: API entity name
            recovery_callback: Function called on recoverable errors
            start_skip: Skip offset to resume from (e.g. a saved checkpoint)
            **kwargs: OData parameters (do not pass $skip here; it would
                      override the internal offset and re-fetch the same page)
        """
        skip = start_skip
        consecutive_errors = 0
        max_consecutive_errors = 5

        while True:
            try:
                # Attempt to fetch page
                records = self._fetch_page_with_retry(entity_name, skip, **kwargs)
                if not records:
                    self.logger.info(f"Pagination complete at skip={skip}")
                    break

                # Reset error counter on success
                consecutive_errors = 0

                # Yield records
                for record in records:
                    yield record

                skip += len(records)
                # If we got fewer records than max page size, we're done
                if len(records) < self.max_page_size:
                    break

            except Exception as e:
                consecutive_errors += 1
                self.logger.error(f"Error at skip={skip}: {e}")

                if consecutive_errors >= max_consecutive_errors:
                    self.logger.error("Too many consecutive errors, aborting")
                    raise

                # Call recovery callback if provided
                if recovery_callback:
                    should_continue = recovery_callback(e, skip, consecutive_errors)
                    if not should_continue:
                        break

                # Wait before retry
                time.sleep(self.retry_delay * consecutive_errors)

    def _fetch_page_with_retry(self, entity_name: str, skip: int, **kwargs) -> List[Dict]:
        """Fetch a single page with retry logic."""
        params = {
            '$top': str(self.max_page_size),
            '$skip': str(skip),
            **kwargs
        }

        last_error = None
        for attempt in range(self.max_retries):
            try:
                response = requests.get(
                    f"{self.base_url}/{entity_name}",
                    params=params,
                    timeout=30  # 30 second timeout
                )
                if response.status_code == 200:
                    return response.json().get('value', [])
                elif response.status_code == 429:  # Rate limited
                    wait_time = self.retry_delay * (2 ** attempt)
                    self.logger.warning(f"Rate limited, waiting {wait_time}s")
                    time.sleep(wait_time)
                    continue
                else:
                    raise Exception(f"HTTP {response.status_code}: {response.text}")
            except requests.exceptions.RequestException as e:
                last_error = e
                if attempt < self.max_retries - 1:
                    wait_time = self.retry_delay * (2 ** attempt)
                    self.logger.warning(f"Request failed, retrying in {wait_time}s: {e}")
                    time.sleep(wait_time)
                else:
                    break

        raise last_error or Exception("Max retries exceeded")

# Example with progress persistence and recovery
class ProgressTracker:
    def __init__(self, checkpoint_file: str = "pagination_checkpoint.txt"):
        self.checkpoint_file = checkpoint_file
        self.processed_count = 0
        self.start_time = time.time()

    def load_checkpoint(self) -> int:
        """Load last processed skip value."""
        try:
            with open(self.checkpoint_file, 'r') as f:
                return int(f.read().strip())
        except FileNotFoundError:
            return 0

    def save_checkpoint(self, skip: int):
        """Save current progress."""
        with open(self.checkpoint_file, 'w') as f:
            f.write(str(skip))

    def update_progress(self, records_processed: int, skip: int):
        """Update progress and save checkpoint."""
        self.processed_count += records_processed
        self.save_checkpoint(skip)

        elapsed = time.time() - self.start_time
        rate = self.processed_count / elapsed if elapsed > 0 else 0
        print(f"Processed: {self.processed_count:,} records, "
              f"Skip: {skip:,}, Rate: {rate:.1f} records/sec")

def resilient_data_processing():
    api = ResilientParliamentaryAPI()
    tracker = ProgressTracker()

    # Start from last checkpoint
    start_skip = tracker.load_checkpoint()
    print(f"Starting from checkpoint: skip={start_skip}")

    def recovery_handler(error, skip, consecutive_errors):
        """Handle recoverable errors."""
        print(f"Recovery attempt {consecutive_errors} at skip={skip}: {error}")
        # Save progress before potential abort
        tracker.save_checkpoint(skip)
        # Continue unless it's a critical error
        return consecutive_errors < 3

    # Process all documents with recovery
    batch_size = 100
    current_batch = []

    try:
        for i, document in enumerate(api.paginate_with_recovery(
            entity_name='Dokument',
            recovery_callback=recovery_handler,
            start_skip=start_skip,  # Resume from checkpoint
            **{'$select': 'id,titel,typeid'}
        )):
            current_batch.append(document)

            # Process in batches
            if len(current_batch) >= batch_size:
                # Process batch (your custom logic here)
                process_document_batch(current_batch)

                # Update progress
                skip_value = start_skip + i + 1
                tracker.update_progress(len(current_batch), skip_value)
                current_batch.clear()

                # Optional: Small delay to be API-friendly
                time.sleep(0.1)
    except Exception as e:
        print(f"Fatal error: {e}")
        print("Progress saved to checkpoint, can resume later")
        raise
    finally:
        # Process any remaining records in the final batch
        if current_batch:
            process_document_batch(current_batch)

def process_document_batch(documents: List[Dict]):
    """Process a batch of documents."""
    for doc in documents:
        # Your processing logic here
        pass
    print(f"Processed batch of {len(documents)} documents")

if __name__ == "__main__":
    resilient_data_processing()
```
Performance Optimization¶
Caching Strategies¶
```python
import hashlib
import json
import math
from typing import List, Dict, Any

import redis
import requests

class CachedParliamentaryAPI:
    def __init__(self, redis_host: str = 'localhost', redis_port: int = 6379,
                 cache_ttl: int = 3600):  # 1 hour cache
        self.base_url = "https://oda.ft.dk/api"
        self.redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
        self.cache_ttl = cache_ttl

    def _get_cache_key(self, entity_name: str, params: Dict[str, Any]) -> str:
        """Generate a consistent cache key."""
        # Sort parameters for consistent key generation
        param_str = json.dumps(params, sort_keys=True)
        key_hash = hashlib.md5(f"{entity_name}:{param_str}".encode()).hexdigest()
        return f"parliamentary_api:{key_hash}"

    def get_page_cached(self, entity_name: str, skip: int, top: int, **kwargs) -> List[Dict]:
        """Get page with caching support."""
        params = {'$skip': skip, '$top': top, **kwargs}
        cache_key = self._get_cache_key(entity_name, params)

        # Try cache first
        cached_data = self.redis_client.get(cache_key)
        if cached_data:
            return json.loads(cached_data)

        # Fetch from API
        response = requests.get(f"{self.base_url}/{entity_name}", params=params)
        if response.status_code != 200:
            raise Exception(f"API error: {response.status_code}")

        records = response.json().get('value', [])
        # Cache the result
        self.redis_client.setex(cache_key, self.cache_ttl, json.dumps(records))
        return records

    def warm_cache(self, entity_name: str, total_records: int, page_size: int = 100):
        """Pre-populate cache with common queries."""
        total_pages = math.ceil(total_records / page_size)
        print(f"Warming cache for {entity_name}: {total_pages} pages")

        for page in range(min(total_pages, 10)):  # Warm first 10 pages
            skip = page * page_size
            self.get_page_cached(entity_name, skip, page_size)
            if page % 10 == 0:
                print(f"Cache warming: {page}/{total_pages} pages complete")
```
Query Optimization¶
```python
from typing import Optional

# Reuses paginate_entity from the skip-based ParliamentaryAPI defined earlier in this guide
class OptimizedParliamentaryAPI(ParliamentaryAPI):
    def get_optimized_fields(self, entity_name: str) -> str:
        """Return optimized field selections for common entities."""
        field_sets = {
            'Sag': 'id,titel,opdateringsdato,resume',
            'Aktør': 'id,navn,opdateringsdato,typeid',
            'Dokument': 'id,titel,dokumenttypeid,opdateringsdato',
            'Afstemning': 'id,nummer,opdateringsdato,sagstrinid',
            'Møde': 'id,titel,dato,lokale'
        }
        return field_sets.get(entity_name, '')

    def paginate_optimized(self, entity_name: str, analysis_type: str = 'basic', **kwargs):
        """
        Optimized pagination with query patterns for specific analysis types.

        Args:
            entity_name: API entity name
            analysis_type: 'basic', 'relationships', 'temporal', or 'content'
        """
        optimization_configs = {
            'basic': {
                'select_fields': self.get_optimized_fields(entity_name),
                'expand': None
            },
            'relationships': {
                'select_fields': self.get_optimized_fields(entity_name),
                'expand': self._get_common_expansions(entity_name)
            },
            'temporal': {
                'select_fields': self.get_optimized_fields(entity_name),
                'orderby': 'opdateringsdato desc'
            },
            'content': {
                'select_fields': f"{self.get_optimized_fields(entity_name)},resume,beskrivelse",
                'expand': None
            }
        }
        config = optimization_configs.get(analysis_type, optimization_configs['basic'])

        # Apply optimization settings (keyword names match paginate_entity's signature)
        if config['select_fields'] and 'select_fields' not in kwargs:
            kwargs['select_fields'] = config['select_fields']
        if config.get('expand') and 'expand_relations' not in kwargs:
            kwargs['expand_relations'] = config['expand']
        if config.get('orderby') and 'order_by' not in kwargs:
            kwargs['order_by'] = config['orderby']

        # Use standard pagination with optimized parameters
        return self.paginate_entity(entity_name, **kwargs)

    def _get_common_expansions(self, entity_name: str) -> Optional[str]:
        """Return the most commonly needed expansions for each entity."""
        common_expansions = {
            'Sag': 'Sagskategori',
            'Dokument': 'Dokumentkategori',
            'Afstemning': 'Sagstrin',
            'Møde': 'Periode'
        }
        return common_expansions.get(entity_name)
```
Best Practices Summary¶
Production Checklist¶
- Always Use URL Encoding: Use `%24` instead of `$` in all OData parameters
- Respect the 100-Record Limit: Never request more than 100 records per page
- Implement Progress Tracking: For long-running operations, show progress and save checkpoints
- Use Appropriate Error Handling: Implement retry logic with exponential backoff
- Optimize Field Selection: Use `$select` to request only needed fields
- Cache When Appropriate: Cache frequently accessed pages, especially for reference data
- Consider Memory Usage: Use streaming for large datasets instead of loading everything
- Monitor Performance: Track response times and adjust page sizes accordingly
- Plan for Scale: Use parallel requests for large-scale data analysis
- Handle Data Changes: Be aware that dataset size can change during pagination
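Several checklist items can be folded into one small request helper. A minimal sketch (the helper names and backoff constants are illustrative, not part of the API): it relies on the `requests` library's automatic `$` → `%24` encoding, caps `$top` at 100, and retries with exponential backoff.

```python
import time
import requests

MAX_PAGE_SIZE = 100  # hard API limit

def build_params(skip: int = 0, top: int = MAX_PAGE_SIZE, **odata) -> dict:
    """Build OData query params, capping $top at the API's 100-record limit."""
    return {'$top': str(min(top, MAX_PAGE_SIZE)), '$skip': str(skip), **odata}

def fetch_page(entity: str, skip: int = 0, top: int = MAX_PAGE_SIZE,
               retries: int = 3, base_delay: float = 1.0, **odata) -> list:
    """Fetch one page with exponential-backoff retries (1s, 2s, 4s, ...)."""
    params = build_params(skip, top, **odata)
    for attempt in range(retries):
        try:
            resp = requests.get(f"https://oda.ft.dk/api/{entity}",
                                params=params, timeout=30)
            resp.raise_for_status()  # treat non-2xx responses as retryable errors
            return resp.json().get('value', [])
        except requests.exceptions.RequestException:
            if attempt == retries - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))
```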
Performance Characteristics¶
Based on comprehensive testing of the Danish Parliamentary API:
- Typical Response Time: 85-108ms for standard queries
- Large Dataset Performance: ~2.1s for 10,000 records
- No Rate Limits: API handles concurrent requests well
- Memory Efficiency: Streaming approach prevents memory exhaustion
- Reliability: Consistent performance across all entity types
Common Pitfalls to Avoid¶
- URL Encoding Errors: Forgetting to encode `$` as `%24`
- Exceeding Page Limits: Requesting more than 100 records per page
- Ignoring Total Counts: Not using `$inlinecount` for progress tracking
- Memory Issues: Loading entire large datasets into memory
- Poor Error Handling: Not implementing retry logic for network issues
- Cache Misuse: Caching too long for frequently changing data
- Inefficient Queries: Requesting unnecessary fields or expansions
- Sequential Processing: Not leveraging parallel requests for large analyses
This comprehensive pagination strategy guide provides the foundation for efficiently accessing all 96,538+ parliamentary cases and related data through the Danish Parliamentary OData API.