# coding: utf-8
import datetime
import random
from typing import Union

import pandas as pd
from sqlalchemy import select, delete
from tornado_swagger.model import register_swagger_model
from wtforms import StringField, TextAreaField, IntegerField, DateTimeField
from wtforms.validators import Length

import models
from models.common_model import CommonModel
from models.db_models import TD3iGovsOrderMaster
from paste.core.logging import echo_log
from paste.rbac.rbac_user import RbacUser
from paste.util.pagination import Pagination
from paste.web.form import ModelForm


class GovsOrderMasterForm(ModelForm):
    """Validation form for the provincial 12345 work-order master table.

    Each field mirrors one column of the work order; string fields carry a
    maximum-length validator. Field labels are runtime strings and are kept
    in their original language.
    """

    id = IntegerField('工单唯一ID')
    belong_dept = StringField('所属部门', validators=[Length(max=100)])
    order_id = StringField('工单编号', validators=[Length(max=50)])
    order_no = StringField('工单号', validators=[Length(max=50)])
    order_type = IntegerField('表单类型')
    order_source = StringField('诉求来源', validators=[Length(max=30)])
    order_source_detail = StringField('诉求来源详情', validators=[Length(max=30)])
    order_status = StringField('工单状态', validators=[Length(max=50)])
    order_user_id = StringField('用户ID', validators=[Length(max=50)])
    order_user_name = StringField('来电人姓名', validators=[Length(max=50)])
    order_user_sex = StringField('来电人性别', validators=[Length(max=50)])
    order_user_phone2 = StringField('备用联系电话', validators=[Length(max=20)])
    order_handle_way = StringField('处理方式', validators=[Length(max=50)])
    order_invalid_type = StringField('工单作废原因', validators=[Length(max=50)])
    master_id = IntegerField('工单主表ID')
    call_number = StringField('来电号码', validators=[Length(max=20)])
    contact_number = StringField('联系电话', validators=[Length(max=20)])
    title = StringField('工单标题', validators=[Length(max=100)])
    call_time = DateTimeField('来电时间')
    first_order_status = StringField('一级状态编码', validators=[Length(max=10)])
    secord_order_status = StringField('二级状态编码', validators=[Length(max=10)])
    atomic_order_status = StringField('原子状态编码', validators=[Length(max=10)])
    area_code = StringField('区域代码', validators=[Length(max=10)])
    area_code_city = StringField('市区域代码', validators=[Length(max=50)])
    area_code_area = StringField('区区域代码', validators=[Length(max=50)])
    area_code_street = StringField('街道区域代码', validators=[Length(max=50)])
    address_detail = TextAreaField('详细地址')
    case_lnglat = StringField('地理坐标', validators=[Length(max=50)])
    case_accord_type_one_name = StringField('诉求归口一级', validators=[Length(max=50)])
    case_accord_type_two_name = StringField('诉求归口二级', validators=[Length(max=50)])
    case_accord_type_three_name = StringField('诉求归口三级', validators=[Length(max=50)])
    case_accord_type_four_name = StringField('四级事项分类', validators=[Length(max=50)])
    case_accord_type_five_name = StringField('五级事项分类', validators=[Length(max=50)])
    case_content = TextAreaField('诉求内容')
    case_goal = TextAreaField('诉求目的')
    case_labels = TextAreaField('工单标签列表')
    case_public = StringField('是否公开', validators=[Length(max=10)])
    case_is_urgent = IntegerField('紧急程度')
    case_comple_time = DateTimeField('案件办结时间')
    first_level_affiliation = StringField('一级归属单位', validators=[Length(max=50)])
    second_level_affiliation = StringField('二级归属单位', validators=[Length(max=50)])
    third_level_affiliation = StringField('三级归属单位', validators=[Length(max=50)])
    fourth_level_affiliation = StringField('四级归属单位', validators=[Length(max=50)])
    fifth_level_affiliation = StringField('五级归属单位', validators=[Length(max=50)])
    case_accord_code = StringField('事项编码', validators=[Length(max=50)])
    sixth_level_affiliation = StringField('六级归属单位', validators=[Length(max=50)])
    seventh_level_affiliation = StringField('七级归属单位', validators=[Length(max=50)])
    info_protect = IntegerField('信息保护')
    case_is_visit = IntegerField('是否回访')
    service_object_type = IntegerField('服务对象类型')
    hotspot = StringField('是否热点事件', validators=[Length(max=10)])
    result_satisfied = StringField('结果满意度', validators=[Length(max=10)])
    first_vist_satisfied = StringField('首次走访满意度', validators=[Length(max=10)])
    contact_timely = StringField('是否及时联系', validators=[Length(max=50)])
    distribute_type = StringField('分派类型', validators=[Length(max=50)])
    active_dept_ids = StringField('当前处理部门ID列表', validators=[Length(max=255)])
    active_dept_name = StringField('当前处理部门名称', validators=[Length(max=50)])
    case_solve = TextAreaField('处理结果')
    supervise_type = StringField('监督类型', validators=[Length(max=30)])
    leader_indicate = TextAreaField('领导批示')
    extension = TextAreaField('扩展字段')
    org_id = StringField('组织ID', validators=[Length(max=50)])
    org_name = StringField('组织名称', validators=[Length(max=50)])
    knowledge_quote = TextAreaField('知识引用')
    special_type = StringField('特殊类型', validators=[Length(max=30)])
    attachment_ids = TextAreaField('附件ID列表')
    file_exist = IntegerField('是否存在附件')
    record_id = StringField('通话记录ID', validators=[Length(max=50)])
    call_end_time = DateTimeField('通话结束时间')
    call_total_time = StringField('通话总时长', validators=[Length(max=20)])
    plan_finish_time = DateTimeField('计划完成时间')
    remark = TextAreaField('备注')
    tenant_id = IntegerField('租户ID')
    process_instance_id = StringField('流程实例ID', validators=[Length(max=100)])
    visit_count = IntegerField('走访次数')
    residue_date = StringField('剩余天数', validators=[Length(max=30)])
    whether_approval = StringField('是否审批', validators=[Length(max=10)])
    over_time_warning_flag = StringField('超时预警标志', validators=[Length(max=10)])
    create_no = StringField('创建编号', validators=[Length(max=20)])
    belong_platform = StringField('所属平台', validators=[Length(max=50)])
    next_task_id = StringField('下一个任务ID', validators=[Length(max=64)])
    return_visit_reason = TextAreaField('回访原因')
    back_count = StringField('回退次数', validators=[Length(max=100)])
    visit_adv_content = TextAreaField('走访建议内容')
    current_processing_platform = StringField('当前处理平台', validators=[Length(max=100)])
    judgment_flag = StringField('判定标志', validators=[Length(max=10)])
    thrid_order_id = StringField('第三方工单ID', validators=[Length(max=50)])
    is_dispatch_accurate = StringField('是否精准分派', validators=[Length(max=10)])
    is_coordination = StringField('是否协调', validators=[Length(max=10)])
    coordination_time = DateTimeField('协调时间')
    govs_sign = IntegerField('是否已在省12345签收,1:签收,0:未签收')
    creator_id = IntegerField('创建人ID')
    create_by = StringField('创建人姓名', validators=[Length(max=50)])
    updator_id = IntegerField('更新人ID')
    update_by = StringField('更新人姓名', validators=[Length(max=50)])
    create_date = DateTimeField('工单创建时间')
    update_date = DateTimeField('工单更新时间')

    def process(self, formdata=None, obj=None, **kwargs):
        """Strip surrounding whitespace from string inputs, then process.

        String values, and string items inside non-empty list values, are
        stripped in place in ``formdata`` before delegating to the parent
        ``process``. Other values are left untouched.
        """
        if formdata:
            # Iterate over a snapshot: entries are reassigned inside the loop.
            for name, values in list(formdata.items()):
                if isinstance(values, list) and values:
                    formdata[name] = [
                        item.strip() if isinstance(item, str) else item
                        for item in values
                    ]
                elif isinstance(values, str):
                    formdata[name] = values.strip()
        super().process(formdata, obj, **kwargs)


