Scaling Production Applications
Guide to building scalable applications that consume the Danish Parliamentary API efficiently. Learn proven strategies for handling high-volume data processing and traffic loads.
Overview
The Danish Parliament API is fast enough to back large-scale applications. Measured response times range from about 85 ms for small queries to about 2.1 seconds for 10,000 records, so pagination, caching, and bounded concurrency are usually enough to process parliamentary data in bulk.
API Performance Baseline
Testing of the API showed:
- Small queries (≤100 records): 100-150 ms response time
- Medium queries (1,000 records): 300-500 ms response time
- Large queries (10,000 records): 2-3 seconds response time
- Complex expansions: 50-100% overhead, but they replace multiple API calls
- Concurrent requests: no limits observed; stable under load
Scaling Strategies Overview
1. Request-Level Scaling
The API's 100-record limit per request makes pagination essential for large datasets:
import asyncio
import aiohttp
from typing import List, Dict, Any

class ScalableParliamentClient:
    def __init__(self, base_url: str = "https://oda.ft.dk/api"):
        self.base_url = base_url
        self.session = None

    async def __aenter__(self):
        connector = aiohttp.TCPConnector(limit=100, limit_per_host=20)
        timeout = aiohttp.ClientTimeout(total=30, connect=5)
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            headers={'Accept': 'application/json'}
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def get_all_records(self, entity: str, filters: str = "",
                              expand: str = "", max_records: int = 10000) -> List[Dict[Any, Any]]:
        """Efficiently retrieve large datasets using pagination"""
        all_records = []
        batch_size = 100  # API maximum
        tasks = []
        # Create concurrent requests for better throughput
        for offset in range(0, min(max_records, 10000), batch_size):
            task = self._fetch_batch(entity, offset, batch_size, filters, expand)
            tasks.append(task)
            # Process in batches of 10 concurrent requests
            if len(tasks) >= 10:
                batch_results = await asyncio.gather(*tasks, return_exceptions=True)
                all_records.extend(self._process_batch_results(batch_results))
                tasks = []
        # Process remaining tasks
        if tasks:
            batch_results = await asyncio.gather(*tasks, return_exceptions=True)
            all_records.extend(self._process_batch_results(batch_results))
        return all_records[:max_records]

    async def _fetch_batch(self, entity: str, skip: int, top: int,
                           filters: str, expand: str) -> Dict[str, Any]:
        """Fetch a single batch of records"""
        if self.session is None:
            raise RuntimeError("Use 'async with ScalableParliamentClient()' before fetching")
        # Pass literal '$' keys: aiohttp percent-encodes parameters itself, so a
        # pre-encoded '%24skip' would be sent double-encoded as '%2524skip'
        params = {
            '$skip': str(skip),
            '$top': str(top)
        }
        if filters:
            params['$filter'] = filters
        if expand:
            params['$expand'] = expand
        url = f"{self.base_url}/{entity}"
        async with self.session.get(url, params=params) as response:
            if response.status == 200:
                data = await response.json()
                return {
                    'success': True,
                    'data': data.get('value', []),
                    'count': len(data.get('value', []))
                }
            else:
                return {
                    'success': False,
                    'error': f"HTTP {response.status}",
                    'skip': skip
                }

    def _process_batch_results(self, results: List) -> List[Dict[Any, Any]]:
        """Collect records from successful batches; failed batches are skipped"""
        processed = []
        for result in results:
            if isinstance(result, Exception):
                print(f"Request failed: {result}")
                continue
            if result.get('success'):
                processed.extend(result['data'])
            else:
                print(f"Batch at skip={result.get('skip')} failed: {result.get('error')}")
        return processed

# Usage example
async def main():
    async with ScalableParliamentClient() as client:
        # Get all cases from 2023 with actor relationships
        cases = await client.get_all_records(
            entity="Sag",
            filters="startswith(samlingid, '20231')",
            expand="SagAktør/Aktør",
            max_records=5000
        )
        print(f"Retrieved {len(cases)} cases")

# Run with: asyncio.run(main())
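The pagination loop above advances `$skip` in steps of 100. As a minimal sketch, the request windows can be planned up front so that the final page asks only for the records still needed. The helper name `page_windows` is hypothetical and is not part of the API or of the client above.

```python
from typing import Iterator, Tuple


def page_windows(max_records: int, page_size: int = 100) -> Iterator[Tuple[int, int]]:
    """Yield (skip, top) pairs that cover max_records in page_size steps."""
    if max_records < 0 or page_size <= 0:
        raise ValueError("max_records must be >= 0 and page_size must be > 0")
    for skip in range(0, max_records, page_size):
        # The last window shrinks so we never request more than max_records
        yield skip, min(page_size, max_records - skip)


windows = list(page_windows(250))
print(windows)  # [(0, 100), (100, 100), (200, 50)]
```

Each pair maps directly onto the `$skip` and `$top` query parameters of one request.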
2. Application-Level Scaling
Implement caching and smart request patterns:
import redis
import json
import hashlib
from typing import Any, Dict, List

class CachedParliamentClient:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url, decode_responses=True)
        self.cache_ttl = 3600  # 1 hour cache

    def _cache_key(self, entity: str, params: Dict[str, str]) -> str:
        """Generate consistent cache key"""
        # Keep the entity name readable so entries can be invalidated by pattern
        key_data = json.dumps(params, sort_keys=True)
        return f"parliament_api:{entity}:{hashlib.md5(key_data.encode()).hexdigest()}"

    async def get_cached_or_fetch(self, entity: str, **params) -> List[Dict[str, Any]]:
        """Get data from cache or fetch from API"""
        cache_key = self._cache_key(entity, params)
        # Try cache first (synchronous Redis calls briefly block the event loop)
        cached = self.redis.get(cache_key)
        if cached is not None:
            return json.loads(cached)
        # Fetch from API; entering the client context creates its HTTP session
        async with ScalableParliamentClient() as client:
            data = await client.get_all_records(entity, **params)
        # Cache with expiration
        self.redis.setex(
            cache_key,
            self.cache_ttl,
            json.dumps(data, default=str)
        )
        return data

    def invalidate_cache_pattern(self, pattern: str):
        """Invalidate cache entries whose key contains pattern (for example an entity name)"""
        # SCAN iterates incrementally; KEYS would block Redis on a large keyspace
        keys = list(self.redis.scan_iter(match=f"parliament_api:*{pattern}*"))
        if keys:
            self.redis.delete(*keys)
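The cache key above relies on `json.dumps(..., sort_keys=True)` so that the same query always hashes to the same key. A small sketch of that property, using a hypothetical helper `params_digest` that isolates the hashing step:

```python
import hashlib
import json
from typing import Dict


def params_digest(params: Dict[str, str]) -> str:
    """Hash query parameters so that dictionary key order does not matter."""
    canonical = json.dumps(params, sort_keys=True)
    return hashlib.md5(canonical.encode()).hexdigest()


a = params_digest({"filters": "typeid eq 3", "expand": "SagAktør"})
b = params_digest({"expand": "SagAktør", "filters": "typeid eq 3"})
c = params_digest({"filters": "typeid eq 5", "expand": "SagAktør"})
print(a == b, a == c)  # True False
```

Without `sort_keys=True`, two callers passing the same parameters in a different order could produce different keys and miss each other's cache entries.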
Horizontal vs Vertical Scaling
Horizontal Scaling (Scale Out)
Deploy multiple application instances behind a load balancer:
# docker-compose.yml for horizontal scaling
version: '3.8'
services:
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    depends_on:
      - app1
      - app2
      - app3
  app1: &app
    build: .
    environment:
      - REDIS_URL=redis://redis:6379
      - INSTANCE_ID=1
    depends_on:
      - redis
  app2:
    <<: *app
    environment:
      - REDIS_URL=redis://redis:6379
      - INSTANCE_ID=2
  app3:
    <<: *app
    environment:
      - REDIS_URL=redis://redis:6379
      - INSTANCE_ID=3
  redis:
    image: redis:alpine
    command: redis-server --maxmemory 512mb --maxmemory-policy allkeys-lru
    volumes:
      - redis_data:/data
volumes:
  redis_data:
# nginx.conf - Load balancer configuration
events {
    worker_connections 1024;
}
http {
    upstream parliament_api_backend {
        least_conn;
        server app1:8000 weight=1 max_fails=3 fail_timeout=30s;
        server app2:8000 weight=1 max_fails=3 fail_timeout=30s;
        server app3:8000 weight=1 max_fails=3 fail_timeout=30s;
        # Idle upstream connections kept open per worker; required for
        # the connection reuse configured in the location block below
        keepalive 32;
    }
    server {
        listen 80;
        location / {
            proxy_pass http://parliament_api_backend;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            # Connection pooling
            proxy_http_version 1.1;
            proxy_set_header Connection "";
            # Timeouts
            proxy_connect_timeout 5s;
            proxy_send_timeout 30s;
            proxy_read_timeout 30s;
        }
    }
}
Vertical Scaling (Scale Up)
Optimize single-instance performance:
# High-performance single-instance configuration
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor
import multiprocessing

class HighPerformanceParliamentClient:
    def __init__(self):
        # Optimize timeouts based on API performance
        self.timeout = aiohttp.ClientTimeout(
            total=10,    # Based on 2.1s max observed + buffer
            connect=2,   # API responds quickly
            sock_read=5
        )
        # Thread pool for post-processing; for pure-Python CPU-bound work the GIL
        # limits parallelism, so a ProcessPoolExecutor may be a better fit
        self.thread_pool = ThreadPoolExecutor(
            max_workers=multiprocessing.cpu_count() * 2
        )

    def _create_connector(self) -> aiohttp.TCPConnector:
        # Optimize connection pool for API characteristics.
        # Built per session because a ClientSession closes its connector on exit.
        return aiohttp.TCPConnector(
            limit=200,              # Total connection pool size
            limit_per_host=50,      # Per-host connection limit
            keepalive_timeout=30,   # Keep connections alive
            enable_cleanup_closed=True,
            use_dns_cache=True,
            ttl_dns_cache=300
        )

    async def process_large_dataset(self, entity: str, processor_func):
        """Process large datasets with parallel CPU work"""
        async with aiohttp.ClientSession(
            connector=self._create_connector(),
            timeout=self.timeout
        ) as session:
            # Fetch data with optimal concurrency
            semaphore = asyncio.Semaphore(20)  # Limit concurrent requests
            tasks = []
            for skip in range(0, 10000, 100):
                task = self._fetch_with_semaphore(
                    session, semaphore, entity, skip
                )
                tasks.append(task)
            # Gather all data
            results = await asyncio.gather(*tasks, return_exceptions=True)
        # Process CPU-intensive work in thread pool
        loop = asyncio.get_running_loop()
        processing_tasks = []
        for batch in results:
            # Failed requests come back as exceptions and are skipped
            if isinstance(batch, list):
                task = loop.run_in_executor(
                    self.thread_pool,
                    processor_func,
                    batch
                )
                processing_tasks.append(task)
        return await asyncio.gather(*processing_tasks)

    async def _fetch_with_semaphore(self, session, semaphore, entity, skip):
        async with semaphore:
            # Literal '$' keys; aiohttp handles the percent-encoding
            params = {'$skip': str(skip), '$top': '100'}
            url = f"https://oda.ft.dk/api/{entity}"
            async with session.get(url, params=params) as response:
                if response.status == 200:
                    data = await response.json()
                    return data.get('value', [])
                return []
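The semaphore in `process_large_dataset` is what keeps the number of in-flight requests bounded. The sketch below shows the same pattern without touching the network: `asyncio.sleep` stands in for the HTTP round trip, and the `fetch_page` and `run` helpers are hypothetical names used only for this illustration.

```python
import asyncio
from typing import Dict, List


async def fetch_page(skip: int, semaphore: asyncio.Semaphore, stats: Dict[str, int]) -> int:
    """Simulate one paged request while tracking how many run at once."""
    async with semaphore:
        stats["in_flight"] += 1
        stats["peak"] = max(stats["peak"], stats["in_flight"])
        await asyncio.sleep(0.01)  # stand-in for the HTTP round trip
        stats["in_flight"] -= 1
        return skip


async def run(pages: int, limit: int) -> Dict[str, object]:
    semaphore = asyncio.Semaphore(limit)
    stats: Dict[str, int] = {"in_flight": 0, "peak": 0}
    offsets: List[int] = await asyncio.gather(
        *(fetch_page(i * 100, semaphore, stats) for i in range(pages))
    )
    return {"peak": stats["peak"], "offsets": offsets}


outcome = asyncio.run(run(pages=50, limit=20))
print(outcome["peak"])  # never exceeds the semaphore limit of 20
```

All 50 tasks are created immediately, but only 20 hold the semaphore at any moment, and `asyncio.gather` still returns results in submission order.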
Load Balancing and Distribution Patterns
Geographic Distribution
# Multi-region deployment configuration
import random
from typing import Dict, List

class GeographicParliamentClient:
    def __init__(self):
        # No geographic restrictions observed on API
        self.regions = {
            'eu-west': {'latency': 50, 'weight': 10},
            'eu-central': {'latency': 30, 'weight': 15},
            'us-east': {'latency': 150, 'weight': 5},
            'asia-pacific': {'latency': 200, 'weight': 3}
        }

    def select_region(self) -> str:
        """Pick a region at random, in proportion to its configured weight"""
        total_weight = sum(r['weight'] for r in self.regions.values())
        r = random.uniform(0, total_weight)
        current_weight = 0
        for region, config in self.regions.items():
            current_weight += config['weight']
            if r <= current_weight:
                return region
        return 'eu-central'  # Fallback to closest region

# CDN configuration for cacheable API responses
CLOUDFLARE_CONFIG = {
    'zone_settings': {
        'browser_cache_ttl': 3600,  # 1 hour for API responses
        'edge_cache_ttl': 7200,     # 2 hours at edge
        'cache_level': 'aggressive'
    },
    'page_rules': [
        {
            'targets': [{'target': 'url', 'constraint': {'operator': 'matches', 'value': '*/api/*'}}],
            'actions': [
                {'id': 'cache_level', 'value': 'cache_everything'},
                {'id': 'edge_cache_ttl', 'value': 1800}  # 30 min for API
            ]
        }
    ]
}
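The weighted region selection shown earlier in this section can also be written with the standard library's `random.choices`, which implements the same cumulative-weight draw. This is a sketch using the weights from the example; the `REGION_WEIGHTS` constant and the injected `rng` parameter are assumptions added to make the behaviour reproducible.

```python
import random
from collections import Counter
from typing import Dict

REGION_WEIGHTS: Dict[str, int] = {
    "eu-west": 10,
    "eu-central": 15,
    "us-east": 5,
    "asia-pacific": 3,
}


def select_region(rng: random.Random) -> str:
    """Draw one region with probability proportional to its weight."""
    regions = list(REGION_WEIGHTS)
    weights = list(REGION_WEIGHTS.values())
    return rng.choices(regions, weights=weights, k=1)[0]


rng = random.Random(42)  # seeded so repeated runs give the same draws
counts = Counter(select_region(rng) for _ in range(10_000))
print(counts.most_common())
```

Over many draws the counts approach the 10:15:5:3 ratio, so `eu-central` is chosen most often and `asia-pacific` least.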
Request Distribution Strategies
# Smart request routing based on API characteristics
from enum import Enum
from typing import Dict
import asyncio
import aiohttp

class QueryComplexity(Enum):
    SIMPLE = "simple"    # No expansions, small result sets
    MEDIUM = "medium"    # Some expansions or moderate filtering
    COMPLEX = "complex"  # Multi-level expansions, large datasets

class SmartParliamentRouter:
    def __init__(self):
        self.simple_pool = asyncio.Semaphore(50)   # High concurrency
        self.medium_pool = asyncio.Semaphore(20)   # Moderate concurrency
        self.complex_pool = asyncio.Semaphore(5)   # Limited concurrency

    def classify_query(self, entity: str, params: Dict) -> QueryComplexity:
        """Classify query complexity for optimal routing"""
        # Keys use a literal '$'; aiohttp percent-encodes them when sending
        expand = params.get('$expand', '')
        top = int(params.get('$top', '100'))
        filter_clause = params.get('$filter', '')
        # Complex: Multi-level expansions or large datasets
        if '/' in expand or top > 50 or len(filter_clause) > 100:
            return QueryComplexity.COMPLEX
        # Medium: Single-level expansions or moderate filtering
        if expand or filter_clause or top > 10:
            return QueryComplexity.MEDIUM
        # Simple: Basic queries
        return QueryComplexity.SIMPLE

    async def execute_query(self, entity: str, params: Dict) -> Dict:
        """Execute query with appropriate resource allocation"""
        complexity = self.classify_query(entity, params)
        if complexity == QueryComplexity.COMPLEX:
            async with self.complex_pool:
                return await self._execute_with_retries(entity, params, max_retries=3)
        elif complexity == QueryComplexity.MEDIUM:
            async with self.medium_pool:
                return await self._execute_with_retries(entity, params, max_retries=2)
        else:
            async with self.simple_pool:
                return await self._execute_with_retries(entity, params, max_retries=1)

    async def _execute_with_retries(self, entity: str, params: Dict, max_retries: int) -> Dict:
        """Execute query with exponential backoff retries"""
        for attempt in range(max_retries + 1):
            try:
                # A shared ClientSession would avoid reconnecting on every attempt
                async with aiohttp.ClientSession() as session:
                    url = f"https://oda.ft.dk/api/{entity}"
                    async with session.get(url, params=params) as response:
                        if response.status == 200:
                            return await response.json()
                        if response.status != 429:
                            response.raise_for_status()
                        # 429: rate limited (unlikely but handled); back off below
            except Exception:
                if attempt == max_retries:
                    raise
            if attempt < max_retries:
                await asyncio.sleep(2 ** attempt)
        raise Exception(f"Failed to execute query after {max_retries + 1} attempts")
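The retry loop above sleeps for `2 ** attempt` seconds between attempts. The sketch below computes such a schedule ahead of time, with two additions that are assumptions rather than part of the original code: an upper cap on the delay, and optional "full jitter" (a uniform draw between zero and the capped delay) so that many clients do not retry in lockstep. The function name `backoff_delays` is hypothetical.

```python
import random
from typing import List, Optional


def backoff_delays(max_retries: int, base: float = 1.0, cap: float = 30.0,
                   rng: Optional[random.Random] = None) -> List[float]:
    """Return one delay per retry: base * 2**attempt, capped, jittered if rng is given."""
    delays = []
    for attempt in range(max_retries):
        ceiling = min(cap, base * (2 ** attempt))
        # Full jitter picks a random point in [0, ceiling]; without rng use the ceiling
        delays.append(rng.uniform(0, ceiling) if rng is not None else ceiling)
    return delays


plain = backoff_delays(5, cap=10.0)
jittered = backoff_delays(5, cap=10.0, rng=random.Random(7))
print(plain)  # [1.0, 2.0, 4.0, 8.0, 10.0]
```

The jittered schedule never exceeds the plain one at any position, so the worst-case total wait stays the same.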
Database Scaling for Cached Data
PostgreSQL Scaling Configuration
-- Optimized PostgreSQL schema for caching parliamentary data
CREATE DATABASE parliament_cache;
-- Connect to parliament_cache before running the statements below

-- Partitioned table for time-series data
CREATE TABLE parliamentary_data (
    id BIGSERIAL,
    entity_type VARCHAR(50) NOT NULL,
    entity_id VARCHAR(100) NOT NULL,
    data JSONB NOT NULL,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    expires_at TIMESTAMP WITH TIME ZONE
) PARTITION BY RANGE (created_at);

-- Create monthly partitions for efficient data management
CREATE TABLE parliamentary_data_2024_01 PARTITION OF parliamentary_data
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
CREATE TABLE parliamentary_data_2024_02 PARTITION OF parliamentary_data
    FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');
-- ... continue for all months

-- Indexes for optimal query performance
-- (CREATE INDEX CONCURRENTLY is not supported on a partitioned parent table,
-- so the indexes are created without it and cascade to every partition)
CREATE INDEX idx_parliamentary_data_entity
    ON parliamentary_data (entity_type, entity_id);
CREATE INDEX idx_parliamentary_data_expires
    ON parliamentary_data (expires_at) WHERE expires_at IS NOT NULL;
CREATE INDEX idx_parliamentary_data_updated
    ON parliamentary_data (updated_at);
-- GIN index for JSONB data queries
CREATE INDEX idx_parliamentary_data_jsonb
    ON parliamentary_data USING gin (data);

-- Configuration for high-performance caching
ALTER SYSTEM SET shared_buffers = '2GB';             -- takes effect after a restart
ALTER SYSTEM SET effective_cache_size = '6GB';
ALTER SYSTEM SET maintenance_work_mem = '512MB';
ALTER SYSTEM SET checkpoint_timeout = '15min';
ALTER SYSTEM SET wal_buffers = '16MB';               -- takes effect after a restart
ALTER SYSTEM SET default_statistics_target = 500;
-- Reload applies the reloadable settings; shared_buffers and wal_buffers
-- only change after the server is restarted
SELECT pg_reload_conf();
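The schema above creates two monthly partitions by hand and leaves the rest as "continue for all months". A small sketch of how the remaining statements could be generated; the helper name `monthly_partition_ddl` is hypothetical, and the output follows the naming pattern used above.

```python
from datetime import date
from typing import List


def monthly_partition_ddl(table: str, year: int) -> List[str]:
    """Build one CREATE TABLE ... PARTITION OF statement per month of a year."""
    statements = []
    for month in range(1, 13):
        start = date(year, month, 1)
        # The upper bound is exclusive: the first day of the following month
        end = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
        statements.append(
            f"CREATE TABLE {table}_{year}_{month:02d} PARTITION OF {table}\n"
            f"    FOR VALUES FROM ('{start.isoformat()}') TO ('{end.isoformat()}');"
        )
    return statements


ddl = monthly_partition_ddl("parliamentary_data", 2024)
print(ddl[0])
```

Because range bounds are inclusive at the start and exclusive at the end, adjacent partitions share a boundary date without overlapping.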
Redis Clustering for Distributed Cache
# Redis cluster configuration for distributed caching
# Uses the cluster client bundled with redis-py (redis.cluster)
import json
from datetime import datetime
from typing import Dict
from redis.cluster import RedisCluster, ClusterNode

class DistributedParliamentCache:
    def __init__(self):
        # Redis cluster nodes
        startup_nodes = [
            ClusterNode("redis-node-1", 7000),
            ClusterNode("redis-node-2", 7000),
            ClusterNode("redis-node-3", 7000),
            ClusterNode("redis-node-4", 7000),
            ClusterNode("redis-node-5", 7000),
            ClusterNode("redis-node-6", 7000)
        ]
        self.cluster = RedisCluster(
            startup_nodes=startup_nodes,
            decode_responses=True,
            health_check_interval=30,
            retry_on_timeout=True,
            socket_timeout=5,
            socket_connect_timeout=5
        )

    def cache_parliamentary_data(self, entity: str, entity_id: str,
                                 data: Dict, ttl: int = 3600):
        """Cache data; the cluster maps each key to a node by hash slot"""
        key = f"parliament:{entity}:{entity_id}"
        payload = json.dumps(data)
        # Store main data
        self.cluster.setex(key, ttl, payload)
        # Store metadata for cache management
        metadata_key = f"meta:{entity}:{entity_id}"
        metadata = {
            'cached_at': datetime.utcnow().isoformat(),
            'entity_type': entity,
            'size_bytes': len(payload),
            'ttl': ttl
        }
        self.cluster.setex(metadata_key, ttl, json.dumps(metadata))

    def get_cache_stats(self) -> Dict:
        """Get distributed cache statistics"""
        total_keys = 0
        total_memory = 0
        node_stats = {}
        for node in self.cluster.get_nodes():
            info = node.redis_connection.info()
            stats = {
                'keys': info['db0']['keys'] if 'db0' in info else 0,
                'memory_used': info['used_memory'],
                'hits': info['keyspace_hits'],
                'misses': info['keyspace_misses']
            }
            node_stats[f"{node.host}:{node.port}"] = stats
            total_keys += stats['keys']
            total_memory += stats['memory_used']
        return {
            'total_keys': total_keys,
            'total_memory_mb': total_memory / 1024 / 1024,
            'nodes': node_stats,
            'hit_rate': sum(s['hits'] for s in node_stats.values()) /
                        (sum(s['hits'] + s['misses'] for s in node_stats.values()) or 1)
        }
Queue-Based Processing for High-Volume Operations
Celery Task Queue Implementation
# celery_tasks.py - Distributed task processing
from celery import Celery, group, chord
from celery.result import AsyncResult
import asyncio
import json
from datetime import datetime
from typing import Dict, List

# Celery configuration
app = Celery('parliament_processor')
app.config_from_object({
    'broker_url': 'redis://redis-cluster:6379/0',
    'result_backend': 'redis://redis-cluster:6379/1',
    'task_serializer': 'json',
    'accept_content': ['json'],
    'result_serializer': 'json',
    'timezone': 'Europe/Copenhagen',
    'enable_utc': True,
    # Tasks are named after the module that defines them (celery_tasks.py)
    'task_routes': {
        'celery_tasks.fetch_entity_batch': {'queue': 'fetch_queue'},
        'celery_tasks.process_parliamentary_data': {'queue': 'process_queue'},
        'celery_tasks.export_results': {'queue': 'export_queue'}
    },
    'worker_prefetch_multiplier': 4,
    'task_acks_late': True,
    'worker_disable_rate_limits': True
})
async def _fetch_batch_async(entity: str, skip: int, top: int,
                             filters: str, expand: str) -> Dict:
    # Enter the client context so the HTTP session is created and closed
    async with ScalableParliamentClient() as client:
        return await client._fetch_batch(entity, skip, top, filters, expand)

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def fetch_entity_batch(self, entity: str, skip: int, top: int,
                       filters: str = "", expand: str = ""):
    """Fetch a batch of parliamentary data"""
    try:
        # Run the async client to completion inside the synchronous Celery task
        result = asyncio.run(
            _fetch_batch_async(entity, skip, top, filters, expand)
        )
        if not result.get('success'):
            raise Exception(f"API request failed: {result.get('error')}")
        return {
            'entity': entity,
            'skip': skip,
            'data': result['data'],
            'count': result['count']
        }
    except Exception as exc:
        # Retry with exponential backoff: 60s, 120s, 240s
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))
@app.task
def process_parliamentary_data(batch_result: Dict) -> Dict:
    """Process a batch of parliamentary data"""
    data = batch_result['data']
    entity = batch_result['entity']
    processed_records = []
    for record in data:
        # Example: Extract key information based on entity type
        if entity == 'Sag':
            processed = {
                'id': record.get('id'),
                'title': record.get('titel'),
                'type': record.get('typeid'),
                'status': record.get('statusid'),
                'period': record.get('samlingid')
            }
        elif entity == 'Aktør':
            processed = {
                'id': record.get('id'),
                'name': record.get('navn'),
                'type': record.get('typeid'),
                'party': record.get('gruppenavnkort')
            }
        else:
            processed = record  # Pass through unknown entities
        processed_records.append(processed)
    return {
        'entity': entity,
        'skip': batch_result['skip'],
        'processed_count': len(processed_records),
        'records': processed_records
    }
@app.task
def export_results(processed_batches: List[Dict], export_format: str = 'json'):
    """Export processed results to final destination"""
    all_records = []
    for batch in processed_batches:
        all_records.extend(batch['records'])
    if export_format == 'json':
        # Export to JSON file or database
        filename = f"export_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.json"
        with open(f"/exports/{filename}", 'w', encoding='utf-8') as f:
            json.dump(all_records, f, ensure_ascii=False, indent=2)
        return {'exported_file': filename, 'record_count': len(all_records)}
    elif export_format == 'csv':
        # Export to CSV format
        import pandas as pd
        df = pd.DataFrame(all_records)
        filename = f"export_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.csv"
        df.to_csv(f"/exports/{filename}", index=False, encoding='utf-8')
        return {'exported_file': filename, 'record_count': len(all_records)}
def process_large_parliamentary_dataset(entity: str, max_records: int = 10000,
                                        filters: str = "", expand: str = ""):
    """Orchestrate large dataset processing using Celery"""
    # Create one fetch-then-process chain per batch
    batch_size = 100
    fetch_tasks = []
    for skip in range(0, max_records, batch_size):
        # Chain each fetch into processing so that export_results receives
        # batches containing the 'records' key it expects
        task = (
            fetch_entity_batch.s(entity, skip, batch_size, filters, expand)
            | process_parliamentary_data.s()
        )
        fetch_tasks.append(task)
    # Create processing workflow with Celery canvas
    workflow = chord(
        group(fetch_tasks),                      # Parallel fetch and process chains
        export_results.s(export_format='json')   # Final export step
    )
    # Execute workflow
    result = workflow.apply_async()
    return {
        'workflow_id': result.id,
        'status': 'started',
        'estimated_batches': len(fetch_tasks)
    }
def get_workflow_status(workflow_id: str) -> Dict:
    """Get status of a processing workflow"""
    result = AsyncResult(workflow_id, app=app)
    if result.state == 'PENDING':
        return {'status': 'pending', 'progress': 0}
    elif result.state == 'PROGRESS':
        return {
            'status': 'in_progress',
            'progress': result.info.get('progress', 0)
        }
    elif result.state == 'SUCCESS':
        return {
            'status': 'completed',
            'progress': 100,
            'result': result.result
        }
    else:
        return {
            'status': 'failed',
            'error': str(result.info)
        }
Apache Kafka for Real-time Processing
# kafka_parliament_processor.py - Near-real-time data streaming by polling
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
from datetime import datetime, timedelta
from typing import List
import json
import asyncio

class ParliamentDataStreamer:
    def __init__(self, kafka_servers: List[str]):
        self.producer = KafkaProducer(
            bootstrap_servers=kafka_servers,
            value_serializer=lambda x: json.dumps(x, ensure_ascii=False).encode('utf-8'),
            key_serializer=lambda x: x.encode('utf-8') if x else None,
            acks='all',              # Wait for all replicas
            retries=5,               # Retry failed sends
            batch_size=16384,        # Batch size for efficiency
            linger_ms=10,            # Wait up to 10ms to batch
            buffer_memory=33554432   # 32MB buffer
        )
        self.consumer = KafkaConsumer(
            bootstrap_servers=kafka_servers,
            auto_offset_reset='latest',
            enable_auto_commit=True,
            group_id='parliament-processors',
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )

    async def stream_parliamentary_updates(self, entities: List[str]):
        """Poll for recently updated records and publish them to Kafka"""
        poll_interval = 300  # 5 minutes
        # Filtering on "updated after now" never matches anything, so track the
        # previous poll time and ask for records updated since then
        last_poll = datetime.utcnow() - timedelta(seconds=poll_interval)
        client = ScalableParliamentClient()
        async with client:
            while True:
                poll_started = datetime.utcnow()
                for entity in entities:
                    try:
                        # Fetch records updated since the previous poll
                        recent_data = await client.get_all_records(
                            entity=entity,
                            filters=f"opdateringsdato gt datetime'{last_poll.isoformat()}'",
                            max_records=1000
                        )
                        # Stream to Kafka
                        for record in recent_data:
                            key = f"{entity}:{record.get('id')}"
                            message = {
                                'entity': entity,
                                'action': 'update',
                                'timestamp': datetime.utcnow().isoformat(),
                                'data': record
                            }
                            # Kafka topic names are limited to ASCII letters, digits,
                            # '.', '_' and '-'; map names such as 'Aktør' before use
                            future = self.producer.send(
                                f'parliament-{entity.lower()}',
                                key=key,
                                value=message
                            )
                            # Handle send result (this wait blocks the event loop)
                            try:
                                future.get(timeout=10)
                            except KafkaError as e:
                                print(f"Failed to send message: {e}")
                    except Exception as e:
                        print(f"Error processing {entity}: {e}")
                last_poll = poll_started
                # Wait before next poll
                await asyncio.sleep(poll_interval)

    def process_parliament_stream(self, topic: str, processor_func):
        """Process streaming parliamentary data"""
        self.consumer.subscribe([topic])
        for message in self.consumer:
            try:
                # Process the message
                result = processor_func(message.value)
                # Optionally send results to another topic
                if result:
                    self.producer.send(
                        f"{topic}-processed",
                        value=result
                    )
            except Exception as e:
                print(f"Error processing message: {e}")
                # Could implement dead letter queue here
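The consumer loop above leaves the dead letter queue as a comment. One possible shape for it is sketched below, independent of any Kafka client: `send` is any callable that publishes a value to a topic, so a real producer's `send` method or a test double can be passed in. The `-dlq` topic suffix, the retry count, and the function name `process_with_dlq` are assumptions for illustration.

```python
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple

Message = Dict[str, Any]


def process_with_dlq(topic: str, messages: Iterable[Message],
                     processor: Callable[[Message], Optional[Message]],
                     send: Callable[[str, Message], None],
                     max_attempts: int = 3) -> Tuple[int, int]:
    """Process messages; park those that keep failing on '<topic>-dlq'."""
    processed = dead = 0
    for value in messages:
        last_error: Optional[Exception] = None
        for _ in range(max_attempts):
            try:
                result = processor(value)
                break
            except Exception as exc:  # keep the error for the dead letter record
                last_error = exc
        else:
            # Every attempt failed: record the original message and the reason
            send(f"{topic}-dlq", {"original": value, "error": repr(last_error),
                                  "attempts": max_attempts})
            dead += 1
            continue
        if result:
            send(f"{topic}-processed", result)
        processed += 1
    return processed, dead


sent: List[Tuple[str, Message]] = []
counts = process_with_dlq(
    "parliament-sag",
    [{"id": 1}, {"missing": True}],
    processor=lambda m: {"id": m["id"]},        # raises KeyError for the second message
    send=lambda t, v: sent.append((t, v)),
)
print(counts, [t for t, _ in sent])
```

Failed messages stay available for inspection or replay instead of being lost after a log line.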
Microservices Architecture Patterns
Service Decomposition
# microservices/parliament_gateway.py - API Gateway service
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
import httpx
from typing import Dict, List, Optional

app = FastAPI(title="Parliament API Gateway", version="1.0.0")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,  # Browsers reject credentialed responses with a wildcard origin
    allow_methods=["GET"],
    allow_headers=["*"],
)

class ServiceRegistry:
    def __init__(self):
        self.services = {
            'cases': 'http://case-service:8000',
            'actors': 'http://actor-service:8000',
            'votes': 'http://vote-service:8000',
            'documents': 'http://document-service:8000',
            'cache': 'http://cache-service:8000'
        }

    def get_service_url(self, service: str) -> Optional[str]:
        return self.services.get(service)

registry = ServiceRegistry()

@app.get("/api/parliament/cases")
async def get_cases(
    filters: Optional[str] = None,
    expand: Optional[str] = None,
    limit: int = 100
):
    """Proxy to case service"""
    service_url = registry.get_service_url('cases')
    params = {'limit': limit}
    if filters:
        params['filters'] = filters
    if expand:
        params['expand'] = expand
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(f"{service_url}/cases", params=params)
            response.raise_for_status()
            return response.json()
        except httpx.HTTPStatusError as e:
            # raise_for_status errors are not RequestError; pass the upstream status through
            raise HTTPException(status_code=e.response.status_code, detail="Upstream service error")
        except httpx.RequestError as e:
            raise HTTPException(status_code=503, detail=f"Service unavailable: {e}")

@app.get("/api/parliament/actors")
async def get_actors(
    actor_type: Optional[str] = None,
    party: Optional[str] = None,
    limit: int = 100
):
    """Proxy to actor service"""
    service_url = registry.get_service_url('actors')
    params = {'limit': limit}
    if actor_type:
        params['type'] = actor_type
    if party:
        params['party'] = party
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(f"{service_url}/actors", params=params)
            response.raise_for_status()
            return response.json()
        except httpx.HTTPStatusError as e:
            raise HTTPException(status_code=e.response.status_code, detail="Upstream service error")
        except httpx.RequestError as e:
            raise HTTPException(status_code=503, detail=f"Service unavailable: {e}")
# Circuit breaker pattern
from functools import wraps
import time

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, reset_timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN

    def call(self, func, *args, **kwargs):
        if self.state == 'OPEN':
            if time.time() - self.last_failure_time > self.reset_timeout:
                self.state = 'HALF_OPEN'
            else:
                raise Exception("Circuit breaker is OPEN")
        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise

    def _on_success(self):
        self.failure_count = 0
        self.state = 'CLOSED'

    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = 'OPEN'
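To see how a circuit breaker moves between states, the sketch below walks a compact variant through CLOSED, OPEN, HALF_OPEN, and back to CLOSED. It differs from the class above in two ways that are assumptions made for the demonstration: the clock is injectable so no real waiting is needed, and a dedicated `CircuitOpenError` marks rejected calls.

```python
import time
from typing import Any, Callable, List


class CircuitOpenError(RuntimeError):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock  # injectable so the demo does not have to sleep
        self.failure_count = 0
        self.opened_at = 0.0
        self.state = "CLOSED"

    def call(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        if self.state == "OPEN":
            if self.clock() - self.opened_at < self.reset_timeout:
                raise CircuitOpenError("circuit is open")
            self.state = "HALF_OPEN"  # allow a single trial call
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failure_count += 1
            if self.state == "HALF_OPEN" or self.failure_count >= self.failure_threshold:
                self.state = "OPEN"
                self.opened_at = self.clock()
            raise
        self.failure_count = 0
        self.state = "CLOSED"
        return result


def failing() -> None:
    raise ConnectionError("upstream unavailable")


now = [0.0]
breaker = CircuitBreaker(failure_threshold=2, reset_timeout=60, clock=lambda: now[0])
states: List[str] = []
for _ in range(2):
    try:
        breaker.call(failing)
    except ConnectionError:
        pass
states.append(breaker.state)              # OPEN after two failures
try:
    breaker.call(lambda: "ok")
except CircuitOpenError:
    states.append("REJECTED")             # rejected without calling the function
now[0] = 61.0                             # pretend the reset timeout has elapsed
states.append(breaker.call(lambda: "ok")) # trial call succeeds
states.append(breaker.state)
print(states)  # ['OPEN', 'REJECTED', 'ok', 'CLOSED']
```

In a gateway, one breaker instance per downstream service keeps a failing service from tying up request handlers while it recovers.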
Service-Specific Optimizations
# microservices/case_service.py - Specialized case processing service
from fastapi import FastAPI
import asyncio
import hashlib
import json
from typing import List, Dict, Optional

app = FastAPI(title="Parliament Case Service")

class CaseService:
    def __init__(self):
        self.cache = DistributedParliamentCache()

    async def get_cases_with_optimization(self, filters: str = "",
                                          expand: str = "",
                                          limit: int = 100) -> List[Dict]:
        """Case retrieval with caching and preprocessing"""
        # Check cache first (the cluster client is synchronous, so these
        # calls briefly block the event loop)
        digest = hashlib.md5(f"{filters}:{expand}:{limit}".encode()).hexdigest()
        cache_key = f"cases:{digest}"
        cached = self.cache.cluster.get(cache_key)
        if cached is not None:
            return json.loads(cached)
        # Fetch from API; get_all_records pages through results in 100-record
        # batches, so small and large limits share the same call
        async with ScalableParliamentClient() as client:
            cases = await client.get_all_records(
                entity="Sag",
                filters=filters,
                expand=expand,
                max_records=limit
            )
        # Post-process for this service's needs
        processed_cases = []
        for case in cases:
            processed_case = {
                'id': case.get('id'),
                'title': case.get('titel'),
                'summary': case.get('resume'),
                'type': case.get('typeid'),
                'status': case.get('statusid'),
                'session': case.get('samlingid'),
                'updated_date': case.get('opdateringsdato'),  # last-updated timestamp
                'actors': self._extract_case_actors(case) if expand else None
            }
            processed_cases.append(processed_case)
        # Cache results
        self.cache.cluster.setex(cache_key, 1800, json.dumps(processed_cases))  # 30 min
        return processed_cases[:limit]

    def _extract_case_actors(self, case: Dict) -> List[Dict]:
        """Extract and format case actors from expanded data"""
        actors = []
        if 'SagAktør' in case:
            for sag_actor in case['SagAktør']:
                if 'Aktør' in sag_actor:
                    actor = sag_actor['Aktør']
                    actors.append({
                        'id': actor.get('id'),
                        'name': actor.get('navn'),
                        'role': sag_actor.get('rolleid'),
                        'type': actor.get('typeid')
                    })
        return actors

case_service = CaseService()

@app.get("/cases")
async def get_cases(
    session_id: Optional[str] = None,
    case_type: Optional[int] = None,
    status: Optional[int] = None,
    expand: Optional[str] = None,
    limit: int = 100
):
    # Build OData filter; numeric parameters are typed as int and quotes in
    # string values are doubled so user input cannot alter the filter
    filters = []
    if session_id:
        escaped_session_id = session_id.replace("'", "''")
        filters.append(f"samlingid eq '{escaped_session_id}'")
    if case_type is not None:
        filters.append(f"typeid eq {case_type}")
    if status is not None:
        filters.append(f"statusid eq {status}")
    filter_str = " and ".join(filters)
    return await case_service.get_cases_with_optimization(
        filters=filter_str,
        expand=expand or "",
        limit=limit
    )
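Because the `/cases` endpoint builds an OData `$filter` from request parameters, it is worth isolating that step so it can be tested. The following is a sketch under the assumption that string literals are quoted with single quotes and that an embedded quote is escaped by doubling it, which is the OData convention; the helper names `odata_string` and `build_case_filter` are hypothetical.

```python
from typing import List, Optional


def odata_string(value: str) -> str:
    """Quote a string literal for OData by doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def build_case_filter(session_id: Optional[str] = None,
                      case_type: Optional[int] = None,
                      status: Optional[int] = None) -> str:
    """Combine the optional criteria into one 'and'-joined filter string."""
    clauses: List[str] = []
    if session_id is not None:
        clauses.append(f"samlingid eq {odata_string(session_id)}")
    if case_type is not None:
        # int() rejects anything that is not a plain number
        clauses.append(f"typeid eq {int(case_type)}")
    if status is not None:
        clauses.append(f"statusid eq {int(status)}")
    return " and ".join(clauses)


print(build_case_filter("20241", case_type=3))
# samlingid eq '20241' and typeid eq 3
print(build_case_filter("x' or 1 eq 1 or '"))
# samlingid eq 'x'' or 1 eq 1 or '''
```

In the second call the injected quotes are doubled, so the whole input stays inside a single string literal rather than becoming extra filter clauses.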
Performance Optimization at Scale
Connection Pool Optimization
# connection_optimization.py - Optimized connection handling
import aiohttp
import asyncio
from aiohttp_retry import RetryClient, ExponentialRetry
import ssl

class OptimizedParliamentClient:
    def __init__(self):
        # SSL context optimization
        self.ssl_context = ssl.create_default_context()
        self.ssl_context.set_ciphers('ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:!aNULL:!MD5:!DSS')
        # Timeout optimization for API response patterns
        self.timeout = aiohttp.ClientTimeout(
            total=15,        # 15s total (API max observed: 2.1s + buffer)
            connect=3,       # 3s to obtain a connection (API responds quickly)
            sock_read=8,     # 8s socket read
            sock_connect=3   # 3s socket connection
        )
        # Retry configuration
        self.retry_options = ExponentialRetry(
            attempts=3,
            start_timeout=1,
            max_timeout=10,
            factor=2.0,
            statuses={500, 502, 503, 504}  # Retry on server errors
        )

    def _create_connector(self) -> aiohttp.TCPConnector:
        # Connection pool tuning based on API characteristics.
        # Created inside the running event loop, one connector per session.
        return aiohttp.TCPConnector(
            limit=100,              # Total pool size
            limit_per_host=20,      # Per-host limit (no API rate limiting observed)
            keepalive_timeout=60,   # Keep connections alive longer
            enable_cleanup_closed=True,
            use_dns_cache=True,
            ttl_dns_cache=600,      # 10-minute DNS cache
            ssl=self.ssl_context,   # 'ssl' replaces the deprecated 'ssl_context' argument
            resolver=aiohttp.AsyncResolver()  # Async DNS resolution (requires aiodns)
        )

    async def create_session(self) -> RetryClient:
        """Create optimized HTTP session"""
        session = aiohttp.ClientSession(
            connector=self._create_connector(),
            timeout=self.timeout,
            headers={
                'User-Agent': 'ParliamentApp/1.0 (+https://yourapp.com)',
                'Accept': 'application/json',
                'Accept-Encoding': 'gzip, deflate',
                'Connection': 'keep-alive'
            }
        )
        return RetryClient(
            client_session=session,
            retry_options=self.retry_options
        )
Query Optimization Patterns
# query_optimization.py - Advanced query optimization
from dataclasses import dataclass
from typing import Set, Dict, List, Optional
import re

@dataclass
class QueryOptimizationPlan:
    entity: str
    filters: List[str]
    expansions: List[str]
    estimated_cost: int
    parallel_requests: List[Dict]
    cache_strategy: str

class QueryOptimizer:
    def __init__(self):
        # Performance characteristics from API testing
        self.entity_performance = {
            'Sag': {'base_time': 85, 'expansion_cost': 50},
            'Aktør': {'base_time': 90, 'expansion_cost': 45},
            'Afstemning': {'base_time': 100, 'expansion_cost': 80},
            'Stemme': {'base_time': 110, 'expansion_cost': 60},
            'Dokument': {'base_time': 95, 'expansion_cost': 40}
        }
        # High-cardinality filters that benefit from early filtering
        self.selective_filters = {
            'samlingid': 0.1,   # Very selective (specific session)
            'statusid': 0.3,    # Moderately selective
            'typeid': 0.4,      # Moderately selective
            'id': 0.001         # Extremely selective
        }
    def optimize_query(self, entity: str, filters: str = "",
                       expand: str = "", limit: int = 100) -> QueryOptimizationPlan:
        """Create optimal query execution plan"""
        # Parse filters and expansions
        filter_list = self._parse_filters(filters)
        expansion_list = self._parse_expansions(expand)
        # Estimate query cost
        base_cost = self.entity_performance.get(entity, {}).get('base_time', 100)
        expansion_cost = len(expansion_list) * self.entity_performance.get(entity, {}).get('expansion_cost', 50)
        filter_cost = self._estimate_filter_cost(filter_list)
        total_cost = base_cost + expansion_cost + filter_cost
        # Split when the estimate exceeds 500 ms or when the limit does not fit
        # in a single 100-record page (a lone request would truncate the result)
        if total_cost > 500 or limit > 100:
            parallel_requests = self._create_parallel_plan(
                entity, filter_list, expansion_list, limit
            )
            cache_strategy = 'distributed'
        else:
            parallel_requests = [{
                'entity': entity,
                'filters': filters,
                'expand': expand,
                'skip': 0,
                'top': min(limit, 100)
            }]
            cache_strategy = 'local'
        return QueryOptimizationPlan(
            entity=entity,
            filters=filter_list,
            expansions=expansion_list,
            estimated_cost=total_cost,
            parallel_requests=parallel_requests,
            cache_strategy=cache_strategy
        )
    def _parse_filters(self, filters: str) -> List[str]:
        """Parse OData filter string"""
        if not filters:
            return []
        # Simple parsing - in production, use proper OData parser
        return [f.strip() for f in filters.split(' and ') if f.strip()]

    def _parse_expansions(self, expand: str) -> List[str]:
        """Parse OData expansion string"""
        if not expand:
            return []
        return [e.strip() for e in expand.split(',') if e.strip()]

    def _estimate_filter_cost(self, filters: List[str]) -> int:
        """Estimate filtering performance cost"""
        cost = 0
        for filter_expr in filters:
            # Extract field name
            field_match = re.match(r'(\w+)', filter_expr)
            if field_match:
                field = field_match.group(1)
                selectivity = self.selective_filters.get(field, 0.5)
                # More selective filters cost less
                cost += int(50 * selectivity)
        return cost
def _create_parallel_plan(self, entity: str, filters: List[str],
expansions: List[str], limit: int) -> List[Dict]:
"""Create plan for parallel request execution"""
requests = []
# If we have highly selective filters, use them
selective_filters = [f for f in filters
if any(sf in f for sf in self.selective_filters.keys())]
if selective_filters and limit <= 100:
# Single optimized request
requests.append({
'entity': entity,
'filters': ' and '.join(filters),
'expand': ','.join(expansions),
'skip': 0,
'top': limit
})
else:
# Multiple parallel requests
batch_size = 100
for skip in range(0, min(limit, 1000), batch_size):
requests.append({
'entity': entity,
'filters': ' and '.join(filters),
'expand': ','.join(expansions),
'skip': skip,
'top': min(batch_size, limit - skip)
})
return requests
# Usage
optimizer = QueryOptimizer()
plan = optimizer.optimize_query(
entity="Sag",
filters="samlingid eq '20241' and statusid eq 3",
expand="SagAktør/Aktør",
limit=500
)
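To make the cost model concrete, the estimate for the usage example above can be reproduced by hand. The following is a minimal standalone sketch, not part of `QueryOptimizer`: the function name `estimate_cost_ms` is hypothetical, and the constants are copied from the `Sag` entry and the selectivity table in the optimizer.

```python
import re

# Constants copied from QueryOptimizer for the 'Sag' entity.
BASE_TIME_MS = 85
EXPANSION_COST_MS = 50
SELECTIVITY = {"samlingid": 0.1, "statusid": 0.3, "typeid": 0.4, "id": 0.001}


def estimate_cost_ms(filters: str, expand: str) -> int:
    """Hypothetical helper: base cost + per-expansion cost + per-filter cost."""
    filter_list = [f.strip() for f in filters.split(" and ") if f.strip()]
    expansions = [e.strip() for e in expand.split(",") if e.strip()]
    filter_cost = 0
    for expr in filter_list:
        match = re.match(r"(\w+)", expr)
        if match:
            # Unknown fields fall back to a selectivity of 0.5.
            filter_cost += int(50 * SELECTIVITY.get(match.group(1), 0.5))
    return BASE_TIME_MS + len(expansions) * EXPANSION_COST_MS + filter_cost


cost = estimate_cost_ms("samlingid eq '20241' and statusid eq 3", "SagAktør/Aktør")
print(cost)  # 85 + 50 + 5 + 15 = 155
```

An estimate of 155 is well under the 500 ms threshold, so in this example only the requested record count can push the plan toward multiple requests.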
Resource Planning and Capacity Management¶
Infrastructure Sizing Guide¶
# infrastructure/parliament-app.yaml - Production deployment sizing
apiVersion: v1
kind: ConfigMap
metadata:
name: parliament-scaling-config
data:
scaling-guide.yaml: |
# Resource requirements based on API performance testing
# Small deployment (< 1,000 requests/day)
small:
api_gateway:
replicas: 2
cpu: "200m"
memory: "256Mi"
max_connections: 50
cache:
type: "redis-single"
memory: "512Mi"
cpu: "100m"
max_connections: 100
database:
type: "postgresql-single"
storage: "10Gi"
cpu: "500m"
memory: "1Gi"
# Medium deployment (1,000 - 10,000 requests/day)
medium:
api_gateway:
replicas: 3
cpu: "500m"
memory: "512Mi"
max_connections: 200
cache:
type: "redis-cluster"
nodes: 3
memory_per_node: "1Gi"
cpu_per_node: "200m"
max_connections: 500
database:
type: "postgresql-ha"
primary:
storage: "50Gi"
cpu: "1"
memory: "4Gi"
replica:
storage: "50Gi"
cpu: "500m"
memory: "2Gi"
# Large deployment (> 10,000 requests/day)
large:
api_gateway:
replicas: 5
cpu: "1"
memory: "1Gi"
max_connections: 500
cache:
type: "redis-cluster"
nodes: 6
memory_per_node: "2Gi"
cpu_per_node: "500m"
max_connections: 2000
database:
type: "postgresql-cluster"
nodes: 3
storage_per_node: "100Gi"
cpu_per_node: "2"
memory_per_node: "8Gi"
workers:
celery_workers: 10
cpu_per_worker: "500m"
memory_per_worker: "1Gi"
---
# Kubernetes deployment with resource limits
apiVersion: apps/v1
kind: Deployment
metadata:
name: parliament-api-gateway
spec:
replicas: 3
selector:
matchLabels:
app: parliament-api-gateway
template:
metadata:
labels:
app: parliament-api-gateway
spec:
containers:
- name: api-gateway
image: parliament-api-gateway:latest
ports:
- containerPort: 8000
resources:
requests:
memory: "512Mi"
cpu: "500m"
limits:
memory: "1Gi"
cpu: "1"
env:
- name: REDIS_URL
value: "redis://redis-cluster:6379"
- name: POSTGRES_URL
valueFrom:
secretKeyRef:
name: db-credentials
key: postgres-url
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 8000
initialDelaySeconds: 5
periodSeconds: 5
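The three tiers in the sizing guide are keyed on daily request volume. As a rough sketch of how an application might select a tier programmatically, the function below maps a request count to the tier name and gateway replica count listed above. The name `pick_tier` is hypothetical, and treating exactly 1,000 and 10,000 requests as "medium" is an assumption about where the boundaries fall.

```python
def pick_tier(daily_requests: int) -> tuple:
    """Return (tier name, api_gateway replicas) from the sizing guide."""
    if daily_requests < 1_000:
        return ("small", 2)
    if daily_requests <= 10_000:
        return ("medium", 3)
    return ("large", 5)


for volume in (500, 5_000, 50_000):
    print(volume, pick_tier(volume))
```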
Monitoring and Alerting Configuration¶
# monitoring/parliament_metrics.py - Performance monitoring
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time
import psutil
# Metrics for Parliament API consumption
api_requests_total = Counter(
'parliament_api_requests_total',
'Total API requests made to Parliament API',
['entity', 'status_code', 'cache_hit']
)
api_request_duration = Histogram(
'parliament_api_request_duration_seconds',
'Time spent on Parliament API requests',
['entity', 'complexity'],
buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
)
cache_hit_ratio = Gauge(
'parliament_cache_hit_ratio',
'Cache hit ratio for Parliament API responses'
)
active_connections = Gauge(
'parliament_active_connections',
'Active connections to Parliament API'
)
queue_size = Gauge(
'parliament_processing_queue_size',
'Size of processing queue',
['queue_name']
)
class ParliamentMetricsCollector:
def __init__(self):
self.start_time = time.time()
self.cache_hits = 0
self.cache_misses = 0
def record_api_request(self, entity: str, duration: float,
status_code: int, cache_hit: bool):
"""Record API request metrics"""
# Classify request complexity
complexity = 'simple'
if duration > 1.0:
complexity = 'complex'
elif duration > 0.5:
complexity = 'medium'
# Record metrics
api_requests_total.labels(
entity=entity,
status_code=str(status_code),
cache_hit=str(cache_hit)
).inc()
api_request_duration.labels(
entity=entity,
complexity=complexity
).observe(duration)
# Update cache metrics
if cache_hit:
self.cache_hits += 1
else:
self.cache_misses += 1
total_requests = self.cache_hits + self.cache_misses
if total_requests > 0:
cache_hit_ratio.set(self.cache_hits / total_requests)
def record_system_metrics(self):
"""Record system resource metrics"""
# CPU and memory usage
cpu_percent = psutil.cpu_percent()
memory = psutil.virtual_memory()
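# Established TCP connections on this host (an approximation; not limited to the Parliament API)
connections = sum(1 for c in psutil.net_connections(kind='tcp') if c.status == psutil.CONN_ESTABLISHED)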
active_connections.set(connections)
def record_queue_metrics(self, queue_name: str, size: int):
"""Record processing queue metrics"""
queue_size.labels(queue_name=queue_name).set(size)
# Start Prometheus metrics server
metrics_collector = ParliamentMetricsCollector()
start_http_server(8080) # Metrics available at :8080/metrics
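Prometheus histogram buckets are cumulative: an observation counts toward every bucket whose upper bound (`le`) is greater than or equal to the value. The sketch below uses only the standard library to show which of the configured buckets is the first to contain a given duration. The helper name `smallest_bucket` is hypothetical and is not part of `prometheus_client`.

```python
from bisect import bisect_left

# Bucket upper bounds copied from api_request_duration above.
BUCKETS = [0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]


def smallest_bucket(duration_s: float) -> str:
    """Return the upper bound of the first bucket that contains the value."""
    index = bisect_left(BUCKETS, duration_s)
    return str(BUCKETS[index]) if index < len(BUCKETS) else "+Inf"


print(smallest_bucket(0.085))  # 0.1
print(smallest_bucket(2.1))    # 2.5
print(smallest_bucket(12.0))   # +Inf
```

With these bounds, the 85ms and 2.1s response times quoted in this guide land in the first and fifth buckets, so the chosen range covers the API's observed latency spread.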
Capacity Planning Calculator¶
# capacity_planning.py - Resource requirement calculator
from dataclasses import dataclass
from typing import Dict, List
import math
@dataclass
class WorkloadProfile:
daily_requests: int
peak_multiplier: float = 3.0 # Peak traffic is 3x average
avg_response_size_kb: int = 50
cache_hit_ratio: float = 0.7
expansion_ratio: float = 0.3 # 30% of requests use expansions
complex_query_ratio: float = 0.1 # 10% complex queries
@dataclass
class InfrastructureRecommendation:
api_gateway_replicas: int
api_gateway_cpu: str
api_gateway_memory: str
cache_memory_gb: int
cache_nodes: int
db_storage_gb: int
db_cpu: int
db_memory_gb: int
estimated_monthly_cost_usd: float
class ParliamentCapacityPlanner:
def __init__(self):
# Resource costs per unit (example AWS pricing)
self.costs = {
'api_gateway_replica_month': 50, # $50/month per replica
'cache_gb_month': 30, # $30/month per GB cache
'db_cpu_month': 100, # $100/month per CPU
'db_storage_gb_month': 0.5, # $0.50/month per GB storage
'data_transfer_gb': 0.09 # $0.09 per GB data transfer
}
# Performance baselines from API testing
self.performance = {
'requests_per_replica_per_second': 50, # Conservative estimate
'cache_memory_per_1k_objects_mb': 100, # 100MB per 1K cached objects
'db_growth_per_1k_requests_mb': 10 # 10MB DB growth per 1K requests
}
def calculate_requirements(self, workload: WorkloadProfile) -> InfrastructureRecommendation:
"""Calculate infrastructure requirements for given workload"""
# Calculate peak requests per second
daily_requests = workload.daily_requests
peak_rps = (daily_requests * workload.peak_multiplier) / (24 * 3600)
# API Gateway sizing
api_replicas = max(2, math.ceil(peak_rps / self.performance['requests_per_replica_per_second']))
api_cpu = self._calculate_api_cpu(peak_rps, workload)
api_memory = self._calculate_api_memory(peak_rps, workload)
# Cache sizing
cache_objects = daily_requests * (1 - workload.cache_hit_ratio)
cache_memory_gb = math.ceil(
(cache_objects / 1000) * self.performance['cache_memory_per_1k_objects_mb'] / 1024
)
cache_nodes = max(1, math.ceil(cache_memory_gb / 4)) # 4GB per node max
# Database sizing
db_storage_gb = max(10, math.ceil(
(daily_requests * 30) / 1000 * self.performance['db_growth_per_1k_requests_mb'] / 1024
))
db_cpu = max(1, math.ceil(peak_rps / 100)) # 100 RPS per CPU
db_memory_gb = max(2, db_cpu * 2) # 2GB RAM per CPU
# Cost calculation
monthly_cost = self._calculate_monthly_cost(
api_replicas, cache_memory_gb, cache_nodes,
db_cpu, db_storage_gb, workload
)
return InfrastructureRecommendation(
api_gateway_replicas=api_replicas,
api_gateway_cpu=api_cpu,
api_gateway_memory=api_memory,
cache_memory_gb=cache_memory_gb,
cache_nodes=cache_nodes,
db_storage_gb=db_storage_gb,
db_cpu=db_cpu,
db_memory_gb=db_memory_gb,
estimated_monthly_cost_usd=monthly_cost
)
def _calculate_api_cpu(self, peak_rps: float, workload: WorkloadProfile) -> str:
"""Calculate API gateway CPU requirements"""
base_cpu = max(0.5, peak_rps / 100) # 1 CPU per 100 RPS, with a 0.5 CPU floor
# Add overhead for expansions and complex queries
expansion_overhead = workload.expansion_ratio * 0.5
complex_overhead = workload.complex_query_ratio * 1.0
total_cpu = base_cpu * (1 + expansion_overhead + complex_overhead)
# Round to standard CPU sizes
if total_cpu <= 0.5:
return "500m"
elif total_cpu <= 1.0:
return "1"
elif total_cpu <= 2.0:
return "2"
else:
return f"{math.ceil(total_cpu)}"
def _calculate_api_memory(self, peak_rps: float, workload: WorkloadProfile) -> str:
"""Calculate API gateway memory requirements"""
base_memory = max(512, peak_rps * 10) # 10MB per RPS
# Add buffer for connection pooling and caching
buffer_memory = base_memory * 0.5
total_memory_mb = int(base_memory + buffer_memory)
# Round to standard memory sizes
if total_memory_mb <= 512:
return "512Mi"
elif total_memory_mb <= 1024:
return "1Gi"
elif total_memory_mb <= 2048:
return "2Gi"
else:
return f"{math.ceil(total_memory_mb / 1024)}Gi"
def _calculate_monthly_cost(self, api_replicas: int, cache_memory_gb: int,
cache_nodes: int, db_cpu: int, db_storage_gb: int,
workload: WorkloadProfile) -> float:
"""Calculate estimated monthly infrastructure cost"""
# API Gateway cost
api_cost = api_replicas * self.costs['api_gateway_replica_month']
# Cache cost
cache_cost = cache_memory_gb * self.costs['cache_gb_month']
# Database cost
db_cost = (db_cpu * self.costs['db_cpu_month'] +
db_storage_gb * self.costs['db_storage_gb_month'])
# Data transfer cost (estimate)
monthly_data_gb = (workload.daily_requests * 30 *
workload.avg_response_size_kb / (1024 * 1024)) # KB -> GB
data_transfer_cost = monthly_data_gb * self.costs['data_transfer_gb']
return api_cost + cache_cost + db_cost + data_transfer_cost
# Usage example
planner = ParliamentCapacityPlanner()
# Small application
small_workload = WorkloadProfile(daily_requests=1000)
small_req = planner.calculate_requirements(small_workload)
print(f"Small app: {small_req.api_gateway_replicas} replicas, ${small_req.estimated_monthly_cost_usd:.2f}/month")
# Large application
large_workload = WorkloadProfile(
daily_requests=50000,
peak_multiplier=5.0,
cache_hit_ratio=0.8,
expansion_ratio=0.4
)
large_req = planner.calculate_requirements(large_workload)
print(f"Large app: {large_req.api_gateway_replicas} replicas, ${large_req.estimated_monthly_cost_usd:.2f}/month")
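The peak-load arithmetic behind the planner is worth checking by hand. This sketch repeats the two formulas from `calculate_requirements` as standalone functions (the names `peak_rps` and `gateway_replicas` are hypothetical) and applies them to the large workload above.

```python
import math


def peak_rps(daily_requests: int, peak_multiplier: float) -> float:
    """Average requests per second, scaled by the peak multiplier."""
    return daily_requests * peak_multiplier / (24 * 3600)


def gateway_replicas(rps: float, per_replica_rps: int = 50, minimum: int = 2) -> int:
    """Replicas needed for the load, never fewer than the availability floor."""
    return max(minimum, math.ceil(rps / per_replica_rps))


large_rps = peak_rps(50_000, 5.0)
print(round(large_rps, 2))          # 2.89
print(gateway_replicas(large_rps))  # 2
print(gateway_replicas(500.0))      # 10
```

Even 50,000 requests per day at a 5x peak is under 3 requests per second, so at these volumes the two-replica minimum, not throughput, determines the gateway size.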
Auto-scaling and Elastic Infrastructure¶
Kubernetes HPA Configuration¶
# k8s/hpa-parliament-api.yaml - Horizontal Pod Autoscaler
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: parliament-api-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: parliament-api-gateway
minReplicas: 2
maxReplicas: 20
metrics:
# Scale based on CPU utilization
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
# Scale based on memory utilization
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
# Scale based on custom metrics (requests per second)
- type: Object
object:
metric:
name: parliament_requests_per_second
target:
type: AverageValue
averageValue: "100"
describedObject:
apiVersion: v1
kind: Service
name: parliament-api-service
behavior:
scaleDown:
stabilizationWindowSeconds: 300 # 5 minutes
policies:
- type: Percent
value: 50
periodSeconds: 60
scaleUp:
stabilizationWindowSeconds: 60 # 1 minute
policies:
- type: Percent
value: 100
periodSeconds: 15
- type: Pods
value: 4
periodSeconds: 15
selectPolicy: Max
---
# VPA for vertical scaling recommendations
apiVersion: autoscaling.k8s.io/v1
kind: VerticalPodAutoscaler
metadata:
name: parliament-api-vpa
spec:
targetRef:
apiVersion: apps/v1
kind: Deployment
name: parliament-api-gateway
updatePolicy:
updateMode: "Off" # Recommendations only; "Auto" would conflict with the HPA, which already scales on CPU and memory
resourcePolicy:
containerPolicies:
- containerName: api-gateway
maxAllowed:
cpu: "4"
memory: "8Gi"
minAllowed:
cpu: "100m"
memory: "128Mi"
controlledResources: ["cpu", "memory"]
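The replica count that the HPA proposes follows a documented rule: for each metric, `ceil(currentReplicas * currentValue / targetValue)`, skipped when the ratio is within a tolerance of 1.0 (0.1 by default), with the largest proposal winning. The sketch below is a simplified model of that rule for reasoning about the targets above. It ignores the `behavior` policies and stabilization windows, and the function name is hypothetical.

```python
import math


def hpa_desired_replicas(current_replicas, metrics, min_replicas=2,
                         max_replicas=20, tolerance=0.1):
    """metrics is a list of (current_value, target_value) pairs."""
    proposals = []
    for current_value, target_value in metrics:
        ratio = current_value / target_value
        if abs(ratio - 1.0) <= tolerance:
            # Close enough to the target: this metric proposes no change.
            proposals.append(current_replicas)
        else:
            proposals.append(math.ceil(current_replicas * ratio))
    desired = max(proposals) if proposals else current_replicas
    return max(min_replicas, min(desired, max_replicas))


# 3 replicas at 90% CPU (target 70) and 60% memory (target 80)
print(hpa_desired_replicas(3, [(90, 70), (60, 80)]))  # 4
# 10 replicas at 280% CPU is clamped to maxReplicas
print(hpa_desired_replicas(10, [(280, 70)]))          # 20
```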
Custom Auto-scaling Logic¶
# auto_scaling.py - Custom auto-scaling controller
import asyncio
import math
import kubernetes
from kubernetes import client, config
import numpy as np
from datetime import datetime, timedelta
class ParliamentAutoScaler:
def __init__(self):
config.load_incluster_config() # Load from pod
self.v1 = client.AppsV1Api()
self.custom_api = client.CustomObjectsApi() # Also used to query the metrics.k8s.io API
# Scaling parameters
self.namespace = 'default'
self.deployment_name = 'parliament-api-gateway'
self.min_replicas = 2
self.max_replicas = 50
# Performance thresholds
self.cpu_target = 70 # 70% CPU utilization
self.memory_target = 80 # 80% memory utilization
self.response_time_target = 500 # 500ms target response time
async def monitor_and_scale(self):
"""Main monitoring and scaling loop"""
while True:
try:
# Get current metrics
current_replicas = await self._get_current_replicas()
cpu_usage = await self._get_cpu_usage()
memory_usage = await self._get_memory_usage()
response_time = await self._get_avg_response_time()
request_rate = await self._get_request_rate()
# Calculate desired replicas
desired_replicas = self._calculate_desired_replicas(
current_replicas, cpu_usage, memory_usage,
response_time, request_rate
)
# Apply scaling decision
if desired_replicas != current_replicas:
await self._scale_deployment(desired_replicas)
print(f"Scaled from {current_replicas} to {desired_replicas} replicas")
except Exception as e:
print(f"Auto-scaling error: {e}")
await asyncio.sleep(30) # Check every 30 seconds
def _calculate_desired_replicas(self, current_replicas: int,
cpu_usage: float, memory_usage: float,
response_time: float, request_rate: float) -> int:
"""Calculate desired replica count based on multiple metrics"""
scaling_factors = []
# CPU-based scaling
if cpu_usage > 0:
cpu_factor = cpu_usage / self.cpu_target
scaling_factors.append(cpu_factor)
# Memory-based scaling
if memory_usage > 0:
memory_factor = memory_usage / self.memory_target
scaling_factors.append(memory_factor)
# Response time-based scaling
if response_time > 0:
response_factor = response_time / self.response_time_target
scaling_factors.append(response_factor)
# Request rate-based scaling
if request_rate > 0:
# Ratio of observed load to current capacity, assuming 50 RPS per replica
rate_factor = request_rate / (current_replicas * 50)
scaling_factors.append(rate_factor)
# Use the maximum scaling factor
max_factor = max(scaling_factors) if scaling_factors else 1.0
# Apply dampening to avoid oscillation
if max_factor > 1.2: # Scale up aggressively
desired = math.ceil(current_replicas * min(max_factor, 2.0))
elif max_factor < 0.8: # Scale down conservatively, one replica at a time
desired = current_replicas - 1
else:
desired = current_replicas # No change needed
# Enforce limits
return max(self.min_replicas, min(desired, self.max_replicas))
async def _get_current_replicas(self) -> int:
"""Get current number of replicas"""
deployment = self.v1.read_namespaced_deployment(
name=self.deployment_name,
namespace=self.namespace
)
return deployment.spec.replicas
async def _get_cpu_usage(self) -> float:
"""Get average CPU usage percentage"""
try:
# The Python client has no typed metrics API; query metrics.k8s.io through CustomObjectsApi
pod_metrics = self.custom_api.list_namespaced_custom_object(
group='metrics.k8s.io', version='v1beta1',
namespace=self.namespace, plural='pods'
)
cpu_values = []
for pod in pod_metrics['items']:
if self.deployment_name in pod['metadata']['name']:
for container in pod['containers']:
cpu_value = container['usage']['cpu']
# Convert from nanocores to percentage (assumes values carry the 'n' suffix)
cpu_cores = int(cpu_value.replace('n', '')) / 1_000_000_000
cpu_values.append(cpu_cores * 100) # Assume 1 CPU limit
return float(np.mean(cpu_values)) if cpu_values else 0.0
except Exception:
return 0.0
async def _get_memory_usage(self) -> float:
"""Get average memory usage percentage"""
try:
# The Python client has no typed metrics API; query metrics.k8s.io through CustomObjectsApi
pod_metrics = self.custom_api.list_namespaced_custom_object(
group='metrics.k8s.io', version='v1beta1',
namespace=self.namespace, plural='pods'
)
memory_values = []
for pod in pod_metrics['items']:
if self.deployment_name in pod['metadata']['name']:
for container in pod['containers']:
memory_value = container['usage']['memory']
# Convert from Ki to percentage (assume 1Gi limit)
memory_ki = int(memory_value.replace('Ki', ''))
memory_percentage = (memory_ki / 1024) / 10.24 # Mi as a share of 1024Mi
memory_values.append(memory_percentage)
return float(np.mean(memory_values)) if memory_values else 0.0
except Exception:
return 0.0
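async def _get_avg_response_time(self) -> float:
"""Placeholder: return the average response time in ms from your metrics backend (0 skips this signal)"""
return 0.0
async def _get_request_rate(self) -> float:
"""Placeholder: return the current request rate in RPS from your metrics backend (0 skips this signal)"""
return 0.0
async def _scale_deployment(self, desired_replicas: int):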
"""Scale the deployment to desired replica count"""
body = {'spec': {'replicas': desired_replicas}}
self.v1.patch_namespaced_deployment_scale(
name=self.deployment_name,
namespace=self.namespace,
body=body
)
# Run auto-scaler
scaler = ParliamentAutoScaler()
asyncio.run(scaler.monitor_and_scale())
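The metric parsing in the controller assumes CPU is always reported in nanocores (`n`) and memory in `Ki`. Kubernetes quantities can carry other suffixes, so a slightly more tolerant parser may be safer. The sketch below handles a common subset only (`n`, `u`, `m` for CPU; `Ki`, `Mi`, `Gi` for memory). The function names are hypothetical and are not part of the Kubernetes client.

```python
CPU_DIVISORS = {"n": 1_000_000_000, "u": 1_000_000, "m": 1_000}
MEMORY_FACTORS = {"Ki": 1024, "Mi": 1024 ** 2, "Gi": 1024 ** 3}


def parse_cpu_cores(quantity: str) -> float:
    """Convert a CPU quantity such as '250m' or '500000000n' to cores."""
    for suffix, divisor in CPU_DIVISORS.items():
        if quantity.endswith(suffix):
            return int(quantity[:-len(suffix)]) / divisor
    return float(quantity)  # plain number of cores


def parse_memory_bytes(quantity: str) -> int:
    """Convert a memory quantity such as '524288Ki' or '1Gi' to bytes."""
    for suffix, factor in MEMORY_FACTORS.items():
        if quantity.endswith(suffix):
            return int(quantity[:-len(suffix)]) * factor
    return int(quantity)  # plain number of bytes


def percent_of_limit(used: float, limit: float) -> float:
    return 100.0 * used / limit


print(parse_cpu_cores("250m"))  # 0.25
print(percent_of_limit(parse_memory_bytes("524288Ki"),
                       parse_memory_bytes("1Gi")))  # 50.0
```

Reading the limit from the Deployment spec and passing it to `percent_of_limit` would also remove the hard-coded "1 CPU" and "1Gi" assumptions in the controller.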
Cost Optimization for Large-Scale Deployments¶
Cloud Cost Management¶
# cost_optimization.py - Cost-aware scaling and resource management
from dataclasses import dataclass
from typing import Dict, List
import boto3
from datetime import datetime, timedelta
@dataclass
class CostOptimizationPlan:
current_monthly_cost: float
optimized_monthly_cost: float
savings: float
recommendations: List[str]
class ParliamentCostOptimizer:
def __init__(self):
# AWS clients
self.ec2 = boto3.client('ec2')
self.rds = boto3.client('rds')
self.elasticache = boto3.client('elasticache')
self.cloudwatch = boto3.client('cloudwatch')
# Cost per hour by instance type (example AWS pricing)
self.instance_costs = {
't3.micro': 0.0104,
't3.small': 0.0208,
't3.medium': 0.0416,
't3.large': 0.0832,
'm5.large': 0.096,
'm5.xlarge': 0.192,
'm5.2xlarge': 0.384,
'r5.large': 0.126,
'r5.xlarge': 0.252
}
# Reserved instance discounts
self.ri_discounts = {
'1year_no_upfront': 0.25,
'1year_partial': 0.35,
'3year_partial': 0.50
}
async def analyze_current_costs(self) -> Dict[str, float]:
"""Analyze current infrastructure costs"""
costs = {}
# EC2 instances
ec2_response = self.ec2.describe_instances(
Filters=[{'Name': 'tag:Application', 'Values': ['parliament-api']}]
)
ec2_cost = 0
for reservation in ec2_response['Reservations']:
for instance in reservation['Instances']:
if instance['State']['Name'] == 'running':
instance_type = instance['InstanceType']
hourly_cost = self.instance_costs.get(instance_type, 0.1)
ec2_cost += hourly_cost * 24 * 30 # Monthly cost
costs['ec2'] = ec2_cost
# RDS instances
rds_response = self.rds.describe_db_instances()
rds_cost = 0
for db in rds_response['DBInstances']:
if db['DBInstanceStatus'] == 'available':
# Simplified cost calculation
rds_cost += 0.15 * 24 * 30 # $0.15/hour example
costs['rds'] = rds_cost
# ElastiCache
cache_response = self.elasticache.describe_cache_clusters()
cache_cost = 0
for cluster in cache_response['CacheClusters']:
if cluster['CacheClusterStatus'] == 'available':
cache_cost += 0.05 * 24 * 30 # $0.05/hour example
costs['cache'] = cache_cost
return costs
async def recommend_optimizations(self) -> CostOptimizationPlan:
"""Recommend cost optimizations based on usage patterns"""
current_costs = await self.analyze_current_costs()
current_monthly = sum(current_costs.values())
recommendations = []
potential_savings = 0
# Right-sizing recommendations
oversized_instances = await self._find_oversized_instances()
for instance_id, recommendation in oversized_instances.items():
current_cost = self.instance_costs[recommendation['current_type']] * 24 * 30
new_cost = self.instance_costs[recommendation['recommended_type']] * 24 * 30
savings = current_cost - new_cost
potential_savings += savings
recommendations.append(
f"Downsize {instance_id} from {recommendation['current_type']} "
f"to {recommendation['recommended_type']} (${savings:.2f}/month savings)"
)
# Reserved Instance recommendations
ri_savings = current_monthly * self.ri_discounts['1year_partial']
potential_savings += ri_savings
recommendations.append(
f"Purchase 1-year partial upfront RIs (${ri_savings:.2f}/month savings)"
)
# Auto-scaling recommendations
if await self._has_consistent_low_usage():
autoscale_savings = current_monthly * 0.2 # 20% savings
potential_savings += autoscale_savings
recommendations.append(
f"Implement auto-scaling during off-hours (${autoscale_savings:.2f}/month savings)"
)
# Spot instance recommendations for non-critical workloads
spot_savings = current_costs.get('ec2', 0) * 0.7 # 70% savings on spot
potential_savings += spot_savings
recommendations.append(
f"Use spot instances for batch processing (${spot_savings:.2f}/month savings)"
)
return CostOptimizationPlan(
current_monthly_cost=current_monthly,
optimized_monthly_cost=max(0.0, current_monthly - potential_savings), # The savings estimates overlap, so clamp at zero
savings=potential_savings,
recommendations=recommendations
)
async def _find_oversized_instances(self) -> Dict[str, Dict]:
"""Find EC2 instances that are oversized based on CloudWatch metrics"""
oversized = {}
# Get CPU utilization for the past 7 days
end_time = datetime.utcnow()
start_time = end_time - timedelta(days=7)
instances = self.ec2.describe_instances()
for reservation in instances['Reservations']:
for instance in reservation['Instances']:
if instance['State']['Name'] != 'running':
continue
instance_id = instance['InstanceId']
current_type = instance['InstanceType']
# Get CPU metrics
cpu_response = self.cloudwatch.get_metric_statistics(
Namespace='AWS/EC2',
MetricName='CPUUtilization',
Dimensions=[{'Name': 'InstanceId', 'Value': instance_id}],
StartTime=start_time,
EndTime=end_time,
Period=3600,
Statistics=['Average']
)
if cpu_response['Datapoints']:
datapoints = cpu_response['Datapoints']
avg_cpu = sum(dp['Average'] for dp in datapoints) / len(datapoints)
# If consistently low CPU usage, recommend smaller instance
if avg_cpu < 20:
recommended_type = self._get_smaller_instance_type(current_type)
if recommended_type:
oversized[instance_id] = {
'current_type': current_type,
'recommended_type': recommended_type,
'avg_cpu': avg_cpu
}
return oversized
def _get_smaller_instance_type(self, current_type: str) -> str:
"""Get a smaller instance type recommendation"""
size_progression = {
't3.large': 't3.medium',
't3.medium': 't3.small',
't3.small': 't3.micro',
'm5.2xlarge': 'm5.xlarge',
'm5.xlarge': 'm5.large',
'm5.large': 't3.large',
'r5.xlarge': 'r5.large',
'r5.large': 'm5.large'
}
return size_progression.get(current_type)
async def _has_consistent_low_usage(self) -> bool:
"""Check if the application has consistent low usage periods"""
# Analyze request patterns over the past week
# This would integrate with your application metrics
# For demo purposes, return True
return True
# Usage
import asyncio
optimizer = ParliamentCostOptimizer()
# recommend_optimizations() is a coroutine, so run it in an event loop
plan = asyncio.run(optimizer.recommend_optimizations())
print(f"Current cost: ${plan.current_monthly_cost:.2f}/month")
print(f"Optimized cost: ${plan.optimized_monthly_cost:.2f}/month")
print(f"Potential savings: ${plan.savings:.2f}/month")
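The right-sizing numbers are simple enough to verify by hand. This sketch reuses two example hourly prices from the `instance_costs` table and the same 24 x 30 hour month. The helper names are hypothetical. It also shows why the individual savings estimates should be read as an upper bound: reserved-instance, auto-scaling, and spot discounts overlap, so their sum can exceed the bill they are applied to.

```python
HOURS_PER_MONTH = 24 * 30

# Example hourly prices copied from the instance_costs table above.
HOURLY = {"m5.large": 0.096, "t3.large": 0.0832}


def monthly_cost(instance_type: str) -> float:
    return HOURLY[instance_type] * HOURS_PER_MONTH


def capped_savings(current_monthly: float, savings: list) -> float:
    """Total savings can never exceed what is currently being spent."""
    return min(current_monthly, sum(savings))


saving = monthly_cost("m5.large") - monthly_cost("t3.large")
print(f"{monthly_cost('m5.large'):.2f}")  # 69.12
print(f"{saving:.2f}")                    # 9.22

# On a $100 EC2-only bill: 35% RI + 20% auto-scaling + 70% spot adds up to $125.
capped = capped_savings(100.0, [100.0 * 0.35, 100.0 * 0.2, 100.0 * 0.7])
print(capped)  # 100.0
```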
Resource Scheduling and Lifecycle Management¶
# resource_scheduler.py - Automated resource lifecycle management
import asyncio
from datetime import datetime, time, timezone
from typing import Dict
import kubernetes
from kubernetes import client
class ParliamentResourceScheduler:
def __init__(self):
kubernetes.config.load_incluster_config() # Load credentials from the pod's service account
self.v1 = client.AppsV1Api()
self.namespace = 'default'
# Business hours configuration (Copenhagen time)
self.business_hours = {
'start': time(8, 0), # 8 AM
'end': time(18, 0), # 6 PM
'weekdays_only': True
}
# Scaling schedules
self.schedules = {
'business_hours': {
'replicas': 5,
'resources': {'cpu': '1', 'memory': '1Gi'}
},
'off_hours': {
'replicas': 2,
'resources': {'cpu': '500m', 'memory': '512Mi'}
},
'weekend': {
'replicas': 1,
'resources': {'cpu': '200m', 'memory': '256Mi'}
}
}
async def run_scheduler(self):
"""Main scheduling loop"""
while True:
current_schedule = self._get_current_schedule()
await self._apply_schedule(current_schedule)
# Check every 15 minutes
await asyncio.sleep(900)
def _get_current_schedule(self) -> str:
"""Determine current schedule based on time"""
from zoneinfo import ZoneInfo # Standard library since Python 3.9
copenhagen_time = datetime.now(ZoneInfo('Europe/Copenhagen')) # Handles CET/CEST transitions
# Weekend check
if copenhagen_time.weekday() >= 5: # Saturday = 5, Sunday = 6
return 'weekend'
# Business hours check
current_time = copenhagen_time.time()
if (self.business_hours['start'] <= current_time <= self.business_hours['end']):
return 'business_hours'
else:
return 'off_hours'
async def _apply_schedule(self, schedule_name: str):
"""Apply the specified schedule"""
schedule = self.schedules[schedule_name]
# Update replica count
await self._scale_deployment('parliament-api-gateway', schedule['replicas'])
# Update resource requests/limits (requires deployment update)
await self._update_resource_limits('parliament-api-gateway', schedule['resources'])
async def _scale_deployment(self, deployment_name: str, replicas: int):
"""Scale deployment to specified replica count"""
try:
current_deployment = self.v1.read_namespaced_deployment(
name=deployment_name, namespace=self.namespace
)
if current_deployment.spec.replicas != replicas:
body = {'spec': {'replicas': replicas}}
self.v1.patch_namespaced_deployment_scale(
name=deployment_name,
namespace=self.namespace,
body=body
)
print(f"Scaled {deployment_name} to {replicas} replicas")
except Exception as e:
print(f"Failed to scale {deployment_name}: {e}")
async def _update_resource_limits(self, deployment_name: str, resources: Dict[str, str]):
"""Update resource requests and limits"""
try:
deployment = self.v1.read_namespaced_deployment(
name=deployment_name, namespace=self.namespace
)
# Update container resources
container = deployment.spec.template.spec.containers[0]
container.resources.requests = resources
container.resources.limits = resources
# Apply the update
self.v1.patch_namespaced_deployment(
name=deployment_name,
namespace=self.namespace,
body=deployment
)
print(f"Updated {deployment_name} resources: {resources}")
except Exception as e:
print(f"Failed to update resources for {deployment_name}: {e}")
# Start the scheduler
scheduler = ParliamentResourceScheduler()
asyncio.run(scheduler.run_scheduler())
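The schedule selection is easiest to test when it is separated from the clock. The sketch below restates the weekend and business-hours rules as a pure function over a Copenhagen wall-clock time, with the time zone lookup isolated in a helper. The names `pick_schedule` and `copenhagen_now` are hypothetical, and `ZoneInfo` needs time zone data to be available on the host.

```python
from datetime import datetime, time
from zoneinfo import ZoneInfo  # standard library since Python 3.9

BUSINESS_START = time(8, 0)
BUSINESS_END = time(18, 0)


def pick_schedule(local_dt: datetime) -> str:
    """Map a Copenhagen wall-clock datetime to a schedule name."""
    if local_dt.weekday() >= 5:  # Saturday = 5, Sunday = 6
        return "weekend"
    if BUSINESS_START <= local_dt.time() <= BUSINESS_END:
        return "business_hours"
    return "off_hours"


def copenhagen_now() -> datetime:
    """Current wall-clock time in Copenhagen, including daylight saving."""
    return datetime.now(ZoneInfo("Europe/Copenhagen"))


print(pick_schedule(datetime(2024, 6, 3, 9, 30)))  # business_hours (a Monday)
print(pick_schedule(datetime(2024, 6, 3, 19, 0)))  # off_hours
print(pick_schedule(datetime(2024, 6, 8, 12, 0)))  # weekend (a Saturday)
```

In the scheduler loop, `pick_schedule(copenhagen_now())` would take the place of the UTC-based check.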
Summary¶
Building scalable applications for the Danish Parliamentary API requires understanding its performance characteristics and implementing appropriate scaling strategies:
Key Performance Insights¶
- Response Times: 85ms to 2.1s depending on query size and complexity
- No Observed Rate Limiting: stable under concurrent load in testing
- 100-record Pagination: large datasets require multiple requests
- Expansion Overhead: 50-100% added response time, but one expanded request replaces several separate API calls
Scaling Best Practices¶
- Implement aggressive caching with Redis clustering
- Use connection pooling optimized for the API's characteristics
- Deploy horizontal scaling with proper load balancing
- Monitor key metrics for proactive scaling decisions
- Optimize query patterns based on entity relationships
- Plan for peak traffic with auto-scaling policies
- Manage costs with scheduled scaling and right-sizing
Production Readiness¶
The Danish Parliament API's consistent response times and lack of observed rate limits make it well suited to large-scale applications. With caching, pagination-aware request planning, auto-scaling, and monitoring in place, you can build systems that process parliamentary data reliably as volume grows.
For specific implementation guidance, refer to the related documentation:
- Query Optimization for efficient OData patterns
- Caching for cache implementation strategies
- Monitoring for comprehensive observability