Files
function/routes/tests.py
2025-12-03 21:12:35 +11:00

325 lines
12 KiB
Python

from flask import Blueprint, request, jsonify
from flask_login import current_user, login_required
from extensions import db
import json
tests_bp = Blueprint('tests', __name__)
@tests_bp.route('/http/tests/<int:function_id>', methods=['GET'])
@login_required
def list_tests(function_id):
"""Get all test cases for a function"""
user_id = current_user.id
# Verify function ownership
http_function = db.get_http_function_by_id(user_id, function_id)
if not http_function:
return jsonify({'error': 'Function not found'}), 404
tests = db.get_function_tests(function_id)
return jsonify({'tests': tests})
@tests_bp.route('/http/tests/<int:function_id>', methods=['POST'])
@login_required
def create_test(function_id):
"""Create a new test case"""
user_id = current_user.id
# Verify function ownership
http_function = db.get_http_function_by_id(user_id, function_id)
if not http_function:
return jsonify({'error': 'Function not found'}), 404
data = request.json
name = data.get('name')
description = data.get('description', '')
request_method = data.get('request_method', 'POST')
request_headers = data.get('request_headers', {})
request_body = data.get('request_body', {})
expected_status = data.get('expected_status', 'SUCCESS')
expected_output = data.get('expected_output')
assertions = data.get('assertions', [])
if not name:
return jsonify({'error': 'Test name is required'}), 400
test = db.create_function_test(
function_id,
name,
description,
request_method,
request_headers,
request_body,
expected_status,
expected_output,
assertions
)
return jsonify({'status': 'success', 'test': test})
@tests_bp.route('/http/tests/test/<int:test_id>', methods=['PUT'])
@login_required
def update_test(test_id):
"""Update a test case"""
user_id = current_user.id
# Get and verify test ownership
test = db.get_function_test(test_id)
if not test:
return jsonify({'error': 'Test not found'}), 404
http_function = db.get_http_function_by_id(user_id, test['http_function_id'])
if not http_function:
return jsonify({'error': 'Unauthorized'}), 403
data = request.json
updated_test = db.update_function_test(
test_id,
data.get('name', test['name']),
data.get('description', test['description']),
data.get('request_method', test['request_method']),
data.get('request_headers', test['request_headers']),
data.get('request_body', test['request_body']),
data.get('expected_status', test['expected_status']),
data.get('expected_output', test['expected_output']),
data.get('assertions', test.get('assertions', []))
)
return jsonify({'status': 'success', 'test': updated_test})
@tests_bp.route('/http/tests/test/<int:test_id>', methods=['DELETE'])
@login_required
def delete_test(test_id):
"""Delete a test case"""
user_id = current_user.id
# Get and verify test ownership
test = db.get_function_test(test_id)
if not test:
return jsonify({'error': 'Test not found'}), 404
http_function = db.get_http_function_by_id(user_id, test['http_function_id'])
if not http_function:
return jsonify({'error': 'Unauthorized'}), 403
db.delete_function_test(test_id)
return jsonify({'status': 'success', 'message': 'Test deleted'})
@tests_bp.route('/http/tests/run/<int:test_id>', methods=['POST'])
@login_required
def run_single_test(test_id):
"""Run a single test case"""
user_id = current_user.id
# Get and verify test ownership
test = db.get_function_test(test_id)
if not test:
return jsonify({'error': 'Test not found'}), 404
http_function = db.get_http_function_by_id(user_id, test['http_function_id'])
if not http_function:
return jsonify({'error': 'Unauthorized'}), 403
# Execute the function with test input
result = execute_test(http_function, test)
return jsonify(result)
@tests_bp.route('/http/tests/run-all/<int:function_id>', methods=['POST'])
@login_required
def run_all_tests(function_id):
"""Run all tests for a function"""
user_id = current_user.id
# Verify function ownership
http_function = db.get_http_function_by_id(user_id, function_id)
if not http_function:
return jsonify({'error': 'Function not found'}), 404
tests = db.get_function_tests(function_id)
results = []
for test in tests:
result = execute_test(http_function, test)
results.append({
'test_id': test['id'],
'test_name': test['name'],
**result
})
passed_count = sum(1 for r in results if r['passed'])
total_count = len(results)
return jsonify({
'results': results,
'summary': {
'total': total_count,
'passed': passed_count,
'failed': total_count - passed_count
}
})
def get_nested_value(data, path):
"""Get value from nested dict using dot notation (e.g., 'user.email')"""
if not path:
return data
keys = path.split('.')
value = data
try:
for key in keys:
if isinstance(value, dict):
value = value.get(key)
else:
return None
return value
except (KeyError, TypeError):
return None
def evaluate_assertion(assertion, response_data):
"""Evaluate a single assertion against response data"""
assertion_type = assertion.get('type')
path = assertion.get('path', '')
description = assertion.get('description', '')
# Get value at path
value = get_nested_value(response_data, path)
result = {
'type': assertion_type,
'description': description,
'path': path,
'passed': False,
'message': ''
}
if assertion_type == 'field_exists':
result['passed'] = value is not None
result['message'] = f"Field '{path}' {'exists' if result['passed'] else 'does not exist'}"
elif assertion_type == 'field_equals':
expected_value = assertion.get('value')
result['passed'] = value == expected_value
result['expected'] = expected_value
result['actual'] = value
result['message'] = f"Expected {expected_value} but got {value}" if not result['passed'] else f"Field '{path}' equals {expected_value}"
elif assertion_type == 'field_type':
expected_type = assertion.get('expectedType')
actual_type = type(value).__name__ if value is not None else 'None'
result['passed'] = actual_type == expected_type
result['expected'] = expected_type
result['actual'] = actual_type
result['message'] = f"Expected type {expected_type} but got {actual_type}" if not result['passed'] else f"Field '{path}' is {expected_type}"
elif assertion_type == 'field_contains':
substring = assertion.get('substring', '')
if isinstance(value, str):
result['passed'] = substring in value
result['message'] = f"Field '{path}' {'contains' if result['passed'] else 'does not contain'} '{substring}'"
else:
result['passed'] = False
result['message'] = f"Field '{path}' is not a string"
return result
def execute_test(http_function, test):
"""Execute a test case and return the result"""
import requests
from app import app
try:
# Prepare request data
request_data = {
'code': http_function['script_content'],
'environment': http_function['environment_info'] if isinstance(http_function['environment_info'], dict) else json.loads(http_function['environment_info']),
'request': {
'method': test['request_method'],
'headers': test['request_headers'] if isinstance(test['request_headers'], dict) else json.loads(test['request_headers']),
'json': test['request_body'] if isinstance(test['request_body'], dict) else json.loads(test['request_body']),
'url': f'/test/{http_function["name"]}',
'path': '/'
},
'name': http_function['name']
}
# Call the appropriate runtime isolator
runtime = http_function.get('runtime', 'node')
if runtime == 'python':
api_url = 'http://python-isolator.web:5000/execute'
elif runtime == 'deno':
api_url = 'http://deno-isolator.web:5000/execute'
else:
api_url = 'http://isolator.web:5000/execute'
response = requests.post(api_url, json=request_data, timeout=30)
response_data = response.json()
# Get actual results
actual_status = response_data.get('status')
actual_output = response_data.get('result')
expected_status = test['expected_status']
# Get assertions
assertions = test.get('assertions')
if isinstance(assertions, str):
assertions = json.loads(assertions) if assertions else []
elif assertions is None:
assertions = []
# If assertions exist, use assertion-based validation
if assertions and len(assertions) > 0:
assertion_results = []
for idx, assertion in enumerate(assertions):
assertion_result = evaluate_assertion(assertion, actual_output)
assertion_result['id'] = idx
assertion_results.append(assertion_result)
# Test passes if all assertions pass
passed = all(a['passed'] for a in assertion_results)
return {
'passed': passed,
'actual_status': actual_status,
'actual_output': actual_output,
'expected_status': expected_status,
'assertions': assertion_results,
'logs': response_data.get('logs', []),
'error': None
}
# Fallback to old expected_output comparison
else:
expected_output = test['expected_output'] if isinstance(test['expected_output'], dict) else (json.loads(test['expected_output']) if test['expected_output'] else None)
# Check if test passed
status_match = actual_status == expected_status
output_match = True
# If expected output is specified, compare it
if expected_output is not None:
output_match = actual_output == expected_output
passed = status_match and output_match
return {
'passed': passed,
'actual_status': actual_status,
'actual_output': actual_output,
'expected_status': expected_status,
'expected_output': expected_output,
'logs': response_data.get('logs', []),
'error': None
}
except Exception as e:
return {
'passed': False,
'actual_status': 'ERROR',
'actual_output': None,
'expected_status': test['expected_status'],
'expected_output': test.get('expected_output'),
'logs': [],
'error': str(e)
}