Skip to content

Scaling Production Applications

Guide to building scalable applications that consume the Danish Parliamentary API efficiently. Learn proven strategies for handling high-volume data processing and traffic loads.

Overview

The Danish Parliament API offers excellent performance characteristics that make it well-suited for large-scale applications. With response times ranging from 85ms for small queries to 2.1 seconds for 10,000 records, proper scaling strategies can help you build robust systems that handle parliamentary data at any scale.

API Performance Baseline

Based on comprehensive testing, the API demonstrates:

  • Small queries (d100 records): 100-150ms response time
  • Medium queries (1,000 records): 300-500ms response time
  • Large queries (10,000 records): 2-3 seconds response time
  • Complex expansions: 50-100% overhead but eliminate multiple API calls
  • Concurrent requests: No observed limits, excellent stability under load

Scaling Strategies Overview

1. Request-Level Scaling

The API's 100-record limit per request makes pagination essential for large datasets:

import asyncio
import aiohttp
from typing import List, Dict, Any

class ScalableParliamentClient:
    def __init__(self, base_url: str = "https://oda.ft.dk/api"):
        self.base_url = base_url
        self.session = None

    async def __aenter__(self):
        connector = aiohttp.TCPConnector(limit=100, limit_per_host=20)
        timeout = aiohttp.ClientTimeout(total=30, connect=5)
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            headers={'Accept': 'application/json'}
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def get_all_records(self, entity: str, filters: str = "", 
                            expand: str = "", max_records: int = 10000) -> List[Dict[Any, Any]]:
        """Efficiently retrieve large datasets using pagination"""
        all_records = []
        skip = 0
        batch_size = 100  # API maximum

        tasks = []
        # Create concurrent requests for better throughput
        for offset in range(0, min(max_records, 10000), batch_size):
            task = self._fetch_batch(entity, offset, batch_size, filters, expand)
            tasks.append(task)

            # Process in batches of 10 concurrent requests
            if len(tasks) >= 10:
                batch_results = await asyncio.gather(*tasks, return_exceptions=True)
                all_records.extend(self._process_batch_results(batch_results))
                tasks = []

        # Process remaining tasks
        if tasks:
            batch_results = await asyncio.gather(*tasks, return_exceptions=True)
            all_records.extend(self._process_batch_results(batch_results))

        return all_records[:max_records]

    async def _fetch_batch(self, entity: str, skip: int, top: int, 
                          filters: str, expand: str) -> Dict[str, Any]:
        """Fetch a single batch of records"""
        params = {
            '%24skip': str(skip),
            '%24top': str(top)
        }

        if filters:
            params['%24filter'] = filters
        if expand:
            params['%24expand'] = expand

        url = f"{self.base_url}/{entity}"

        async with self.session.get(url, params=params) as response:
            if response.status == 200:
                data = await response.json()
                return {
                    'success': True,
                    'data': data.get('value', []),
                    'count': len(data.get('value', []))
                }
            else:
                return {
                    'success': False,
                    'error': f"HTTP {response.status}",
                    'skip': skip
                }

    def _process_batch_results(self, results: List) -> List[Dict[Any, Any]]:
        """Process batch results and handle errors"""
        processed = []
        for result in results:
            if isinstance(result, Exception):
                print(f"Request failed: {result}")
                continue
            if result.get('success'):
                processed.extend(result['data'])
        return processed

# Usage example
async def main():
    async with ScalableParliamentClient() as client:
        # Get all cases from 2023 with actor relationships
        cases = await client.get_all_records(
            entity="Sag",
            filters="startswith(samlingid, '20231')",
            expand="SagAktør/Aktør",
            max_records=5000
        )
        print(f"Retrieved {len(cases)} cases")

# Run with: asyncio.run(main())

2. Application-Level Scaling

Implement caching and smart request patterns:

import redis
import json
import hashlib
from datetime import datetime, timedelta
from typing import Optional

class CachedParliamentClient:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.client = ScalableParliamentClient()
        self.redis = redis.from_url(redis_url, decode_responses=True)
        self.cache_ttl = 3600  # 1 hour cache

    def _cache_key(self, entity: str, params: Dict[str, str]) -> str:
        """Generate consistent cache key"""
        key_data = f"{entity}:{json.dumps(params, sort_keys=True)}"
        return f"parliament_api:{hashlib.md5(key_data.encode()).hexdigest()}"

    async def get_cached_or_fetch(self, entity: str, **params) -> Dict[str, Any]:
        """Get data from cache or fetch from API"""
        cache_key = self._cache_key(entity, params)

        # Try cache first
        cached = self.redis.get(cache_key)
        if cached:
            return json.loads(cached)

        # Fetch from API
        data = await self.client.get_all_records(entity, **params)

        # Cache with expiration
        self.redis.setex(
            cache_key,
            self.cache_ttl,
            json.dumps(data, default=str)
        )

        return data

    def invalidate_cache_pattern(self, pattern: str):
        """Invalidate cache entries matching pattern"""
        keys = self.redis.keys(f"parliament_api:*{pattern}*")
        if keys:
            self.redis.delete(*keys)

Horizontal vs Vertical Scaling

Horizontal Scaling (Scale Out)

Deploy multiple application instances behind a load balancer:

# docker-compose.yml for horizontal scaling
version: '3.8'
services:
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    depends_on:
      - app1
      - app2
      - app3

  app1: &app
    build: .
    environment:
      - REDIS_URL=redis://redis:6379
      - INSTANCE_ID=1
    depends_on:
      - redis

  app2:
    <<: *app
    environment:
      - REDIS_URL=redis://redis:6379
      - INSTANCE_ID=2

  app3:
    <<: *app
    environment:
      - REDIS_URL=redis://redis:6379
      - INSTANCE_ID=3

  redis:
    image: redis:alpine
    command: redis-server --maxmemory 512mb --maxmemory-policy allkeys-lru
    volumes:
      - redis_data:/data

volumes:
  redis_data:
# nginx.conf - Load balancer configuration
events {
    worker_connections 1024;
}

http {
    upstream parliament_api_backend {
        least_conn;
        server app1:8000 weight=1 max_fails=3 fail_timeout=30s;
        server app2:8000 weight=1 max_fails=3 fail_timeout=30s;
        server app3:8000 weight=1 max_fails=3 fail_timeout=30s;
    }

    server {
        listen 80;

        location / {
            proxy_pass http://parliament_api_backend;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

            # Connection pooling
            proxy_http_version 1.1;
            proxy_set_header Connection "";

            # Timeouts
            proxy_connect_timeout 5s;
            proxy_send_timeout 30s;
            proxy_read_timeout 30s;
        }
    }
}

Vertical Scaling (Scale Up)

Optimize single-instance performance:

# High-performance single-instance configuration
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor
import multiprocessing

class HighPerformanceParliamentClient:
    def __init__(self):
        # Optimize connection pool for API characteristics
        self.connector = aiohttp.TCPConnector(
            limit=200,              # Total connection pool size
            limit_per_host=50,      # Per-host connection limit
            keepalive_timeout=30,   # Keep connections alive
            enable_cleanup_closed=True,
            use_dns_cache=True,
            ttl_dns_cache=300
        )

        # Optimize timeouts based on API performance
        self.timeout = aiohttp.ClientTimeout(
            total=10,               # Based on 2.1s max observed + buffer
            connect=2,              # API responds quickly
            sock_read=5
        )

        # CPU-intensive processing thread pool
        self.thread_pool = ThreadPoolExecutor(
            max_workers=multiprocessing.cpu_count() * 2
        )

    async def process_large_dataset(self, entity: str, processor_func):
        """Process large datasets with parallel CPU work"""
        async with aiohttp.ClientSession(
            connector=self.connector,
            timeout=self.timeout
        ) as session:

            # Fetch data with optimal concurrency
            semaphore = asyncio.Semaphore(20)  # Limit concurrent requests
            tasks = []

            for skip in range(0, 10000, 100):
                task = self._fetch_with_semaphore(
                    session, semaphore, entity, skip
                )
                tasks.append(task)

            # Gather all data
            results = await asyncio.gather(*tasks, return_exceptions=True)

            # Process CPU-intensive work in thread pool
            loop = asyncio.get_event_loop()
            processing_tasks = []

            for batch in results:
                if isinstance(batch, list):
                    task = loop.run_in_executor(
                        self.thread_pool,
                        processor_func,
                        batch
                    )
                    processing_tasks.append(task)

            return await asyncio.gather(*processing_tasks)

    async def _fetch_with_semaphore(self, session, semaphore, entity, skip):
        async with semaphore:
            params = {'%24skip': str(skip), '%24top': '100'}
            url = f"https://oda.ft.dk/api/{entity}"

            async with session.get(url, params=params) as response:
                if response.status == 200:
                    data = await response.json()
                    return data.get('value', [])
                return []

