Flutter Engine
The Flutter Engine
Classes | Functions | Variables
scan_deps Namespace Reference

Classes

class  VarImpl
 

Functions

def extract_deps (deps_file)
 
def parse_readme ()
 
def get_common_ancestor (dep, deps_list)
 
def parse_args (args)
 
def write_manifest (deps, manifest_file)
 
def main (argv)
 

Variables

 SCRIPT_DIR = os.path.dirname(sys.argv[0])
 
 CHECKOUT_ROOT = os.path.realpath(os.path.join(SCRIPT_DIR, '..'))
 
string CHROMIUM_README_FILE = 'third_party/accessibility/README.md'
 
int CHROMIUM_README_COMMIT_LINE = 4
 
string CHROMIUM = 'https://chromium.googlesource.com/chromium/src'
 
string DEP_CLONE_DIR = CHECKOUT_ROOT + '/clone-test'
 
 DEPS = os.path.join(CHECKOUT_ROOT, 'DEPS')
 
string UPSTREAM_PREFIX = 'upstream_'
 

Function Documentation

◆ extract_deps()

def scan_deps.extract_deps (   deps_file)

Definition at line 54 of file scan_deps.py.

def extract_deps(deps_file):
    """
    Evaluates a DEPS file and builds an osv-scanner style result describing
    the common-ancestor commit of every pinned mirror dependency that has an
    upstream mapping.

    Args:
      deps_file: Path of the DEPS file to read and evaluate.

    Returns:
      A dict with a 'packageSource' entry (the DEPS path, typed as
      'lockfile') and a 'packages' list holding one
      {'package': {'name': <upstream>, 'commit': <ancestor SHA>}} item for
      each dependency that get_common_ancestor could resolve.
    """
    local_scope = {}
    var = VarImpl(local_scope)
    global_scope = {
        'Var': var.lookup,
        'deps_os': {},
    }
    # Read the content.
    with open(deps_file, 'r') as deps_handle:
        deps_content = deps_handle.read()

    # Eval the content.
    # NOTE(review): exec runs the DEPS file as arbitrary Python. Only point
    # this script at DEPS files that come from a trusted checkout.
    exec(deps_content, global_scope, local_scope)

    if not os.path.exists(DEP_CLONE_DIR):
        os.mkdir(DEP_CLONE_DIR)  # Clone deps with upstream into temporary dir.

    filtered_osv_deps = []
    try:
        # Extract the deps and filter.
        deps = local_scope.get('deps', {})
        # A DEPS file without a 'vars' section has no upstream mappings; use
        # an empty mapping so the membership test downstream does not fail
        # on None.
        deps_list = local_scope.get('vars') or {}
        for dep in deps.values():
            # We currently do not support packages or cipd which are
            # represented as dictionaries.
            if not isinstance(dep, str):
                continue

            # Split on the last '@' only: '<mirror repo>@<pinned SHA>'.
            repo, separator, pinned_sha = dep.rpartition('@')
            if not separator:
                # No pinned commit, so there is nothing to run merge-base on.
                print('skipping dep without pinned commit: ' + dep)
                continue

            ancestor_result = get_common_ancestor([repo, pinned_sha], deps_list)
            if ancestor_result:
                filtered_osv_deps.append({
                    'package': {'name': ancestor_result[1], 'commit': ancestor_result[0]}
                })
    finally:
        # Always remove the clone directory, even when a dependency lookup
        # raised, so that a failed run does not leave checkouts behind.
        try:
            # Use shutil.rmtree since dir could be non-empty.
            shutil.rmtree(DEP_CLONE_DIR)
        except OSError as clone_dir_error:
            print(
                'Error cleaning up clone directory: %s : %s' %
                (DEP_CLONE_DIR, clone_dir_error.strerror)
            )

    osv_result = {
        'packageSource': {'path': deps_file, 'type': 'lockfile'},
        'packages': filtered_osv_deps,
    }
    return osv_result
98
99
def print(*args, **kwargs)
Definition: run_tests.py:49
def get_common_ancestor(dep, deps_list)
Definition: scan_deps.py:121
def extract_deps(deps_file)
Definition: scan_deps.py:54

◆ get_common_ancestor()

def scan_deps.get_common_ancestor (   dep,
  deps_list 
)
Given an input of a mirrored dep,
compare to the mapping of deps to their upstream
in DEPS and find a common ancestor
commit SHA value.

This is done by first cloning the mirrored dep,
then a branch which tracks the upstream.
From there, git merge-base operates using the HEAD
commit SHA of the upstream branch and the pinned
SHA value of the mirrored branch.

Definition at line 121 of file scan_deps.py.

