formatting

commit b159d76517 (parent 8fc3559d6c)
Author: franksim
Date: 2025-11-07 11:20:08 +01:00

8 changed files with 119 additions and 82 deletions

View File

@@ -11,7 +11,7 @@ def pad_to_square(img):
     max_wh = max(w, h)
     pad = ((max_wh - w) // 2, (max_wh - h) // 2)
     padding = (pad[0], pad[1], max_wh - w - pad[0], max_wh - h - pad[1])
-    return F.pad(img, padding, fill=0, padding_mode='constant')
+    return F.pad(img, padding, fill=0, padding_mode="constant")


 def build_batch(paths: Sequence[str], transform=None) -> torch.Tensor:

@@ -21,17 +21,18 @@ def build_batch(paths: Sequence[str], transform=None) -> torch.Tensor:
     @param transform: One or multiple image transformations for augmenting the batch images.
     @return: Returns one single tensor that contains every image.
     """
-    preprocess = transforms.Compose([
-        transforms.Lambda(pad_to_square),
-        transforms.Resize((224, 224)),
-        *([transform] if transform is not None else []),
-        transforms.ToTensor()
-    ])
+    preprocess = transforms.Compose(
+        [
+            transforms.Lambda(pad_to_square),
+            transforms.Resize((224, 224)),
+            *([transform] if transform is not None else []),
+            transforms.ToTensor(),
+        ]
+    )
     imgs = []
     for path in paths:
-        img = Image.open(path).convert('RGB')
+        img = Image.open(path).convert("RGB")
         img = preprocess(img)
         imgs.append(img)
     batch = torch.stack(imgs)

@@ -43,8 +44,7 @@ def get_model() -> torch.nn.Module:
     @return: Returns a neural network, initialised with pretrained weights.
     """
-    model = models.resnet18(
-        weights=models.ResNet18_Weights.DEFAULT)
+    model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
     return model
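
Note: pad_to_square above centers the image inside a square canvas before the 224x224 resize, so the aspect ratio is preserved. A standalone sketch of that padding arithmetic with illustrative numbers (a 300x200 input; values are not from the repository):

    # Illustrative values only.
    w, h = 300, 200
    max_wh = max(w, h)                                    # 300
    pad = ((max_wh - w) // 2, (max_wh - h) // 2)          # (0, 50)
    padding = (pad[0], pad[1], max_wh - w - pad[0], max_wh - h - pad[1])
    print(padding)                                        # (0, 50, 0, 50): left, top, right, bottom
    assert w + padding[0] + padding[2] == h + padding[1] + padding[3] == max_wh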

View File

@@ -5,7 +5,12 @@ def avg_color(img: torch.Tensor):
     return img.mean(dim=(1, 2))


-def mask(foreground: torch.Tensor, background: torch.Tensor, mask_tensor: torch.Tensor, threshold: float):
+def mask(
+    foreground: torch.Tensor,
+    background: torch.Tensor,
+    mask_tensor: torch.Tensor,
+    threshold: float,
+):
     mask = mask_tensor > threshold
     if foreground.dim() == 3:
         mask = mask.unsqueeze(0)
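
Note: the rest of mask() is outside this hunk; the lines shown only build a boolean mask and add a leading channel axis so it broadcasts over CHW tensors. A minimal standalone sketch of that compositing pattern (finishing with torch.where is an assumption, not code from the repository):

    import torch

    foreground = torch.ones(3, 4, 4)      # CHW
    background = torch.zeros(3, 4, 4)     # CHW
    mask_tensor = torch.rand(4, 4)        # HW
    m = mask_tensor > 0.5                 # boolean HW
    m = m.unsqueeze(0)                    # 1xHxW, broadcasts over the 3 channels
    composite = torch.where(m, foreground, background)
    print(composite.shape)                # torch.Size([3, 4, 4])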

View File

@@ -9,8 +9,8 @@ import logging

 logging.basicConfig(
     level=logging.INFO,
-    format='[%(asctime)s] %(levelname)s: %(message)s',
-    datefmt='%H:%M:%S'
+    format="[%(asctime)s] %(levelname)s: %(message)s",
+    datefmt="%H:%M:%S",
 )
 logger = logging.getLogger(__name__)

@@ -34,8 +34,7 @@ class MmpNet(nn.Module):
     def __init__(self, num_classes: int):
         super().__init__()
-        self.mobilenet = models.mobilenet_v2(
-            weights=MobileNet_V2_Weights.DEFAULT)
+        self.mobilenet = models.mobilenet_v2(weights=MobileNet_V2_Weights.DEFAULT)
         self.classifier = nn.Sequential(
             nn.Dropout(0.2),
             nn.Linear(self.mobilenet.last_channel, num_classes),

@@ -59,24 +58,23 @@ def get_dataloader(
     @param batch_size: Batch size for the data loader
     @param num_workers: Number of workers for the data loader
     """
-    transform = transforms.Compose([
-        transforms.ToTensor(),
-        transforms.Normalize(
-            mean=[0.4914, 0.4822, 0.4465],
-            std=[0.2023, 0.1994, 0.2010]
-        ),
-    ])
+    transform = transforms.Compose(
+        [
+            transforms.ToTensor(),
+            transforms.Normalize(
+                mean=[0.4914, 0.4822, 0.4465], std=[0.2023, 0.1994, 0.2010]
+            ),
+        ]
+    )
     dataset = datasets.CIFAR10(
-        root=data_root,
-        train=is_train,
-        download=True,
-        transform=transform
+        root=data_root, train=is_train, download=True, transform=transform
     )
     dataloader = DataLoader(
-        dataset, batch_size=batch_size,
+        dataset,
+        batch_size=batch_size,
         shuffle=is_train,
         num_workers=num_workers,
-        pin_memory=True
+        pin_memory=True,
     )
     return dataloader

@@ -133,7 +131,8 @@ def train_epoch(
         if batch_idx % log_interval == 0 or batch_idx == len(loader):
             avg_batch_loss = running_loss / (batch_idx * loader.batch_size)
             logger.info(
-                f" [Batch {batch_idx}/{len(loader)}] Train Loss: {avg_batch_loss:.4f}")
+                f" [Batch {batch_idx}/{len(loader)}] Train Loss: {avg_batch_loss:.4f}"
+            )

     epoch_loss = running_loss / len(loader.dataset)
     logger.info(f" ---> Train Loss (Epoch): {epoch_loss:.4f}")

@@ -184,11 +183,7 @@ def main():
             device=device,
             criterion=criterion,
         )
-        eval_epoch(
-            model=model,
-            loader=dataloader_eval,
-            device=device
-        )
+        eval_epoch(model=model, loader=dataloader_eval, device=device)

        log_epoch_progress(epoche, train_epochs, "end")
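
Note: MmpNet.__init__ above follows the usual transfer-learning pattern of keeping the pretrained MobileNetV2 features and swapping in a small task-specific head. A standalone sketch of that pattern (the 2-class head and the input size are illustrative):

    import torch
    from torch import nn
    from torchvision import models
    from torchvision.models import MobileNet_V2_Weights

    backbone = models.mobilenet_v2(weights=MobileNet_V2_Weights.DEFAULT)
    backbone.classifier = nn.Sequential(   # replace the 1000-class ImageNet head
        nn.Dropout(0.2),
        nn.Linear(backbone.last_channel, 2),
    )
    out = backbone(torch.randn(1, 3, 224, 224))
    print(out.shape)                       # torch.Size([1, 2])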

View File

@@ -28,17 +28,23 @@ class AnnotationRect:

 def read_groundtruth_file(path: str) -> List[AnnotationRect]:
     """Exercise 3.1b"""
     annotationRects = []
-    with open(path, 'r') as file:
+    with open(path, "r") as file:
         for line in file:
             if line.strip():
                 values = line.strip().split()
-                annotationRects.append(AnnotationRect(float(values[0]), float(
-                    values[1]), float(values[2]), float(values[3])))
+                annotationRects.append(
+                    AnnotationRect(
+                        float(values[0]),
+                        float(values[1]),
+                        float(values[2]),
+                        float(values[3]),
+                    )
+                )
     return annotationRects


 def get_image_with_max_annotations(dir_path: str) -> str:
-    img_pattern = re.compile(r'^(\d+)\.jpg$')
+    img_pattern = re.compile(r"^(\d+)\.jpg$")
     files = set(os.listdir(dir_path))
     max_file = None
     max_annotations = 0

@@ -47,32 +53,41 @@ def get_image_with_max_annotations(dir_path: str) -> str:
         match = img_pattern.match(fname)
         if match:
             img_file = os.path.join(dir_path, fname)
-            annotations_number = len(read_groundtruth_file(os.path.join(
-                dir_path, f"{match.group(1)}.gt_data.txt")))
-            if (annotations_number > max_annotations):
+            annotations_number = len(
+                read_groundtruth_file(
+                    os.path.join(dir_path, f"{match.group(1)}.gt_data.txt")
+                )
+            )
+            if annotations_number > max_annotations:
                 max_file = img_file
                 max_annotations = annotations_number
     return max_file


-def visualize_image(image_path: str, output_path='output.jpg', rect_color=(255, 0, 0), width=2):
-    img_pattern = re.compile(r'(.*)(\.jpg)')
+def visualize_image(
+    image_path: str, output_path="output.jpg", rect_color=(255, 0, 0), width=2
+):
+    img_pattern = re.compile(r"(.*)(\.jpg)")
     match = img_pattern.match(image_path)
     annotations = read_groundtruth_file(f"{match.group(1)}.gt_data.txt")
-    img = Image.open(image_path).convert('RGB')
+    img = Image.open(image_path).convert("RGB")
     draw = ImageDraw.Draw(img)
     for annotation in annotations:
-        draw.rectangle([annotation.x1, annotation.y1, annotation.x2, annotation.y2],
-                       outline=rect_color, width=width)
+        draw.rectangle(
+            [annotation.x1, annotation.y1, annotation.x2, annotation.y2],
+            outline=rect_color,
+            width=width,
+        )
     img.save(output_path)


 def main():
     image_file = get_image_with_max_annotations(
-        "/home/ubuntu/mmp_wise2526_franksim/.data/mmp-public-3.2/train")
+        "/home/ubuntu/mmp_wise2526_franksim/.data/mmp-public-3.2/train"
+    )
     visualize_image(image_file)
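
Note: read_groundtruth_file above expects one box per line as four whitespace-separated numbers; reading them as x1 y1 x2 y2 is inferred from the AnnotationRect fields that visualize_image draws, since the constructor signature is not shown in this diff. A minimal sketch of that assumed format and the parse it implies:

    # Hypothetical .gt_data.txt contents, one rectangle per line.
    sample = "10 20 110 220\n35.5 40.0 80.0 90.5\n"
    for line in sample.splitlines():
        if line.strip():
            x1, y1, x2, y2 = (float(v) for v in line.split())
            print((x1, y1, x2, y2))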

View File

@@ -17,7 +17,7 @@ class MMP_Dataset(torch.utils.data.Dataset):
         @param image_size: Desired image size that this dataset should return
         """
         self.image_size = image_size
-        img_pattern = re.compile(r'^(\d+)\.jpg$')
+        img_pattern = re.compile(r"^(\d+)\.jpg$")
         files = set(os.listdir(path_to_data))

         self.images = []

@@ -25,12 +25,14 @@ class MMP_Dataset(torch.utils.data.Dataset):
             match = img_pattern.match(fname)
             if match:
                 img_file = os.path.join(path_to_data, fname)
-                annotations = read_groundtruth_file(os.path.join(
-                    path_to_data, f"{match.group(1)}.gt_data.txt"))
+                annotations = read_groundtruth_file(
+                    os.path.join(path_to_data, f"{match.group(1)}.gt_data.txt")
+                )
                 self.images.append((img_file, annotations))
-        self.images.sort(key=lambda x: int(
-            re.match(r"(.*/)(\d+)(\.jpg)", x[0]).group(2)))
+        self.images.sort(
+            key=lambda x: int(re.match(r"(.*/)(\d+)(\.jpg)", x[0]).group(2))
+        )

     def __getitem__(self, idx: int) -> Tuple[torch.Tensor, int]:
         """

@@ -38,15 +40,16 @@ class MMP_Dataset(torch.utils.data.Dataset):
         """
         img = Image.open(self.images[idx][0]).convert("RGB")
         padding = self.__padding__(img)
-        transform = transforms.Compose([
-            transforms.Pad(padding, 0),
-            transforms.Resize((self.image_size, self.image_size)),
-            transforms.ToTensor(),
-            transforms.Normalize(
-                mean=[0.485, 0.456, 0.406],
-                std=[0.229, 0.224, 0.225]
-            )
-        ])
+        transform = transforms.Compose(
+            [
+                transforms.Pad(padding, 0),
+                transforms.Resize((self.image_size, self.image_size)),
+                transforms.ToTensor(),
+                transforms.Normalize(
+                    mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]
+                ),
+            ]
+        )
         return (transform(img), 1 if len(self.images[idx][1]) > 1 else 0)

     def __padding__(self, img) -> Tuple[int, int, int, int]:

@@ -61,16 +64,24 @@ class MMP_Dataset(torch.utils.data.Dataset):

 def get_dataloader(
-    path_to_data: str, image_size: int, batch_size: int, num_workers: int, is_train: bool = True
+    path_to_data: str,
+    image_size: int,
+    batch_size: int,
+    num_workers: int,
+    is_train: bool = True,
 ) -> DataLoader:
     """Exercise 3.2d"""
-    path = os.path.join(path_to_data, "train") if is_train else os.path.join(
-        path_to_data, "val")
+    path = (
+        os.path.join(path_to_data, "train")
+        if is_train
+        else os.path.join(path_to_data, "val")
+    )
     dataset = MMP_Dataset(path_to_data=path, image_size=image_size)
     dataloader = DataLoader(
-        dataset, batch_size=batch_size,
+        dataset,
+        batch_size=batch_size,
         shuffle=is_train,
         num_workers=num_workers,
-        pin_memory=True
+        pin_memory=True,
    )
    return dataloader
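
Note: a hypothetical usage sketch of get_dataloader above (path, sizes, and worker count are placeholders, and the dataset must already be on disk):

    loader = get_dataloader(
        path_to_data=".data/mmp-public-3.2",
        image_size=224,
        batch_size=32,
        num_workers=4,
        is_train=True,
    )
    imgs, labels = next(iter(loader))
    print(imgs.shape)    # torch.Size([32, 3, 224, 224])
    print(labels[:8])    # 1 if the image has more than one annotation, else 0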

View File

@@ -7,8 +7,9 @@ from .dataset import get_dataloader

 def main():
     """Put your code for Exercise 3.3 in here"""
     parser = argparse.ArgumentParser()
-    parser.add_argument('--tensorboard', action='store_true',
-                        help='Enable TensorBoard logging')
+    parser.add_argument(
+        "--tensorboard", action="store_true", help="Enable TensorBoard logging"
+    )
     args = parser.parse_args()

     device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

@@ -16,17 +17,24 @@ def main():
     model = MmpNet(num_classes=2).to(device=device)
     dataloader_train = get_dataloader(
         path_to_data=".data/mmp-public-3.2",
-        image_size=244, batch_size=32, num_workers=6, is_train=True
+        image_size=244,
+        batch_size=32,
+        num_workers=6,
+        is_train=True,
     )
     dataloader_eval = get_dataloader(
         path_to_data=".data/mmp-public-3.2",
-        image_size=244, batch_size=32, num_workers=6, is_train=False
+        image_size=244,
+        batch_size=32,
+        num_workers=6,
+        is_train=False,
     )
     criterion, optimizer = get_criterion_optimizer(model=model)

     writer = None
     if args.tensorboard:
         from torch.utils.tensorboard import SummaryWriter
+
         writer = SummaryWriter(log_dir="runs/a3_mmpnet")

     for epoch in range(train_epochs):

@@ -37,14 +45,11 @@ def main():
             device=device,
             criterion=criterion,
         )
-        val_acc = eval_epoch(
-            model=model,
-            loader=dataloader_eval,
-            device=device
-        )
+        val_acc = eval_epoch(model=model, loader=dataloader_eval, device=device)
         print(
-            f"Epoch [{epoch+1}/{train_epochs}] - Train Loss: {train_loss:.4f} - Val Acc: {val_acc:.4f}")
+            f"Epoch [{epoch + 1}/{train_epochs}] - Train Loss: {train_loss:.4f} - Val Acc: {val_acc:.4f}"
+        )

         if writer is not None:
             writer.add_scalar("Loss/train", train_loss, epoch)
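
Note: main() above imports SummaryWriter only when --tensorboard is passed, so TensorBoard stays an optional dependency. A standalone sketch of that guard (the flag name matches the diff; log_dir and the logged scalar are illustrative):

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--tensorboard", action="store_true", help="Enable TensorBoard logging")
    args = parser.parse_args()

    writer = None
    if args.tensorboard:
        # Imported lazily: runs without the flag never touch the dependency.
        from torch.utils.tensorboard import SummaryWriter

        writer = SummaryWriter(log_dir="runs/demo")

    if writer is not None:
        writer.add_scalar("Loss/train", 0.5, 0)  # placeholder value and step
        writer.close()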

View File

@@ -10,19 +10,25 @@ def get_anchor_grid(
     aspect_ratios: Sequence[float],
 ) -> np.ndarray:
     anchor_grid = np.empty(
-        [len(anchor_widths), len(aspect_ratios), num_rows, num_cols, 4], dtype=float)
-    for (width_idx, ratio_idx, row, col) in np.ndindex(anchor_grid.shape[:-1]):
+        [len(anchor_widths), len(aspect_ratios), num_rows, num_cols, 4], dtype=float
+    )
+    for width_idx, ratio_idx, row, col in np.ndindex(anchor_grid.shape[:-1]):
         anchor_point = (
-            col * scale_factor + scale_factor / 2, row * scale_factor + scale_factor / 2)
+            col * scale_factor + scale_factor / 2,
+            row * scale_factor + scale_factor / 2,
+        )
         width = anchor_widths[width_idx]
         ratio = aspect_ratios[ratio_idx]
         anchor_grid[width_idx, ratio_idx, row, col] = get_box(
-            width, ratio, anchor_point)
+            width, ratio, anchor_point
+        )
     return anchor_grid


-def get_box(width: float, ratio: float, anchor_point: tuple[float, float]) -> np.ndarray:
+def get_box(
+    width: float, ratio: float, anchor_point: tuple[float, float]
+) -> np.ndarray:
     box = np.empty(4, dtype=float)
     box[0] = anchor_point[0] - (width / 2)
     box[1] = anchor_point[1] - (width * ratio / 2)
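
Note: get_box above builds an axis-aligned box of width `width` and height `width * ratio` centered on the anchor point. box[2] and box[3] are cut off by the hunk, so completing them below as the symmetric right/bottom edges is an assumption. A worked numeric example:

    import numpy as np

    scale_factor, row, col = 8, 2, 3
    anchor_point = (
        col * scale_factor + scale_factor / 2,   # x = 28.0
        row * scale_factor + scale_factor / 2,   # y = 20.0
    )
    width, ratio = 32.0, 0.5                     # height = width * ratio = 16.0
    box = np.array([
        anchor_point[0] - width / 2,             # x1 = 12.0
        anchor_point[1] - width * ratio / 2,     # y1 = 12.0
        anchor_point[0] + width / 2,             # x2 = 44.0 (assumed)
        anchor_point[1] + width * ratio / 2,     # y2 = 28.0 (assumed)
    ])
    print(box)                                   # [12. 12. 44. 28.]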

View File

@@ -28,7 +28,7 @@ def get_label_grid(
         for gt in gts:
             iou = iou(item, gt)
             label_grid[width, ratio, row, col] = False
-            if (iou >= min_iou):
+            if iou >= min_iou:
                 label_grid[width, ratio, row, col] = True
                 break
     return label_grid
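
Note: untouched by this commit, but the context line iou = iou(item, gt) shadows the iou() helper. Because the function body assigns to that name, Python treats iou as a local variable throughout get_label_grid, so the very first call raises UnboundLocalError. A sketch of an equivalent loop without the shadowing, using the names from the surrounding file (it also hoists the False default out of the loop, where it was rewritten on every iteration):

    label_grid[width, ratio, row, col] = False
    for gt in gts:
        overlap = iou(item, gt)        # keep the function name intact
        if overlap >= min_iou:
            label_grid[width, ratio, row, col] = True
            break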