Load Balancing and Distribution Patterns

Geographic Distribution

# Multi-region deployment configuration
import random
from typing import Dict, List

class GeographicParliamentClient:
    def __init__(self):
        # No geographic restrictions observed on API
        self.regions = {
            'eu-west': {'latency': 50, 'weight': 10},
            'eu-central': {'latency': 30, 'weight': 15},
            'us-east': {'latency': 150, 'weight': 5},
            'asia-pacific': {'latency': 200, 'weight': 3}
        }

    def select_region(self) -> str:
        """Select optimal region based on latency and load"""
        total_weight = sum(r['weight'] for r in self.regions.values())
        r = random.uniform(0, total_weight)

        current_weight = 0
        for region, config in self.regions.items():
            current_weight += config['weight']
            if r <= current_weight:
                return region

        return 'eu-central'  # Fallback to closest region

# CDN configuration for static content
CLOUDFLARE_CONFIG = {
    'zone_settings': {
        'browser_cache_ttl': 3600,      # 1 hour for API responses
        'edge_cache_ttl': 7200,         # 2 hours at edge
        'cache_level': 'aggressive'
    },
    'page_rules': [
        {
            'targets': [{'target': 'url', 'constraint': {'operator': 'matches', 'value': '*/api/*'}}],
            'actions': [
                {'id': 'cache_level', 'value': 'cache_everything'},
                {'id': 'edge_cache_ttl', 'value': 1800}  # 30 min for API
            ]
        }
    ]
}

Request Distribution Strategies

# Smart request routing based on API characteristics
from enum import Enum
import asyncio

class QueryComplexity(Enum):
    SIMPLE = "simple"      # No expansions, small result sets
    MEDIUM = "medium"      # Some expansions or moderate filtering
    COMPLEX = "complex"    # Multi-level expansions, large datasets

class SmartParliamentRouter:
    def __init__(self):
        self.simple_pool = asyncio.Semaphore(50)    # High concurrency
        self.medium_pool = asyncio.Semaphore(20)    # Moderate concurrency  
        self.complex_pool = asyncio.Semaphore(5)    # Limited concurrency

    def classify_query(self, entity: str, params: Dict) -> QueryComplexity:
        """Classify query complexity for optimal routing"""
        expand = params.get('%24expand', '')
        top = int(params.get('%24top', '100'))
        filter_clause = params.get('%24filter', '')

        # Complex: Multi-level expansions or large datasets
        if '/' in expand or top > 50 or len(filter_clause) > 100:
            return QueryComplexity.COMPLEX

        # Medium: Single-level expansions or moderate filtering
        if expand or filter_clause or top > 10:
            return QueryComplexity.MEDIUM

        # Simple: Basic queries
        return QueryComplexity.SIMPLE

    async def execute_query(self, entity: str, params: Dict) -> Dict:
        """Execute query with appropriate resource allocation"""
        complexity = self.classify_query(entity, params)

        if complexity == QueryComplexity.COMPLEX:
            async with self.complex_pool:
                return await self._execute_with_retries(entity, params, max_retries=3)
        elif complexity == QueryComplexity.MEDIUM:
            async with self.medium_pool:
                return await self._execute_with_retries(entity, params, max_retries=2)
        else:
            async with self.simple_pool:
                return await self._execute_with_retries(entity, params, max_retries=1)

    async def _execute_with_retries(self, entity: str, params: Dict, max_retries: int) -> Dict:
        """Execute query with exponential backoff retries"""
        for attempt in range(max_retries + 1):
            try:
                async with aiohttp.ClientSession() as session:
                    url = f"https://oda.ft.dk/api/{entity}"
                    async with session.get(url, params=params) as response:
                        if response.status == 200:
                            return await response.json()
                        elif response.status == 429:  # Rate limited (unlikely but handle)
                            await asyncio.sleep(2 ** attempt)
                            continue
                        else:
                            response.raise_for_status()
            except Exception as e:
                if attempt == max_retries:
                    raise
                await asyncio.sleep(2 ** attempt)

        raise Exception(f"Failed to execute query after {max_retries + 1} attempts")

Database Scaling for Cached Data

PostgreSQL Scaling Configuration

-- Optimized PostgreSQL schema for caching parliamentary data
CREATE DATABASE parliament_cache;

-- Partitioned table for time-series data
CREATE TABLE parliamentary_data (
    id BIGSERIAL,
    entity_type VARCHAR(50) NOT NULL,
    entity_id VARCHAR(100) NOT NULL,
    data JSONB NOT NULL,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    expires_at TIMESTAMP WITH TIME ZONE
) PARTITION BY RANGE (created_at);

-- Create monthly partitions for efficient data management
CREATE TABLE parliamentary_data_2024_01 PARTITION OF parliamentary_data
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
CREATE TABLE parliamentary_data_2024_02 PARTITION OF parliamentary_data
    FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');
-- ... continue for all months

-- Indexes for optimal query performance
CREATE INDEX CONCURRENTLY idx_parliamentary_data_entity 
    ON parliamentary_data (entity_type, entity_id);
CREATE INDEX CONCURRENTLY idx_parliamentary_data_expires 
    ON parliamentary_data (expires_at) WHERE expires_at IS NOT NULL;
CREATE INDEX CONCURRENTLY idx_parliamentary_data_updated 
    ON parliamentary_data (updated_at);

-- GIN index for JSONB data queries
CREATE INDEX CONCURRENTLY idx_parliamentary_data_jsonb 
    ON parliamentary_data USING gin (data);

-- Configuration for high-performance caching
ALTER SYSTEM SET shared_buffers = '2GB';
ALTER SYSTEM SET effective_cache_size = '6GB';
ALTER SYSTEM SET maintenance_work_mem = '512MB';
ALTER SYSTEM SET checkpoint_timeout = '15min';
ALTER SYSTEM SET wal_buffers = '16MB';
ALTER SYSTEM SET default_statistics_target = 500;
SELECT pg_reload_conf();

Redis Clustering for Distributed Cache

# Redis cluster configuration for distributed caching
import redis
from rediscluster import RedisCluster

class DistributedParliamentCache:
    def __init__(self):
        # Redis cluster nodes
        startup_nodes = [
            {"host": "redis-node-1", "port": 7000},
            {"host": "redis-node-2", "port": 7000}, 
            {"host": "redis-node-3", "port": 7000},
            {"host": "redis-node-4", "port": 7000},
            {"host": "redis-node-5", "port": 7000},
            {"host": "redis-node-6", "port": 7000}
        ]

        self.cluster = RedisCluster(
            startup_nodes=startup_nodes,
            decode_responses=True,
            skip_full_coverage_check=True,
            health_check_interval=30,
            retry_on_timeout=True,
            socket_timeout=5,
            socket_connect_timeout=5
        )

    def cache_parliamentary_data(self, entity: str, entity_id: str, 
                               data: Dict, ttl: int = 3600):
        """Cache data with intelligent key distribution"""
        # Use consistent hashing for even distribution
        key = f"parliament:{entity}:{entity_id}"

        # Store main data
        self.cluster.setex(key, ttl, json.dumps(data))

        # Store metadata for cache management
        metadata_key = f"meta:{entity}:{entity_id}"
        metadata = {
            'cached_at': datetime.utcnow().isoformat(),
            'entity_type': entity,
            'size_bytes': len(json.dumps(data)),
            'ttl': ttl
        }
        self.cluster.setex(metadata_key, ttl, json.dumps(metadata))

    def get_cache_stats(self) -> Dict:
        """Get distributed cache statistics"""
        total_keys = 0
        total_memory = 0
        node_stats = {}

        for node in self.cluster.nodes_manager.nodes.values():
            info = node.redis_connection.info()
            stats = {
                'keys': info['db0']['keys'] if 'db0' in info else 0,
                'memory_used': info['used_memory'],
                'hits': info['keyspace_hits'],
                'misses': info['keyspace_misses']
            }
            node_stats[f"{node.host}:{node.port}"] = stats
            total_keys += stats['keys']
            total_memory += stats['memory_used']

        return {
            'total_keys': total_keys,
            'total_memory_mb': total_memory / 1024 / 1024,
            'nodes': node_stats,
            'hit_rate': sum(s['hits'] for s in node_stats.values()) / 
                       (sum(s['hits'] + s['misses'] for s in node_stats.values()) or 1)
        }

