In [1]:
import ogr
import os
import shapely, shapely.wkt
import shapely_tools as st
import utm_conversion as utm
from gdal_sample_points import gdal_sample_points
import numpy as np

In [2]:
# explore data
db_fn=r'd:\temp\CF1.2\OSM\manzese_bbox-latest.db'
ds=ogr.Open(db_fn)
for i in range(ds.GetLayerCount()):
 print(str(i) + ': ' + str(ds.GetLayerByIndex(i).GetName()))
 
 # create fields using OGR
 src_lyr = ds.GetLayerByIndex(i)
 f = src_lyr.GetFeature(1)
 if f:
 for i in range(f.GetFieldCount()):
 fd = f.GetFieldDefnRef(i)
 print("--> {:s}: {:s}".format(fd.GetName(), fd.GetTypeName()))

ds.Destroy()

0: points
--> osm_id: String
--> name: String
--> barrier: String
--> highway: String
--> ref: String
--> address: String
--> is_in: String
--> place: String
--> man_made: String
--> all_tags: String
1: lines
--> osm_id: String
--> name: String
--> highway: String
--> waterway: String
--> covered: String
--> depth: String
--> blockage: String
--> barrier: String
--> width: String
--> height: String
--> maxheight: String
--> layer: String
--> sidewalk: String
--> tunnel: String
--> bridge: String
--> z_order: Integer
--> all_tags: String
2: multilinestrings
--> osm_id: String
--> name: String
--> type: String
--> all_tags: String
3: multipolygons
--> osm_id: String
--> osm_way_id: String
--> name: String
--> type: String
--> admin_level: String
--> boundary: String
--> building: String
--> landuse: String
--> height: String
--> building_levelsfj: String
--> building_material: String
--> est_height: String
--> all_tags: String
4: other_relations


In [3]:
dir(f)

['__class__',
 '__delattr__',
 '__doc__',
 '__format__',
 '__getattribute__',
 '__hash__',
 '__init__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__']

In [2]:
# utils
def check_Ftype(ftype, value):
 try:
 v_out = ftype(value) # parse
 except ValueError:
 v_out = None
 return v_out

def toUTM(shape):
 new_coords = [utm.from_latlon(c[1], c[0])[:2] for c in shape.coords]
 return shapely.geometry.LineString(new_coords)

In [3]:
gate_pli_fmt = """{d.name}
 2 2
{d.startx} {d.starty}
{d.endx} {d.endy}
""".format

gate_ini_fmt = """[structure]
type = gate
id = {d.name}
polylinefile = {rel_path}{d.name}.pli 
sill_level = {d.sill_level}
lower_edge_level = {d.lower_edge_level}
opening_width = {d.opening_width}
door_height = {d.door_height}
horizontal_opening_direction= symmetric
""".format

In [31]:
# gate class
class gate:
 """class for gate data"""
 def __init__(self, geom=None, dtm_fn=None, osm_id=None, sname=None, depth=2., width=1., **kwargs):
 
 if (geom is None) or (dtm_fn is None):
 print('invalid input, dtm_fn or geom missing for osm-id {:s}'.format(osm_id)) 
 ## TODO replace with formal warning here
 
 # osm naar structure properties & format
 self.osm_id = osm_id
 if not sname:
 sname = 'gate{:s}'.format(osm_id)
 self.name = sname
 self.geom = geom # latlon
 
 # utm latlon
 self.geom_utm = toUTM(self.geom)
 
 # create perpendicular line on midpoint first line segment
 self.geom_utm_p = st.perpendicular_line(self.geom_utm, length=1) #set default value for lenght of new line
 # formatted start & end point 
 self.startx = '{:.2f}'.format(self.geom_utm_p.coords[0][0])
 self.endx = '{:.2f}'.format(self.geom_utm_p.coords[-1][0])
 self.starty = '{:.2f}'.format(self.geom_utm_p.coords[0][1])
 self.endy = '{:.2f}'.format(self.geom_utm_p.coords[-1][1])
 
 # get sill and ground levels from dtm sample
 y = np.mean([self.geom_utm_p.coords[0][1], self.geom_utm_p.coords[-1][1]])
 x = np.mean([self.geom_utm_p.coords[0][0], self.geom_utm_p.coords[-1][0]])
 # sill level is min window dtm level
 sillLvl_dtm = gdal_sample_points(y, x, dtm_fn, win_size=40, func=np.min)[0] # 20 m rectangular window (0.5m res)
 # ground level is max window dtm level
 groundLvl_dtm = gdal_sample_points(y, x, dtm_fn, win_size=40, func=np.max)[0] # 20 m rectangular window
 
 if (sillLvl_dtm is None) or (groundLvl_dtm is None):
 print('no data sampled from dtm for {:s}'.format(self.name))
 ## TODO raise error and build try except around building object in function below
 sillLvl_dtm, groundLvl_dtm = 0, 0 # just to make it run for now
 
 # set gate properties from dtm & osm data
 self.sill_level = '{:.2f}'.format(sillLvl_dtm) # sill level at window min dtm level
 self.lower_edge_level = '{:.2f}'.format(sillLvl_dtm+depth) # lower edge level at sill level + depth 
 self.opening_width = '{:.2f}'.format(width) 
 self.door_height = '{:.2f}'.format(groundLvl_dtm) # door height at ground level
 
 def fmt_ini(self, pli_rel_path=None):
 """create formated string in ini format
 is path to folder with pli files relative to ini file"""
 if pli_rel_path is not None:
 pli_rel_path = '{:s}/'.format(rel_path)
 else:
 pli_rel_path = ''
 return gate_ini_fmt(rel_path=pli_rel_path, d=self)
 
 def fmt_pli(self):
 """create formated string in pli format with line coords"""
 return gate_pli_fmt(d=self)
 
 def to_pli(self, path):
 """write pli file per object to folder """
 with open(os.path.join(path, '{d.name}.pli'.format(d=self)), "w") as text_file:
 text_file.write(self.fmt_pli())
 
 def append2ini(self, ini_fn, pli_rel_path=None):
 """append structure to ini file at per object. 
 is path to folder with pli files relative to ini file"""
 with open(os.path.join(ini_fn), "a") as text_file:
 for o in objects:
 text_file.write(o.fmt_ini(pli_rel_path=pli_rel_path))


