Skip to main content

Workflow

Overview

user_code: com.finmars.standard-workflow:workflow-manager
payload: optional

Воркфлоу менеджер предназначен для запуска воркеров в требуемоем порядке с определенным payload.

Основной сценарий использования воркфлоу менеджера предназачен при отсутствии payload.

Сценарий его работы:

  1. Прочитать или создать файл `/states/global_state_manager.json`global\_state\_manager.json
  2. Прочитать все json менеджеров в `/states/managers`managers
  3. Прочитать статус выполнения каждого менеджера из поля "status" в соотетствующем json
  4. Распределить стейт файлы по статусам в виде словаря 
{
    "to-do": [],
    "in-progress": [
        "com.finmars.standard-workflow:workflow-manager-20240821095322.json"
    ],
    "done": [
        "com.finmars.standard-workflow:workflow-manager-20240724101635.json",
        "com.finmars.standard-workflow:workflow-manager-20240724104102.json"
    ],
    "paused": []
}

       5. 
  1. Запустить логику обработки данных стейт-файлов

Логика обработки стейт-файлов из global-stateglobal_state_manager.json

  1. Попытаться прочитать файл `global_state_manager.json`json. Если нет, то создать его.
  2. Прочитать список файлов из /states/managers/.
  3. Распределить по статусам и сохранить в global_state_manager.json
    • Если файла нет, то добавить в to_do
    • Если в to-do, то сменить статус на in-progress
    • Если файл есть, то обновить статус
  4. Прочитать содержимое файла из in-progress 

Логика обработки загруженного стейта файла менеджера

# Идем в цикле по воркерам и находим те, которые требуют работы
for worker in state["workers"]:
  
  if worker['status'] in ['success', 'skip', 'ignore']:
    continue
    
  elif worker['status'] == 'in-progress':
  # Логика 
    for item in worker['state']:
      
      if worker['state'][item]['status'] == 'in-progress':
        item_state_path = worker['state'][item]["state_path"]
        item_state = get_data(self, item_state_path)
        worker['state'][item] = item_state
                        
        if worker['state'][item]['status'] == 'in-progress':
          return {"message":"still is in progress"}
        
        state = propagate_status(state)
        save_file(self,state_path, state)
        
    for item in worker['state']:
      if worker['state'][item]['status'] == 'to-do':
        workflow_user_code = f'{worker["configuration_code"]}:{worker["user_code"]}'
        payload = {
          "data_options": worker["data_options"],
          "import_options": worker["import_options"],
          "calculation_options": worker["calculation_options"],
          "download_options": worker["download_options"],
          "state_options":  worker["state_options"],
          "state": worker['state'][item]
        }
        start_workflow(self, user_code=workflow_user_code, payload=payload)
        worker['state'][item]['status'] = "in-progress"
        state = propagate_status(state)
        save_file(self,state_path, state)
        return {"message":"new workflow has started"}
  elif worker['status'] == 'to-do':
    for item in worker['state']:
      if worker['state'][item]['status'] == 'to-do':
        workflow_user_code = f'{worker["configuration_code"]}:{worker["user_code"]}'
        payload = {
          "data_options": worker["data_options"],
          "import_options": worker["import_options"],
          "calculation_options": worker["calculation_options"],
          "download_options": worker["download_options"],
          "state_options":  worker["state_options"],
                            "state": worker['state'][item]
                        }
                        start_workflow(self, user_code=workflow_user_code, payload=payload)
                        worker['state'][item]['status'] = "in-progress"
                        worker['status'] = "in-progress"
                        
                        save_file(self,state_path, state)


1. Найти 

Payload