Queue-Based Processing for High-Volume Operations

Celery Task Queue Implementation

# celery_tasks.py - Distributed task processing
from celery import Celery, group, chord
from celery.result import AsyncResult
import asyncio
from typing import List

# Celery configuration
app = Celery('parliament_processor')
app.config_from_object({
    'broker_url': 'redis://redis-cluster:6379/0',
    'result_backend': 'redis://redis-cluster:6379/1',
    'task_serializer': 'json',
    'accept_content': ['json'],
    'result_serializer': 'json',
    'timezone': 'Europe/Copenhagen',
    'enable_utc': True,
    'task_routes': {
        'parliament_processor.fetch_entity_batch': {'queue': 'fetch_queue'},
        'parliament_processor.process_parliamentary_data': {'queue': 'process_queue'},
        'parliament_processor.export_results': {'queue': 'export_queue'}
    },
    'worker_prefetch_multiplier': 4,
    'task_acks_late': True,
    'worker_disable_rate_limits': True
})

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def fetch_entity_batch(self, entity: str, skip: int, top: int, 
                      filters: str = "", expand: str = ""):
    """Fetch a batch of parliamentary data"""
    try:
        client = ScalableParliamentClient()
        # Convert to sync for Celery
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        result = loop.run_until_complete(
            client._fetch_batch(entity, skip, top, filters, expand)
        )

        if not result.get('success'):
            raise Exception(f"API request failed: {result.get('error')}")

        return {
            'entity': entity,
            'skip': skip,
            'data': result['data'],
            'count': result['count']
        }

    except Exception as exc:
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))

@app.task
def process_parliamentary_data(batch_result: Dict) -> Dict:
    """Process a batch of parliamentary data"""
    data = batch_result['data']
    entity = batch_result['entity']

    processed_records = []
    for record in data:
        # Example: Extract key information based on entity type
        if entity == 'Sag':
            processed = {
                'id': record.get('id'),
                'title': record.get('titel'),
                'type': record.get('typeid'),
                'status': record.get('statusid'),
                'period': record.get('samlingid')
            }
        elif entity == 'Aktør':
            processed = {
                'id': record.get('id'),
                'name': record.get('navn'),
                'type': record.get('typeid'),
                'party': record.get('gruppenavnkort')
            }
        else:
            processed = record  # Pass through unknown entities

        processed_records.append(processed)

    return {
        'entity': entity,
        'skip': batch_result['skip'],
        'processed_count': len(processed_records),
        'records': processed_records
    }

@app.task
def export_results(processed_batches: List[Dict], export_format: str = 'json'):
    """Export processed results to final destination"""
    all_records = []
    for batch in processed_batches:
        all_records.extend(batch['records'])

    if export_format == 'json':
        # Export to JSON file or database
        filename = f"export_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.json"
        with open(f"/exports/{filename}", 'w', encoding='utf-8') as f:
            json.dump(all_records, f, ensure_ascii=False, indent=2)
        return {'exported_file': filename, 'record_count': len(all_records)}

    elif export_format == 'csv':
        # Export to CSV format
        import pandas as pd
        df = pd.DataFrame(all_records)
        filename = f"export_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.csv"
        df.to_csv(f"/exports/{filename}", index=False, encoding='utf-8')
        return {'exported_file': filename, 'record_count': len(all_records)}

def process_large_parliamentary_dataset(entity: str, max_records: int = 10000,
                                      filters: str = "", expand: str = ""):
    """Orchestrate large dataset processing using Celery"""

    # Create fetch tasks for all batches
    batch_size = 100
    fetch_tasks = []

    for skip in range(0, max_records, batch_size):
        task = fetch_entity_batch.s(entity, skip, batch_size, filters, expand)
        fetch_tasks.append(task)

    # Create processing workflow with Celery canvas
    workflow = chord(
        group(fetch_tasks),                    # Parallel fetch operations
        export_results.s(export_format='json')  # Final export step
    )

    # Execute workflow
    result = workflow.apply_async()

    return {
        'workflow_id': result.id,
        'status': 'started',
        'estimated_batches': len(fetch_tasks)
    }

def get_workflow_status(workflow_id: str) -> Dict:
    """Get status of a processing workflow"""
    result = AsyncResult(workflow_id, app=app)

    if result.state == 'PENDING':
        return {'status': 'pending', 'progress': 0}
    elif result.state == 'PROGRESS':
        return {
            'status': 'in_progress',
            'progress': result.info.get('progress', 0)
        }
    elif result.state == 'SUCCESS':
        return {
            'status': 'completed',
            'progress': 100,
            'result': result.result
        }
    else:
        return {
            'status': 'failed',
            'error': str(result.info)
        }

Apache Kafka for Real-time Processing

# kafka_parliament_processor.py - Real-time data streaming
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
import json
import asyncio

class ParliamentDataStreamer:
    def __init__(self, kafka_servers: List[str]):
        self.producer = KafkaProducer(
            bootstrap_servers=kafka_servers,
            value_serializer=lambda x: json.dumps(x, ensure_ascii=False).encode('utf-8'),
            key_serializer=lambda x: x.encode('utf-8') if x else None,
            acks='all',                    # Wait for all replicas
            retries=5,                     # Retry failed sends
            batch_size=16384,              # Batch size for efficiency
            linger_ms=10,                  # Wait up to 10ms to batch
            buffer_memory=33554432         # 32MB buffer
        )

        self.consumer = KafkaConsumer(
            bootstrap_servers=kafka_servers,
            auto_offset_reset='latest',
            enable_auto_commit=True,
            group_id='parliament-processors',
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )

    async def stream_parliamentary_updates(self, entities: List[str]):
        """Stream real-time parliamentary data updates"""
        client = ScalableParliamentClient()

        async with client:
            while True:
                for entity in entities:
                    try:
                        # Fetch latest data
                        recent_data = await client.get_all_records(
                            entity=entity,
                            filters=f"opdateringsdato gt datetime'{datetime.utcnow().isoformat()}'",
                            max_records=1000
                        )

                        # Stream to Kafka
                        for record in recent_data:
                            key = f"{entity}:{record.get('id')}"
                            message = {
                                'entity': entity,
                                'action': 'update',
                                'timestamp': datetime.utcnow().isoformat(),
                                'data': record
                            }

                            future = self.producer.send(
                                f'parliament-{entity.lower()}',
                                key=key,
                                value=message
                            )

                            # Handle send result
                            try:
                                future.get(timeout=10)
                            except KafkaError as e:
                                print(f"Failed to send message: {e}")

                    except Exception as e:
                        print(f"Error processing {entity}: {e}")

                # Wait before next poll
                await asyncio.sleep(300)  # 5 minutes

    def process_parliament_stream(self, topic: str, processor_func):
        """Process streaming parliamentary data"""
        self.consumer.subscribe([topic])

        for message in self.consumer:
            try:
                # Process the message
                result = processor_func(message.value)

                # Optionally send results to another topic
                if result:
                    self.producer.send(
                        f"{topic}-processed",
                        value=result
                    )

            except Exception as e:
                print(f"Error processing message: {e}")
                # Could implement dead letter queue here

Microservices Architecture Patterns

Service Decomposition

# microservices/parliament_gateway.py - API Gateway service
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
import httpx
from typing import Dict, List, Optional

app = FastAPI(title="Parliament API Gateway", version="1.0.0")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["GET"],
    allow_headers=["*"],
)