class GovsOrderMasterBase(TD3iGovsOrderMaster, CommonModel):
    """Query helpers shared by the provincial 12345 work-order master model."""

    # Model attribute name (snake_case) -> camelCase name.
    # NOTE(review): presumably the camelCase side matches the upstream 12345
    # payload keys; confirm against the code that consumes this mapping.
    FieldMapping = {
        "id": "id",
        "plan_sign_time": "planSignTime",
        "claim_status": "claimStatus",
        "plan_back_time": "planBackTime",
        "handle_time": "handleTime",
        "back_time": "backTime",
        "complete_time": "completeTime",
        "belong_dept": "belongDept",
        "order_id": "orderId",
        "order_no": "orderNo",
        "order_type": "orderType",
        "order_source": "orderSource",
        "order_source_detail": "orderSourceDetail",
        "order_status": "orderStatus",
        "order_user_id": "orderUserId",
        "order_user_name": "orderUserName",
        "order_user_sex": "orderUserSex",
        "order_user_phone2": "orderUserPhone2",
        "order_handle_way": "orderHandleWay",
        "order_invalid_type": "orderInvalidType",
        "next_task_id": "nextTaskId",
        "master_id": "masterId",
        "call_number": "callNumber",
        "contact_number": "contactNumber",
        "title": "title",
        "call_time": "callTime",
        "first_order_status": "firstOrderStatus",
        "secord_order_status": "secordOrderStatus",
        "atomic_order_status": "atomicOrderStatus",
        "area_code": "areaCode",
        "area_code_city": "areaCodeCity",
        "area_code_area": "areaCodeArea",
        "area_code_street": "areaCodeStreet",
        "address_detail": "addressDetail",
        "case_lnglat": "caseLnglat",
        "case_accord_type_one_name": "caseAccordTypeOneName",
        "case_accord_type_two_name": "caseAccordTypeTwoName",
        "case_accord_type_three_name": "caseAccordTypeThreeName",
        "case_accord_type_four_name": "caseAccordTypeFourName",
        "case_accord_type_five_name": "caseAccordTypeFiveName",
        "case_accord_ext": "caseAccordExt",
        "case_content": "caseContent",
        "case_goal": "caseGoal",
        "case_labels": "caseLabels",
        "case_public": "casePublic",
        "case_type": "caseType",
        "case_is_urgent": "caseIsUrgent",
        "case_comple_time": "caseCompleTime",
        "first_level_affiliation": "firstLevelAffiliation",
        "second_level_affiliation": "secondLevelAffiliation",
        "third_level_affiliation": "thirdLevelAffiliation",
        "fourth_level_affiliation": "fourthLevelAffiliation",
        "fifth_level_affiliation": "fifthLevelAffiliation",
        "case_accord_code": "caseAccordCode",
        "sixth_level_affiliation": "sixthLevelAffiliation",
        "seventh_level_affiliation": "seventhLevelAffiliation",
        "info_protect": "infoProtect",
        "case_is_visit": "caseIsVisit",
        "service_object_type": "serviceObjectType",
        "hotspot": "hotspot",
        "result_satisfied": "resultSatisfied",
        "first_vist_satisfied": "firstVistSatisfied",
        "contact_timely": "contactTimely",
        "distribute_type": "distributeType",
        "dept_type": "deptType",
        "dept_name": "deptName",
        "active_dept_ids": "activeDeptIds",
        "active_dept_name": "activeDeptName",
        "case_solve": "caseSolve",
        "supervise_type": "superviseType",
        "leader_indicate": "leaderIndicate",
        "extension": "extension",
        "org_id": "orgId",
        "org_name": "orgName",
        "knowledge_quote": "knowledgeQuote",
        "special_type": "specialType",
        "attachment_ids": "attachmentIds",
        "attachment_list": "attachmentList",
        "file_exist": "fileExist",
        "record_id": "recordId",
        "call_end_time": "callEndTime",
        "call_total_time": "callTotalTime",
        "plan_finish_time": "planFinishTime",
        "remark": "remark",
        "tenant_id": "tenantId",
        "erge_revoke_plug": "ergeRevokePlug",
        "exist_quoto_info": "existQuotoInfo",
        "process_instance_id": "processInstanceId",
        "sound_recording_address_list": "soundRecordingAddressList",
        "visit_count": "visitCount",
        "residue_date": "residueDate",
        "whether_approval": "whetherApproval",
        "over_time_warning_flag": "overTimeWarningFlag",
        "create_no": "createNo",
        "belong_platform": "belongPlatform",
        "return_visit_reason": "returnVisitReason",
        "back_count": "backCount",
        "visit_adv_content": "visitAdvContent",
        "tripartite_call_record_info": "tripartiteCallRecordInfo",
        "knowledge_references": "knowledgeReferences",
        "current_processing_platform": "currentProcessingPlatform",
        "judgment_flag": "judgmentFlag",
        "thrid_order_id": "thridOrderId",
        "is_dispatch_accurate": "isDispatchAccurate",
        "is_coordination": "isCoordination",
        "coordination_time": "coordinationTime",
        "creator_id": "creatorId",
        "create_by": "createBy",
        "updator_id": "updatorId",
        "update_by": "updateBy",
    }

    @classmethod
    async def exist_other(cls, id: Union[str, int], order_id: str = None, order_no: str = None):
        """Look for a different work order carrying the same identifiers.

        Args:
            id: Primary key of the work order to exclude from the lookup.
            order_id: Order ID to match; ignored when falsy.
            order_no: Order number to match; ignored when falsy.

        Returns:
            The result of ``cls.query_first`` for the built query, or ``None``
            when neither ``order_id`` nor ``order_no`` is supplied.
        """
        # Without any identifier the query would match an arbitrary other row
        # and every partial update would be reported as a duplicate.
        if not order_id and not order_no:
            return None

        # NOTE(review): both identifiers are ANDed, while the caller's error
        # message speaks of "order ID or order number" - confirm whether an
        # OR match is intended.
        _query = select(cls).where(cls.id != id)
        if order_id:
            _query = _query.where(cls.order_id == order_id)
        if order_no:
            _query = _query.where(cls.order_no == order_no)
        return await cls.query_first(_query)

    @classmethod
    async def find_by_ids(cls, ids: list[Union[str, int]]):
        """Return all work orders whose primary key is in ``ids``."""
        _query = select(cls).where(cls.id.in_(ids))
        _result = await cls.orm_execute_scalars(_query)
        return _result.all()

    @classmethod
    async def is_exist(cls, order_id: str = None, order_no: str = None):
        """Look for a work order carrying the given identifiers.

        Args:
            order_id: Order ID to match; ignored when falsy.
            order_no: Order number to match; ignored when falsy.

        Returns:
            The result of ``cls.query_first`` for the built query, or ``None``
            when neither ``order_id`` nor ``order_no`` is supplied.
        """
        # Without any identifier the unfiltered query would return whatever
        # row comes first and be misread as "already exists".
        if not order_id and not order_no:
            return None

        # NOTE(review): both identifiers are ANDed, while the caller's error
        # message speaks of "order ID or order number" - confirm whether an
        # OR match is intended.
        _query = select(cls)
        if order_id:
            _query = _query.where(cls.order_id == order_id)
        if order_no:
            _query = _query.where(cls.order_no == order_no)
        return await cls.query_first(_query)

    @classmethod
    async def search_base(cls, is_paging=True, **kwargs):
        """Search work orders by keyword arguments.

        Args:
            is_paging: When true, count the matches and apply limit/offset.
            **kwargs: Search parameters forwarded to ``cls.search_wheres``.
                ``page_number`` (default 1), ``page_size`` (default 20) and
                ``sort_clause`` are also read here.

        Returns:
            A ``(DataFrame, paging)`` tuple; ``paging`` is ``None`` when
            ``is_paging`` is false.
        """
        # Default to the first page (the previous default was a random page).
        page_number = kwargs.get('page_number', 1)
        page_size = kwargs.get('page_size', 20)
        kwargs.update({'page_number': page_number, 'page_size': page_size})

        # Columns handed to search_wheres as LIKE patterns ('%value%').
        _name_likes = {
            cls.order_status.key: '%{}%',
            cls.order_source.key: '%{}%',
            cls.case_accord_type_one_name.key: '%{}%',
        }
        _wheres = cls.search_wheres(likes=_name_likes, **kwargs)
        _query = select(cls).where(*_wheres).group_by(cls.id)

        _paging = None
        if is_paging:
            _row_count = await cls.query_count(_query)
            _paging = Pagination(_row_count).paging(page_number, page_size)
            _data_query = _query.limit(page_size).offset(_paging.offset_size)
        else:
            _data_query = _query

        # Fall back to newest-first when the caller gives no sort order.
        _sort_clause = cls.sort_clauses(kwargs.get('sort_clause', {}))
        if _sort_clause:
            _data_query = _data_query.order_by(*_sort_clause)
        else:
            _data_query = _data_query.order_by(cls.create_date.desc())

        _df = await cls.query_as_df(_data_query)
        if not _df.empty:
            _df.replace(models.EmptyInDF + models.EmptyDatetimeInDF, '', inplace=True)
            # IDs are returned as strings.
            _df[cls.id.key] = _df[cls.id.key].astype(str)
        return _df, _paging

    @classmethod
    async def search(cls, **kwargs):
        """Search work orders and return a paginated response dict.

        Returns:
            A dict with ``total``, ``rows`` (list of record dicts) and
            ``pagination``. If the caller forwards ``is_paging=False`` no
            paging object exists: ``total`` is then the number of rows and
            ``pagination`` is ``None``.
        """
        _df, _paging = await cls.search_base(**kwargs)
        _rows = _df.to_dict('records')
        if _paging is None:
            return {'total': len(_rows), 'rows': _rows, 'pagination': None}
        return {
            'total': _paging.row_count,
            'rows': _rows,
            'pagination': {
                'page_number': _paging.page_number,
                'page_count': _paging.page_count,
                'page_size': _paging.page_size,
            },
        }

    @classmethod
    async def exists_order_id(cls, data_df: pd.DataFrame):
        """Split ``data_df`` by whether each ``order_id`` is already stored.

        Args:
            data_df: Incoming rows; must contain the ``order_id`` column
                unless it is empty.

        Returns:
            ``(exists_df, latest_df)``: rows whose ``order_id`` is already in
            the table (with the stored primary key filled into the ID column)
            and rows that are not. Both are copies of the input rows.
        """
        if data_df.empty:
            return pd.DataFrame(), pd.DataFrame()

        _order_col = cls.order_id.key
        # Missing order IDs can never match a stored row; keep them out of
        # the IN (...) list. Such rows end up in latest_df.
        order_ids = data_df[_order_col].dropna().unique().tolist()
        if not order_ids:
            return pd.DataFrame(), data_df.copy()

        _query = select(cls.id, cls.order_id).where(cls.order_id.in_(order_ids))
        existing_df = await cls.query_as_df(_query)
        if existing_df.empty:
            return pd.DataFrame(), data_df.copy()

        order_id_to_id_map = dict(zip(existing_df[_order_col], existing_df[cls.id.key]))
        mask_exists = data_df[_order_col].isin(existing_df[_order_col])

        exists_df = data_df[mask_exists].copy()
        exists_df[cls.id.key] = exists_df[_order_col].map(order_id_to_id_map)
        latest_df = data_df[~mask_exists].copy()
        return exists_df, latest_df


