Commit 0122ad15 authored by Millian Poquet
wip: base code to manage weekly slots

parent 4d8b7336
@@ -3,6 +3,10 @@
url = "https://github.com/NixOS/nixpkgs/archive/21.11.tar.gz";
sha256 = "sha256:162dywda2dvfj1248afxc45kcrg83appjd0nmdb541hl7rnncf02";
}) {}
, kapack ? import (fetchTarball {
url = "https://github.com/oar-team/nur-kapack/archive/052fb35eb29228d9e4ea8afa09e9f0e390782cbd.tar.gz";
sha256 = "sha256:0bvsgm5wv5zh3isi51sxhyryxh6g0x29id4f68c07nwvsq6qlmr9";
}) {inherit pkgs;}
}:
let
@@ -30,6 +34,7 @@ in rec {
requests
click
beautifulsoup4
kapack.procset
];
};
@@ -37,6 +42,7 @@ in rec {
buildInputs = with pyPkgs; [
ipython
lflex_celcat_survival
ipdb
];
};
......
@@ -3,3 +3,4 @@ from . import course_request
from . import events
from . import fetch
from . import ics
from . import slot_parse
@@ -8,7 +8,8 @@ def parse_credentials_from_file(credentials_filename):
credentials_dict = json.load(f)
username = credentials_dict['username']
password = credentials_dict['password']
return username, password
teacher_code = credentials_dict['teacher_code']
return username, password, teacher_code
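# Sketch of the credentials file shape implied by the parsing above; the
# values are made-up placeholders, only the three keys matter:
# {
#     "username": "jdoe",
#     "password": "secret",
#     "teacher_code": "ABC"
# }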
def create_authenticated_session(username, password):
s = requests.Session()
......
@@ -14,14 +14,25 @@ import lflex_celcat_survival as lcs
def main(course_request_file, credentials_file, json, csv_raw, csv, ics, csv_no_description):
logging.basicConfig(level=logging.INFO)
req = lcs.course_request.CourseRequest(course_request_file)
if all(o is None for o in [json, csv_raw, csv, ics]):
logging.warning('No option set, doing nothing.')
return
#req = lcs.course_request.CourseRequest(course_request_file)
#if all(o is None for o in [json, csv_raw, csv, ics]):
# logging.warning('No option set, doing nothing.')
# return
username, password = lcs.auth.parse_credentials_from_file(credentials_file)
username, password, teacher_code = lcs.auth.parse_credentials_from_file(credentials_file)
session = lcs.auth.create_authenticated_session(username, password)
requested_slots_df = lcs.slot_parse.read_weekslot_csv(course_request_file, 'fr', 2)
celcat_slots = lcs.events.request_slots_by_mod_code(requested_slots_df, session)
# slots listed in entry file but absent from celcat
print(celcat_slots[celcat_slots['slot_in_celcat'].isna()])
# slots listed in entry file and on celcat, but with no reserved room
print(celcat_slots[celcat_slots['room_parsed'] == 'unset'])
return
celcat_raw_response = req.do_request(session)
if json is not None:
with open(json, 'w') as f:
......
from html import unescape
import itertools
import logging
import pandas as pd
import re
from . import fetch
ROOM_RE = re.compile(r'^(?:FSI|F2SMH) / (.*)$')
COURSE_TYPE_RE = re.compile(r'COURS|COURS/TD|TD|TP|CONTROLE CONTINU|CONTROLE PARTIEL')
STUDENT_GROUP_RE = re.compile(r'K?IN[A-Z0-9]+')
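# Illustration of what these patterns are meant to accept, based on the
# docstring samples further down (a sketch, not an exhaustive list):
# ROOM_RE matches 'FSI / U3-01' or 'F2SMH / ...' and captures the room name;
# COURSE_TYPE_RE matches 'COURS', 'TD', 'TP', 'CONTROLE CONTINU', ...;
# STUDENT_GROUP_RE matches group codes such as 'KINB7TPA41' or 'INXIB11A'.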
class CelcatEvents:
def __init__(self, celcat_raw_response):
@@ -15,7 +23,7 @@ class FilteredCelcatEvents:
self.crossed_df = celcat_events.df.merge(course_request.df, how='cross')
# parse descriptions
parsed_desc_df = self.crossed_df.apply(FilteredCelcatEvents.parse_description, axis=1)
parsed_desc_df = self.crossed_df.apply(parse_description, axis=1)
self.crossed_df = pd.concat([self.crossed_df.reset_index(drop=True), parsed_desc_df], axis=1)
self.crossed_df['keep'] = self.crossed_df.apply(lambda row: FilteredCelcatEvents.timeslot_matches_course(row), axis=1)
@@ -60,44 +68,100 @@ class FilteredCelcatEvents:
problematic_time_slots = problematic_time_slots.sort_values(by=['course_request_id', 'start'])[['course_request_id', 'module_apogee', 'module_readable', 'start', 'end', 'course_type', 'group']]
logging.warning(f'\n{problematic_time_slots}')
def parse_description(row):
'''
Expecting an HTML text with this information, separated by HTML/CRLF line breaks:
- (The room where the course takes place): optional
- The apogee code of the course and its readable name
- A list of student groups that should attend this course
- The course type
Example: 'FSI / U3-01\r\n\r\n<br />\r\n\r\nKINX7AD1 - Parall&#233;lisme [KINX7AD1]\r\n\r\n<br />\r\n\r\nKINB7TPA41<br />KINB7TPA42\r\n\r\n<br />\r\n\r\nTD\r\n'
'''
desc = row['description'].replace('\n', '').replace('\r', '')
fields = [x.strip() for x in desc.split('<br />')]
room = 'unset'
groups_joined = 'unset'
course_type = 'unset'
if len(fields) == 0:
raise ValueError(f'There should be at least 1 field, but fields are {fields}')
elif len(fields) == 1:
# probably not a course. examples: "CONGES\r\n" or "FERIE\r\n"
course_type = fields[0]
else:
# first field should be the room, but this is not always set
room = 'unset'
if fields[0].startswith('FSI / '):
room = fields[0].replace('FSI / ', '')
fields = fields[1:]
# let us assume that the second field is the course name
def parse_description(row):
'''
Expecting an HTML text with this information, separated by HTML/CRLF line breaks:
- (The rooms where the course takes place): optional and there can be multiple rooms
- The apogee code of the course and its readable name
- A list of student groups that should attend this course
- (The teacher name): optional
- The course type
- (Random misc. info): optional
Example: 'FSI / U3-01\r\n\r\n<br />\r\n\r\nKINX7AD1 - Parall&#233;lisme [KINX7AD1]\r\n\r\n<br />\r\n\r\nKINB7TPA41<br />KINB7TPA42\r\n\r\n<br />\r\n\r\nTD\r\n'
'FSI / Amphi GRIGNARD (bat 2A)\r\n\r\n<br />\r\n\r\nKINXIB11 - Bas\r\n\r\n<br />\r\n\r\nINXIB11A\r\n\r\n<br />\r\n\r\nCOLLET CHRISTOPHE\r\n\r\n<br />\r\n\r\nCOURS\r\n\r\n<br />\r\n\r\nSem 36 &#224; 42 partie syst&#232;me\nSem 43 &#224; 50 parti Archi\r\n'
'FSI / Amphi VANDEL (U2-A4)<br />FSI / U2-115\r\n\r\n<br />\r\n\r\nKINXIB11 - Bas\r\n\r\n<br />\r\n\r\nINXIB11A\r\n\r\n<br />\r\n\r\nCOLLET CHRISTOPHE\r\n\r\n<br />\r\n\r\nCONTROLE CONTINU\r\n\r\n<br />\r\n\r\nSalle TD en U2 pour ESH 22012044, 22307815, 22304984, 22400685, 22307573\nPartie syst&#232;me CC1 = Sem39, CC2=Sem42 et CC4 = Sem45\nPartie Archi CC3=Sem48 et CC4 = Sem50\r\n'
'FSI / U3-105\r\n\r\n<br />\r\n\r\nKINX7AD1 - Parall&#233;lisme\r\n\r\n<br />\r\n\r\nKINM7CM<br />KINM7TDA5\r\n\r\n<br />\r\n\r\nTD\r\n'
'''
desc = unescape(row['description']).replace('\n', '').replace('\r', '')
fields = [x.strip() for x in desc.split('<br />')]
preparse_fields = fields[:]
rooms = []
teacher = 'unset'
groups = []
course_type = 'unset'
if len(fields) == 0:
raise ValueError(f'There should be at least 1 field, but fields are {fields}')
elif len(fields) == 1:
# probably not a course. examples: "CONGES\r\n" or "FERIE\r\n"
course_type = fields[0]
else:
# the first fields should be rooms, but this is not always set
while (m := ROOM_RE.match(fields[0])) is not None:
rooms.append(m[1])  # keep only the room name captured by ROOM_RE
fields = fields[1:]
# last field should be the course type
course_type = fields[-1]
# assume that the next field is the course name, and skip it
fields = fields[1:]
# skip notes at the end of the fields until they look like a course type
while COURSE_TYPE_RE.match(fields[-1]) is None:
fields = fields[:-1]
if len(fields) <= 0:
break
# last field is a course type
course_type = fields[-1]
fields = fields[:-1]
# all remaining fields should be student groups
groups = fields[:-1]
groups_joined = ' '.join(groups)
# the last field may be a teacher, but this is optional
if STUDENT_GROUP_RE.match(fields[-1]) is None:
teacher = fields[-1]
fields = fields[:-1]
# all remaining fields should be student groups
groups = []
while len(fields) > 0 and (m := STUDENT_GROUP_RE.match(fields[0])) is not None:
groups.append(m[0])
fields = fields[1:]
return pd.Series([room, course_type, groups_joined], index=['room_parsed', 'course_type_parsed', 'groups_parsed'])
if len(rooms) == 0:
rooms = ['unset']
if len(groups) == 0:
groups = ['unset']
return pd.Series([rooms, teacher, course_type, groups], index=['rooms_parsed', 'teacher_parsed', 'course_type_parsed', 'groups_parsed'])
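# Worked example (a sketch based on the first docstring sample): the description
# 'FSI / U3-01 <br /> KINX7AD1 - Parallélisme [KINX7AD1] <br /> KINB7TPA41<br />KINB7TPA42 <br /> TD'
# is expected to yield rooms_parsed=['U3-01'], teacher_parsed='unset',
# course_type_parsed='TD' and groups_parsed=['KINB7TPA41', 'KINB7TPA42'].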
def request_slots_by_mod_code(flat_slot_df, session):
subject_codes = list(flat_slot_df['mod_code'].dropna().unique())
min_start_dt = flat_slot_df['start_dt'].min()
max_end_dt = flat_slot_df['end_dt'].max()
raw_response = fetch.do_celcat_request_subjects(min_start_dt, max_end_dt, subject_codes, session)
celcat_slots = CelcatEvents(raw_response)
celcat_df = celcat_slots.df
parsed_df = celcat_df.apply(parse_description, axis=1)
celcat_df = pd.concat([celcat_df.reset_index(drop=True), parsed_df.reset_index(drop=True)], axis=1).reset_index(drop=True)
flat_celcat_rows = []
for _, row in celcat_df.iterrows():
for room_parsed, group_parsed, module in itertools.product(row['rooms_parsed'], row['groups_parsed'], row['modules']):
flat_celcat_rows.append({
'start_dt': row['start'],
'end_dt': row['end'],
'eventCategory': row['eventCategory'],
'room_parsed': room_parsed,
'teacher_parsed': row['teacher_parsed'],
'course_type_parsed': row['course_type_parsed'],
'student_group': group_parsed,
'mod_code': module,
'slot_in_celcat': True
})
flat_celcat_df = pd.DataFrame(flat_celcat_rows)
merged = pd.merge(flat_slot_df, flat_celcat_df, how='left')
merged = merged.sort_values(by=['start_dt', 'end_dt'])
return merged
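# Note on the merge above: with how='left', requested slots that have no matching
# CELCAT event keep NaN in the CELCAT-only columns (notably 'slot_in_celcat'),
# which is what the command-line code relies on to list missing slots, e.g.:
# >>> merged[merged['slot_in_celcat'].isna()]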
import datetime
import itertools
import pandas as pd
import re
from procset import ProcSet
SLOT_RE_TEMPLATE = r'^(?P<weekday>WEEKDAYLIST)(?P<hour>\d{2})h(?P<minute>\d{2})?$'
DURATION_RE_STR = r'^(?P<hour>\d{1,2})h(?P<minute>\d{1,2})?$'
DURATION_RE = re.compile(DURATION_RE_STR)
ACADEMIC_YEAR_RE_STR = r'^(?P<beginyear>\d{4})-(?P<endyear>\d{4})$'
ACADEMIC_YEAR_RE = re.compile(ACADEMIC_YEAR_RE_STR)
def gen_parsable_weekdays(lang, nb_char):
'''
Generate a list of truncated weekdays, and a string->isoweekday map to parse & interpret results
Args:
lang: The language to use, such as 'fr' for French or 'en' for English.
nb_char: The number of characters to use to represent each week day.
Returns:
list(str): The ordered list of truncated week day names. In iso order (Monday to Sunday).
dict(str, int): A map from truncated week day names to their iso number (1 is Monday, 7 is Sunday).
'''
lang_to_weekdays = {
'en': ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday'],
'fr': ['Lundi', 'Mardi', 'Mercredi', 'Jeudi', 'Vendredi', 'Samedi', 'Dimanche'],
}
if lang not in lang_to_weekdays:
raise ValueError(f'lang={lang} is not supported. supported languages: {sorted(list(lang_to_weekdays.keys()))}')
weekdays = lang_to_weekdays[lang]
trunc_weekdays = [wd[:nb_char] for wd in weekdays]
assert len(set(trunc_weekdays)) == len(trunc_weekdays), f"Invalid weekday format: using lang={lang} with {nb_char} characters creates non-unique truncated weekdays {trunc_weekdays}"
trunc_weekdays_map = {wd: num+1 for num, wd in enumerate(trunc_weekdays)}
return trunc_weekdays, trunc_weekdays_map
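# Illustrative usage, fully determined by the tables above:
# >>> trunc, iso_map = gen_parsable_weekdays('fr', 2)
# >>> trunc
# ['Lu', 'Ma', 'Me', 'Je', 'Ve', 'Sa', 'Di']
# >>> iso_map['Je']
# 4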
def gen_slot_parser(lang, weekday_nb_char):
'''
Generate a parser (compiled regex and truncated weekday name to iso weekday map) for a given lang and number of characters per weekday
Args:
lang: The language to use, such as 'fr' for French or 'en' for English.
weekday_nb_char: The number of characters to use to represent each week day.
Returns:
re.Pattern: The compiled regular expression that can parse a slot.
dict(str, int): A map from truncated week day names to their iso number (1 is Monday, 7 is Sunday).
'''
weekdays, weekday_parse_map = gen_parsable_weekdays(lang, weekday_nb_char)
daylist = '|'.join(weekdays)
re_str = SLOT_RE_TEMPLATE.replace('WEEKDAYLIST', daylist)
r = re.compile(re_str)
return r, weekday_parse_map
def slot_to_dt(slot: str, year: int, week: int, re_parser: re.Pattern, wd_iso_map: dict[str, int]):
'''
Generate a time point (datetime) from a slot, its context (year, week) and parsing information
'''
m = re_parser.match(slot)
if m is None:
raise ValueError(f"Slot '{slot}' could not be parsed")
wd_iso = wd_iso_map[m['weekday']]
hours = int(m['hour'])
minutes = m['minute'] or '0'
minutes = int(minutes)
dt = datetime.datetime.fromisocalendar(year, week, wd_iso)
dt = dt + datetime.timedelta(hours=hours, minutes=minutes)
return dt
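# Illustrative usage, assuming the French two-character parser built by
# gen_slot_parser (ISO week 36 of 2023 starts on Monday 2023-09-04):
# >>> parser, wd_map = gen_slot_parser('fr', 2)
# >>> slot_to_dt('Lu08h30', 2023, 36, parser, wd_map)
# datetime.datetime(2023, 9, 4, 8, 30)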
def duration_to_timedelta(duration: str):
'''
Parse a string duration to a timedelta.
'''
m = DURATION_RE.match(duration)
if m is None:
raise ValueError(f"Duration '{duration}' could not be parsed")
hours = int(m['hour'])
minutes = m['minute'] or '0'
minutes = int(minutes)
delta = datetime.timedelta(hours=hours, minutes=minutes)
return delta
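# Illustrative usage:
# >>> duration_to_timedelta('1h30')
# datetime.timedelta(seconds=5400)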
def year_from_academic_year_week(academic_year, week, week_cut=32):
'''
Determine the calendar year to use for an (academic year, week) pair, depending on whether the week is before or after the cut
'''
m = ACADEMIC_YEAR_RE.match(academic_year)
if m is None:
raise ValueError(f"Academic year '{academic_year}' could not be parsed")
begin_year = int(m['beginyear'])
end_year = int(m['endyear'])
if end_year != begin_year + 1:
raise ValueError(f"Invalid academic year '{academic_year}': years should be consecutive")
if week <= week_cut:
return end_year
return begin_year
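# Worked example: with the default week_cut=32, weeks at the start of the academic
# year map to the first calendar year and weeks after the New Year to the second:
# >>> year_from_academic_year_week('2023-2024', 36)
# 2023
# >>> year_from_academic_year_week('2023-2024', 5)
# 2024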
def read_weekslot_csv(filename, slot_lang, slot_nb_char):
col_types = {
'mod_code': str,
'display_name': str,
'student_group': str,
'slots': str,
'duration': str,
'academic_year': str,
'weeks': str,
}
df = pd.read_csv(filename, dtype=col_types)
re_parser, wd_iso_map = gen_slot_parser(slot_lang, slot_nb_char)
flat_slots = []
for index, row in df.iterrows():
slots = row['slots'].split()
weeks = ProcSet.from_str(row['weeks'])
for slot, week in itertools.product(slots, weeks):
year = year_from_academic_year_week(row['academic_year'], week)
dt_begin = slot_to_dt(slot, year, week, re_parser, wd_iso_map)
dt_end = dt_begin + duration_to_timedelta(row['duration'])
flat_slots.append({
'mod_code': row['mod_code'],
'display_name': row['display_name'],
'student_group': row['student_group'],
'start_dt': dt_begin,
'end_dt': dt_end,
})
flat_df = pd.DataFrame(flat_slots)
return flat_df
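# Sketch of the CSV shape read_weekslot_csv expects (column names come from
# col_types above; the values below are made-up placeholders):
# mod_code,display_name,student_group,slots,duration,academic_year,weeks
# KINX7AD1,Parallélisme,KINM7TDA5,Lu08 Je14h30,1h30,2024-2025,37-41 45
# Each (slot, week) combination expands into one row with concrete start/end datetimes.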
@@ -26,7 +26,8 @@ dependencies = [
"pandas>=1.3.0",
"requests>=2.26.0",
"click>=8.0.0",
"beautifulsoup4>=4.10.0"
"beautifulsoup4>=4.10.0",
"procset>=1.0"
]
[project.scripts]
......