class ServiceRegistry:
    def __init__(self):
        self.services = {
            'cases': 'http://case-service:8000',
            'actors': 'http://actor-service:8000',
            'votes': 'http://vote-service:8000',
            'documents': 'http://document-service:8000',
            'cache': 'http://cache-service:8000'
        }

    def get_service_url(self, service: str) -> str:
        return self.services.get(service)

registry = ServiceRegistry()

@app.get("/api/parliament/cases")
async def get_cases(
    filters: Optional[str] = None,
    expand: Optional[str] = None,
    limit: int = 100
):
    """Proxy to case service with load balancing"""
    service_url = registry.get_service_url('cases')

    params = {'limit': limit}
    if filters:
        params['filters'] = filters
    if expand:
        params['expand'] = expand

    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(f"{service_url}/cases", params=params)
            response.raise_for_status()
            return response.json()
        except httpx.RequestError as e:
            raise HTTPException(status_code=503, detail=f"Service unavailable: {e}")

@app.get("/api/parliament/actors")
async def get_actors(
    actor_type: Optional[str] = None,
    party: Optional[str] = None,
    limit: int = 100
):
    """Proxy to actor service"""
    service_url = registry.get_service_url('actors')

    params = {'limit': limit}
    if actor_type:
        params['type'] = actor_type
    if party:
        params['party'] = party

    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(f"{service_url}/actors", params=params)
            response.raise_for_status()
            return response.json()
        except httpx.RequestError as e:
            raise HTTPException(status_code=503, detail=f"Service unavailable: {e}")

# Circuit breaker pattern
from functools import wraps
import time

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, reset_timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN

    def call(self, func, *args, **kwargs):
        if self.state == 'OPEN':
            if time.time() - self.last_failure_time > self.reset_timeout:
                self.state = 'HALF_OPEN'
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise e

    def _on_success(self):
        self.failure_count = 0
        self.state = 'CLOSED'

    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.failure_count >= self.failure_threshold:
            self.state = 'OPEN'

Service-Specific Optimizations

# microservices/case_service.py - Specialized case processing service
from fastapi import FastAPI
import asyncio
from typing import List, Dict

app = FastAPI(title="Parliament Case Service")

class CaseService:
    def __init__(self):
        self.client = ScalableParliamentClient()
        self.cache = DistributedParliamentCache()

    async def get_cases_with_optimization(self, filters: str = "", 
                                        expand: str = "", 
                                        limit: int = 100) -> List[Dict]:
        """Optimized case retrieval with caching and preprocessing"""

        # Check cache first
        cache_key = f"cases:{hashlib.md5(f'{filters}:{expand}:{limit}'.encode()).hexdigest()}"
        cached = await self.cache.get(cache_key)
        if cached:
            return cached

        # Fetch from API with optimizations
        async with self.client as client:
            # Smart batching based on filters
            if 'samlingid' in filters and limit <= 100:
                # Single request for specific session
                cases = await client.get_all_records(
                    entity="Sag",
                    filters=filters,
                    expand=expand,
                    max_records=limit
                )
            else:
                # Parallel requests for large datasets
                cases = await client.get_all_records(
                    entity="Sag", 
                    filters=filters,
                    expand=expand,
                    max_records=limit
                )

        # Post-process for this service's needs
        processed_cases = []
        for case in cases:
            processed_case = {
                'id': case.get('id'),
                'title': case.get('titel'),
                'summary': case.get('resume'),
                'type': case.get('typeid'),
                'status': case.get('statusid'),
                'session': case.get('samlingid'),
                'created_date': case.get('opdateringsdato'),
                'actors': self._extract_case_actors(case) if expand else None
            }
            processed_cases.append(processed_case)

        # Cache results
        await self.cache.set(cache_key, processed_cases, ttl=1800)  # 30 min

        return processed_cases[:limit]

    def _extract_case_actors(self, case: Dict) -> List[Dict]:
        """Extract and format case actors from expanded data"""
        actors = []
        if 'SagAktør' in case:
            for sag_actor in case['SagAktør']:
                if 'Aktør' in sag_actor:
                    actor = sag_actor['Aktør']
                    actors.append({
                        'id': actor.get('id'),
                        'name': actor.get('navn'),
                        'role': sag_actor.get('rolleid'),
                        'type': actor.get('typeid')
                    })
        return actors

case_service = CaseService()

@app.get("/cases")
async def get_cases(
    session_id: Optional[str] = None,
    case_type: Optional[str] = None,
    status: Optional[str] = None,
    expand: Optional[str] = None,
    limit: int = 100
):
    # Build OData filter
    filters = []
    if session_id:
        filters.append(f"samlingid eq '{session_id}'")
    if case_type:
        filters.append(f"typeid eq {case_type}")
    if status:
        filters.append(f"statusid eq {status}")

    filter_str = " and ".join(filters)

    return await case_service.get_cases_with_optimization(
        filters=filter_str,
        expand=expand or "",
        limit=limit
    )

Performance Optimization at Scale

Connection Pool Optimization

# connection_optimization.py - Optimized connection handling
import aiohttp
import asyncio
from aiohttp_retry import RetryClient, ExponentialRetry
import ssl

class OptimizedParliamentClient:
    def __init__(self):
        # SSL context optimization
        self.ssl_context = ssl.create_default_context()
        self.ssl_context.set_ciphers('ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:!aNULL:!MD5:!DSS')

        # Connection pool tuning based on API characteristics
        self.connector = aiohttp.TCPConnector(
            limit=100,                    # Total pool size
            limit_per_host=20,           # Per-host limit (API has no rate limiting)
            keepalive_timeout=60,        # Keep connections alive longer
            enable_cleanup_closed=True,
            use_dns_cache=True,
            ttl_dns_cache=600,           # 10-minute DNS cache
            ssl_context=self.ssl_context,
            resolver=aiohttp.AsyncResolver() # Use async DNS resolution
        )

        # Timeout optimization for API response patterns
        self.timeout = aiohttp.ClientTimeout(
            total=15,           # 15s total (API max observed: 2.1s + buffer)
            connect=3,          # 3s connection (API responds quickly)
            sock_read=8,        # 8s socket read
            sock_connect=3      # 3s socket connection
        )

        # Retry configuration
        self.retry_options = ExponentialRetry(
            attempts=3,
            start_timeout=1,
            max_timeout=10,
            factor=2.0,
            statuses={500, 502, 503, 504}  # Retry on server errors
        )

    async def create_session(self) -> RetryClient:
        """Create optimized HTTP session"""
        session = aiohttp.ClientSession(
            connector=self.connector,
            timeout=self.timeout,
            headers={
                'User-Agent': 'ParliamentApp/1.0 (+https://yourapp.com)',
                'Accept': 'application/json',
                'Accept-Encoding': 'gzip, deflate',
                'Connection': 'keep-alive'
            }
        )

        return RetryClient(
            client_session=session,
            retry_options=self.retry_options
        )

Query Optimization Patterns

# query_optimization.py - Advanced query optimization
from dataclasses import dataclass
from typing import Set, Dict, List, Optional
import re

@dataclass
class QueryOptimizationPlan:
    entity: str
    filters: List[str]
    expansions: List[str] 
    estimated_cost: int
    parallel_requests: List[Dict]
    cache_strategy: str

