Workflow
Overview
user_code: com.finmars.standard-workflow:workflow-manager
payload: optional
Воркфлоу менеджер предназначен для запуска воркеров в требуемоем порядке с определенным payload.
Основной сценарий использования воркфлоу менеджера предназачен при отсутствии payload.
Сценарий его работы:
- Прочитать или создать файл
`/states/
global_state_manager.json`global\_state\_manager.json - Прочитать все json менеджеров в
`/states/
managers`managers - Прочитать статус выполнения каждого менеджера из поля "status" в соотетствующем json
- Распределить стейт файлы по статусам в виде словаря
{
"to-do": [],
"in-progress": [
"com.finmars.standard-workflow:workflow-manager-20240821095322.json"
],
"done": [
"com.finmars.standard-workflow:workflow-manager-20240724101635.json",
"com.finmars.standard-workflow:workflow-manager-20240724104102.json"
],
"paused": []
}
- Запустить логику обработки данных стейт-файлов
Логика обработки стейт-файлов из global-stateglobal_state_manager.json
- Попытаться прочитать файл
`global_state_manager.
. Если нет, то создать его.json`json - Прочитать список файлов из
/states/managers/
. - Распределить по статусам и сохранить в global_state_manager.json
- Если файла нет, то добавить в to_do
- Если в to-do, то сменить статус на in-progress
- Если файл есть, то обновить статус
- Прочитать содержимое файла из
in-progress
Логика обработки загруженного стейта файла менеджера
# Идем в цикле по воркерам и находим те, которые требуют работы
for worker in state["workers"]:
if worker['status'] in ['success', 'skip', 'ignore']:
continue
elif worker['status'] == 'in-progress':
# Логика
for item in worker['state']:
if worker['state'][item]['status'] == 'in-progress':
item_state_path = worker['state'][item]["state_path"]
item_state = get_data(self, item_state_path)
worker['state'][item] = item_state
if worker['state'][item]['status'] == 'in-progress':
return {"message":"still is in progress"}
state = propagate_status(state)
save_file(self,state_path, state)
for item in worker['state']:
if worker['state'][item]['status'] == 'to-do':
workflow_user_code = f'{worker["configuration_code"]}:{worker["user_code"]}'
payload = {
"data_options": worker["data_options"],
"import_options": worker["import_options"],
"calculation_options": worker["calculation_options"],
"download_options": worker["download_options"],
"state_options": worker["state_options"],
"state": worker['state'][item]
}
start_workflow(self, user_code=workflow_user_code, payload=payload)
worker['state'][item]['status'] = "in-progress"
state = propagate_status(state)
save_file(self,state_path, state)
return {"message":"new workflow has started"}
elif worker['status'] == 'to-do':
for item in worker['state']:
if worker['state'][item]['status'] == 'to-do':
workflow_user_code = f'{worker["configuration_code"]}:{worker["user_code"]}'
payload = {
"data_options": worker["data_options"],
"import_options": worker["import_options"],
"calculation_options": worker["calculation_options"],
"download_options": worker["download_options"],
"state_options": worker["state_options"],
"state": worker['state'][item]
}
start_workflow(self, user_code=workflow_user_code, payload=payload)
worker['state'][item]['status'] = "in-progress"
worker['status'] = "in-progress"
save_file(self,state_path, state)
1. Найти