"""structure module.
Ths structure module defines metadata for segmentation (RTSTRUCT, SEG).
The metadata are attributes of the Structre class.
This module also defines routines for transforming and
accessing the Structure metadata in CERR coordinate system and
to convert images to real world units.
"""
from dataclasses import dataclass, field
from typing import List
import numpy as np
import os
from pydicom import dcmread
import cerr.dataclasses.scan as scn
from cerr.utils import uid
import cerr.contour.rasterseg as rs
import nibabel as nib
import SimpleITK as sitk
from skimage import measure
from datetime import datetime
from pydicom.uid import generate_uid
import json
from cerr.radiomics.preprocess import imgResample3D
import cerr.utils.mask as maskUtils
import warnings
def get_empty_list():
return []
def get_empty_np_array():
return np.empty((0,0,0))
[docs]
@dataclass
class Structure:
"""This class defines data object for volumetric segmentation. The metadata can be populated from DICOM, NifTi and
numpy arrays.
Attributes:
patientName (str): Patient's name.
structureName (str): Structure's name.
ROIInterpretedType (str): maps to DICOM tag (3006,00A4).
numberOfScans (int): Number of scan slices containing segmentation.
dateWritten (str): Date structure was created. Corresponds to DICOM tag (3006,0008) StructureSetDate.
structureColor (List): rgb triplet representing the color for this structure
structureDescription (str): Description of the structure. DICOM SeriesDescription.
roiGenerationAlgorithm (str) = Type of algorithm used to generate ROI. DICOM tag (3006,0036).
Permitted values are AUTOMATIC, SEMIAUTOMATIC and MANUAL
roiGenerationDescription (str): User-defined description of technique used to generate ROI. DICOM tag (3006,0038).
contour (List): List of contours including segmentation x,y,z CERR virtual coordinates per scan slice.
The ith entry in the list corresponds to the ith slice of the associated scanArray.
rasterSegments (np.ndarray): Numpy array of size numSegments x 10. The columns of this array are
z-value, y-value, x segment start, x segment stop, x increment, slice, row,
column start, column stop, voxel thickness for that slice.
Each row represents a scan segment.
strUID (str): unique identifier for structure object
assocScanUID (str): unique identifier for the scan associated with the structure object.
structSetSopInstanceUID: str = ""
referencedFrameOfReferenceUID (str): UID for frame of reference
referencedSeriesUID (str): UID of structure series i.e. DICOM SeriesInstanceUID
structureFileFormat (str): File format from which structure's metadata was populated.
Permitted values are "RTSTRUCT", "NPARRAY", "NIFTI".
"""
roiNumber: int = 0
patientName: str = ""
structureName: str = ""
ROIInterpretedType: str = ""
structureFormat: str = ""
numberOfScans: int = 0
maximumNumberScans: int = 0
maximumPointsPerSegment: int = 0
maximumSegmentsPerScan: int = 0
structureEdition: str = ""
writer: str = ""
dateWritten: str = ""
structureColor: List = field(default_factory=get_empty_np_array)
structureDescription: str = ""
roiGenerationAlgorithm: str = ""
roiGenerationDescription: str = ""
studyNumberOfOrigin: str = ""
contour: List = field(default_factory=get_empty_np_array)
rasterSegments: np.ndarray = field(default_factory=get_empty_np_array)
DSHPoints: np.ndarray = field(default_factory=get_empty_np_array)
orientationOfStructure: str = ""
transferProtocol: str = ""
visible: bool = True
strUID: str = ""
assocScanUID: str = ""
structSetSopInstanceUID: str = ""
rasterized: bool = False
referencedFrameOfReferenceUID: str = ""
referencedSeriesUID: str = ""
structureFileFormat: str = ""
def __getitem__(self, key):
return getattr(self, key)
def __setitem__(self, key, value):
return setattr(self, key, value)
[docs]
def saveNii(self, niiFileName, planC):
""" Routine to save pyCERR Structure object to NifTi file
Args:
niiFileName (str): File name including the full path to save the pyCERR scan object to NifTi file.
planC (cerr.plan_container.PlanC): pyCERR plan container object.
Returns:
int: 0 when NifTi file is written successfully.
"""
str_num = getStructNumFromUID(self.strUID, planC)
scan_num = scn.getScanNumFromUID(self.assocScanUID,planC)
affine3M = planC.scan[scan_num].getNiiAffine()
mask3M = rs.getStrMask(str_num,planC)
mask3M = np.moveaxis(mask3M,[0,1],[1,0])
# https://neurostars.org/t/direction-orientation-matrix-dicom-vs-nifti/14382/2
# dcmImgOri = planC.scan[scan_num].scanInfo[0].imageOrientationPatient
# slice_normal = dcmImgOri[[1,2,0]] * dcmImgOri[[5,3,4]] \
# - dcmImgOri[[2,0,1]] * dcmImgOri[[4,5,3]]
# zDiff = np.matmul(slice_normal, planC.scan[scan_num].scanInfo[1].imagePositionPatient) \
# - np.matmul(slice_normal, planC.scan[scan_num].scanInfo[0].imagePositionPatient)
# ippDiffV = planC.scan[scan_num].scanInfo[1].imagePositionPatient - planC.scan[scan_num].scanInfo[0].imagePositionPatient
if scn.flipSliceOrderFlag(planC.scan[scan_num]): # np.all(np.sign(zDiff) < 0):
#if not planC.scan[scan_num].isCerrSliceOrderMatchDcm():
mask3M = np.flip(mask3M,axis=2)
str_img = nib.Nifti1Image(mask3M.astype('uint16'), affine3M)
nib.save(str_img, niiFileName)
[docs]
def convertDcmToCerrVirtualCoords(self,planC):
"""Routine to convert x,y,z coordinates of segmentation from DICOM to pyCERR virtual coordinates. More information
about virtual coordinates is on the Wiki https://github.com/cerr/pyCERR/wiki/Coordinate-system
"""
assocScanUID = self.assocScanUID
scan_num = scn.getScanNumFromUID(assocScanUID,planC)
im_to_virtual_phys_transM = planC.scan[scan_num].Image2VirtualPhysicalTransM
im_to_phys_transM = planC.scan[scan_num].Image2PhysicalTransM
position_matrix_inv = np.linalg.inv(im_to_phys_transM)
_,_,zValsV = planC.scan[scan_num].getScanXYZVals()
isPhysicalCoords = self.structureFileFormat in ["RTSTRUCT", "NPARRAY", "NIFTI"]
scan_sop_inst_list = np.array([scan_info.sopInstanceUID for scan_info in planC.scan[scan_num].scanInfo])
numSlcs = len(planC.scan[scan_num].scanInfo)
for ctr_num,contour in enumerate(self.contour):
if np.any(contour.segments):
if isPhysicalCoords:
tempa = np.hstack((contour.segments, np.ones((contour.segments.shape[0], 1))))
tempa = np.matmul(position_matrix_inv, tempa.T)
# Round the z-coordinates (slice) of 'tempa'
tempa[2, :] = np.round(tempa[2, :])
else: # image coords
slcNum = np.argwhere(scan_sop_inst_list == contour.referencedSopInstanceUID)
slcNum = slcNum[0,0]
if scn.flipSliceOrderFlag(planC.scan[scan_num]):
slcNum = numSlcs - slcNum - 1
#tempa = np.hstack((contour.segments, np.ones((contour.segments.shape[0], 1))))
#tempa[:, 2] = slcNum
#tempa = np.matmul(im_to_phys_transM, tempa.T)
tempa = contour.segments
tempa[:,2] = slcNum
tempa = np.hstack((tempa, np.ones((contour.segments.shape[0], 1)))).T
tempb = np.matmul(im_to_virtual_phys_transM, tempa)
tempb = tempb[:3,:].T
self.contour[ctr_num].segments = tempb
# Sort contours per CT slices
dcm_contour_list = self.contour
contr_sop_inst_list = np.array([c.referencedSopInstanceUID for c in dcm_contour_list])
scan_info = planC.scan[scan_num].scanInfo
#scan_sop_inst_list = np.array([s.sopInstanceUID for s in scan_info])
num_slices = len(planC.scan[scan_num].scanInfo)
contour_list = np.empty(num_slices,Contour)
for slc_num in range(num_slices):
contour_list[slc_num] = []
scan_sop_inst = planC.scan[scan_num].scanInfo[slc_num].sopInstanceUID
if scan_sop_inst != "":
seg_matches = np.where(contr_sop_inst_list == scan_sop_inst)
else:
seg_matches = [[]]
if scan_sop_inst == "" or len(seg_matches[0]) == 0:
matches = []
for iCtr,ctr in enumerate(dcm_contour_list):
if np.all((ctr.segments[:,2] - zValsV[slc_num])**2 < 1e-5):
matches.append(iCtr)
seg_matches = (np.asarray(matches,dtype='int64'),)
segments = []
for seg_num in seg_matches[0]: # 0 for 1-d array. we only care about the 1st dim
segment = Segment()
segment.points = dcm_contour_list[seg_num].segments
segments.append(segment)
if np.any(segments):
contour = Contour()
contour.segments = segments
contour.referencedSopInstanceUID = dcm_contour_list[seg_num].referencedSopInstanceUID
contour.referencedSopClassUID = dcm_contour_list[seg_num].referencedSopClassUID
contour_list[slc_num] = contour
self.contour = contour_list
return self
[docs]
def getStructureAssociatedScan(self, planC):
"""
Args:
planC (cerr.plan_container.PlanC): pyCERR's plan container object
Returns:
int: associated scan index for structure object based on the scan UID associated with
the structure.
"""
# Preallocate memory
scanUID = [None] * len(planC.scan)
# Get all associated scan UID from structures
allAssocScanUID = [struct['assocScanUID'] for struct in planC.structure]
# Get the scan UID from the scan field under planC
scanUID = [scan['scanUID'] for scan in planC.scan]
# Get associated scan UID's
assocScanUID = self['assocScanUID']
# Match all the UID to check which scan the structure belongs to.
assocScans = [i for i, x in enumerate(scanUID) if x == assocScanUID]
return assocScans[0]
[docs]
def getSitkImage(self, planC):
""" Routine to convert pyCERR Structure object to SimpleITK Image object
Returns:
sitk.Image: SimpleITK Image with value of 1 assigned to segmented pixels
"""
assocScanNum = scn.getScanNumFromUID(self.assocScanUID, planC)
mask3M = rs.getStrMask(self, planC)
sitkArray = np.transpose(mask3M.astype(int), (2, 0, 1)) # z,y,x order
# CERR slice ordering is opposite of DICOM
if scn.flipSliceOrderFlag(planC.scan[assocScanNum]):
sitkArray = np.flip(sitkArray, axis = 0)
originXyz = list(np.matmul(planC.scan[assocScanNum].Image2PhysicalTransM, np.asarray([0,0,0,1]).T)[:3] * 10)
xV, yV, zV = planC.scan[assocScanNum].getScanXYZVals()
dx = np.abs(xV[1] - xV[0]) * 10
dy = np.abs(yV[1] - yV[0]) * 10
dz = np.abs(zV[1] - zV[0]) * 10
spacing = [dx, dy, dz]
img_ori = planC.scan[assocScanNum].scanInfo[0].imageOrientationPatient
slice_normal = img_ori[[1,2,0]] * img_ori[[5,3,4]] \
- img_ori[[2,0,1]] * img_ori[[4,5,3]]
# Get row-major directions for ITK
dir_cosine_mat = np.hstack((img_ori.reshape(3,2,order="F"),slice_normal.reshape(3,1)))
direction = dir_cosine_mat.reshape(9,order='C')
img = sitk.GetImageFromArray(sitkArray)
img.SetOrigin(originXyz)
img.SetSpacing(spacing)
img.SetDirection(direction)
return img
[docs]
def getStructDict(self):
""" Routine to get dictionary representation of structure metadata
Returns:
dict: fields of the dictionary are attributes of the Structure object.
"""
structDict = self.__dict__.copy()
contourList = []
for ctr in structDict['contour']:
if ctr:
ctrDict = ctr.__dict__.copy()
segList = []
for seg in ctrDict['segments']:
segList.append(seg.__dict__.copy())
ctrDict['segments'] = segList
contourList.append(ctrDict)
else:
contourList.append([])
structDict['contour'] = contourList
return structDict
[docs]
@dataclass
class Contour:
"""This class defines data object for storing segmented contours. The metadata can be populated from DICOM, NifTi and
numpy arrays.
Attributes:
referencedSopInstanceUID (str): Instance UID of associated image slice.
referencedSopClassUID (str): Class UID of associated image slice.
segments (np.array): array of segments.
"""
referencedSopInstanceUID: str = ""
referencedSopClassUID: str = ""
segments: np.array = field(default_factory=get_empty_np_array)
[docs]
@dataclass
class Segment:
"""This class defines data object for storing contour segments.
Attributes:
points (numpy.ndarray): (n X 3) array containing x,y,z coordinates of the segment in pyCERR virtual coordinate system.
"""
points: np.ndarray = field(default_factory=get_empty_np_array)
class jsonSerializeSegment(json.JSONEncoder):
def default(self, segObj):
segDict = {}
if not segObj:
segDict['points'] = ''
if isinstance(segObj, Segment):
segDict['points'] = segObj.points.tolist()
return segDict
else:
type_name = segObj.__class__.__name__
raise TypeError("Unexpected type {0}".format(type_name))
class jsonSerializeContour(json.JSONEncoder):
def default(self, ctrObj):
ctrDict = {}
if isinstance(ctrObj, Contour):
ctrDict['referencedSopClassUID'] = ctrObj.referencedSopClassUID
ctrDict['referencedSopInstanceUID'] = ctrObj.referencedSopInstanceUID
segList = []
for seg in ctrObj.segments:
segList.append(json.dumps(seg, cls=jsonSerializeSegment))
ctrDict['segments'] = segList
return ctrDict
else:
type_name = ctrObj.__class__.__name__
raise TypeError("Unexpected type {0}".format(type_name))
fieldsList = ['structureName', 'patientName', 'assocScanUID', 'strUID',
'ROIInterpretedType', 'structureColor',
'roiGenerationDescription', 'roiGenerationAlgorithm',
'structSetSopInstanceUID', 'referencedFrameOfReferenceUID',
'structureFileFormat']
class jsonSerializeStruct(json.JSONEncoder):
def default(self, strObj):
strDict = {}
if isinstance(strObj, Structure):
for fld in fieldsList:
strDict[fld] = getattr(strObj, fld)
ctrList = []
for ctr in strObj.contour:
ctrList.append(json.dumps(ctr, cls=jsonSerializeContour))
strDict['contour'] = ctrList
return strDict
else:
type_name = strObj.__class__.__name__
raise TypeError("Unexpected type {0}".format(type_name))
def getJsonList(structNumV, planC):
if isinstance(structNumV, (int, float)):
structNumV = [structNumV]
strList = []
for strNum in structNumV:
strObj = planC.structure[strNum]
strList.append(json.dumps(strObj, ensure_ascii=False, indent=4, cls=jsonSerializeStruct))
return strList
[docs]
def saveJson(structNumV, jsonFileName, planC):
"""
Args:
structNumV (List): List of structure indices to export to JSON format.
jsonFileName (str): JSON file name.
planC (cerr.plan_container.PlanC): pyCERR's plan container object
Returns:
None
"""
strList = getJsonList(structNumV, planC)
with open(jsonFileName, 'w', encoding='utf-8') as f:
json.dump(strList, f, ensure_ascii=False, indent=4)
[docs]
def importJson(planC, strList=None, jsonFileName=None):
"""
Args:
planC (cerr.plan_container.PlanC): pyCERR's plan container object.
strList (list of structures): (optional) list of structure metadata imported from json.
Required when jsonFileName is None.
jsonFileName: (optional) JSON file name containing structure metadata.
Required when strList is None.
Returns:
cerr.plan_container.PlanC: pyCERR's plan container object.
"""
if jsonFileName:
with open(jsonFileName, 'r', encoding='utf-8') as f:
strList = json.load(f)
elif strList:
pass
else:
return 'Provide filepath or strList'
strUIDs = [s.strUID for s in planC.structure]
for strJsonObj in strList:
strJsonObj = json.loads(strJsonObj)
if strJsonObj['strUID'] in strUIDs:
warnings.warn("Structure " + strJsonObj['strUID'] + " not imported from JSON as it already exists in planC")
continue
strObj = Structure()
for fld in fieldsList:
setattr(strObj, fld, strJsonObj[fld])
ctrList = strJsonObj['contour']
num_contours = len(ctrList)
contour_list = np.empty(num_contours,Contour)
for ctr_num, ctr in enumerate(ctrList):
ctr = json.loads(ctr)
if not ctr:
contour_list[ctr_num] = []
continue
ctrObj = Contour()
segList = ctr['segments']
segments = []
for seg in segList:
seg = json.loads(seg)
segment = Segment()
segment.points = np.asarray(seg['points'])
segments.append(segment)
ctrObj.segments = segments
ctrObj.referencedSopClassUID = ctr['referencedSopClassUID']
ctrObj.referencedSopInstanceUID = ctr['referencedSopInstanceUID']
contour_list[ctr_num] = ctrObj
strObj.contour = contour_list
planC.structure.append(strObj)
planC.structure[-1].rasterSegments = rs.generateRastersegs(planC.structure[-1], planC)
return planC
[docs]
def parseContours(contour_seq):
"""This routine parses the ContourSequence metadata from DICOM and returns a list of pyCERR Contour objects.
Args:
contour_seq (pydicom.dataset.Dataset): Pydicom Dataset object for ContourSequence, DICOM tag (3006,0040).
Returns:
List[cerr.dataclasses.structure.Contour]: list of pyCERR Contour objects
"""
num_contours = len(contour_seq)
contour_list = np.empty(num_contours,Contour)
for ctr_num,contr in enumerate(contour_seq):
sop_instance_uid = ""
sop_class_uid = ""
if hasattr(contr,"ContourImageSequence"):
ref_seq = contr.ContourImageSequence #
sop_instance_uid = ref_seq[0].ReferencedSOPInstanceUID
sop_class_uid = ref_seq[0].ReferencedSOPClassUID
geometry_type = contr.ContourGeometricType
num_pts = contr.NumberOfContourPoints
contour_data = contr.ContourData
contour_data = np.transpose(np.reshape(contour_data,(3,num_pts),order="F"))
contour_data = contour_data / 10
if not np.array_equal(contour_data[0,:],contour_data[-1,:]):
contour_data = np.vstack((contour_data,contour_data[0,:]))
contour = Contour()
contour.referencedSopClassUID = sop_class_uid
contour.referencedSopInstanceUID = sop_instance_uid
contour.segments = contour_data
contour_list[ctr_num] = contour
return contour_list
[docs]
def loadStructure(file_list):
"""This routine parses a list of DICOM files and imports metadata from RTSTRUCT and SEG modalities
to a list of pyCERR's Structure objects
.
Args:
file_list (List[str]): List of DICOM file paths.
Returns:
List[cerr.dataclasses.structure.Structure]: List of pyCERR's Structure objects.
"""
struct_list = []
for file in file_list:
ds = dcmread(file)
if ds.Modality == "RTSTRUCT":
roi_contour_seq = ds.ROIContourSequence
str_roi_seq = ds.StructureSetROISequence
roi_obs_seq = ds.RTROIObservationsSequence
ctrSeqRefRoiNums = np.array([roiCtr.ReferencedROINumber.real for roiCtr in roi_contour_seq])
obsSeqRefRoiNums = np.array([roiObs.ReferencedROINumber.real for roiObs in roi_obs_seq])
num_structs = len(roi_contour_seq)
#for str_num, (roi_contour,str_roi) in enumerate(zip(roi_contour_seq,str_roi_seq)):
for str_num, str_roi in enumerate(str_roi_seq):
struct_meta = Structure() #parse_structure_fields(roi_contour_seq,str_roi_seq)
struct_meta.patientName = str(ds.PatientName)
struct_meta.writer = ds.Manufacturer
struct_meta.dateWritten = ds.StructureSetDate
if hasattr(ds,"SeriesDescription"): struct_meta.structureDescription = ds.SeriesDescription
struct_meta.roiNumber = str_roi.ROINumber.real
struct_meta.structureName = str_roi.ROIName
struct_meta.roiGenerationAlgorithm = str_roi.ROIGenerationAlgorithm
if ("3006","0038") in str_roi:
struct_meta.roiGenerationDescription = str_roi["3006","0038"].value
ref_FOR_uid = str_roi.ReferencedFrameOfReferenceUID.name
struct_meta.referencedFrameOfReferenceUID = ref_FOR_uid
# Find the matching ROIContourSequence element
indCtrSeq = ctrSeqRefRoiNums == struct_meta.roiNumber
indObsSeq = obsSeqRefRoiNums == struct_meta.roiNumber
roi_contour = roi_contour_seq[np.where(indCtrSeq)[0][0]]
roi_obs = roi_obs_seq[np.where(indObsSeq)[0][0]]
if not hasattr(roi_contour,"ContourSequence"):
continue
struct_meta.contour = parseContours(roi_contour.ContourSequence)
struct_meta.numberOfScans = len(roi_contour.ContourSequence) # number of scan slices
if hasattr(roi_contour, "ROIDisplayColor"):
colorTriplet = roi_contour.ROIDisplayColor
struct_meta.structureColor = [int(val) for val in colorTriplet]
else:
struct_meta.structureColor = getColorForStructNum(str_num)
struct_meta.ROIInterpretedType = roi_obs.RTROIInterpretedType
struct_meta.strUID = uid.createUID("structure")
struct_meta.structSetSopInstanceUID = ds.SOPInstanceUID.name
struct_meta.structureFileFormat = "RTSTRUCT"
#structureColor
# struct_meta.roiGenerationAlgorithm = str_roi.ROIGenerationAlgorithm
# if ("3006","0038") in str_roi:
# struct_meta.roiGenerationDescription = str_roi["3006","0038"].value
# struct_meta.contour = parse_contours(roi_contour.ContourSequence)
# ref_FOR_uid = str_roi.ReferencedFrameOfReferenceUID
ref_FOR_seq = ds.ReferencedFrameOfReferenceSequence
for ref_FOR in ref_FOR_seq:
if ref_FOR.FrameOfReferenceUID.name == ref_FOR_uid:
struct_meta.assocScanUID = "CT." + ref_FOR.RTReferencedStudySequence[0].RTReferencedSeriesSequence[0].SeriesInstanceUID
# Associated scan found. Break out of for loop
break
struct_list.append(struct_meta)
elif ds.Modality == "SEG":
# Read segmentation mask
mask3M = ds.pixel_array
mask3M = np.transpose(mask3M,[1,2,0])
numStructs = len(ds.SegmentSequence)
if hasattr(ds.PerFrameFunctionalGroupsSequence[0], 'SegmentIdentificationSequence'):
refSegNums = np.array([segId.SegmentIdentificationSequence[0].ReferencedSegmentNumber for segId in ds.PerFrameFunctionalGroupsSequence])
else:
#numFrames = len(ds.SharedFunctionalGroupsSequence[0].DerivationImageSequence[0].SourceImageSequence)
numFrames = len(ds.PerFrameFunctionalGroupsSequence)
refSegNums = np.array([segId.ReferencedSegmentNumber for segId in ds.SharedFunctionalGroupsSequence[0].SegmentIdentificationSequence] * numFrames)
for strNum in range(numStructs):
struct_meta = Structure() #parse_structure_fields(roi_contour_seq,str_roi_seq)
struct_meta.structureFileFormat = "SEG"
struct_meta.patientName = str(ds.PatientName)
struct_meta.writer = ds.Manufacturer
struct_meta.dateWritten = ds.SeriesDate
if hasattr(ds,"SeriesDescription"): struct_meta.structureDescription = ds.SeriesDescription
struct_meta.roiNumber = ds.SegmentSequence[strNum].SegmentNumber
struct_meta.structureName = ds.SegmentSequence[strNum].SegmentLabel
struct_meta.numberOfScans = len(ds.PerFrameFunctionalGroupsSequence) # number of scan slices
struct_meta.strUID = uid.createUID("structure")
struct_meta.structSetSopInstanceUID = ds.SOPInstanceUID
if hasattr(ds.SegmentSequence[strNum], 'SegmentAlgorithmType'):
struct_meta.roiGenerationAlgorithm = ds.SegmentSequence[strNum].SegmentAlgorithmType
if hasattr(ds.SegmentSequence[strNum], 'SegmentAlgorithmName'):
struct_meta.roiGenerationDescription = ds.SegmentSequence[strNum].SegmentAlgorithmName
if hasattr(ds.SegmentSequence[strNum], 'RecommendedDisplayCIELabValue'):
struct_meta.structureColor = ds.SegmentSequence[strNum].RecommendedDisplayCIELabValue
struct_meta.structureColor = getColorForStructNum(strNum)
ref_FOR_uid = ds.FrameOfReferenceUID
refSeriesInstanceUID = ds.ReferencedSeriesSequence[0].SeriesInstanceUID
struct_meta.assocScanUID = "CT." + refSeriesInstanceUID
# Segment number
perFrameSegNum = np.argwhere(refSegNums == ds.SegmentSequence[strNum].SegmentNumber)
perFrameSegNum = perFrameSegNum[:,0]
contour_list = np.empty((0),Contour)
for frameNum in perFrameSegNum:
if hasattr(ds.PerFrameFunctionalGroupsSequence[0], 'DerivationImageSequence'):
sopInstanceUID = ds.PerFrameFunctionalGroupsSequence[frameNum].DerivationImageSequence[0].SourceImageSequence[0].ReferencedSOPInstanceUID
sopClassUID = ds.PerFrameFunctionalGroupsSequence[frameNum].DerivationImageSequence[0].SourceImageSequence[0].ReferencedSOPClassUID
else:
sopInstanceUID = ds.SharedFunctionalGroupsSequence[0].DerivationImageSequence[0].SourceImageSequence[frameNum].ReferencedSOPInstanceUID
sopClassUID = ds.SharedFunctionalGroupsSequence[0].DerivationImageSequence[0].SourceImageSequence[frameNum].ReferencedSOPClassUID
mask2D = mask3M[:,:,frameNum]
if not np.any(mask2D):
continue
contours = measure.find_contours(mask2D == 1, 0.5)
num_contours = len(contours)
for iContour, contour in enumerate(contours):
contObj = Contour()
segment = np.empty((contour.shape[0],3))
colV = contour[:, 1]
rowV = contour[:, 0]
slcV = np.ones_like(rowV) # just a paceholder. This is assigned later on based on SOPInstance UID match
ptsM = np.asarray((colV,rowV,slcV))
ptsM = np.vstack((ptsM, np.ones((1, ptsM.shape[1]))))
#ptsM = np.matmul(planC.scan[assocScanNum].Image2PhysicalTransM, ptsM)[:3,:].T
contObj.segments = ptsM[:3,:].T
contObj.referencedSopInstanceUID = sopInstanceUID
contObj.referencedSopClassUID = sopClassUID
#contour_list.append(contObj)
contour_list = np.append(contour_list,contObj)
struct_meta.contour = contour_list
struct_list.append(struct_meta)
return struct_list
[docs]
def importNii(file_list, assocScanNum, planC, labels_dict = {}):
"""This routine imports segmentation from a list of nii files into planC.
Args:
file_list (List or str): List of nii file paths or a string containing path for a single file.
assocScanNum (int): index of scan in planC to associate the segmentation.
planC (cerr.plan_container.PlanC): pyCERR's plan container object.
labels_dict (dict): dictionary of index to structure name mapping. e.g. {1: 'GTV', 2: 'Lung_total'}
Returns:
cerr.plan_container.PlanC: pyCERR's plan container object
"""
if isinstance(file_list,str) and os.path.exists(file_list):
file_list = [file_list]
struct_list = []
numStructs = len(planC.structure)
numAdded = 0
for file in file_list:
reader = sitk.ImageFileReader()
reader.SetFileName(file)
reader.LoadPrivateTagsOn()
reader.ReadImageInformation()
image = reader.Execute()
# Get mask on scan grid
scanImage = planC.scan[assocScanNum].getSitkImage()
extrapVal = 0
resample = sitk.ResampleImageFilter()
resample.SetReferenceImage(scanImage)
resample.SetInterpolator(getattr(sitk,'sitkNearestNeighbor'))
resample.SetDefaultPixelValue(extrapVal)
resampMaskImage = resample.Execute(image)
maskOnScan3M = scn.getCERRScanArrayFromITK(resampMaskImage, assocScanNum, planC)
all_labels = np.unique(maskOnScan3M)
all_labels = all_labels[all_labels != 0]
if len(labels_dict) == 0:
for label in all_labels:
labels_dict[label] = "Label " + str(label)
for label in labels_dict.keys():
planC = importStructureMask(maskOnScan3M == label, assocScanNum, labels_dict[label], planC, None)
return planC
[docs]
def importStructureMask(mask3M, assocScanNum, structName, planC, structNum=None):
"""
Args:
mask3M (np.ndarray): binary mask for segmentation which is of the same shape as the associated scan
assocScanNum (int): index of scan object within planC.scan to associate the structure
structName (str): Name of the structure
planC (cerr.plan_container.PlanC): pyCERR's container object
structNum (int or None): optional, index of structure object within planC.structure to replace
Returns:
cerr.plan_container.PlanC: pyCERR's container object with updated planC.structure attribute
"""
# Pad mask to account for boundary edges
paddedMask3M = mask3M.astype(int)
paddedMask3M = np.pad(paddedMask3M, ((1,1),(1,1),(0,0)), 'constant', constant_values = 0)
dt = datetime.now()
if isinstance(structNum,(int,float)):
struct_meta = planC.structure[structNum]
else:
struct_meta = Structure()
structNum = len(planC.structure)
struct_meta.structureColor = getColorForStructNum(structNum)
struct_meta.strUID = uid.createUID("structure")
struct_meta.structSetSopInstanceUID = generate_uid()
struct_meta.assocScanUID = planC.scan[assocScanNum].scanUID
struct_meta.structureFileFormat = "NPARRAY"
struct_meta.structureName = structName
struct_meta.dateWritten = dt.strftime("%Y%m%d")
struct_meta.roiNumber = ""
contour_list = np.empty((0),Contour)
numSlcs = len(planC.scan[assocScanNum].scanInfo)
dim = paddedMask3M.shape
for slc in range(dim[2]):
if not np.any(paddedMask3M[:,:,slc]):
continue
contours = measure.find_contours(paddedMask3M[:,:,slc] == 1, 0.5)
if scn.flipSliceOrderFlag(planC.scan[assocScanNum]):
slcNum = numSlcs - slc - 1
else:
slcNum = slc
# _,_,niiZ = image.TransformIndexToPhysicalPoint((0,0,slc))
# ind = np.where((zDicomV - niiZ)**2 < slcMatchTol)
# if len(ind[0]) == 1:
# ind = ind[0][0]
# else:
# raise Exception('No matching slices found.')
sopClassUID = planC.scan[assocScanNum].scanInfo[slc].sopClassUID
sopInstanceUID = planC.scan[assocScanNum].scanInfo[slc].sopInstanceUID
num_contours = len(contours)
for iContour, contour in enumerate(contours):
contObj = Contour()
segment = np.empty((contour.shape[0],3))
colV = contour[:, 1] - 1 # - 1 to account for padding
rowV = contour[:, 0] - 1 # - 1 to account for padding
slcV = np.ones_like(rowV) * slcNum
ptsM = np.asarray((colV,rowV,slcV))
ptsM = np.vstack((ptsM, np.ones((1, ptsM.shape[1]))))
ptsM = np.matmul(planC.scan[assocScanNum].Image2PhysicalTransM, ptsM)[:3,:].T
contObj.segments = ptsM
contObj.referencedSopInstanceUID = sopInstanceUID
contObj.referencedSopClassUID = sopClassUID
#contour_list.append(contObj)
contour_list = np.append(contour_list,contObj)
struct_meta.contour = contour_list
# struct_meta = structr.load_nii_structure(nii_file_name,assocScanNum,planC,labels_dict)
numOrigStructs = len(planC.structure)
if structNum == numOrigStructs:
planC.structure.append(struct_meta)
#str_num = len(planC.structure) - 1
planC.structure[structNum].convertDcmToCerrVirtualCoords(planC)
planC.structure[structNum].rasterSegments = rs.generateRastersegs(planC.structure[structNum], planC)
return planC
[docs]
def getColorForStructNum(structNum):
"""This routine returns the rgb color triplet to assign to a new structure object.
Args:
structNum (int): index of structure object in planC.structure
Returns:
List: rgb triplet
"""
colorMat = np.array([[ 230, 161, 0],
[0, 230, 0],
[230, 0, 0],
[ 0, 207, 207],
[172, 0, 172],
[172, 172, 0],
[138, 172, 230],
[184, 57, 57],
[230, 0, 230],
[172, 115, 0],
[ 0, 230, 115],
[230, 115, 230],
[115, 230, 0],
[69, 0, 207],
[0, 161, 69],
[161, 69, 0],
[0, 184, 207],
[161, 0, 184],
[207, 138, 0],
[76, 151, 230],
[230, 76, 76],
[230, 0, 207],
[207, 115, 0],
[0, 230, 92],
[207, 138, 207],
[138, 207, 0],
[161, 92, 184],
[138, 207, 46]])
# cycle colors
colorIndex = np.mod(structNum, colorMat.shape[0])
return list(colorMat[colorIndex,:])
[docs]
def copyToScan(structNum, scanNum, planC):
"""This routine copies structure object at index structNum to scan objact at index scanNum in planC.scan.
Args:
structNum (int): index of structure object in planC.structure
scanNum (int): index of scan object in planC.scan
planC (cerr.plan_container.PlanC): pyCERR's planc ontainer object
Returns:
cerr.plan_container.PlanC): updated planC with new planC.structure element associated with scanNum
"""
# Get associated scan number for structNum
origScanNum = scn.getScanNumFromUID(planC.structure[structNum].assocScanUID, planC)
mask3M = rs.getStrMask(structNum,planC)
# Get x,y,z, grid for the original scan
xOrigV, yOrigV, zOrigV = planC.scan[origScanNum].getScanXYZVals()
# Get x,y,z grid for the new scan
xNewV, yNewV, zNewV = planC.scan[scanNum].getScanXYZVals()
# Interpolate mask from original scan to the new scan
extrapVal = 0
newMask3M = imgResample3D(mask3M.astype(float), xOrigV, yOrigV, zOrigV, xNewV, yNewV, zNewV, 'sitkLinear',extrapVal) >= 0.5
structName = planC.structure[structNum].structureName
structNum = None
planC = importStructureMask(newMask3M, scanNum, structName, planC, structNum)
return planC
[docs]
def getStructNumFromUID(assocStrUID, planC) -> int:
"""This routine returns the index of the planC.structure element corresponding to the input pyCERR's structure UID
Args:
assocStrUID (str): UID of the structure object
planC (cerr.plan_container.PlanC): pyCERR's plan container object
Returns:
int: Index of the planC.structure element corresponding to the input UID
None when there is no matching element in planC.structure corresponding to the input UID.
"""
uid_list = [s.strUID for s in planC.structure]
if assocStrUID in uid_list:
return uid_list.index(assocStrUID)
else:
return None
[docs]
def getStructNumFromSOPInstanceUID(assocStrUID,planC) -> int:
"""This routine returns the index of the planC.structure element corresponding to the input SOP Instance UID
Args:
assocStrUID (str): SOP Instance UID of the structure object
planC (cerr.plan_container.PlanC): pyCERR's plan container object
Returns:
int: Index of the planC.structure element corresponding to the input UID
None when there is no matching element in planC.structure corresponding to the input SOP Instance UID.
"""
uid_list = [s.structSetSopInstanceUID for s in planC.structure]
if assocStrUID in uid_list:
return uid_list.index(assocStrUID)
else:
return None
[docs]
def calcIsocenter(strNum, planC):
"""This routine calculates the isocenter of the input structure index in planC.structure
Args:
strNum (int): Index of structure object in planC.structure
planC (cerr.plan_container.PlanC): pyCERR's plan container
Returns:
List: x,y,z coordinates in pyCERR's virtual coordinate system for the isocenter.
"""
assocScanNum = scn.getScanNumFromUID(planC.structure[strNum].assocScanUID, planC)
mask3M = rs.getStrMask(strNum,planC)
rV, cV, sV = np.where(mask3M)
midSliceInd = int(np.round(sV.mean()))
midRowInd = int(np.round(rV.mean()))
midColInd = int(np.round(cV.mean()))
# store isocenter to update viewer to display the central slice later on
xV, yV, zV = planC.scan[assocScanNum].getScanXYZVals()
isocenter = [xV[midColInd], yV[midRowInd], zV[midSliceInd]]
return isocenter
[docs]
def getMatchingIndex(structName, strList, matchCriteria='exact'):
"""This routine returns the index of element/s from the list of structure names that match the input name.
Args:
structName (str): Structure name to find a match
strList: List of structure names
matchCriteria: Criteria used to find the match
'EXACT' - returns indices of exact matches
'FIRSTCHARS' - returns indices where first characters of elements in the list match input structName
Returns:
List: list of matching indices from input strList
"""
if matchCriteria.upper() == 'EXACT':
indMatchV = [i for i, s in enumerate(strList) if s.lower() == structName.lower()]
elif matchCriteria.upper() == 'FIRSTCHARS':
indMatchV = [i for i, s in enumerate(strList) if s.lower().startswith(structName.lower())]
else:
# implementation for 'contains' match criteria
indMatchV = []
for i, s in enumerate(strList):
if structName.lower() in s.lower():
indMatchV.append(i)
return indMatchV
[docs]
def getContourPolygons(strNum, planC, rcsFlag=False):
"""This routine returns the list of polygonal coordinates for all the segments of input structutre.
Args:
strNum (int): index of structure element in planC.structure
planC (cerr.plan_container.PlanC): pyCERR's plan container object
rcsFlag (bool): optional, flag to return polygonal coordinates in row,col,slc units.
By default, the polygonal coordinates are returned in physical units of cm.
Returns:
list: list of nx3 arrays corresponding to polygonal segments, where n is the number of points in that segment,
the columns of each array are x,y,z coordinates in physical units of cm or r,c,s units.
"""
assocScanNum = scn.getScanNumFromUID(planC.structure[strNum].assocScanUID, planC)
numSlcs = len(planC.structure[strNum].contour)
polygons = []
for slc in range(numSlcs):
if planC.structure[strNum].contour[slc]:
for seg in planC.structure[strNum].contour[slc].segments:
if rcsFlag:
rowV, colV = rs.xytom(seg.points[:,0], seg.points[:,1],slc,planC, assocScanNum)
pts = np.array((rowV, colV, slc*np.ones_like(rowV)), dtype=np.float64).T
else:
pts = seg.points
polygons.append(pts)
return polygons
[docs]
def getClosedMask(structNum, structuringElementSizeCm, planC, saveFlag=False,\
replaceFlag=None, procSructName=None):
"""
Function for morphological closing and hole-filling for binary masks
Args:
structNum : int for index of structure in planC.
structuringElementSizeCm : float for size of structuring element for closing in cm
planC: pyCERR plan container object.
saveFlag: [optional, default=False] bool flag for saving processed mask to planC.
replaceFlag: [optional, default=False] bool flag for replacing input mask with
processed mask in planC.
procSructName: [optional, default=None] string for output structure name.
Original structure name is used if None.
Returns:
filledMask3M: np.ndarray(dtype=bool) for filled mask.
planC: pyCERR plan container object.
"""
# Get binary mask of structure
mask3M = rs.getStrMask(structNum,planC)
# Get mask resolution
assocScanNum = scn.getScanNumFromUID(planC.structure[structNum].assocScanUID,\
planC)
inputResV = planC.scan[assocScanNum].getScanSpacing()
filledMask3M = maskUtils.closeMask(mask3M,inputResV,structuringElementSizeCm)
# # Create structuring element
# structuringElement = createStructuringElement(structuringElementSizeCm,\
# inputResV, dimensions=3)
#
# # Apply morphological closing
# closedMask3M = morphologicalClosing(mask3M, structuringElement)
#
# # Fill any remaining holes
# filledMask3M = fillHoles(closedMask3M)
# Save to planC
if saveFlag:
if procSructName is None:
procSructName = planC.structure[structNum].structureName
assocScanNum = scn.getScanNumFromUID(planC.structure[structNum].assocScanUID,\
planC)
newStructNum = None
if replaceFlag:
# Delete structNum
#del planC.structure[structNum]
newStructNum = structNum
#pc.import_structure_mask(filledMask3M, assocScanNum, procSructName, planC)
planC = importStructureMask(filledMask3M, assocScanNum, procSructName, planC, newStructNum)
return filledMask3M, planC
[docs]
def getLargestConnComps(structNum, numConnComponents, planC=None, saveFlag=None,\
replaceFlag=None, procSructName=None):
"""
Function to retain 'N' largest connected components in input binary mask
Args:
structNum: int for index of structure in planC
(OR) np.ndarray(dtype=bool) 3D binary mask.
structuringElementSizeCm: float for desired size of structuring element for
morphological closing in cm.
planC: [optional, default=None] pyCERR plan container object.
saveFlag: [optional, default=False] bool flag for importing filtered mask
to planC if set to True.
replaceFlag: [optional, default=False] bool flag for replacing
input mask with processed mask to planC if set to True.
procSructName: [optional, default=None] string for output structure name.
Original structure name is used if empty.
Returns:
maskOut3M: np.ndarray(dtype=bool) filtered binary mask.
planC: pyCERR plan container object.
"""
if np.isscalar(structNum):
# Get binary mask of structure
mask3M = rs.getStrMask(structNum,planC)
else:
# Input is binary structure mask
mask3M = structNum
maskOut3M = maskUtils.largestConnComps(mask3M, numConnComponents)
# if np.sum(mask3M) > 1:
# #Extract connected components
# labeledArray, numFeatures = label(mask3M, structure=np.ones((3, 3, 3)))
#
# # Sort by size
# ccSiz = [len(labeledArray[labeledArray == i]) for i in range(1, numFeatures + 1)]
# rankV = np.argsort(ccSiz)[::-1]
# if len(rankV) > numConnComponents:
# selV = rankV[:numConnComponents]
# else:
# selV = rankV[:]
#
# # Return N largest
# maskOut3M = np.zeros_like(mask3M, dtype=bool)
# for n in selV:
# idxV = labeledArray == n + 1
# maskOut3M[idxV] = True
# else:
# maskOut3M = mask3M
if planC is not None and saveFlag and np.isscalar(structNum):
if procSructName is None:
procSructName = planC.structure[structNum].structureName
assocScanNum = scn.getScanNumFromUID(planC.structure[structNum].assocScanUID,\
planC)
newStructNum = None
if replaceFlag:
# Delete structNum
#del planC.structure[structNum]
newStructNum = structNum
planC = importStructureMask(maskOut3M, assocScanNum, procSructName, planC, newStructNum)
return maskOut3M, planC
[docs]
def getLabelMap(strNumV, planC, labelDict=None, dim=3):
"""
Function to create label map for user-specified structures.
Args:
strNumV (list) : Structure indices to be exported.
planC (plan_container.planC): pyCERR plan_container object.
labelDict (dict): [optional, default={}] dictionary mapping indices with structure names.
dim (int): [optional, default=3] int indicating dimensions of output label map.
When set to 3, returns 3D label map np.array(dtype=int).
When set to 4, returns 4D array of binary masks (required for overlapping structures).
Returns:
labelMap3M: np.ndarray(dtype=int) for label map.
"""
if labelDict is None:
labelDict = {}
# Assign default labels
for idx in range(len(strNumV)):
strName = planC.structure[strNumV[idx]].structureName
labelDict[idx + 1] = strName
allLabels = labelDict.keys()
if dim == 3:
assocScan = planC.structure[strNumV[0]].getStructureAssociatedScan(planC)
shape = planC.scan[assocScan].getScanSize()
labelMap = np.zeros(shape, dtype=int)
for strNum in strNumV:
strName = planC.structure[strNum].structureName
strLabel = [label for label in allLabels if labelDict[label] == strName]
if isinstance(strLabel, str):
strLabel = int(strLabel)
mask3M = rs.getStrMask(strNum, planC)
if np.any(np.logical_and(mask3M, labelMap > 0)):
raise Exception("Overlapping structures encountered. Please set dim=4.")
labelMap[mask3M] = strLabel
elif dim == 4:
labelMap = np.array(getMaskList(strNumV, planC, labelDict=labelDict))
else:
raise ValueError("Invalid input. Dim must be 3 or 4.")
return labelMap
[docs]
def getMaskList(strNumV, planC, labelDict=None):
"""
Function to create list of binary masks of user-specified structures.
Args:
strNumV: list of structure indices to be exported.
planC: pyCERR plan_container object.
labelDict: [optional, default={}] dictionary mapping indices with structure names.
Returns:
maskList: list(dtype=bool) of binary masks.
"""
if labelDict is None or len(labelDict)==0:
labelDict = {}
# Assign default labels
for idx in range(len(strNumV)):
strName = planC.structure[strNumV[idx]].structureName
labelDict[idx + 1] = strName
maskList = []
allLabels = list(labelDict.keys())
for idx in range(len(strNumV)):
scanNum = scn.getScanNumFromUID(planC.structure[strNumV[idx]].assocScanUID, planC)
mask3M = rs.getStrMask(strNumV[idx], planC)
strName = planC.structure[strNumV[idx]].structureName
matchLabels = [label for label in allLabels if labelDict[label] == strName]
strLabel = matchLabels[0] if len(matchLabels)>0 else None
mask3M = np.moveaxis(mask3M, [0, 1], [1, 0])
if scn.flipSliceOrderFlag(planC.scan[scanNum]):
mask3M = np.flip(mask3M, axis=2)
if isinstance(strLabel, str):
strLabel = int(strLabel)
maskList.insert(strLabel-1, mask3M)
return maskList