class QueryOptimizer:
    def __init__(self):
        # Performance characteristics from API testing
        self.entity_performance = {
            'Sag': {'base_time': 85, 'expansion_cost': 50},
            'Aktør': {'base_time': 90, 'expansion_cost': 45},
            'Afstemning': {'base_time': 100, 'expansion_cost': 80},
            'Stemme': {'base_time': 110, 'expansion_cost': 60},
            'Dokument': {'base_time': 95, 'expansion_cost': 40}
        }

        # High-cardinality filters that benefit from early filtering
        self.selective_filters = {
            'samlingid': 0.1,     # Very selective (specific session)
            'statusid': 0.3,      # Moderately selective
            'typeid': 0.4,        # Moderately selective
            'id': 0.001           # Extremely selective
        }

    def optimize_query(self, entity: str, filters: str = "", 
                      expand: str = "", limit: int = 100) -> QueryOptimizationPlan:
        """Create optimal query execution plan"""

        # Parse filters and expansions
        filter_list = self._parse_filters(filters)
        expansion_list = self._parse_expansions(expand)

        # Estimate query cost
        base_cost = self.entity_performance.get(entity, {}).get('base_time', 100)
        expansion_cost = len(expansion_list) * self.entity_performance.get(entity, {}).get('expansion_cost', 50)
        filter_cost = self._estimate_filter_cost(filter_list)

        total_cost = base_cost + expansion_cost + filter_cost

        # Determine if query should be split
        if total_cost > 500 or limit > 500:  # 500ms threshold
            parallel_requests = self._create_parallel_plan(
                entity, filter_list, expansion_list, limit
            )
            cache_strategy = 'distributed'
        else:
            parallel_requests = [{
                'entity': entity,
                'filters': filters,
                'expand': expand,
                'skip': 0,
                'top': min(limit, 100)
            }]
            cache_strategy = 'local'

        return QueryOptimizationPlan(
            entity=entity,
            filters=filter_list,
            expansions=expansion_list,
            estimated_cost=total_cost,
            parallel_requests=parallel_requests,
            cache_strategy=cache_strategy
        )

    def _parse_filters(self, filters: str) -> List[str]:
        """Parse OData filter string"""
        if not filters:
            return []

        # Simple parsing - in production, use proper OData parser
        return [f.strip() for f in filters.split(' and ') if f.strip()]

    def _parse_expansions(self, expand: str) -> List[str]:
        """Parse OData expansion string"""
        if not expand:
            return []

        return [e.strip() for e in expand.split(',') if e.strip()]

    def _estimate_filter_cost(self, filters: List[str]) -> int:
        """Estimate filtering performance cost"""
        cost = 0
        for filter_expr in filters:
            # Extract field name
            field_match = re.match(r'(\w+)', filter_expr)
            if field_match:
                field = field_match.group(1)
                selectivity = self.selective_filters.get(field, 0.5)
                # More selective filters cost less
                cost += int(50 * selectivity)
        return cost

    def _create_parallel_plan(self, entity: str, filters: List[str], 
                            expansions: List[str], limit: int) -> List[Dict]:
        """Create plan for parallel request execution"""
        requests = []

        # If we have highly selective filters, use them
        selective_filters = [f for f in filters 
                           if any(sf in f for sf in self.selective_filters.keys())]

        if selective_filters and limit <= 100:
            # Single optimized request
            requests.append({
                'entity': entity,
                'filters': ' and '.join(filters),
                'expand': ','.join(expansions),
                'skip': 0,
                'top': limit
            })
        else:
            # Multiple parallel requests
            batch_size = 100
            for skip in range(0, min(limit, 1000), batch_size):
                requests.append({
                    'entity': entity,
                    'filters': ' and '.join(filters),
                    'expand': ','.join(expansions),
                    'skip': skip,
                    'top': min(batch_size, limit - skip)
                })

        return requests

# Usage
optimizer = QueryOptimizer()
plan = optimizer.optimize_query(
    entity="Sag",
    filters="samlingid eq '20241' and statusid eq 3",
    expand="SagAktør/Aktør",
    limit=500
)

Resource Planning and Capacity Management

Infrastructure Sizing Guide

# infrastructure/parliament-app.yaml - Production deployment sizing
apiVersion: v1
kind: ConfigMap
metadata:
  name: parliament-scaling-config
data:
  scaling-guide.yaml: |
    # Resource requirements based on API performance testing

    # Small deployment (< 1,000 requests/day)
    small:
      api_gateway:
        replicas: 2
        cpu: "200m"
        memory: "256Mi"
        max_connections: 50

      cache:
        type: "redis-single"
        memory: "512Mi"
        cpu: "100m"
        max_connections: 100

      database:
        type: "postgresql-single"
        storage: "10Gi"
        cpu: "500m"
        memory: "1Gi"

    # Medium deployment (1,000 - 10,000 requests/day)  
    medium:
      api_gateway:
        replicas: 3
        cpu: "500m"
        memory: "512Mi"
        max_connections: 200

      cache:
        type: "redis-cluster"
        nodes: 3
        memory_per_node: "1Gi"
        cpu_per_node: "200m"
        max_connections: 500

      database:
        type: "postgresql-ha"
        primary:
          storage: "50Gi"
          cpu: "1"
          memory: "4Gi"
        replica:
          storage: "50Gi"
          cpu: "500m" 
          memory: "2Gi"

    # Large deployment (> 10,000 requests/day)
    large:
      api_gateway:
        replicas: 5
        cpu: "1"
        memory: "1Gi"
        max_connections: 500

      cache:
        type: "redis-cluster"
        nodes: 6
        memory_per_node: "2Gi"
        cpu_per_node: "500m"
        max_connections: 2000

      database:
        type: "postgresql-cluster"
        nodes: 3
        storage_per_node: "100Gi"
        cpu_per_node: "2"
        memory_per_node: "8Gi"

      workers:
        celery_workers: 10
        cpu_per_worker: "500m"
        memory_per_worker: "1Gi"
---
# Kubernetes deployment with resource limits
apiVersion: apps/v1
kind: Deployment
metadata:
  name: parliament-api-gateway
spec:
  replicas: 3
  selector:
    matchLabels:
      app: parliament-api-gateway
  template:
    metadata:
      labels:
        app: parliament-api-gateway
    spec:
      containers:
      - name: api-gateway
        image: parliament-api-gateway:latest
        ports:
        - containerPort: 8000
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "1Gi"
            cpu: "1"
        env:
        - name: REDIS_URL
          value: "redis://redis-cluster:6379"
        - name: POSTGRES_URL
          valueFrom:
            secretKeyRef:
              name: db-credentials
              key: postgres-url
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5

Monitoring and Alerting Configuration

# monitoring/parliament_metrics.py - Performance monitoring
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time
import psutil

# Metrics for Parliament API consumption
api_requests_total = Counter(
    'parliament_api_requests_total',
    'Total API requests made to Parliament API',
    ['entity', 'status_code', 'cache_hit']
)

api_request_duration = Histogram(
    'parliament_api_request_duration_seconds',
    'Time spent on Parliament API requests',
    ['entity', 'complexity'],
    buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
)

cache_hit_ratio = Gauge(
    'parliament_cache_hit_ratio',
    'Cache hit ratio for Parliament API responses'
)

active_connections = Gauge(
    'parliament_active_connections',
    'Active connections to Parliament API'
)

queue_size = Gauge(
    'parliament_processing_queue_size',
    'Size of processing queue',
    ['queue_name']
)

class ParliamentMetricsCollector:
    def __init__(self):
        self.start_time = time.time()
        self.cache_hits = 0
        self.cache_misses = 0

    def record_api_request(self, entity: str, duration: float, 
                          status_code: int, cache_hit: bool):
        """Record API request metrics"""

        # Classify request complexity
        complexity = 'simple'
        if duration > 1.0:
            complexity = 'complex'
        elif duration > 0.5:
            complexity = 'medium'

        # Record metrics
        api_requests_total.labels(
            entity=entity,
            status_code=str(status_code),
            cache_hit=str(cache_hit)
        ).inc()

        api_request_duration.labels(
            entity=entity,
            complexity=complexity
        ).observe(duration)

        # Update cache metrics
        if cache_hit:
            self.cache_hits += 1
        else:
            self.cache_misses += 1

        total_requests = self.cache_hits + self.cache_misses
        if total_requests > 0:
            cache_hit_ratio.set(self.cache_hits / total_requests)

    def record_system_metrics(self):
        """Record system resource metrics"""
        # CPU and memory usage
        cpu_percent = psutil.cpu_percent()
        memory = psutil.virtual_memory()

        # Network connections
        connections = len(psutil.net_connections())
        active_connections.set(connections)

    def record_queue_metrics(self, queue_name: str, size: int):
        """Record processing queue metrics"""
        queue_size.labels(queue_name=queue_name).set(size)

# Start Prometheus metrics server
metrics_collector = ParliamentMetricsCollector()
start_http_server(8080)  # Metrics available at :8080/metrics

Capacity Planning Calculator

# capacity_planning.py - Resource requirement calculator
from dataclasses import dataclass
from typing import Dict, List
import math

