{ "cells": [ { "cell_type": "code", "execution_count": 1, "metadata": { "collapsed": false }, "outputs": [], "source": [ "import ogr\n", "import os\n", "import shapely, shapely.wkt\n", "import shapely_tools as st\n", "import utm_conversion as utm\n", "from gdal_sample_points import gdal_sample_points\n", "import numpy as np" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "0: points\n", "--> osm_id: String\n", "--> name: String\n", "--> barrier: String\n", "--> highway: String\n", "--> ref: String\n", "--> address: String\n", "--> is_in: String\n", "--> place: String\n", "--> man_made: String\n", "--> all_tags: String\n", "1: lines\n", "--> osm_id: String\n", "--> name: String\n", "--> highway: String\n", "--> waterway: String\n", "--> covered: String\n", "--> depth: String\n", "--> blockage: String\n", "--> barrier: String\n", "--> width: String\n", "--> height: String\n", "--> maxheight: String\n", "--> layer: String\n", "--> sidewalk: String\n", "--> tunnel: String\n", "--> bridge: String\n", "--> z_order: Integer\n", "--> all_tags: String\n", "2: multilinestrings\n", "--> osm_id: String\n", "--> name: String\n", "--> type: String\n", "--> all_tags: String\n", "3: multipolygons\n", "--> osm_id: String\n", "--> osm_way_id: String\n", "--> name: String\n", "--> type: String\n", "--> admin_level: String\n", "--> boundary: String\n", "--> building: String\n", "--> landuse: String\n", "--> height: String\n", "--> building_levelsfj: String\n", "--> building_material: String\n", "--> est_height: String\n", "--> all_tags: String\n", "4: other_relations\n" ] } ], "source": [ "# explore data\n", "db_fn=r'd:\\temp\\CF1.2\\OSM\\manzese_bbox-latest.db'\n", "ds=ogr.Open(db_fn)\n", "for i in range(ds.GetLayerCount()):\n", " print(str(i) + ': ' + str(ds.GetLayerByIndex(i).GetName()))\n", " \n", " # create fields using OGR\n", " src_lyr = ds.GetLayerByIndex(i)\n", " f = src_lyr.GetFeature(1)\n", " if f:\n", " for i in range(f.GetFieldCount()):\n", " fd = f.GetFieldDefnRef(i)\n", " print(\"--> {:s}: {:s}\".format(fd.GetName(), fd.GetTypeName()))\n", "\n", "ds.Destroy()" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "['__class__',\n", " '__delattr__',\n", " '__doc__',\n", " '__format__',\n", " '__getattribute__',\n", " '__hash__',\n", " '__init__',\n", " '__new__',\n", " '__reduce__',\n", " '__reduce_ex__',\n", " '__repr__',\n", " '__setattr__',\n", " '__sizeof__',\n", " '__str__',\n", " '__subclasshook__']" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "dir(f)" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# utils\n", "def check_Ftype(ftype, value):\n", " try:\n", " v_out = ftype(value) # parse\n", " except ValueError:\n", " v_out = None\n", " return v_out\n", "\n", "def toUTM(shape):\n", " new_coords = [utm.from_latlon(c[1], c[0])[:2] for c in shape.coords]\n", " return shapely.geometry.LineString(new_coords)" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "collapsed": true }, "outputs": [], "source": [ "gate_pli_fmt = \"\"\"{d.name}\n", " 2 2\n", "{d.startx} {d.starty}\n", "{d.endx} {d.endy}\n", "\"\"\".format\n", "\n", "gate_ini_fmt = \"\"\"[structure]\n", "type = gate\n", "id = {d.name}\n", "polylinefile = {rel_path}{d.name}.pli \n", "sill_level = {d.sill_level}\n", "lower_edge_level = {d.lower_edge_level}\n", "opening_width = {d.opening_width}\n", "door_height = {d.door_height}\n", "horizontal_opening_direction= symmetric\n", "\"\"\".format" ] }, { "cell_type": "code", "execution_count": 31, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# gate class\n", "class gate:\n", " \"\"\"class for gate data\"\"\"\n", " def __init__(self, geom=None, dtm_fn=None, osm_id=None, sname=None, depth=2., width=1., **kwargs):\n", " \n", " if (geom is None) or (dtm_fn is None):\n", " print('invalid input, dtm_fn or geom missing for osm-id {:s}'.format(osm_id)) \n", " ## TODO replace with formal warning here\n", " \n", " # osm naar structure properties & format\n", " self.osm_id = osm_id\n", " if not sname:\n", " sname = 'gate{:s}'.format(osm_id)\n", " self.name = sname\n", " self.geom = geom # latlon\n", " \n", " # utm latlon\n", " self.geom_utm = toUTM(self.geom)\n", " \n", " # create perpendicular line on midpoint first line segment\n", " self.geom_utm_p = st.perpendicular_line(self.geom_utm, length=1) #set default value for lenght of new line\n", " # formatted start & end point \n", " self.startx = '{:.2f}'.format(self.geom_utm_p.coords[0][0])\n", " self.endx = '{:.2f}'.format(self.geom_utm_p.coords[-1][0])\n", " self.starty = '{:.2f}'.format(self.geom_utm_p.coords[0][1])\n", " self.endy = '{:.2f}'.format(self.geom_utm_p.coords[-1][1])\n", " \n", " # get sill and ground levels from dtm sample\n", " y = np.mean([self.geom_utm_p.coords[0][1], self.geom_utm_p.coords[-1][1]])\n", " x = np.mean([self.geom_utm_p.coords[0][0], self.geom_utm_p.coords[-1][0]])\n", " # sill level is min window dtm level\n", " sillLvl_dtm = gdal_sample_points(y, x, dtm_fn, win_size=40, func=np.min)[0] # 20 m rectangular window (0.5m res)\n", " # ground level is max window dtm level\n", " groundLvl_dtm = gdal_sample_points(y, x, dtm_fn, win_size=40, func=np.max)[0] # 20 m rectangular window\n", " \n", " if (sillLvl_dtm is None) or (groundLvl_dtm is None):\n", " print('no data sampled from dtm for {:s}'.format(self.name))\n", " ## TODO raise error and build try except around building object in function below\n", " sillLvl_dtm, groundLvl_dtm = 0, 0 # just to make it run for now\n", " \n", " # set gate properties from dtm & osm data\n", " self.sill_level = '{:.2f}'.format(sillLvl_dtm) # sill level at window min dtm level\n", " self.lower_edge_level = '{:.2f}'.format(sillLvl_dtm+depth) # lower edge level at sill level + depth \n", " self.opening_width = '{:.2f}'.format(width) \n", " self.door_height = '{:.2f}'.format(groundLvl_dtm) # door height at ground level\n", " \n", " def fmt_ini(self, pli_rel_path=None):\n", " \"\"\"create formated string in ini format\n", " is path to folder with pli files relative to ini file\"\"\"\n", " if pli_rel_path is not None:\n", " pli_rel_path = '{:s}/'.format(rel_path)\n", " else:\n", " pli_rel_path = ''\n", " return gate_ini_fmt(rel_path=pli_rel_path, d=self)\n", " \n", " def fmt_pli(self):\n", " \"\"\"create formated string in pli format with line coords\"\"\"\n", " return gate_pli_fmt(d=self)\n", " \n", " def to_pli(self, path):\n", " \"\"\"write pli file per object to folder \"\"\"\n", " with open(os.path.join(path, '{d.name}.pli'.format(d=self)), \"w\") as text_file:\n", " text_file.write(self.fmt_pli())\n", " \n", " def append2ini(self, ini_fn, pli_rel_path=None):\n", " \"\"\"append structure to ini file at per object. \n", " is path to folder with pli files relative to ini file\"\"\"\n", " with open(os.path.join(ini_fn), \"a\") as text_file:\n", " for o in objects:\n", " text_file.write(o.fmt_ini(pli_rel_path=pli_rel_path))\n" ] }, { "cell_type": "code", "execution_count": 32, "metadata": { "collapsed": false }, "outputs": [], "source": [ "def osm2gate(fn, check_fields, key='tunnel', value='culvert', layer_index=1, dtm_fn=None):\n", " objects = []\n", " \n", " # read data\n", " src_ds = ogr.Open(fn)\n", " src_lyr = src_ds.GetLayerByIndex(layer_index)\n", " \n", " for feat in src_lyr:\n", " # only matching key-value pairs\n", " if feat.GetField(key) != value:\n", " continue\n", " \n", " # make dict with osm data\n", " data = {}\n", " # read geometry\n", " try:\n", " data['geom'] = shapely.wkt.loads(feat.GetGeometryRef().ExportToWkt())\n", " except Exception as e: \n", " print('Error({0}), skipping geometry.'.format(e))\n", " continue\n", "\n", " # set field values\n", " for i in range(feat.GetFieldCount()):\n", " fieldDef = feat.GetFieldDefnRef(i)\n", " fieldName = fieldDef.GetName()\n", " v = feat.GetField(fieldName)\n", " \n", " # check fields -> if not according to format, set None\n", " if (fieldName in check_fields) and (v is not None):\n", " ftype = check_fields[fieldName]\n", " v = check_Ftype(ftype, v) \n", " \n", " if v:\n", " # write data\n", " data[fieldName] = v\n", "\n", " # create gate object\n", " data['dtm_fn'] = dtm_fn\n", " ## TODO: make try except around this function\n", " o = gate(**data)\n", " objects.append(o)\n", " \n", " src_ds.Destroy()\n", " return objects\n", "\n" ] }, { "cell_type": "code", "execution_count": 33, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "no data sampled from dtm for gate360846699\n", "no data sampled from dtm for gate361855131\n", "no data sampled from dtm for gate373216314\n", "no data sampled from dtm for gate373566298\n" ] } ], "source": [ "# filter culverts, skip invalide fields and create gate objects\n", "db_fn=r'd:\\temp\\CF1.2\\OSM\\manzese_bbox-latest.db'\n", "dtm_fn = r'd:\\temp\\CF1.2\\DTM\\manzese_DTM_0_5m_UTM37S_small.tif'\n", "check_fields = {\n", " 'width': float,\n", " 'depth': float\n", "}\n", "\n", "objects = osm2gate(db_fn, check_fields, key='tunnel', value='culvert', layer_index=1, dtm_fn=dtm_fn)" ] }, { "cell_type": "code", "execution_count": 24, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "339\n" ] } ], "source": [ "#write pli files\n", "path =r'd:\\temp\\CF1.2\\DFM_data\\culverts'\n", "[o.to_pli(path) for o in objects] # writes 1 pli per structure\n", "print(len(objects))" ] }, { "cell_type": "code", "execution_count": 25, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "339\n" ] } ], "source": [ "#write ini file\n", "rel_path = r'../culverts'\n", "ini_fn = r'd:\\temp\\CF1.2\\DFM_data\\culverts.ini'\n", "\n", "# remove old ini file if exists\n", "if os.path.exists(ini_fn):\n", " os.unlink(ini_fn)\n", "# append to inifile\n", "[o.append2ini(ini_fn, rel_path) for o in objects]\n", "print(len(objects))" ] }, { "cell_type": "code", "execution_count": 27, "metadata": { "collapsed": false }, "outputs": [], "source": [ "#\n", "fn_shp = r'd:\\temp\\CF1.2\\Manzese_OSMculverts_adapt.shp'\n", "\n", "# write new geometries to shapefile to check\n", "from shapely.geometry import mapping, LineString\n", "import fiona\n", "from fiona import crs\n", "epgs=32737 # UTM37S\n", "\n", "# Define a polygon feature geometry with one attribute\n", "schema = {\n", " 'geometry': 'LineString',\n", " 'properties': {'id': 'int', \n", " 'sill_level': 'float', 'lower_edge_level': 'float',\n", " 'opening_width': 'float', 'door_height': 'float'},\n", "}\n", "\n", "# Write a new Shapefile\n", "with fiona.open(fn_shp, 'w', 'ESRI Shapefile', schema, crs=crs.from_epsg(epgs)) as c:\n", " ## If there are multiple geometries, put the \"for\" loop here\n", " for o in objects:\n", " c.write({\n", " 'geometry': mapping(o.geom_utm_p), # write perpendicular line in utm\n", " 'properties': {'id': int(o.osm_id), \n", " 'sill_level':float(o.sill_level), \n", " 'lower_edge_level':float(o.lower_edge_level),\n", " 'opening_width':float(o.opening_width),\n", " 'door_height':float(o.door_height)},\n", " })" ] } ], "metadata": { "anaconda-cloud": {}, "kernelspec": { "display_name": "Python [Root]", "language": "python", "name": "Python [Root]" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.12" } }, "nbformat": 4, "nbformat_minor": 0 }