def get_common_ancestor(dep, deps_list):
    """
    Given an input of a mirrored dep,
    compare to the mapping of deps to their upstream
    in DEPS and find a common ancestor
    commit SHA value.

    This is done by first cloning the mirrored dep,
    then a branch which tracks the upstream.
    From there, git merge-base operates using the HEAD
    commit SHA of the upstream branch and the pinned
    SHA value of the mirrored branch.

    Args:
      dep: Two-item sequence; dep[0] is the mirror repo URL and dep[1] is
        the mirror's pinned SHA.
      deps_list: Mapping read from the DEPS file; the upstream URL is looked
        up under UPSTREAM_PREFIX + <dep name>.

    Returns:
      An (ancestor_commit, upstream) tuple on success, or None when the dep
      has no upstream mapping, the upstream default branch cannot be
      determined, or a git command fails.
    """
    # dep[0] contains the mirror repo.
    # dep[1] contains the mirror's pinned SHA.
    # upstream is the origin repo.
    dep_name = dep[0].split('/')[-1].split('.')[0]
    upstream_key = UPSTREAM_PREFIX + dep_name
    if upstream_key not in deps_list:
        print('did not find dep: ' + dep_name)
        return None
    try:
        # Get the upstream URL from the mapping in DEPS file.
        upstream = deps_list.get(upstream_key)
        temp_dep_dir = DEP_CLONE_DIR + '/' + dep_name
        # Clone dependency from mirror.
        subprocess.check_output(['git', 'clone', '--quiet', '--', dep[0], dep_name],
                                cwd=DEP_CLONE_DIR)

        # Create branch that will track the upstream dep.
        print('attempting to add upstream remote from: {upstream}'.format(upstream=upstream))
        subprocess.check_output(['git', 'remote', 'add', 'upstream', upstream], cwd=temp_dep_dir)
        subprocess.check_output(['git', 'fetch', '--quiet', 'upstream'], cwd=temp_dep_dir)

        # Get name of the default branch for upstream (e.g. main/master/etc.).
        # The output is parsed here instead of being piped through sed so
        # that a failing git command is not hidden behind the pipeline's
        # exit status and no shell is needed.
        default_branch = ''
        for line in _git_output(['remote', 'show', 'upstream'], temp_dep_dir).splitlines():
            if 'HEAD branch' in line and ': ' in line:
                default_branch = line.rsplit(': ', 1)[1].strip()
                break
        if not default_branch:
            print('could not determine default branch for upstream: ' + upstream)
            return None

        # Make upstream branch track the upstream dep.
        subprocess.check_output([
            'git', 'checkout', '--force', '-b', 'upstream', '--track', 'upstream/' + default_branch
        ],
                                cwd=temp_dep_dir)

        # Get the most recent commit from default branch of upstream.
        commit = _git_output(
            ['for-each-ref', '--format=%(objectname:short)', 'refs/heads/upstream'], temp_dep_dir
        )

        # Perform merge-base on most recent default branch commit and pinned
        # mirror commit. The pinned SHA comes from the DEPS file, so it is
        # passed as a separate argument and never interpolated into a shell
        # command line.
        ancestor_commit = _git_output(['merge-base', commit, dep[1]], temp_dep_dir)
        print('Ancestor commit: ' + ancestor_commit)
        return ancestor_commit, upstream
    except subprocess.CalledProcessError as error:
        print(
            "Subprocess command '{0}' failed with exit code: {1}.".format(
                error.cmd, str(error.returncode)
            )
        )
        if error.output:
            print("Subprocess error output: '{0}'".format(error.output))
    return None


def _git_output(git_args, cwd):
    """
    Runs `git` with the given argument list in cwd, without a shell, and
    returns its output passed through byte_str_decode and stripped of
    surrounding whitespace.
    """
    raw_output = subprocess.check_output(['git'] + list(git_args), cwd=cwd)
    return byte_str_decode(raw_output).strip()
194
195
uint32_t uint32_t * format
def byte_str_decode(str_or_bytes)

◆ main()

def scan_deps.main (   argv)

Definition at line 225 of file scan_deps.py.

def main(argv):
    """
    Script entry point: parses the command line, gathers the dependency
    results from the DEPS file and from the accessibility README, and writes
    them out as a single manifest.

    Args:
      argv: Full argument vector, including the program name at index 0.

    Returns:
      0 once the manifest has been written.
    """
    options = parse_args(argv)
    # The DEPS result is produced first, then the README result.
    manifest_entries = [extract_deps(options.deps), parse_readme()]
    write_manifest(manifest_entries, options.output)
    return 0
231
232
def main(argv)
Definition: scan_deps.py:225
def parse_readme()
Definition: scan_deps.py:100
def write_manifest(deps, manifest_file)
Definition: scan_deps.py:218
static void parse_args(int argc, char *argv[], Args *args)
Definition: skqp_main.cpp:97

◆ parse_args()

def scan_deps.parse_args (   args)

Definition at line 196 of file scan_deps.py.