@dataclass
class WorkloadProfile:
    daily_requests: int
    peak_multiplier: float = 3.0  # Peak traffic is 3x average
    avg_response_size_kb: int = 50
    cache_hit_ratio: float = 0.7
    expansion_ratio: float = 0.3  # 30% of requests use expansions
    complex_query_ratio: float = 0.1  # 10% complex queries

@dataclass 
class InfrastructureRecommendation:
    api_gateway_replicas: int
    api_gateway_cpu: str
    api_gateway_memory: str
    cache_memory_gb: int
    cache_nodes: int
    db_storage_gb: int
    db_cpu: int
    db_memory_gb: int
    estimated_monthly_cost_usd: float

class ParliamentCapacityPlanner:
    def __init__(self):
        # Resource costs per unit (example AWS pricing)
        self.costs = {
            'api_gateway_replica_month': 50,  # $50/month per replica
            'cache_gb_month': 30,             # $30/month per GB cache
            'db_cpu_month': 100,              # $100/month per CPU
            'db_storage_gb_month': 0.5,       # $0.50/month per GB storage
            'data_transfer_gb': 0.09          # $0.09 per GB data transfer
        }

        # Performance baselines from API testing
        self.performance = {
            'requests_per_replica_per_second': 50,  # Conservative estimate
            'cache_memory_per_1k_objects_mb': 100,   # 100MB per 1K cached objects
            'db_growth_per_1k_requests_mb': 10       # 10MB DB growth per 1K requests
        }

    def calculate_requirements(self, workload: WorkloadProfile) -> InfrastructureRecommendation:
        """Calculate infrastructure requirements for given workload"""

        # Calculate peak requests per second
        daily_requests = workload.daily_requests
        peak_rps = (daily_requests * workload.peak_multiplier) / (24 * 3600)

        # API Gateway sizing
        api_replicas = max(2, math.ceil(peak_rps / self.performance['requests_per_replica_per_second']))
        api_cpu = self._calculate_api_cpu(peak_rps, workload)
        api_memory = self._calculate_api_memory(peak_rps, workload)

        # Cache sizing
        cache_objects = daily_requests * (1 - workload.cache_hit_ratio)
        cache_memory_gb = math.ceil(
            (cache_objects / 1000) * self.performance['cache_memory_per_1k_objects_mb'] / 1024
        )
        cache_nodes = max(1, math.ceil(cache_memory_gb / 4))  # 4GB per node max

        # Database sizing
        db_storage_gb = max(10, math.ceil(
            (daily_requests * 30) / 1000 * self.performance['db_growth_per_1k_requests_mb'] / 1024
        ))
        db_cpu = max(1, math.ceil(peak_rps / 100))  # 100 RPS per CPU
        db_memory_gb = max(2, db_cpu * 2)  # 2GB RAM per CPU

        # Cost calculation
        monthly_cost = self._calculate_monthly_cost(
            api_replicas, cache_memory_gb, cache_nodes,
            db_cpu, db_storage_gb, workload
        )

        return InfrastructureRecommendation(
            api_gateway_replicas=api_replicas,
            api_gateway_cpu=api_cpu,
            api_gateway_memory=api_memory,
            cache_memory_gb=cache_memory_gb,
            cache_nodes=cache_nodes,
            db_storage_gb=db_storage_gb,
            db_cpu=db_cpu,
            db_memory_gb=db_memory_gb,
            estimated_monthly_cost_usd=monthly_cost
        )

    def _calculate_api_cpu(self, peak_rps: float, workload: WorkloadProfile) -> str:
        """Calculate API gateway CPU requirements"""
        base_cpu = max(0.5, peak_rps / 100)  # 100 RPS per 0.5 CPU

        # Add overhead for expansions and complex queries
        expansion_overhead = workload.expansion_ratio * 0.5
        complex_overhead = workload.complex_query_ratio * 1.0

        total_cpu = base_cpu * (1 + expansion_overhead + complex_overhead)

        # Round to standard CPU sizes
        if total_cpu <= 0.5:
            return "500m"
        elif total_cpu <= 1.0:
            return "1"
        elif total_cpu <= 2.0:
            return "2"
        else:
            return f"{math.ceil(total_cpu)}"

    def _calculate_api_memory(self, peak_rps: float, workload: WorkloadProfile) -> str:
        """Calculate API gateway memory requirements"""
        base_memory = max(512, peak_rps * 10)  # 10MB per RPS

        # Add buffer for connection pooling and caching
        buffer_memory = base_memory * 0.5
        total_memory_mb = int(base_memory + buffer_memory)

        # Round to standard memory sizes
        if total_memory_mb <= 512:
            return "512Mi"
        elif total_memory_mb <= 1024:
            return "1Gi"
        elif total_memory_mb <= 2048:
            return "2Gi"
        else:
            return f"{math.ceil(total_memory_mb / 1024)}Gi"

    def _calculate_monthly_cost(self, api_replicas: int, cache_memory_gb: int,
                              cache_nodes: int, db_cpu: int, db_storage_gb: int,
                              workload: WorkloadProfile) -> float:
        """Calculate estimated monthly infrastructure cost"""

        # API Gateway cost
        api_cost = api_replicas * self.costs['api_gateway_replica_month']

        # Cache cost
        cache_cost = cache_memory_gb * self.costs['cache_gb_month']

        # Database cost
        db_cost = (db_cpu * self.costs['db_cpu_month'] + 
                  db_storage_gb * self.costs['db_storage_gb_month'])

        # Data transfer cost (estimate)
        monthly_data_gb = (workload.daily_requests * 30 * 
                          workload.avg_response_size_kb / 1024)
        data_transfer_cost = monthly_data_gb * self.costs['data_transfer_gb']

        return api_cost + cache_cost + db_cost + data_transfer_cost

# Usage example
planner = ParliamentCapacityPlanner()

# Small application
small_workload = WorkloadProfile(daily_requests=1000)
small_req = planner.calculate_requirements(small_workload)
print(f"Small app: {small_req.api_gateway_replicas} replicas, ${small_req.estimated_monthly_cost_usd:.2f}/month")

# Large application  
large_workload = WorkloadProfile(
    daily_requests=50000,
    peak_multiplier=5.0,
    cache_hit_ratio=0.8,
    expansion_ratio=0.4
)
large_req = planner.calculate_requirements(large_workload)
print(f"Large app: {large_req.api_gateway_replicas} replicas, ${large_req.estimated_monthly_cost_usd:.2f}/month")

Auto-scaling and Elastic Infrastructure

Kubernetes HPA Configuration

# k8s/hpa-parliament-api.yaml - Horizontal Pod Autoscaler
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: parliament-api-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: parliament-api-gateway
  minReplicas: 2
  maxReplicas: 20
  metrics:
  # Scale based on CPU utilization
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  # Scale based on memory utilization  
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
  # Scale based on custom metrics (requests per second)
  - type: Object
    object:
      metric:
        name: parliament_requests_per_second
      target:
        type: AverageValue
        averageValue: "100"
      describedObject:
        apiVersion: v1
        kind: Service
        name: parliament-api-service
  behavior:
    scaleDown:
      stabilizationWindowSeconds: 300  # 5 minutes
      policies:
      - type: Percent
        value: 50
        periodSeconds: 60
    scaleUp:
      stabilizationWindowSeconds: 60   # 1 minute
      policies:
      - type: Percent  
        value: 100
        periodSeconds: 15
      - type: Pods
        value: 4
        periodSeconds: 15
      selectPolicy: Max
---
# VPA for vertical scaling recommendations
apiVersion: autoscaling.k8s.io/v1
kind: VerticalPodAutoscaler  
metadata:
  name: parliament-api-vpa
spec:
  targetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: parliament-api-gateway
  updatePolicy:
    updateMode: "Auto"
  resourcePolicy:
    containerPolicies:
    - containerName: api-gateway
      maxAllowed:
        cpu: "4"
        memory: "8Gi"
      minAllowed:
        cpu: "100m"
        memory: "128Mi"
      controlledResources: ["cpu", "memory"]

Custom Auto-scaling Logic

# auto_scaling.py - Custom auto-scaling controller
import asyncio
import kubernetes
from kubernetes import client, config
import numpy as np
from datetime import datetime, timedelta

