from flask import Blueprint, request, jsonify from flask_login import current_user, login_required from extensions import db import json tests_bp = Blueprint('tests', __name__) @tests_bp.route('/http/tests/', methods=['GET']) @login_required def list_tests(function_id): """Get all test cases for a function""" user_id = current_user.id # Verify function ownership http_function = db.get_http_function_by_id(user_id, function_id) if not http_function: return jsonify({'error': 'Function not found'}), 404 tests = db.get_function_tests(function_id) return jsonify({'tests': tests}) @tests_bp.route('/http/tests/', methods=['POST']) @login_required def create_test(function_id): """Create a new test case""" user_id = current_user.id # Verify function ownership http_function = db.get_http_function_by_id(user_id, function_id) if not http_function: return jsonify({'error': 'Function not found'}), 404 data = request.json name = data.get('name') description = data.get('description', '') request_method = data.get('request_method', 'POST') request_headers = data.get('request_headers', {}) request_body = data.get('request_body', {}) expected_status = data.get('expected_status', 'SUCCESS') expected_output = data.get('expected_output') assertions = data.get('assertions', []) if not name: return jsonify({'error': 'Test name is required'}), 400 test = db.create_function_test( function_id, name, description, request_method, request_headers, request_body, expected_status, expected_output, assertions ) return jsonify({'status': 'success', 'test': test}) @tests_bp.route('/http/tests/test/', methods=['PUT']) @login_required def update_test(test_id): """Update a test case""" user_id = current_user.id # Get and verify test ownership test = db.get_function_test(test_id) if not test: return jsonify({'error': 'Test not found'}), 404 http_function = db.get_http_function_by_id(user_id, test['http_function_id']) if not http_function: return jsonify({'error': 'Unauthorized'}), 403 data = request.json updated_test = db.update_function_test( test_id, data.get('name', test['name']), data.get('description', test['description']), data.get('request_method', test['request_method']), data.get('request_headers', test['request_headers']), data.get('request_body', test['request_body']), data.get('expected_status', test['expected_status']), data.get('expected_output', test['expected_output']), data.get('assertions', test.get('assertions', [])) ) return jsonify({'status': 'success', 'test': updated_test}) @tests_bp.route('/http/tests/test/', methods=['DELETE']) @login_required def delete_test(test_id): """Delete a test case""" user_id = current_user.id # Get and verify test ownership test = db.get_function_test(test_id) if not test: return jsonify({'error': 'Test not found'}), 404 http_function = db.get_http_function_by_id(user_id, test['http_function_id']) if not http_function: return jsonify({'error': 'Unauthorized'}), 403 db.delete_function_test(test_id) return jsonify({'status': 'success', 'message': 'Test deleted'}) @tests_bp.route('/http/tests/run/', methods=['POST']) @login_required def run_single_test(test_id): """Run a single test case""" user_id = current_user.id # Get and verify test ownership test = db.get_function_test(test_id) if not test: return jsonify({'error': 'Test not found'}), 404 http_function = db.get_http_function_by_id(user_id, test['http_function_id']) if not http_function: return jsonify({'error': 'Unauthorized'}), 403 # Execute the function with test input result = execute_test(http_function, test) return jsonify(result) @tests_bp.route('/http/tests/run-all/', methods=['POST']) @login_required def run_all_tests(function_id): """Run all tests for a function""" user_id = current_user.id # Verify function ownership http_function = db.get_http_function_by_id(user_id, function_id) if not http_function: return jsonify({'error': 'Function not found'}), 404 tests = db.get_function_tests(function_id) results = [] for test in tests: result = execute_test(http_function, test) results.append({ 'test_id': test['id'], 'test_name': test['name'], **result }) passed_count = sum(1 for r in results if r['passed']) total_count = len(results) return jsonify({ 'results': results, 'summary': { 'total': total_count, 'passed': passed_count, 'failed': total_count - passed_count } }) def get_nested_value(data, path): """Get value from nested dict using dot notation (e.g., 'user.email')""" if not path: return data keys = path.split('.') value = data try: for key in keys: if isinstance(value, dict): value = value.get(key) else: return None return value except (KeyError, TypeError): return None def evaluate_assertion(assertion, response_data): """Evaluate a single assertion against response data""" assertion_type = assertion.get('type') path = assertion.get('path', '') description = assertion.get('description', '') # Get value at path value = get_nested_value(response_data, path) result = { 'type': assertion_type, 'description': description, 'path': path, 'passed': False, 'message': '' } if assertion_type == 'field_exists': result['passed'] = value is not None result['message'] = f"Field '{path}' {'exists' if result['passed'] else 'does not exist'}" elif assertion_type == 'field_equals': expected_value = assertion.get('value') result['passed'] = value == expected_value result['expected'] = expected_value result['actual'] = value result['message'] = f"Expected {expected_value} but got {value}" if not result['passed'] else f"Field '{path}' equals {expected_value}" elif assertion_type == 'field_type': expected_type = assertion.get('expectedType') actual_type = type(value).__name__ if value is not None else 'None' result['passed'] = actual_type == expected_type result['expected'] = expected_type result['actual'] = actual_type result['message'] = f"Expected type {expected_type} but got {actual_type}" if not result['passed'] else f"Field '{path}' is {expected_type}" elif assertion_type == 'field_contains': substring = assertion.get('substring', '') if isinstance(value, str): result['passed'] = substring in value result['message'] = f"Field '{path}' {'contains' if result['passed'] else 'does not contain'} '{substring}'" else: result['passed'] = False result['message'] = f"Field '{path}' is not a string" return result def execute_test(http_function, test): """Execute a test case and return the result""" import requests from app import app try: # Prepare request data request_data = { 'code': http_function['script_content'], 'environment': http_function['environment_info'] if isinstance(http_function['environment_info'], dict) else json.loads(http_function['environment_info']), 'request': { 'method': test['request_method'], 'headers': test['request_headers'] if isinstance(test['request_headers'], dict) else json.loads(test['request_headers']), 'json': test['request_body'] if isinstance(test['request_body'], dict) else json.loads(test['request_body']), 'url': f'/test/{http_function["name"]}', 'path': '/' }, 'name': http_function['name'] } # Call the appropriate runtime isolator runtime = http_function.get('runtime', 'node') if runtime == 'python': api_url = 'http://python-isolator.web:5000/execute' elif runtime == 'deno': api_url = 'http://deno-isolator.web:5000/execute' else: api_url = 'http://isolator.web:5000/execute' response = requests.post(api_url, json=request_data, timeout=30) response_data = response.json() # Get actual results actual_status = response_data.get('status') actual_output = response_data.get('result') expected_status = test['expected_status'] # Get assertions assertions = test.get('assertions') if isinstance(assertions, str): assertions = json.loads(assertions) if assertions else [] elif assertions is None: assertions = [] # If assertions exist, use assertion-based validation if assertions and len(assertions) > 0: assertion_results = [] for idx, assertion in enumerate(assertions): assertion_result = evaluate_assertion(assertion, actual_output) assertion_result['id'] = idx assertion_results.append(assertion_result) # Test passes if all assertions pass passed = all(a['passed'] for a in assertion_results) return { 'passed': passed, 'actual_status': actual_status, 'actual_output': actual_output, 'expected_status': expected_status, 'assertions': assertion_results, 'logs': response_data.get('logs', []), 'error': None } # Fallback to old expected_output comparison else: expected_output = test['expected_output'] if isinstance(test['expected_output'], dict) else (json.loads(test['expected_output']) if test['expected_output'] else None) # Check if test passed status_match = actual_status == expected_status output_match = True # If expected output is specified, compare it if expected_output is not None: output_match = actual_output == expected_output passed = status_match and output_match return { 'passed': passed, 'actual_status': actual_status, 'actual_output': actual_output, 'expected_status': expected_status, 'expected_output': expected_output, 'logs': response_data.get('logs', []), 'error': None } except Exception as e: return { 'passed': False, 'actual_status': 'ERROR', 'actual_output': None, 'expected_status': test['expected_status'], 'expected_output': test.get('expected_output'), 'logs': [], 'error': str(e) }