In [32]:
def osm2gate(fn, check_fields, key='tunnel', value='culvert', layer_index=1, dtm_fn=None):
 objects = []
 
 # read data
 src_ds = ogr.Open(fn)
 src_lyr = src_ds.GetLayerByIndex(layer_index)
 
 for feat in src_lyr:
 # only matching key-value pairs
 if feat.GetField(key) != value:
 continue
 
 # make dict with osm data
 data = {}
 # read geometry
 try:
 data['geom'] = shapely.wkt.loads(feat.GetGeometryRef().ExportToWkt())
 except Exception as e: 
 print('Error({0}), skipping geometry.'.format(e))
 continue

 # set field values
 for i in range(feat.GetFieldCount()):
 fieldDef = feat.GetFieldDefnRef(i)
 fieldName = fieldDef.GetName()
 v = feat.GetField(fieldName)
 
 # check fields -> if not according to format, set None
 if (fieldName in check_fields) and (v is not None):
 ftype = check_fields[fieldName]
 v = check_Ftype(ftype, v) 
 
 if v:
 # write data
 data[fieldName] = v

 # create gate object
 data['dtm_fn'] = dtm_fn
 ## TODO: make try except around this function
 o = gate(**data)
 objects.append(o)
 
 src_ds.Destroy()
 return objects



In [33]:
# filter culverts, skip invalide fields and create gate objects
db_fn=r'd:\temp\CF1.2\OSM\manzese_bbox-latest.db'
dtm_fn = r'd:\temp\CF1.2\DTM\manzese_DTM_0_5m_UTM37S_small.tif'
check_fields = {
 'width': float,
 'depth': float
}

objects = osm2gate(db_fn, check_fields, key='tunnel', value='culvert', layer_index=1, dtm_fn=dtm_fn)

no data sampled from dtm for gate360846699
no data sampled from dtm for gate361855131
no data sampled from dtm for gate373216314
no data sampled from dtm for gate373566298


In [24]:
#write pli files
path =r'd:\temp\CF1.2\DFM_data\culverts'
[o.to_pli(path) for o in objects] # writes 1 pli per structure
print(len(objects))

339


In [25]:
#write ini file
rel_path = r'../culverts'
ini_fn = r'd:\temp\CF1.2\DFM_data\culverts.ini'

# remove old ini file if exists
if os.path.exists(ini_fn):
 os.unlink(ini_fn)
# append to inifile
[o.append2ini(ini_fn, rel_path) for o in objects]
print(len(objects))

339


In [27]:
#
fn_shp = r'd:\temp\CF1.2\Manzese_OSMculverts_adapt.shp'

# write new geometries to shapefile to check
from shapely.geometry import mapping, LineString
import fiona
from fiona import crs
epgs=32737 # UTM37S

# Define a polygon feature geometry with one attribute
schema = {
 'geometry': 'LineString',
 'properties': {'id': 'int', 
 'sill_level': 'float', 'lower_edge_level': 'float',
 'opening_width': 'float', 'door_height': 'float'},
}

# Write a new Shapefile
with fiona.open(fn_shp, 'w', 'ESRI Shapefile', schema, crs=crs.from_epsg(epgs)) as c:
 ## If there are multiple geometries, put the "for" loop here
 for o in objects:
 c.write({
 'geometry': mapping(o.geom_utm_p), # write perpendicular line in utm
 'properties': {'id': int(o.osm_id), 
 'sill_level':float(o.sill_level), 
 'lower_edge_level':float(o.lower_edge_level),
 'opening_width':float(o.opening_width),
 'door_height':float(o.door_height)},
 })