class ParliamentAutoScaler:
    def __init__(self):
        config.load_incluster_config()  # Load from pod
        self.v1 = client.AppsV1Api()
        self.custom_api = client.CustomObjectsApi()
        self.metrics_api = client.MetricsV1beta1Api()

        # Scaling parameters
        self.namespace = 'default'
        self.deployment_name = 'parliament-api-gateway'
        self.min_replicas = 2
        self.max_replicas = 50

        # Performance thresholds
        self.cpu_target = 70          # 70% CPU utilization
        self.memory_target = 80       # 80% memory utilization
        self.response_time_target = 500  # 500ms target response time

    async def monitor_and_scale(self):
        """Main monitoring and scaling loop"""
        while True:
            try:
                # Get current metrics
                current_replicas = await self._get_current_replicas()
                cpu_usage = await self._get_cpu_usage()
                memory_usage = await self._get_memory_usage()
                response_time = await self._get_avg_response_time()
                request_rate = await self._get_request_rate()

                # Calculate desired replicas
                desired_replicas = self._calculate_desired_replicas(
                    current_replicas, cpu_usage, memory_usage,
                    response_time, request_rate
                )

                # Apply scaling decision
                if desired_replicas != current_replicas:
                    await self._scale_deployment(desired_replicas)
                    print(f"Scaled from {current_replicas} to {desired_replicas} replicas")

            except Exception as e:
                print(f"Auto-scaling error: {e}")

            await asyncio.sleep(30)  # Check every 30 seconds

    def _calculate_desired_replicas(self, current_replicas: int,
                                  cpu_usage: float, memory_usage: float,
                                  response_time: float, request_rate: float) -> int:
        """Calculate desired replica count based on multiple metrics"""

        scaling_factors = []

        # CPU-based scaling
        if cpu_usage > 0:
            cpu_factor = cpu_usage / self.cpu_target
            scaling_factors.append(cpu_factor)

        # Memory-based scaling
        if memory_usage > 0:
            memory_factor = memory_usage / self.memory_target  
            scaling_factors.append(memory_factor)

        # Response time-based scaling
        if response_time > 0:
            response_factor = response_time / self.response_time_target
            scaling_factors.append(response_factor)

        # Request rate-based scaling (predictive)
        if request_rate > 0:
            # Scale up proactively if request rate is increasing
            rate_factor = request_rate / (current_replicas * 50)  # 50 RPS per replica
            scaling_factors.append(rate_factor)

        # Use the maximum scaling factor
        max_factor = max(scaling_factors) if scaling_factors else 1.0

        # Apply dampening to avoid oscillation
        if max_factor > 1.2:  # Scale up aggressively
            desired = math.ceil(current_replicas * min(max_factor, 2.0))
        elif max_factor < 0.8:  # Scale down conservatively
            desired = max(current_replicas - 1, math.ceil(current_replicas * 0.8))
        else:
            desired = current_replicas  # No change needed

        # Enforce limits
        return max(self.min_replicas, min(desired, self.max_replicas))

    async def _get_current_replicas(self) -> int:
        """Get current number of replicas"""
        deployment = self.v1.read_namespaced_deployment(
            name=self.deployment_name,
            namespace=self.namespace
        )
        return deployment.spec.replicas

    async def _get_cpu_usage(self) -> float:
        """Get average CPU usage percentage"""
        try:
            pod_metrics = self.metrics_api.list_namespaced_pod_metrics(self.namespace)
            cpu_values = []

            for pod in pod_metrics.items:
                if self.deployment_name in pod.metadata.name:
                    for container in pod.containers:
                        cpu_value = container.usage['cpu']
                        # Convert from nanocores to percentage
                        cpu_cores = int(cpu_value.replace('n', '')) / 1_000_000_000
                        cpu_values.append(cpu_cores * 100)  # Assume 1 CPU limit

            return np.mean(cpu_values) if cpu_values else 0
        except Exception:
            return 0

    async def _get_memory_usage(self) -> float:
        """Get average memory usage percentage"""
        try:
            pod_metrics = self.metrics_api.list_namespaced_pod_metrics(self.namespace)
            memory_values = []

            for pod in pod_metrics.items:
                if self.deployment_name in pod.metadata.name:
                    for container in pod.containers:
                        memory_value = container.usage['memory']
                        # Convert from Ki to percentage (assume 1Gi limit)
                        memory_ki = int(memory_value.replace('Ki', ''))
                        memory_percentage = (memory_ki / 1024) / 10.24  # 1Gi = 1024Mi
                        memory_values.append(memory_percentage)

            return np.mean(memory_values) if memory_values else 0
        except Exception:
            return 0

    async def _scale_deployment(self, desired_replicas: int):
        """Scale the deployment to desired replica count"""
        body = {'spec': {'replicas': desired_replicas}}
        self.v1.patch_namespaced_deployment_scale(
            name=self.deployment_name,
            namespace=self.namespace,
            body=body
        )

# Run auto-scaler
scaler = ParliamentAutoScaler()
asyncio.run(scaler.monitor_and_scale())

Cost Optimization for Large-Scale Deployments

Cloud Cost Management

# cost_optimization.py - Cost-aware scaling and resource management
from dataclasses import dataclass
from typing import Dict, List
import boto3
from datetime import datetime, timedelta

@dataclass
class CostOptimizationPlan:
    current_monthly_cost: float
    optimized_monthly_cost: float
    savings: float
    recommendations: List[str]

