Commit
·
26110ee
1
Parent(s):
125e09d
Flag unreachable as yellow:
Browse files
app.py
CHANGED
|
@@ -137,7 +137,13 @@ def build_masks(seg):
|
|
| 137 |
return water_mask, garbage_mask, movable_mask
|
| 138 |
|
| 139 |
# Garbage mask can be highlighted in red
|
| 140 |
-
def highlight_chunk_masks_on_frame(
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 141 |
"""
|
| 142 |
Overlays semi-transparent colored regions for garbage chunks on the frame.
|
| 143 |
`objs` must have 'pos' and 'col' keys. The collection status changes the overlay color.
|
|
@@ -150,8 +156,13 @@ def highlight_chunk_masks_on_frame(frame, labels, objs, color_uncollected=(0, 0,
|
|
| 150 |
continue
|
| 151 |
mask = (labels == lab).astype(np.uint8)
|
| 152 |
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
| 153 |
-
|
| 154 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 155 |
cv2.drawContours(overlay, contours, -1, color, thickness=cv2.FILLED)
|
| 156 |
# Blend overlay with original frame using alpha
|
| 157 |
return cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0)
|
|
@@ -201,32 +212,29 @@ def knn_path(start, targets, occ):
|
|
| 201 |
reachable = []; unreachable = []
|
| 202 |
while todo:
|
| 203 |
# KNN follow a Greedy approach, which may not guarantee shortest path, hence only use A*
|
| 204 |
-
# nbrs = NearestNeighbors(n_neighbors=1).fit(todo)
|
| 205 |
-
# _,idx = nbrs.kneighbors([cur]); nxt=tuple(todo[idx[0][0]])
|
| 206 |
-
# seg = astar(cur, nxt, occ)
|
| 207 |
-
# if seg:
|
| 208 |
-
# if path and seg[0]==path[-1]: seg=seg[1:]
|
| 209 |
-
# path.extend(seg)
|
| 210 |
-
# cur = nxt; todo.remove(list(nxt))
|
| 211 |
best = None
|
| 212 |
best_len = float('inf')
|
| 213 |
best_seg = []
|
| 214 |
# Try A* to each target, find shortest actual path
|
| 215 |
for t in todo:
|
| 216 |
seg = astar(cur, tuple(t), occ)
|
| 217 |
-
if seg
|
| 218 |
best = tuple(t)
|
| 219 |
best_len = len(seg)
|
| 220 |
best_seg = seg
|
| 221 |
if not best:
|
| 222 |
-
|
| 223 |
-
|
|
|
|
|
|
|
|
|
|
| 224 |
if path and path[-1] == best_seg[0]:
|
| 225 |
-
best_seg = best_seg[1:] #
|
| 226 |
path.extend(best_seg)
|
|
|
|
| 227 |
cur = best
|
| 228 |
todo.remove(list(best))
|
| 229 |
-
return path
|
| 230 |
|
| 231 |
|
| 232 |
# ── Robot sprite/class -──────────────────────────────────────────────────
|
|
@@ -488,18 +496,28 @@ def _pipeline(uid,img_path):
|
|
| 488 |
robot = Robot(SPRITE)
|
| 489 |
# Robot will be spawn on the closest movable mask to top-left
|
| 490 |
robot.pos = [spawn_x, spawn_y]
|
| 491 |
-
path = knn_path(robot.pos, centres, movable_mask)
|
|
|
|
|
|
|
| 492 |
|
| 493 |
# 4- Video synthesis
|
| 494 |
out_tmp=f"{OUTPUT_DIR}/{uid}_tmp.mp4"
|
| 495 |
vw=cv2.VideoWriter(out_tmp,cv2.VideoWriter_fourcc(*"mp4v"),10.0,(640,640))
|
| 496 |
-
objs=[{"pos":p,"col":False} for p in centres]
|
|
|
|
| 497 |
bg = bgr.copy()
|
| 498 |
for _ in range(15000): # safety frames
|
| 499 |
frame=bg.copy()
|
| 500 |
# Draw garbage chunk masks in red-to-green (semi-transparent)
|
| 501 |
-
frame = highlight_chunk_masks_on_frame(
|
| 502 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 503 |
# Draw object detections as red (to green) dots
|
| 504 |
for o in objs:
|
| 505 |
color = (0, 0, 128) if not o["col"] else (0, 128, 0)
|
|
|
|
| 137 |
return water_mask, garbage_mask, movable_mask
|
| 138 |
|
| 139 |
# Garbage mask can be highlighted in red
|
| 140 |
+
def highlight_chunk_masks_on_frame(
|
| 141 |
+
frame, labels, objs,
|
| 142 |
+
color_uncollected=(0, 0, 128),
|
| 143 |
+
color_collected=(0, 128, 0),
|
| 144 |
+
color_unreachable=(0, 255, 255),
|
| 145 |
+
alpha=0.8
|
| 146 |
+
):
|
| 147 |
"""
|
| 148 |
Overlays semi-transparent colored regions for garbage chunks on the frame.
|
| 149 |
`objs` must have 'pos' and 'col' keys. The collection status changes the overlay color.
|
|
|
|
| 156 |
continue
|
| 157 |
mask = (labels == lab).astype(np.uint8)
|
| 158 |
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
| 159 |
+
# Choose color based on status
|
| 160 |
+
if obj.get("unreachable"):
|
| 161 |
+
color = color_unreachable
|
| 162 |
+
elif obj["col"]:
|
| 163 |
+
color = color_collected
|
| 164 |
+
else:
|
| 165 |
+
color = color_uncollected # drawContours on overlay
|
| 166 |
cv2.drawContours(overlay, contours, -1, color, thickness=cv2.FILLED)
|
| 167 |
# Blend overlay with original frame using alpha
|
| 168 |
return cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0)
|
|
|
|
| 212 |
reachable = []; unreachable = []
|
| 213 |
while todo:
|
| 214 |
# KNN follow a Greedy approach, which may not guarantee shortest path, hence only use A*
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 215 |
best = None
|
| 216 |
best_len = float('inf')
|
| 217 |
best_seg = []
|
| 218 |
# Try A* to each target, find shortest actual path
|
| 219 |
for t in todo:
|
| 220 |
seg = astar(cur, tuple(t), occ)
|
| 221 |
+
if seg and len(seg) < best_len: # index error?
|
| 222 |
best = tuple(t)
|
| 223 |
best_len = len(seg)
|
| 224 |
best_seg = seg
|
| 225 |
if not best:
|
| 226 |
+
# All remaining in `todo` are unreachable
|
| 227 |
+
for u in todo:
|
| 228 |
+
print(f"⚠️ Garbage unreachable at {u}")
|
| 229 |
+
unreachable.append(u)
|
| 230 |
+
break # no more reachable targets
|
| 231 |
if path and path[-1] == best_seg[0]:
|
| 232 |
+
best_seg = best_seg[1:] # skip duplicate
|
| 233 |
path.extend(best_seg)
|
| 234 |
+
reachable.append(list(best))
|
| 235 |
cur = best
|
| 236 |
todo.remove(list(best))
|
| 237 |
+
return path, unreachable
|
| 238 |
|
| 239 |
|
| 240 |
# ── Robot sprite/class -──────────────────────────────────────────────────
|
|
|
|
| 496 |
robot = Robot(SPRITE)
|
| 497 |
# Robot will be spawn on the closest movable mask to top-left
|
| 498 |
robot.pos = [spawn_x, spawn_y]
|
| 499 |
+
path, unreachable = knn_path(robot.pos, centres, movable_mask)
|
| 500 |
+
if unreachable:
|
| 501 |
+
print(f"⚠️ Unreachable garbage chunks at: {unreachable}")
|
| 502 |
|
| 503 |
# 4- Video synthesis
|
| 504 |
out_tmp=f"{OUTPUT_DIR}/{uid}_tmp.mp4"
|
| 505 |
vw=cv2.VideoWriter(out_tmp,cv2.VideoWriter_fourcc(*"mp4v"),10.0,(640,640))
|
| 506 |
+
objs = [{"pos": p, "col": False, "unreachable": False} for p in centres if p not in unreachable]
|
| 507 |
+
objs += [{"pos": p, "col": False, "unreachable": True} for p in unreachable]
|
| 508 |
bg = bgr.copy()
|
| 509 |
for _ in range(15000): # safety frames
|
| 510 |
frame=bg.copy()
|
| 511 |
# Draw garbage chunk masks in red-to-green (semi-transparent)
|
| 512 |
+
frame = highlight_chunk_masks_on_frame(
|
| 513 |
+
frame,
|
| 514 |
+
labels,
|
| 515 |
+
objs,
|
| 516 |
+
color_uncollected=(0, 0, 128), # 🔴
|
| 517 |
+
color_collected=(0, 128, 0), # 🟢
|
| 518 |
+
color_unreachable=(0, 255, 255) # 🟡
|
| 519 |
+
) # 🔴 garbage overlay
|
| 520 |
+
frame = highlight_water_mask_on_frame(frame, water_mask) # 🔵 water overlay
|
| 521 |
# Draw object detections as red (to green) dots
|
| 522 |
for o in objs:
|
| 523 |
color = (0, 0, 128) if not o["col"] else (0, 128, 0)
|