59 lines
1.8 KiB
Python
59 lines
1.8 KiB
Python
"""
|
||
接受OA请求,读取数字城管的是否允许申请延期。
|
||
"""
|
||
import logging
|
||
|
||
from apps.api import dcm
|
||
from apps.app_handler import AppHandler
|
||
from dock.dcm import dcm_scrape_allow_postpone
|
||
from models.dcm_task import DcmTask
|
||
from paste.core.logging import echo_log
|
||
from paste.web.decorators import route
|
||
|
||
|
||
@route(f'{dcm.ApiPrefix}/fetchAllowPostpone')
|
||
class AllowPostponeHandler(AppHandler):
|
||
"""
|
||
获取是否允许申请延期接口。
|
||
|
||
对接数字城管系统的获取是否允许申请延期接口,用于判断工单有哪些是否允许申请延期。
|
||
"""
|
||
|
||
async def fetch_allow_postpone(self, **kwargs) -> dict:
|
||
# 必填参数校验
|
||
required_keys = ['gdId']
|
||
missing = [
|
||
k for k in required_keys
|
||
if k not in kwargs or kwargs[k] is None
|
||
]
|
||
if missing:
|
||
raise ValueError(f"缺少必要参数: {missing}")
|
||
|
||
dcm_task_id = kwargs.get('gdId', '')
|
||
dcm_task = await DcmTask(id=dcm_task_id).async_find_first()
|
||
assert dcm_task, f"未找到待办工单,工单ID:{dcm_task_id}"
|
||
success, message = await dcm_scrape_allow_postpone.fetch_allow_postpone(dcm_task)
|
||
return {
|
||
'success': success,
|
||
'msg': message,
|
||
}
|
||
|
||
# @auth_token
|
||
async def post(self):
|
||
"""
|
||
处理 POST 请求。
|
||
|
||
---
|
||
tags:
|
||
- D3I API
|
||
summary: 获取是否允许申请延期接口
|
||
"""
|
||
try:
|
||
echo_log(self.request.body.decode())
|
||
_, params = self.get_request_params()
|
||
_result = await self.fetch_allow_postpone(**params)
|
||
self.response_ok(code=0, data=_result)
|
||
except Exception as e:
|
||
self.response_error(e, status_code=200, api_status_code=500)
|
||
self.log(msg=e, level=logging.ERROR, is_log_exc=True)
|