def parse_args(args):
    """
    Parses the script's command-line options.

    Args:
      args: Full argument vector; the first element (program name) is
        dropped before parsing.

    Returns:
      The argparse namespace with `deps` (input DEPS file) and `output`
      (manifest path); both default to files under CHECKOUT_ROOT.
    """
    cli_args = args[1:]
    default_deps_path = os.path.join(CHECKOUT_ROOT, 'DEPS')
    default_output_path = os.path.join(CHECKOUT_ROOT, 'osv-lockfile.json')

    arg_parser = argparse.ArgumentParser(
        description='A script to find common ancestor commit SHAs'
    )
    arg_parser.add_argument(
        '--deps', '-d', default=default_deps_path, type=str, help='Input DEPS file to extract.'
    )
    arg_parser.add_argument(
        '--output',
        '-o',
        default=default_output_path,
        type=str,
        help='Output osv-scanner compatible deps file.'
    )

    return arg_parser.parse_args(cli_args)
216
217

◆ parse_readme()

def scan_deps.parse_readme ( )
Opens the Flutter Accessibility Library README and uses the commit hash
found in the README to check for vulnerabilities.
The commit hash in this README will always be in the same format.

Definition at line 100 of file scan_deps.py.

def parse_readme():
    """
    Opens the Flutter Accessibility Library README and uses the commit hash
    found in the README to check for vulnerabilities.
    The commit hash in this README will always be in the same format.

    Returns:
      A dict with a 'packageSource' entry (the README path, typed as
      'lockfile') and a single-item 'packages' list naming CHROMIUM and the
      commit text found between square brackets on the commit line.

    Raises:
      ValueError: If the README has fewer lines than expected or the commit
        line contains no bracketed commit hash.
    """
    file_path = os.path.join(CHECKOUT_ROOT, CHROMIUM_README_FILE)
    with open(file_path) as readme:
        # Read the content of the file opened.
        content = readme.readlines()

    if len(content) <= CHROMIUM_README_COMMIT_LINE:
        raise ValueError(
            'Expected a commit on line index %d of %s, but the file only has %d lines.' %
            (CHROMIUM_README_COMMIT_LINE, file_path, len(content))
        )

    commit_line = content[CHROMIUM_README_COMMIT_LINE]
    # The commit hash is the text enclosed in square brackets on that line.
    commit = re.search(r'(?<=\[).*(?=\])', commit_line)
    if commit is None:
        # Fail with a descriptive message instead of an AttributeError on
        # None when the README layout has changed.
        raise ValueError(
            'No bracketed commit hash found on line index %d of %s.' %
            (CHROMIUM_README_COMMIT_LINE, file_path)
        )

    osv_result = {
        'packageSource': {'path': file_path, 'type': 'lockfile'},
        'packages': [{'package': {'name': CHROMIUM, 'commit': commit.group()}}]
    }

    return osv_result
119
120

◆ write_manifest()

def scan_deps.write_manifest (   deps,
  manifest_file 
)

Definition at line 218 of file scan_deps.py.

def write_manifest(deps, manifest_file):
    """
    Wraps the collected dependency results under a 'results' key, echoes the
    resulting JSON to stdout, and writes the same JSON to manifest_file.

    Args:
      deps: JSON-serializable collection of dependency results.
      manifest_file: Path of the manifest file to create or overwrite.
    """
    # Serialize once so the printed text and the file contents are the same.
    serialized = json.dumps({'results': deps}, indent=2)
    print(serialized)
    with open(manifest_file, 'w') as manifest_handle:
        manifest_handle.write(serialized)
223
224

Variable Documentation

◆ CHECKOUT_ROOT

scan_deps.CHECKOUT_ROOT = os.path.realpath(os.path.join(SCRIPT_DIR, '..'))

Definition at line 25 of file scan_deps.py.

◆ CHROMIUM

string scan_deps.CHROMIUM = 'https://chromium.googlesource.com/chromium/src'

Definition at line 28 of file scan_deps.py.

◆ CHROMIUM_README_COMMIT_LINE

int scan_deps.CHROMIUM_README_COMMIT_LINE = 4

Definition at line 27 of file scan_deps.py.

◆ CHROMIUM_README_FILE

string scan_deps.CHROMIUM_README_FILE = 'third_party/accessibility/README.md'

Definition at line 26 of file scan_deps.py.

◆ DEP_CLONE_DIR

string scan_deps.DEP_CLONE_DIR = CHECKOUT_ROOT + '/clone-test'

Definition at line 29 of file scan_deps.py.

◆ DEPS

scan_deps.DEPS = os.path.join(CHECKOUT_ROOT, 'DEPS')

Definition at line 30 of file scan_deps.py.

◆ SCRIPT_DIR

scan_deps.SCRIPT_DIR = os.path.dirname(sys.argv[0])

Definition at line 24 of file scan_deps.py.

◆ UPSTREAM_PREFIX

string scan_deps.UPSTREAM_PREFIX = 'upstream_'

Definition at line 31 of file scan_deps.py.