Advanced API Integration and Automation with Qwen AI Image Editor
Build scalable, production-ready image editing workflows with comprehensive API integration patterns and automation strategies.
Welcome to the definitive guide for developers looking to integrate Qwen AI Image Editor into their applications and workflows. Building on our previous articles covering quick start and advanced techniques, this guide focuses specifically on programmatic access, automation, and enterprise-level deployment strategies.
Whether you’re building a SaaS application, automating content creation pipelines, or integrating AI image editing into existing systems, this comprehensive resource provides the technical depth and practical examples needed for successful implementation.
Why API Integration Matters
The true power of Qwen AI Image Editor emerges when you move beyond manual editing to automated, scalable workflows. API integration enables:
- Batch Processing: Handle hundreds or thousands of images automatically
- Workflow Automation: Integrate image editing into existing business processes
- Custom Applications: Build specialized tools for specific use cases
- Enterprise Integration: Connect with CMS, DAM, and other enterprise systems
- Cost Optimization: Reduce manual labor and improve processing efficiency
This guide assumes you have development experience and are familiar with REST APIs, basic authentication concepts, and at least one programming language (Python, JavaScript/Node.js, or similar).
Understanding the Qwen AI Image Editor API Architecture
Before diving into implementation, it’s crucial to understand the API’s architecture and design principles.
Core Components
The Qwen AI Image Editor API consists of several interconnected components:
API Gateway: Handles incoming requests, authentication, and rate limiting Processing Engine: Manages image editing tasks and queue processing Storage Layer: Handles temporary and persistent file storage Monitoring System: Tracks usage, performance, and errors
Request-Response Flow
Client Application → API Gateway → Authentication → Queue → Processing Engine → Storage → Response
This architecture ensures:
- Scalability: Horizontal scaling of processing resources
- Reliability: Queue-based processing prevents request loss
- Performance: Asynchronous processing for large files
- Monitoring: Comprehensive tracking and alerting
Key Design Principles
- RESTful Design: Standard HTTP methods and status codes
- JSON Responses: Consistent, machine-readable output format
- Async Processing: Long-running tasks use job queues
- Idempotent Operations: Safe retry mechanisms for failed requests
- Comprehensive Error Handling: Detailed error codes and messages
Authentication and API Access
Proper authentication is fundamental for secure API integration. Qwen AI Image Editor offers multiple authentication methods to suit different use cases.
API Key Authentication
The most straightforward method for most applications:
import requests
# API key from your Qwen AI dashboard
API_KEY = "your_api_key_here"
BASE_URL = "https://api.qwen.ai/v1"
headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
}
def test_authentication():
"""Test API key validity"""
response = requests.get(f"{BASE_URL}/auth/validate", headers=headers)
return response.status_code == 200
if test_authentication():
print("Authentication successful!")
else:
print("Authentication failed - check your API key")
OAuth 2.0 Integration
For applications requiring user-specific access:
const express = require('express');
const axios = require('axios');
const app = express();
const CLIENT_ID = 'your_client_id';
const CLIENT_SECRET = 'your_client_secret';
const REDIRECT_URI = 'https://your-app.com/callback';
// OAuth 2.0 authorization flow
app.get('/auth/qwen', (req, res) => {
const authUrl = `https://qwen.ai/oauth/authorize?` +
`client_id=${CLIENT_ID}&` +
`redirect_uri=${REDIRECT_URI}&` +
`response_type=code&` +
`scope=image_edit`;
res.redirect(authUrl);
});
// Handle OAuth callback
app.get('/callback', async (req, res) => {
const { code } = req.query;
try {
const response = await axios.post('https://qwen.ai/oauth/token', {
grant_type: 'authorization_code',
client_id: CLIENT_ID,
client_secret: CLIENT_SECRET,
code: code,
redirect_uri: REDIRECT_URI
});
const { access_token } = response.data;
// Store access_token securely for future API calls
res.json({ success: true, access_token });
} catch (error) {
res.status(500).json({ error: 'Authentication failed' });
}
});
Managing API Keys Securely
Never hardcode API keys in your application. Use environment variables or secure configuration management:
import os
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
class QwenAPIConfig:
def __init__(self):
self.api_key = os.getenv('QWEN_API_KEY')
self.base_url = os.getenv('QWEN_API_URL', 'https://api.qwen.ai/v1')
self.timeout = int(os.getenv('QWEN_API_TIMEOUT', '300'))
if not self.api_key:
raise ValueError("QWEN_API_KEY environment variable is required")
# Usage in your application
config = QwenAPIConfig()
api_client = QwenAPIClient(config)
Rate Limiting and Quota Management
Understanding and working within rate limits is crucial for production applications:
import time
from functools import wraps
from datetime import datetime, timedelta
class RateLimiter:
def __init__(self, max_requests_per_minute=60):
self.max_requests = max_requests_per_minute
self.requests = []
def wait_if_needed(self):
"""Wait if rate limit would be exceeded"""
now = datetime.now()
one_minute_ago = now - timedelta(minutes=1)
# Remove requests older than 1 minute
self.requests = [req_time for req_time in self.requests if req_time > one_minute_ago]
if len(self.requests) >= self.max_requests:
# Calculate wait time
oldest_request = min(self.requests)
wait_time = (oldest_request + timedelta(minutes=1)) - now
if wait_time.total_seconds() > 0:
print(f"Rate limit reached. Waiting {wait_time.total_seconds():.2f} seconds...")
time.sleep(wait_time.total_seconds())
def record_request(self):
"""Record a new request"""
self.requests.append(datetime.now())
def rate_limit(max_requests_per_minute=60):
"""Decorator for rate limiting API calls"""
def decorator(func):
limiter = RateLimiter(max_requests_per_minute)
@wraps(func)
def wrapper(*args, **kwargs):
limiter.wait_if_needed()
result = func(*args, **kwargs)
limiter.record_request()
return result
return wrapper
return decorator
# Usage
@rate_limit(max_requests_per_minute=60)
def edit_image(image_data, prompt):
# Your API call here
pass
Core API Endpoints and Implementation
The Qwen AI Image Editor API provides comprehensive endpoints for all image editing operations. Let’s explore the most important ones with practical implementations.
Image Upload and Preparation
Before editing, images must be uploaded and prepared:
import base64
import requests
from PIL import Image
import io
class QwenImageUploader:
def __init__(self, api_key, base_url="https://api.qwen.ai/v1"):
self.api_key = api_key
self.base_url = base_url
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
def upload_image_from_file(self, file_path):
"""Upload image from local file"""
with open(file_path, 'rb') as image_file:
image_data = image_file.read()
return self.upload_image_data(image_data, file_path)
def upload_image_from_url(self, image_url):
"""Upload image from URL"""
response = requests.get(image_url)
response.raise_for_status()
return self.upload_image_data(response.content, image_url)
def upload_image_data(self, image_data, source_name="unknown"):
"""Upload raw image data"""
# Validate image
try:
image = Image.open(io.BytesIO(image_data))
# Convert to RGB if necessary
if image.mode != 'RGB':
image = image.convert('RGB')
# Resize if too large (API limit is typically 10MB)
max_size = (2048, 2048)
image.thumbnail(max_size, Image.Resampling.LANCZOS)
# Save to bytes
img_buffer = io.BytesIO()
image.save(img_buffer, format='JPEG', quality=90)
img_buffer.seek(0)
# Encode to base64
base64_image = base64.b64encode(img_buffer.getvalue()).decode('utf-8')
except Exception as e:
raise ValueError(f"Invalid image data: {e}")
# Upload to API
upload_data = {
"image": base64_image,
"filename": f"upload_{int(time.time())}.jpg",
"source": source_name
}
response = requests.post(
f"{self.base_url}/images/upload",
headers=self.headers,
json=upload_data
)
if response.status_code == 200:
return response.json() # Returns image_id and temporary_url
else:
raise Exception(f"Upload failed: {response.text}")
# Usage example
uploader = QwenImageUploader("your_api_key")
result = uploader.upload_image_from_file("path/to/your/image.jpg")
image_id = result['image_id']
print(f"Image uploaded successfully with ID: {image_id}")
Image Editing Operations
Core editing functionality with comprehensive parameter support:
class QwenImageEditor:
def __init__(self, api_key, base_url="https://api.qwen.ai/v1"):
self.api_key = api_key
self.base_url = base_url
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
def edit_image(self, image_id, prompt, **kwargs):
"""Perform image editing with comprehensive parameters"""
edit_params = {
"image_id": image_id,
"prompt": prompt,
# Core parameters
"num_inference_steps": kwargs.get('steps', 50),
"guidance_scale": kwargs.get('guidance_scale', 7.5),
"negative_prompt": kwargs.get('negative_prompt', ""),
"seed": kwargs.get('seed', -1), # -1 for random
# Output parameters
"width": kwargs.get('width', 512),
"height": kwargs.get('height', 512),
"output_format": kwargs.get('format', 'jpeg'),
"quality": kwargs.get('quality', 90),
# Advanced parameters
"true_cfg_scale": kwargs.get('true_cfg_scale', 4.0),
"controlnet_conditioning_scale": kwargs.get('controlnet_scale', 1.0),
# Processing options
"async": kwargs.get('async', False), # Async processing for large images
"webhook_url": kwargs.get('webhook_url'), # Callback URL for async jobs
}
response = requests.post(
f"{self.base_url}/images/edit",
headers=self.headers,
json=edit_params
)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Edit failed: {response.text}")
def edit_image_sync(self, image_id, prompt, **kwargs):
"""Synchronous image editing with timeout handling"""
result = self.edit_image(image_id, prompt, async=False, **kwargs)
if result.get('status') == 'completed':
return result
elif result.get('status') == 'processing':
# Poll for completion (with timeout)
job_id = result['job_id']
return self._poll_job_completion(job_id, timeout=300)
else:
raise Exception(f"Edit failed with status: {result.get('status')}")
def _poll_job_completion(self, job_id, timeout=300):
"""Poll async job until completion or timeout"""
start_time = time.time()
while time.time() - start_time < timeout:
response = requests.get(
f"{self.base_url}/jobs/{job_id}",
headers=self.headers
)
if response.status_code == 200:
job_status = response.json()
if job_status['status'] == 'completed':
return job_status
elif job_status['status'] == 'failed':
raise Exception(f"Job failed: {job_status.get('error', 'Unknown error')}")
# Wait before next poll
time.sleep(2)
else:
raise Exception(f"Failed to check job status: {response.text}")
raise TimeoutError(f"Job {job_id} did not complete within {timeout} seconds")
# Usage examples
editor = QwenImageEditor("your_api_key")
# Basic edit
result = editor.edit_image_sync(
image_id="your_image_id",
prompt="Convert to cyberpunk style with neon lighting",
steps=50,
guidance_scale=7.5
)
# Advanced edit with custom parameters
result = editor.edit_image_sync(
image_id="your_image_id",
prompt="Replace background with mountain landscape, keep subject unchanged",
negative_prompt="blurry, low quality, distorted",
width=1024,
height=1024,
quality=95,
true_cfg_scale=5.0
)
print(f"Edit completed! Result URL: {result.get('output_url')}")
Batch Processing Operations
Process multiple images efficiently:
import concurrent.futures
from typing import List, Dict, Any
class QwenBatchProcessor:
def __init__(self, api_key, max_workers=5):
self.api_key = api_key
self.editor = QwenImageEditor(api_key)
self.max_workers = max_workers
def process_batch(self, image_jobs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Process multiple images in parallel"""
def process_single_job(job):
try:
image_id = job['image_id']
prompt = job['prompt']
params = job.get('params', {})
result = self.editor.edit_image_sync(image_id, prompt, **params)
return {
'success': True,
'image_id': image_id,
'result': result,
'job_data': job
}
except Exception as e:
return {
'success': False,
'image_id': job.get('image_id', 'unknown'),
'error': str(e),
'job_data': job
}
# Process jobs in parallel
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
results = list(executor.map(process_single_job, image_jobs))
return results
def process_batch_with_progress(self, image_jobs: List[Dict[str, Any]], callback=None):
"""Process batch with progress tracking"""
total_jobs = len(image_jobs)
completed_jobs = 0
results = []
def process_with_callback(job):
nonlocal completed_jobs
result = process_single_job(job)
completed_jobs += 1
if callback:
callback(completed_jobs, total_jobs, result)
return result
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
results = list(executor.map(process_with_callback, image_jobs))
return results
# Usage example
def progress_callback(completed, total, result):
progress = (completed / total) * 100
status = "✅" if result['success'] else "❌"
print(f"{status} Progress: {progress:.1f}% ({completed}/{total}) - Image: {result['image_id']}")
batch_processor = QwenBatchProcessor("your_api_key", max_workers=3)
# Define batch jobs
batch_jobs = [
{
'image_id': 'img_001',
'prompt': 'Convert to black and white',
'params': {'steps': 30}
},
{
'image_id': 'img_002',
'prompt': 'Add vintage film effect',
'params': {'steps': 40, 'quality': 85}
},
{
'image_id': 'img_003',
'prompt': 'Enhance colors and saturation',
'params': {'steps': 50, 'guidance_scale': 8.0}
}
]
# Process with progress tracking
results = batch_processor.process_batch_with_progress(batch_jobs, progress_callback)
# Summary
successful = sum(1 for r in results if r['success'])
failed = len(results) - successful
print(f"\nBatch processing completed!")
print(f"Successful: {successful}, Failed: {failed}")
# Save failed jobs for retry
failed_jobs = [r['job_data'] for r in results if not r['success']]
if failed_jobs:
print(f"Retrying {len(failed_jobs)} failed jobs...")
retry_results = batch_processor.process_batch(failed_jobs)
Advanced Workflow Automation
Build sophisticated automation workflows that integrate with your existing systems.
Queue-Based Processing with Redis
For high-volume, reliable processing:
import redis
import json
import pickle
from dataclasses import dataclass
from typing import Optional
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class ImageEditJob:
job_id: str
image_id: str
prompt: str
params: dict
priority: int = 5 # 1-10, lower numbers = higher priority
retries: int = 0
max_retries: int = 3
created_at: float = None
webhook_url: Optional[str] = None
def __post_init__(self):
if self.created_at is None:
self.created_at = time.time()
class QwenWorkflowQueue:
def __init__(self, redis_url="redis://localhost:6379", api_key=None):
self.redis_client = redis.from_url(redis_url)
self.api_key = api_key
self.editor = QwenImageEditor(api_key) if api_key else None
# Queue names
self.queues = {
'high_priority': 'qwen:queue:high',
'normal_priority': 'qwen:queue:normal',
'low_priority': 'qwen:queue:low'
}
# Result storage
self.results_key = 'qwen:results'
def submit_job(self, job: ImageEditJob) -> str:
"""Submit job to appropriate priority queue"""
queue_name = self._get_queue_name(job.priority)
# Serialize job
job_data = pickle.dumps(job)
# Add to queue
self.redis_client.lpush(queue_name, job_data)
logger.info(f"Submitted job {job.job_id} to {queue_name}")
return job.job_id
def _get_queue_name(self, priority: int) -> str:
"""Get queue name based on priority"""
if priority <= 3:
return self.queues['high_priority']
elif priority <= 7:
return self.queues['normal_priority']
else:
return self.queues['low_priority']
def get_next_job(self, timeout=30) -> Optional[ImageEditJob]:
"""Get next job from queues (priority order)"""
for queue_name in [self.queues['high_priority'],
self.queues['normal_priority'],
self.queues['low_priority']]:
# Blocking pop with timeout
result = self.redis_client.brpop(queue_name, timeout=timeout)
if result:
_, job_data = result
job = pickle.loads(job_data)
logger.info(f"Retrieved job {job.job_id} from {queue_name}")
return job
return None
def process_job(self, job: ImageEditJob) -> dict:
"""Process a single job"""
try:
logger.info(f"Processing job {job.job_id}")
result = self.editor.edit_image_sync(
image_id=job.image_id,
prompt=job.prompt,
**job.params
)
# Mark as successful
job_result = {
'job_id': job.job_id,
'status': 'completed',
'result': result,
'processed_at': time.time()
}
# Store result
self.redis_client.hset(
self.results_key,
job.job_id,
json.dumps(job_result)
)
# Send webhook if specified
if job.webhook_url:
self._send_webhook(job.webhook_url, job_result)
logger.info(f"Successfully completed job {job.job_id}")
return job_result
except Exception as e:
logger.error(f"Failed to process job {job.job_id}: {e}")
# Handle retries
if job.retries < job.max_retries:
job.retries += 1
logger.info(f"Retrying job {job.job_id} (attempt {job.retries}/{job.max_retries})")
# Re-queue with lower priority
job.priority = min(job.priority + 1, 10)
self.submit_job(job)
return {
'job_id': job.job_id,
'status': 'retrying',
'retry_count': job.retries,
'error': str(e)
}
else:
# Mark as failed
job_result = {
'job_id': job.job_id,
'status': 'failed',
'error': str(e),
'failed_at': time.time(),
'retry_count': job.retries
}
self.redis_client.hset(
self.results_key,
job.job_id,
json.dumps(job_result)
)
# Send webhook for failure
if job.webhook_url:
self._send_webhook(job.webhook_url, job_result)
return job_result
def _send_webhook(self, webhook_url: str, data: dict):
"""Send webhook notification"""
try:
response = requests.post(webhook_url, json=data, timeout=10)
response.raise_for_status()
logger.info(f"Webhook sent successfully to {webhook_url}")
except Exception as e:
logger.error(f"Failed to send webhook to {webhook_url}: {e}")
def get_job_result(self, job_id: str) -> Optional[dict]:
"""Get result for a specific job"""
result_data = self.redis_client.hget(self.results_key, job_id)
if result_data:
return json.loads(result_data)
return None
def clear_old_results(self, max_age_hours=24):
"""Clean up old results"""
cutoff_time = time.time() - (max_age_hours * 3600)
# Get all results
all_results = self.redis_client.hgetall(self.results_key)
for job_id, result_data in all_results.items():
try:
result = json.loads(result_data)
# Check age
processed_at = result.get('processed_at') or result.get('failed_at', 0)
if processed_at < cutoff_time:
self.redis_client.hdel(self.results_key, job_id)
logger.info(f"Cleared old result for job {job_id}")
except (json.JSONDecodeError, KeyError) as e:
# Remove malformed entries
self.redis_client.hdel(self.results_key, job_id)
logger.warning(f"Removed malformed result for job {job_id}: {e}")
# Worker process
def worker_process(redis_url="redis://localhost:6379", api_key="your_api_key"):
"""Worker process for processing queue jobs"""
queue = QwenWorkflowQueue(redis_url, api_key)
logger.info("Worker started, waiting for jobs...")
while True:
try:
job = queue.get_next_job(timeout=60)
if job:
queue.process_job(job)
else:
logger.debug("No jobs available, continuing...")
except KeyboardInterrupt:
logger.info("Worker stopped by user")
break
except Exception as e:
logger.error(f"Worker error: {e}")
time.sleep(5) # Prevent rapid error loops
# Usage example
if __name__ == "__main__":
# Initialize queue
queue = QwenWorkflowQueue(redis_url="redis://localhost:6379", api_key="your_api_key")
# Submit some jobs
jobs = [
ImageEditJob(
job_id="job_001",
image_id="img_001",
prompt="Convert to vintage style",
params={"steps": 50},
priority=2, # High priority
webhook_url="https://your-app.com/webhook"
),
ImageEditJob(
job_id="job_002",
image_id="img_002",
prompt="Add dramatic lighting",
params={"steps": 40},
priority=5 # Normal priority
)
]
for job in jobs:
queue.submit_job(job)
# Start worker (in production, run as separate process/service)
worker_process()
Serverless Implementation with AWS Lambda
Deploy scalable, cost-effective processing:
import json
import boto3
import os
import base64
import tempfile
from urllib.parse import unquote_plus
# Lambda handler for AWS
def lambda_handler(event, context):
"""AWS Lambda handler for Qwen image editing"""
# Initialize clients
s3_client = boto3.client('s3')
api_key = os.getenv('QWEN_API_KEY')
if not api_key:
return {
'statusCode': 500,
'body': json.dumps({'error': 'QWEN_API_KEY not configured'})
}
try:
# Parse event (expected from S3 trigger or direct invocation)
if event.get('Records'):
# S3 trigger event
return handle_s3_event(event, s3_client, api_key)
else:
# Direct invocation
return handle_direct_invocation(event, s3_client, api_key)
except Exception as e:
logger.error(f"Lambda execution failed: {e}")
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
def handle_s3_event(event, s3_client, api_key):
"""Handle S3 trigger event"""
results = []
for record in event['Records']:
s3_info = record['s3']
bucket_name = s3_info['bucket']['name']
object_key = unquote_plus(s3_info['object']['key'])
try:
# Download image from S3
with tempfile.NamedTemporaryFile() as temp_file:
s3_client.download_file(bucket_name, object_key, temp_file.name)
# Upload to Qwen API
uploader = QwenImageUploader(api_key)
upload_result = uploader.upload_image_from_file(temp_file.name)
qwen_image_id = upload_result['image_id']
# Process image (you'd get prompt from event metadata)
prompt = extract_prompt_from_metadata(bucket_name, object_key)
editor = QwenImageEditor(api_key)
edit_result = editor.edit_image_sync(
image_id=qwen_image_id,
prompt=prompt
)
# Download result and upload back to S3
output_key = f"processed/{object_key}"
if edit_result.get('output_url'):
# Download processed image
response = requests.get(edit_result['output_url'])
response.raise_for_status()
# Upload to S3
s3_client.put_object(
Bucket=bucket_name,
Key=output_key,
Body=response.content,
ContentType='image/jpeg'
)
results.append({
'original': object_key,
'processed': output_key,
'status': 'success'
})
except Exception as e:
logger.error(f"Failed to process {object_key}: {e}")
results.append({
'original': object_key,
'error': str(e),
'status': 'failed'
})
return {
'statusCode': 200,
'body': json.dumps({
'processed': len([r for r in results if r['status'] == 'success']),
'failed': len([r for r in results if r['status'] == 'failed']),
'results': results
})
}
def handle_direct_invocation(event, s3_client, api_key):
"""Handle direct Lambda invocation"""
# Expected event structure
image_url = event.get('image_url')
prompt = event.get('prompt')
output_bucket = event.get('output_bucket')
output_key = event.get('output_key')
if not all([image_url, prompt, output_bucket, output_key]):
return {
'statusCode': 400,
'body': json.dumps({'error': 'Missing required parameters'})
}
try:
# Download image from URL
response = requests.get(image_url)
response.raise_for_status()
with tempfile.NamedTemporaryFile() as temp_file:
temp_file.write(response.content)
temp_file.flush()
# Upload to Qwen API
uploader = QwenImageUploader(api_key)
upload_result = uploader.upload_image_from_file(temp_file.name)
qwen_image_id = upload_result['image_id']
# Process image
editor = QwenImageEditor(api_key)
edit_result = editor.edit_image_sync(
image_id=qwen_image_id,
prompt=prompt
)
# Download and upload result
if edit_result.get('output_url'):
result_response = requests.get(edit_result['output_url'])
result_response.raise_for_status()
s3_client.put_object(
Bucket=output_bucket,
Key=output_key,
Body=result_response.content,
ContentType='image/jpeg'
)
return {
'statusCode': 200,
'body': json.dumps({
'status': 'success',
'output_url': f"s3://{output_bucket}/{output_key}",
'processing_time': edit_result.get('processing_time')
})
}
else:
return {
'statusCode': 500,
'body': json.dumps({'error': 'No output URL received from Qwen API'})
}
except Exception as e:
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
# serverless.yml configuration
serverless_config = """
service: qwen-image-processor
provider:
name: aws
runtime: python3.9
region: us-east-1
timeout: 300 # 5 minutes
memorySize: 1024
environment:
QWEN_API_KEY: ${ssm:/qwen/api_key}
functions:
processImage:
handler: lambda_function.lambda_handler
events:
- s3:
bucket: your-input-bucket
event: s3:ObjectCreated:*
existing: true
layers:
- ${cf:common-layers-stack.Outputs.PythonRequirementsLayerArn}
plugins:
- serverless-python-requirements
custom:
pythonRequirements:
layer: true
"""
# deployment script
def deploy_lambda():
"""Deploy Lambda function"""
import subprocess
# Install Serverless Framework
subprocess.run(["npm", "install", "-g", "serverless"], check=True)
# Deploy
subprocess.run(["serverless", "deploy"], check=True)
print("Lambda deployment completed!")
Production Deployment and Monitoring
Deploy your Qwen AI integration to production with proper monitoring and scaling strategies.
Docker Containerization
Containerize your application for consistent deployment:
# Dockerfile
FROM python:3.9-slim
# Set working directory
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
build-essential \
libpq-dev \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements
COPY requirements.txt .
# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Create non-root user
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD python health_check.py
# Expose port
EXPOSE 8000
# Start application
CMD ["python", "app.py"]
# docker-compose.yml
version: '3.8'
services:
qwen-api-service:
build: .
ports:
- "8000:8000"
environment:
- QWEN_API_KEY=${QWEN_API_KEY}
- REDIS_URL=redis://redis:6379
- DATABASE_URL=postgresql://user:password@postgres:5432/qwen_db
depends_on:
- redis
- postgres
volumes:
- ./logs:/app/logs
- ./uploads:/app/uploads
restart: unless-stopped
deploy:
replicas: 3
resources:
limits:
cpus: '1.0'
memory: 2G
reservations:
cpus: '0.5'
memory: 1G
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
restart: unless-stopped
postgres:
image: postgres:15-alpine
environment:
- POSTGRES_USER=user
- POSTGRES_PASSWORD=password
- POSTGRES_DB=qwen_db
volumes:
- postgres_data:/var/lib/postgresql/data
- ./init.sql:/docker-entrypoint-initdb.d/init.sql
restart: unless-stopped
nginx:
image: nginx:alpine
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
- ./ssl:/etc/nginx/ssl
depends_on:
- qwen-api-service
restart: unless-stopped
volumes:
redis_data:
postgres_data:
Kubernetes Deployment
Scale your application with Kubernetes:
# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: qwen-api-service
labels:
app: qwen-api-service
spec:
replicas: 5
selector:
matchLabels:
app: qwen-api-service
template:
metadata:
labels:
app: qwen-api-service
spec:
containers:
- name: qwen-api
image: your-registry/qwen-api-service:latest
ports:
- containerPort: 8000
env:
- name: QWEN_API_KEY
valueFrom:
secretKeyRef:
name: qwen-secrets
key: api-key
- name: REDIS_URL
value: "redis://redis-service:6379"
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: db-secrets
key: database-url
resources:
requests:
memory: "1Gi"
cpu: "500m"
limits:
memory: "2Gi"
cpu: "1000m"
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 8000
initialDelaySeconds: 5
periodSeconds: 5
volumeMounts:
- name: uploads
mountPath: /app/uploads
volumes:
- name: uploads
persistentVolumeClaim:
claimName: uploads-pvc
---
apiVersion: v1
kind: Service
metadata:
name: qwen-api-service
spec:
selector:
app: qwen-api-service
ports:
- protocol: TCP
port: 80
targetPort: 8000
type: ClusterIP
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: qwen-api-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: qwen-api-service
minReplicas: 3
maxReplicas: 20
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
Monitoring and Logging
Implement comprehensive monitoring:
import time
import logging
import psutil
from prometheus_client import Counter, Histogram, Gauge, start_http_server
from functools import wraps
# Prometheus metrics
REQUEST_COUNT = Counter('qwen_api_requests_total', 'Total API requests', ['method', 'endpoint', 'status'])
REQUEST_DURATION = Histogram('qwen_api_request_duration_seconds', 'API request duration')
ACTIVE_JOBS = Gauge('qwen_active_jobs', 'Number of active processing jobs')
SYSTEM_MEMORY = Gauge('system_memory_usage_bytes', 'System memory usage')
SYSTEM_CPU = Gauge('system_cpu_usage_percent', 'System CPU usage')
class QwenAPIMonitor:
def __init__(self):
self.logger = logging.getLogger(__name__)
self.setup_logging()
self.setup_metrics()
def setup_logging(self):
"""Configure structured logging"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('/app/logs/api.log'),
logging.StreamHandler()
]
)
# Add JSON formatter for structured logging
formatter = logging.Formatter(
'%(asctime)s %(name)s %(levelname)s %(message)s'
)
# Add log handler for monitoring systems
monitoring_handler = logging.StreamHandler()
monitoring_handler.setFormatter(formatter)
self.logger.addHandler(monitoring_handler)
def setup_metrics(self):
"""Start Prometheus metrics server"""
start_http_server(8001)
self.logger.info("Prometheus metrics server started on port 8001")
def track_system_metrics(self):
"""Track system resource usage"""
# Memory usage
memory = psutil.virtual_memory()
SYSTEM_MEMORY.set(memory.used)
# CPU usage
cpu_percent = psutil.cpu_percent(interval=1)
SYSTEM_CPU.set(cpu_percent)
def log_request(self, method, endpoint, status, duration, user_id=None):
"""Log API request with metrics"""
REQUEST_COUNT.labels(method=method, endpoint=endpoint, status=status).inc()
REQUEST_DURATION.observe(duration)
log_data = {
'method': method,
'endpoint': endpoint,
'status': status,
'duration': duration,
'timestamp': time.time()
}
if user_id:
log_data['user_id'] = user_id
self.logger.info(f"API Request: {json.dumps(log_data)}")
def monitor_performance(func):
"""Decorator to monitor function performance"""
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = func(*args, **kwargs)
duration = time.time() - start_time
logging.info(f"{func.__name__} completed in {duration:.2f}s")
return result
except Exception as e:
duration = time.time() - start_time
logging.error(f"{func.__name__} failed after {duration:.2f}s: {e}")
raise
return wrapper
# Application monitoring
class QwenApplicationMonitor:
def __init__(self):
self.monitor = QwenAPIMonitor()
self.health_status = True
def health_check(self):
"""Comprehensive health check"""
checks = {
'api_connection': self._check_api_connection(),
'database_connection': self._check_database_connection(),
'redis_connection': self._check_redis_connection(),
'disk_space': self._check_disk_space(),
'memory_usage': self._check_memory_usage()
}
all_healthy = all(checks.values())
return {
'status': 'healthy' if all_healthy else 'unhealthy',
'checks': checks,
'timestamp': time.time()
}
def _check_api_connection(self):
"""Check Qwen API connection"""
try:
# Test API authentication
response = requests.get(
"https://api.qwen.ai/v1/auth/validate",
headers={"Authorization": f"Bearer {os.getenv('QWEN_API_KEY')}"},
timeout=10
)
return response.status_code == 200
except:
return False
def _check_database_connection(self):
"""Check database connection"""
try:
# Implement database health check
return True # Placeholder
except:
return False
def _check_redis_connection(self):
"""Check Redis connection"""
try:
import redis
client = redis.from_url(os.getenv('REDIS_URL', 'redis://localhost:6379'))
client.ping()
return True
except:
return False
def _check_disk_space(self):
"""Check available disk space"""
disk = psutil.disk_usage('/')
free_percent = (disk.free / disk.total) * 100
return free_percent > 10 # At least 10% free
def _check_memory_usage(self):
"""Check memory usage"""
memory = psutil.virtual_memory()
return memory.percent < 90 # Less than 90% usage
# Usage in Flask application
from flask import Flask, jsonify
app = Flask(__name__)
monitor = QwenApplicationMonitor()
@app.route('/health')
def health_check():
return jsonify(monitor.health_check())
@app.route('/metrics')
def metrics():
"""Prometheus metrics endpoint"""
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
return Response(generate_latest(), mimetype=CONTENT_TYPE_LATEST)
# Monitor system metrics periodically
import threading
import time
def update_system_metrics():
while True:
monitor.track_system_metrics()
time.sleep(30) # Update every 30 seconds
# Start background metrics collection
metrics_thread = threading.Thread(target=update_system_metrics, daemon=True)
metrics_thread.start()
Testing and Quality Assurance
Implement comprehensive testing strategies for reliable API integration.
Unit Testing
Test individual components in isolation:
import unittest
from unittest.mock import Mock, patch, MagicMock
import json
import tempfile
import os
class TestQwenImageUploader(unittest.TestCase):
def setUp(self):
self.api_key = "test_api_key"
self.uploader = QwenImageUploader(self.api_key)
# Mock sample image
self.sample_image_path = self._create_sample_image()
def _create_sample_image(self):
"""Create a sample test image"""
from PIL import Image
with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f:
# Create a simple test image
img = Image.new('RGB', (100, 100), color='red')
img.save(f.name, 'JPEG')
return f.name
def tearDown(self):
# Clean up test files
if os.path.exists(self.sample_image_path):
os.unlink(self.sample_image_path)
@patch('requests.post')
def test_upload_image_success(self, mock_post):
"""Test successful image upload"""
# Mock successful API response
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = {
'image_id': 'test_image_id',
'temporary_url': 'https://temp.url/image.jpg'
}
mock_post.return_value = mock_response
# Test upload
result = self.uploader.upload_image_from_file(self.sample_image_path)
# Assertions
self.assertEqual(result['image_id'], 'test_image_id')
self.assertIn('temporary_url', result)
# Verify API was called correctly
mock_post.assert_called_once()
call_args = mock_post.call_args
# Check headers
self.assertEqual(call_args[1]['headers']['Authorization'], 'Bearer test_api_key')
self.assertEqual(call_args[1]['headers']['Content-Type'], 'application/json')
# Check request body contains image data
request_data = json.loads(call_args[1]['json'])
self.assertIn('image', request_data)
self.assertTrue(request_data['image'].startswith('data:image/jpeg;base64,'))
@patch('requests.post')
def test_upload_image_api_failure(self, mock_post):
"""Test API failure handling"""
mock_post.return_value.status_code = 400
mock_post.return_value.text = "Invalid image format"
with self.assertRaises(Exception) as context:
self.uploader.upload_image_from_file(self.sample_image_path)
self.assertIn("Upload failed", str(context.exception))
def test_invalid_image_file(self):
"""Test handling of invalid image files"""
# Create invalid image file
with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
f.write("This is not an image")
invalid_path = f.name
try:
with self.assertRaises(ValueError):
self.uploader.upload_image_from_file(invalid_path)
finally:
os.unlink(invalid_path)
class TestQwenImageEditor(unittest.TestCase):
def setUp(self):
self.api_key = "test_api_key"
self.editor = QwenImageEditor(self.api_key)
self.test_image_id = "test_image_123"
@patch('requests.post')
def test_edit_image_success(self, mock_post):
"""Test successful image editing"""
# Mock successful response
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = {
'status': 'completed',
'output_url': 'https://output.url/edited_image.jpg',
'processing_time': 15.2
}
mock_post.return_value = mock_response
# Test edit
result = self.editor.edit_image(
image_id=self.test_image_id,
prompt="Convert to black and white"
)
# Assertions
self.assertEqual(result['status'], 'completed')
self.assertIn('output_url', result)
# Verify API call
mock_post.assert_called_once()
call_args = mock_post.call_args[1]
# Check request parameters
request_data = call_args['json']
self.assertEqual(request_data['image_id'], self.test_image_id)
self.assertEqual(request_data['prompt'], "Convert to black and white")
self.assertEqual(request_data['num_inference_steps'], 50) # Default value
self.assertEqual(request_data['guidance_scale'], 7.5) # Default value
@patch('requests.post')
@patch('time.sleep')
def test_edit_image_sync_with_polling(self, mock_sleep, mock_post):
"""Test synchronous editing with job polling"""
# Mock initial response (processing)
processing_response = Mock()
processing_response.status_code = 200
processing_response.json.return_value = {
'status': 'processing',
'job_id': 'job_123'
}
# Mock completion response
completion_response = Mock()
completion_response.status_code = 200
completion_response.json.return_value = {
'status': 'completed',
'output_url': 'https://output.url/edited_image.jpg'
}
# Mock status check response
status_response = Mock()
status_response.status_code = 200
status_response.json.return_value = {
'status': 'completed',
'output_url': 'https://output.url/edited_image.jpg'
}
# Configure mock to return different responses
mock_post.side_effect = [
processing_response, # Initial edit request
status_response, # Status check
]
# Test synchronous edit
result = self.editor.edit_image_sync(
image_id=self.test_image_id,
prompt="Test edit"
)
# Assertions
self.assertEqual(result['status'], 'completed')
self.assertIn('output_url', result)
class TestQwenBatchProcessor(unittest.TestCase):
def setUp(self):
self.api_key = "test_api_key"
self.processor = QwenBatchProcessor(self.api_key, max_workers=2)
@patch.object(QwenBatchProcessor, 'process_single_job')
def test_process_batch_success(self, mock_process):
"""Test successful batch processing"""
# Mock successful job processing
mock_process.return_value = {
'success': True,
'image_id': 'test_img',
'result': {'output_url': 'test_url'}
}
# Define test jobs
jobs = [
{'image_id': 'img_1', 'prompt': 'Edit 1', 'params': {}},
{'image_id': 'img_2', 'prompt': 'Edit 2', 'params': {}}
]
# Process batch
results = self.processor.process_batch(jobs)
# Assertions
self.assertEqual(len(results), 2)
self.assertTrue(all(r['success'] for r in results))
self.assertEqual(mock_process.call_count, 2)
@patch.object(QwenBatchProcessor, 'process_single_job')
def test_process_batch_with_failures(self, mock_process):
"""Test batch processing with some failures"""
# Mock mixed results
mock_process.side_effect = [
{'success': True, 'image_id': 'img_1', 'result': {}},
{'success': False, 'image_id': 'img_2', 'error': 'API Error'},
{'success': True, 'image_id': 'img_3', 'result': {}}
]
jobs = [
{'image_id': 'img_1', 'prompt': 'Edit 1', 'params': {}},
{'image_id': 'img_2', 'prompt': 'Edit 2', 'params': {}},
{'image_id': 'img_3', 'prompt': 'Edit 3', 'params': {}}
]
results = self.processor.process_batch(jobs)
# Assertions
self.assertEqual(len(results), 3)
self.assertEqual(len([r for r in results if r['success']]), 2)
self.assertEqual(len([r for r in results if not r['success']]), 1)
if __name__ == '__main__':
unittest.main()
Integration Testing
Test complete workflows end-to-end:
import pytest
import requests
import time
import tempfile
import os
from PIL import Image
class TestQwenAPIIntegration:
"""Integration tests for Qwen API"""
@pytest.fixture(scope="class")
def api_client(self):
"""Create API client for testing"""
api_key = os.getenv('QWEN_TEST_API_KEY')
if not api_key:
pytest.skip("QWEN_TEST_API_KEY not set")
return QwenImageEditor(api_key)
@pytest.fixture(scope="class")
def test_image(self):
"""Create test image"""
with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f:
# Create test image
img = Image.new('RGB', (512, 512), color='blue')
img.save(f.name, 'JPEG')
yield f.name
os.unlink(f.name)
def test_complete_workflow(self, api_client, test_image):
"""Test complete image editing workflow"""
# Step 1: Upload image
uploader = QwenImageUploader(api_client.api_key)
upload_result = uploader.upload_image_from_file(test_image)
assert 'image_id' in upload_result
image_id = upload_result['image_id']
# Step 2: Edit image
edit_result = api_client.edit_image_sync(
image_id=image_id,
prompt="Convert to grayscale",
steps=30 # Faster for testing
)
assert edit_result['status'] == 'completed'
assert 'output_url' in edit_result
# Step 3: Download and verify result
response = requests.get(edit_result['output_url'])
response.raise_for_status()
# Verify we got a valid image
result_img = Image.open(io.BytesIO(response.content))
assert result_img.size == (512, 512) # Check dimensions
def test_batch_processing_workflow(self, api_client, test_image):
"""Test batch processing workflow"""
# Upload multiple images
uploader = QwenImageUploader(api_client.api_key)
image_ids = []
for i in range(3):
result = uploader.upload_image_from_file(test_image)
image_ids.append(result['image_id'])
# Process batch
batch_processor = QwenBatchProcessor(api_client.api_key, max_workers=2)
jobs = [
{
'image_id': img_id,
'prompt': f"Apply style {i+1}",
'params': {'steps': 25}
}
for i, img_id in enumerate(image_ids)
]
results = batch_processor.process_batch(jobs)
# Verify results
assert len(results) == 3
assert len([r for r in results if r['success']]) >= 2 # Allow for occasional failures
@pytest.mark.slow
def test_performance_limits(self, api_client, test_image):
"""Test performance and rate limits"""
uploader = QwenImageUploader(api_client.api_key)
# Test upload performance
start_time = time.time()
upload_results = []
for i in range(5): # Test 5 concurrent uploads
result = uploader.upload_image_from_file(test_image)
upload_results.append(result)
upload_time = time.time() - start_time
# Upload should complete within reasonable time
assert upload_time < 60 # 60 seconds max
assert len(upload_results) == 5
# Test editing performance
editor = QwenImageEditor(api_client.api_key)
edit_start = time.time()
for result in upload_results:
edit_result = editor.edit_image_sync(
image_id=result['image_id'],
prompt="Quick test edit",
steps=20 # Fast for testing
)
assert edit_result['status'] == 'completed'
edit_time = time.time() - edit_start
# Edits should complete within reasonable time
assert edit_time < 300 # 5 minutes max
# Configuration for pytest
# pytest.ini or conftest.py
pytest_plugins = ["pytest_asyncio"]
def pytest_configure(config):
"""Configure pytest"""
config.addinivalue_line(
"markers", "slow: marks tests as slow (deselect with '-m \"not slow\"')"
)
config.addinivalue_line(
"markers", "integration: marks tests as integration tests"
)
Load Testing
Test system performance under load:
import asyncio
import aiohttp
import time
import statistics
from concurrent.futures import ThreadPoolExecutor
import matplotlib.pyplot as plt
class QwenLoadTester:
def __init__(self, api_key, base_url="https://api.qwen.ai/v1"):
self.api_key = api_key
self.base_url = base_url
self.results = []
async def single_request(self, session, request_data):
"""Single async request"""
start_time = time.time()
try:
async with session.post(
f"{self.base_url}/images/edit",
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
},
json=request_data
) as response:
end_time = time.time()
duration = end_time - start_time
result = {
'success': response.status == 200,
'status_code': response.status,
'duration': duration,
'timestamp': start_time
}
if response.status == 200:
result['response'] = await response.json()
else:
result['error'] = await response.text()
return result
except Exception as e:
end_time = time.time()
return {
'success': False,
'error': str(e),
'duration': end_time - start_time,
'timestamp': start_time
}
async def load_test_concurrent(self, concurrent_users=10, requests_per_user=5):
"""Load test with concurrent users"""
# Create test data
test_requests = [
{
'image_id': f'test_img_{i}',
'prompt': f'Test edit {i}',
'num_inference_steps': 20 # Faster for testing
}
for i in range(concurrent_users * requests_per_user)
]
async with aiohttp.ClientSession() as session:
tasks = []
# Create tasks for concurrent users
for user_id in range(concurrent_users):
user_tasks = []
for req_id in range(requests_per_user):
request_idx = user_id * requests_per_user + req_id
task = self.single_request(session, test_requests[request_idx])
user_tasks.append(task)
tasks.extend(user_tasks)
# Execute all tasks concurrently
results = await asyncio.gather(*tasks, return_exceptions=True)
# Filter out exceptions
valid_results = [r for r in results if isinstance(r, dict)]
self.results.extend(valid_results)
return valid_results
def analyze_results(self):
"""Analyze load test results"""
if not self.results:
return {}
# Basic statistics
total_requests = len(self.results)
successful_requests = len([r for r in self.results if r['success']])
failed_requests = total_requests - successful_requests
durations = [r['duration'] for r in self.results if r['success']]
analysis = {
'total_requests': total_requests,
'successful_requests': successful_requests,
'failed_requests': failed_requests,
'success_rate': (successful_requests / total_requests) * 100,
'duration_stats': {
'min': min(durations) if durations else 0,
'max': max(durations) if durations else 0,
'mean': statistics.mean(durations) if durations else 0,
'median': statistics.median(durations) if durations else 0,
'stdev': statistics.stdev(durations) if len(durations) > 1 else 0
}
}
# Performance percentiles
if durations:
durations.sort()
analysis['percentiles'] = {
'p50': durations[int(len(durations) * 0.5)],
'p90': durations[int(len(durations) * 0.9)],
'p95': durations[int(len(durations) * 0.95)],
'p99': durations[int(len(durations) * 0.99)]
}
# Throughput calculation
if self.results:
time_span = max(r['timestamp'] for r in self.results) - min(r['timestamp'] for r in self.results)
analysis['throughput'] = {
'requests_per_second': total_requests / time_span if time_span > 0 else 0,
'successful_rps': successful_requests / time_span if time_span > 0 else 0
}
return analysis
def generate_report(self, output_file="load_test_report.html"):
"""Generate HTML report"""
analysis = self.analyze_results()
html_template = """
<!DOCTYPE html>
<html>
<head>
<title>Qwen API Load Test Report</title>
<script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
.metric { background: #f5f5f5; padding: 15px; margin: 10px 0; border-radius: 5px; }
.chart { margin: 20px 0; }
table { border-collapse: collapse; width: 100%; }
th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
th { background-color: #f2f2f2; }
</style>
</head>
<body>
<h1>Qwen API Load Test Report</h1>
<div class="metric">
<h2>Summary</h2>
<table>
<tr><th>Metric</th><th>Value</th></tr>
<tr><td>Total Requests</td><td>{total_requests}</td></tr>
<tr><td>Success Rate</td><td>{success_rate:.2f}%</td></tr>
<tr><td>Average Duration</td><td>{avg_duration:.2f}s</td></tr>
<tr><td>Throughput</td><td>{throughput:.2f} req/s</td></tr>
</table>
</div>
<div class="chart">
<h2>Response Time Distribution</h2>
<canvas id="durationChart" width="400" height="200"></canvas>
</div>
<div class="chart">
<h2>Success Rate Over Time</h2>
<canvas id="successChart" width="400" height="200"></canvas>
</div>
<script>
// Duration distribution chart
const durationCtx = document.getElementById('durationChart').getContext('2d');
new Chart(durationCtx, {{
type: 'histogram',
data: {{
labels: {duration_labels},
datasets: [{{
label: 'Response Times',
data: {duration_data},
backgroundColor: 'rgba(54, 162, 235, 0.6)'
}}]
}}
}});
// Success rate chart
const successCtx = document.getElementById('successChart').getContext('2d');
new Chart(successCtx, {{
type: 'line',
data: {{
labels: {time_labels},
datasets: [{{
label: 'Success Rate %',
data: {success_data},
borderColor: 'rgb(75, 192, 192)',
tension: 0.1
}}]
}}
}});
</script>
</body>
</html>
"""
# Prepare data for charts
durations = [r['duration'] for r in self.results if r['success']]
with open(output_file, 'w') as f:
f.write(html_template.format(
total_requests=analysis['total_requests'],
success_rate=analysis['success_rate'],
avg_duration=analysis['duration_stats']['mean'],
throughput=analysis['throughput']['requests_per_second'],
duration_labels=str([f"{d:.1f}s" for d in durations]),
duration_data=str(durations),
time_labels='[]', # Would need time-series data
success_data='[]' # Would need success rate over time
))
print(f"Load test report generated: {output_file}")
# Usage example
async def run_load_test():
tester = QwenLoadTester("your_api_key")
# Run load test
results = await tester.load_test_concurrent(
concurrent_users=20,
requests_per_user=5
)
# Analyze results
analysis = tester.analyze_results()
print(f"Load test completed:")
print(f"Total requests: {analysis['total_requests']}")
print(f"Success rate: {analysis['success_rate']:.2f}%")
print(f"Average duration: {analysis['duration_stats']['mean']:.2f}s")
print(f"Throughput: {analysis['throughput']['requests_per_second']:.2f} req/s")
# Generate report
tester.generate_report()
if __name__ == "__main__":
asyncio.run(run_load_test())
Security Best Practices
Implement robust security measures for production deployments.
API Key Management
Secure your API credentials:
import os
import json
import hashlib
import cryptography.fernet
from cryptography.fernet import Fernet
import boto3
from botocore.exceptions import ClientError
class SecureAPIKeyManager:
def __init__(self, encryption_key=None):
if encryption_key:
self.fernet = Fernet(encryption_key.encode())
else:
# Generate new encryption key
self.fernet = Fernet(Fernet.generate_key())
def encrypt_api_key(self, api_key):
"""Encrypt API key for storage"""
return self.fernet.encrypt(api_key.encode()).decode()
def decrypt_api_key(self, encrypted_key):
"""Decrypt API key for use"""
return self.fernet.decrypt(encrypted_key.encode()).decode()
def hash_api_key(self, api_key):
"""Create hash for API key identification"""
return hashlib.sha256(api_key.encode()).hexdigest()
class AWSSecretsManager:
def __init__(self, region_name='us-east-1'):
self.client = boto3.client('secretsmanager', region_name=region_name)
def store_api_key(self, secret_name, api_key, description="Qwen API Key"):
"""Store API key in AWS Secrets Manager"""
try:
self.client.create_secret(
Name=secret_name,
Description=description,
SecretString=json.dumps({
'api_key': api_key,
'created_at': str(time.time())
})
)
print(f"Secret {secret_name} created successfully")
except self.client.exceptions.ResourceExistsException:
# Update existing secret
self.client.update_secret(
SecretId=secret_name,
Description=description,
SecretString=json.dumps({
'api_key': api_key,
'updated_at': str(time.time())
})
)
print(f"Secret {secret_name} updated successfully")
def get_api_key(self, secret_name):
"""Retrieve API key from AWS Secrets Manager"""
try:
response = self.client.get_secret_value(SecretId=secret_name)
secret_data = json.loads(response['SecretString'])
return secret_data['api_key']
except ClientError as e:
print(f"Error retrieving secret {secret_name}: {e}")
return None
def rotate_api_key(self, secret_name, new_api_key):
"""Rotate API key with zero downtime"""
# Store new key
self.store_api_key(f"{secret_name}-new", new_api_key, "New Qwen API Key")
# Update application to use new key
# (This would be implemented in your deployment pipeline)
# Mark old key for deletion (grace period)
print(f"API key rotation initiated for {secret_name}")
print(f"Delete old key after grace period: {secret_name}-old")
# Secure configuration management
class SecureConfig:
def __init__(self, config_file='config.json'):
self.config_file = config_file
self.key_manager = SecureAPIKeyManager()
self.config = self._load_config()
def _load_config(self):
"""Load configuration from encrypted file"""
if os.path.exists(self.config_file):
with open(self.config_file, 'r') as f:
encrypted_config = json.load(f)
# Decrypt configuration
config = {}
for key, value in encrypted_config.items():
if key.endswith('_encrypted'):
config[key.replace('_encrypted', '')] = self.key_manager.decrypt_api_key(value)
else:
config[key] = value
return config
else:
return self._create_default_config()
def _create_default_config(self):
"""Create default configuration"""
config = {
'qwen_api_key': os.getenv('QWEN_API_KEY'),
'base_url': 'https://api.qwen.ai/v1',
'timeout': 300,
'max_retries': 3,
'rate_limit_per_minute': 60
}
self._save_config(config)
return config
def _save_config(self, config):
"""Save configuration (encrypt sensitive values)"""
encrypted_config = {}
for key, value in config.items():
if 'api_key' in key.lower() or 'secret' in key.lower():
encrypted_config[f"{key}_encrypted"] = self.key_manager.encrypt_api_key(value)
else:
encrypted_config[key] = value
with open(self.config_file, 'w') as f:
json.dump(encrypted_config, f, indent=2)
def get(self, key, default=None):
"""Get configuration value"""
return self.config.get(key, default)
def set(self, key, value):
"""Set configuration value"""
self.config[key] = value
self._save_config(self.config)
# Usage
config = SecureConfig()
api_key = config.get('qwen_api_key')
# Or use AWS Secrets Manager
secrets_manager = AWSSecretsManager()
api_key = secrets_manager.get_api_key('qwen/prod/api-key')
Input Validation and Sanitization
Validate all inputs to prevent security issues:
import re
import bleach
from PIL import Image
import io
import base64
class InputValidator:
"""Comprehensive input validation for Qwen API"""
# Allowed file extensions
ALLOWED_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.webp', '.bmp', '.tiff'}
# Maximum file size (10MB)
MAX_FILE_SIZE = 10 * 1024 * 1024
# Maximum image dimensions
MAX_DIMENSIONS = (4096, 4096)
# Minimum dimensions
MIN_DIMENSIONS = (64, 64)
# Prompt validation
MAX_PROMPT_LENGTH = 1000
BANNED_WORDS = ['violent', 'illegal', 'harmful', 'adult'] # Example
@classmethod
def validate_image_file(cls, file_data, filename):
"""Validate uploaded image file"""
validation_result = {
'valid': True,
'errors': [],
'warnings': []
}
# Check file size
if len(file_data) > cls.MAX_FILE_SIZE:
validation_result['valid'] = False
validation_result['errors'].append(f"File size exceeds {cls.MAX_FILE_SIZE} bytes")
# Check file extension
file_ext = os.path.splitext(filename)[1].lower()
if file_ext not in cls.ALLOWED_EXTENSIONS:
validation_result['valid'] = False
validation_result['errors'].append(f"File extension {file_ext} not allowed")
# Validate image content
try:
image = Image.open(io.BytesIO(file_data))
# Check dimensions
if image.size[0] < cls.MIN_DIMENSIONS[0] or image.size[1] < cls.MIN_DIMENSIONS[1]:
validation_result['valid'] = False
validation_result['errors'].append(f"Image too small: {image.size}")
if image.size[0] > cls.MAX_DIMENSIONS[0] or image.size[1] > cls.MAX_DIMENSIONS[1]:
validation_result['warnings'].append(f"Image will be resized from {image.size}")
# Check image mode
if image.mode not in ['RGB', 'RGBA']:
validation_result['warnings'].append(f"Image mode {image.mode} will be converted to RGB")
except Exception as e:
validation_result['valid'] = False
validation_result['errors'].append(f"Invalid image file: {e}")
return validation_result
@classmethod
def validate_prompt(cls, prompt):
"""Validate edit prompt"""
validation_result = {
'valid': True,
'errors': [],
'warnings': [],
'sanitized_prompt': prompt
}
# Check length
if len(prompt) > cls.MAX_PROMPT_LENGTH:
validation_result['valid'] = False
validation_result['errors'].append(f"Prompt too long: {len(prompt)} > {cls.MAX_PROMPT_LENGTH}")
# Check for banned words
lower_prompt = prompt.lower()
banned_found = [word for word in cls.BANNED_WORDS if word in lower_prompt]
if banned_found:
validation_result['valid'] = False
validation_result['errors'].append(f"Prompt contains prohibited content: {banned_found}")
# Sanitize prompt (remove potentially harmful content)
sanitized = bleach.clean(prompt, tags=[], strip=True)
sanitized = re.sub(r'[<>]', '', sanitized) # Remove brackets
if sanitized != prompt:
validation_result['warnings'].append("Prompt was sanitized for security")
validation_result['sanitized_prompt'] = sanitized
return validation_result
@classmethod
def validate_base64_image(cls, base64_data):
"""Validate base64 encoded image"""
validation_result = {
'valid': True,
'errors': [],
'warnings': []
}
try:
# Remove data URL prefix if present
if base64_data.startswith('data:image/'):
base64_data = base64_data.split(',')[1]
# Decode base64
image_data = base64.b64decode(base64_data)
# Validate as image file
file_validation = cls.validate_image_file(image_data, 'base64_image')
if not file_validation['valid']:
validation_result['valid'] = False
validation_result['errors'].extend(file_validation['errors'])
validation_result['warnings'].extend(file_validation['warnings'])
except Exception as e:
validation_result['valid'] = False
validation_result['errors'].append(f"Invalid base64 image data: {e}")
return validation_result
@classmethod
def validate_job_parameters(cls, params):
"""Validate job parameters"""
validation_result = {
'valid': True,
'errors': [],
'sanitized_params': {}
}
# Define parameter ranges
param_ranges = {
'num_inference_steps': {'min': 1, 'max': 100},
'guidance_scale': {'min': 1.0, 'max': 20.0},
'width': {'min': 64, 'max': 4096},
'height': {'min': 64, 'max': 4096},
'quality': {'min': 1, 'max': 100}
}
for param, value in params.items():
if param in param_ranges:
range_info = param_ranges[param]
if isinstance(value, (int, float)):
if value < range_info['min'] or value > range_info['max']:
validation_result['valid'] = False
validation_result['errors'].append(
f"{param} {value} out of range [{range_info['min']}, {range_info['max']}]"
)
# Clamp to valid range
sanitized_value = max(range_info['min'], min(range_info['max'], value))
validation_result['sanitized_params'][param] = sanitized_value
else:
validation_result['sanitized_params'][param] = value
else:
validation_result['valid'] = False
validation_result['errors'].append(f"{param} must be numeric")
else:
# Pass through unknown parameters (but could add validation here)
validation_result['sanitized_params'][param] = value
return validation_result
# Usage in API endpoints
@app.route('/api/upload', methods=['POST'])
def upload_image():
"""Secure image upload endpoint"""
try:
# Get file from request
if 'image' not in request.files:
return jsonify({'error': 'No image file provided'}), 400
file = request.files['image']
# Validate file
validation_result = InputValidator.validate_image_file(
file.read(),
file.filename
)
if not validation_result['valid']:
return jsonify({
'error': 'Invalid image file',
'details': validation_result['errors']
}), 400
# Process file if valid
file.seek(0) # Reset file pointer
# ... continue with upload logic
return jsonify({
'success': True,
'warnings': validation_result['warnings']
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/edit', methods=['POST'])
def edit_image():
"""Secure image editing endpoint"""
try:
data = request.get_json()
# Validate prompt
prompt_validation = InputValidator.validate_prompt(data.get('prompt', ''))
if not prompt_validation['valid']:
return jsonify({
'error': 'Invalid prompt',
'details': prompt_validation['errors']
}), 400
# Validate parameters
params = data.get('params', {})
param_validation = InputValidator.validate_job_parameters(params)
if not param_validation['valid']:
return jsonify({
'error': 'Invalid parameters',
'details': param_validation['errors']
}), 400
# Use sanitized values
sanitized_prompt = prompt_validation['sanitized_prompt']
sanitized_params = param_validation['sanitized_params']
# Continue with processing...
return jsonify({
'success': True,
'prompt': sanitized_prompt,
'params': sanitized_params,
'warnings': prompt_validation.get('warnings', [])
})
except Exception as e:
return jsonify({'error': str(e)}), 500
Conclusion and Next Steps
This comprehensive guide has covered the essential aspects of integrating Qwen AI Image Editor into production applications. From basic API authentication to advanced automation workflows, security best practices, and monitoring strategies, you now have the foundation to build scalable, reliable image editing solutions.
Key Takeaways
- Start Simple: Begin with basic API integration and gradually add complexity
- Prioritize Security: Never hardcode credentials; always use secure storage
- Implement Monitoring: Track performance, errors, and usage patterns
- Design for Scale: Use queues, caching, and load balancing from the start
- Test Thoroughly: Unit tests, integration tests, and load testing are essential
- Plan for Failures: Implement retry logic, error handling, and circuit breakers
Recommended Implementation Path
- Week 1: Basic API integration with authentication
- Week 2: Implement core editing functionality and error handling
- Week 3: Add batch processing and workflow automation
- Week 4: Implement monitoring, logging, and alerting
- Week 5: Security hardening and compliance measures
- Week 6: Load testing and performance optimization
- Week 7: Production deployment and documentation
Additional Resources
Official Documentation:
Development Tools:
Production Resources:
This article completes our three-part series on Qwen AI Image Editor. You now have everything needed to move from basic usage to advanced professional implementation with API integration and automation.
Last updated: January 2025