Add mutable datastores that can be linked to multiple functions
This commit is contained in:
41
worker.py
41
worker.py
@@ -44,16 +44,51 @@ async def execute_timer_function_async(timer_function):
|
||||
api_url = PYTHON_API_URL
|
||||
else:
|
||||
api_url = NODE_API_URL
|
||||
|
||||
# Load and inject shared environments (namespaced)
|
||||
shared_envs = db.execute('''
|
||||
SELECT se.id, se.name, se.environment
|
||||
FROM timer_function_shared_envs tfse
|
||||
JOIN shared_environments se ON tfse.shared_env_id = se.id
|
||||
WHERE tfse.timer_function_id = %s
|
||||
ORDER BY se.name
|
||||
''', [timer_function['id']])
|
||||
|
||||
# Inject shared environments as nested objects
|
||||
combined_environment = environment.copy() if environment else {}
|
||||
shared_env_map = {} # Track shared env IDs for later extraction
|
||||
if shared_envs:
|
||||
for se in shared_envs:
|
||||
env_data = json.loads(se['environment']) if isinstance(se['environment'], str) else se['environment']
|
||||
combined_environment[se['name']] = env_data
|
||||
shared_env_map[se['name']] = se['id']
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(api_url, json={
|
||||
'code': code,
|
||||
'request': {'method': 'TIMER'},
|
||||
'environment': environment,
|
||||
'environment': combined_environment,
|
||||
'name': name
|
||||
}) as response:
|
||||
response_data = await response.json()
|
||||
|
||||
# Extract and persist shared environment mutations
|
||||
returned_env = response_data['environment']
|
||||
function_specific_env = {}
|
||||
|
||||
# Separate function-specific properties from shared environments
|
||||
for key, value in returned_env.items():
|
||||
if key in shared_env_map:
|
||||
# This is a shared environment - save it back
|
||||
db.execute(
|
||||
'UPDATE shared_environments SET environment=%s, updated_at=NOW() WHERE id=%s',
|
||||
[json.dumps(value), shared_env_map[key]],
|
||||
commit=True
|
||||
)
|
||||
else:
|
||||
# This is function-specific - keep it
|
||||
function_specific_env[key] = value
|
||||
|
||||
# Update environment and record invocation
|
||||
# Calculate next run time based on trigger type
|
||||
next_run = None
|
||||
@@ -74,7 +109,7 @@ async def execute_timer_function_async(timer_function):
|
||||
last_run = NOW(),
|
||||
next_run = %s
|
||||
WHERE id = %s
|
||||
""", [json.dumps(response_data['environment']), next_run, timer_function['id']], commit=True)
|
||||
""", [json.dumps(function_specific_env), next_run, timer_function['id']], commit=True)
|
||||
|
||||
# Record the invocation
|
||||
db.execute("""
|
||||
@@ -162,4 +197,4 @@ if __name__ == '__main__':
|
||||
try:
|
||||
asyncio.get_event_loop().run_forever()
|
||||
except (KeyboardInterrupt, SystemExit):
|
||||
pass
|
||||
pass
|
||||
|
||||
Reference in New Issue
Block a user