DepthTrack is one of the standard RGB-D object tracking datasets, but the authors do not provide a single download link for the complete dataset, no Baidu Netdisk mirror seems to exist online, and configuring the VOT toolkit is extremely painful. So I wrote a small tool to help set up the environment.

DepthTrack dataset download

The original GitHub repository provides a download link per sequence, so Python can be used to collect the archive URLs automatically and download everything.
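As a minimal sketch of the mechanism, assuming nothing beyond the Zenodo export endpoint and record layout that the full script below also relies on:

import requests

# 5794115 is one of the DepthTrack training records used below
record_id = 5794115
data = requests.get(f'https://zenodo.org/records/{record_id}/export/json').json()
for name, entry in data['files']['entries'].items():
    # each entry carries the file name, its size, and a direct download URL
    print(name, entry['size'], entry['links']['content'])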

DepthTrack download script
import hashlib
import json
import os
import requests
import zipfile
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed

download_folder = r'./DepthTrack'

def download_file(file_info):
    file_name = file_info['name']
    url = file_info['url']
    file_path = os.path.join(download_folder, file_name)
    try:
        response = requests.get(url, stream=True)
        total_size = int(response.headers.get('content-length', 0))
        block_size = 1024  # 1 Kibibyte
        progress_bar = tqdm(total=total_size, unit='iB', unit_scale=True, desc=file_name)

        with open(file_path, 'wb') as file:
            for data in response.iter_content(block_size):
                progress_bar.update(len(data))
                file.write(data)
        progress_bar.close()
        return file_name, True
    except Exception as e:
        print(f"Error downloading {file_name}: {e}")
        return file_name, False

def download_files(file_infos, max_workers=5):
    print(f'start downloading {len(file_infos)} files')
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_file = {executor.submit(download_file, file_info): file_info for file_info in file_infos}
        for future in as_completed(future_to_file):
            file_info = future_to_file[future]
            try:
                file_name, success = future.result()
                results.append((file_name, success))
            except Exception as exc:
                print(f"{file_info['name']} error: {exc}")
    return results

def generate_download_items(json_record: int | list) -> list:
    if isinstance(json_record, list):
        download_items = []
        for record in json_record:
            download_items.extend(generate_download_items(record))
        return download_items

    json_url = f'https://zenodo.org/records/{json_record}/export/json'
    save_to = os.path.join(download_folder, f'{json_record}.json')
    if not os.path.exists(save_to):
        response = requests.get(json_url, stream=True)
        if response.status_code == 200:
            with open(save_to, 'wb') as file:
                for chunk in response.iter_content(chunk_size=1024):
                    if chunk:
                        file.write(chunk)
        else:
            raise Exception(f"Failed to download file from {json_url}")
    with open(save_to, 'r', encoding='utf-8') as file:
        data = json.load(file)
    # build the download list from the record's file entries
    download_items = []
    entries = data['files']['entries']
    for entry in entries:
        dic = entries[entry]
        item = {
            'name': dic['key'],
            'url': dic['links']['content'],
            'size': dic['size'],
            'md5': dic['checksum'][4:]  # strip the leading 'md5:' prefix
        }
        download_items.append(item)
    return download_items

def generate_split_txt(download_item: list, item_type: str) -> None:
    txt_file_path = os.path.join(download_folder, f'depthtrack_{item_type}.txt')
    if not os.path.exists(txt_file_path):
        with open(txt_file_path, 'w', encoding='utf-8') as file:
            file.write('\n'.join([item['name'].split('.')[0] for item in download_item]))

def check_exists(download_items: list) -> list:
    exist_item_list = [item for item in download_items if os.path.exists(os.path.join(download_folder, item['name']))]
    need_to_download = [item for item in download_items if item not in exist_item_list]
    print(f'found {len(exist_item_list)} existing files')
    if len(exist_item_list) == 0:
        return download_items
    with tqdm(total=len(exist_item_list), desc='checking existing') as pbar:
        correct_zip = 0
        wrong_zip = 0
        for item in exist_item_list:
            file_path = os.path.join(download_folder, item['name'])
            if item['md5'] == hashlib.md5(open(file_path, 'rb').read()).hexdigest():
                correct_zip += 1
            else:
                need_to_download.append(item)
                wrong_zip += 1
            pbar.update(1)
            pbar.desc = f'correct: {correct_zip}, wrong: {wrong_zip}'
    print(f'skipping {correct_zip} verified files')
    return need_to_download

def task_download(total_download_items) -> list:
    need_to_download = check_exists(total_download_items)
    download_files(need_to_download)
    return need_to_download

def task_check_zip(check_download_items):
    with tqdm(total=len(check_download_items), desc='checking files') as pbar:
        correct_zip = 0
        wrong_zip = 0
        # iterate over a copy: removing items from the list while
        # iterating over it directly would skip elements
        for item in list(check_download_items):
            file_path = os.path.join(download_folder, item['name'])
            if os.path.exists(file_path) and item['md5'] == hashlib.md5(open(file_path, 'rb').read()).hexdigest():
                check_download_items.remove(item)
                correct_zip += 1
            else:
                wrong_zip += 1
            pbar.update(1)
            pbar.desc = f'correct: {correct_zip}, wrong: {wrong_zip}'
    return check_download_items

def task_unzip(download_items):
    wrong_items = []
    success_num = 0
    with tqdm(total=len(download_items), desc='unzip files') as pbar:
        for item in download_items:
            pbar.desc = f'success: {success_num}, unzipping: {item["name"]}'
            file_path = os.path.join(download_folder, item['name'])
            try:
                with zipfile.ZipFile(file_path, 'r') as zip_ref:
                    zip_ref.extractall(download_folder)
                success_num += 1
            except Exception:
                wrong_items.append(item)
                print(f'failed to unzip {file_path}')
            pbar.update(1)
    print(f'{len(wrong_items)} files failed to unzip:')
    for item in wrong_items:
        print(item['name'])


if __name__ == "__main__":
    auto_unzip = True

    os.makedirs(download_folder, exist_ok=True)

    json_train_records = [5794115, 5837926]
    json_val_records = [5792146]

    print('generate download items')
    download_item_train = generate_download_items(json_train_records)
    download_item_val = generate_download_items(json_val_records)
    generate_split_txt(download_item_train, 'train')
    generate_split_txt(download_item_val, 'val')

    total_download_items = [*download_item_train, *download_item_val]
    total_download_items.sort(key=lambda x: x['name'])
    print(f'found {len(total_download_items)} zip files')

    # start download
    need_to_download_items = task_download(total_download_items)
    need_to_download_items = task_check_zip(need_to_download_items)

    # retry until every archive passes its MD5 check
    while len(need_to_download_items) > 0:
        print(f'{len(need_to_download_items)} files are wrong, trying to download again')
        task_download(need_to_download_items)
        need_to_download_items = task_check_zip(need_to_download_items)

    if auto_unzip:
        print('start unzip')
        task_unzip(total_download_items)

    # finish
    print('finish')

The script first downloads the per-record file lists, then fetches the archives with multiple threads. If some archives are already present locally, their MD5 checksums are verified and the intact files are skipped. Unzip code is included at the end.
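One caveat: the MD5 checks above read each archive fully into memory. A chunked variant (just a sketch; a drop-in replacement for the hashlib.md5(open(...).read()).hexdigest() expressions) keeps memory flat even for multi-gigabyte zips:

import hashlib

def md5_of_file(path: str, chunk_size: int = 1 << 20) -> str:
    # hash the file in 1 MiB chunks instead of loading it whole
    h = hashlib.md5()
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(chunk_size), b''):
            h.update(chunk)
    return h.hexdigest()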

VOT toolkit alternative

I spent half a day failing to configure the original toolkit, so I switched to an alternative: the PYSOT toolkit.

Install it by following its documentation, and follow it strictly; one of the steps requires building a package locally yourself.

The toolkit needs a json file to store the annotation boxes, but the authors did not provide one for DepthTrack. Below is my generation script: it reads the sequence names from depthtrack.txt and produces DepthTrack.json.

json generation script
import os
import numpy as np
import pandas
from tqdm import tqdm

dataset_root = './../data'
dataset_name = 'DepthTrack'
gt_file_name = 'groundtruth.txt'
gt_split_char = ','

data_dic = {}

# iterate over all sequences listed in depthtrack.txt
txt_file = os.path.join(dataset_root, dataset_name, 'depthtrack.txt')
with open(txt_file, 'r') as f:
    seq_names = f.read().splitlines()
    seq_names = [s.strip().split('.')[0] for s in seq_names]

for seq_name in tqdm(seq_names):
    seq_dir = os.path.join(dataset_root, dataset_name, seq_name)
    # ground-truth file: one box per line; 'nan' entries (absent target) become 0
    gt_file_path = os.path.join(seq_dir, gt_file_name)
    with open(gt_file_path, 'r') as f:
        gt_lines = f.read()
    gt_lines = gt_lines.replace('nan', '0')
    gt_lines = gt_lines.splitlines()
    gt_lines = [s.strip().split(gt_split_char) for s in gt_lines]
    # parse as float first, then cast: the coordinates may be fractional
    gt_lines = np.array(gt_lines, dtype=np.float64).astype(np.int32).tolist()
    # image paths
    img_dir = os.path.join(seq_dir, 'color')
    img_list = [f for f in os.listdir(img_dir) if f.endswith('.png') or f.endswith('.jpg')]
    # sort frames numerically by file name
    img_list.sort(key=lambda x: int(x.split('.')[0]))
    img_list = [f'{seq_name}/color/{f}' for f in img_list]
    # attributes: taken from the *.tag files in the sequence folder
    attr = [attr.split('.')[0] for attr in os.listdir(seq_dir) if attr.endswith('.tag')]
    # absent flags, one per frame
    absent = [1] * len(img_list)

    seq_dic = {
        'video_dir': seq_name,
        'init_rect': gt_lines[0],
        'img_names': img_list,
        'gt_rect': gt_lines,
        'attr': attr,
        'absent': absent
    }
    data_dic[seq_name] = seq_dic

# save to json: DataFrame columns are sequences, so to_json yields {seq_name: {field: value}}
json_path = os.path.join(dataset_root, dataset_name, 'DepthTrack.json')
pandas.DataFrame(data_dic).to_json(json_path)
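
As a quick sanity check, the generated file can be read back with the standard json module; the path below simply mirrors the dataset_root and dataset_name used above:

import json

with open('./../data/DepthTrack/DepthTrack.json', 'r', encoding='utf-8') as f:
    data = json.load(f)

first_seq = next(iter(data))  # any sequence name listed in depthtrack.txt
print(first_seq, data[first_seq]['init_rect'], len(data[first_seq]['img_names']))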