# NOTE(review): the docstring of a class decorated with register_swagger_model
# may be read at runtime to build the Swagger definition, so it is kept
# byte-identical. In English: "business class for the provincial 12345
# work-order master table".
@register_swagger_model
class GovsOrderMaster(GovsOrderMasterBase):
    """省12345工单主表业务类"""

    @classmethod
    async def create(cls, user: RbacUser = None, **kwargs):
        """Validate the input and create a new work order.

        Args:
            user: Acting user; when given, its ``username`` is recorded as
                creator and updater.
            **kwargs: Field values; string values are stripped first.

        Returns:
            The saved work-order instance.

        Raises:
            AssertionError: If a work order with the same identifiers exists.
        """
        _cleaned = {
            _k: _v.strip() if isinstance(_v, str) else _v
            for _k, _v in kwargs.items()
        }
        _form = GovsOrderMasterForm(formdata=_cleaned)
        _form.validate_form()

        _existing = await cls.is_exist(
            order_id=_form.order_id.data, order_no=_form.order_no.data
        )
        # Raised explicitly so the check survives ``python -O``.
        if _existing is not None:
            raise AssertionError("工单编号或工单号已存在,不能重复创建。")

        _task = cls().copy_from_dict(_form.data, skip_none=True)
        if user:
            _task.created_by = user.username
            _task.updated_by = user.username
        await _task.async_save()
        return _task

    @classmethod
    async def delete(cls, task_id: Union[str, int]):
        """Delete the work order with the given primary key.

        Returns:
            The instance that was loaded before deletion.

        Raises:
            AssertionError: If no work order is found for ``task_id``.
        """
        _task = await cls.async_find_by_id(task_id)
        # Raised explicitly so the check survives ``python -O``.
        if not _task:
            raise AssertionError(f"根据 ID {task_id} 未找到工单。")

        _del_query = delete(cls).where(cls.id == _task.id)
        await cls.raw_execute(_del_query)
        echo_log(f'已删除工单(工单号:{_task.order_no},ID:{_task.id}).')
        return _task

    @classmethod
    async def modify(cls, task_id: Union[str, int], user: RbacUser = None, **kwargs):
        """Validate the input and update an existing work order.

        Args:
            task_id: Primary key of the work order to update.
            user: Acting user; when given, its ``username`` is recorded as
                updater.
            **kwargs: Field values; string values are stripped first.

        Returns:
            The saved work-order instance.

        Raises:
            AssertionError: If another work order has the same identifiers,
                or if ``task_id`` matches no work order.
        """
        _cleaned = {
            _k: _v.strip() if isinstance(_v, str) else _v
            for _k, _v in kwargs.items()
        }
        _form = GovsOrderMasterForm(formdata=_cleaned)
        _form.validate_form()

        _other = await cls.exist_other(
            task_id, order_id=_form.order_id.data, order_no=_form.order_no.data
        )
        # Raised explicitly so the checks survive ``python -O``.
        if _other is not None:
            raise AssertionError("工单编号或工单号已存在,不能重复修改。")

        _task = await cls.async_find_by_id(task_id)
        if not _task:
            raise AssertionError('查无此工单信息。')

        _task.copy_from_dict(_form.data, skip_none=True).before_save()
        if user:
            _task.updated_by = user.username
        await _task.async_save()
        return _task

    @classmethod
    async def create_batch(cls, data_df: pd.DataFrame, user: RbacUser = None):
        """Insert every row of ``data_df`` as a new work order.

        Args:
            data_df: Rows to insert; not modified.
            user: Acting user; when given, its ``username`` is written to the
                ``created_by`` and ``updated_by`` columns of each record.

        Returns:
            The number of work orders created (0 for an empty frame).
        """
        if data_df.empty:
            return 0

        if user:
            # Work on a copy so the caller's frame is left untouched.
            # NOTE(review): the form uses create_by / update_by; confirm that
            # created_by / updated_by are real model attributes.
            data_df = data_df.copy()
            data_df['created_by'] = user.username
            data_df['updated_by'] = user.username

        records = data_df.to_dict('records')
        tasks = [cls().copy_from_dict(record, skip_none=True) for record in records]

        session = cls.get_aio_session()
        try:
            session.add_all(tasks)
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()

        echo_log(f"批量创建成功:创建 {len(tasks)} 条工单。")
        return len(tasks)

    @classmethod
    async def modify_batch(cls, data_df: pd.DataFrame, user: RbacUser = None):
        """Bulk-update work orders from ``data_df``.

        Args:
            data_df: Rows to update; must contain the primary-key column.
                Not modified.
            user: Acting user; when given, its ``username`` is written to the
                ``updated_by`` column of each record.

        Returns:
            The number of records submitted for update; 0 when the frame is
            empty or lacks the primary-key column (the latter is logged).
        """
        if data_df.empty:
            return 0
        if cls.id.key not in data_df.columns:
            echo_log(f"错误:modify_batch 要求输入数据必须包含 '{cls.id.key}' 列")
            return 0

        # Work on a copy so the caller's frame is left untouched.
        # NOTE(review): the form uses update_date / update_by; confirm that
        # updated_at / updated_by are real model attributes.
        data_df = data_df.copy()
        data_df['updated_at'] = datetime.datetime.now()
        if user:
            data_df['updated_by'] = user.username
        update_data = data_df.to_dict('records')

        session = cls.get_aio_session()
        try:
            await session.run_sync(
                lambda sync_session: sync_session.bulk_update_mappings(cls, update_data)
            )
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()

        updated_count = len(update_data)
        echo_log(f"批量修改成功:更新 {updated_count} 条工单。")
        return updated_count

    @classmethod
    async def save_batch(cls, data_df: pd.DataFrame, user: RbacUser = None):
        """Save a batch of rows, creating new orders and updating stored ones.

        Rows are split by ``exists_order_id``; new rows go to
        ``create_batch`` and already-stored rows go to ``modify_batch``.

        Returns:
            ``(created_count, updated_count)``.
        """
        _exists_df, _latest_df = await cls.exists_order_id(data_df)
        _created_count = await cls.create_batch(_latest_df, user)
        _updated_count = await cls.modify_batch(_exists_df, user)
        return _created_count, _updated_count