class ParliamentCostOptimizer:
    def __init__(self):
        # AWS clients
        self.ec2 = boto3.client('ec2')
        self.rds = boto3.client('rds')
        self.elasticache = boto3.client('elasticache')
        self.cloudwatch = boto3.client('cloudwatch')

        # Cost per hour by instance type (example AWS pricing)
        self.instance_costs = {
            't3.micro': 0.0104,
            't3.small': 0.0208,
            't3.medium': 0.0416,
            't3.large': 0.0832,
            'm5.large': 0.096,
            'm5.xlarge': 0.192,
            'm5.2xlarge': 0.384,
            'r5.large': 0.126,
            'r5.xlarge': 0.252
        }

        # Reserved instance discounts
        self.ri_discounts = {
            '1year_no_upfront': 0.25,
            '1year_partial': 0.35,
            '3year_partial': 0.50
        }

    async def analyze_current_costs(self) -> Dict[str, float]:
        """Analyze current infrastructure costs"""
        costs = {}

        # EC2 instances
        ec2_response = self.ec2.describe_instances(
            Filters=[{'Name': 'tag:Application', 'Values': ['parliament-api']}]
        )

        ec2_cost = 0
        for reservation in ec2_response['Reservations']:
            for instance in reservation['Instances']:
                if instance['State']['Name'] == 'running':
                    instance_type = instance['InstanceType']
                    hourly_cost = self.instance_costs.get(instance_type, 0.1)
                    ec2_cost += hourly_cost * 24 * 30  # Monthly cost

        costs['ec2'] = ec2_cost

        # RDS instances
        rds_response = self.rds.describe_db_instances()
        rds_cost = 0
        for db in rds_response['DBInstances']:
            if db['DBInstanceStatus'] == 'available':
                # Simplified cost calculation
                rds_cost += 0.15 * 24 * 30  # $0.15/hour example

        costs['rds'] = rds_cost

        # ElastiCache
        cache_response = self.elasticache.describe_cache_clusters()
        cache_cost = 0
        for cluster in cache_response['CacheClusters']:
            if cluster['CacheClusterStatus'] == 'available':
                cache_cost += 0.05 * 24 * 30  # $0.05/hour example

        costs['cache'] = cache_cost

        return costs

    async def recommend_optimizations(self) -> CostOptimizationPlan:
        """Recommend cost optimizations based on usage patterns"""
        current_costs = await self.analyze_current_costs()
        current_monthly = sum(current_costs.values())

        recommendations = []
        potential_savings = 0

        # Right-sizing recommendations
        oversized_instances = await self._find_oversized_instances()
        for instance_id, recommendation in oversized_instances.items():
            current_cost = self.instance_costs[recommendation['current_type']] * 24 * 30
            new_cost = self.instance_costs[recommendation['recommended_type']] * 24 * 30
            savings = current_cost - new_cost
            potential_savings += savings

            recommendations.append(
                f"Downsize {instance_id} from {recommendation['current_type']} "
                f"to {recommendation['recommended_type']} (${savings:.2f}/month savings)"
            )

        # Reserved Instance recommendations
        ri_savings = current_monthly * self.ri_discounts['1year_partial']
        potential_savings += ri_savings
        recommendations.append(
            f"Purchase 1-year partial upfront RIs (${ri_savings:.2f}/month savings)"
        )

        # Auto-scaling recommendations
        if await self._has_consistent_low_usage():
            autoscale_savings = current_monthly * 0.2  # 20% savings
            potential_savings += autoscale_savings
            recommendations.append(
                f"Implement auto-scaling during off-hours (${autoscale_savings:.2f}/month savings)"
            )

        # Spot instance recommendations for non-critical workloads
        spot_savings = current_costs.get('ec2', 0) * 0.7  # 70% savings on spot
        potential_savings += spot_savings
        recommendations.append(
            f"Use spot instances for batch processing (${spot_savings:.2f}/month savings)"
        )

        return CostOptimizationPlan(
            current_monthly_cost=current_monthly,
            optimized_monthly_cost=current_monthly - potential_savings,
            savings=potential_savings,
            recommendations=recommendations
        )

    async def _find_oversized_instances(self) -> Dict[str, Dict]:
        """Find EC2 instances that are oversized based on CloudWatch metrics"""
        oversized = {}

        # Get CPU utilization for the past 7 days
        end_time = datetime.utcnow()
        start_time = end_time - timedelta(days=7)

        instances = self.ec2.describe_instances()
        for reservation in instances['Reservations']:
            for instance in reservation['Instances']:
                if instance['State']['Name'] != 'running':
                    continue

                instance_id = instance['InstanceId']
                current_type = instance['InstanceType']

                # Get CPU metrics
                cpu_response = self.cloudwatch.get_metric_statistics(
                    Namespace='AWS/EC2',
                    MetricName='CPUUtilization',
                    Dimensions=[{'Name': 'InstanceId', 'Value': instance_id}],
                    StartTime=start_time,
                    EndTime=end_time,
                    Period=3600,
                    Statistics=['Average']
                )

                if cpu_response['Datapoints']:
                    avg_cpu = np.mean([dp['Average'] for dp in cpu_response['Datapoints']])

                    # If consistently low CPU usage, recommend smaller instance
                    if avg_cpu < 20:
                        recommended_type = self._get_smaller_instance_type(current_type)
                        if recommended_type:
                            oversized[instance_id] = {
                                'current_type': current_type,
                                'recommended_type': recommended_type,
                                'avg_cpu': avg_cpu
                            }

        return oversized

    def _get_smaller_instance_type(self, current_type: str) -> str:
        """Get a smaller instance type recommendation"""
        size_progression = {
            't3.large': 't3.medium',
            't3.medium': 't3.small',
            't3.small': 't3.micro',
            'm5.2xlarge': 'm5.xlarge',
            'm5.xlarge': 'm5.large',
            'm5.large': 't3.large',
            'r5.xlarge': 'r5.large',
            'r5.large': 'm5.large'
        }
        return size_progression.get(current_type)

    async def _has_consistent_low_usage(self) -> bool:
        """Check if the application has consistent low usage periods"""
        # Analyze request patterns over the past week
        # This would integrate with your application metrics
        # For demo purposes, return True
        return True

# Usage
optimizer = ParliamentCostOptimizer()
plan = await optimizer.recommend_optimizations()
print(f"Current cost: ${plan.current_monthly_cost:.2f}/month")
print(f"Optimized cost: ${plan.optimized_monthly_cost:.2f}/month")
print(f"Potential savings: ${plan.savings:.2f}/month")

Resource Scheduling and Lifecycle Management

# resource_scheduler.py - Automated resource lifecycle management
import asyncio
from datetime import datetime, time, timezone
import kubernetes
from kubernetes import client

class ParliamentResourceScheduler:
    def __init__(self):
        self.v1 = client.AppsV1Api()
        self.namespace = 'default'

        # Business hours configuration (Copenhagen time)
        self.business_hours = {
            'start': time(8, 0),    # 8 AM
            'end': time(18, 0),     # 6 PM
            'weekdays_only': True
        }

        # Scaling schedules
        self.schedules = {
            'business_hours': {
                'replicas': 5,
                'resources': {'cpu': '1', 'memory': '1Gi'}
            },
            'off_hours': {
                'replicas': 2,
                'resources': {'cpu': '500m', 'memory': '512Mi'}
            },
            'weekend': {
                'replicas': 1,
                'resources': {'cpu': '200m', 'memory': '256Mi'}
            }
        }

    async def run_scheduler(self):
        """Main scheduling loop"""
        while True:
            current_schedule = self._get_current_schedule()
            await self._apply_schedule(current_schedule)

            # Check every 15 minutes
            await asyncio.sleep(900)

    def _get_current_schedule(self) -> str:
        """Determine current schedule based on time"""
        now = datetime.now(timezone.utc)
        copenhagen_time = now.astimezone(timezone.utc)  # Simplified

        # Weekend check
        if copenhagen_time.weekday() >= 5:  # Saturday = 5, Sunday = 6
            return 'weekend'

        # Business hours check
        current_time = copenhagen_time.time()
        if (self.business_hours['start'] <= current_time <= self.business_hours['end']):
            return 'business_hours'
        else:
            return 'off_hours'

    async def _apply_schedule(self, schedule_name: str):
        """Apply the specified schedule"""
        schedule = self.schedules[schedule_name]

        # Update replica count
        await self._scale_deployment('parliament-api-gateway', schedule['replicas'])

        # Update resource requests/limits (requires deployment update)
        await self._update_resource_limits('parliament-api-gateway', schedule['resources'])

    async def _scale_deployment(self, deployment_name: str, replicas: int):
        """Scale deployment to specified replica count"""
        try:
            current_deployment = self.v1.read_namespaced_deployment(
                name=deployment_name, namespace=self.namespace
            )

            if current_deployment.spec.replicas != replicas:
                body = {'spec': {'replicas': replicas}}
                self.v1.patch_namespaced_deployment_scale(
                    name=deployment_name,
                    namespace=self.namespace,
                    body=body
                )
                print(f"Scaled {deployment_name} to {replicas} replicas")
        except Exception as e:
            print(f"Failed to scale {deployment_name}: {e}")

    async def _update_resource_limits(self, deployment_name: str, resources: Dict[str, str]):
        """Update resource requests and limits"""
        try:
            deployment = self.v1.read_namespaced_deployment(
                name=deployment_name, namespace=self.namespace
            )

            # Update container resources
            container = deployment.spec.template.spec.containers[0]
            container.resources.requests = resources
            container.resources.limits = resources

            # Apply the update
            self.v1.patch_namespaced_deployment(
                name=deployment_name,
                namespace=self.namespace,
                body=deployment
            )
            print(f"Updated {deployment_name} resources: {resources}")

        except Exception as e:
            print(f"Failed to update resources for {deployment_name}: {e}")

# Start the scheduler
scheduler = ParliamentResourceScheduler()
asyncio.run(scheduler.run_scheduler())

Summary

Building scalable applications for the Danish Parliamentary API requires understanding its performance characteristics and implementing appropriate scaling strategies:

Key Performance Insights

  • Response Times: 85ms to 2.1s depending on query complexity
  • No Rate Limiting: Excellent stability under concurrent load
  • 100-record Pagination: Plan for multiple requests for large datasets
  • Expansion Overhead: 50-100% cost but eliminates multiple API calls

Scaling Best Practices

  1. Implement aggressive caching with Redis clustering
  2. Use connection pooling optimized for the API's characteristics
  3. Deploy horizontal scaling with proper load balancing
  4. Monitor key metrics for proactive scaling decisions
  5. Optimize query patterns based on entity relationships
  6. Plan for peak traffic with auto-scaling policies
  7. Manage costs with scheduled scaling and right-sizing

Production Readiness

The Danish Parliament API's excellent performance characteristics make it well-suited for large-scale applications. With proper scaling architecture and monitoring, you can build robust systems that handle parliamentary data processing efficiently at any scale.

For specific implementation guidance, refer to the related documentation: - Query Optimization for efficient OData patterns - Caching for cache implementation strategies
- Monitoring for comprehensive observability