513 lines
18 KiB
Python
513 lines
18 KiB
Python
|
|
import os
|
|||
|
|
import shutil
|
|||
|
|
import zipfile
|
|||
|
|
import pickle
|
|||
|
|
import pandas as pd
|
|||
|
|
from datetime import datetime
|
|||
|
|
from pathlib import Path, PurePath
|
|||
|
|
from typing import Union, Optional, List, Dict, Any, Callable
|
|||
|
|
from utils.logger import log
|
|||
|
|
|
|||
|
|
class FileHandler:
|
|||
|
|
"""
|
|||
|
|
跨平台文件操作工具类(兼容Windows/macOS/Linux)
|
|||
|
|
功能规范:
|
|||
|
|
- 读取文件内容的方法返回DataFrame
|
|||
|
|
- 其他所有方法返回统一格式字典:
|
|||
|
|
{
|
|||
|
|
'success': bool, # 操作是否成功
|
|||
|
|
'message': str, # 操作结果描述
|
|||
|
|
'data': Any # 操作返回的数据(可选)
|
|||
|
|
}
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
def __init__(self, base_path: Optional[Union[str, Path]] = None):
|
|||
|
|
"""
|
|||
|
|
初始化文件处理器
|
|||
|
|
:param base_path: 基础路径(自动处理跨平台路径格式)
|
|||
|
|
"""
|
|||
|
|
self.base_path = self._normalize_path(base_path) if base_path else None
|
|||
|
|
self.log = log.bind(module=self.__class__.__name__)
|
|||
|
|
|
|||
|
|
def _normalize_path(self, path: Union[str, Path]) -> Path:
|
|||
|
|
"""统一转换为跨平台Path对象"""
|
|||
|
|
return Path(str(path).replace('\\', '/'))
|
|||
|
|
|
|||
|
|
def _resolve_path(self, path: Union[str, Path]) -> Path:
|
|||
|
|
"""解析路径(自动处理跨平台路径)"""
|
|||
|
|
path = self._normalize_path(path)
|
|||
|
|
if not path.is_absolute() and self.base_path:
|
|||
|
|
return self._normalize_path(self.base_path / path)
|
|||
|
|
return path
|
|||
|
|
|
|||
|
|
def _format_result(self,
|
|||
|
|
success: bool,
|
|||
|
|
message: str = "",
|
|||
|
|
data: Optional[Any] = None) -> Dict[str, Any]:
|
|||
|
|
"""统一返回结果格式"""
|
|||
|
|
return {
|
|||
|
|
'success': bool(success),
|
|||
|
|
'message': str(message),
|
|||
|
|
'data': data
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
def read_file(self,
|
|||
|
|
file_path: Union[str, Path],
|
|||
|
|
encoding: str = 'utf-8',
|
|||
|
|
**kwargs) -> pd.DataFrame:
|
|||
|
|
"""
|
|||
|
|
读取文件内容为DataFrame(跨平台兼容)
|
|||
|
|
:param file_path: 文件路径(自动处理跨平台格式)
|
|||
|
|
:param encoding: 文件编码(默认utf-8)
|
|||
|
|
:return: 包含文件内容的DataFrame
|
|||
|
|
:raises: 文件读取失败时抛出原始异常
|
|||
|
|
"""
|
|||
|
|
file_path = self._resolve_path(file_path)
|
|||
|
|
try:
|
|||
|
|
ext = self.get_file_extension(file_path)
|
|||
|
|
|
|||
|
|
if ext in ['csv', 'txt']:
|
|||
|
|
df = pd.read_csv(file_path, encoding=encoding, **kwargs)
|
|||
|
|
elif ext in ['xls', 'xlsx']:
|
|||
|
|
df = pd.read_excel(file_path, **kwargs)
|
|||
|
|
elif ext == 'json':
|
|||
|
|
df = pd.read_json(file_path, encoding=encoding, **kwargs)
|
|||
|
|
elif ext in ['pkl', 'pickle']:
|
|||
|
|
# 统一将pickle内容转为DataFrame返回
|
|||
|
|
obj = pd.read_pickle(file_path)
|
|||
|
|
if isinstance(obj, pd.DataFrame):
|
|||
|
|
df = obj
|
|||
|
|
elif isinstance(obj, list):
|
|||
|
|
df = pd.DataFrame(obj)
|
|||
|
|
elif isinstance(obj, dict):
|
|||
|
|
df = pd.DataFrame([obj])
|
|||
|
|
else:
|
|||
|
|
df = pd.DataFrame({'content': [obj]})
|
|||
|
|
elif ext == 'parquet':
|
|||
|
|
df = pd.read_parquet(file_path, **kwargs)
|
|||
|
|
else:
|
|||
|
|
with open(file_path, 'r', encoding=encoding) as f:
|
|||
|
|
return pd.DataFrame({'content': [f.read()]})
|
|||
|
|
|
|||
|
|
self.log.debug(f"文件读取成功 | path={file_path} shape={df.shape}")
|
|||
|
|
return df
|
|||
|
|
except Exception as e:
|
|||
|
|
self.log.error(f"文件读取失败 | path={file_path} error={str(e)}")
|
|||
|
|
raise
|
|||
|
|
|
|||
|
|
def write_file(self,
|
|||
|
|
file_path: Union[str, Path],
|
|||
|
|
data: Union[pd.DataFrame, Dict, List],
|
|||
|
|
encoding: str = 'utf-8',
|
|||
|
|
**kwargs) -> Dict[str, Any]:
|
|||
|
|
"""
|
|||
|
|
写入文件(跨平台兼容)
|
|||
|
|
:param file_path: 目标文件路径
|
|||
|
|
:param data: 要写入的数据(支持DataFrame/dict/list)
|
|||
|
|
:param encoding: 文件编码(默认utf-8)
|
|||
|
|
:return: 操作结果字典
|
|||
|
|
"""
|
|||
|
|
file_path = self._resolve_path(file_path)
|
|||
|
|
try:
|
|||
|
|
# 自动创建父目录
|
|||
|
|
parent_dir = file_path.parent
|
|||
|
|
if not parent_dir.exists():
|
|||
|
|
self.create_dir(parent_dir)
|
|||
|
|
|
|||
|
|
# 根据扩展名选择写入方式
|
|||
|
|
ext = self.get_file_extension(file_path)
|
|||
|
|
|
|||
|
|
if ext in ['pkl', 'pickle']:
|
|||
|
|
# 直接按原始对象进行pickle序列化
|
|||
|
|
with open(file_path, 'wb') as f:
|
|||
|
|
pickle.dump(data, f)
|
|||
|
|
else:
|
|||
|
|
# 统一数据格式到DataFrame
|
|||
|
|
if isinstance(data, pd.DataFrame):
|
|||
|
|
df = data
|
|||
|
|
else:
|
|||
|
|
df = pd.DataFrame(data if isinstance(data, list) else [data])
|
|||
|
|
|
|||
|
|
if ext in ['csv', 'txt']:
|
|||
|
|
df.to_csv(file_path, encoding=encoding, index=False, **kwargs)
|
|||
|
|
elif ext in ['xls', 'xlsx']:
|
|||
|
|
df.to_excel(file_path, index=False, **kwargs)
|
|||
|
|
elif ext == 'json':
|
|||
|
|
df.to_json(file_path, force_ascii=False, **kwargs)
|
|||
|
|
elif ext == 'parquet':
|
|||
|
|
df.to_parquet(file_path, **kwargs)
|
|||
|
|
else:
|
|||
|
|
with open(file_path, 'w', encoding=encoding) as f:
|
|||
|
|
f.write(str(data))
|
|||
|
|
|
|||
|
|
# 返回成功结果
|
|||
|
|
return self._format_result(
|
|||
|
|
True,
|
|||
|
|
"文件写入成功",
|
|||
|
|
{
|
|||
|
|
'file_path': str(file_path),
|
|||
|
|
'file_size': os.path.getsize(file_path)
|
|||
|
|
}
|
|||
|
|
)
|
|||
|
|
except Exception as e:
|
|||
|
|
return self._format_result(
|
|||
|
|
False,
|
|||
|
|
f"文件写入失败: {str(e)}",
|
|||
|
|
{'file_path': str(file_path)}
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
def file_exists(self, file_path: Union[str, Path]) -> Dict[str, Any]:
|
|||
|
|
"""
|
|||
|
|
检查文件是否存在(跨平台兼容)
|
|||
|
|
:return: 包含exists字段的结果字典
|
|||
|
|
"""
|
|||
|
|
file_path = self._resolve_path(file_path)
|
|||
|
|
exists = file_path.is_file()
|
|||
|
|
msg = f"文件{'' if exists else '不'}存在: {file_path}"
|
|||
|
|
return self._format_result(True, msg, {'exists': exists})
|
|||
|
|
|
|||
|
|
def dir_exists(self, dir_path: Union[str, Path]) -> Dict[str, Any]:
|
|||
|
|
"""
|
|||
|
|
检查目录是否存在(跨平台兼容)
|
|||
|
|
:return: 包含exists字段的结果字典
|
|||
|
|
"""
|
|||
|
|
dir_path = self._resolve_path(dir_path)
|
|||
|
|
exists = dir_path.is_dir()
|
|||
|
|
msg = f"目录{'' if exists else '不'}存在: {dir_path}"
|
|||
|
|
return self._format_result(True, msg, {'exists': exists})
|
|||
|
|
|
|||
|
|
def create_dir(self, dir_path: Union[str, Path]) -> Dict[str, Any]:
|
|||
|
|
"""
|
|||
|
|
创建目录(跨平台兼容)
|
|||
|
|
:return: 包含path字段的结果字典
|
|||
|
|
"""
|
|||
|
|
dir_path = self._resolve_path(dir_path)
|
|||
|
|
try:
|
|||
|
|
dir_path.mkdir(parents=True, exist_ok=True)
|
|||
|
|
|
|||
|
|
# Windows系统需要额外设置权限
|
|||
|
|
if os.name == 'nt':
|
|||
|
|
try:
|
|||
|
|
os.chmod(dir_path, 0o777)
|
|||
|
|
except:
|
|||
|
|
pass
|
|||
|
|
|
|||
|
|
return self._format_result(True, "目录创建成功", {'path': str(dir_path)})
|
|||
|
|
except Exception as e:
|
|||
|
|
return self._format_result(False, f"目录创建失败: {str(e)}", {'path': str(dir_path)})
|
|||
|
|
|
|||
|
|
def delete_file(self, file_path: Union[str, Path]) -> Dict[str, Any]:
|
|||
|
|
"""
|
|||
|
|
删除文件(跨平台兼容)
|
|||
|
|
:return: 包含path字段的结果字典
|
|||
|
|
"""
|
|||
|
|
file_path = self._resolve_path(file_path)
|
|||
|
|
try:
|
|||
|
|
if not file_path.exists():
|
|||
|
|
return self._format_result(False, "文件不存在", {'path': str(file_path)})
|
|||
|
|
|
|||
|
|
file_path.unlink()
|
|||
|
|
return self._format_result(True, "文件删除成功", {'path': str(file_path)})
|
|||
|
|
except Exception as e:
|
|||
|
|
return self._format_result(False, f"文件删除失败: {str(e)}", {'path': str(file_path)})
|
|||
|
|
|
|||
|
|
def delete_dir(self, dir_path: Union[str, Path]) -> Dict[str, Any]:
|
|||
|
|
"""
|
|||
|
|
删除目录及其内容(跨平台兼容)
|
|||
|
|
:return: 包含path字段的结果字典
|
|||
|
|
"""
|
|||
|
|
dir_path = self._resolve_path(dir_path)
|
|||
|
|
try:
|
|||
|
|
if not dir_path.exists():
|
|||
|
|
return self._format_result(False, "目录不存在", {'path': str(dir_path)})
|
|||
|
|
|
|||
|
|
shutil.rmtree(dir_path)
|
|||
|
|
return self._format_result(True, "目录删除成功", {'path': str(dir_path)})
|
|||
|
|
except Exception as e:
|
|||
|
|
return self._format_result(False, f"目录删除失败: {str(e)}", {'path': str(dir_path)})
|
|||
|
|
|
|||
|
|
def list_files(self,
|
|||
|
|
dir_path: Union[str, Path],
|
|||
|
|
recursive: bool = False,
|
|||
|
|
pattern: str = '*') -> Dict[str, Any]:
|
|||
|
|
"""
|
|||
|
|
列出目录中的文件(跨平台兼容)
|
|||
|
|
:param recursive: 是否递归查找
|
|||
|
|
:param pattern: 文件匹配模式(如*.txt)
|
|||
|
|
:return: 包含files字段的结果字典
|
|||
|
|
"""
|
|||
|
|
dir_path = self._resolve_path(dir_path)
|
|||
|
|
try:
|
|||
|
|
if recursive:
|
|||
|
|
files = list(dir_path.rglob(pattern))
|
|||
|
|
else:
|
|||
|
|
files = list(dir_path.glob(pattern))
|
|||
|
|
|
|||
|
|
file_info = [
|
|||
|
|
{
|
|||
|
|
'path': str(f),
|
|||
|
|
'name': f.name,
|
|||
|
|
'size': f.stat().st_size,
|
|||
|
|
'modified': datetime.fromtimestamp(f.stat().st_mtime).isoformat(),
|
|||
|
|
'is_dir': f.is_dir()
|
|||
|
|
} for f in files if f.is_file() # 只返回文件,不包括目录
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
return self._format_result(
|
|||
|
|
True,
|
|||
|
|
f"找到 {len(file_info)} 个文件",
|
|||
|
|
{'files': file_info}
|
|||
|
|
)
|
|||
|
|
except Exception as e:
|
|||
|
|
return self._format_result(
|
|||
|
|
False,
|
|||
|
|
f"列出文件失败: {str(e)}",
|
|||
|
|
{'files': []}
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
def get_file_extension(self, file_path: Union[str, Path]) -> str:
|
|||
|
|
"""
|
|||
|
|
获取文件扩展名(跨平台兼容)
|
|||
|
|
:return: 小写且不带点的扩展名(如 'jpg')
|
|||
|
|
"""
|
|||
|
|
file_path = self._resolve_path(file_path)
|
|||
|
|
ext = file_path.suffix.lower().lstrip('.')
|
|||
|
|
self.log.trace(f"获取文件扩展名 | path={file_path} ext={ext}")
|
|||
|
|
return ext
|
|||
|
|
|
|||
|
|
def copy_file(self,
|
|||
|
|
src_path: Union[str, Path],
|
|||
|
|
dst_path: Union[str, Path]) -> Dict[str, Any]:
|
|||
|
|
"""
|
|||
|
|
复制文件(跨平台兼容)
|
|||
|
|
:return: 包含source和destination字段的结果字典
|
|||
|
|
"""
|
|||
|
|
src_path = self._resolve_path(src_path)
|
|||
|
|
dst_path = self._resolve_path(dst_path)
|
|||
|
|
try:
|
|||
|
|
if not src_path.exists():
|
|||
|
|
return self._format_result(
|
|||
|
|
False,
|
|||
|
|
"源文件不存在",
|
|||
|
|
{
|
|||
|
|
'source': str(src_path),
|
|||
|
|
'destination': str(dst_path)
|
|||
|
|
}
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# 确保目标目录存在
|
|||
|
|
self.create_dir(dst_path.parent)
|
|||
|
|
|
|||
|
|
shutil.copy2(src_path, dst_path)
|
|||
|
|
return self._format_result(
|
|||
|
|
True,
|
|||
|
|
"文件复制成功",
|
|||
|
|
{
|
|||
|
|
'source': str(src_path),
|
|||
|
|
'destination': str(dst_path),
|
|||
|
|
'file_size': dst_path.stat().st_size
|
|||
|
|
}
|
|||
|
|
)
|
|||
|
|
except Exception as e:
|
|||
|
|
return self._format_result(
|
|||
|
|
False,
|
|||
|
|
f"文件复制失败: {str(e)}",
|
|||
|
|
{
|
|||
|
|
'source': str(src_path),
|
|||
|
|
'destination': str(dst_path)
|
|||
|
|
}
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
def move_file(self,
|
|||
|
|
src_path: Union[str, Path],
|
|||
|
|
dst_path: Union[str, Path]) -> Dict[str, Any]:
|
|||
|
|
"""
|
|||
|
|
移动/重命名文件(跨平台兼容)
|
|||
|
|
:return: 包含source和destination字段的结果字典
|
|||
|
|
"""
|
|||
|
|
src_path = self._resolve_path(src_path)
|
|||
|
|
dst_path = self._resolve_path(dst_path)
|
|||
|
|
try:
|
|||
|
|
if not src_path.exists():
|
|||
|
|
return self._format_result(
|
|||
|
|
False,
|
|||
|
|
"源文件不存在",
|
|||
|
|
{
|
|||
|
|
'source': str(src_path),
|
|||
|
|
'destination': str(dst_path)
|
|||
|
|
}
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# 确保目标目录存在
|
|||
|
|
self.create_dir(dst_path.parent)
|
|||
|
|
|
|||
|
|
shutil.move(src_path, dst_path)
|
|||
|
|
return self._format_result(
|
|||
|
|
True,
|
|||
|
|
"文件移动成功",
|
|||
|
|
{
|
|||
|
|
'source': str(src_path),
|
|||
|
|
'destination': str(dst_path)
|
|||
|
|
}
|
|||
|
|
)
|
|||
|
|
except Exception as e:
|
|||
|
|
return self._format_result(
|
|||
|
|
False,
|
|||
|
|
f"文件移动失败: {str(e)}",
|
|||
|
|
{
|
|||
|
|
'source': str(src_path),
|
|||
|
|
'destination': str(dst_path)
|
|||
|
|
}
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
def zip_files(self,
|
|||
|
|
file_paths: List[Union[str, Path]],
|
|||
|
|
zip_path: Union[str, Path]) -> Dict[str, Any]:
|
|||
|
|
"""
|
|||
|
|
压缩多个文件到zip(跨平台兼容)
|
|||
|
|
:param file_paths: 要压缩的文件路径列表
|
|||
|
|
:param zip_path: 目标zip文件路径
|
|||
|
|
:return: 包含zip_path和file_count字段的结果字典
|
|||
|
|
"""
|
|||
|
|
zip_path = self._resolve_path(zip_path)
|
|||
|
|
try:
|
|||
|
|
# 确保目标目录存在
|
|||
|
|
self.create_dir(zip_path.parent)
|
|||
|
|
|
|||
|
|
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
|
|||
|
|
file_count = 0
|
|||
|
|
for file_path in file_paths:
|
|||
|
|
file_path = self._resolve_path(file_path)
|
|||
|
|
if file_path.exists():
|
|||
|
|
zipf.write(file_path, file_path.name)
|
|||
|
|
file_count += 1
|
|||
|
|
|
|||
|
|
return self._format_result(
|
|||
|
|
True,
|
|||
|
|
"文件压缩成功",
|
|||
|
|
{
|
|||
|
|
'zip_path': str(zip_path),
|
|||
|
|
'file_count': file_count,
|
|||
|
|
'zip_size': os.path.getsize(zip_path)
|
|||
|
|
}
|
|||
|
|
)
|
|||
|
|
except Exception as e:
|
|||
|
|
return self._format_result(
|
|||
|
|
False,
|
|||
|
|
f"文件压缩失败: {str(e)}",
|
|||
|
|
{
|
|||
|
|
'zip_path': str(zip_path)
|
|||
|
|
}
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
def unzip(self,
|
|||
|
|
zip_path: Union[str, Path],
|
|||
|
|
extract_to: Optional[Union[str, Path]] = None) -> Dict[str, Any]:
|
|||
|
|
"""
|
|||
|
|
解压zip文件(跨平台兼容)
|
|||
|
|
:param extract_to: 解压目标目录(默认为zip文件所在目录)
|
|||
|
|
:return: 包含extract_to和file_count字段的结果字典
|
|||
|
|
"""
|
|||
|
|
zip_path = self._resolve_path(zip_path)
|
|||
|
|
extract_to = self._resolve_path(extract_to) if extract_to else zip_path.parent
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
if not zip_path.exists():
|
|||
|
|
return self._format_result(
|
|||
|
|
False,
|
|||
|
|
"ZIP文件不存在",
|
|||
|
|
{
|
|||
|
|
'zip_path': str(zip_path),
|
|||
|
|
'extract_to': str(extract_to)
|
|||
|
|
}
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# 确保目标目录存在
|
|||
|
|
self.create_dir(extract_to)
|
|||
|
|
|
|||
|
|
with zipfile.ZipFile(zip_path, 'r') as zipf:
|
|||
|
|
file_list = zipf.namelist()
|
|||
|
|
zipf.extractall(extract_to)
|
|||
|
|
|
|||
|
|
return self._format_result(
|
|||
|
|
True,
|
|||
|
|
"文件解压成功",
|
|||
|
|
{
|
|||
|
|
'extract_to': str(extract_to),
|
|||
|
|
'file_count': len(file_list)
|
|||
|
|
}
|
|||
|
|
)
|
|||
|
|
except Exception as e:
|
|||
|
|
return self._format_result(
|
|||
|
|
False,
|
|||
|
|
f"文件解压失败: {str(e)}",
|
|||
|
|
{
|
|||
|
|
'zip_path': str(zip_path),
|
|||
|
|
'extract_to': str(extract_to)
|
|||
|
|
}
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ---------------------------- 测试用例 ----------------------------
|
|||
|
|
if __name__ == "__main__":
|
|||
|
|
# 初始化处理器(自动处理跨平台路径)
|
|||
|
|
project_root = next(p for p in Path(__file__).resolve().parents if
|
|||
|
|
(p / '.git').exists() or (p / 'pyproject.toml').exists() or (p / 'requirements.txt').exists())
|
|||
|
|
handler = FileHandler(project_root / "test")
|
|||
|
|
|
|||
|
|
# 测试路径标准化
|
|||
|
|
test_paths = [
|
|||
|
|
"normal/path",
|
|||
|
|
"windows\\style\\path",
|
|||
|
|
"mixed/path\\with\\both"
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
print("=== 路径标准化测试 ===")
|
|||
|
|
for path in test_paths:
|
|||
|
|
resolved = handler._resolve_path(path)
|
|||
|
|
print(f"原始路径: {path} -> 标准化: {resolved} (类型: {type(resolved)})")
|
|||
|
|
|
|||
|
|
# 测试目录操作
|
|||
|
|
print("\n=== 目录操作测试 ===")
|
|||
|
|
dir_result = handler.create_dir("test_dir")
|
|||
|
|
print(dir_result)
|
|||
|
|
|
|||
|
|
# 测试文件操作
|
|||
|
|
print("\n=== 文件操作测试 ===")
|
|||
|
|
test_data = [{"name": "Alice", "age": 25}, {"name": "Bob", "age": 30}]
|
|||
|
|
write_result = handler.write_file("test_dir/data.json", test_data)
|
|||
|
|
print(write_result)
|
|||
|
|
|
|||
|
|
# 测试文件读取
|
|||
|
|
try:
|
|||
|
|
df = handler.read_file("test_dir/data.json")
|
|||
|
|
print("\n读取文件内容:")
|
|||
|
|
print(df)
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"\n文件读取失败: {str(e)}")
|
|||
|
|
|
|||
|
|
# 测试列表文件
|
|||
|
|
print("\n=== 文件列表测试 ===")
|
|||
|
|
list_result = handler.list_files("test_dir")
|
|||
|
|
print(list_result)
|
|||
|
|
|
|||
|
|
# 测试压缩解压
|
|||
|
|
print("\n=== 压缩解压测试 ===")
|
|||
|
|
zip_result = handler.zip_files(
|
|||
|
|
["test_dir/data.json"],
|
|||
|
|
"test_archive.zip"
|
|||
|
|
)
|
|||
|
|
print(zip_result)
|
|||
|
|
|
|||
|
|
unzip_result = handler.unzip(
|
|||
|
|
"test_archive.zip",
|
|||
|
|
"extracted_files"
|
|||
|
|
)
|
|||
|
|
print(unzip_result)
|
|||
|
|
|
|||
|
|
# 清理测试数据
|
|||
|
|
print("\n=== 清理测试数据 ===")
|
|||
|
|
print(handler.delete_file("test_dir/data.json"))
|
|||
|
|
print(handler.delete_dir("test_dir"))
|
|||
|
|
print(handler.delete_file("test_archive.zip"))
|
|||
|
|
print(handler.delete_dir("extracted_files"))
|