#!/usr/bin/env python #""" # A script to make sure all CFM Files have the CFQueryParam validation tags # # Copyright (C) 2008-09 Pranav Prakash pranav@myblive.com # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see http://www.gnu.org/licenses/ # #""" __author__ = 'Pranav Prakash' __author_email__ = 'pranny@gmail.com' import sgmllib, re, string import sys #""" # The CFMLParser class, is used to parse the CFM/ CFC files so as to extract all the CFQuery tags. # These tags contain the SQL queries which are vulnurable to be attacked. #""" class CFMLParser(sgmllib.SGMLParser): def __init__(self): sgmllib.SGMLParser.__init__(self) self.inside_cfquery = False self.inside_comment = False self.inside_logic = False self.SQL_queries = [] self.query_names = [] self.newSQL_queries = [] self.temp = '' def start_cfquery(self, attributes): self.inside_cfquery = True for k,v in attributes: if k == 'name': self.query_names.append(v) if k == 'sql': self.query_names.pop() self.end_cfquery() def end_cfquery(self): if self.temp != '': self.SQL_queries.append(self.temp) self.temp = '' self.inside_cfquery = False def start_cfqueryparam(self, attributes): self.temp += self.get_starttag_text() #print self.temp def end_cfqueryparam(self): pass def start_cfif(self, attributes): if self.inside_cfquery: self.temp += self.get_starttag_text() self.inside_logic = True else: self.end_cfif() def end_cfif(self): if self.inside_cfquery: self.temp += '</cfif>' self.inside_logic = False else: pass def start_cfelse(self, attributes): if self.inside_logic: self.temp += self.get_starttag_text() def start_cfelseif(self, attributes): if self.inside_logic: self.temp += self.get_starttag_text() def handle_data(self, data): if self.inside_logic or self.inside_cfquery and len(data.lstrip()) > 0: self.temp += data def handle_comment(self, comment): if self.inside_cfquery: self.temp += '<!--'+comment+'-->' def report_ubalanced(self, tag): if tag == 'cfqueryparam': end_cfqueryparam() if tag == 'cfelse': end_cfelse() def get_queries(self): return self.SQL_queries def get_oldqueries(self): return self.SQL_queries def get_newqueries(self): return self.newSQL_queries def ScanQueries(self): for query in self.SQL_queries: self.ScanSingleQuery(query) def ScanSingleQuery(self, query): TrainMan = SQLValidator(query) TrainMan.Validate_SQL() self.newSQL_queries.append(TrainMan.get_ValidatedSQL()) class SQLValidator(): def __init__(self, sql): self.raw_sql = sql self.refinedSQL = '' # Mentioned below are the four regular expressions that cover 'most' of the DML syntax parameters # Most because i am not very strong with SQL :-) # Before using CFUnvalidatedGroupingRE, we need to make sure that a IN or VALUES has been used, else # it goes on for stored procedures too, and that is pretty evil self.variableRE = "'?#\w+\.?\w+\(?\w*\)?#'?(?![_|\w])" self.CFUnvalidatedAssignmentRE = "[\s*|\,]\w+\.?\w+\s=\s'?#\w+\.?\w+\(?\w*\)?#(?![_|\w])'?" self.CFUnvalidatedGroupingRE = "[\(|\,]\s*'?#\w+\.?\w+\(?\w*\)?#'?(?![_|\w])" self.CFUnvalidatedInGroupingRE = re.compile("\sIN\s+\(\s*'?#\w+\.?\w+\(?\w*\)?#'?(?![_|\w])", re.IGNORECASE) self.TypeMap = dict({'vch': 'CF_SQL_VARCHAR', 'bit': 'CF_SQL_BIT', 'time': 'CF_SQL_TIME', 'int': 'CF_SQL_INTEGER', 'now': 'CF_SQL_TIMESTAMP', 'dat': 'CF_SQL_TIMESTAMP', }) def Validate_SQL(self): 'picks up raw SQL and creates refined SQL' self.refinedSQL = re.sub(self.CFUnvalidatedAssignmentRE, self.handleUnvalidatedAssignment, self.raw_sql) self.refinedSQL = re.sub(self.CFUnvalidatedInGroupingRE, self.handleCFUnvalidatedIn, self.refinedSQL) self.refinedSQL = re.sub(self.CFUnvalidatedGroupingRE, self.handleUnvalidatedGrouping, self.refinedSQL) def handleUnvalidatedAssignment(self, s): tag = s.group(0) tokenPattern = re.compile("#\w+\.?\w+\(?\w*\)?#") LHSpattern = re.compile("[\s*|\,]\w+\.?\w+\s=\s") varcharPattern = re.compile("'#\w+\.?\w+\(?\w*\)?#'") token = re.findall(tokenPattern, tag)[0] predecessor = re.findall(LHSpattern, tag)[0] if len(re.findall(varcharPattern, tag)) != 0: newToken = self.handleIndividualTokens(token, predecessor, 'CF_SQL_VARCHAR') else: newToken = self.handleIndividualTokens(token, predecessor) finalValue = predecessor + newToken return finalValue def handleCFUnvalidatedIn(self, s): tag = s.group(0) tokenPattern = re.compile("#\w+\.?\w+\(?\w*\)?#") lhsPattern = re.compile("\sIN\s+\(\s*", re.IGNORECASE) varcharPattern = re.compile("'#\w+\.?\w+\(?\w*\)?#'") token = re.findall(tokenPattern, tag)[0] predecessor = re.findall(lhsPattern, tag)[0] if len(re.findall(varcharPattern, tag)) != 0: newToken = '<cfqueryparam value = "'+ token +'" cfsqltype="CF_SQL_VARCHAR" list="YES" separator="," />' else: newToken = '<cfqueryparam value = "'+ token +'" cfsqltype="'+self.findDataType(token, predecessor)+'" list="YES" separator="," />' finalValue = predecessor + newToken return finalValue def handleUnvalidatedGrouping(self, s): tag = s.group(0) tokenPattern = re.compile("#\w+\.?\w+\(?\w*\)?#") lhsPattern = re.compile("[\(|\,]\s*") varcharPattern = re.compile("'#\w+\.?\w+\(?\w*\)?#'") token = re.findall(tokenPattern, tag)[0] predecessor = re.findall(lhsPattern, tag)[0] if re.findall('IN|in|VALUES|values', self.refinedSQL) == []: return tag else: if re.findall(varcharPattern, tag) != []: newToken = self.handleIndividualTokens(token, predecessor, 'CF_SQL_VARCHAR') else: newToken = self.handleIndividualTokens(token, predecessor) finalValue = predecessor + newToken return finalValue def get_ValidatedSQL(self): return self.refinedSQL def handleIndividualTokens(self, token, predecessor, hint=''): if hint == '': return '<cfqueryparam value = "'+ token +'" cfsqltype="'+self.findDataType(token, predecessor)+'" />' else: return '<cfqueryparam value = "'+ token +'" cfsqltype="'+ hint +'" />' def findDataType(self, rvalue, lvalue='', hint=''): lvalue = rvalue.split('.')[0] try: rvalue = rvalue.split('.')[1] except: rvalue = None if string.find(lvalue.lower(), '#now()#') != -1: return 'CF_SQL_TIMESTAMP' if rvalue is not None: for k in self.TypeMap.keys(): if rvalue.lower().startswith(k): return self.TypeMap[k] else: for k in self.TypeMap.keys(): if lvalue.lower().startswith(k): return self.TypeMap[k] return 'CF_SQL_INTEGER' def ScanAndReplace(o, n, fullText): """ This function replaces all the old sql queries in a document with the new sql o = list of old queries n = list of new queries fullText = full text of the file, where in the operaration is supposed to be performed """ #fullText = fullText.decode('string_escape') if len(o) == len(n): for i in range(0,len(o)): fullText = fullText.replace( o[i], n[i]) #if fullText.find(o[i]) == -1: # print o[i] # print '----------------' # print n[i] return fullText def check_a_file(infilename): f = open(infilename) text = f.read() f.close() myCFMLParser = CFMLParser() myCFMLParser.feed(text) myCFMLParser.close() myCFMLParser.ScanQueries() o = myCFMLParser.get_oldqueries() n = myCFMLParser.get_newqueries() text = ScanAndReplace(o, n, text) f = open('o.cfm', 'w+') for oi in o: f.write(str(oi)) f.close() f = open('n.cfm', 'w+') f.write(text) f.close() if __name__ == '__main__': check_a_file('LMSCoursecreator.cfm')
Hope you all find it useful.
No comments:
Post a Comment