jsocket.cpp 204 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
73778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252425342544255425642574258425942604261426242634264426542664267426842694270427142724273427442754276427
74278427942804281428242834284428542864287428842894290429142924293429442954296429742984299430043014302430343044305430643074308430943104311431243134314431543164317431843194320432143224323432443254326432743284329433043314332433343344335433643374338433943404341434243434344434543464347434843494350435143524353435443554356435743584359436043614362436343644365436643674368436943704371437243734374437543764377437843794380438143824383438443854386438743884389439043914392439343944395439643974398439944004401440244034404440544064407440844094410441144124413441444154416441744184419442044214422442344244425442644274428442944304431443244334434443544364437443844394440444144424443444444454446444744484449445044514452445344544455445644574458445944604461446244634464446544664467446844694470447144724473447444754476447744784479448044814482448344844485448644874488448944904491449244934494449544964497449844994500450145024503450445054506450745084509451045114512451345144515451645174518451945204521452245234524452545264527452845294530453145324533453445354536453745384539454045414542454345444545454645474548454945504551455245534554455545564557455845594560456145624563456445654566456745684569457045714572457345744575457645774578457945804581458245834584458545864587458845894590459145924593459445954596459745984599460046014602460346044605460646074608460946104611461246134614461546164617461846194620462146224623462446254626462746284629463046314632463346344635463646374638463946404641464246434644464546464647464846494650465146524653465446554656465746584659466046614662466346644665466646674668466946704671467246734674467546764677467846794680468146824683468446854686468746884689469046914692469346944695469646974698469947004701470247034704470547064707470847094710471147124713471447154716471747184719472047214722472347244725472647274728472947304731473247334734473547364737473847394740474147424743474447454746474747484749475047514752475347544755475647574758475947604761476247634764476547664767476847694770477147724773477447754776477
74778477947804781478247834784478547864787478847894790479147924793479447954796479747984799480048014802480348044805480648074808480948104811481248134814481548164817481848194820482148224823482448254826482748284829483048314832483348344835483648374838483948404841484248434844484548464847484848494850485148524853485448554856485748584859486048614862486348644865486648674868486948704871487248734874487548764877487848794880488148824883488448854886488748884889489048914892489348944895489648974898489949004901490249034904490549064907490849094910491149124913491449154916491749184919492049214922492349244925492649274928492949304931493249334934493549364937493849394940494149424943494449454946494749484949495049514952495349544955495649574958495949604961496249634964496549664967496849694970497149724973497449754976497749784979498049814982498349844985498649874988498949904991499249934994499549964997499849995000500150025003500450055006500750085009501050115012501350145015501650175018501950205021502250235024502550265027502850295030503150325033503450355036503750385039504050415042504350445045504650475048504950505051505250535054505550565057505850595060506150625063506450655066506750685069507050715072507350745075507650775078507950805081508250835084508550865087508850895090509150925093509450955096509750985099510051015102510351045105510651075108510951105111511251135114511551165117511851195120512151225123512451255126512751285129513051315132513351345135513651375138513951405141514251435144514551465147514851495150515151525153515451555156515751585159516051615162516351645165516651675168516951705171517251735174517551765177517851795180518151825183518451855186518751885189519051915192519351945195519651975198519952005201520252035204520552065207520852095210521152125213521452155216521752185219522052215222522352245225522652275228522952305231523252335234523552365237523852395240524152425243524452455246524752485249525052515252525352545255525652575258525952605261526252635264526552665267526852695270527152725273527452755276527
75278527952805281528252835284528552865287528852895290529152925293529452955296529752985299530053015302530353045305530653075308530953105311531253135314531553165317531853195320532153225323532453255326532753285329533053315332533353345335533653375338533953405341534253435344534553465347534853495350535153525353535453555356535753585359536053615362536353645365536653675368536953705371537253735374537553765377537853795380538153825383538453855386538753885389539053915392539353945395539653975398539954005401540254035404540554065407540854095410541154125413541454155416541754185419542054215422542354245425542654275428542954305431543254335434543554365437543854395440544154425443544454455446544754485449545054515452545354545455545654575458545954605461546254635464546554665467546854695470547154725473547454755476547754785479548054815482548354845485548654875488548954905491549254935494549554965497549854995500550155025503550455055506550755085509551055115512551355145515551655175518551955205521552255235524552555265527552855295530553155325533553455355536553755385539554055415542554355445545554655475548554955505551555255535554555555565557555855595560556155625563556455655566556755685569557055715572557355745575557655775578557955805581558255835584558555865587558855895590559155925593559455955596559755985599560056015602560356045605560656075608560956105611561256135614561556165617561856195620562156225623562456255626562756285629563056315632563356345635563656375638563956405641564256435644564556465647564856495650565156525653565456555656565756585659566056615662566356645665566656675668566956705671567256735674567556765677567856795680568156825683568456855686568756885689569056915692569356945695569656975698569957005701570257035704570557065707570857095710571157125713571457155716571757185719572057215722572357245725572657275728572957305731573257335734573557365737573857395740574157425743574457455746574757485749575057515752575357545755575657575758575957605761576257635764576557665767576857695770577157725773577457755776577
75778577957805781578257835784578557865787578857895790579157925793579457955796579757985799580058015802580358045805580658075808580958105811581258135814581558165817581858195820582158225823582458255826582758285829583058315832583358345835583658375838583958405841584258435844584558465847584858495850585158525853585458555856585758585859586058615862586358645865586658675868586958705871587258735874587558765877587858795880588158825883588458855886588758885889589058915892589358945895589658975898589959005901590259035904590559065907590859095910591159125913591459155916591759185919592059215922592359245925592659275928592959305931593259335934593559365937593859395940594159425943594459455946594759485949595059515952595359545955595659575958595959605961596259635964596559665967596859695970597159725973597459755976597759785979598059815982598359845985598659875988598959905991599259935994599559965997599859996000600160026003600460056006600760086009601060116012601360146015601660176018601960206021602260236024602560266027602860296030603160326033603460356036603760386039604060416042604360446045604660476048604960506051605260536054605560566057605860596060606160626063606460656066606760686069607060716072607360746075607660776078607960806081608260836084608560866087608860896090609160926093609460956096609760986099610061016102610361046105610661076108610961106111611261136114611561166117611861196120612161226123612461256126612761286129613061316132613361346135613661376138613961406141614261436144614561466147614861496150615161526153615461556156615761586159616061616162616361646165616661676168616961706171617261736174617561766177617861796180618161826183618461856186618761886189619061916192619361946195619661976198619962006201620262036204620562066207620862096210621162126213621462156216621762186219622062216222622362246225622662276228622962306231623262336234623562366237623862396240624162426243624462456246624762486249625062516252625362546255625662576258625962606261626262636264626562666267626862696270627162726273627462756276627
76278627962806281628262836284628562866287628862896290629162926293629462956296629762986299630063016302630363046305630663076308630963106311631263136314631563166317631863196320632163226323632463256326632763286329633063316332633363346335633663376338633963406341634263436344634563466347634863496350635163526353635463556356635763586359636063616362636363646365636663676368636963706371637263736374637563766377637863796380638163826383638463856386638763886389639063916392639363946395639663976398639964006401640264036404640564066407640864096410641164126413641464156416641764186419642064216422642364246425642664276428642964306431643264336434643564366437643864396440644164426443644464456446644764486449645064516452645364546455645664576458645964606461646264636464646564666467646864696470647164726473647464756476647764786479648064816482648364846485648664876488648964906491649264936494649564966497649864996500650165026503650465056506650765086509651065116512651365146515651665176518651965206521652265236524652565266527652865296530653165326533653465356536653765386539654065416542654365446545654665476548654965506551655265536554655565566557655865596560656165626563656465656566656765686569657065716572657365746575657665776578657965806581658265836584658565866587658865896590659165926593659465956596659765986599660066016602660366046605660666076608660966106611661266136614661566166617661866196620662166226623662466256626662766286629663066316632663366346635663666376638663966406641664266436644664566466647664866496650665166526653665466556656665766586659666066616662666366646665666666676668666966706671667266736674667566766677667866796680668166826683668466856686668766886689669066916692669366946695669666976698669967006701670267036704670567066707670867096710671167126713671467156716671767186719672067216722672367246725672667276728672967306731673267336734673567366737673867396740674167426743674467456746674767486749675067516752675367546755675667576758675967606761676267636764676567666767676867696770677167726773677467756776677
7677867796780678167826783678467856786678767886789679067916792679367946795679667976798679968006801680268036804680568066807680868096810681168126813681468156816681768186819682068216822682368246825682668276828682968306831683268336834683568366837683868396840684168426843684468456846684768486849685068516852685368546855685668576858685968606861686268636864686568666867686868696870687168726873687468756876687768786879688068816882688368846885688668876888688968906891689268936894689568966897689868996900690169026903690469056906690769086909691069116912691369146915691669176918691969206921692269236924692569266927692869296930693169326933693469356936693769386939694069416942694369446945694669476948694969506951695269536954695569566957695869596960696169626963696469656966696769686969697069716972697369746975697669776978697969806981698269836984698569866987698869896990699169926993699469956996699769986999700070017002700370047005700670077008700970107011701270137014701570167017701870197020702170227023702470257026702770287029703070317032703370347035703670377038703970407041704270437044704570467047704870497050705170527053705470557056705770587059706070617062706370647065706670677068706970707071707270737074707570767077707870797080708170827083708470857086708770887089
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. // New IPv6 Version - IN PROGRESS
  14. /*
  15. TBD IPv6 connect
  16. multicast
  17. look at loopback
  18. */
  19. #include <string>
  20. #include <unordered_set>
  21. #include <functional>
  22. #include "platform.h"
  23. #ifdef _VER_C5
  24. #include <clwclib.h>
  25. #else
  26. #include "platform.h"
  27. #include <stdio.h>
  28. #endif
  29. #include <algorithm>
  30. #ifdef _WIN32
  31. #define WIN32_LEAN_AND_MEAN
  32. #include <windows.h>
  33. #include <winsock2.h>
  34. #include <ws2tcpip.h>
  35. #include <signal.h>
  36. #else
  37. #include <sys/types.h>
  38. #include <sys/socket.h>
  39. #include <netinet/tcp.h>
  40. #include <netinet/in.h>
  41. #include <arpa/inet.h>
  42. #include <stddef.h>
  43. #include <errno.h>
  44. #include <net/if.h>
  45. #include <poll.h>
  46. #endif
  47. #include <limits.h>
  48. #include "jmutex.hpp"
  49. #include "jsocket.hpp"
  50. #include "jexcept.hpp"
  51. #include "jio.hpp"
  52. #include "jmisc.hpp"
  53. #include "jthread.hpp"
  54. #include "jqueue.tpp"
  55. #include "jtime.hpp"
  56. #include "jprop.hpp"
  57. #include "jregexp.hpp"
  58. #include "jdebug.hpp"
  59. // epoll is only used on Linux: _HAS_EPOLL_SUPPORT is undefined on every other platform and defined unconditionally on Linux
  60. #ifndef __linux__
  61. # undef _HAS_EPOLL_SUPPORT
  62. #else
  63. # define _HAS_EPOLL_SUPPORT
  64. # ifdef _HAS_EPOLL_SUPPORT // always true at this point - it was defined on the previous line
  65. # include <unistd.h>
  66. # include <sys/epoll.h>
  67. # ifndef EPOLLRDHUP
  68. // CentOS 5.x bug - epoll.h does not define EPOLLRDHUP but it is in the kernel, so supply the value here
  69. # define EPOLLRDHUP 0x2000
  70. # endif
  71. # define MAX_RET_EVENTS 5000 // max events returned from epoll_wait() call
  72. static unsigned epoll_hdlPerThrd = UINT_MAX; // NOTE(review): presumably the number of handles served per epoll thread, starting at UINT_MAX - confirm against the code that reads/sets it
  73. # endif // _HAS_EPOLL_SUPPORT
  74. #endif // __linux__
  75. // various compile-time options and debug switches for this file
  76. #define CONNECT_TIMEOUT_REFUSED_WAIT 1000 // maximum to sleep on connect_timeout (units presumably milliseconds - TODO confirm)
  77. #define TRACE_SLOW_BLOCK_TRANSFER // NOTE(review): presumably enables tracing of slow block transfers; the code it guards is not visible here
  78. #define DEFAULT_CONNECT_TIME (100*1000) // for connect_wait (units presumably milliseconds - TODO confirm)
  79. #ifdef _DEBUG
  80. // #define SIMULATE_LOST_UDP_PACKETS
  81. #endif
  82. #ifdef SIMULATE_LOST_UDP_PACKETS // only active if the commented-out define above is enabled (or the symbol is supplied by the build)
  83. static const int dropThreshold = 10000;
  84. static int dropCounter = 0;
  85. #endif
  86. #ifdef _DEBUG
  87. //#define SOCKTRACE
  88. //#define EPOLLTRACE
  89. #endif
  90. #ifdef _TESTING
  91. #define _TRACE // _TESTING builds select the message-carrying variants of the exception/logging macros guarded by _TRACE
  92. #endif
  // Exception-throwing and error-logging helper macros; two variants selected by _TRACE.
  //   THROWJSOCKEXCEPTION(exc)  - heap-allocates a SocketException for error code `exc` and throws the pointer.
  //                               With _TRACE the message names the target (`tracename`, which must be in scope
  //                               where the macro is expanded), the source file and the line.
  //   THROWJSOCKEXCEPTION2(exc) - same, but the _TRACE message omits the target, so no `tracename` is needed.
  //   LOGERR / LOGERR2          - with _TRACE forward to LogErr() with __LINE__ (LOGERR2 also passes `tracename`,
  //                               LOGERR passes NULL); without _TRACE they expand to nothing, so their
  //                               arguments are not evaluated.
  // NOTE: the throw macros expand to a braced block rather than do { } while(0), so writing
  // `THROWJSOCKEXCEPTION(x);` as the unbraced body of an `if` that has an `else` will not compile.
  93. #ifdef _TRACE
  94. #define THROWJSOCKEXCEPTION(exc) \
  95. { StringBuffer msg; \
  96. msg.appendf("Target: %s, Raised in: %s, line %d",tracename ,sanitizeSourceFile(__FILE__), __LINE__); \
  97. IJSOCK_Exception *e = new SocketException(exc,msg.str());\
  98. throw e; }
  99. #define THROWJSOCKEXCEPTION2(exc) \
  100. { StringBuffer msg; \
  101. msg.appendf("Raised in: %s, line %d",sanitizeSourceFile(__FILE__), __LINE__); \
  102. IJSOCK_Exception *e = new SocketException(exc,msg.str());\
  103. throw e; }
  104. #define LOGERR(err,ref,info) LogErr(err,ref,info,__LINE__,NULL)
  105. #define LOGERR2(err,ref,info) LogErr(err,ref,info,__LINE__,tracename)
  106. #else // !_TRACE: exceptions carry only the error code and logging is compiled out
  107. #define THROWJSOCKEXCEPTION(exc) \
  108. { IJSOCK_Exception *e = new SocketException(exc);\
  109. throw e; }
  110. #define THROWJSOCKEXCEPTION2(exc) THROWJSOCKEXCEPTION(exc)
  111. #define LOGERR(err,ref,info)
  112. #define LOGERR2(err,ref,info)
  113. #endif // _TRACE
  114. JSocketStatistics STATS; // file-level statistics object with external linkage (NOTE(review): presumably declared extern in jsocket.hpp - confirm)
  115. static const bool IP4only=false; // slightly faster if we know there is no IPv6; compile-time constant, currently false
  116. static bool IP6preferred=false; // prefer IPv6, e.g. for DNS and socket create
  117. IpSubNet PreferredSubnet(NULL,NULL); // set this if you prefer a particular subnet for debugging etc; constructed from (NULL,NULL) by default
  118. // e.g. PreferredSubnet("192.168.16.0", "255.255.255.0")
  119. static RelaxedAtomic<unsigned> pre_conn_unreach_cnt{0}; // global count of pre_connect() JSE_NETUNREACH errors
  120. #define IPV6_SERIALIZE_PREFIX (0x00ff00ff) // NOTE(review): presumably a marker value written ahead of a serialized IPv6 address - confirm against the serialize/deserialize code
  121. class jlib_thrown_decl SocketException: public IJSOCK_Exception, public CInterface
  122. {
  123. public:
  124. IMPLEMENT_IINTERFACE;
  125. SocketException(int code,const char *_msg=NULL) : errcode(code)
  126. {
  127. if (_msg)
  128. msg = strdup(_msg);
  129. else
  130. msg = NULL;
  131. };
  132. ~SocketException() { free(msg); }
  133. int errorCode() const { return errcode; }
  134. static StringBuffer & geterrormessage(int err,StringBuffer &str)
  135. {
  136. switch (err) {
  137. case JSOCKERR_ok: return str.append("ok");
  138. case JSOCKERR_not_opened: return str.append("socket not opened");
  139. case JSOCKERR_bad_address: return str.append("bad address");
  140. case JSOCKERR_connection_failed: return str.append("connection failed");
  141. case JSOCKERR_broken_pipe: return str.append("connection is broken");
  142. case JSOCKERR_graceful_close: return str.append("connection closed other end");
  143. case JSOCKERR_invalid_access_mode: return str.append("invalid access mode");
  144. case JSOCKERR_timeout_expired: return str.append("timeout expired");
  145. case JSOCKERR_port_in_use: return str.append("port in use");
  146. case JSOCKERR_cancel_accept: return str.append("cancel accept");
  147. case JSOCKERR_connectionless_socket: return str.append("connectionless socket");
  148. case JSOCKERR_handle_too_large: return str.append("handle too large");
  149. case JSOCKERR_bad_netaddr: return str.append("bad net addr");
  150. case JSOCKERR_ipv6_not_implemented: return str.append("IPv6 not implemented");
  151. // OS errors
  152. #ifdef _WIN32
  153. case WSAEINTR: return str.append("WSAEINTR(10004) - Interrupted system call.");
  154. case WSAEBADF: return str.append("WSAEBADF(10009) - Bad file number.");
  155. case WSAEACCES: return str.append("WSAEACCES(10013) - Permission denied.");
  156. case WSAEFAULT: return str.append("WSAEFAULT(10014) - Bad address.");
  157. case WSAEINVAL: return str.append("WSAEINVAL(10022) - Invalid argument.");
  158. case WSAEMFILE: return str.append("WSAEMFILE(10024) - Too many open files.");
  159. case WSAEWOULDBLOCK: return str.append("WSAEWOULDBLOCK(10035) - Operation would block.");
  160. case WSAEINPROGRESS: return str.append("WSAEINPROGRESS(10036) - Operation now in progress.");
  161. case WSAEALREADY: return str.append("WSAEALREADY(10037) - Operation already in progress.");
  162. case WSAENOTSOCK: return str.append("WSAENOTSOCK(10038) - Socket operation on nonsocket.");
  163. case WSAEDESTADDRREQ: return str.append("WSAEDESTADDRREQ(10039) - Destination address required.");
  164. case WSAEMSGSIZE: return str.append("WSAEMSGSIZE(10040) - Message too long.");
  165. case WSAEPROTOTYPE: return str.append("WSAEPROTOTYPE(10041) - Protocol wrong type for socket.");
  166. case WSAENOPROTOOPT: return str.append("WSAENOPROTOOPT(10042) - Protocol not available.");
  167. case WSAEPROTONOSUPPORT: return str.append("WSAEPROTONOSUPPORT(10043) - Protocol not supported.");
  168. case WSAESOCKTNOSUPPORT: return str.append("WSAESOCKTNOSUPPORT(10044) - Socket type not supported.");
  169. case WSAEOPNOTSUPP: return str.append("WSAEOPNOTSUPP(10045) - Operation not supported on socket.");
  170. case WSAEPFNOSUPPORT: return str.append("WSAEPFNOSUPPORT(10046) - Protocol family not supported.");
  171. case WSAEAFNOSUPPORT: return str.append("WSAEAFNOSUPPORT(10047) - Address family not supported by protocol family.");
  172. case WSAEADDRINUSE: return str.append("WSAEADDRINUSE(10048) - Address already in use.");
  173. case WSAEADDRNOTAVAIL: return str.append("WSAEADDRNOTAVAIL(10049) - Cannot assign requested address.");
  174. case WSAENETDOWN: return str.append("WSAENETDOWN(10050) - Network is down.");
  175. case WSAENETUNREACH: return str.append("WSAENETUNREACH(10051) - Network is unreachable.");
  176. case WSAENETRESET: return str.append("WSAENETRESET(10052) - Network dropped connection on reset.");
  177. case WSAECONNABORTED: return str.append("WSAECONNABORTED(10053) - Software caused connection abort.");
  178. case WSAECONNRESET: return str.append("WSAECONNRESET(10054) - Connection reset by peer.");
  179. case WSAENOBUFS: return str.append("WSAENOBUFS(10055) - No buffer space available.");
  180. case WSAEISCONN: return str.append("WSAEISCONN(10056) - Socket is already connected.");
  181. case WSAENOTCONN: return str.append("WSAENOTCONN(10057) - Socket is not connected.");
  182. case WSAESHUTDOWN: return str.append("WSAESHUTDOWN(10058) - Cannot send after socket shutdown.");
  183. case WSAETOOMANYREFS: return str.append("WSAETOOMANYREFS(10059) - Too many references: cannot splice.");
  184. case WSAETIMEDOUT: return str.append("WSAETIMEDOUT(10060) - Connection timed out.");
  185. case WSAECONNREFUSED: return str.append("WSAECONNREFUSED(10061) - Connection refused.");
  186. case WSAELOOP: return str.append("WSAELOOP(10062) - Too many levels of symbolic links.");
  187. case WSAENAMETOOLONG: return str.append("WSAENAMETOOLONG(10063) - File name too long.");
  188. case WSAEHOSTDOWN: return str.append("WSAEHOSTDOWN(10064) - Host is down.");
  189. case WSAEHOSTUNREACH: return str.append("WSAEHOSTUNREACH(10065) - No route to host.");
  190. case WSASYSNOTREADY: return str.append("WSASYSNOTREADY(10091) - The network subsystem is unusable.");
  191. case WSAVERNOTSUPPORTED: return str.append("WSAVERNOTSUPPORTED(10092) - The Windows Sockets DLL cannot support this application.");
  192. case WSANOTINITIALISED: return str.append("WSANOTINITIALISED(10093) - Winsock not initialized.");
  193. case WSAEDISCON: return str.append("WSAEDISCON(10101) - Disconnect.");
  194. case WSAHOST_NOT_FOUND: return str.append("WSAHOST_NOT_FOUND(11001) - Host not found.");
  195. case WSATRY_AGAIN: return str.append("WSATRY_AGAIN(11002) - Nonauthoritative host not found.");
  196. case WSANO_RECOVERY: return str.append("WSANO_RECOVERY(11003) - Nonrecoverable error.");
  197. case WSANO_DATA: return str.append("WSANO_DATA(11004) - Valid name, no data record of requested type.");
  198. #else
  199. case ENOTSOCK: return str.append("ENOTSOCK - Socket operation on non-socket ");
  200. case EDESTADDRREQ: return str.append("EDESTADDRREQ - Destination address required ");
  201. case EMSGSIZE: return str.append("EMSGSIZE - Message too long ");
  202. case EPROTOTYPE: return str.append("EPROTOTYPE - Protocol wrong type for socket ");
  203. case ENOPROTOOPT: return str.append("ENOPROTOOPT - Protocol not available ");
  204. case EPROTONOSUPPORT: return str.append("EPROTONOSUPPORT - Protocol not supported ");
  205. case ESOCKTNOSUPPORT: return str.append("ESOCKTNOSUPPORT - Socket type not supported ");
  206. case EOPNOTSUPP: return str.append("EOPNOTSUPP - Operation not supported on socket ");
  207. case EPFNOSUPPORT: return str.append("EPFNOSUPPORT - Protocol family not supported ");
  208. case EAFNOSUPPORT: return str.append("EAFNOSUPPORT - Address family not supported by protocol family ");
  209. case EADDRINUSE: return str.append("EADDRINUSE - Address already in use ");
  210. case EADDRNOTAVAIL: return str.append("EADDRNOTAVAIL - Can't assign requested address ");
  211. case ENETDOWN: return str.append("ENETDOWN - Network is down ");
  212. case ENETUNREACH: return str.append("ENETUNREACH - Network is unreachable ");
  213. case ENETRESET: return str.append("ENETRESET - Network dropped connection because of reset ");
  214. case ECONNABORTED: return str.append("ECONNABORTED - Software caused connection abort ");
  215. case ECONNRESET: return str.append("ECONNRESET - Connection reset by peer ");
  216. case ENOBUFS: return str.append("ENOBUFS - No buffer space available ");
  217. case EISCONN: return str.append("EISCONN - Socket is already connected ");
  218. case ENOTCONN: return str.append("ENOTCONN - Socket is not connected ");
  219. case ESHUTDOWN: return str.append("ESHUTDOWN - Can't send after socket shutdown ");
  220. case ETOOMANYREFS: return str.append("ETOOMANYREFS - Too many references: can't splice ");
  221. case ETIMEDOUT: return str.append("ETIMEDOUT - Connection timed out ");
  222. case ECONNREFUSED: return str.append("ECONNREFUSED - Connection refused ");
  223. case EHOSTDOWN: return str.append("EHOSTDOWN - Host is down ");
  224. case EHOSTUNREACH: return str.append("EHOSTUNREACH - No route to host ");
  225. case EWOULDBLOCK: return str.append("EWOULDBLOCK - operation already in progress");
  226. case EINPROGRESS: return str.append("EINPROGRESS - operation now in progress ");
  227. #endif
  228. }
  229. IException *ose = makeOsException(err);
  230. ose->errorMessage(str);
  231. ose->Release();
  232. return str;
  233. }
  234. StringBuffer & errorMessage(StringBuffer &str) const
  235. {
  236. if (msg)
  237. return geterrormessage(errcode,str).append('\n').append(msg);
  238. return geterrormessage(errcode,str);
  239. }
  240. MessageAudience errorAudience() const
  241. {
  242. switch (errcode) {
  243. case JSOCKERR_port_in_use:
  244. return MSGAUD_operator;
  245. }
  246. return MSGAUD_user;
  247. }
  248. private:
  249. int errcode;
  250. char *msg;
  251. };
  252. IJSOCK_Exception *IPv6NotImplementedException(const char *filename,unsigned lineno)
  253. {
  254. StringBuffer msg;
  255. msg.appendf("%s(%d)",filename,lineno);
  256. return new SocketException(JSOCKERR_ipv6_not_implemented,msg.str());
  257. }
  // Multicast membership request: the group to join and the local interface to join on.
  // CSocket::open() passes this structure to setsockopt(IP_ADD_MEMBERSHIP).
  struct MCASTREQ
  {
      struct in_addr imr_multiaddr;   /* multicast group to join */
      struct in_addr imr_interface;   /* interface to join on */

      // mcip: dotted-quad text of the multicast group; the interface is always INADDR_ANY.
      MCASTREQ(const char *mcip) : imr_multiaddr(), imr_interface()
      {
          imr_interface.s_addr = htonl(INADDR_ANY);
          imr_multiaddr.s_addr = inet_addr(mcip);
      }
  };
  // Some Apple SDKs do not define MSG_NOSIGNAL; supply a value so SEND_FLAGS below compiles.
  268. #ifdef __APPLE__
  269. #ifndef MSG_NOSIGNAL
  270. #define MSG_NOSIGNAL 0x4000
  271. #endif
  272. #endif
  // Per-platform aliases used throughout this file:
  //   T_SOCKET          - native socket handle type
  //   T_FD_SET          - descriptor-set type, XFD_SETSIZE its capacity, XFD_ZERO() clears it
  //   SEND_FLAGS        - flags handed to send calls (MSG_NOSIGNAL where available)
  //   CHECKSOCKRANGE(s) - throws JSOCKERR_handle_too_large when a handle does not fit in the set
  //                       (only active in the Linux branch; empty on Windows/FreeBSD/Apple)
  273. #if defined( _WIN32)
  274. #define T_SOCKET SOCKET
  275. #define T_FD_SET fd_set
  276. #define XFD_SETSIZE FD_SETSIZE
  277. //Following are defined in more modern headers
  278. #define XFD_ZERO(s) FD_ZERO(s)
  279. #define SEND_FLAGS 0
  280. #define CHECKSOCKRANGE(s)
  281. #define _USE_SELECT // Windows bug 309411 - WSAPoll does not report failed connections - won't fix
  282. // #define poll(a, b, c) WSAPoll((a), (b), (c))
  283. #elif defined(__FreeBSD__) || defined(__APPLE__)
  284. #define XFD_SETSIZE FD_SETSIZE
  285. #define T_FD_SET fd_set
  286. #define XFD_ZERO(s) FD_ZERO(s)
  287. #define T_SOCKET int
  288. #define SEND_FLAGS (MSG_NOSIGNAL)
  289. #define CHECKSOCKRANGE(s)
  // Everything else: use a private, larger descriptor set (32768 entries) instead of the system fd_set.
  290. #else
  291. #define XFD_SETSIZE 32768
  292. struct xfd_set { __fd_mask fds_bits[XFD_SETSIZE / __NFDBITS]; }; // define our own
  293. // linux 64 bit
  294. #ifdef __linux__
  // On 64-bit Linux the libc bit-twiddling helpers are redefined here so that they operate on the
  // private xfd_set above.
  295. #ifdef __64BIT__
  296. #undef __FDMASK
  297. #define __FDMASK(d) (1UL << ((d) % __NFDBITS))
  298. #undef __FDELT
  299. #define __FDELT(d) ((d) / __NFDBITS)
  300. #undef __FD_SET
  301. #define __FD_SET(d, s) (__FDS_BITS (s)[__FDELT(d)] |= __FDMASK(d))
  302. #undef __FD_ISSET
  303. #define __FD_ISSET(d, s) ((__FDS_BITS (s)[__FDELT(d)] & __FDMASK(d)) != 0)
  304. #endif
  305. #define CHECKSOCKRANGE(s) { if (s>=XFD_SETSIZE) THROWJSOCKEXCEPTION2(JSOCKERR_handle_too_large); }
  306. #endif
  307. // end 64 bit
  // NOTE(review): on a non-Linux platform that reaches this branch CHECKSOCKRANGE is left undefined.
  308. #define T_FD_SET xfd_set
  309. #define XFD_ZERO(s) memset(s,0,sizeof(xfd_set))
  310. #define T_SOCKET int
  311. #define SEND_FLAGS (MSG_NOSIGNAL)
  312. #endif
  // NOTE(review): CentralNodeArray is only declared here; its users are outside this view.
  313. #ifdef CENTRAL_NODE_RANDOM_DELAY
  314. static SocketEndpointArray CentralNodeArray;
  315. #endif
  // Role/transport of a CSocket.  Only sm_tcp and sm_tcp_server are connection oriented
  // (see CSocket::connectionless()).
  316. enum SOCKETMODE { sm_tcp_server, sm_tcp, sm_udp_server, sm_udp, sm_multicast_server, sm_multicast};
  // True when `err` indicates the handle itself is bad (not a descriptor / not a socket).
  317. #define BADSOCKERR(err) ((err==JSE_BADF)||(err==JSE_NOTSOCK))
  // "Readable" event masks for poll/epoll; they include the RDHUP flag where the platform defines one.
  318. #ifdef POLLRDHUP
  319. # define POLLINX (POLLIN | POLLRDHUP)
  320. #else
  321. # define POLLINX POLLIN
  322. #endif
  323. #ifdef _HAS_EPOLL_SUPPORT
  324. # ifdef EPOLLRDHUP
  325. # define EPOLLINX (EPOLLIN | EPOLLRDHUP)
  326. # else
  327. # define EPOLLINX EPOLLIN
  328. # endif
  329. #endif
  // Concrete ISocket implementation wrapping one native socket handle (`sock`).
  // The same class is used for TCP clients/servers and for UDP/multicast sockets; `sockmode`
  // selects the behaviour.  Method bodies are defined further down in this file.
  330. class CSocket: public ISocket, public CInterface
  331. {
  332. public:
  333. IMPLEMENT_IINTERFACE;
  334. static CriticalSection crit;
  335. protected:
  336. friend class CSocketConnectWait;
  // Lifecycle: pre_connect() sets ss_pre_open, post_connect()/open() set ss_open;
  // accept() refuses to run unless the state is ss_open.
  337. enum { ss_open, ss_shutdown, ss_close, ss_pre_open } state;
  // Native handle; INVALID_SOCKET once closesock() has run.
  338. T_SOCKET sock;
  339. char* hostname; // host address
  340. unsigned short hostport; // host port
  // Local port, filled in by post_connect() from getEndpoint().
  341. unsigned short localPort;
  342. SOCKETMODE sockmode;
  // Resolved target address; pre_connect()/open() resolve it from hostname on first use.
  343. IpAddress targetip;
  344. SocketEndpoint returnep; // set by set_return_addr
  // When non-NULL, open() joins this multicast group via IP_ADD_MEMBERSHIP.
  345. MCASTREQ * mcastreq;
  346. size32_t nextblocksize;
  347. unsigned blockflags;
  348. unsigned blocktimeoutms;
  349. bool owned;
  // Cancellation handshake used by accept() and cancel_accept().
  350. enum {accept_not_cancelled, accept_cancel_pending, accept_cancelled} accept_cancel_state;
  // True only while accept() is inside the ::accept() system call.
  351. bool in_accept;
  // Cached blocking mode, maintained by set_nonblock().
  352. bool nonblocking;
  // Cached Nagle setting, maintained by set_nagle().
  353. bool nagling;
  354. static unsigned connectingcount;
  355. #ifdef USERECVSEM
  356. static Semaphore receiveblocksem;
  357. bool receiveblocksemowned; // owned by this socket
  358. #endif
  // Name used in exception/log text by the THROWJSOCKEXCEPTION and LOGERR2 macros.
  359. #ifdef _TRACE
  360. char * tracename;
  361. #endif
  362. public:
  363. void open(int listen_queue_size,bool reuseports=false);
  364. bool connect_timeout( unsigned timeout, bool noexception);
  365. void connect_wait( unsigned timems);
  366. void udpconnect();
  367. void read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,unsigned timeoutsecs);
  368. void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timedelaysecs);
  369. void read(void* buf, size32_t size);
  370. size32_t write(void const* buf, size32_t size);
  371. size32_t writetms(void const* buf, size32_t size, unsigned timeoutms=WAIT_FOREVER);
  372. size32_t write_multiple(unsigned num,void const**buf, size32_t *size);
  373. size32_t udp_write_to(const SocketEndpoint &ep,void const* buf, size32_t size);
  374. void close();
  375. void errclose();
  // True for every mode except sm_tcp and sm_tcp_server.
  376. bool connectionless() { return (sockmode!=sm_tcp)&&(sockmode!=sm_tcp_server); }
  377. void shutdown(unsigned mode=SHUTDOWN_READWRITE);
  378. ISocket* accept(bool allowcancel, SocketEndpoint *peerEp=nullptr);
  379. int wait_read(unsigned timeout);
  380. int logPollError(unsigned revents, const char *rwstr);
  381. int wait_write(unsigned timeout);
  382. int name(char *name,size32_t namemax);
  383. int peer_name(char *name,size32_t namemax);
  384. SocketEndpoint &getPeerEndpoint(SocketEndpoint &ep);
  385. IpAddress & getPeerAddress(IpAddress &addr);
  386. SocketEndpoint &getEndpoint(SocketEndpoint &ep) const override;
  387. void set_return_addr(int port,const char *name); // sets returnep
  388. void cancel_accept();
  389. size32_t get_max_send_size();
  390. bool set_nonblock(bool on=true);
  391. bool set_nagle(bool on);
  392. void set_linger(int lingersecs);
  393. void set_keep_alive(bool set);
  394. void logConnectionInfo(unsigned timeoutms, unsigned conn_mstime);
  395. virtual void set_inherit(bool inherit=false);
  396. virtual bool check_connection();
  397. virtual bool isSecure() const override;
  398. // Block functions
  399. void set_block_mode(unsigned flags,size32_t recsize=0,unsigned timeoutms=0);
  400. bool send_block(const void *blk,size32_t sz);
  401. size32_t receive_block_size();
  402. size32_t receive_block(void *blk,size32_t sz);
  403. size32_t get_send_buffer_size();
  404. void set_send_buffer_size(size32_t sz);
  405. bool join_multicast_group(SocketEndpoint &ep); // for udp multicast
  406. bool leave_multicast_group(SocketEndpoint &ep); // for udp multicast
  407. void set_ttl(unsigned _ttl);
  408. size32_t get_receive_buffer_size();
  409. void set_receive_buffer_size(size32_t sz);
  410. size32_t avail_read();
  // Two-phase connect: pre_connect() creates the socket and starts ::connect(),
  // post_connect() checks SO_ERROR and marks the socket open.
  411. int pre_connect(bool block);
  412. int post_connect();
  413. void setTraceName(const char * prefix, const char * name);
  414. void setTraceName();
  415. CSocket(const SocketEndpoint &_ep,SOCKETMODE smode,const char *name);
  416. CSocket(T_SOCKET new_sock,SOCKETMODE smode,bool _owned);
  417. virtual ~CSocket();
  // Returns the native handle converted to unsigned.
  418. virtual unsigned OShandle() const
  419. {
  420. return (unsigned)sock;
  421. }
  422. virtual bool isValid() const
  423. {
  424. return sock != INVALID_SOCKET;
  425. }
  426. private:
  // Closes the native handle if there is one and returns the OS close result (0 when there was
  // nothing to close).  `sock` is invalidated and STATS.activesockets decremented before the
  // handle is actually closed; on non-Windows platforms the write side is shut down first.
  427. int closesock()
  428. {
  429. if (sock!=INVALID_SOCKET) {
  430. T_SOCKET s = sock;
  431. sock = INVALID_SOCKET;
  432. STATS.activesockets--;
  433. #ifdef SOCKTRACE
  434. PROGLOG("SOCKTRACE: Closing socket %x %d (%p)", s, s, this);
  435. #endif
  436. #ifdef _WIN32
  437. return ::closesocket(s);
  438. #else
  439. ::shutdown(s, SHUT_WR);
  440. return ::close(s);
  441. #endif
  442. }
  443. else
  444. return 0;
  445. }
  446. };
  // Storage for CSocket's static members.
  447. CriticalSection CSocket::crit;
  448. unsigned CSocket::connectingcount=0;
  449. #ifdef USERECVSEM
  450. Semaphore CSocket::receiveblocksem(2);
  451. #endif
  452. #ifdef _WIN32
  453. class win_socket_library
  454. {
  455. static bool initdone; // to prevent dependancy probs very early on (e.g. jlog)
  456. public:
  457. win_socket_library() { init(); }
  458. bool init()
  459. {
  460. if (initdone)
  461. return true;
  462. WSADATA wsa;
  463. if (WSAStartup(MAKEWORD(2, 2), &wsa) != 0) {
  464. if (WSAStartup(MAKEWORD(1, 1), &wsa) != 0) {
  465. MessageBox(NULL,"Failed to initialize windows sockets","JLib Socket Error",MB_OK);
  466. return false;
  467. }
  468. }
  469. initdone = true;
  470. return true;
  471. }
  472. ~win_socket_library()
  473. {
  474. WSACleanup();
  475. }
  476. };
  477. bool win_socket_library::initdone = false;
  478. static win_socket_library ws32_lib;
  // Windows flavour of the portable error helpers: ERRNO() reads the last Winsock error and the
  // JSE_* names map onto the corresponding WSAE* codes.
  479. #define ERRNO() WSAGetLastError()
  480. #define JSE_ADDRINUSE WSAEADDRINUSE
  481. #define JSE_CONNRESET WSAECONNRESET
  482. #define JSE_CONNABORTED WSAECONNABORTED
  483. #define JSE_NOTCONN WSAENOTCONN
  484. #define JSE_WOULDBLOCK WSAEWOULDBLOCK
  485. #define JSE_INPROGRESS WSAEINPROGRESS
  486. #define JSE_NETUNREACH WSAENETUNREACH
  487. #define JSE_NOTSOCK WSAENOTSOCK
  488. #define JSE_TIMEDOUT WSAETIMEDOUT
  489. #define JSE_CONNREFUSED WSAECONNREFUSED
  490. #define JSE_BADF WSAEBADF
  491. #define JSE_INTR WSAEINTR
  // Locally declared IPv6 socket address record used as the sin6 member of J_SOCKADDR on Windows.
  492. struct j_sockaddr_in6 {
  493. short sin6_family; /* AF_INET6 */
  494. u_short sin6_port; /* Transport level port number */
  495. u_long sin6_flowinfo; /* IPv6 flow information */
  496. struct in_addr6 sin6_addr; /* IPv6 address */
  497. u_long sin6_scope_id; /* set of interfaces for a scope */
  498. };
  // One buffer large enough for either an IPv4 or an IPv6 socket address; `sa` is the view
  // handed to the socket calls.
  499. typedef union {
  500. struct sockaddr sa;
  501. struct j_sockaddr_in6 sin6;
  502. struct sockaddr_in sin;
  503. } J_SOCKADDR;
  // Declares a zero-filled J_SOCKADDR called `name`.  Expands to two statements, so it must not
  // be used as the unbraced body of an if/for.
  504. #define DEFINE_SOCKADDR(name) J_SOCKADDR name; memset(&name,0,sizeof(J_SOCKADDR))
  505. static int _inet_pton(int af, const char* src, void* dst)
  506. {
  507. DEFINE_SOCKADDR(u);
  508. int address_length;
  509. switch (af) {
  510. case AF_INET:
  511. u.sin.sin_family = AF_INET;
  512. address_length = sizeof (u.sin);
  513. break;
  514. case AF_INET6:
  515. u.sin6.sin6_family = AF_INET6;
  516. address_length = sizeof (u.sin6);
  517. break;
  518. default:
  519. #ifdef EAFNOSUPPORT
  520. errno = EAFNOSUPPORT;
  521. #else
  522. errno = 52;
  523. #endif
  524. return -1;
  525. }
  526. ws32_lib.init();
  527. int ret = WSAStringToAddress ((LPTSTR) src, af, NULL, &u.sa, &address_length);
  528. if (ret == 0) {
  529. switch (af) {
  530. case AF_INET:
  531. memcpy (dst, &u.sin.sin_addr, sizeof (struct in_addr));
  532. break;
  533. case AF_INET6:
  534. memcpy (dst, &u.sin6.sin6_addr, sizeof (u.sin6.sin6_addr));
  535. break;
  536. }
  537. return 1;
  538. }
  539. errno = WSAGetLastError();
  540. // PROGLOG("errno = %d",errno);
  541. return 0;
  542. }
  543. static const char * _inet_ntop (int af, const void *src, char *dst, socklen_t cnt)
  544. {
  545. /* struct sockaddr can't accomodate struct sockaddr_in6. */
  546. DEFINE_SOCKADDR(u);
  547. DWORD dstlen = cnt;
  548. size_t srcsize;
  549. memset(&u,0,sizeof(u));
  550. switch (af) {
  551. case AF_INET:
  552. u.sin.sin_family = AF_INET;
  553. u.sin.sin_addr = *(struct in_addr *) src;
  554. srcsize = sizeof (u.sin);
  555. break;
  556. case AF_INET6:
  557. u.sin6.sin6_family = AF_INET6;
  558. memcpy(&u.sin6.sin6_addr,src,sizeof(in_addr6));
  559. srcsize = sizeof (u.sin6);
  560. break;
  561. default:
  562. return NULL;
  563. }
  564. ws32_lib.init();
  565. if (WSAAddressToString (&u.sa, srcsize, NULL, dst, &dstlen) != 0) {
  566. errno = WSAGetLastError();
  567. return NULL;
  568. }
  569. return (const char *) dst;
  570. }
  571. int inet_aton (const char *name, struct in_addr *addr)
  572. {
  573. addr->s_addr = inet_addr (name);
  574. return (addr->s_addr == (u_long)-1)?1:0; // 255.255.255.255 has had it here
  575. }
  576. #else
  // POSIX flavour of the portable error helpers: the JSE_* names map straight onto errno values.
  577. #define JSE_ADDRINUSE EADDRINUSE
  578. #define JSE_CONNRESET ECONNRESET
  579. #define JSE_CONNABORTED ECONNABORTED
  580. #define JSE_NOTCONN ENOTCONN
  581. #define JSE_WOULDBLOCK EWOULDBLOCK
  582. #define JSE_INPROGRESS EINPROGRESS
  583. #define JSE_NETUNREACH ENETUNREACH
  584. #define JSE_NOTSOCK ENOTSOCK
  585. #define JSE_TIMEDOUT ETIMEDOUT
  586. #define JSE_CONNREFUSED ECONNREFUSED
  587. #define JSE_BADF EBADF
  // The system conversion routines and IPv6 address type are used directly under the names the
  // Windows branch has to provide itself.
  588. #define _inet_ntop inet_ntop
  589. #define _inet_pton inet_pton
  590. #define in_addr6 in6_addr
  // One buffer large enough for either an IPv4 or an IPv6 socket address; `sa` is the view
  // handed to the socket calls.
  591. typedef union {
  592. struct sockaddr sa;
  593. struct sockaddr_in6 sin6;
  594. struct sockaddr_in sin;
  595. } J_SOCKADDR;
  // Declares a zero-filled J_SOCKADDR called `name`.  Expands to two statements, so it must not
  // be used as the unbraced body of an if/for.
  596. #define DEFINE_SOCKADDR(name) J_SOCKADDR name; memset(&name,0,sizeof(J_SOCKADDR))
  597. #define JSE_INTR EINTR
  598. #define ERRNO() (errno)
  // Fallback for platforms whose headers do not provide INADDR_NONE.
  599. #ifndef INADDR_NONE
  600. #define INADDR_NONE (-1)
  601. #endif
  602. #endif
  603. #ifndef INET6_ADDRSTRLEN
  604. #define INET6_ADDRSTRLEN 65
  605. #endif
  // Throws a heap-allocated SocketException for `jsockErr` via THROWJSOCKEXCEPTION2; in _TRACE
  // builds the exception text also records this source file and line.  Never returns normally.
  606. extern jlib_decl void throwJSocketException(int jsockErr)
  607. {
  608. THROWJSOCKEXCEPTION2(jsockErr);
  609. }
  610. extern jlib_decl IJSOCK_Exception* createJSocketException(int jsockErr, const char *_msg)
  611. {
  612. return new SocketException(jsockErr, _msg);
  613. }
  614. inline void LogErr(unsigned err,unsigned ref,const char *info,unsigned lineno,const char *tracename)
  615. {
  616. if (err)
  617. {
  618. PROGLOG("jsocket(%d,%d)%s%s err = %d%s%s",ref,lineno,
  619. (info&&*info)?" ":"",(info&&*info)?info:"",err,
  620. (tracename&&*tracename)?" : ":"",(tracename&&*tracename)?tracename:"");
  621. #ifdef _TRACELINKCLOSED
  622. if ((JSE_NOTCONN == err) || (JSE_CONNRESET == err) || (JSE_CONNABORTED == err))
  623. {
  624. PROGLOG("Socket not connected, stack:");
  625. PrintStackReport();
  626. }
  627. #endif
  628. }
  629. }
  630. inline socklen_t setSockAddr(J_SOCKADDR &u, const IpAddress &ip,unsigned short port)
  631. {
  632. if (!IP6preferred) {
  633. if (ip.getNetAddress(sizeof(in_addr),&u.sin.sin_addr)==sizeof(in_addr)) {
  634. u.sin.sin_family = AF_INET;
  635. u.sin.sin_port = htons(port);
  636. return sizeof(u.sin);
  637. }
  638. }
  639. if (IP4only)
  640. IPV6_NOT_IMPLEMENTED();
  641. ip.getNetAddress(sizeof(in_addr6),&u.sin6.sin6_addr);
  642. u.sin6.sin6_family = AF_INET6;
  643. u.sin6.sin6_port = htons(port);
  644. return sizeof(u.sin6);
  645. }
  646. inline socklen_t setSockAddrAny(J_SOCKADDR &u, unsigned short port)
  647. {
  648. if (IP6preferred) {
  649. #ifdef _WIN32
  650. IN6ADDR_SETANY((PSOCKADDR_IN6)&u.sin6.sin6_addr);
  651. #else
  652. memcpy(&u.sin6.sin6_addr,&in6addr_any,sizeof(in_addr6));
  653. #endif
  654. u.sin6.sin6_family= AF_INET6;
  655. u.sin6.sin6_port = htons(port);
  656. return sizeof(u.sin6);
  657. }
  658. u.sin.sin_addr.s_addr = htonl(INADDR_ANY);
  659. u.sin.sin_family= AF_INET;
  660. u.sin.sin_port = htons(port);
  661. return sizeof(u.sin);
  662. }
  663. inline void getSockAddrEndpoint(const J_SOCKADDR &u, socklen_t ul, SocketEndpoint &ep)
  664. {
  665. if (ul==sizeof(u.sin)) {
  666. ep.setNetAddress(sizeof(in_addr),&u.sin.sin_addr);
  667. ep.port = htons(u.sin.sin_port);
  668. }
  669. else {
  670. ep.setNetAddress(sizeof(in_addr6),&u.sin6.sin6_addr);
  671. ep.port = htons(u.sin6.sin6_port);
  672. }
  673. }
  674. /* might need fcntl(F_SETFL), or ioctl(FIONBIO) */
  675. /* Posix.1g says fcntl */
  676. #if defined(O_NONBLOCK)
  677. bool CSocket::set_nonblock(bool on)
  678. {
  679. int flags = fcntl(sock, F_GETFL, 0);
  680. if (flags == -1)
  681. return nonblocking;
  682. if (on)
  683. flags |= O_NONBLOCK;
  684. else
  685. flags &= ~O_NONBLOCK;
  686. if (fcntl(sock, F_SETFL, flags)==0) {
  687. bool wasNonBlocking = nonblocking;
  688. nonblocking = on;
  689. return wasNonBlocking;
  690. }
  691. return nonblocking;
  692. }
  693. #else
  694. bool CSocket::set_nonblock(bool on)
  695. {
  696. #ifdef _WIN32
  697. u_long yes = on?1:0;
  698. if (ioctlsocket(sock, FIONBIO, &yes)==0) {
  699. #else
  700. int yes = on?1:0;
  701. if (ioctl(sock, FIONBIO, &yes)==0) {
  702. #endif
  703. bool wasNonBlocking = nonblocking;
  704. nonblocking = on;
  705. return wasNonBlocking;
  706. }
  707. return nonblocking;
  708. }
  709. #endif
  710. bool CSocket::set_nagle(bool on)
  711. {
  712. bool ret = nagling;
  713. nagling = on;
  714. int enabled = !on;
  715. if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char*)&enabled, sizeof(enabled)) != 0) {
  716. nagling = !on;
  717. }
  718. return ret;
  719. }
  720. void CSocket::set_inherit(bool inherit)
  721. {
  722. #ifndef _WIN32
  723. long flag = fcntl(sock, F_GETFD);
  724. if(inherit)
  725. flag &= ~FD_CLOEXEC;
  726. else
  727. flag |= FD_CLOEXEC;
  728. fcntl(sock, F_SETFD, flag);
  729. #endif
  730. }
  731. size32_t CSocket::avail_read()
  732. {
  733. #ifdef _WIN32
  734. u_long avail;
  735. if (ioctlsocket(sock, FIONREAD, &avail)==0)
  736. #else
  737. int avail;
  738. if (ioctl(sock, FIONREAD, &avail)==0)
  739. #endif
  740. return (size32_t)avail;
  741. int err = ERRNO();
  742. LOGERR2(err,1,"avail_read");
  743. return 0;
  744. }
  745. #define PRE_CONN_UNREACH_ELIM 100
  746. int CSocket::pre_connect (bool block)
  747. {
  748. if (NULL == hostname || '\0' == (*hostname))
  749. {
  750. StringBuffer err;
  751. err.appendf("CSocket::pre_connect - Invalid/missing host IP address raised in : %s, line %d",sanitizeSourceFile(__FILE__), __LINE__);
  752. IJSOCK_Exception *e = new SocketException(JSOCKERR_bad_netaddr,err.str());
  753. throw e;
  754. }
  755. DEFINE_SOCKADDR(u);
  756. if (targetip.isNull()) {
  757. set_return_addr(hostport,hostname);
  758. targetip.ipset(returnep);
  759. }
  760. socklen_t ul = setSockAddr(u,targetip,hostport);
  761. sock = ::socket(u.sa.sa_family, SOCK_STREAM, targetip.isIp4()?0:PF_INET6);
  762. owned = true;
  763. state = ss_pre_open; // will be set to open by post_connect
  764. if (sock == INVALID_SOCKET) {
  765. int err = ERRNO();
  766. THROWJSOCKEXCEPTION(err);
  767. }
  768. STATS.activesockets++;
  769. int err = 0;
  770. set_nonblock(!block);
  771. int rc = ::connect(sock, &u.sa, ul);
  772. if (rc==SOCKET_ERROR) {
  773. err = ERRNO();
  774. if ((err != JSE_INPROGRESS)&&(err != JSE_WOULDBLOCK)&&(err != JSE_TIMEDOUT)&&(err!=JSE_CONNREFUSED)) { // handled by caller
  775. if (err != JSE_NETUNREACH) {
  776. pre_conn_unreach_cnt.store(0);
  777. LOGERR2(err,1,"pre_connect");
  778. } else {
  779. int ecnt = pre_conn_unreach_cnt.load();
  780. if (ecnt <= PRE_CONN_UNREACH_ELIM) {
  781. pre_conn_unreach_cnt.fetch_add(1);
  782. LOGERR2(err,1,"pre_connect network unreachable");
  783. }
  784. }
  785. } else
  786. pre_conn_unreach_cnt.store(0);
  787. } else
  788. pre_conn_unreach_cnt.store(0);
  789. #ifdef SOCKTRACE
  790. PROGLOG("SOCKTRACE: pre-connected socket%s %x %d (%p) err=%d", block?"(block)":"", sock, sock, this, err);
  791. #endif
  792. return err;
  793. }
  794. int CSocket::post_connect ()
  795. {
  796. set_nonblock(false);
  797. int err = 0;
  798. socklen_t errlen = sizeof(err);
  799. int rc = getsockopt(sock, SOL_SOCKET, SO_ERROR, (char *)&err, &errlen); // check for error
  800. if ((rc!=0)&&!err)
  801. err = ERRNO(); // some implementations of getsockopt duff
  802. if (err==0) {
  803. nagling = true;
  804. set_nagle(false);
  805. state = ss_open;
  806. SocketEndpoint ep;
  807. localPort = getEndpoint(ep).port;
  808. }
  809. else if ((err!=JSE_TIMEDOUT)&&(err!=JSE_CONNREFUSED)) // handled by caller
  810. LOGERR2(err,1,"post_connect");
  811. return err;
  812. }
  813. void CSocket::open(int listen_queue_size,bool reuseports)
  814. {
  815. // If listen_queue_size==0 then bind port to address but
  816. // do not actually listen() for accepting connections.
  817. // This is used when a unique IP:port is needed for MP client
  818. // INode/IGroup internals, but client never actually accepts connections.
  819. if (IP6preferred)
  820. sock = ::socket(AF_INET6, connectionless()?SOCK_DGRAM:SOCK_STREAM, PF_INET6);
  821. else
  822. sock = ::socket(AF_INET, connectionless()?SOCK_DGRAM:SOCK_STREAM, 0);
  823. if (sock == INVALID_SOCKET) {
  824. THROWJSOCKEXCEPTION(ERRNO());
  825. }
  826. STATS.activesockets++;
  827. #ifdef SOCKTRACE
  828. PROGLOG("SOCKTRACE: opened socket %x %d (%p)", sock,sock,this);
  829. #endif
  830. if ((hostport==0)&&(sockmode==sm_udp)) {
  831. state = ss_open;
  832. #ifdef SOCKTRACE
  833. PROGLOG("SOCKTRACE: opened socket return udp");
  834. #endif
  835. set_inherit(false);
  836. return;
  837. }
  838. #ifndef _WIN32
  839. reuseports = true; // for some reason linux requires reuse ports
  840. #endif
  841. if (reuseports && listen_queue_size) {
  842. int on = 1;
  843. setsockopt( sock, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on));
  844. }
  845. DEFINE_SOCKADDR(u);
  846. socklen_t ul;
  847. if (hostname) {
  848. if (targetip.isNull()) {
  849. set_return_addr(hostport,hostname);
  850. targetip.ipset(returnep);
  851. }
  852. ul = setSockAddr(u,targetip,hostport);
  853. }
  854. else
  855. ul = setSockAddrAny(u,hostport);
  856. int saverr;
  857. if (::bind(sock, &u.sa, ul) != 0) {
  858. saverr = ERRNO();
  859. if (saverr==JSE_ADDRINUSE) { // don't log as error (some usages probe ports)
  860. ErrPortInUse:
  861. closesock();
  862. char msg[1024];
  863. sprintf(msg,"Target: %s, port = %d, Raised in: %s, line %d",tracename,(int)hostport,sanitizeSourceFile(__FILE__), __LINE__);
  864. IJSOCK_Exception *e = new SocketException(JSOCKERR_port_in_use,msg);
  865. throw e;
  866. }
  867. else {
  868. closesock();
  869. THROWJSOCKEXCEPTION(saverr);
  870. }
  871. }
  872. if (!connectionless() && listen_queue_size) {
  873. if (::listen(sock, listen_queue_size) != 0) {
  874. saverr = ERRNO();
  875. if (saverr==JSE_ADDRINUSE)
  876. goto ErrPortInUse;
  877. closesock();
  878. THROWJSOCKEXCEPTION(saverr);
  879. }
  880. }
  881. if (mcastreq) {
  882. if (setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP,(char*)mcastreq, sizeof(*mcastreq))!=0) {
  883. saverr = ERRNO();
  884. closesock();
  885. THROWJSOCKEXCEPTION(saverr);
  886. }
  887. }
  888. state = ss_open;
  889. #ifdef SOCKTRACE
  890. PROGLOG("SOCKTRACE: opened socket return");
  891. #endif
  892. set_inherit(false);
  893. }
  894. ISocket* CSocket::accept(bool allowcancel, SocketEndpoint *peerEp)
  895. {
  896. if ((accept_cancel_state!=accept_not_cancelled) && allowcancel) {
  897. accept_cancel_state=accept_cancelled;
  898. return NULL;
  899. }
  900. if (state != ss_open) {
  901. IERRLOG("invalid accept, state = %d",(int)state);
  902. THROWJSOCKEXCEPTION(JSOCKERR_not_opened);
  903. }
  904. if (connectionless()) {
  905. THROWJSOCKEXCEPTION(JSOCKERR_connectionless_socket);
  906. }
  907. DEFINE_SOCKADDR(peerSockAddr); // used if peerIp
  908. socklen_t peerSockAddrLen = sizeof(peerSockAddr);
  909. T_SOCKET newsock;
  910. for (;;) {
  911. in_accept = true;
  912. newsock = (sock!=INVALID_SOCKET)?::accept(sock, &peerSockAddr.sa, &peerSockAddrLen):INVALID_SOCKET;
  913. in_accept = false;
  914. #ifdef SOCKTRACE
  915. PROGLOG("SOCKTRACE: accept created socket %x %d (%p)", newsock,newsock,this);
  916. #endif
  917. if (newsock!=INVALID_SOCKET) {
  918. if ((sock==INVALID_SOCKET)||(accept_cancel_state==accept_cancel_pending)) {
  919. ::close(newsock);
  920. newsock=INVALID_SOCKET;
  921. }
  922. else {
  923. accept_cancel_state = accept_not_cancelled;
  924. break;
  925. }
  926. }
  927. int saverr;
  928. saverr = ERRNO();
  929. if ((sock==INVALID_SOCKET)||(accept_cancel_state==accept_cancel_pending)) {
  930. accept_cancel_state = accept_cancelled;
  931. if (allowcancel)
  932. return NULL;
  933. THROWJSOCKEXCEPTION(JSOCKERR_cancel_accept);
  934. }
  935. if (saverr != JSE_INTR) {
  936. accept_cancel_state = accept_not_cancelled;
  937. THROWJSOCKEXCEPTION(saverr);
  938. }
  939. }
  940. if (state != ss_open) {
  941. accept_cancel_state = accept_cancelled;
  942. if (allowcancel)
  943. return NULL;
  944. THROWJSOCKEXCEPTION(JSOCKERR_cancel_accept);
  945. }
  946. if (peerEp)
  947. getSockAddrEndpoint(peerSockAddr, peerSockAddrLen, *peerEp);
  948. CSocket *ret = new CSocket(newsock,sm_tcp,true);
  949. ret->set_inherit(false);
  950. return ret;
  951. }
  952. void CSocket::set_linger(int lingertime)
  953. {
  954. struct linger l;
  955. l.l_onoff = (lingertime>=0)?1:0;
  956. l.l_linger = (lingertime>=0)?lingertime:0;
  957. if (setsockopt(sock, SOL_SOCKET, SO_LINGER, (char*)&l, sizeof(l)) != 0) {
  958. IWARNLOG("Linger not set");
  959. }
  960. }
  961. void CSocket::set_keep_alive(bool set)
  962. {
  963. int on=set?1:0;
  964. if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char*)&on, sizeof(on)) != 0) {
  965. IWARNLOG("KeepAlive not set");
  966. }
  967. }
  968. int CSocket::name(char *retname,size32_t namemax)
  969. {
  970. SocketEndpoint ep;
  971. getEndpoint(ep);
  972. if (retname && namemax)
  973. {
  974. StringBuffer s;
  975. ep.getIpText(s);
  976. if (namemax-1<s.length())
  977. s.setLength(namemax-1);
  978. memcpy(retname,s.str(),s.length()+1);
  979. }
  980. return ep.port;
  981. }
  982. int CSocket::peer_name(char *retname,size32_t namemax)
  983. {
  984. // should not raise exceptions
  985. int ret = 0;
  986. if (!retname)
  987. namemax = 0;
  988. if (namemax)
  989. retname[0] = 0;
  990. if (state != ss_open) {
  991. return -1; // don't log as used to test socket
  992. }
  993. StringBuffer s;
  994. if (sockmode==sm_udp_server) { // udp server
  995. returnep.getIpText(s);
  996. ret = returnep.port;
  997. }
  998. else {
  999. DEFINE_SOCKADDR(u);
  1000. socklen_t ul = sizeof(u);
  1001. if (::getpeername(sock,&u.sa, &ul)<0)
  1002. return -1; // don't log as used to test socket
  1003. SocketEndpoint ep;
  1004. getSockAddrEndpoint(u,ul,ep);
  1005. ep.getIpText(s);
  1006. ret = ep.port;
  1007. }
  1008. if (namemax>1) {
  1009. if (namemax-1<s.length())
  1010. s.setLength(namemax-1);
  1011. memcpy(retname,s.str(),s.length()+1);
  1012. }
  1013. return ret;
  1014. }
  1015. SocketEndpoint &CSocket::getPeerEndpoint(SocketEndpoint &ep)
  1016. {
  1017. if (state != ss_open) {
  1018. THROWJSOCKEXCEPTION(JSOCKERR_not_opened);
  1019. }
  1020. StringBuffer s;
  1021. if (sockmode==sm_udp_server) { // udp server
  1022. ep.set(returnep);
  1023. }
  1024. else {
  1025. DEFINE_SOCKADDR(u);
  1026. socklen_t ul = sizeof(u);
  1027. if (::getpeername(sock,&u.sa, &ul)<0) {
  1028. DBGLOG("getpeername failed %d",ERRNO());
  1029. ep.set(NULL, 0);
  1030. }
  1031. else
  1032. getSockAddrEndpoint(u,ul,ep);
  1033. }
  1034. return ep;
  1035. }
  1036. IpAddress & CSocket::getPeerAddress(IpAddress &addr)
  1037. {
  1038. SocketEndpoint ep;
  1039. getPeerEndpoint(ep);
  1040. addr = ep;
  1041. return addr;
  1042. }
  1043. void CSocket::set_return_addr(int port,const char *retname)
  1044. {
  1045. if (!returnep.ipset(retname)) {
  1046. IJSOCK_Exception *e = new SocketException(JSOCKERR_bad_address); // don't use THROWJSOCKEXCEPTION here
  1047. throw e;
  1048. }
  1049. returnep.port = port;
  1050. }
  1051. SocketEndpoint &CSocket::getEndpoint(SocketEndpoint &ep) const
  1052. {
  1053. if (state != ss_open) {
  1054. THROWJSOCKEXCEPTION(JSOCKERR_not_opened);
  1055. }
  1056. DEFINE_SOCKADDR(u);
  1057. socklen_t ul = sizeof(u);
  1058. if (::getsockname(sock,&u.sa, &ul)<0) {
  1059. THROWJSOCKEXCEPTION(ERRNO());
  1060. }
  1061. getSockAddrEndpoint(u,ul,ep);
  1062. return ep;
  1063. }
  1064. void CSocket::cancel_accept()
  1065. {
  1066. if (connectionless()) {
  1067. THROWJSOCKEXCEPTION(JSOCKERR_connectionless_socket);
  1068. }
  1069. #ifdef SOCKTRACE
  1070. PROGLOG("SOCKTRACE: Cancel accept socket %x %d (%p)", sock, sock, this);
  1071. #endif
  1072. if (!in_accept) {
  1073. accept_cancel_state = accept_cancelled;
  1074. shutdown();
  1075. errclose();
  1076. return;
  1077. }
  1078. accept_cancel_state = accept_cancel_pending;
  1079. errclose(); // this should cause accept to terminate (not supported on all linux though)
  1080. #ifdef _WIN32
  1081. for (unsigned i=0;i<5;i++) { // windows closes on different lower priority thread
  1082. Sleep(i);
  1083. if (accept_cancel_state==accept_cancelled)
  1084. return;
  1085. }
  1086. #else
  1087. Sleep(0);
  1088. if (accept_cancel_state==accept_cancelled)
  1089. return;
  1090. #endif
  1091. // Wakeup listener using a connection
  1092. SocketEndpoint ep(hostport,queryHostIP());
  1093. Owned<CSocket> sock = new CSocket(ep,sm_tcp,NULL);
  1094. try {
  1095. sock->connect_timeout(100,true);
  1096. }
  1097. catch (IJSOCK_Exception *e) {
  1098. EXCLOG(e,"CSocket::cancel_eccept");
  1099. e->Release();
  1100. }
  1101. }
  1102. // ================================================================================
  1103. // connect versions
  1104. ISocket* ISocket::connect( const SocketEndpoint &ep )
  1105. {
  1106. // general connect
  1107. return ISocket::connect_wait(ep,DEFAULT_CONNECT_TIME);
  1108. }
  1109. inline void refused_sleep(CTimeMon &tm, unsigned &refuseddelay)
  1110. {
  1111. unsigned remaining;
  1112. if (!tm.timedout(&remaining)) {
  1113. if (refuseddelay<remaining/4) {
  1114. Sleep(refuseddelay);
  1115. if (refuseddelay<CONNECT_TIMEOUT_REFUSED_WAIT/2)
  1116. refuseddelay *=2;
  1117. else
  1118. refuseddelay = CONNECT_TIMEOUT_REFUSED_WAIT;
  1119. }
  1120. else
  1121. Sleep(remaining/4); // towards end of timeout approach gradually
  1122. }
  1123. }
  1124. bool CSocket::connect_timeout( unsigned timeout, bool noexception)
  1125. {
  1126. // simple connect with timeout (no fancy stuff!)
  1127. unsigned startt = usTick();
  1128. CTimeMon tm(timeout);
  1129. unsigned remaining;
  1130. unsigned refuseddelay = 1;
  1131. int err;
  1132. while (!tm.timedout(&remaining))
  1133. {
  1134. err = pre_connect(false);
  1135. if ((err == JSE_INPROGRESS)||(err == JSE_WOULDBLOCK))
  1136. {
  1137. #ifdef _USE_SELECT
  1138. T_FD_SET fds;
  1139. struct timeval tv;
  1140. CHECKSOCKRANGE(sock);
  1141. XFD_ZERO(&fds);
  1142. FD_SET((unsigned)sock, &fds);
  1143. T_FD_SET except;
  1144. XFD_ZERO(&except);
  1145. FD_SET((unsigned)sock, &except);
  1146. tv.tv_sec = remaining / 1000;
  1147. tv.tv_usec = (remaining % 1000)*1000;
  1148. int rc = ::select( sock + 1, NULL, (fd_set *)&fds, (fd_set *)&except, &tv );
  1149. #else
  1150. struct pollfd fds[1];
  1151. fds[0].fd = sock;
  1152. fds[0].events = POLLOUT;
  1153. fds[0].revents = 0;
  1154. int rc = ::poll(fds, 1, remaining);
  1155. #endif
  1156. if (rc>0)
  1157. {
  1158. // select/poll succeeded - return error from socket (0 if connected)
  1159. socklen_t errlen = sizeof(err);
  1160. rc = getsockopt(sock, SOL_SOCKET, SO_ERROR, (char *)&err, &errlen); // check for error
  1161. if ((rc!=0)&&!err)
  1162. err = ERRNO(); // some implementations of getsockopt duff
  1163. if (err) // probably ECONNREFUSED but treat all errors same
  1164. refused_sleep(tm,refuseddelay);
  1165. }
  1166. else if (rc<0)
  1167. {
  1168. err = ERRNO();
  1169. if (err != JSE_INTR)
  1170. {
  1171. LOGERR2(err,2,"::select/poll");
  1172. errclose();
  1173. break;
  1174. }
  1175. }
  1176. }
  1177. else if (err)
  1178. refused_sleep(tm, refuseddelay); // this stops becoming cpu bound
  1179. if (err==0)
  1180. {
  1181. err = post_connect();
  1182. if (err==0)
  1183. {
  1184. STATS.connects++;
  1185. STATS.connecttime+=usTick()-startt;
  1186. #ifdef _TRACE
  1187. setTraceName();
  1188. #endif
  1189. #ifdef SOCKTRACE
  1190. unsigned conn_mstime = (usTick() - startt) / 1000;
  1191. logConnectionInfo(timeout, conn_mstime);
  1192. #endif
  1193. return true;
  1194. }
  1195. }
  1196. errclose();
  1197. }
  1198. #ifdef SOCKTRACE
  1199. PROGLOG("connect_timeout: failed %d",err);
  1200. #endif
  1201. STATS.failedconnects++;
  1202. STATS.failedconnecttime+=usTick()-startt;
  1203. if (!noexception)
  1204. THROWJSOCKEXCEPTION(JSOCKERR_connection_failed);
  1205. return false;
  1206. }
  1207. ISocket* ISocket::connect_timeout(const SocketEndpoint &ep,unsigned timeout)
  1208. {
  1209. if (ep.isNull()||(ep.port==0))
  1210. THROWJSOCKEXCEPTION2(JSOCKERR_bad_address);
  1211. Owned<CSocket> sock = new CSocket(ep,sm_tcp,NULL);
  1212. sock->connect_timeout(timeout,false);
  1213. return sock.getClear();
  1214. }
  #define POLLTIME 50

  // Connects to the target, retrying until timems has elapsed; throws
  // JSOCKERR_connection_failed if no attempt succeeds.
  //
  // Each pass calls pre_connect(), either in "block" mode (on the final pass, or when
  // more than four connects are already in flight according to connectingcount) or in
  // a polled mode where the socket is checked with select/poll until it reports an
  // outcome. Failed attempts back off through refused_sleep and the socket is closed
  // before the next pass.
  void CSocket::connect_wait(unsigned timems)
  {
      unsigned startTicks = usTick();
      CTimeMon tm(timems);
      bool finalPass = false;
      int err;
      unsigned refuseddelay = 1;
      while (!finalPass) {
  #ifdef CENTRAL_NODE_RANDOM_DELAY
          ForEachItemIn(cn,CentralNodeArray) {
              const SocketEndpoint &central=CentralNodeArray.item(cn);
              if (central.ipequals(targetip)) {
                  unsigned sleeptime = getRandom() % 1000;
                  StringBuffer ipText;
                  central.getIpText(ipText);
                  DBGLOG("Connection to central node %s - sleeping %d milliseconds", ipText.str(), sleeptime);
                  Sleep(sleeptime);
                  break;
              }
          }
  #endif
          unsigned remaining;
          finalPass = tm.timedout(&remaining);
          bool blockselect = finalPass;                            // if last time round block
          {
              CriticalBlock block(crit);
              if (++connectingcount>4)
                  blockselect = true;
          }
          err = pre_connect(blockselect);
          if (blockselect)
          {
              if (err&&!finalPass)
                  refused_sleep(tm,refuseddelay); // probably ECONNREFUSED but treat all errors same
          }
          else
          {
              unsigned timeoutms = (finalPass||(remaining<10000))?10000:remaining;
  #ifndef BLOCK_POLLED_SINGLE_CONNECTS
              unsigned polltime = 1;
  #endif
              while (!blockselect && ((err == JSE_INPROGRESS)||(err == JSE_WOULDBLOCK)))
              {
  #ifdef _USE_SELECT
                  T_FD_SET fds;
                  struct timeval tv;
                  CHECKSOCKRANGE(sock);
                  XFD_ZERO(&fds);
                  FD_SET((unsigned)sock, &fds);
                  T_FD_SET except;
                  XFD_ZERO(&except);
                  FD_SET((unsigned)sock, &except);
  #ifdef BLOCK_POLLED_SINGLE_CONNECTS
                  tv.tv_sec = timeoutms / 1000;
                  tv.tv_usec = (timeoutms % 1000)*1000;
  #else
                  tv.tv_sec = 0;
                  tv.tv_usec = 0;
  #endif
                  int rc = ::select( sock + 1, NULL, (fd_set *)&fds, (fd_set *)&except, &tv );
  #else
                  struct pollfd fds[1];
                  fds[0].fd = sock;
                  fds[0].events = POLLOUT;
                  fds[0].revents = 0;
  #ifdef BLOCK_POLLED_SINGLE_CONNECTS
                  int pollTimeOut = timeoutms;
  #else
                  int pollTimeOut = 0;
  #endif
                  int rc = ::poll(fds, 1, pollTimeOut);
  #endif
                  if (rc>0)
                  {
                      // select/poll succeeded - return error from socket (0 if connected)
                      socklen_t errlen = sizeof(err);
                      rc = getsockopt(sock, SOL_SOCKET, SO_ERROR, (char *)&err, &errlen); // check for error
                      if ((rc!=0)&&!err)
                          err = ERRNO();  // some implementations of getsockopt duff
                      if (err)
                          refused_sleep(tm,refuseddelay); // probably ECONNREFUSED but treat all errors same
                      break;
                  }
                  if (rc<0)
                  {
                      err = ERRNO();
                      if (err != JSE_INTR)
                      {
                          LOGERR2(err,2,"::select/poll");
                          break;
                      }
                  }
                  if (!timeoutms)
                  {
  #ifdef SOCKTRACE
                      PROGLOG("connecttimeout: timed out");
  #endif
                      err = -1;
                      break;
                  }
  #ifdef BLOCK_POLLED_SINGLE_CONNECTS
                  break;
  #else
                  // Consume part of the per-attempt budget, sleeping a little longer each
                  // time round.
                  if (timeoutms<polltime)
                      polltime = timeoutms;
                  Sleep(polltime);                    // sleeps 1-50ms (to let other threads run)
                  timeoutms -= polltime;
                  if (polltime>POLLTIME/2)
                      polltime = POLLTIME;
                  else
                      polltime *= 2;
  #endif
              }
          }
          {
              CriticalBlock block(crit);
              --connectingcount;
          }
          if (err==0)
          {
              err = post_connect();
              if (err==0)
              {
                  STATS.connects++;
                  STATS.connecttime+=usTick()-startTicks;
  #ifdef _TRACE
                  setTraceName();
  #endif
  #ifdef SOCKTRACE
                  unsigned conn_mstime = (usTick() - startTicks) / 1000;
                  logConnectionInfo(timems, conn_mstime);
  #endif
                  return;
              }
          }
          errclose();
      }
  #ifdef SOCKTRACE
      PROGLOG("connect_wait: failed %d",err);
  #endif
      STATS.failedconnects++;
      STATS.failedconnecttime+=usTick()-startTicks;
      THROWJSOCKEXCEPTION(JSOCKERR_connection_failed);
  }
  1361. void CSocket::setTraceName(const char * prefix, const char * name)
  1362. {
  1363. #ifdef _TRACE
  1364. StringBuffer peer;
  1365. peer.append(prefix);
  1366. if (state == ss_open)
  1367. peer.append(":").append(localPort).append(" -> ");
  1368. peer.append(name?name:"(NULL)");
  1369. peer.append(":").append(hostport);
  1370. if (sock != INVALID_SOCKET)
  1371. peer.append(" (").append(sock).append(")");
  1372. free(tracename);
  1373. tracename = peer.detach();
  1374. #endif
  1375. }
  1376. void CSocket::setTraceName()
  1377. {
  1378. #ifdef _TRACE
  1379. setTraceName("C!", hostname);
  1380. #endif
  1381. }
  1382. ISocket* ISocket::connect_wait( const SocketEndpoint &ep, unsigned timems)
  1383. {
  1384. if (ep.isNull()||(ep.port==0))
  1385. THROWJSOCKEXCEPTION2(JSOCKERR_bad_address);
  1386. Owned<CSocket> sock = new CSocket(ep,sm_tcp,NULL);
  1387. sock->connect_wait(timems);
  1388. return sock.getClear();
  1389. }
  1390. void CSocket::udpconnect()
  1391. {
  1392. DEFINE_SOCKADDR(u);
  1393. if (targetip.isNull()) {
  1394. set_return_addr(hostport,hostname);
  1395. targetip.ipset(returnep);
  1396. }
  1397. socklen_t ul = setSockAddr(u,targetip,hostport);
  1398. sock = ::socket(u.sa.sa_family, SOCK_DGRAM, targetip.isIp4()?0:PF_INET6);
  1399. #ifdef SOCKTRACE
  1400. PROGLOG("SOCKTRACE: udp connected socket %x %d (%p)", sock, sock, this);
  1401. #endif
  1402. STATS.activesockets++;
  1403. if (sock == INVALID_SOCKET) {
  1404. THROWJSOCKEXCEPTION(ERRNO());
  1405. }
  1406. int res = ::connect(sock, &u.sa, ul);
  1407. if (res != 0) { // works for UDP
  1408. closesock();
  1409. THROWJSOCKEXCEPTION(JSOCKERR_connection_failed);
  1410. }
  1411. nagling = false; // means nothing for UDP
  1412. state = ss_open;
  1413. #ifdef _TRACE
  1414. setTraceName();
  1415. #endif
  1416. }
  1417. int CSocket::logPollError(unsigned revents, const char *rwstr)
  1418. {
  1419. // return appropriate errno as caller will probably throw/log it
  1420. int retcode = EINVAL;
  1421. if (revents & POLLERR)
  1422. {
  1423. StringBuffer errStr;
  1424. errStr.appendf("%s POLLERR %u l:%d r:%s:%d", rwstr, sock, localPort, (hostname?hostname:"NULL"), hostport);
  1425. int serror = 0;
  1426. socklen_t serrlen = sizeof(serror);
  1427. int srtn = getsockopt(sock, SOL_SOCKET, SO_ERROR, (char *)&serror, &serrlen);
  1428. if (srtn != 0)
  1429. serror = ERRNO();
  1430. LOGERR2(serror,2,errStr.str());
  1431. retcode = serror;
  1432. }
  1433. else if (revents & POLLNVAL)
  1434. {
  1435. // These are typically expected - when closing a socket in order to interrupt a thread that waits on it, for example
  1436. }
  1437. else
  1438. {
  1439. StringBuffer errStr;
  1440. errStr.appendf("%s unknown poll() revents: 0x%x", rwstr, revents);
  1441. LOGERR2(999,4,errStr.str());
  1442. }
  1443. return retcode;
  1444. }
  1445. int CSocket::wait_read(unsigned timeout)
  1446. {
  1447. int ret = 0;
  1448. int retrycount = 100;
  1449. while (sock!=INVALID_SOCKET)
  1450. {
  1451. #ifdef _USE_SELECT
  1452. T_FD_SET fds;
  1453. CHECKSOCKRANGE(sock);
  1454. XFD_ZERO(&fds);
  1455. FD_SET((unsigned)sock, &fds);
  1456. if (timeout==WAIT_FOREVER)
  1457. {
  1458. ret = ::select( sock + 1, (fd_set *)&fds, NULL, NULL, NULL );
  1459. }
  1460. else
  1461. {
  1462. struct timeval tv;
  1463. tv.tv_sec = timeout / 1000;
  1464. tv.tv_usec = (timeout % 1000)*1000;
  1465. ret = ::select( sock + 1, (fd_set *)&fds, NULL, NULL, &tv );
  1466. }
  1467. #else
  1468. struct pollfd fds[1];
  1469. fds[0].fd = sock;
  1470. fds[0].events = POLLINX;
  1471. fds[0].revents = 0;
  1472. ret = ::poll(fds, 1, timeout);
  1473. #endif
  1474. if (ret == SOCKET_ERROR)
  1475. { // error
  1476. int err = ERRNO();
  1477. if (err!=JSE_INTR)
  1478. { // else retry (should adjust time but for our usage don't think it matters that much)
  1479. LOGERR2(err,1,"wait_read");
  1480. break;
  1481. }
  1482. else
  1483. {
  1484. if (retrycount-- <= 0)
  1485. break;
  1486. }
  1487. }
  1488. else if (ret == 0)
  1489. { // timeout, no error (timeout can be 0 for a fast check)
  1490. break;
  1491. }
  1492. else
  1493. { // ret > 0 - ready or error
  1494. // if returning < 0 also set errno as caller will probably throw/log it
  1495. #ifdef _USE_SELECT
  1496. if (!FD_ISSET(sock, &fds))
  1497. {
  1498. LOGERR2(998,7,"wait_read");
  1499. errno = EBADF;
  1500. ret = -1;
  1501. }
  1502. #else
  1503. if ( (fds[0].revents & (POLLERR | POLLNVAL)) || (!(fds[0].revents & (POLLINX | POLLHUP))) )
  1504. {
  1505. errno = logPollError(fds[0].revents, "wait_read");
  1506. ret = -1;
  1507. }
  1508. else
  1509. {
  1510. // cannot error out on POLLRDHUP or POLLHUP as there can still be data to read
  1511. // POLLIN | POLLRDHUP | POLLHUP ok
  1512. break;
  1513. }
  1514. #endif
  1515. break;
  1516. }
  1517. }
  1518. return ret;
  1519. }
  1520. int CSocket::wait_write(unsigned timeout)
  1521. {
  1522. int ret = 0;
  1523. while (sock!=INVALID_SOCKET) {
  1524. #ifdef _USE_SELECT
  1525. T_FD_SET fds;
  1526. CHECKSOCKRANGE(sock);
  1527. XFD_ZERO(&fds);
  1528. FD_SET((unsigned)sock, &fds);
  1529. if (timeout==WAIT_FOREVER) {
  1530. ret = ::select( sock + 1, NULL, (fd_set *)&fds, NULL, NULL );
  1531. }
  1532. else {
  1533. struct timeval tv;
  1534. tv.tv_sec = timeout / 1000;
  1535. tv.tv_usec = (timeout % 1000)*1000;
  1536. ret = ::select( sock + 1, NULL, (fd_set *)&fds, NULL, &tv );
  1537. }
  1538. #else
  1539. struct pollfd fds[1];
  1540. fds[0].fd = sock;
  1541. fds[0].events = POLLOUT;
  1542. fds[0].revents = 0;
  1543. ret = ::poll(fds, 1, timeout);
  1544. #endif
  1545. if (ret==SOCKET_ERROR)
  1546. {
  1547. int err = ERRNO();
  1548. if (err!=JSE_INTR)
  1549. { // else retry (should adjust time but for our usage don't think it matters that much)
  1550. LOGERR2(err,1,"wait_write");
  1551. break;
  1552. }
  1553. }
  1554. else if (ret == 0)
  1555. { // timeout
  1556. break;
  1557. }
  1558. else
  1559. { // ret > 0 - ready or error
  1560. // if returning < 0 also set errno as caller will probably throw/log it
  1561. #ifdef _USE_SELECT
  1562. if (!FD_ISSET(sock, &fds))
  1563. {
  1564. LOGERR2(998,7,"wait_write");
  1565. errno = EBADF;
  1566. ret = -1;
  1567. }
  1568. #else
  1569. if ( (fds[0].revents & (POLLERR | POLLNVAL)) || (!(fds[0].revents & (POLLOUT | POLLHUP))) )
  1570. {
  1571. errno = logPollError(fds[0].revents, "wait_write");
  1572. ret = -1;
  1573. }
  1574. else if (fds[0].revents & POLLHUP)
  1575. {
  1576. LOGERR2(998,5,"wait_write POLLHUP");
  1577. errno = EPIPE;
  1578. ret = -1;
  1579. }
  1580. else
  1581. {
  1582. // POLLOUT ok
  1583. break;
  1584. }
  1585. #endif
  1586. break;
  1587. }
  1588. }
  1589. return ret;
  1590. }
  1591. void CSocket::readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
  1592. unsigned timeoutms)
  1593. {
  1594. if (timeoutms == WAIT_FOREVER) {
  1595. read(buf,min_size, max_size, size_read,WAIT_FOREVER);
  1596. return;
  1597. }
  1598. unsigned startt=usTick();
  1599. size_read = 0;
  1600. if (state != ss_open) {
  1601. THROWJSOCKEXCEPTION(JSOCKERR_not_opened);
  1602. }
  1603. unsigned start;
  1604. unsigned timeleft;
  1605. start = msTick();
  1606. timeleft = timeoutms;
  1607. do {
  1608. int rc = wait_read(timeleft);
  1609. if (rc < 0) {
  1610. THROWJSOCKEXCEPTION(ERRNO());
  1611. }
  1612. if (rc == 0) {
  1613. THROWJSOCKEXCEPTION(JSOCKERR_timeout_expired);
  1614. }
  1615. unsigned elapsed = (msTick()-start);
  1616. if (elapsed<timeoutms)
  1617. timeleft = timeoutms-elapsed;
  1618. else
  1619. timeleft = 0;
  1620. unsigned retrycount=100;
  1621. EintrRetry:
  1622. if (sockmode==sm_udp_server) { // udp server
  1623. DEFINE_SOCKADDR(u);
  1624. socklen_t ul=sizeof(u);
  1625. rc = recvfrom(sock, (char*)buf + size_read, max_size - size_read, 0, &u.sa,&ul);
  1626. getSockAddrEndpoint(u,ul,returnep);
  1627. }
  1628. else {
  1629. rc = recv(sock, (char*)buf + size_read, max_size - size_read, 0);
  1630. }
  1631. if (rc < 0) {
  1632. int err = ERRNO();
  1633. if (BADSOCKERR(err)) {
  1634. // don't think this should happen but convert to same as shutdown while investigation
  1635. LOGERR2(err,1,"Socket closed during read");
  1636. rc = 0;
  1637. }
  1638. else if ((err==JSE_INTR)&&(retrycount--!=0)) {
  1639. LOGERR2(err,1,"EINTR retrying");
  1640. goto EintrRetry;
  1641. }
  1642. else {
  1643. VStringBuffer errMsg("readtms(timeoutms=%d)", timeoutms);
  1644. LOGERR2(err,1,errMsg.str());
  1645. if ((err==JSE_CONNRESET)||(err==JSE_INTR)||(err==JSE_CONNABORTED)) {
  1646. errclose();
  1647. err = JSOCKERR_broken_pipe;
  1648. }
  1649. THROWJSOCKEXCEPTION(err);
  1650. }
  1651. }
  1652. if (rc == 0) {
  1653. state = ss_shutdown;
  1654. if (min_size==0)
  1655. break; // if min_read is 0 return 0 if socket closed
  1656. THROWJSOCKEXCEPTION(JSOCKERR_graceful_close);
  1657. }
  1658. size_read += rc;
  1659. } while (size_read < min_size);
  1660. STATS.reads++;
  1661. STATS.readsize += size_read;
  1662. STATS.readtime+=usTick()-startt;
  1663. }
  1664. void CSocket::read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
  1665. unsigned timeoutsecs)
  1666. {
  1667. unsigned startt=usTick();
  1668. size_read = 0;
  1669. unsigned start = 0;
  1670. unsigned timeleft = 0;
  1671. if (state != ss_open) {
  1672. THROWJSOCKEXCEPTION(JSOCKERR_not_opened);
  1673. }
  1674. if (timeoutsecs != WAIT_FOREVER) {
  1675. start = (unsigned)time(NULL);
  1676. timeleft = timeoutsecs;
  1677. }
  1678. do {
  1679. int rc;
  1680. if (timeoutsecs != WAIT_FOREVER) {
  1681. rc = wait_read(timeleft*1000);
  1682. if (rc < 0) {
  1683. THROWJSOCKEXCEPTION(ERRNO());
  1684. }
  1685. if (rc == 0) {
  1686. THROWJSOCKEXCEPTION(JSOCKERR_timeout_expired);
  1687. }
  1688. unsigned elapsed = ((unsigned)time(NULL))-start;
  1689. if (elapsed<timeoutsecs)
  1690. timeleft = timeoutsecs-elapsed;
  1691. else
  1692. timeleft = 0;
  1693. }
  1694. unsigned retrycount=100;
  1695. EintrRetry:
  1696. if (sockmode==sm_udp_server) { // udp server
  1697. DEFINE_SOCKADDR(u);
  1698. socklen_t ul=sizeof(u.sin);
  1699. rc = recvfrom(sock, (char*)buf + size_read, max_size - size_read, 0, &u.sa,&ul);
  1700. getSockAddrEndpoint(u,ul,returnep);
  1701. }
  1702. else {
  1703. rc = recv(sock, (char*)buf + size_read, max_size - size_read, 0);
  1704. }
  1705. if (rc < 0) {
  1706. int err = ERRNO();
  1707. if (BADSOCKERR(err)) {
  1708. // don't think this should happen but convert to same as shutdown while investigation
  1709. LOGERR2(err,3,"Socket closed during read");
  1710. rc = 0;
  1711. }
  1712. else if ((err==JSE_INTR)&&(retrycount--!=0)) {
  1713. if (sock==INVALID_SOCKET)
  1714. rc = 0; // convert an EINTR after closed to a graceful close
  1715. else {
  1716. LOGERR2(err,3,"EINTR retrying");
  1717. goto EintrRetry;
  1718. }
  1719. }
  1720. else {
  1721. LOGERR2(err,3,"read");
  1722. if ((err==JSE_CONNRESET)||(err==JSE_INTR)||(err==JSE_CONNABORTED)) {
  1723. errclose();
  1724. err = JSOCKERR_broken_pipe;
  1725. }
  1726. THROWJSOCKEXCEPTION(err);
  1727. }
  1728. }
  1729. if (rc == 0) {
  1730. state = ss_shutdown;
  1731. if (min_size==0)
  1732. break; // if min_read is 0 return 0 if socket closed
  1733. THROWJSOCKEXCEPTION(JSOCKERR_graceful_close);
  1734. }
  1735. size_read += rc;
  1736. } while (size_read < min_size);
  1737. STATS.reads++;
  1738. STATS.readsize += size_read;
  1739. STATS.readtime+=usTick()-startt;
  1740. }
  1741. void CSocket::read(void* buf, size32_t size)
  1742. {
  1743. if (!size)
  1744. return;
  1745. unsigned startt=usTick();
  1746. size32_t size_read=size;
  1747. if (state != ss_open) {
  1748. THROWJSOCKEXCEPTION(JSOCKERR_not_opened);
  1749. }
  1750. do {
  1751. unsigned retrycount=100;
  1752. EintrRetry:
  1753. int rc;
  1754. if (sockmode==sm_udp_server) { // udp server
  1755. DEFINE_SOCKADDR(u);
  1756. socklen_t ul=sizeof(u.sin);
  1757. rc = recvfrom(sock, (char*)buf, size, 0, &u.sa,&ul);
  1758. getSockAddrEndpoint(u,ul,returnep);
  1759. }
  1760. else {
  1761. rc = recv(sock, (char*)buf, size, 0);
  1762. }
  1763. if (rc < 0) {
  1764. int err = ERRNO();
  1765. if (BADSOCKERR(err)) {
  1766. // don't think this should happen but convert to same as shutdown while investigation
  1767. LOGERR2(err,5,"Socket closed during read");
  1768. rc = 0;
  1769. }
  1770. else if ((err==JSE_INTR)&&(retrycount--!=0)) {
  1771. LOGERR2(err,5,"EINTR retrying");
  1772. goto EintrRetry;
  1773. }
  1774. else {
  1775. LOGERR2(err,5,"read");
  1776. if ((err==JSE_CONNRESET)||(err==JSE_INTR)||(err==JSE_CONNABORTED)) {
  1777. errclose();
  1778. err = JSOCKERR_broken_pipe;
  1779. }
  1780. THROWJSOCKEXCEPTION(err);
  1781. }
  1782. }
  1783. if (rc == 0) {
  1784. state = ss_shutdown;
  1785. THROWJSOCKEXCEPTION(JSOCKERR_graceful_close);
  1786. }
  1787. buf = (char*)buf + rc;
  1788. size -= rc;
  1789. } while (size != 0);
  1790. STATS.reads++;
  1791. STATS.readsize += size_read;
  1792. STATS.readtime+=usTick()-startt;
  1793. }
  1794. size32_t CSocket::write(void const* buf, size32_t size)
  1795. {
  1796. if (size==0)
  1797. return 0;
  1798. unsigned startt=usTick();
  1799. size32_t size_writ = size;
  1800. if (state != ss_open) {
  1801. THROWJSOCKEXCEPTION(JSOCKERR_not_opened);
  1802. }
  1803. size32_t res=0;
  1804. do {
  1805. unsigned retrycount=100;
  1806. EintrRetry:
  1807. int rc;
  1808. if (sockmode==sm_udp_server) { // udp server
  1809. DEFINE_SOCKADDR(u);
  1810. socklen_t ul = setSockAddr(u,returnep,returnep.port);
  1811. rc = sendto(sock, (char*)buf, size, 0, &u.sa, ul);
  1812. }
  1813. #ifdef SIMULATE_LOST_UDP_PACKETS
  1814. else if (sockmode==sm_udp && size <= 24 && dropCounter++ >= dropThreshold)
  1815. {
  1816. DBGLOG("Drop size %d cmd %d", size, *(unsigned short *)buf);
  1817. dropCounter = 0;
  1818. rc = size;
  1819. }
  1820. #endif
  1821. else {
  1822. rc = send(sock, (char*)buf, size, SEND_FLAGS);
  1823. }
  1824. if (rc < 0) {
  1825. int err=ERRNO();
  1826. if (BADSOCKERR(err)) {
  1827. LOGERR2(err,7,"Socket closed during write");
  1828. rc = 0;
  1829. }
  1830. else if ((err==JSE_INTR)&&(retrycount--!=0)) {
  1831. LOGERR2(err,7,"EINTR retrying");
  1832. goto EintrRetry;
  1833. }
  1834. else {
  1835. if (((sockmode==sm_multicast)||(sockmode==sm_udp))&&(err==JSE_CONNREFUSED))
  1836. break; // ignore
  1837. LOGERR2(err,7,"write");
  1838. if ((err==JSE_CONNRESET)||(err==JSE_INTR)||(err==JSE_CONNABORTED)
  1839. #ifndef _WIN32
  1840. ||(err==EPIPE)||(err==JSE_TIMEDOUT) // linux can raise these on broken pipe
  1841. #endif
  1842. ) {
  1843. errclose();
  1844. err = JSOCKERR_broken_pipe;
  1845. }
  1846. if ((err == JSE_WOULDBLOCK) && nonblocking)
  1847. break;
  1848. THROWJSOCKEXCEPTION(err);
  1849. }
  1850. }
  1851. res += rc;
  1852. if (rc == 0) {
  1853. state = ss_shutdown;
  1854. THROWJSOCKEXCEPTION(JSOCKERR_graceful_close);
  1855. }
  1856. if (nonblocking)
  1857. break;
  1858. buf = (char*)buf + rc;
  1859. size -= rc;
  1860. } while (size != 0);
  1861. STATS.writes++;
  1862. STATS.writesize += size_writ;
  1863. STATS.writetime+=usTick()-startt;
  1864. return res;
  1865. }
  1866. size32_t CSocket::writetms(void const* buf, size32_t size, unsigned timeoutms)
  1867. {
  1868. if (size==0)
  1869. return 0;
  1870. if (state != ss_open)
  1871. {
  1872. THROWJSOCKEXCEPTION(JSOCKERR_not_opened);
  1873. }
  1874. if (timeoutms == WAIT_FOREVER)
  1875. return write(buf, size);
  1876. const char *p = (const char *)buf;
  1877. unsigned start, elapsed;
  1878. start = msTick();
  1879. elapsed = 0;
  1880. size32_t nwritten = 0;
  1881. size32_t nleft = size;
  1882. unsigned rollover = 0;
  1883. bool prevblock = set_nonblock(true);
  1884. while ( (nwritten < size) && (elapsed <= timeoutms) )
  1885. {
  1886. size32_t amnt = write(p,nleft);
  1887. // can nonblock mode write() return -1 ?
  1888. if ( (amnt == 0) || (amnt == (size32_t)-1) )
  1889. {
  1890. if (++rollover >= 20)
  1891. {
  1892. rollover = 0;
  1893. Sleep(20);
  1894. }
  1895. }
  1896. else
  1897. {
  1898. nwritten += amnt;
  1899. nleft -= amnt;
  1900. p += amnt;
  1901. }
  1902. elapsed = msTick() - start;
  1903. }
  1904. set_nonblock(prevblock);
  1905. if (nwritten < size)
  1906. {
  1907. IERRLOG("writetms timed out; timeout: %u, nwritten: %u, size: %u", timeoutms, nwritten, size);
  1908. THROWJSOCKEXCEPTION(JSOCKERR_timeout_expired);
  1909. }
  1910. return nwritten;
  1911. }
  1912. bool CSocket::check_connection()
  1913. {
  1914. if (state != ss_open)
  1915. return false;
  1916. // This routine is for TCP stream sockets.
  1917. // It is the callers responsibility to ensure this is called
  1918. // in a thread-safe manner, while no other threads are reading.
  1919. // wait_read() returns = 0 - socket ok atm
  1920. // returns < 0 - error, assume closed
  1921. // returns > 0 - recv() to check for eof
  1922. int rc = wait_read(0);
  1923. if (rc == 0)
  1924. return true;
  1925. else if (rc < 0)
  1926. return false;
  1927. int retrycount = 100;
  1928. char buffer;
  1929. EintrRecv:
  1930. rc = recv(sock, &buffer, sizeof(buffer), MSG_PEEK);
  1931. // recv() returns = 0 - socket eof (closed)
  1932. // returns < 0 - error, assume closed
  1933. // returns > 0 - socket ok atm
  1934. if (rc == 0)
  1935. return false;
  1936. else if (rc < 0)
  1937. {
  1938. int err=ERRNO();
  1939. if ((err==JSE_INTR)&&(retrycount--!=0))
  1940. {
  1941. LOGERR2(err,7,"recv EINTR retrying");
  1942. goto EintrRecv;
  1943. }
  1944. else
  1945. return false;
  1946. }
  1947. return true;
  1948. }
  1949. size32_t CSocket::udp_write_to(const SocketEndpoint &ep, void const* buf, size32_t size)
  1950. {
  1951. if (size==0)
  1952. return 0;
  1953. unsigned startt=usTick();
  1954. if (state != ss_open) {
  1955. THROWJSOCKEXCEPTION(JSOCKERR_not_opened);
  1956. }
  1957. size32_t res=0;
  1958. DEFINE_SOCKADDR(u);
  1959. for (;;) {
  1960. socklen_t ul = setSockAddr(u,ep,ep.port);
  1961. int rc = sendto(sock, (char*)buf, size, 0, &u.sa, ul);
  1962. if (rc < 0) {
  1963. int err=ERRNO();
  1964. if (((sockmode==sm_multicast)||(sockmode==sm_udp))&&(err==JSE_CONNREFUSED))
  1965. break; // ignore
  1966. if (err!=JSE_INTR) {
  1967. THROWJSOCKEXCEPTION(err);
  1968. }
  1969. }
  1970. else {
  1971. res = (size32_t)rc;
  1972. break;
  1973. }
  1974. }
  1975. STATS.writes++;
  1976. STATS.writesize += res;
  1977. STATS.writetime+=usTick()-startt;
  1978. return res;
  1979. }
  1980. size32_t CSocket::write_multiple(unsigned num,const void **buf, size32_t *size)
  1981. {
  1982. assertex(sockmode!=sm_udp_server);
  1983. assertex(!nonblocking);
  1984. if (num==1)
  1985. return write(buf[0],size[0]);
  1986. size32_t total = 0;
  1987. unsigned i;
  1988. for (i=0;i<num;i++)
  1989. total += size[i];
  1990. if (total==0)
  1991. return 0;
  1992. unsigned startt=usTick();
  1993. if (state != ss_open) {
  1994. THROWJSOCKEXCEPTION(JSOCKERR_not_opened);
  1995. }
  1996. size32_t res=0;
  1997. #ifdef _WIN32
  1998. WSABUF *bufs = (WSABUF *)alloca(sizeof(WSABUF)*num);
  1999. for (i=0;i<num;i++) {
  2000. bufs[i].buf = (char *)buf[i];
  2001. bufs[i].len = size[i];
  2002. }
  2003. unsigned retrycount=100;
  2004. EintrRetry:
  2005. DWORD sent;
  2006. if (WSASendTo(sock,bufs,num,&sent,0,NULL,0,NULL,NULL)==SOCKET_ERROR) {
  2007. int err=ERRNO();
  2008. if (BADSOCKERR(err)) {
  2009. LOGERR2(err,8,"Socket closed during write");
  2010. sent = 0;
  2011. }
  2012. else if ((err==JSE_INTR)&&(retrycount--!=0)) {
  2013. LOGERR2(err,8,"EINTR retrying");
  2014. goto EintrRetry;
  2015. }
  2016. else {
  2017. LOGERR2(err,8,"write_multiple");
  2018. if ((err==JSE_CONNRESET)||(err==JSE_INTR)||(err==JSE_CONNABORTED)||(err==JSE_TIMEDOUT)) {
  2019. errclose();
  2020. err = JSOCKERR_broken_pipe;
  2021. }
  2022. THROWJSOCKEXCEPTION(err);
  2023. }
  2024. }
  2025. if (sent == 0) {
  2026. state = ss_shutdown;
  2027. THROWJSOCKEXCEPTION(JSOCKERR_graceful_close);
  2028. }
  2029. res = sent;
  2030. #else
  2031. #ifdef USE_CORK
  2032. if (total>1024) {
  2033. class Copt
  2034. {
  2035. T_SOCKET sock;
  2036. bool nagling;
  2037. public:
  2038. Copt(T_SOCKET _sock,bool _nagling)
  2039. {
  2040. nagling = _nagling;
  2041. int enabled = 1;
  2042. int disabled = 0;
  2043. if (!nagling)
  2044. setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char*)&disabled, sizeof(disabled));
  2045. setsockopt(sock, IPPROTO_TCP, TCP_CORK, (char*)&enabled, sizeof(enabled));
  2046. }
  2047. ~Copt()
  2048. {
  2049. int enabled = 1;
  2050. int disabled = 0;
  2051. setsockopt(sock, IPPROTO_TCP, TCP_CORK, (char*)&disabled, sizeof(disabled));
  2052. if (!nagling)
  2053. setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char*)&enabled, sizeof(enabled));
  2054. }
  2055. } copt(sock,nagling);
  2056. for (i=0;i<num;i++)
  2057. res += write(buf[i],size[i]);
  2058. }
  2059. else {
  2060. byte b[1024];
  2061. byte *p=b;
  2062. for (i=0;i<num;i++) {
  2063. memcpy(p,buf[i],size[i]);
  2064. p += size[i];
  2065. }
  2066. res = write(b,total);
  2067. }
  2068. #else
  2069. // send in equal chunks about 64K
  2070. unsigned n = (total+0xffff)/0x10000;
  2071. size32_t outbufsize = (total+n-1)/n;
  2072. MemoryAttr ma;
  2073. byte *outbuf = (byte *)ma.allocate(outbufsize);
  2074. i = 0;
  2075. size32_t os = 0;
  2076. size32_t left = total;
  2077. byte *b = NULL;
  2078. size32_t s=0;
  2079. for (;;) {
  2080. while (!s&&(i<num)) {
  2081. b = (byte *)buf[i];
  2082. s = size[i];
  2083. i++;
  2084. }
  2085. if ((os==0)&&(s==left)) {
  2086. write(b,s); // go for it
  2087. break;
  2088. }
  2089. else {
  2090. size32_t cpy = outbufsize-os;
  2091. if (cpy>s)
  2092. cpy = s;
  2093. memcpy(outbuf+os,b,cpy);
  2094. os += cpy;
  2095. left -= cpy;
  2096. s -= cpy;
  2097. b += cpy;
  2098. if (left==0) {
  2099. write(outbuf,os);
  2100. break;
  2101. }
  2102. else if (os==outbufsize) {
  2103. write(outbuf,os);
  2104. os = 0;
  2105. }
  2106. }
  2107. }
  2108. #endif
  2109. #endif
  2110. STATS.writes++;
  2111. STATS.writesize += res;
  2112. STATS.writetime+=usTick()-startt;
  2113. return res;
  2114. }
  2115. bool CSocket::send_block(const void *blk,size32_t sz)
  2116. {
  2117. unsigned startt=usTick();
  2118. #ifdef TRACE_SLOW_BLOCK_TRANSFER
  2119. unsigned startt2 = startt;
  2120. unsigned startt3 = startt;
  2121. #endif
  2122. if (blockflags&BF_SYNC_TRANSFER_PULL) {
  2123. size32_t rd;
  2124. bool eof = true;
  2125. readtms(&eof,sizeof(eof),sizeof(eof),rd,blocktimeoutms);
  2126. if (eof)
  2127. return false;
  2128. #ifdef TRACE_SLOW_BLOCK_TRANSFER
  2129. startt2=usTick();
  2130. #endif
  2131. }
  2132. if (!blk||!sz) {
  2133. sz = 0;
  2134. write(&sz,sizeof(sz));
  2135. try {
  2136. bool reply;
  2137. size32_t rd;
  2138. readtms(&reply,sizeof(reply),sizeof(reply),rd,blocktimeoutms);
  2139. }
  2140. catch (IJSOCK_Exception *e) {
  2141. if ((e->errorCode()!=JSOCKERR_broken_pipe)&&(e->errorCode()!=JSOCKERR_graceful_close))
  2142. EXCLOG(e,"CSocket::send_block");
  2143. e->Release();
  2144. }
  2145. return false;
  2146. }
  2147. size32_t rsz=sz;
  2148. _WINREV(rsz);
  2149. write(&rsz,sizeof(rsz));
  2150. if (blockflags&BF_SYNC_TRANSFER_PUSH) {
  2151. #ifdef TRACE_SLOW_BLOCK_TRANSFER
  2152. startt2=usTick();
  2153. #endif
  2154. size32_t rd;
  2155. bool eof = true;
  2156. readtms(&eof,sizeof(eof),sizeof(eof),rd,blocktimeoutms);
  2157. if (eof)
  2158. return false;
  2159. #ifdef TRACE_SLOW_BLOCK_TRANSFER
  2160. startt3=usTick();
  2161. #endif
  2162. }
  2163. write(blk,sz);
  2164. if (blockflags&BF_RELIABLE_TRANSFER) {
  2165. bool isok=false;
  2166. size32_t rd;
  2167. readtms(&isok,sizeof(isok),sizeof(isok),rd,blocktimeoutms);
  2168. if (!isok)
  2169. return false;
  2170. }
  2171. unsigned nowt = usTick();
  2172. unsigned elapsed = nowt-startt;
  2173. STATS.blocksendtime+=elapsed;
  2174. STATS.numblocksends++;
  2175. STATS.blocksendsize+=sz;
  2176. if (elapsed>STATS.longestblocksend) {
  2177. STATS.longestblocksend = elapsed;
  2178. STATS.longestblocksize = sz;
  2179. }
  2180. #ifdef TRACE_SLOW_BLOCK_TRANSFER
  2181. if (elapsed>1000000*60) // over 1min
  2182. IWARNLOG("send_block took %ds to %s (%d,%d,%d)",elapsed/1000000,tracename,startt2-startt,startt3-startt2,nowt-startt3);
  2183. #endif
  2184. return true;
  2185. }
  2186. size32_t CSocket::receive_block_size()
  2187. {
  2188. // assumed always paired with receive_block
  2189. if (nextblocksize) {
  2190. if (blockflags&BF_SYNC_TRANSFER_PULL) {
  2191. bool eof=false;
  2192. write(&eof,sizeof(eof));
  2193. }
  2194. size32_t rd;
  2195. readtms(&nextblocksize,sizeof(nextblocksize),sizeof(nextblocksize),rd,blocktimeoutms);
  2196. _WINREV(nextblocksize);
  2197. if (nextblocksize==0) { // confirm eof
  2198. try {
  2199. bool confirm=true;
  2200. write(&confirm,sizeof(confirm));
  2201. }
  2202. catch (IJSOCK_Exception *e) {
  2203. if ((e->errorCode()!=JSOCKERR_broken_pipe)&&(e->errorCode()!=JSOCKERR_graceful_close))
  2204. EXCLOG(e,"receive_block_size");
  2205. e->Release();
  2206. }
  2207. }
  2208. else if (blockflags&BF_SYNC_TRANSFER_PUSH) { // leaves receiveblocksem clear
  2209. #ifdef USERECVSEM
  2210. CSemProtect semprot; // this will catch exception in write
  2211. while (!semprot.wait(&receiveblocksem,&receiveblocksemowned,60*1000*5))
  2212. IWARNLOG("Receive block stalled");
  2213. #endif
  2214. bool eof=false;
  2215. write(&eof,sizeof(eof));
  2216. #ifdef USERECVSEM
  2217. semprot.clear();
  2218. #endif
  2219. }
  2220. }
  2221. return nextblocksize;
  2222. }
  2223. size32_t CSocket::receive_block(void *blk,size32_t maxsize)
  2224. {
  2225. #ifdef USERECVSEM
  2226. CSemProtect semprot; // this will catch exceptions
  2227. #endif
  2228. size32_t sz = nextblocksize;
  2229. if (sz) {
  2230. if (sz==UINT_MAX) { // need to get size
  2231. if (!blk||!maxsize) {
  2232. if (blockflags&BF_SYNC_TRANSFER_PUSH) { // ignore block size
  2233. size32_t rd;
  2234. readtms(&nextblocksize,sizeof(nextblocksize),sizeof(nextblocksize),rd,blocktimeoutms);
  2235. }
  2236. if (blockflags&(BF_SYNC_TRANSFER_PULL|BF_SYNC_TRANSFER_PUSH)) { // signal eof
  2237. bool eof=true;
  2238. write(&eof,sizeof(eof));
  2239. nextblocksize = 0;
  2240. return 0;
  2241. }
  2242. }
  2243. sz = receive_block_size();
  2244. if (!sz)
  2245. return 0;
  2246. }
  2247. unsigned startt=usTick(); // include sem block but not initial handshake
  2248. #ifdef USERECVSEM
  2249. if (blockflags&BF_SYNC_TRANSFER_PUSH) // read_block_size sets semaphore
  2250. semprot.set(&receiveblocksem,&receiveblocksemowned); // this will reset semaphore on exit
  2251. #endif
  2252. nextblocksize = UINT_MAX;
  2253. size32_t rd;
  2254. if (sz<=maxsize) {
  2255. readtms(blk,sz,sz,rd,blocktimeoutms);
  2256. }
  2257. else { // truncate
  2258. readtms(blk,maxsize,maxsize,rd,blocktimeoutms);
  2259. sz -= maxsize;
  2260. OwnedMalloc<void> tmp = malloc(sz);
  2261. readtms(tmp,sz,sz,rd,blocktimeoutms);
  2262. sz = maxsize;
  2263. }
  2264. if (blockflags&BF_RELIABLE_TRANSFER) {
  2265. bool isok=true;
  2266. write(&isok,sizeof(isok));
  2267. }
  2268. unsigned elapsed = usTick()-startt;
  2269. STATS.blockrecvtime+=elapsed;
  2270. STATS.numblockrecvs++;
  2271. STATS.blockrecvsize+=sz;
  2272. }
  2273. return sz;
  2274. }
  2275. void CSocket::set_block_mode(unsigned flags, size32_t recsize, unsigned _timeoutms)
  2276. {
  2277. blockflags = flags;
  2278. nextblocksize = UINT_MAX;
  2279. blocktimeoutms = _timeoutms?_timeoutms:WAIT_FOREVER;
  2280. }
  2281. void CSocket::shutdown(unsigned mode)
  2282. {
  2283. if (state == ss_open) {
  2284. state = ss_shutdown;
  2285. #ifdef SOCKTRACE
  2286. PROGLOG("SOCKTRACE: shutdown(%d) socket %x %d (%p)", mode, sock, sock, this);
  2287. #endif
  2288. int rc = ::shutdown(sock, mode);
  2289. if (rc != 0) {
  2290. int err=ERRNO();
  2291. if (err==JSE_NOTCONN) {
  2292. #ifdef _TRACELINKCLOSED
  2293. DBGLOG("CSocket::shutdown(%d) failed, socket: %d", mode, sock);
  2294. #endif
  2295. LOGERR2(err,9,"shutdown");
  2296. err = JSOCKERR_broken_pipe;
  2297. }
  2298. THROWJSOCKEXCEPTION(err);
  2299. }
  2300. }
  2301. }
  2302. void CSocket::errclose()
  2303. {
  2304. #ifdef USERECVSEM
  2305. if (receiveblocksemowned) {
  2306. receiveblocksemowned = false;
  2307. receiveblocksem.signal();
  2308. }
  2309. #endif
  2310. if (state != ss_close) {
  2311. state = ss_close;
  2312. #ifdef SOCKTRACE
  2313. PROGLOG("SOCKTRACE: errclose socket %x %d (%p)", sock, sock, this);
  2314. #endif
  2315. if (mcastreq)
  2316. setsockopt(sock, IPPROTO_IP, IP_DROP_MEMBERSHIP,(char*)mcastreq,sizeof(*mcastreq));
  2317. closesock();
  2318. }
  2319. }
  2320. void CSocket::close()
  2321. {
  2322. #ifdef USERECVSEM
  2323. if (receiveblocksemowned) {
  2324. receiveblocksemowned = false;
  2325. receiveblocksem.signal();
  2326. }
  2327. #endif
  2328. if (state != ss_close) {
  2329. #ifdef SOCKTRACE
  2330. PROGLOG("SOCKTRACE: close socket %x %d (%p)", sock, sock, this);
  2331. #endif
  2332. state = ss_close;
  2333. if (mcastreq)
  2334. setsockopt(sock, IPPROTO_IP, IP_DROP_MEMBERSHIP,(char*)mcastreq,sizeof(*mcastreq));
  2335. if (closesock() != 0) {
  2336. THROWJSOCKEXCEPTION(ERRNO());
  2337. }
  2338. }
  2339. }
  2340. size32_t CSocket::get_max_send_size()
  2341. {
  2342. size32_t maxsend=0;
  2343. socklen_t size = sizeof(maxsend);
  2344. #if _WIN32
  2345. getsockopt(sock, SOL_SOCKET, SO_MAX_MSG_SIZE, (char *) &maxsend, &size);
  2346. #else
  2347. getsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *) &maxsend, &size); // not the same but closest I can find
  2348. #endif
  2349. return maxsend;
  2350. }
  2351. size32_t CSocket::get_send_buffer_size()
  2352. {
  2353. size32_t maxsend=0;
  2354. socklen_t size = sizeof(maxsend);
  2355. getsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *) &maxsend, &size);
  2356. return maxsend;
  2357. }
  2358. void CSocket::set_send_buffer_size(size32_t maxsend)
  2359. {
  2360. if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&maxsend, sizeof(maxsend))!=0) {
  2361. LOGERR2(ERRNO(),1,"setsockopt(SO_SNDBUF)");
  2362. }
  2363. #ifdef CHECKBUFSIZE
  2364. size32_t v;
  2365. if (getsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&v, sizeof(v))!=0) {
  2366. LOGERR2(ERRNO(),1,"getsockopt(SO_SNDBUF)");
  2367. }
  2368. if (v!=maxsend)
  2369. IWARNLOG("set_send_buffer_size requested %d, got %d",maxsend,v);
  2370. #endif
  2371. }
  2372. size32_t CSocket::get_receive_buffer_size()
  2373. {
  2374. size32_t max=0;
  2375. socklen_t size = sizeof(max);
  2376. getsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char *) &max, &size);
  2377. return max;
  2378. }
  2379. void CSocket::set_receive_buffer_size(size32_t max)
  2380. {
  2381. if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char *)&max, sizeof(max))!=0) {
  2382. LOGERR2(ERRNO(),1,"setsockopt(SO_RCVBUF)");
  2383. }
  2384. #ifdef CHECKBUFSIZE
  2385. size32_t v;
  2386. if (getsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char *)&v, sizeof(v))!=0) {
  2387. LOGERR2(ERRNO(),1,"getsockopt(SO_RCVBUF)");
  2388. }
  2389. if (v<max)
  2390. IWARNLOG("set_receive_buffer_size requested %d, got %d",max,v);
  2391. #endif
  2392. }
  2393. bool CSocket::join_multicast_group(SocketEndpoint &ep)
  2394. {
  2395. StringBuffer s;
  2396. ep.getIpText(s); // will improve later
  2397. MCASTREQ req(s.str());
  2398. if (setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP,(char*)&req, sizeof(req))!=0) {
  2399. return false;
  2400. }
  2401. return true;
  2402. }
  2403. bool CSocket::leave_multicast_group(SocketEndpoint &ep)
  2404. {
  2405. StringBuffer s;
  2406. ep.getIpText(s); // will improve later
  2407. MCASTREQ req(s.str());
  2408. if (setsockopt(sock, IPPROTO_IP, IP_DROP_MEMBERSHIP,(char*)&req, sizeof(req))!=0) {
  2409. return false;
  2410. }
  2411. return true;
  2412. }
  2413. void CSocket::set_ttl(unsigned _ttl)
  2414. {
  2415. if (_ttl)
  2416. {
  2417. u_char ttl = _ttl;
  2418. setsockopt(sock, IPPROTO_IP, IP_MULTICAST_TTL, (char *)&ttl, sizeof(ttl));
  2419. }
  2420. #ifdef SOCKTRACE
  2421. int ttl0 = 0;
  2422. socklen_t ttl1 = sizeof(ttl0);
  2423. getsockopt(sock, IPPROTO_IP, IP_MULTICAST_TTL, &ttl0, &ttl1);
  2424. DBGLOG("set_ttl: socket fd: %d requested ttl: %d actual ttl: %d", sock, _ttl, ttl0);
  2425. #endif
  2426. return;
  2427. }
  2428. void CSocket::logConnectionInfo(unsigned timeoutms, unsigned conn_mstime)
  2429. {
  2430. PROGLOG("SOCKTRACE: connect(%u) - time:%u ms fd:%d l:%d r:%s:%d", timeoutms, conn_mstime, sock, localPort, (hostname?hostname:"NULL"), hostport);
  2431. // PrintStackReport();
  2432. }
  2433. bool CSocket::isSecure() const
  2434. {
  2435. return false;
  2436. }
  2437. CSocket::~CSocket()
  2438. {
  2439. if (owned)
  2440. {
  2441. try
  2442. {
  2443. close();
  2444. }
  2445. catch (IException *e)
  2446. {
  2447. EXCLOG(e,"~CSocket close()");
  2448. e->Release();
  2449. }
  2450. }
  2451. free(hostname);
  2452. hostname = NULL;
  2453. #ifdef _TRACE
  2454. free(tracename);
  2455. tracename = NULL;
  2456. #endif
  2457. delete mcastreq;
  2458. }
  2459. CSocket::CSocket(const SocketEndpoint &ep,SOCKETMODE smode,const char *name)
  2460. {
  2461. state = ss_close;
  2462. nonblocking = false;
  2463. #ifdef USERECVSEM
  2464. receiveblocksemowned = false;
  2465. #endif
  2466. nagling = true; // until turned off
  2467. hostport = ep.port;
  2468. localPort = 0;
  2469. hostname = NULL;
  2470. mcastreq = NULL;
  2471. #ifdef _TRACE
  2472. tracename = NULL;
  2473. #endif
  2474. StringBuffer tmp;
  2475. if ((smode==sm_multicast_server)&&(name&&*name)) {
  2476. mcastreq = new MCASTREQ(name);
  2477. }
  2478. else {
  2479. if (!name&&!ep.isNull())
  2480. name = ep.getIpText(tmp).str();
  2481. hostname = name?strdup(name):NULL;
  2482. }
  2483. sock = INVALID_SOCKET;
  2484. sockmode = smode;
  2485. owned = true;
  2486. nextblocksize = 0;
  2487. in_accept = false;
  2488. accept_cancel_state = accept_not_cancelled;
  2489. #ifdef _TRACE
  2490. if (name)
  2491. setTraceName("T>", name);
  2492. else
  2493. {
  2494. StringBuffer hostname;
  2495. SocketEndpoint self;
  2496. self.setLocalHost(0);
  2497. self.getUrlStr(hostname);
  2498. setTraceName("S>", hostname.str());
  2499. }
  2500. #endif
  2501. }
  2502. CSocket::CSocket(T_SOCKET new_sock,SOCKETMODE smode,bool _owned)
  2503. {
  2504. nonblocking = false;
  2505. #ifdef USERECVSEM
  2506. receiveblocksemowned = false;
  2507. #endif
  2508. nagling = true; // until turned off
  2509. sock = new_sock;
  2510. if (new_sock!=INVALID_SOCKET)
  2511. STATS.activesockets++;
  2512. mcastreq = NULL;
  2513. #ifdef _TRACE
  2514. tracename = NULL;
  2515. #endif
  2516. state = ss_open;
  2517. sockmode = smode;
  2518. owned = _owned;
  2519. nextblocksize = 0;
  2520. in_accept = false;
  2521. accept_cancel_state = accept_not_cancelled;
  2522. set_nagle(false);
  2523. //set_linger(DEFAULT_LINGER_TIME); -- experiment with removing this as closesocket should still endevour to send outstanding data
  2524. char peer[256];
  2525. hostport = peer_name(peer,sizeof(peer));
  2526. hostname = strdup(peer);
  2527. SocketEndpoint ep;
  2528. localPort = getEndpoint(ep).port;
  2529. #ifdef _TRACE
  2530. setTraceName("A!", peer);
  2531. #endif
  2532. }
  2533. ISocket* ISocket::create(unsigned short p,int listen_queue_size)
  2534. {
  2535. if (p==0)
  2536. THROWJSOCKEXCEPTION2(JSOCKERR_bad_address);
  2537. SocketEndpoint ep;
  2538. ep.port = p;
  2539. Owned<CSocket> sock = new CSocket(ep,sm_tcp_server,NULL);
  2540. sock->open(listen_queue_size);
  2541. return sock.getClear();
  2542. }
  2543. ISocket* ISocket::create_ip(unsigned short p,const char *host,int listen_queue_size)
  2544. {
  2545. if (p==0)
  2546. THROWJSOCKEXCEPTION2(JSOCKERR_bad_address);
  2547. SocketEndpoint ep(host,p);
  2548. Owned<CSocket> sock = new CSocket(ep,sm_tcp_server,host);
  2549. sock->open(listen_queue_size);
  2550. return sock.getClear();
  2551. }
  2552. ISocket* ISocket::udp_create(unsigned short p)
  2553. {
  2554. SocketEndpoint ep;
  2555. ep.port=p;
  2556. Owned<CSocket> sock = new CSocket(ep,(p==0)?sm_udp:sm_udp_server,NULL);
  2557. sock->open(0);
  2558. return sock.getClear();
  2559. }
  2560. ISocket* ISocket::multicast_create(unsigned short p, const char *mcip, unsigned _ttl)
  2561. {
  2562. if (p==0)
  2563. THROWJSOCKEXCEPTION2(JSOCKERR_bad_address);
  2564. SocketEndpoint ep(mcip,p);
  2565. Owned<CSocket> sock = new CSocket(ep,sm_multicast_server,mcip);
  2566. sock->open(0,true);
  2567. if (_ttl)
  2568. sock->set_ttl(_ttl);
  2569. return sock.getClear();
  2570. }
  2571. ISocket* ISocket::multicast_create(unsigned short p, const IpAddress &ip, unsigned _ttl)
  2572. {
  2573. if (p==0)
  2574. THROWJSOCKEXCEPTION2(JSOCKERR_bad_address);
  2575. SocketEndpoint ep(p, ip);
  2576. StringBuffer tmp;
  2577. Owned<CSocket> sock = new CSocket(ep,sm_multicast_server,ip.getIpText(tmp).str());
  2578. sock->open(0,true);
  2579. if (_ttl)
  2580. sock->set_ttl(_ttl);
  2581. return sock.getClear();
  2582. }
  2583. ISocket* ISocket::udp_connect(unsigned short p, char const* name)
  2584. {
  2585. if (!name||!*name||(p==0))
  2586. THROWJSOCKEXCEPTION2(JSOCKERR_bad_address);
  2587. SocketEndpoint ep(name, p);
  2588. Owned<CSocket> sock = new CSocket(ep,sm_udp,name);
  2589. sock->udpconnect();
  2590. return sock.getClear();
  2591. }
  2592. ISocket* ISocket::udp_connect(const SocketEndpoint &ep)
  2593. {
  2594. Owned<CSocket> sock = new CSocket(ep,sm_udp,NULL);
  2595. sock->udpconnect();
  2596. return sock.getClear();
  2597. }
  2598. ISocket* ISocket::multicast_connect(unsigned short p, char const* mcip, unsigned _ttl)
  2599. {
  2600. if (p==0)
  2601. THROWJSOCKEXCEPTION2(JSOCKERR_bad_address);
  2602. SocketEndpoint ep(mcip,p);
  2603. return multicast_connect(ep, _ttl);
  2604. }
  2605. ISocket* ISocket::multicast_connect(const SocketEndpoint &ep, unsigned _ttl)
  2606. {
  2607. Owned<CSocket> sock = new CSocket(ep,sm_multicast,NULL);
  2608. sock->udpconnect();
  2609. if (_ttl)
  2610. sock->set_ttl(_ttl);
  2611. return sock.getClear();
  2612. }
  2613. ISocket* ISocket::attach(int s, bool tcpip)
  2614. {
  2615. CSocket* sock = new CSocket((SOCKET)s, tcpip?sm_tcp:sm_udp, false);
  2616. return sock;
  2617. }
  2618. bool isInterfaceIp(const IpAddress &ip, const char *ifname)
  2619. {
  2620. #ifdef _WIN32
  2621. return false;
  2622. #else
  2623. int fd = socket(AF_INET, SOCK_DGRAM, 0); // IPV6 TBD
  2624. if (fd<0)
  2625. return false;
  2626. MemoryAttr ma;
  2627. char *buf = (char *)ma.allocate(1024);
  2628. struct ifconf ifc;
  2629. ifc.ifc_len = 1024;
  2630. ifc.ifc_buf = buf;
  2631. if(ioctl(fd, SIOCGIFCONF, &ifc) < 0) // query interfaces
  2632. {
  2633. close(fd);
  2634. return false;
  2635. }
  2636. bool match = false;
  2637. struct ifreq *ifr = ifc.ifc_req;
  2638. int len = 0;
  2639. for (int i=0; i<ifc.ifc_len;
  2640. #ifdef __APPLE__
  2641. len = _SIZEOF_ADDR_IFREQ(*ifr),
  2642. #else
  2643. len = sizeof(struct ifreq),
  2644. #endif
  2645. ifr = (struct ifreq *) ((char *) ifr + len),
  2646. i += len
  2647. )
  2648. {
  2649. struct ifreq *item = ifr;
  2650. if (ifname&&*ifname)
  2651. if (!WildMatch(item->ifr_name,ifname))
  2652. continue;
  2653. IpAddress iptest((inet_ntoa(((struct sockaddr_in *)&item->ifr_addr)->sin_addr)));
  2654. if (ip.ipequals(iptest))
  2655. {
  2656. match = true;
  2657. break;
  2658. }
  2659. }
  2660. close(fd);
  2661. return match;
  2662. #endif
  2663. }
  2664. NO_SANITIZE("alignment") bool getInterfaceIp(IpAddress &ip,const char *ifname)
  2665. {
  2666. #if defined(_WIN32)
  2667. return false;
  2668. #else
  2669. static bool recursioncheck = false;
  2670. ip.ipset(NULL);
  2671. int fd = socket(AF_INET, SOCK_DGRAM, 0); // IPV6 TBD
  2672. if (fd<0)
  2673. return false;
  2674. MemoryAttr ma;
  2675. char *buf = (char *)ma.allocate(1024);
  2676. struct ifconf ifc;
  2677. ifc.ifc_len = 1024;
  2678. ifc.ifc_buf = buf;
  2679. if(ioctl(fd, SIOCGIFCONF, &ifc) < 0) // query interfaces
  2680. {
  2681. close(fd);
  2682. return false;
  2683. }
  2684. for (int loopback = 0; loopback <= 1; loopback++)
  2685. {
  2686. struct ifreq *ifr = ifc.ifc_req;
  2687. int len = 0;
  2688. for (int i=0; i<ifc.ifc_len;
  2689. #ifdef __APPLE__
  2690. len = _SIZEOF_ADDR_IFREQ(*ifr),
  2691. #else
  2692. len = sizeof(struct ifreq),
  2693. #endif
  2694. ifr = (struct ifreq *) ((char *) ifr + len),
  2695. i += len
  2696. )
  2697. {
  2698. bool useLoopback = (loopback==1);
  2699. struct ifreq *item = ifr;
  2700. if (ifname && *ifname)
  2701. if (!WildMatch(item->ifr_name,ifname))
  2702. continue;
  2703. #ifdef __APPLE__
  2704. if (item->ifr_addr.sa_family != AF_INET) // ipv6 support TBD
  2705. continue;
  2706. char host[128];
  2707. getnameinfo(&item->ifr_addr, sizeof(item->ifr_addr), host, sizeof(host), 0, 0, NI_NUMERICHOST);
  2708. IpAddress iptest(host);
  2709. #else
  2710. IpAddress iptest((inet_ntoa(((struct sockaddr_in *)&item->ifr_addr)->sin_addr)));
  2711. #endif
  2712. struct ifreq flags;
  2713. memset(&flags, 0, sizeof(flags));
  2714. strcpy(flags.ifr_name, item->ifr_name);
  2715. if (ioctl(fd, SIOCGIFFLAGS, &flags) < 0)
  2716. {
  2717. if (!recursioncheck) {
  2718. recursioncheck = true;
  2719. IERRLOG("Error retrieving interface flags for interface %s", item->ifr_name);
  2720. recursioncheck = false;
  2721. }
  2722. continue;
  2723. }
  2724. bool isLoopback = iptest.isLoopBack() || ((flags.ifr_flags & IFF_LOOPBACK) != 0);
  2725. bool isUp = (flags.ifr_flags & IFF_UP) != 0;
  2726. if ((isLoopback==useLoopback) && isUp)
  2727. {
  2728. if (ip.isNull())
  2729. ip.ipset(iptest);
  2730. else if (!PreferredSubnet.isNull()&&!PreferredSubnet.test(ip)&&PreferredSubnet.test(iptest))
  2731. ip.ipset(iptest);
  2732. }
  2733. }
  2734. if (!ip.isNull())
  2735. break;
  2736. }
  2737. close(fd);
  2738. return !ip.isNull();
  2739. #endif
  2740. }
// Lazily initialised host identity cache shared by the functions below.
static StringAttr cachehostname;        // set on first use by GetCachedHostName()
static IpAddress cachehostip;           // set by GetCachedHostName() / queryHostIP()
static IpAddress localhostip;           // loopback address, set on first use by queryLocalIP()
static CriticalSection hostnamesect;    // held while the three values above are read or set
  2745. const char * GetCachedHostName()
  2746. {
  2747. CriticalBlock c(hostnamesect);
  2748. if (!cachehostname.get())
  2749. {
  2750. #ifndef _WIN32
  2751. IpAddress ip;
  2752. const char *ifs = queryEnvironmentConf().queryProp("interface");
  2753. if (getInterfaceIp(ip, ifs))
  2754. {
  2755. StringBuffer ips;
  2756. ip.getIpText(ips);
  2757. if (ips.length())
  2758. {
  2759. cachehostname.set(ips.str());
  2760. cachehostip.ipset(ip);
  2761. return cachehostname.get();
  2762. }
  2763. }
  2764. #endif
  2765. char temp[1024];
  2766. if (gethostname(temp, sizeof(temp))==0)
  2767. cachehostname.set(temp);
  2768. else
  2769. cachehostname.set("localhost"); // assume no NIC card
  2770. }
  2771. return cachehostname.get();
  2772. }
  2773. IpAddress & queryLocalIP()
  2774. {
  2775. CriticalBlock c(hostnamesect);
  2776. if (localhostip.isNull())
  2777. {
  2778. if (IP6preferred)
  2779. localhostip.ipset("::1"); //IPv6
  2780. else
  2781. localhostip.ipset("127.0.0.1"); //IPv4
  2782. }
  2783. return localhostip;
  2784. }
  2785. IpAddress & queryHostIP()
  2786. {
  2787. CriticalBlock c(hostnamesect);
  2788. if (cachehostip.isNull()) {
  2789. if (!cachehostip.ipset(GetCachedHostName())) {
  2790. cachehostip.ipset(queryLocalIP());
  2791. // printf("hostname %s not resolved, using localhost\n",GetCachedHostName()); // don't use jlog in case recursive
  2792. }
  2793. }
  2794. return cachehostip;
  2795. }
  2796. IpAddress &GetHostIp(IpAddress &ip)
  2797. {
  2798. ip.ipset(queryHostIP());
  2799. return ip;
  2800. }
  2801. IpAddress &localHostToNIC(IpAddress &ip)
  2802. {
  2803. if (ip.isLoopBack())
  2804. GetHostIp(ip);
  2805. return ip;
  2806. }
  2807. // IpAddress
  2808. bool getInterfaceName(StringBuffer &ifname)
  2809. {
  2810. #if defined(_WIN32)
  2811. return false;
  2812. #else
  2813. IpAddress myIp;
  2814. GetHostIp(myIp);
  2815. int fd = socket(AF_INET, SOCK_DGRAM, 0); // IPV6 TBD
  2816. if (fd<0)
  2817. return false;
  2818. MemoryAttr ma;
  2819. char *buf = (char *)ma.allocate(1024);
  2820. struct ifconf ifc;
  2821. ifc.ifc_len = 1024;
  2822. ifc.ifc_buf = buf;
  2823. if(ioctl(fd, SIOCGIFCONF, &ifc) < 0) // query interfaces
  2824. {
  2825. close(fd);
  2826. return false;
  2827. }
  2828. struct ifreq *ifr = ifc.ifc_req;
  2829. int len = 0;
  2830. for (int i=0; i<ifc.ifc_len;
  2831. #ifdef __APPLE__
  2832. len = _SIZEOF_ADDR_IFREQ(*ifr),
  2833. #else
  2834. len = sizeof(struct ifreq),
  2835. #endif
  2836. ifr = (struct ifreq *) ((char *) ifr + len),
  2837. i += len
  2838. )
  2839. {
  2840. struct ifreq *item = ifr;
  2841. IpAddress iptest((inet_ntoa(((struct sockaddr_in *)&item->ifr_addr)->sin_addr)));
  2842. if (iptest.ipequals(myIp))
  2843. {
  2844. ifname.set(item->ifr_name);
  2845. close(fd);
  2846. return true;
  2847. }
  2848. }
  2849. close(fd);
  2850. return false;
  2851. #endif
  2852. }
  2853. inline bool isIp4(const unsigned *netaddr)
  2854. {
  2855. if (IP4only)
  2856. return true;
  2857. if (netaddr[2]==0xffff0000)
  2858. return (netaddr[1]==0)&&(netaddr[0]==0);
  2859. if (netaddr[2]==0)
  2860. if ((netaddr[3]==0)&&(netaddr[0]==0)&&(netaddr[1]==0))
  2861. return true; // null address
  2862. // maybe should get loopback here
  2863. return false;
  2864. }
  2865. void IpAddress::setIP4(unsigned ip)
  2866. {
  2867. netaddr[0] = 0;
  2868. netaddr[1] = 0;
  2869. if (ip)
  2870. netaddr[2] = 0xffff0000;
  2871. else
  2872. netaddr[2] = 0;
  2873. netaddr[3] = ip;
  2874. }
  2875. unsigned IpAddress::getIP4() const
  2876. {
  2877. assertex(isIp4());
  2878. return netaddr[3];
  2879. }
  2880. bool IpAddress::isIp4() const
  2881. {
  2882. return ::isIp4(netaddr);
  2883. }
  2884. bool IpAddress::isNull() const
  2885. {
  2886. return (netaddr[3]==0)&&(IP4only||((netaddr[2]==0)&&(netaddr[1]==0)&&(netaddr[0]==0)));
  2887. }
  2888. bool IpAddress::isLoopBack() const
  2889. {
  2890. if (::isIp4(netaddr)&&((netaddr[3] & 0x000000ff)==0x000007f))
  2891. return true;
  2892. return (netaddr[3]==0x1000000)&&(netaddr[2]==0)&&(netaddr[1]==0)&&(netaddr[0]==0);
  2893. }
  2894. bool IpAddress::isLocal() const
  2895. {
  2896. if (isLoopBack() || isHost())
  2897. return true;
  2898. IpAddress ip(*this);
  2899. return isInterfaceIp(ip, NULL);
  2900. }
  2901. bool IpAddress::ipequals(const IpAddress & other) const
  2902. {
  2903. // reverse compare for speed
  2904. return (other.netaddr[3]==netaddr[3])&&(IP4only||((other.netaddr[2]==netaddr[2])&&(other.netaddr[1]==netaddr[1])&&(other.netaddr[0]==netaddr[0])));
  2905. }
  2906. int IpAddress::ipcompare(const IpAddress & other) const
  2907. {
  2908. return memcmp(&netaddr, &other.netaddr, sizeof(netaddr));
  2909. }
  2910. unsigned IpAddress::iphash(unsigned prev) const
  2911. {
  2912. return hashc((const byte *)&netaddr,sizeof(netaddr),prev);
  2913. }
  2914. unsigned IpAddress::fasthash() const
  2915. {
  2916. if (IP4only || isIp4())
  2917. return netaddr[3] >> 24;
  2918. else
  2919. return iphash(0) >> 24;
  2920. }
  2921. bool IpAddress::isHost() const
  2922. {
  2923. return ipequals(queryHostIP());
  2924. }
  2925. static bool decodeNumericIP(const char *text,unsigned *netaddr)
  2926. {
  2927. if (!text)
  2928. return false;
  2929. bool isv6 = strchr(text,':')!=NULL;
  2930. StringBuffer tmp;
  2931. if ((*text=='[')&&!IP4only) {
  2932. text++;
  2933. size32_t l = strlen(text);
  2934. if ((l<=2)||(text[l-1]!=']'))
  2935. return false;
  2936. text = tmp.append(l-2,text);
  2937. }
  2938. if (!isv6&&isdigit(text[0])) {
  2939. if (_inet_pton(AF_INET, text, &netaddr[3])>0) {
  2940. netaddr[2] = netaddr[3]?0xffff0000:0; // check for NULL
  2941. netaddr[1] = 0;
  2942. netaddr[0] = 0; // special handling for loopback?
  2943. return true;
  2944. }
  2945. }
  2946. else if (isv6&&!IP4only) {
  2947. int ret = _inet_pton(AF_INET6, text, netaddr);
  2948. if (ret>=0)
  2949. return (ret>0);
  2950. int err = ERRNO();
  2951. StringBuffer tmp("_inet_pton: ");
  2952. tmp.append(text);
  2953. LOGERR(err,1,tmp.str());
  2954. }
  2955. return false;
  2956. }
// Resolves host name `name` to an address and stores it in the four-word
// netaddr layout (IPv4 results use the IPv4-mapped form, word 2 = 0xffff0000).
//
// Two resolver paths exist:
//   * gethostbyname (IPv4 only): used when IP4only is set, or unconditionally
//     on platforms where the getaddrinfo path is compiled out.
//   * getaddrinfo: used otherwise; can yield IPv4 or IPv6.
// Both retry a failed lookup up to 10 times with an increasing sleep, and both
// prefer an address inside PreferredSubnet when one is configured.
//
// Returns true if an address was stored, false if resolution failed.
static bool lookupHostAddress(const char *name,unsigned *netaddr)
{
    // if IP4only, or where getaddrinfo is unavailable, only IPv4 can be resolved (via gethostbyname)
    static bool recursioncheck = false; // needed to stop error message recursing
    unsigned retry=10;
    // NOTE: the preprocessor selects the opening of the block below; on
    // getaddrinfo-capable platforms it is conditional on IP4only.
#if defined(__linux__) || defined (__APPLE__) || defined(getaddrinfo)
    if (IP4only) {
#else
    {
#endif
        CriticalBlock c(hostnamesect);
        hostent * entry = gethostbyname(name);
        while (entry==NULL) {
            if (retry--==0) {
                if (!recursioncheck) {
                    recursioncheck = true;
                    LogErr(h_errno,1,"gethostbyname failed",__LINE__,name);
                    recursioncheck = false;
                }
                return false;
            }
            {
                // release the lock while sleeping; the delay grows with each retry
                CriticalUnblock ub(hostnamesect);
                Sleep((10-retry)*100);
            }
            entry = gethostbyname(name);
        }
        if (entry->h_addr_list[0]) {
            unsigned ptr = 0;
            if (!PreferredSubnet.isNull()) {
                // scan entries 1..n for one inside the preferred subnet;
                // fall back to entry 0 when none qualifies
                for (;;) {
                    ptr++;
                    if (entry->h_addr_list[ptr]==NULL) {
                        ptr = 0;
                        break;
                    }
                    IpAddress ip;
                    ip.setNetAddress(sizeof(unsigned),entry->h_addr_list[ptr]);
                    if (PreferredSubnet.test(ip))
                        break;
                }
            }
            memcpy(&netaddr[3], entry->h_addr_list[ptr], sizeof(netaddr[3]));
            netaddr[2] = 0xffff0000;
            netaddr[1] = 0;
            netaddr[0] = 0;
            return true;
        }
        return false;
    }
#if defined(__linux__) || defined (__APPLE__) || defined(getaddrinfo)
    struct addrinfo hints;
    memset(&hints,0,sizeof(hints));
    struct addrinfo *addrInfo = NULL;
    // retry loop: exits with addrInfo populated, or returns false after 10 retries
    for (;;) {
        memset(&hints,0,sizeof(hints));
        int ret = getaddrinfo(name, NULL , &hints, &addrInfo);
        if (!ret)
            break;
        if (retry--==0) {
            if (!recursioncheck) {
                recursioncheck = true;
                LogErr(ret,1,"getaddrinfo failed",__LINE__,name);
#ifdef _DEBUG
                PrintStackReport();
#endif
                recursioncheck = false;
            }
            return false;
        }
        Sleep((10-retry)*100);
    }
    struct addrinfo *best = NULL;
    bool snm = !PreferredSubnet.isNull(); // true while results are filtered by the preferred subnet
    // at most two passes: first restricted to the preferred subnet (if any),
    // then unrestricted if that produced nothing
    for (;;) {
        struct addrinfo  *ai;
        for (ai = addrInfo; ai; ai = ai->ai_next) {
            // printf("flags=%d, family=%d, socktype=%d, protocol=%d, addrlen=%d, canonname=%s\n",ai->ai_flags,ai->ai_family,ai->ai_socktype,ai->ai_protocol,ai->ai_addrlen,ai->ai_canonname?ai->ai_canonname:"NULL");
            switch (ai->ai_family) {
            case AF_INET: {
                    if (snm) {
                        IpAddress ip;
                        ip.setNetAddress(sizeof(in_addr),&(((sockaddr_in *)ai->ai_addr)->sin_addr));
                        if (!PreferredSubnet.test(ip))
                            continue; // skips to the next addrinfo entry
                    }
                    // first hit wins, but an IPv4 result replaces an IPv6 one unless IPv6 is preferred
                    if ((best==NULL)||((best->ai_family==AF_INET6)&&!IP6preferred))
                        best = ai;
                    break;
                }
            case AF_INET6: {
                    if (snm) {
                        IpAddress ip;
                        ip.setNetAddress(sizeof(in_addr6),&(((sockaddr_in6 *)ai->ai_addr)->sin6_addr));
                        if (!PreferredSubnet.test(ip))
                            continue; // skips to the next addrinfo entry
                    }
                    // first hit wins, but an IPv6 result replaces an IPv4 one when IPv6 is preferred
                    if ((best==NULL)||((best->ai_family==AF_INET)&&IP6preferred))
                        best = ai;
                    break;
                }
            }
        }
        if (best||!snm)
            break;
        snm = false;
    }
    if (best) {
        if (best->ai_family==AF_INET6)
            memcpy(netaddr,&(((sockaddr_in6 *)best->ai_addr)->sin6_addr),sizeof(in6_addr));
        else {
            memcpy(netaddr+3,&(((sockaddr_in *)best->ai_addr)->sin_addr),sizeof(in_addr));
            netaddr[2] = 0xffff0000;
            netaddr[1] = 0;
            netaddr[0] = 0;
        }
    }
    freeaddrinfo(addrInfo);
    return best!=NULL;
#endif
    return false;
}
  3079. bool IpAddress::ipset(const char *text)
  3080. {
  3081. if (text&&*text) {
  3082. if ((text[0]=='.')&&(text[1]==0)) {
  3083. ipset(queryHostIP());
  3084. return true;
  3085. }
  3086. if (decodeNumericIP(text,netaddr))
  3087. return true;
  3088. const char *s;
  3089. for (s=text;*s;s++)
  3090. if (!isdigit(*s)&&(*s!=':')&&(*s!='.'))
  3091. break;
  3092. if (!*s)
  3093. return ipset(NULL);
  3094. if (lookupHostAddress(text,netaddr))
  3095. return true;
  3096. }
  3097. memset(&netaddr,0,sizeof(netaddr));
  3098. return false;
  3099. }
  // Writes the decimal text of b (1 to 3 digits, no leading zeros and no
  // terminator) at s and returns the position just past the last digit.
  inline char * addbyte(char *s,byte b)
  {
      const unsigned value = b;
      const unsigned hundreds = value/100;
      const unsigned tens = (value/10)%10;
      if (hundreds)
          *s++ = (char)('0'+hundreds);
      if (hundreds||tens)
          *s++ = (char)('0'+tens);
      *s++ = (char)('0'+value%10);
      return s;
  }
  3115. StringBuffer & IpAddress::getIpText(StringBuffer & out) const
  3116. {
  3117. if (::isIp4(netaddr)) {
  3118. const byte *ip = (const byte *)&netaddr[3];
  3119. char ips[16];
  3120. char *s = ips;
  3121. for (unsigned i=0;i<4;i++) {
  3122. if (i)
  3123. *(s++) = '.';
  3124. s = addbyte(s,ip[i]);
  3125. }
  3126. return out.append(s-ips,ips);
  3127. }
  3128. char tmp[INET6_ADDRSTRLEN];
  3129. const char *res = _inet_ntop(AF_INET6, &netaddr, tmp, sizeof(tmp));
  3130. if (!res)
  3131. throw makeOsException(errno);
  3132. return out.append(res);
  3133. }
  3134. void IpAddress::ipserialize(MemoryBuffer & out) const
  3135. {
  3136. if (((netaddr[2]==0xffff0000)||(netaddr[2]==0))&&(netaddr[1]==0)&&(netaddr[0]==0)) {
  3137. if (netaddr[3]==IPV6_SERIALIZE_PREFIX)
  3138. throw MakeStringException(-1,"Invalid network address"); // hack prevention
  3139. out.append(sizeof(netaddr[3]), &netaddr[3]);
  3140. }
  3141. else {
  3142. unsigned pfx = IPV6_SERIALIZE_PREFIX;
  3143. out.append(sizeof(pfx),&pfx).append(sizeof(netaddr),&netaddr);
  3144. }
  3145. }
  3146. void IpAddress::ipdeserialize(MemoryBuffer & in)
  3147. {
  3148. unsigned pfx;
  3149. in.read(sizeof(pfx),&pfx);
  3150. if (pfx!=IPV6_SERIALIZE_PREFIX) {
  3151. netaddr[0] = 0;
  3152. netaddr[1] = 0;
  3153. netaddr[2] = (pfx == 0 || pfx == 0x1000000) ? 0 : 0xffff0000; // catch null and loopback
  3154. netaddr[3] = pfx;
  3155. }
  3156. else
  3157. in.read(sizeof(netaddr),&netaddr);
  3158. }
  3159. unsigned IpAddress::ipdistance(const IpAddress &other,unsigned offset) const
  3160. {
  3161. if (offset>3)
  3162. offset = 3;
  3163. int i1;
  3164. _cpyrev4(&i1,&netaddr[3-offset]);
  3165. int i2;
  3166. _cpyrev4(&i2,&other.netaddr[3-offset]);
  3167. i1-=i2;
  3168. if (i1>0)
  3169. return i1;
  3170. return -i1;
  3171. }
  3172. bool IpAddress::ipincrement(unsigned count,byte minoctet,byte maxoctet,unsigned short minipv6piece,unsigned maxipv6piece)
  3173. {
  3174. unsigned base;
  3175. if (::isIp4(netaddr)) {
  3176. base = maxoctet-minoctet+1;
  3177. if (!base||(base>256))
  3178. return false;
  3179. byte * ips = (byte *)&netaddr[3];
  3180. byte * ip = ips+4;
  3181. while (count) {
  3182. if (ip==ips)
  3183. return false; // overflow
  3184. ip--;
  3185. unsigned v = (count+((*ip>minoctet)?(*ip-minoctet):0));
  3186. *ip = minoctet + v%base;
  3187. count = v/base;
  3188. }
  3189. }
  3190. else {
  3191. base = maxipv6piece-minipv6piece+1;
  3192. if (!base||(base>0x10000))
  3193. return false;
  3194. unsigned short * ps = (unsigned short *)&netaddr;
  3195. unsigned short * p = ps+8;
  3196. while (count) {
  3197. if (p==ps)
  3198. return false; // overflow (actually near impossible!)
  3199. p--;
  3200. unsigned v = (count+((*p>minipv6piece)?(*p-minipv6piece):0));
  3201. *p = minipv6piece + v%base;
  3202. count = v/base;
  3203. }
  3204. }
  3205. return true;
  3206. }
  3207. unsigned IpAddress::ipsetrange( const char *text) // e.g. 10.173.72.1-65 ('-' may be omitted)
  3208. {
  3209. unsigned e=0;
  3210. unsigned f=0;
  3211. const char *r = strchr(text,'-');
  3212. bool ok;
  3213. if (r) {
  3214. e = atoi(r+1);
  3215. StringBuffer tmp(r-text,text);
  3216. ok = ipset(tmp.str());
  3217. if (!::isIp4(netaddr))
  3218. IPV6_NOT_IMPLEMENTED(); // TBD IPv6
  3219. if (ok) {
  3220. while ((r!=text)&&(*(r-1)!='.'))
  3221. r--;
  3222. f = (r!=text)?atoi(r):0;
  3223. }
  3224. }
  3225. else
  3226. ok = ipset(text);
  3227. if ((f>e)||!ok)
  3228. return 0;
  3229. return e-f+1;
  3230. }
  3231. NO_SANITIZE("alignment") size32_t IpAddress::getNetAddress(size32_t maxsz,void *dst) const
  3232. {
  3233. if (maxsz==sizeof(unsigned)) {
  3234. if (::isIp4(netaddr)) {
  3235. *(unsigned *)dst = netaddr[3];
  3236. return maxsz;
  3237. }
  3238. }
  3239. else if (!IP4only&&(maxsz==sizeof(netaddr))) {
  3240. memcpy(dst,&netaddr,maxsz);
  3241. return maxsz;
  3242. }
  3243. return 0;
  3244. }
  3245. NO_SANITIZE("alignment") void IpAddress::setNetAddress(size32_t sz,const void *src)
  3246. {
  3247. if (sz==sizeof(unsigned)) { // IPv4
  3248. netaddr[0] = 0;
  3249. netaddr[1] = 0;
  3250. netaddr[3] = *(const unsigned *)src;
  3251. netaddr[2] = netaddr[3] ? 0xffff0000 : 0; // leave as null if Ipv4 address is null
  3252. }
  3253. else if (!IP4only&&(sz==sizeof(netaddr))) { // IPv6
  3254. memcpy(&netaddr,src,sz);
  3255. if ((netaddr[2]==0)&&(netaddr[3]!=0)&&(netaddr[3]!=0x1000000)&&(netaddr[0]==0)&&(netaddr[1]==0))
  3256. netaddr[2]=0xffff0000; // use this form only
  3257. }
  3258. else
  3259. memset(&netaddr,0,sizeof(netaddr));
  3260. }
  3261. void SocketEndpoint::deserialize(MemoryBuffer & in)
  3262. {
  3263. ipdeserialize(in);
  3264. in.read(port);
  3265. }
  3266. void SocketEndpoint::serialize(MemoryBuffer & out) const
  3267. {
  3268. ipserialize(out);
  3269. out.append(port);
  3270. }
  3271. bool SocketEndpoint::set(const char *name,unsigned short _port)
  3272. {
  3273. if (name) {
  3274. if (*name=='[') {
  3275. const char *s = name+1;
  3276. const char *t = strchr(s,']');
  3277. if (t) {
  3278. StringBuffer tmp(t-s,s);
  3279. if (t[1]==':')
  3280. _port = atoi(t+2);
  3281. return set(tmp.str(),_port);
  3282. }
  3283. }
  3284. const char * colon = strchr(name, ':');
  3285. if (colon) {
  3286. if (!IP4only&&strchr(colon+1, ':'))
  3287. colon = NULL; // hello its IpV6
  3288. }
  3289. else
  3290. colon = strchr(name, '|'); // strange hole convention
  3291. char ips[260];
  3292. if (colon) {
  3293. size32_t l = colon-name;
  3294. if (l>=sizeof(ips))
  3295. l = sizeof(ips)-1;
  3296. memcpy(ips,name,l);
  3297. ips[l] = 0;
  3298. name = ips;
  3299. _port = atoi(colon+1);
  3300. }
  3301. if (ipset(name)) {
  3302. port = _port;
  3303. return true;
  3304. }
  3305. }
  3306. ipset(NULL);
  3307. port = 0;
  3308. return false;
  3309. }
  3310. void SocketEndpoint::getUrlStr(char * str, size32_t len) const
  3311. {
  3312. if (len==0)
  3313. return;
  3314. StringBuffer _str;
  3315. getUrlStr(_str);
  3316. size32_t l = _str.length()+1;
  3317. if (l>len)
  3318. {
  3319. l = len-1;
  3320. str[l] = 0;
  3321. }
  3322. memcpy(str,_str.str(),l);
  3323. }
  3324. StringBuffer &SocketEndpoint::getUrlStr(StringBuffer &str) const
  3325. {
  3326. getIpText(str);
  3327. if (port)
  3328. str.append(':').append((unsigned)port); // TBD IPv6 put [] on
  3329. return str;
  3330. }
  3331. unsigned SocketEndpoint::hash(unsigned prev) const
  3332. {
  3333. return hashc((const byte *)&port,sizeof(port),iphash(prev));
  3334. }
  3335. //---------------------------------------------------------------------------
  3336. SocketListCreator::SocketListCreator()
  3337. {
  3338. lastPort = 0;
  3339. }
  3340. void SocketListCreator::addSocket(const SocketEndpoint &ep)
  3341. {
  3342. StringBuffer ipstr;
  3343. ep.getIpText(ipstr);
  3344. addSocket(ipstr.str(), ep.port);
  3345. }
  3346. void SocketListCreator::addSocket(const char * ip, unsigned port)
  3347. {
  3348. if (fullText.length())
  3349. fullText.append("|");
  3350. const char * prev = lastIp;
  3351. const char * startCopy = ip;
  3352. if (prev)
  3353. {
  3354. if (strcmp(ip, prev) == 0)
  3355. {
  3356. fullText.append("=");
  3357. startCopy = NULL;
  3358. }
  3359. else
  3360. {
  3361. const char * cur = ip;
  3362. for (;;)
  3363. {
  3364. char n = *cur;
  3365. if (!n)
  3366. break;
  3367. if (n != *prev)
  3368. break;
  3369. cur++;
  3370. prev++;
  3371. if (n == '.')
  3372. startCopy = cur;
  3373. }
  3374. if (startCopy != ip)
  3375. fullText.append("*");
  3376. }
  3377. }
  3378. fullText.append(startCopy);
  3379. if (lastPort != port)
  3380. fullText.append(":").append(port);
  3381. lastIp.set(ip);
  3382. lastPort = port;
  3383. }
  3384. const char * SocketListCreator::getText()
  3385. {
  3386. return fullText.str();
  3387. }
  3388. void SocketListCreator::addSockets(SocketEndpointArray &array)
  3389. {
  3390. ForEachItemIn(i,array) {
  3391. const SocketEndpoint &sockep=array.item(i);
  3392. StringBuffer ipstr;
  3393. sockep.getIpText(ipstr);
  3394. addSocket(ipstr.str(),sockep.port);
  3395. }
  3396. }
  3397. //---------------------------------------------------------------------------
  3398. SocketListParser::SocketListParser(const char * text)
  3399. {
  3400. fullText.set(text);
  3401. cursor = NULL;
  3402. lastPort = 0;
  3403. }
  3404. void SocketListParser::first(unsigned port)
  3405. {
  3406. cursor = fullText;
  3407. lastIp.set(NULL);
  3408. lastPort = port;
  3409. }
  3410. bool SocketListParser::get(StringAttr & ip, unsigned & port, unsigned index, unsigned defport)
  3411. {
  3412. first(defport);
  3413. do
  3414. {
  3415. if (!next(ip, port))
  3416. return false;
  3417. } while (index--);
  3418. return true;
  3419. }
  3420. bool SocketListParser::next(StringAttr & ip, unsigned & port)
  3421. {
  3422. // IPV6TBD
  3423. StringBuffer ipText;
  3424. if (*cursor == 0)
  3425. return false;
  3426. if (*cursor == '=')
  3427. {
  3428. ipText.append(lastIp);
  3429. cursor++;
  3430. }
  3431. else if (*cursor == '*')
  3432. {
  3433. cursor++;
  3434. //count the number of dots in the tail
  3435. const char * cur = cursor;
  3436. unsigned count = 0;
  3437. for (;;)
  3438. {
  3439. char c = *cur++;
  3440. switch (c)
  3441. {
  3442. case 0:
  3443. case '|':
  3444. case ',':
  3445. case ':':
  3446. goto done;
  3447. case '.':
  3448. ++count;
  3449. break;
  3450. }
  3451. }
  3452. done:
  3453. //copy up to the appropriate dot from the previous ip.
  3454. const unsigned dotCount = 3; //more what about 6 digit ip's
  3455. cur = lastIp;
  3456. for (;;)
  3457. {
  3458. char c = *cur++;
  3459. switch (c)
  3460. {
  3461. case 0:
  3462. case '|':
  3463. case ',':
  3464. case ':':
  3465. assertex(!"Should not get here!");
  3466. goto done2;
  3467. case '.':
  3468. ipText.append(c);
  3469. if (++count == dotCount)
  3470. goto done2;
  3471. break;
  3472. default:
  3473. ipText.append(c);
  3474. break;
  3475. }
  3476. }
  3477. done2:;
  3478. }
  3479. bool inPort = false;
  3480. port = lastPort;
  3481. for (;;)
  3482. {
  3483. char c = *cursor++;
  3484. switch (c)
  3485. {
  3486. case 0:
  3487. cursor--;
  3488. goto doneCopy;
  3489. case '|':
  3490. case ',':
  3491. goto doneCopy;
  3492. case ':':
  3493. port = atoi(cursor);
  3494. inPort = true;
  3495. break;;
  3496. default:
  3497. if (!inPort)
  3498. ipText.append(c);
  3499. break;
  3500. }
  3501. }
  3502. doneCopy:
  3503. lastIp.set(ipText.str());
  3504. ip.set(lastIp);
  3505. lastPort = port;
  3506. return true;
  3507. }
  3508. unsigned SocketListParser::getSockets(SocketEndpointArray &array,unsigned defport)
  3509. {
  3510. first(defport);
  3511. StringAttr ip;
  3512. unsigned port;
  3513. while (next(ip,port)) {
  3514. SocketEndpoint ep(ip,port);
  3515. array.append(ep);
  3516. }
  3517. return array.ordinality();
  3518. }
  3519. jlib_decl JSocketStatistics *getSocketStatPtr()
  3520. {
  3521. return &STATS;
  3522. }
  3523. void getSocketStatistics(JSocketStatistics &stats)
  3524. {
  3525. // should put in simple lock
  3526. memcpy(&stats,&STATS,sizeof(stats));
  3527. }
  3528. void resetSocketStatistics()
  3529. {
  3530. unsigned activesockets=STATS.activesockets;
  3531. memset(&STATS,0,sizeof(STATS));
  3532. STATS.activesockets = activesockets;
  3533. }
  3534. static StringBuffer &appendtime(StringBuffer &s,unsigned us)
  3535. {
  3536. // attemp to get into more sensible units
  3537. if (us>10000000)
  3538. return s.append(us/1000000).append('s');
  3539. if (us>10000)
  3540. return s.append(us/1000).append("ms");
  3541. return s.append(us).append("us");
  3542. }
  3543. StringBuffer &getSocketStatisticsString(JSocketStatistics &stats,StringBuffer &str)
  3544. {
  3545. str.append("connects=").append(stats.connects).append('\n');
  3546. appendtime(str.append("connecttime="),stats.connecttime).append('\n');
  3547. str.append("failedconnects=").append(stats.failedconnects).append('\n');
  3548. appendtime(str.append("failedconnecttime="),stats.failedconnecttime).append('\n');
  3549. str.append("reads=").append(stats.reads).append('\n');
  3550. appendtime(str.append("readtime="),stats.readtime).append('\n');
  3551. str.append("readsize=").append(stats.readsize).append(" bytes\n");
  3552. str.append("writes=").append(stats.writes).append('\n');
  3553. appendtime(str.append("writetime="),stats.writetime).append('\n');
  3554. str.append("writesize=").append(stats.writesize).append(" bytes").append('\n');
  3555. str.append("activesockets=").append(stats.activesockets).append('\n');
  3556. str.append("numblockrecvs=").append(stats.numblockrecvs).append('\n');
  3557. str.append("numblocksends=").append(stats.numblocksends).append('\n');
  3558. str.append("blockrecvsize=").append(stats.blockrecvsize).append('\n');
  3559. str.append("blocksendsize=").append(stats.blocksendsize).append('\n');
  3560. str.append("blockrecvtime=").append(stats.blockrecvtime).append('\n');
  3561. str.append("blocksendtime=").append(stats.blocksendtime).append('\n');
  3562. str.append("longestblocksend=").append(stats.longestblocksend).append('\n');
  3563. str.append("longestblocksize=").append(stats.longestblocksize);
  3564. return str;
  3565. }
  3566. // ===============================================================================
  3567. // select thread for handling multiple selects
  // One registration with a select thread: the socket, its OS handle, the
  // notification target and the select mode bits it was registered with.
  // Two items compare equal when they refer to the same ISocket.
  3568. struct SelectItem
  3569. {
  3570. ISocket *sock;
  3571. T_SOCKET handle;
  3572. ISocketSelectNotify *nfy;
  3573. byte mode;
  3574. bool del; // only used in select handler method
  3575. bool operator == (const SelectItem & other) const { return sock == other.sock; }
  3576. };
  // Array of SelectItem values
  3577. class SelectItemArray : public StructArrayOf<SelectItem> { };
  // Array of pointers to SelectItem
  3578. class SelectItemArrayP : public StructArrayOf<SelectItem*> { };
  3579. #define SELECT_TIMEOUT_SECS 1 // but it does (TBD)
  3580. #ifdef _WIN32
  3581. // fd_set utility functions
  3582. inline T_FD_SET *cpyfds(T_FD_SET &dst,const T_FD_SET &src)
  3583. {
  3584. unsigned i = src.fd_count;
  3585. dst.fd_count = i;
  3586. while (i--)
  3587. dst.fd_array[i] = src.fd_array[i]; // possibly better as memcpy
  3588. return &dst;
  3589. }
  3590. inline bool findfds(T_FD_SET &s,T_SOCKET h,bool &c)
  3591. {
  3592. unsigned n = s.fd_count;
  3593. unsigned i;
  3594. for(i=0;i<n;i++) {
  3595. if (s.fd_array[i] == h) {
  3596. if (--n)
  3597. s.fd_array[i] = s.fd_array[n]; // remove item
  3598. else
  3599. c = false;
  3600. s.fd_count = n;
  3601. return true;
  3602. }
  3603. }
  3604. return false;
  3605. }
  3606. inline T_SOCKET popfds(T_FD_SET &s)
  3607. {
  3608. unsigned n = s.fd_count;
  3609. T_SOCKET ret;
  3610. if (n) {
  3611. ret = s.fd_array[--n];
  3612. s.fd_count = n;
  3613. }
  3614. else
  3615. ret = NULL;
  3616. return ret;
  3617. }
  3618. #else
  3619. #define _USE_PIPE_FOR_SELECT_TRIGGER
  3620. // not as optimized as windows but I am expecting to convert to using poll anyway
  3621. inline T_FD_SET *cpyfds(T_FD_SET &dst,const T_FD_SET &src)
  3622. {
  3623. memcpy(&dst,&src,sizeof(T_FD_SET));
  3624. return &dst;
  3625. }
  3626. inline bool findfds(T_FD_SET &s,T_SOCKET h,bool &c)
  3627. {
  3628. if ((unsigned)h>=XFD_SETSIZE)
  3629. return false;
  3630. return FD_ISSET(h,&s); // does not remove entry or set termination flag when done
  3631. }
  3632. #endif
  // Base class for the socket select handler threads. It holds the state shared
  // with derived classes (termination flag, critical section, wake-up
  // semaphore, the dummy socket or pipe used to interrupt a wait) and provides
  // the trigger, stop and handle-validation helpers. closedummy() is supplied
  // by the derived class.
  3633. class CSocketBaseThread: public Thread
  3634. {
  3635. protected:
  3636. bool terminating;
  3637. CriticalSection sect;
  3638. Semaphore ticksem;
  3639. std::atomic_uint tickwait;
  3640. unsigned offset;
  3641. bool selectvarschange;
  3642. unsigned waitingchange;
  3643. Semaphore waitingchangesem;
  3644. int validateselecterror;
  3645. unsigned validateerrcount;
  3646. const char *selecttrace;
  3647. unsigned basesize;
  // With the pipe trigger, dummysock[0] is the end that is read and
  // dummysock[1] the end that is written; otherwise a single dummy socket is used.
  3648. #ifdef _USE_PIPE_FOR_SELECT_TRIGGER
  3649. T_SOCKET dummysock[2];
  3650. #else
  3651. T_SOCKET dummysock;
  3652. #endif
  3653. bool dummysockopen;
  // NOTE: only tickwait is initialised here and trc is not used; every other
  // member is left for the derived class constructor to set.
  3654. CSocketBaseThread(const char *trc) : Thread("CSocketBaseThread"), tickwait(0)
  3655. {
  3656. }
  3657. ~CSocketBaseThread()
  3658. {
  3659. }
  3660. public:
  // Wakes the handler thread: signals ticksem when tickwait is non-zero, then
  // either writes one byte to the trigger pipe (under sect) or closes the
  // dummy socket.
  3661. void triggerselect()
  3662. {
  3663. if (tickwait)
  3664. ticksem.signal();
  3665. #ifdef _USE_PIPE_FOR_SELECT_TRIGGER
  3666. CriticalBlock block(sect);
  3667. char c = 0;
  3668. if(write(dummysock[1], &c, 1) != 1) {
  3669. int err = ERRNO();
  3670. LOGERR(err,1,"Socket closed during trigger select");
  3671. }
  3672. #else
  3673. closedummy();
  3674. #endif
  3675. }
  // Drains the trigger pipe: reads single bytes from dummysock[0] until a read
  // no longer returns one byte. Does nothing without the pipe trigger.
  3676. void resettrigger()
  3677. {
  3678. #ifdef _USE_PIPE_FOR_SELECT_TRIGGER
  3679. CriticalBlock block(sect);
  3680. char c;
  3681. while((::read(dummysock[0], &c, sizeof(c))) == sizeof(c));
  3682. #endif
  3683. }
  // Requests termination and wakes the thread; joins it when wait is true.
  3684. void stop(bool wait)
  3685. {
  3686. terminating = true;
  3687. triggerselect();
  3688. if (wait)
  3689. join();
  3690. }
  // Checks whether sock is still a usable handle. Returns false (after
  // logging) when getsockopt(SO_TYPE) fails or when the zero-timeout
  // select/poll probe reports an error or an invalid handle.
  // NOTE(review): the tracename locals are not referenced directly; presumably
  // the LOGERR2 macro picks them up by name - confirm before removing them.
  3691. bool sockOk(T_SOCKET sock)
  3692. {
  3693. #ifdef _DEBUG
  3694. PROGLOG("CSocketBaseThread: sockOk testing %d",sock);
  3695. #endif
  3696. int t=0;
  3697. socklen_t tl = sizeof(t);
  3698. if (getsockopt(sock, SOL_SOCKET, SO_TYPE, (char *)&t, &tl)!=0) {
  3699. StringBuffer sockstr;
  3700. const char *tracename = sockstr.append((unsigned)sock).str();
  3701. LOGERR2(ERRNO(),1,"CSocketBaseThread select handle");
  3702. return false;
  3703. }
  3704. #ifdef _USE_SELECT
  3705. T_FD_SET fds;
  3706. struct timeval tv;
  3707. CHECKSOCKRANGE(sock);
  // first probe: select for write with a zero timeout
  3708. XFD_ZERO(&fds);
  3709. FD_SET((unsigned)sock, &fds);
  3710. //FD_SET((unsigned)sock, &except);
  3711. tv.tv_sec = 0;
  3712. tv.tv_usec = 0;
  3713. int rc = ::select( sock + 1, NULL, (fd_set *)&fds, NULL, &tv );
  3714. if (rc<0) {
  3715. StringBuffer sockstr;
  3716. const char *tracename = sockstr.append((unsigned)sock).str();
  3717. LOGERR2(ERRNO(),2,"CSocketBaseThread select handle");
  3718. return false;
  3719. }
  3720. # ifdef _DEBUG
  3721. else if (rc>0)
  3722. PROGLOG("CSocketBaseThread: select handle %d selected(2) %d",sock,rc);
  3723. # endif
  // second probe: select for read with a zero timeout
  3724. XFD_ZERO(&fds);
  3725. FD_SET((unsigned)sock, &fds);
  3726. tv.tv_sec = 0;
  3727. tv.tv_usec = 0;
  3728. rc = ::select( sock + 1, (fd_set *)&fds, NULL, NULL, &tv );
  3729. if (rc<0) {
  3730. StringBuffer sockstr;
  3731. const char *tracename = sockstr.append((unsigned)sock).str();
  3732. LOGERR2(ERRNO(),3,"CSocketBaseThread select handle");
  3733. return false;
  3734. }
  3735. # ifdef _DEBUG
  3736. else if (rc>0)
  3737. PROGLOG("CSocketBaseThread: select handle %d selected(2) %d",sock,rc);
  3738. # endif
  3739. return true;
  3740. #else
  // poll variant: a zero-timeout poll; only POLLNVAL or a poll failure other
  // than an interrupt marks the handle as bad
  3741. struct pollfd fds[1];
  3742. fds[0].fd = sock;
  3743. fds[0].events = (POLLINX | POLLOUT);
  3744. fds[0].revents = 0;
  3745. int rc = ::poll(fds, 1, 0);
  3746. if (rc==0)
  3747. return true;
  3748. else if (rc>0)
  3749. {
  3750. if ( !(fds[0].revents & POLLNVAL) ) // MCK - skip POLLERR here
  3751. {
  3752. // PROGLOG("CSocketBaseThread: poll handle %d selected(2) %d",sock,rc);
  3753. return true;
  3754. }
  3755. }
  3756. else
  3757. {
  3758. int err = ERRNO();
  3759. if (err == JSE_INTR)
  3760. return true; // assume ok until next time called
  3761. }
  3762. StringBuffer sockstr;
  3763. const char *tracename = sockstr.append((unsigned)sock).str();
  3764. LOGERR2(ERRNO(),3,"CSocketBaseThread poll handle");
  3765. return false;
  3766. #endif
  3767. }
  // Closes the dummy socket / trigger pipe; implemented by the derived class.
  3768. virtual void closedummy() = 0;
  3769. };
  3770. class CSocketSelectThread: public CSocketBaseThread
  3771. {
  3772. SelectItemArray items;
  3773. void opendummy()
  3774. {
  3775. CriticalBlock block(sect);
  3776. if (!dummysockopen) {
  3777. #ifdef _USE_PIPE_FOR_SELECT_TRIGGER
  3778. if(pipe(dummysock)) {
  3779. IWARNLOG("CSocketSelectThread: create pipe failed %d",ERRNO());
  3780. return;
  3781. }
  3782. for (unsigned i=0;i<2;i++) {
  3783. int flags = fcntl(dummysock[i], F_GETFL, 0);
  3784. if (flags!=-1) {
  3785. flags |= O_NONBLOCK;
  3786. fcntl(dummysock[i], F_SETFL, flags);
  3787. }
  3788. flags = fcntl(dummysock[i], F_GETFD, 0);
  3789. if (flags!=-1) {
  3790. flags |= FD_CLOEXEC;
  3791. fcntl(dummysock[i], F_SETFD, flags);
  3792. }
  3793. }
  3794. CHECKSOCKRANGE(dummysock[0]);
  3795. #else
  3796. if (IP6preferred)
  3797. dummysock = ::socket(AF_INET6, SOCK_STREAM, PF_INET6);
  3798. else
  3799. dummysock = ::socket(AF_INET, SOCK_STREAM, 0);
  3800. CHECKSOCKRANGE(dummysock);
  3801. #endif
  3802. dummysockopen = true;
  3803. }
  3804. }
  3805. void closedummy()
  3806. {
  3807. CriticalBlock block(sect);
  3808. if (dummysockopen) {
  3809. #ifdef _USE_PIPE_FOR_SELECT_TRIGGER
  3810. #ifdef SOCKTRACE
  3811. PROGLOG("SOCKTRACE: Closing dummy sockets %x %d %x %d (%p)", dummysock[0], dummysock[0], dummysock[1], dummysock[1], this);
  3812. #endif
  3813. ::close(dummysock[0]);
  3814. ::close(dummysock[1]);
  3815. #else
  3816. #ifdef _WIN32
  3817. ::closesocket(dummysock);
  3818. #else
  3819. ::close(dummysock);
  3820. #endif
  3821. #endif
  3822. dummysockopen = false;
  3823. }
  3824. }
  3825. #ifdef _WIN32
  3826. #define HASHTABSIZE 256
  3827. #define HASHNULL (HASHTABSIZE-1)
  3828. #define HASHTABMASK (HASHTABSIZE-1)
  3829. byte hashtab[HASHTABSIZE];
  3830. #define HASHSOCKET(s) ((((unsigned)s)>>2)&HASHTABMASK) // with some knowledge of windows handles
  3831. void inithash()
  3832. {
  3833. memset(&hashtab,HASHNULL,sizeof(hashtab));
  3834. assertex(FD_SETSIZE<255);
  3835. }
  3836. void reinithash()
  3837. { // done this way because index of items changes and hash table not that big
  3838. inithash();
  3839. assertex(items.ordinality()<HASHTABSIZE-1);
  3840. ForEachItemIn(i,items) {
  3841. unsigned h = HASHSOCKET(items.item(i).handle);
  3842. for (;;) {
  3843. if (hashtab[h]==HASHNULL) {
  3844. hashtab[h] = (byte)i;
  3845. break;
  3846. }
  3847. if (++h==HASHTABSIZE)
  3848. h = 0;
  3849. }
  3850. }
  3851. }
  3852. inline SelectItem &findhash(T_SOCKET handle)
  3853. {
  3854. unsigned h = HASHSOCKET(handle);
  3855. unsigned sh = h;
  3856. for (;;) {
  3857. SelectItem &i=items.element(hashtab[h]);
  3858. if (i.handle==handle)
  3859. return i;
  3860. if (++h==HASHTABSIZE)
  3861. h = 0;
  3862. assertex(h!=sh);
  3863. }
  3864. }
  3865. inline void processfds(T_FD_SET &s,byte mode,SelectItemArray &tonotify)
  3866. {
  3867. for (;;) {
  3868. T_SOCKET sock = popfds(s);
  3869. if (!sock)
  3870. break;
  3871. if (sock!=dummysock)
  3872. {
  3873. SelectItem &si = findhash(sock);
  3874. if (!si.del)
  3875. {
  3876. tonotify.append(si); // nb copies
  3877. SelectItem &itm = tonotify.element(tonotify.length()-1);
  3878. itm.nfy->Link();
  3879. itm.sock->Link();
  3880. itm.mode = mode;
  3881. }
  3882. }
  3883. }
  3884. }
  3885. #endif
  3886. public:
  3887. IMPLEMENT_IINTERFACE;
  3888. CSocketSelectThread(const char *trc)
  3889. : CSocketBaseThread("CSocketSelectThread")
  3890. {
  3891. dummysockopen = false;
  3892. opendummy();
  3893. terminating = false;
  3894. waitingchange = 0;
  3895. selectvarschange = false;
  3896. validateselecterror = 0;
  3897. validateerrcount = 0;
  3898. offset = 0;
  3899. selecttrace = trc;
  3900. basesize = 0;
  3901. #ifdef _WIN32
  3902. inithash();
  3903. #endif
  3904. }
  3905. ~CSocketSelectThread()
  3906. {
  3907. closedummy();
  3908. ForEachItemIn(i,items) {
  3909. // Release/dtors should not throw but leaving try/catch here until all paths checked
  3910. try {
  3911. SelectItem &si = items.element(i);
  3912. si.nfy->Release();
  3913. si.sock->Release();
  3914. }
  3915. catch (IException *e) {
  3916. EXCLOG(e,"~CSocketSelectThread");
  3917. e->Release();
  3918. }
  3919. }
  3920. }
  3921. Owned<IException> termexcept;
  3922. void updateItems()
  3923. {
  3924. // must be in CriticalBlock block(sect);
  3925. unsigned n = items.ordinality();
  3926. #ifdef _WIN32
  3927. bool hashupdateneeded = (n!=basesize); // additions all come at end
  3928. #endif
  3929. for (unsigned i=0;i<n;) {
  3930. SelectItem &si = items.element(i);
  3931. if (si.del) {
  3932. // Release/dtors should not throw but leaving try/catch here until all paths checked
  3933. try {
  3934. #ifdef SOCKTRACE
  3935. PROGLOG("CSocketSelectThread::updateItems release %d",si.handle);
  3936. #endif
  3937. si.nfy->Release();
  3938. si.sock->Release();
  3939. }
  3940. catch (IException *e) {
  3941. EXCLOG(e,"CSocketSelectThread::updateItems");
  3942. e->Release();
  3943. }
  3944. // NOTE: si is a reference, put last item into remove slot
  3945. n--;
  3946. if (i<n)
  3947. si = items.item(n);
  3948. items.remove(n);
  3949. #ifdef _WIN32
  3950. hashupdateneeded = true;
  3951. #endif
  3952. }
  3953. else
  3954. i++;
  3955. }
  3956. assertex(n<=XFD_SETSIZE-1);
  3957. #ifdef _WIN32
  3958. if (hashupdateneeded)
  3959. reinithash();
  3960. #endif
  3961. basesize = n;
  3962. }
  3963. bool checkSocks()
  3964. {
  3965. bool ret = false;
  3966. ForEachItemIn(i,items) {
  3967. SelectItem &si = items.element(i);
  3968. if (si.del)
  3969. ret = true; // maybe that bad one
  3970. else if (!sockOk(si.handle)) {
  3971. si.del = true;
  3972. ret = true;
  3973. }
  3974. }
  3975. return ret;
  3976. }
  3977. bool remove(ISocket *sock)
  3978. {
  3979. if (terminating)
  3980. return false;
  3981. CriticalBlock block(sect);
  3982. if (sock==NULL) { // wait until no changes outstanding
  3983. while (selectvarschange) {
  3984. waitingchange++;
  3985. CriticalUnblock unblock(sect);
  3986. waitingchangesem.wait();
  3987. }
  3988. return true;
  3989. }
  3990. ForEachItemIn(i,items) {
  3991. SelectItem &si = items.element(i);
  3992. if (!si.del&&(si.sock==sock)) {
  3993. si.del = true;
  3994. selectvarschange = true;
  3995. triggerselect();
  3996. return true;
  3997. }
  3998. }
  3999. return false;
  4000. }
  4001. bool add(ISocket *sock,unsigned mode,ISocketSelectNotify *nfy)
  4002. {
  4003. // maybe check once to prevent 1st delay? TBD
  4004. CriticalBlock block(sect);
  4005. unsigned n=0;
  4006. ForEachItemIn(i,items) {
  4007. SelectItem &si = items.element(i);
  4008. if (!si.del) {
  4009. if (si.sock==sock) {
  4010. si.del = true;
  4011. }
  4012. else
  4013. n++;
  4014. }
  4015. }
  4016. if (n>=XFD_SETSIZE-1) // leave 1 spare
  4017. return false;
  4018. SelectItem sn;
  4019. sn.nfy = LINK(nfy);
  4020. sn.sock = LINK(sock);
  4021. sn.mode = (byte)mode;
  4022. sn.handle = (T_SOCKET)sock->OShandle();
  4023. CHECKSOCKRANGE(sn.handle);
  4024. sn.del = false;
  4025. items.append(sn);
  4026. selectvarschange = true;
  4027. triggerselect();
  4028. return true;
  4029. }
  // Rebuilds the fd sets used by the select loop from the current items.
  // Under sect it: clears selectvarschange and releases any threads waiting
  // for the change; after a select error, validates the sockets; purges
  // deleted items; then fills rdfds/wrfds/exfds according to each item's mode
  // and adds the trigger. On return ni is the item count, isrd/iswr/isex say
  // which sets are in use, and max_sockid is one more than the highest handle.
  // Throws when a select error persists for more than 10 consecutive checks
  // without a bad socket being found.
  4030. void updateSelectVars(T_FD_SET &rdfds,T_FD_SET &wrfds,T_FD_SET &exfds,bool &isrd,bool &iswr,bool &isex,unsigned &ni,T_SOCKET &max_sockid)
  4031. {
  4032. CriticalBlock block(sect);
  4033. selectvarschange = false;
  // wake every caller blocked waiting for the change to be picked up
  4034. if (waitingchange) {
  4035. waitingchangesem.signal(waitingchange);
  4036. waitingchange = 0;
  4037. }
  4038. if (validateselecterror) { // something went wrong so check sockets
  4039. validateerrcount++;
  4040. if (!checkSocks()) {
  4041. // bad socket not found
  4042. PROGLOG("CSocketSelectThread::updateSelectVars cannot find socket error");
  4043. if (validateerrcount>10)
  4044. throw MakeStringException(-1,"CSocketSelectThread:Socket select error %d",validateselecterror);
  4045. }
  4046. }
  4047. else
  4048. validateerrcount = 0;
  // drop items marked deleted before building the sets
  4049. updateItems();
  4050. XFD_ZERO( &rdfds );
  4051. XFD_ZERO( &wrfds );
  4052. XFD_ZERO( &exfds );
  4053. isrd=false;
  4054. iswr=false;
  4055. isex=false;
  4056. #ifdef _USE_PIPE_FOR_SELECT_TRIGGER
  4057. max_sockid=dummysockopen?dummysock[0]:0;
  4058. #else
  // the dummy socket may have been closed by a trigger, so reopen it
  4059. opendummy();
  4060. max_sockid=dummysockopen?dummysock:0;
  4061. #endif
  4062. ni = items.ordinality();
  // On Windows offset is reset only when it is out of range; on other
  // platforms the 'if' is compiled out and offset is always reset to 0.
  4063. #ifdef _WIN32
  4064. if (offset>=ni)
  4065. #endif
  4066. offset = 0;
  // visit every item once, starting at offset and wrapping round
  4067. unsigned j = offset;
  4068. ForEachItemIn(i, items) {
  4069. SelectItem &si = items.element(j);
  4070. j++;
  4071. if (j==ni)
  4072. j = 0;
  4073. if (si.mode & SELECTMODE_READ) {
  4074. FD_SET( si.handle, &rdfds );
  4075. isrd = true;
  4076. }
  4077. if (si.mode & SELECTMODE_WRITE) {
  4078. FD_SET( si.handle, &wrfds );
  4079. iswr = true;
  4080. }
  4081. if (si.mode & SELECTMODE_EXCEPT) {
  4082. FD_SET( si.handle, &exfds );
  4083. isex = true;
  4084. }
  4085. max_sockid=std::max(si.handle, max_sockid);
  4086. }
  // the trigger is watched for read (pipe) or for exceptions (dummy socket)
  4087. if (dummysockopen) {
  4088. #ifdef _USE_PIPE_FOR_SELECT_TRIGGER
  4089. FD_SET( dummysock[0], &rdfds );
  4090. isrd = true;
  4091. #else
  4092. FD_SET( dummysock, &exfds );
  4093. isex = true;
  4094. #endif
  4095. }
  4096. validateselecterror = 0;
  // select() takes the highest descriptor plus one
  4097. max_sockid++;
  4098. #ifdef SOCKTRACE
  4099. PROGLOG("SOCKTRACE: selecting on %d sockets",ni);
  4100. #endif
  4101. }
  4102. int run()
  4103. {
  4104. try
  4105. {
  4106. T_FD_SET rdfds;
  4107. T_FD_SET wrfds;
  4108. T_FD_SET exfds;
  4109. timeval selecttimeout;
  4110. bool isrd = false;
  4111. bool iswr = false;
  4112. bool isex = false;
  4113. T_SOCKET maxsockid = 0;
  4114. unsigned ni = 0;
  4115. selectvarschange = true;
  4116. unsigned numto = 0;
  4117. unsigned lastnumto = 0;
  4118. unsigned totnum = 0;
  4119. unsigned total = 0;
  4120. while (!terminating)
  4121. {
  4122. selecttimeout.tv_sec = SELECT_TIMEOUT_SECS; // linux modifies so initialize inside loop
  4123. selecttimeout.tv_usec = 0;
  4124. if (selectvarschange)
  4125. {
  4126. updateSelectVars(rdfds,wrfds,exfds,isrd,iswr,isex,ni,maxsockid);
  4127. }
  4128. if (ni==0)
  4129. {
  4130. validateerrcount = 0;
  4131. tickwait++;
  4132. if(!selectvarschange&&!terminating)
  4133. ticksem.wait(SELECT_TIMEOUT_SECS*1000);
  4134. tickwait--;
  4135. continue;
  4136. }
  4137. T_FD_SET rs;
  4138. T_FD_SET ws;
  4139. T_FD_SET es;
  4140. T_FD_SET *rsp = isrd?cpyfds(rs,rdfds):NULL;
  4141. T_FD_SET *wsp = iswr?cpyfds(ws,wrfds):NULL;
  4142. T_FD_SET *esp = isex?cpyfds(es,exfds):NULL;
  4143. int n = ::select(maxsockid,(fd_set *)rsp,(fd_set *)wsp,(fd_set *)esp,&selecttimeout); // first parameter needed for posix
  4144. if (terminating)
  4145. break;
  4146. if (n < 0)
  4147. {
  4148. CriticalBlock block(sect);
  4149. int err = ERRNO();
  4150. if (err != JSE_INTR)
  4151. {
  4152. if (dummysockopen)
  4153. {
  4154. LOGERR(err,12,"CSocketSelectThread select error"); // should cache error ?
  4155. validateselecterror = err;
  4156. #ifndef _USE_PIPE_FOR_SELECT_TRIGGER
  4157. closedummy(); // just in case was culprit
  4158. #endif
  4159. }
  4160. selectvarschange = true;
  4161. continue;
  4162. }
  4163. n = 0;
  4164. }
  4165. else if (n>0)
  4166. {
  4167. validateerrcount = 0;
  4168. numto = 0;
  4169. lastnumto = 0;
  4170. total += n;
  4171. totnum++;
  4172. SelectItemArray tonotify;
  4173. {
  4174. CriticalBlock block(sect);
  4175. #ifdef _WIN32
  4176. if (isrd)
  4177. processfds(rs,SELECTMODE_READ,tonotify);
  4178. if (iswr)
  4179. processfds(ws,SELECTMODE_WRITE,tonotify);
  4180. if (isex)
  4181. processfds(es,SELECTMODE_EXCEPT,tonotify);
  4182. #else
  4183. unsigned i;
  4184. SelectItem *si = items.getArray(offset);
  4185. SelectItem *sie = items.getArray(ni-1)+1;
  4186. bool r = isrd;
  4187. bool w = iswr;
  4188. bool e = isex;
  4189. #ifdef _USE_PIPE_FOR_SELECT_TRIGGER
  4190. if (r&&dummysockopen&&findfds(rs,dummysock[0],r))
  4191. {
  4192. resettrigger();
  4193. --n;
  4194. }
  4195. #endif
  4196. for (i=0;(n>0)&&(i<ni);i++)
  4197. {
  4198. unsigned addMode = 0;
  4199. if (r&&findfds(rs,si->handle,r))
  4200. {
  4201. if (!si->del)
  4202. addMode = SELECTMODE_READ;
  4203. --n;
  4204. }
  4205. if (w&&findfds(ws,si->handle,w))
  4206. {
  4207. if (!si->del)
  4208. addMode = SELECTMODE_WRITE;
  4209. --n;
  4210. }
  4211. if (e&&findfds(es,si->handle,e))
  4212. {
  4213. if (!si->del)
  4214. addMode = SELECTMODE_EXCEPT;
  4215. --n;
  4216. }
  4217. if (addMode)
  4218. {
  4219. tonotify.append(*si);
  4220. SelectItem &itm = tonotify.element(tonotify.length()-1);
  4221. itm.nfy->Link();
  4222. itm.sock->Link();
  4223. itm.mode = addMode;
  4224. }
  4225. si++;
  4226. if (si==sie)
  4227. si = items.getArray();
  4228. }
  4229. #endif
  4230. }
  4231. ForEachItemIn(j,tonotify)
  4232. {
  4233. const SelectItem &si = tonotify.item(j);
  4234. try
  4235. {
  4236. si.nfy->notifySelected(si.sock,si.mode); // ignore return
  4237. }
  4238. catch (IException *e)
  4239. { // should be acted upon by notifySelected
  4240. // could also not throw until after handling all events ...
  4241. EXCLOG(e,"CSocketSelectThread notifySelected");
  4242. throw ;
  4243. }
  4244. // Release/dtors should not throw but leaving try/catch here until all paths checked
  4245. try
  4246. {
  4247. si.nfy->Release();
  4248. si.sock->Release();
  4249. }
  4250. catch (IException *e)
  4251. {
  4252. EXCLOG(e,"CSocketSelectThread nfy/sock Release");
  4253. e->Release();
  4254. }
  4255. }
  4256. }
  4257. else
  4258. {
  4259. validateerrcount = 0;
  4260. if ((++numto>=lastnumto*2))
  4261. {
  4262. lastnumto = numto;
  4263. if (selecttrace&&(numto>4))
  4264. PROGLOG("%s: Select Idle(%d), %d,%d,%0.2f",selecttrace,numto,totnum,total,totnum?((double)total/(double)totnum):0.0);
  4265. }
  4266. /*
  4267. if (numto&&(numto%100))
  4268. {
  4269. CriticalBlock block(sect);
  4270. if (!selectvarschange)
  4271. selectvarschange = checkSocks();
  4272. }
  4273. */
  4274. }
  4275. if (++offset>=ni)
  4276. offset = 0;
  4277. }
  4278. }
  4279. catch (IException *e)
  4280. {
  4281. EXCLOG(e,"CSocketSelectThread");
  4282. termexcept.setown(e);
  4283. }
  4284. CriticalBlock block(sect);
  4285. try
  4286. {
  4287. updateItems();
  4288. }
  4289. catch (IException *e)
  4290. {
  4291. EXCLOG(e,"CSocketSelectThread(2)");
  4292. if (!termexcept)
  4293. termexcept.setown(e);
  4294. else
  4295. e->Release();
  4296. }
  4297. return 0;
  4298. }
  4299. };
  4300. class CSocketSelectHandler: implements ISocketSelectHandler, public CInterface
  4301. {
  4302. CIArrayOf<CSocketSelectThread> threads;
  4303. CriticalSection sect;
  4304. bool started;
  4305. std::atomic<bool> stopped;
  4306. StringAttr selecttrace;
  4307. public:
  4308. IMPLEMENT_IINTERFACE;
  4309. CSocketSelectHandler(const char *trc)
  4310. : started(false), stopped(false), selecttrace(trc)
  4311. {
  4312. }
  4313. ~CSocketSelectHandler()
  4314. {
  4315. stop(true);
  4316. threads.kill();
  4317. }
  4318. void start()
  4319. {
  4320. CriticalBlock block(sect);
  4321. if (!started) {
  4322. started = true;
  4323. ForEachItemIn(i,threads) {
  4324. threads.item(i).start();
  4325. }
  4326. }
  4327. }
  4328. void add(ISocket *sock,unsigned mode,ISocketSelectNotify *nfy)
  4329. {
  4330. CriticalBlock block(sect);
  4331. if (stopped)
  4332. return;
  4333. for (;;) {
  4334. bool added=false;
  4335. ForEachItemIn(i,threads) {
  4336. if (added)
  4337. threads.item(i).remove(sock);
  4338. else
  4339. added = threads.item(i).add(sock,mode,nfy);
  4340. }
  4341. if (added)
  4342. return;
  4343. CSocketSelectThread *thread = new CSocketSelectThread(selecttrace);
  4344. threads.append(*thread);
  4345. if (started)
  4346. thread->start();
  4347. }
  4348. }
  4349. void remove(ISocket *sock)
  4350. {
  4351. CriticalBlock block(sect);
  4352. ForEachItemIn(i,threads) {
  4353. if (threads.item(i).remove(sock)&&sock)
  4354. break;
  4355. }
  4356. }
  4357. void stop(bool wait)
  4358. {
  4359. IException *e=NULL;
  4360. CriticalBlock block(sect);
  4361. stopped = true;
  4362. unsigned i = 0;
  4363. while (i<threads.ordinality()) {
  4364. CSocketSelectThread &t=threads.item(i);
  4365. {
  4366. CriticalUnblock unblock(sect);
  4367. t.stop(wait); // not quite as quick as could be if wait true
  4368. }
  4369. if (wait && !e && t.termexcept)
  4370. e = t.termexcept.getClear();
  4371. i++;
  4372. }
  4373. #if 0 // don't throw error as too late
  4374. if (e)
  4375. throw e;
  4376. #else
  4377. ::Release(e);
  4378. #endif
  4379. }
  4380. };
  4381. #ifdef _HAS_EPOLL_SUPPORT
  4382. # define SOCK_ADDED 0x1
  4383. # define SOCK_REMOVED 0x2
  4384. class CSocketEpollThread: public CSocketBaseThread
  4385. {
  4386. int epfd;
  4387. SelectItem *sidummy;
  4388. SelectItemArrayP items;
  4389. struct epoll_event *epevents;
  4390. unsigned hdlPerThrd;
  4391. void epoll_op(int efd, int op, SelectItem *si, unsigned int event_mask)
  4392. {
  4393. int fd = si->handle;
  4394. int srtn;
  4395. struct epoll_event event;
  4396. // write all bytes to eliminate uninitialized warnings
  4397. memset(&event, 0, sizeof(event));
  4398. event.events = event_mask;
  4399. event.data.ptr = si;
  4400. # ifdef EPOLLTRACE
  4401. DBGLOG("EPOLL: op(%d) fd %d to epfd %d", op, fd, efd);
  4402. # endif
  4403. srtn = ::epoll_ctl(efd, op, fd, &event);
  4404. // if another thread closed fd before here don't fail
  4405. if ( (srtn < 0) && (op != EPOLL_CTL_DEL) ){
  4406. int err = ERRNO();
  4407. IWARNLOG("epoll_ctl failed op:%d, fd:%d, err=%d", op, fd, err);
  4408. }
  4409. }
  4410. void opendummy()
  4411. {
  4412. CriticalBlock block(sect);
  4413. if (!dummysockopen)
  4414. {
  4415. sidummy = new SelectItem;
  4416. sidummy->sock = nullptr;
  4417. sidummy->nfy = nullptr;
  4418. sidummy->del = true; // so its not added to tonotify ...
  4419. sidummy->mode = 0;
  4420. #ifdef _USE_PIPE_FOR_SELECT_TRIGGER
  4421. if(pipe(dummysock))
  4422. {
  4423. IWARNLOG("CSocketEpollThread: create pipe failed %d",ERRNO());
  4424. return;
  4425. }
  4426. for (unsigned i=0;i<2;i++)
  4427. {
  4428. int flags = fcntl(dummysock[i], F_GETFL, 0);
  4429. if (flags!=-1)
  4430. {
  4431. flags |= O_NONBLOCK;
  4432. fcntl(dummysock[i], F_SETFL, flags);
  4433. }
  4434. flags = fcntl(dummysock[i], F_GETFD, 0);
  4435. if (flags!=-1)
  4436. {
  4437. flags |= FD_CLOEXEC;
  4438. fcntl(dummysock[i], F_SETFD, flags);
  4439. }
  4440. }
  4441. sidummy->handle = dummysock[0];
  4442. epoll_op(epfd, EPOLL_CTL_ADD, sidummy, EPOLLINX);
  4443. #else
  4444. if (IP6preferred)
  4445. dummysock = ::socket(AF_INET6, SOCK_STREAM, PF_INET6);
  4446. else
  4447. dummysock = ::socket(AF_INET, SOCK_STREAM, 0);
  4448. // added EPOLLIN and EPOLLRDHUP also because cannot find anywhere MSG_OOB is sent
  4449. // added here to match existing select() code above which sets
  4450. // the except fd_set mask.
  4451. sidummy->handle = dummysock;
  4452. epoll_op(epfd, EPOLL_CTL_ADD, sidummy, (EPOLLINX | EPOLLPRI));
  4453. #endif
  4454. dummysockopen = true;
  4455. }
  4456. }
  4457. void closedummy()
  4458. {
  4459. CriticalBlock block(sect);
  4460. if (dummysockopen)
  4461. {
  4462. epoll_op(epfd, EPOLL_CTL_DEL, sidummy, 0);
  4463. #ifdef _USE_PIPE_FOR_SELECT_TRIGGER
  4464. #ifdef SOCKTRACE
  4465. PROGLOG("SOCKTRACE: Closing dummy sockets %x %d %x %d (%p)", dummysock[0], dummysock[0], dummysock[1], dummysock[1], this);
  4466. #endif
  4467. ::close(dummysock[0]);
  4468. ::close(dummysock[1]);
  4469. #else
  4470. ::close(dummysock);
  4471. #endif
  4472. delete sidummy;
  4473. dummysockopen = false;
  4474. }
  4475. }
  4476. void delSelItem(SelectItem *si)
  4477. {
  4478. epoll_op(epfd, EPOLL_CTL_DEL, si, 0);
  4479. // Release/dtors should not throw but leaving try/catch here until all paths checked
  4480. try
  4481. {
  4482. si->nfy->Release();
  4483. si->sock->Release();
  4484. delete si;
  4485. }
  4486. catch (IException *e)
  4487. {
  4488. EXCLOG(e,"CSocketEpollThread::delSelItem()");
  4489. e->Release();
  4490. }
  4491. }
  4492. void delSelItemPos(SelectItem *si, unsigned pos)
  4493. {
  4494. unsigned last = items.ordinality();
  4495. delSelItem(si);
  4496. last--;
  4497. if (pos < last)
  4498. items.swap(pos, last);
  4499. items.remove(last);
  4500. }
  4501. public:
  4502. IMPLEMENT_IINTERFACE;
  4503. CSocketEpollThread(const char *trc, unsigned _hdlPerThrd)
  4504. : CSocketBaseThread("CSocketEpollThread")
  4505. {
  4506. dummysockopen = false;
  4507. terminating = false;
  4508. waitingchange = 0;
  4509. selectvarschange = false;
  4510. validateselecterror = 0;
  4511. validateerrcount = 0;
  4512. offset = 0;
  4513. selecttrace = trc;
  4514. hdlPerThrd = _hdlPerThrd;
  4515. epfd = ::epoll_create(1); // NB: arg is not used in newer kernels
  4516. if (epfd < 0)
  4517. {
  4518. int err = ERRNO();
  4519. LOGERR(err,1,"epoll_create()");
  4520. THROWJSOCKEXCEPTION2(err);
  4521. }
  4522. # ifdef FD_CLOEXEC
  4523. int epflags = fcntl(epfd, F_GETFD, 0);
  4524. if (epflags != -1)
  4525. {
  4526. epflags |= FD_CLOEXEC;
  4527. fcntl(epfd, F_SETFD, epflags);
  4528. }
  4529. # endif
  4530. # ifdef EPOLLTRACE
  4531. DBGLOG("CSocketEpollThread: creating epoll fd %d", epfd );
  4532. # endif
  4533. try
  4534. {
  4535. epevents = new struct epoll_event[MAX_RET_EVENTS];
  4536. }
  4537. catch (const std::bad_alloc &e)
  4538. {
  4539. int err = ERRNO();
  4540. LOGERR(err,1,"epevents alloc()");
  4541. THROWJSOCKEXCEPTION2(err);
  4542. }
  4543. opendummy();
  4544. }
  4545. ~CSocketEpollThread()
  4546. {
  4547. closedummy();
  4548. ForEachItemIn(i, items)
  4549. {
  4550. SelectItem *si = items.element(i);
  4551. delSelItem(si);
  4552. }
  4553. if (epfd >= 0)
  4554. {
  4555. # ifdef EPOLLTRACE
  4556. DBGLOG("EPOLL: closing epfd %d", epfd);
  4557. # endif
  4558. ::close(epfd);
  4559. epfd = -1;
  4560. delete [] epevents;
  4561. }
  4562. }
  4563. Owned<IException> termexcept;
  4564. bool checkSocks()
  4565. {
  4566. bool ret = false;
  4567. // must be holding CriticalBlock (sect)
  4568. unsigned n = items.ordinality();
  4569. for (unsigned i=0;i<n;)
  4570. {
  4571. SelectItem *si = items.element(i);
  4572. if (!sockOk(si->handle))
  4573. {
  4574. delSelItemPos(si, i);
  4575. n--;
  4576. ret = true;
  4577. }
  4578. else
  4579. i++;
  4580. }
  4581. return ret;
  4582. }
  4583. bool removeSock(ISocket *sock)
  4584. {
  4585. // must be holding CriticalBlock (sect)
  4586. unsigned n = items.ordinality();
  4587. for (unsigned i=0;i<n;)
  4588. {
  4589. SelectItem *si = items.element(i);
  4590. if (si->sock==sock)
  4591. {
  4592. delSelItemPos(si, i);
  4593. n--;
  4594. return true;
  4595. }
  4596. else
  4597. i++;
  4598. }
  4599. return false;
  4600. }
  4601. bool remove(ISocket *sock)
  4602. {
  4603. CriticalBlock block(sect);
  4604. if (terminating)
  4605. return false;
  4606. if (sock==NULL)
  4607. { // wait until no changes outstanding
  4608. while (selectvarschange)
  4609. {
  4610. waitingchange++;
  4611. CriticalUnblock unblock(sect);
  4612. waitingchangesem.wait();
  4613. }
  4614. return true;
  4615. }
  4616. if (removeSock(sock))
  4617. {
  4618. selectvarschange = true;
  4619. // NB: could set terminating here if no more hdls on
  4620. // this thread and at least one other thread is present
  4621. triggerselect();
  4622. return true;
  4623. }
  4624. return false;
  4625. }
  4626. unsigned add(ISocket *sock,unsigned mode,ISocketSelectNotify *nfy)
  4627. {
  4628. if ( !sock || !nfy ||
  4629. !(mode & (SELECTMODE_READ|SELECTMODE_WRITE|SELECTMODE_EXCEPT)) )
  4630. {
  4631. IWARNLOG("EPOLL: adding fd but sock or nfy is NULL or mode is empty");
  4632. dbgassertex(false);
  4633. return 0;
  4634. }
  4635. CriticalBlock block(sect);
  4636. if (terminating)
  4637. return 0;
  4638. unsigned rm = 0;
  4639. if (removeSock(sock))
  4640. rm = SOCK_REMOVED;
  4641. unsigned n = items.ordinality();
  4642. // new handler thread
  4643. if (n >= hdlPerThrd)
  4644. return (0|rm);
  4645. SelectItem *sn = new SelectItem;
  4646. sn->nfy = LINK(nfy);
  4647. sn->sock = LINK(sock);
  4648. sn->mode = (byte)mode;
  4649. sn->handle = (T_SOCKET)sock->OShandle();
  4650. sn->del = false;
  4651. items.append(sn);
  4652. unsigned int ep_mode = 0;
  4653. if (mode & SELECTMODE_READ)
  4654. ep_mode |= EPOLLINX;
  4655. if (mode & SELECTMODE_WRITE)
  4656. ep_mode |= EPOLLOUT;
  4657. if (mode & SELECTMODE_EXCEPT)
  4658. ep_mode |= EPOLLPRI;
  4659. epoll_op(epfd, EPOLL_CTL_ADD, sn, ep_mode);
  4660. selectvarschange = true;
  4661. triggerselect();
  4662. return (SOCK_ADDED|rm);
  4663. }
  4664. void updateEpollVars(unsigned &ni)
  4665. {
  4666. CriticalBlock block(sect);
  4667. selectvarschange = false;
  4668. if (waitingchange)
  4669. {
  4670. waitingchangesem.signal(waitingchange);
  4671. waitingchange = 0;
  4672. }
  4673. if (validateselecterror)
  4674. { // something went wrong so check sockets
  4675. validateerrcount++;
  4676. if (!checkSocks())
  4677. {
  4678. // bad socket not found
  4679. IWARNLOG("CSocketEpollThread::updateEpollVars cannot find socket error");
  4680. if (validateerrcount>10)
  4681. throw MakeStringException(-1,"CSocketEpollThread:Socket epoll error %d",validateselecterror);
  4682. }
  4683. }
  4684. else
  4685. validateerrcount = 0;
  4686. #ifndef _USE_PIPE_FOR_SELECT_TRIGGER
  4687. opendummy();
  4688. #endif
  4689. ni = items.ordinality();
  4690. validateselecterror = 0;
  4691. }
  4692. int run()
  4693. {
  4694. try
  4695. {
  4696. unsigned ni = 0;
  4697. unsigned numto = 0;
  4698. unsigned lastnumto = 0;
  4699. unsigned totnum = 0;
  4700. unsigned total = 0;
  4701. selectvarschange = true;
  4702. while (!terminating)
  4703. {
  4704. if (selectvarschange)
  4705. updateEpollVars(ni);
  4706. if (ni==0)
  4707. {
  4708. validateerrcount = 0;
  4709. tickwait++;
  4710. if(!selectvarschange&&!terminating)
  4711. ticksem.wait(SELECT_TIMEOUT_SECS*1000);
  4712. tickwait--;
  4713. continue;
  4714. }
  4715. // poll for any events ...
  4716. int err = 0;
  4717. int n = ::epoll_wait(epfd, epevents, 1, 1000);
  4718. if (n < 0)
  4719. err = ERRNO();
  4720. # ifdef EPOLLTRACE
  4721. if(n > 0)
  4722. DBGLOG("EPOLL: after epoll_wait(), n = %d, ni = %d", n, ni);
  4723. # endif
  4724. if (terminating)
  4725. break;
  4726. if (n < 0)
  4727. {
  4728. CriticalBlock block(sect);
  4729. if (err != JSE_INTR)
  4730. {
  4731. if (dummysockopen)
  4732. {
  4733. LOGERR(err,12,"CSocketEpollThread epoll error"); // should cache error ?
  4734. validateselecterror = err;
  4735. #ifndef _USE_PIPE_FOR_SELECT_TRIGGER
  4736. closedummy(); // just in case was culprit
  4737. #endif
  4738. }
  4739. selectvarschange = true;
  4740. continue;
  4741. }
  4742. }
  4743. else if (n>0)
  4744. {
  4745. validateerrcount = 0;
  4746. numto = 0;
  4747. lastnumto = 0;
  4748. total += n;
  4749. totnum++;
  4750. SelectItemArray tonotify;
  4751. {
  4752. CriticalBlock block(sect);
  4753. // retrieve events, without waiting, while holding CS ...
  4754. int n2 = ::epoll_wait(epfd, epevents, MAX_RET_EVENTS, 0);
  4755. for (int j=0;j<n2;j++)
  4756. {
  4757. int tfd = -1;
  4758. SelectItem *epsi = (SelectItem *)epevents[j].data.ptr;
  4759. if (epsi)
  4760. tfd = epsi->handle;
  4761. # ifdef EPOLLTRACE
  4762. DBGLOG("EPOLL: j = %d, fd = %d, emask = %u", j, tfd, epevents[j].events);
  4763. # endif
  4764. if (tfd >= 0)
  4765. {
  4766. # ifdef _USE_PIPE_FOR_SELECT_TRIGGER
  4767. if ( (dummysockopen) && (tfd == dummysock[0]) )
  4768. {
  4769. resettrigger();
  4770. continue;
  4771. }
  4772. # endif
  4773. unsigned int ep_mode = 0;
  4774. if (epevents[j].events & (EPOLLINX | EPOLLHUP | EPOLLERR))
  4775. ep_mode |= SELECTMODE_READ;
  4776. if (epevents[j].events & EPOLLOUT)
  4777. ep_mode |= SELECTMODE_WRITE;
  4778. if (epevents[j].events & EPOLLPRI)
  4779. ep_mode |= SELECTMODE_EXCEPT;
  4780. if (ep_mode != 0)
  4781. {
  4782. tonotify.append(*epsi);
  4783. SelectItem &itm = tonotify.element(tonotify.length()-1);
  4784. itm.nfy->Link();
  4785. itm.sock->Link();
  4786. #ifdef _TRACELINKCLOSED
  4787. // temporary, to help diagnose spurious socket closes (hpcc-15043)
  4788. // currently no implementation of notifySelected() uses the mode
  4789. // argument so we can pass in the epoll events mask and log that
  4790. // if there is no data and the socket gets closed
  4791. itm.mode = epevents[j].events;
  4792. #else
  4793. itm.mode = ep_mode;
  4794. #endif
  4795. }
  4796. }
  4797. }
  4798. }
  4799. Owned<IException> nfyexcept;
  4800. ForEachItemIn(j,tonotify)
  4801. {
  4802. const SelectItem &si = tonotify.item(j);
  4803. try
  4804. {
  4805. si.nfy->notifySelected(si.sock,si.mode); // ignore return
  4806. }
  4807. catch (IException *e)
  4808. { // should be acted upon by notifySelected
  4809. // Don't throw until after handling all events ...
  4810. EXCLOG(e,"CSocketEpollThread notifySelected");
  4811. if (!nfyexcept)
  4812. nfyexcept.setown(e);
  4813. else
  4814. e->Release();
  4815. }
  4816. // Release/dtors should not throw but leaving try/catch here until all paths checked
  4817. try
  4818. {
  4819. si.nfy->Release();
  4820. si.sock->Release();
  4821. }
  4822. catch (IException *e)
  4823. {
  4824. EXCLOG(e,"CSocketEpollThread nfy/sock Release");
  4825. e->Release();
  4826. }
  4827. }
  4828. if (nfyexcept)
  4829. throw nfyexcept.getClear();
  4830. }
  4831. else
  4832. {
  4833. validateerrcount = 0;
  4834. if ((++numto>=lastnumto*2))
  4835. {
  4836. lastnumto = numto;
  4837. if (selecttrace&&(numto>4))
  4838. PROGLOG("%s: Epoll Idle(%d), %d,%d,%0.2f",selecttrace,numto,totnum,total,totnum?((double)total/(double)totnum):0.0);
  4839. }
  4840. /*
  4841. if (numto&&(numto%100))
  4842. {
  4843. CriticalBlock block(sect);
  4844. if (!selectvarschange)
  4845. selectvarschange = checkSocks();
  4846. }
  4847. */
  4848. }
  4849. }
  4850. }
  4851. catch (IException *e)
  4852. {
  4853. EXCLOG(e,"CSocketEpollThread");
  4854. termexcept.setown(e);
  4855. }
  4856. return 0;
  4857. }
  4858. };
  4859. class CSocketEpollHandler: implements ISocketSelectHandler, public CInterface
  4860. {
  4861. CIArrayOf<CSocketEpollThread> threads;
  4862. CriticalSection sect;
  4863. bool started;
  4864. std::atomic<bool> stopped;
  4865. StringAttr epolltrace;
  4866. unsigned hdlPerThrd;
  4867. public:
  4868. IMPLEMENT_IINTERFACE;
  4869. CSocketEpollHandler(const char *trc, unsigned _hdlPerThrd)
  4870. : started(false), stopped(false), epolltrace(trc), hdlPerThrd(_hdlPerThrd)
  4871. {
  4872. }
  4873. ~CSocketEpollHandler()
  4874. {
  4875. stop(true);
  4876. threads.kill();
  4877. }
  4878. void start()
  4879. {
  4880. CriticalBlock block(sect);
  4881. if (!started)
  4882. {
  4883. started = true;
  4884. ForEachItemIn(i,threads)
  4885. {
  4886. threads.item(i).start();
  4887. }
  4888. }
  4889. }
  4890. void add(ISocket *sock,unsigned mode,ISocketSelectNotify *nfy)
  4891. {
  4892. if ( !sock || !nfy ||
  4893. !(mode & (SELECTMODE_READ|SELECTMODE_WRITE|SELECTMODE_EXCEPT)) )
  4894. throw MakeStringException(-1,"CSocketEpollHandler::add() invalid sock or nfy or mode");
  4895. CriticalBlock block(sect);
  4896. if (stopped)
  4897. return;
  4898. // Create new handler thread if current one has hdlPerThrd fds.
  4899. // epoll() handles many fds faster than select so this would
  4900. // seem not as important, but we are still serializing on
  4901. // nfy events and spreading those over threads may help,
  4902. // especially with SSL as avail_read() could block more.
  4903. unsigned addrm = 0;
  4904. ForEachItemIn(i,threads)
  4905. {
  4906. if (!(addrm & SOCK_ADDED))
  4907. {
  4908. addrm |= threads.item(i).add(sock,mode,nfy);
  4909. if (addrm & (SOCK_ADDED | SOCK_REMOVED))
  4910. return;
  4911. }
  4912. else if (!(addrm & SOCK_REMOVED))
  4913. {
  4914. if (threads.item(i).remove(sock))
  4915. return;
  4916. }
  4917. }
  4918. if (addrm & SOCK_ADDED)
  4919. return;
  4920. CSocketEpollThread *thread = new CSocketEpollThread(epolltrace, hdlPerThrd);
  4921. threads.append(*thread);
  4922. if (started)
  4923. thread->start();
  4924. thread->add(sock,mode,nfy);
  4925. }
  4926. void remove(ISocket *sock)
  4927. {
  4928. CriticalBlock block(sect);
  4929. ForEachItemIn(i,threads)
  4930. {
  4931. if (threads.item(i).remove(sock))
  4932. break;
  4933. }
  4934. }
  4935. void stop(bool wait)
  4936. {
  4937. CriticalBlock block(sect);
  4938. stopped = true;
  4939. ForEachItemIn(i,threads)
  4940. {
  4941. CSocketEpollThread &t=threads.item(i);
  4942. {
  4943. CriticalUnblock unblock(sect);
  4944. t.stop(wait); // not quite as quick as could be if wait true
  4945. }
  4946. }
  4947. }
  4948. };
  4949. enum EpollMethod { EPOLL_INIT = 0, EPOLL_DISABLED, EPOLL_ENABLED };
  4950. static EpollMethod epoll_method = EPOLL_INIT;
  4951. static CriticalSection epollsect;
  4952. void check_epoll_cfg()
  4953. {
  4954. CriticalBlock block(epollsect);
  4955. // DBGLOG("check_epoll_cfg(): epoll_method = %d",epoll_method);
  4956. if (epoll_method == EPOLL_INIT)
  4957. {
  4958. if (queryEnvironmentConf().getPropBool("use_epoll", true))
  4959. epoll_method = EPOLL_ENABLED;
  4960. else
  4961. epoll_method = EPOLL_DISABLED;
  4962. // DBGLOG("check_epoll_cfg(): after reading conf file, epoll_method = %d",epoll_method);
  4963. epoll_hdlPerThrd = (unsigned)queryEnvironmentConf().getPropInt("epoll_hdlperthrd", UINT_MAX);
  4964. if (epoll_hdlPerThrd == 0)
  4965. epoll_hdlPerThrd = UINT_MAX;
  4966. // DBGLOG("check_epoll_cfg(): after reading conf file, epoll_hdlPerThrd = %u",epoll_hdlPerThrd);
  4967. }
  4968. }
  4969. #endif // _HAS_EPOLL_SUPPORT
  4970. ISocketSelectHandler *createSocketSelectHandler(const char *trc, unsigned hdlPerThrd)
  4971. {
  4972. #ifdef _HAS_EPOLL_SUPPORT
  4973. check_epoll_cfg();
  4974. if (epoll_method == EPOLL_ENABLED)
  4975. {
  4976. if (hdlPerThrd == 0)
  4977. hdlPerThrd = epoll_hdlPerThrd;
  4978. return new CSocketEpollHandler(trc, hdlPerThrd);
  4979. }
  4980. else
  4981. return new CSocketSelectHandler(trc);
  4982. #else
  4983. return new CSocketSelectHandler(trc);
  4984. #endif
  4985. }
  4986. ISocketSelectHandler *createSocketEpollHandler(const char *trc, unsigned hdlPerThrd)
  4987. {
  4988. #ifdef _HAS_EPOLL_SUPPORT
  4989. check_epoll_cfg();
  4990. if (hdlPerThrd == 0)
  4991. hdlPerThrd = epoll_hdlPerThrd;
  4992. return new CSocketEpollHandler(trc, hdlPerThrd);
  4993. #else
  4994. return new CSocketSelectHandler(trc);
  4995. #endif
  4996. }
  4997. void readBuffer(ISocket * socket, MemoryBuffer & buffer)
  4998. {
  4999. size32_t len;
  5000. socket->read(&len, sizeof(len));
  5001. _WINREV4(len);
  5002. if (len) {
  5003. void * target = buffer.reserve(len);
  5004. socket->read(target, len);
  5005. }
  5006. }
  5007. void readBuffer(ISocket * socket, MemoryBuffer & buffer, unsigned timeoutms)
  5008. {
  5009. size32_t len;
  5010. size32_t sizeGot;
  5011. socket->readtms(&len, sizeof(len), sizeof(len), sizeGot, timeoutms);
  5012. _WINREV4(len);
  5013. if (len) {
  5014. void * target = buffer.reserve(len);
  5015. socket->readtms(target, len, len, sizeGot, timeoutms);
  5016. }
  5017. }
  5018. void writeBuffer(ISocket * socket, MemoryBuffer & buffer)
  5019. {
  5020. unsigned len = buffer.length();
  5021. _WINREV4(len);
  5022. socket->write(&len, sizeof(len));
  5023. if (len)
  5024. socket->write(buffer.toByteArray(), buffer.length());
  5025. }
  5026. bool catchReadBuffer(ISocket * socket, MemoryBuffer & buffer)
  5027. {
  5028. try
  5029. {
  5030. readBuffer(socket, buffer);
  5031. return true;
  5032. }
  5033. catch (IException * e)
  5034. {
  5035. switch (e->errorCode())
  5036. {
  5037. case JSOCKERR_graceful_close:
  5038. break;
  5039. default:
  5040. EXCLOG(e,"catchReadBuffer");
  5041. break;
  5042. }
  5043. e->Release();
  5044. }
  5045. return false;
  5046. }
  5047. bool catchReadBuffer(ISocket * socket, MemoryBuffer & buffer, unsigned timeoutms)
  5048. {
  5049. try
  5050. {
  5051. readBuffer(socket, buffer, timeoutms);
  5052. return true;
  5053. }
  5054. catch (IException * e)
  5055. {
  5056. switch (e->errorCode())
  5057. {
  5058. case JSOCKERR_graceful_close:
  5059. break;
  5060. default:
  5061. EXCLOG(e,"catchReadBuffer");
  5062. break;
  5063. }
  5064. e->Release();
  5065. }
  5066. return false;
  5067. }
  5068. bool catchWriteBuffer(ISocket * socket, MemoryBuffer & buffer)
  5069. {
  5070. try
  5071. {
  5072. writeBuffer(socket, buffer);
  5073. return true;
  5074. }
  5075. catch (IException * e)
  5076. {
  5077. EXCLOG(e,"catchWriteBuffer");
  5078. e->Release();
  5079. }
  5080. return false;
  5081. }
  5082. CSingletonSocketConnection::CSingletonSocketConnection(SocketEndpoint &_ep)
  5083. {
  5084. ep = _ep;
  5085. state = Snone;
  5086. cancelling = false;
  5087. }
  5088. CSingletonSocketConnection::~CSingletonSocketConnection()
  5089. {
  5090. try {
  5091. if (sock)
  5092. sock->close();
  5093. }
  5094. catch (IException *e) {
  5095. if (e->errorCode()!=JSOCKERR_graceful_close)
  5096. EXCLOG(e,"CSingletonSocketConnection close");
  5097. e->Release();
  5098. }
  5099. }
  5100. void CSingletonSocketConnection::set_keep_alive(bool keepalive)
  5101. {
  5102. if (sock)
  5103. sock->set_keep_alive(keepalive);
  5104. }
  5105. bool CSingletonSocketConnection::connect(unsigned timeoutms)
  5106. {
  5107. CriticalBlock block(crit);
  5108. if (cancelling)
  5109. state = Scancelled;
  5110. if (state==Scancelled)
  5111. return false;
  5112. assertex(!sock);
  5113. ISocket *newsock=NULL;
  5114. state = Sconnect;
  5115. unsigned start = 0;
  5116. if (timeoutms!=(unsigned)INFINITE)
  5117. start = msTick();
  5118. while (state==Sconnect) {
  5119. try {
  5120. CriticalUnblock unblock(crit);
  5121. newsock = ISocket::connect_wait(ep,1000*60*4);
  5122. break;
  5123. }
  5124. catch (IException * e) {
  5125. if ((e->errorCode()==JSOCKERR_timeout_expired)||(e->errorCode()==JSOCKERR_connection_failed)) {
  5126. e->Release();
  5127. if ((state==Sconnect)&&(timeoutms!=(unsigned)INFINITE)&&(msTick()-start>timeoutms)) {
  5128. state = Snone;
  5129. return false;
  5130. }
  5131. }
  5132. else {
  5133. state = Scancelled;
  5134. EXCLOG(e,"CSingletonSocketConnection::connect");
  5135. e->Release();
  5136. return false;
  5137. }
  5138. }
  5139. }
  5140. if (state!=Sconnect) {
  5141. ::Release(newsock);
  5142. newsock = NULL;
  5143. }
  5144. if (!newsock) {
  5145. state = Scancelled;
  5146. return false;
  5147. }
  5148. sock.setown(newsock);
  5149. return true;
  5150. }
  5151. bool CSingletonSocketConnection::send(MemoryBuffer &mb)
  5152. {
  5153. CriticalBlock block(crit);
  5154. if (cancelling)
  5155. state = Scancelled;
  5156. if (state==Scancelled)
  5157. return false;
  5158. assertex(sock);
  5159. state = Srecv;
  5160. try {
  5161. CriticalUnblock unblock(crit);
  5162. writeBuffer(sock,mb);
  5163. }
  5164. catch (IException * e) {
  5165. state = Scancelled;
  5166. EXCLOG(e,"CSingletonSocketConnection::send");
  5167. e->Release();
  5168. return false;
  5169. }
  5170. state = Snone;
  5171. return true;
  5172. }
  5173. unsigned short CSingletonSocketConnection::setRandomPort(unsigned short base, unsigned num)
  5174. {
  5175. for (;;) {
  5176. try {
  5177. ep.port = base+(unsigned short)(getRandom()%num);
  5178. listensock.setown(ISocket::create(ep.port));
  5179. return ep.port;
  5180. }
  5181. catch (IException *e) {
  5182. if (e->errorCode()!=JSOCKERR_port_in_use) {
  5183. state = Scancelled;
  5184. EXCLOG(e,"CSingletonSocketConnection::setRandomPort");
  5185. e->Release();
  5186. break;
  5187. }
  5188. e->Release();
  5189. }
  5190. }
  5191. return 0;
  5192. }
  5193. bool CSingletonSocketConnection::accept(unsigned timeoutms)
  5194. {
  5195. CriticalBlock block(crit);
  5196. if (cancelling)
  5197. state = Scancelled;
  5198. if (state==Scancelled)
  5199. return false;
  5200. if (!sock) {
  5201. ISocket *newsock=NULL;
  5202. state = Saccept;
  5203. for (;;) {
  5204. try {
  5205. {
  5206. CriticalUnblock unblock(crit);
  5207. if (!listensock)
  5208. listensock.setown(ISocket::create(ep.port));
  5209. if ((timeoutms!=(unsigned)INFINITE)&&(!listensock->wait_read(timeoutms))) {
  5210. state = Snone;
  5211. return false;
  5212. }
  5213. }
  5214. if (cancelling)
  5215. state = Scancelled;
  5216. if (state==Scancelled)
  5217. return false;
  5218. {
  5219. CriticalUnblock unblock(crit);
  5220. newsock=listensock->accept(true);
  5221. break;
  5222. }
  5223. }
  5224. catch (IException *e) {
  5225. if (e->errorCode()==JSOCKERR_graceful_close)
  5226. PROGLOG("CSingletonSocketConnection: Closed socket on accept - retrying...");
  5227. else {
  5228. state = Scancelled;
  5229. EXCLOG(e,"CSingletonSocketConnection::accept");
  5230. e->Release();
  5231. break;
  5232. }
  5233. e->Release();
  5234. }
  5235. }
  5236. if (state!=Saccept) {
  5237. ::Release(newsock);
  5238. newsock = NULL;
  5239. }
  5240. if (!newsock) {
  5241. state = Scancelled;
  5242. return false;
  5243. }
  5244. sock.setown(newsock);
  5245. }
  5246. return true;
  5247. }
  5248. bool CSingletonSocketConnection::recv(MemoryBuffer &mb, unsigned timeoutms)
  5249. {
  5250. CriticalBlock block(crit);
  5251. if (cancelling)
  5252. state = Scancelled;
  5253. if (state==Scancelled)
  5254. return false;
  5255. assertex(sock);
  5256. state = Srecv;
  5257. try {
  5258. CriticalUnblock unblock(crit);
  5259. readBuffer(sock,mb,timeoutms);
  5260. }
  5261. catch (IException *e) {
  5262. if (e->errorCode()==JSOCKERR_timeout_expired)
  5263. state = Snone;
  5264. else {
  5265. state = Scancelled;
  5266. if (e->errorCode()!=JSOCKERR_graceful_close)
  5267. EXCLOG(e,"CSingletonSocketConnection::recv");
  5268. }
  5269. e->Release();
  5270. return false;
  5271. }
  5272. state = Snone;
  5273. return true;
  5274. }
  5275. void CSingletonSocketConnection::cancel()
  5276. {
  5277. CriticalBlock block(crit);
  5278. while (state!=Scancelled) {
  5279. cancelling = true;
  5280. try {
  5281. switch (state) {
  5282. case Saccept:
  5283. {
  5284. if (listensock)
  5285. listensock->cancel_accept();
  5286. }
  5287. break;
  5288. case Sconnect:
  5289. // wait for timeout
  5290. break;
  5291. case Srecv:
  5292. {
  5293. if (sock)
  5294. sock->close();
  5295. }
  5296. break;
  5297. case Ssend:
  5298. // wait for finished
  5299. break;
  5300. default:
  5301. state = Scancelled;
  5302. break;
  5303. }
  5304. }
  5305. catch (IException *e) {
  5306. EXCLOG(e,"CSingletonSocketConnection::cancel");
  5307. e->Release();
  5308. }
  5309. {
  5310. CriticalUnblock unblock(crit);
  5311. Sleep(1000);
  5312. }
  5313. }
  5314. }
  5315. IConversation *createSingletonSocketConnection(unsigned short port,SocketEndpoint *_ep)
  5316. {
  5317. SocketEndpoint ep;
  5318. if (_ep)
  5319. ep = *_ep;
  5320. if (port)
  5321. ep.port = port;
  5322. return new CSingletonSocketConnection(ep);
  5323. }
  5324. // interface for reading from multiple sockets using the BF_SYNC_TRANSFER_PUSH protocol
  5325. class CSocketBufferReader: implements ISocketBufferReader, public CInterface
  5326. {
  5327. class SocketElem: implements ISocketSelectNotify, public CInterface
  5328. {
  5329. CSocketBufferReader *parent;
  5330. unsigned num; // top bit used for ready
  5331. MemoryAttr blk;
  5332. CriticalSection sect;
  5333. Linked<ISocket> sock;
  5334. bool active;
  5335. bool pending;
  5336. public:
  5337. IMPLEMENT_IINTERFACE;
  5338. void init(CSocketBufferReader *_parent,ISocket *_sock,unsigned _n)
  5339. {
  5340. parent = _parent;
  5341. num = _n;
  5342. sock.set(_sock);
  5343. active = true;
  5344. pending = false;
  5345. }
  5346. virtual bool notifySelected(ISocket *socket,unsigned selected)
  5347. {
  5348. assertex(sock==socket);
  5349. {
  5350. CriticalBlock block(sect);
  5351. if (pending) {
  5352. active = false;
  5353. parent->remove(sock);
  5354. return false;
  5355. }
  5356. pending = true;
  5357. unsigned t1=usTick();
  5358. size32_t sz = sock->receive_block_size();
  5359. unsigned t2=usTick();
  5360. if (sz)
  5361. sock->receive_block(blk.allocate(sz),sz);
  5362. else
  5363. parent->remove(sock);
  5364. unsigned t3=usTick();
  5365. if (t3-t1>60*1000000)
  5366. PROGLOG("CSocketBufferReader(%d): slow receive_block (%d,%d) sz=%d",num,t2-t1,t3-t2,sz);
  5367. }
  5368. parent->enqueue(this); // nb outside sect critical block
  5369. return false; // always return false
  5370. }
  5371. unsigned get(MemoryBuffer &mb)
  5372. {
  5373. CriticalBlock block(sect);
  5374. assertex(pending);
  5375. size32_t sz = (size32_t)blk.length();
  5376. if (sz)
  5377. mb.setBuffer(sz,blk.detach(),true);
  5378. pending = false;
  5379. if (!active) {
  5380. active = true;
  5381. parent->add(*this);
  5382. }
  5383. return num;
  5384. }
  5385. size32_t size()
  5386. {
  5387. return (size32_t)blk.length();
  5388. }
  5389. ISocket *getSocket() { return sock; }
  5390. } *elems;
  5391. SimpleInterThreadQueueOf<SocketElem, false> readyq;
  5392. Owned<ISocketSelectHandler> selecthandler;
  5393. size32_t buffersize;
  5394. size32_t buffermax;
  5395. unsigned bufferwaiting;
  5396. CriticalSection buffersect;
  5397. Semaphore buffersem;
  5398. bool isdone;
  5399. public:
  5400. IMPLEMENT_IINTERFACE;
  5401. CSocketBufferReader(const char *trc)
  5402. {
  5403. selecthandler.setown(createSocketSelectHandler(trc));
  5404. elems = NULL;
  5405. }
  5406. ~CSocketBufferReader()
  5407. {
  5408. delete [] elems;
  5409. }
  5410. virtual void init(unsigned num,ISocket **sockets,size32_t _buffermax)
  5411. {
  5412. elems = new SocketElem[num];
  5413. for (unsigned i=0;i<num;i++) {
  5414. ISocket *sock = sockets[i];
  5415. if (sock) { // can have gaps
  5416. elems[i].init(this,sock,i);
  5417. add(elems[i]);
  5418. }
  5419. }
  5420. buffersize = 0;
  5421. buffermax = _buffermax;
  5422. bufferwaiting = 0;
  5423. isdone = false;
  5424. selecthandler->start();
  5425. }
  5426. virtual unsigned get(MemoryBuffer &mb)
  5427. {
  5428. SocketElem &e = *readyq.dequeue();
  5429. CriticalBlock block(buffersect);
  5430. assertex(buffersize>=e.size());
  5431. buffersize-=e.size();
  5432. if (bufferwaiting) {
  5433. buffersem.signal(bufferwaiting);
  5434. bufferwaiting = 0;
  5435. }
  5436. return e.get(mb);
  5437. }
  5438. virtual void done(bool wait)
  5439. {
  5440. buffersem.signal(0x10000);
  5441. isdone = true;
  5442. selecthandler->stop(wait);
  5443. if (wait) {
  5444. delete [] elems;
  5445. elems = NULL;
  5446. }
  5447. }
  5448. void enqueue(SocketElem *elem)
  5449. {
  5450. if (elem) {
  5451. CriticalBlock block(buffersect);
  5452. size32_t sz = elem->size();
  5453. while ((buffersize>0)&&(sz>0)&&(buffersize+sz>buffermax)) {
  5454. if (isdone)
  5455. return;
  5456. bufferwaiting++;
  5457. CriticalUnblock unblock(buffersect);
  5458. buffersem.wait();
  5459. }
  5460. buffersize += sz;
  5461. }
  5462. readyq.enqueue(elem);
  5463. }
  5464. void remove(ISocket *sock)
  5465. {
  5466. selecthandler->remove(sock);
  5467. }
  5468. void add(SocketElem &elem)
  5469. {
  5470. selecthandler->add(elem.getSocket(),SELECTMODE_READ,&elem);
  5471. }
  5472. };
  5473. ISocketBufferReader *createSocketBufferReader(const char *trc)
  5474. {
  5475. return new CSocketBufferReader(trc);
  5476. }
  5477. extern jlib_decl void markNodeCentral(SocketEndpoint &ep)
  5478. {
  5479. #ifdef CENTRAL_NODE_RANDOM_DELAY
  5480. CriticalBlock block(CSocket::crit);
  5481. CentralNodeArray.append(ep);
  5482. #endif
  5483. }
  5484. static CSocket *prepareSocket(unsigned idx,const SocketEndpoint &ep, ISocketConnectNotify &inotify)
  5485. {
  5486. Owned<CSocket> sock = new CSocket(ep,sm_tcp,NULL);
  5487. int err = sock->pre_connect(false);
  5488. if ((err == JSE_INPROGRESS)||(err == JSE_WOULDBLOCK))
  5489. return sock.getClear();
  5490. if (err==0) {
  5491. int err = sock->post_connect();
  5492. if (err==0)
  5493. inotify.connected(idx,ep,sock);
  5494. else {
  5495. sock->errclose();
  5496. inotify.failed(idx,ep,err);
  5497. }
  5498. }
  5499. else
  5500. inotify.failed(idx,ep,err);
  5501. return NULL;
  5502. }
  5503. void multiConnect(const SocketEndpointArray &eps,ISocketConnectNotify &inotify,unsigned timeout)
  5504. {
  5505. class SocketElem: implements ISocketSelectNotify, public CInterface
  5506. {
  5507. CriticalSection *sect;
  5508. ISocketSelectHandler *handler;
  5509. unsigned *remaining;
  5510. Semaphore *notifysem;
  5511. ISocketConnectNotify *inotify;
  5512. public:
  5513. Owned<CSocket> sock;
  5514. SocketEndpoint ep;
  5515. unsigned idx;
  5516. IMPLEMENT_IINTERFACE;
  5517. void init(CSocket *_sock,unsigned _idx,const SocketEndpoint &_ep,CriticalSection *_sect,ISocketSelectHandler *_handler,ISocketConnectNotify *_inotify, unsigned *_remaining, Semaphore *_notifysem)
  5518. {
  5519. ep = _ep;
  5520. idx = _idx;
  5521. inotify = _inotify;
  5522. sock.setown(_sock),
  5523. sect = _sect;
  5524. handler = _handler;
  5525. remaining = _remaining;
  5526. notifysem = _notifysem;
  5527. }
  5528. virtual bool notifySelected(ISocket *socket,unsigned selected)
  5529. {
  5530. CriticalBlock block(*sect);
  5531. handler->remove(socket);
  5532. int err = sock->post_connect();
  5533. CSocket *newsock = NULL;
  5534. {
  5535. CriticalUnblock unblock(*sect); // up to caller to cope with multithread
  5536. if (err==0)
  5537. inotify->connected(idx,ep,sock);
  5538. else if ((err==JSE_TIMEDOUT)||(err==JSE_CONNREFUSED)) {
  5539. // don't give up so easily (maybe listener not yet started (i.e. racing))
  5540. newsock = prepareSocket(idx,ep,*inotify);
  5541. Sleep(100); // not very nice but without this would just loop
  5542. }
  5543. else
  5544. inotify->failed(idx,ep,err);
  5545. }
  5546. if (newsock) {
  5547. sock.setown(newsock);
  5548. handler->add(sock,SELECTMODE_WRITE|SELECTMODE_EXCEPT,this);
  5549. }
  5550. else {
  5551. sock.clear();
  5552. (*remaining)--;
  5553. notifysem->signal();
  5554. }
  5555. return false;
  5556. }
  5557. } *elems;
  5558. unsigned n = eps.ordinality();
  5559. unsigned remaining = n;
  5560. if (!n)
  5561. return;
  5562. elems = new SocketElem[n];
  5563. unsigned i;
  5564. CriticalSection sect;
  5565. Semaphore notifysem;
  5566. Owned<ISocketSelectHandler> selecthandler = createSocketSelectHandler(
  5567. #ifdef _DEBUG
  5568. "multiConnect"
  5569. #else
  5570. NULL
  5571. #endif
  5572. );
  5573. StringBuffer name;
  5574. for (i=0;i<n;i++) {
  5575. CSocket* sock = prepareSocket(i,eps.item(i),inotify);
  5576. if (sock) {
  5577. elems[i].init(sock,i,eps.item(i),&sect,selecthandler,&inotify,&remaining,&notifysem);
  5578. selecthandler->add(sock,SELECTMODE_WRITE|SELECTMODE_EXCEPT,&elems[i]);
  5579. }
  5580. else
  5581. remaining--;
  5582. }
  5583. if (remaining) {
  5584. unsigned lastremaining=remaining;
  5585. selecthandler->start();
  5586. for (;;) {
  5587. bool to=!notifysem.wait(timeout);
  5588. {
  5589. CriticalBlock block(sect);
  5590. if (remaining==0)
  5591. break;
  5592. if (to&&(remaining==lastremaining))
  5593. break; // nothing happened recently
  5594. lastremaining = remaining;
  5595. }
  5596. }
  5597. selecthandler->stop(true);
  5598. }
  5599. selecthandler.clear();
  5600. if (remaining) {
  5601. for (unsigned j=0;j<n;j++) { // mop up timeouts
  5602. SocketElem &elem = elems[j];
  5603. if (elem.sock.get()) {
  5604. elem.sock.clear();
  5605. inotify.failed(j,elem.ep,-1);
  5606. remaining--;
  5607. if (remaining==0)
  5608. break;
  5609. }
  5610. }
  5611. }
  5612. delete [] elems;
  5613. }
  5614. void multiConnect(const SocketEndpointArray &eps, IPointerArrayOf<ISocket> &retsockets,unsigned timeout)
  5615. {
  5616. unsigned n = eps.ordinality();
  5617. if (n==0)
  5618. return;
  5619. if (n==1) { // no need for multi
  5620. ISocket *sock = NULL;
  5621. try {
  5622. sock = ISocket::connect_timeout(eps.item(0),timeout);
  5623. }
  5624. catch (IException *e) { // ignore error just append NULL
  5625. sock = NULL;
  5626. e->Release();
  5627. }
  5628. retsockets.append(sock);
  5629. return;
  5630. }
  5631. while (retsockets.ordinality()<n)
  5632. retsockets.append(NULL);
  5633. CriticalSection sect;
  5634. class cNotify: implements ISocketConnectNotify
  5635. {
  5636. CriticalSection &sect;
  5637. IPointerArrayOf<ISocket> &retsockets;
  5638. public:
  5639. cNotify(IPointerArrayOf<ISocket> &_retsockets,CriticalSection &_sect)
  5640. : sect(_sect), retsockets(_retsockets)
  5641. {
  5642. }
  5643. void connected(unsigned idx,const SocketEndpoint &ep,ISocket *sock)
  5644. {
  5645. CriticalBlock block(sect);
  5646. assertex(idx<retsockets.ordinality());
  5647. sock->Link();
  5648. retsockets.replace(sock,idx);
  5649. }
  5650. void failed(unsigned idx,const SocketEndpoint &ep,int err)
  5651. {
  5652. StringBuffer s;
  5653. PROGLOG("multiConnect failed to %s with %d",ep.getUrlStr(s).str(),err);
  5654. }
  5655. } notify(retsockets,sect);
  5656. multiConnect(eps,notify,timeout);
  5657. }
  5658. inline void flushText(StringBuffer &text,unsigned short port,unsigned &rep,unsigned &range)
  5659. {
  5660. if (rep) {
  5661. text.append('*').append(rep+1);
  5662. rep = 0;
  5663. }
  5664. else if (range) {
  5665. text.append('-').append(range);
  5666. range = 0;
  5667. }
  5668. if (port)
  5669. text.append(':').append(port);
  5670. }
  5671. StringBuffer &SocketEndpointArray::getText(StringBuffer &text)
  5672. {
  5673. unsigned count = ordinality();
  5674. if (!count)
  5675. return text;
  5676. if (count==1)
  5677. return item(0).getUrlStr(text);
  5678. byte lastip[4];
  5679. const SocketEndpoint &first = item(0);
  5680. bool lastis4 = first.getNetAddress(sizeof(lastip),&lastip)==sizeof(lastip);
  5681. unsigned short lastport = first.port;
  5682. first.getIpText(text);
  5683. unsigned rep=0;
  5684. unsigned range=0;
  5685. for (unsigned i=1;i<count;i++) {
  5686. byte ip[4];
  5687. const SocketEndpoint &ep = item(i);
  5688. bool is4 = ep.getNetAddress(sizeof(ip),&ip)==sizeof(ip);
  5689. if (!lastis4||!is4) {
  5690. flushText(text,lastport,rep,range);
  5691. text.append(',');
  5692. ep.getIpText(text);
  5693. }
  5694. else { // try and shorten
  5695. unsigned j;
  5696. for (j=0;j<4;j++)
  5697. if (ip[j]!=lastip[j])
  5698. break;
  5699. if (ep.port==lastport) {
  5700. if (j==4) {
  5701. if (range) // cant have range and rep
  5702. j--; // pretend only 3 matched
  5703. else {
  5704. rep++;
  5705. continue;
  5706. }
  5707. }
  5708. else if ((j==3)&&(lastip[3]+1==ip[3])&&(rep==0)) {
  5709. range = ip[3];
  5710. lastip[3] = (byte)range;
  5711. continue;
  5712. }
  5713. }
  5714. flushText(text,lastport,rep,range);
  5715. // output diff
  5716. text.append(',');
  5717. if (j==4)
  5718. j--;
  5719. for (unsigned k=j;k<4;k++) {
  5720. if (k>j)
  5721. text.append('.');
  5722. text.append((int)ip[k]);
  5723. }
  5724. }
  5725. memcpy(&lastip,&ip,sizeof(lastip));
  5726. lastis4 = is4;
  5727. lastport = ep.port;
  5728. }
  5729. flushText(text,lastport,rep,range);
  5730. return text;
  5731. }
  // Parses an unsigned decimal number at the start of s.
  // n receives the value (0 when s does not start with a digit); no overflow
  // check is made. Returns a pointer to the first character after the digits.
  inline const char *getnum(const char *s,unsigned &n)
  {
      n = 0;
      // isdigit requires a value representable as unsigned char; a plain
      // (possibly negative) char would be undefined behaviour
      while (isdigit((unsigned char)*s)) {
          n = n*10+(*s-'0');
          s++;
      }
      return s;
  }
  5741. inline bool appendv4range(SocketEndpointArray *array,char *str,SocketEndpoint &ep, unsigned defport)
  5742. {
  5743. char *s = str;
  5744. unsigned dc = 0;
  5745. unsigned port = defport;
  5746. unsigned rng = 0;
  5747. unsigned rep = 1;
  5748. bool notip = false;
  5749. while (*s) {
  5750. if (*s=='.') {
  5751. dc++;
  5752. s++;
  5753. }
  5754. else if (*s==':') {
  5755. *s = 0;
  5756. s = (char *)getnum(s+1,port);
  5757. }
  5758. else if (*s=='-')
  5759. {
  5760. if (!notip)//don't assume '-' is an ip range delimiter
  5761. { //Allow '-' in any octet due to ip group support
  5762. *s = 0;
  5763. s = (char *)getnum(s+1,rng);
  5764. }
  5765. else
  5766. {
  5767. notip = true;
  5768. s++;
  5769. }
  5770. }
  5771. else if (*s=='*') {
  5772. *s = 0;
  5773. s = (char *)getnum(s+1,rep);
  5774. }
  5775. else {
  5776. if (!isdigit(*s))
  5777. notip = true;
  5778. s++;
  5779. }
  5780. }
  5781. ep.port = port;
  5782. if (*str) {
  5783. if (!notip&&((dc<3)&&((dc!=1)||(strlen(str)!=1)))) {
  5784. if (!ep.isIp4()) {
  5785. return false;
  5786. }
  5787. StringBuffer tmp;
  5788. ep.getIpText(tmp);
  5789. dc++;
  5790. for (;;) {
  5791. if (tmp.length()==0)
  5792. return false;
  5793. if (tmp.charAt(tmp.length()-1)=='.')
  5794. if (--dc==0)
  5795. break;
  5796. tmp.setLength(tmp.length()-1);
  5797. }
  5798. tmp.append(str);
  5799. if (rng) {
  5800. tmp.appendf("-%d",rng);
  5801. rep = ep.ipsetrange(tmp.str());
  5802. }
  5803. else
  5804. ep.ipset(tmp.str());
  5805. }
  5806. else if (rng) { // not nice as have to add back range (must be better way - maybe ipincrementto) TBD
  5807. StringBuffer tmp;
  5808. tmp.appendf("%s-%d",str,rng);
  5809. rep = ep.ipsetrange(tmp.str());
  5810. }
  5811. else if (*str)
  5812. ep.ipset(str);
  5813. if (ep.isNull())
  5814. ep.port = 0;
  5815. for (unsigned i=0;i<rep;i++) {
  5816. array->append(ep);
  5817. if (rng)
  5818. ep.ipincrement(1);
  5819. }
  5820. }
  5821. else {// just a port change
  5822. if (ep.isNull()) // avoid null values with ports
  5823. ep.port = 0;
  5824. array->append(ep);
  5825. }
  5826. return true;
  5827. }
  // Resolves name (optionally "name:port", which overrides defport) and
  // appends one endpoint per address found.
  // Uses gethostbyname when IP4only is set (or getaddrinfo is unavailable),
  // otherwise getaddrinfo. Returns true if the array is non-empty afterwards.
  // Throws if name contains more than one ':'.
  bool SocketEndpointArray::fromName(const char *name, unsigned defport)
  {
      // Lookup a single name that may resolve to multiple IPs in a headless service scenario
      StringArray portSplit;
      portSplit.appendList(name, ":");
      switch (portSplit.ordinality())
      {
      case 2:
          defport = atoi(portSplit.item(1));
          name = portSplit.item(0);
          // fallthrough
      case 1:
          break;
      default:
          throw MakeStringException(-1, "Invalid name %s SocketEndpointArray::fromName", name);
      }
  #if defined(__linux__) || defined (__APPLE__) || defined(getaddrinfo)
      if (IP4only)
  #endif
      {
          CriticalBlock c(hostnamesect);
          hostent * entry = gethostbyname(name);
          if (entry)
          {
              // h_addr_list is NULL terminated; include every entry, starting with the first
              for (unsigned idx = 0; entry->h_addr_list[idx]!=NULL; idx++)
              {
                  SocketEndpoint ep;
                  ep.setNetAddress(sizeof(unsigned),entry->h_addr_list[idx]);
                  ep.port = defport;
                  append(ep);
              }
          }
          return ordinality()>0;
      }
  #if defined(__linux__) || defined (__APPLE__) || defined(getaddrinfo)
      struct addrinfo hints;
      memset(&hints,0,sizeof(hints));
      struct addrinfo *addrInfo = NULL;
      int ret = getaddrinfo(name, NULL , &hints, &addrInfo);
      if (ret == 0)
      {
          struct addrinfo *ai;
          for (ai = addrInfo; ai; ai = ai->ai_next)
          {
              // DBGLOG("flags=%d, family=%d, socktype=%d, protocol=%d, addrlen=%d, canonname=%s",ai->ai_flags,ai->ai_family,ai->ai_socktype,ai->ai_protocol,ai->ai_addrlen,ai->ai_canonname?ai->ai_canonname:"NULL");
              if (ai->ai_protocol == IPPROTO_IP)
              {
                  switch (ai->ai_family)
                  {
                  case AF_INET:
                      {
                          SocketEndpoint ep;
                          ep.setNetAddress(sizeof(in_addr),&(((sockaddr_in *)ai->ai_addr)->sin_addr));
                          ep.port = defport;
                          append(ep);
                          // StringBuffer s;
                          // DBGLOG("Lookup %s found %s", name, ep.getUrlStr(s).str());
                          break;
                      }
                  case AF_INET6:
                      {
                          SocketEndpoint ep;
                          ep.setNetAddress(sizeof(in_addr6),&(((sockaddr_in6 *)ai->ai_addr)->sin6_addr));
                          ep.port = defport;
                          append(ep);
                          break;
                      }
                  }
              }
          }
          // only free a list that getaddrinfo actually returned
          freeaddrinfo(addrInfo);
      }
  #endif
      return ordinality()>0;
  }
  5908. void SocketEndpointArray::fromText(const char *text,unsigned defport)
  5909. {
  5910. // this is quite complicated with (mixed) IPv4 and IPv6
  5911. // only support 'full' IPv6 and no ranges
  5912. if (!text)
  5913. return;
  5914. char *str = strdup(text);
  5915. char *s = str;
  5916. SocketEndpoint ep;
  5917. bool eol = false;
  5918. for (;;) {
  5919. while (isspace(*s)||(*s==','))
  5920. s++;
  5921. if (!*s)
  5922. break;
  5923. char *e=s;
  5924. if (*e=='[') { // we have a IPv6
  5925. while (*e&&(*e!=']'))
  5926. e++;
  5927. while ((*e!=',')&&!isspace(*e)) {
  5928. if (!*s) {
  5929. eol = true;
  5930. break;
  5931. }
  5932. e++;
  5933. }
  5934. *e = 0;
  5935. ep.set(s,defport);
  5936. if (ep.isNull()) {
  5937. // Error TBD
  5938. }
  5939. append(ep);
  5940. }
  5941. else {
  5942. bool hascolon = false;
  5943. bool isv6 = false;
  5944. do {
  5945. if (*e==':') {
  5946. if (hascolon)
  5947. isv6 = true;
  5948. else
  5949. hascolon = true;
  5950. }
  5951. e++;
  5952. if (!*e) {
  5953. eol = true;
  5954. break;
  5955. }
  5956. } while (!isspace(*e)&&(*e!=','));
  5957. *e = 0;
  5958. if (isv6) {
  5959. ep.set(s,defport);
  5960. if (ep.isNull()) {
  5961. // Error TBD
  5962. }
  5963. append(ep);
  5964. }
  5965. else {
  5966. if (!appendv4range(this,s,ep,defport)) {
  5967. // Error TBD
  5968. }
  5969. }
  5970. }
  5971. if (eol)
  5972. break;
  5973. s = e+1;
  5974. }
  5975. free(str);
  5976. }
  5977. bool IpSubNet::set(const char *_net,const char *_mask)
  5978. {
  5979. if (!_net||!decodeNumericIP(_net,net)) { // _net NULL means match everything
  5980. memset(net,0,sizeof(net));
  5981. memset(mask,0,sizeof(mask));
  5982. return (_net==NULL);
  5983. }
  5984. if (!_mask||!decodeNumericIP(_mask,mask)) { // _mask NULL means match exact
  5985. memset(mask,0xff,sizeof(mask));
  5986. return (_mask==NULL);
  5987. }
  5988. if (isIp4(net)!=isIp4(mask))
  5989. return false;
  5990. for (unsigned j=0;j<4;j++)
  5991. if (net[j]&~mask[j])
  5992. return false;
  5993. return true;
  5994. }
  5995. bool IpSubNet::test(const IpAddress &ip) const
  5996. {
  5997. unsigned i;
  5998. if (ip.getNetAddress(sizeof(i),&i)==sizeof(i)) {
  5999. if (!isIp4(net))
  6000. return false;
  6001. return (i&mask[3])==(net[3]&mask[3]);
  6002. }
  6003. unsigned na[4];
  6004. if (ip.getNetAddress(sizeof(na),&na)==sizeof(na)) {
  6005. for (unsigned j=0;j<4;j++)
  6006. if ((na[j]&mask[j])!=(net[j]&mask[j]))
  6007. return false;
  6008. return true;
  6009. }
  6010. return false;
  6011. }
  6012. StringBuffer &IpSubNet::getNetText(StringBuffer &text) const
  6013. {
  6014. char tmp[INET6_ADDRSTRLEN];
  6015. const char *res = ::isIp4(net) ? _inet_ntop(AF_INET, &net[3], tmp, sizeof(tmp))
  6016. : _inet_ntop(AF_INET6, &net, tmp, sizeof(tmp));
  6017. return text.append(res);
  6018. }
  6019. StringBuffer &IpSubNet::getMaskText(StringBuffer &text) const
  6020. {
  6021. char tmp[INET6_ADDRSTRLEN];
  6022. // isIp4(net) is correct here
  6023. const char *res = ::isIp4(net) ? _inet_ntop(AF_INET, &mask[3], tmp, sizeof(tmp))
  6024. : _inet_ntop(AF_INET6, &mask, tmp, sizeof(tmp));
  6025. return text.append(res);
  6026. }
  6027. bool IpSubNet::isNull() const
  6028. {
  6029. for (unsigned i=0;i<4;i++)
  6030. if (net[i]||mask[i])
  6031. return false;
  6032. return true;
  6033. }
  6034. IpSubNet &queryPreferredSubnet()
  6035. {
  6036. return PreferredSubnet;
  6037. }
  6038. bool setPreferredSubnet(const char *ip,const char *mask)
  6039. {
  6040. // also resets cached host IP
  6041. if (PreferredSubnet.set(ip,mask))
  6042. {
  6043. if (!cachehostip.isNull())
  6044. {
  6045. cachehostip.ipset(NULL);
  6046. queryHostIP();
  6047. }
  6048. return true;
  6049. }
  6050. else
  6051. return false;
  6052. }
  6053. StringBuffer &lookupHostName(const IpAddress &ip,StringBuffer &ret)
  6054. {
  6055. // not a common routine (no Jlib function!) only support IPv4 initially
  6056. unsigned ipa;
  6057. if (ip.getNetAddress(sizeof(ipa),&ipa)==sizeof(ipa)) {
  6058. struct hostent *phostent = gethostbyaddr( (char *) &ipa, sizeof(ipa), PF_INET);
  6059. if (phostent)
  6060. ret.append(phostent->h_name);
  6061. else
  6062. ip.getIpText(ret);
  6063. }
  6064. else
  6065. ip.getIpText(ret);
  6066. return ret;
  6067. }
  6068. struct SocketEndpointHTElem
  6069. {
  6070. IInterface *ii;
  6071. SocketEndpoint ep;
  6072. SocketEndpointHTElem(const SocketEndpoint _ep,IInterface *_ii) { ep.set(_ep); ii = _ii; }
  6073. ~SocketEndpointHTElem() { ::Release(ii); }
  6074. };
  6075. class jlib_decl CSocketEndpointHashTable : public SuperHashTableOf<SocketEndpointHTElem, SocketEndpoint>, implements ISocketEndpointHashTable
  6076. {
  6077. virtual void onAdd(void *) {}
  6078. virtual void onRemove(void *e) { delete (SocketEndpointHTElem *)e; }
  6079. unsigned getHashFromElement(const void *e) const
  6080. {
  6081. return ((const SocketEndpointHTElem *)e)->ep.hash(0);
  6082. }
  6083. unsigned getHashFromFindParam(const void *fp) const
  6084. {
  6085. return ((const SocketEndpoint *)fp)->hash(0);
  6086. }
  6087. const void * getFindParam(const void *p) const
  6088. {
  6089. return &((const SocketEndpointHTElem *)p)->ep;
  6090. }
  6091. bool matchesFindParam(const void * et, const void *fp, unsigned) const
  6092. {
  6093. return ((const SocketEndpointHTElem *)et)->ep.equals(*(SocketEndpoint *)fp);
  6094. }
  6095. IMPLEMENT_SUPERHASHTABLEOF_REF_FIND(SocketEndpointHTElem,SocketEndpoint);
  6096. public:
  6097. IMPLEMENT_IINTERFACE;
  6098. CSocketEndpointHashTable() {}
  6099. ~CSocketEndpointHashTable() { _releaseAll(); }
  6100. void add(const SocketEndpoint &ep, IInterface *i)
  6101. {
  6102. SocketEndpointHTElem *e = SuperHashTableOf<SocketEndpointHTElem,SocketEndpoint>::find(&ep);
  6103. if (e) {
  6104. ::Release(e->ii);
  6105. e->ii = i;
  6106. }
  6107. else {
  6108. e = new SocketEndpointHTElem(ep,i);
  6109. SuperHashTableOf<SocketEndpointHTElem,SocketEndpoint>::add(*e);
  6110. }
  6111. }
  6112. void remove(const SocketEndpoint &ep)
  6113. {
  6114. SuperHashTableOf<SocketEndpointHTElem,SocketEndpoint>::remove(&ep);
  6115. }
  6116. IInterface *find(const SocketEndpoint &ep)
  6117. {
  6118. SocketEndpointHTElem *e = SuperHashTableOf<SocketEndpointHTElem,SocketEndpoint>::find(&ep);
  6119. if (e)
  6120. return e->ii;
  6121. return NULL;
  6122. }
  6123. };
  6124. ISocketEndpointHashTable *createSocketEndpointHashTable()
  6125. {
  6126. CSocketEndpointHashTable *ht = new CSocketEndpointHashTable;
  6127. return ht;
  6128. }
  6129. class CSocketConnectWait: implements ISocketConnectWait, public CInterface
  6130. {
  6131. Owned<CSocket> sock;
  6132. bool done;
  6133. CTimeMon connecttm;
  6134. unsigned startt;
  6135. bool oneshot;
  6136. bool isopen;
  6137. int initerr;
  6138. void successfulConnect()
  6139. {
  6140. STATS.connects++;
  6141. STATS.connecttime+=usTick()-startt;
  6142. #ifdef _TRACE
  6143. sock->setTraceName();
  6144. #endif
  6145. }
  6146. void failedConnect()
  6147. {
  6148. STATS.failedconnects++;
  6149. STATS.failedconnecttime+=usTick()-startt;
  6150. const char* tracename = sock->tracename;
  6151. THROWJSOCKEXCEPTION(JSOCKERR_connection_failed);
  6152. }
  6153. public:
  6154. IMPLEMENT_IINTERFACE;
  6155. CSocketConnectWait(SocketEndpoint &ep,unsigned connecttimeoutms)
  6156. : connecttm(connecttimeoutms)
  6157. {
  6158. oneshot = (connecttimeoutms==0); // i.e. as long as one connect takes
  6159. done = false;
  6160. startt = usTick();
  6161. sock.setown(new CSocket(ep,sm_tcp,NULL));
  6162. isopen = true;
  6163. initerr = sock->pre_connect(false);
  6164. }
  6165. ISocket *wait(unsigned timems)
  6166. {
  6167. // this is a bit spagetti due to dual timeouts etc
  6168. CTimeMon waittm(timems);
  6169. unsigned refuseddelay = 1;
  6170. bool waittimedout = false;
  6171. bool connectimedout = false;
  6172. do {
  6173. bool connectdone = false;
  6174. unsigned remaining;
  6175. connectimedout = connecttm.timedout(&remaining);
  6176. unsigned waitremaining;
  6177. waittimedout = waittm.timedout(&waitremaining);
  6178. if (oneshot||(waitremaining<remaining))
  6179. remaining = waitremaining;
  6180. int err = 0;
  6181. if (!isopen||initerr)
  6182. {
  6183. isopen = true;
  6184. err = initerr?initerr:sock->pre_connect(false);
  6185. initerr = 0;
  6186. if ((err == JSE_INPROGRESS)||(err == JSE_WOULDBLOCK))
  6187. err = 0; // continue
  6188. else
  6189. {
  6190. if (err==0)
  6191. connectdone = true; // done immediately
  6192. else if(!oneshot) // probably ECONNREFUSED but treat all errors same
  6193. refused_sleep((waitremaining==remaining)?waittm:connecttm,refuseddelay); // this stops becoming cpu bound
  6194. }
  6195. }
  6196. if (!connectdone&&(err==0))
  6197. {
  6198. SOCKET s = sock->sock;
  6199. #ifdef _USE_SELECT
  6200. CHECKSOCKRANGE(s);
  6201. T_FD_SET fds;
  6202. struct timeval tv;
  6203. XFD_ZERO(&fds);
  6204. FD_SET((unsigned)s, &fds);
  6205. T_FD_SET except;
  6206. XFD_ZERO(&except);
  6207. FD_SET((unsigned)s, &except);
  6208. tv.tv_sec = remaining / 1000;
  6209. tv.tv_usec = (remaining % 1000)*1000;
  6210. int rc = ::select( s + 1, NULL, (fd_set *)&fds, (fd_set *)&except, &tv );
  6211. #else
  6212. struct pollfd fds[1];
  6213. fds[0].fd = s;
  6214. fds[0].events = POLLOUT;
  6215. fds[0].revents = 0;
  6216. int rc = ::poll(fds, 1, remaining);
  6217. #endif
  6218. if (rc==0)
  6219. break; // timeout
  6220. done = true;
  6221. err = 0;
  6222. if (rc>0)
  6223. {
  6224. // select/poll succeeded - return error from socket (0 if connected)
  6225. socklen_t errlen = sizeof(err);
  6226. rc = getsockopt(s, SOL_SOCKET, SO_ERROR, (char *)&err, &errlen); // check for error
  6227. if ((rc!=0)&&!err)
  6228. err = ERRNO(); // some implementations of getsockopt duff
  6229. if (err&&!oneshot) // probably ECONNREFUSED but treat all errors same
  6230. refused_sleep((waitremaining==remaining)?waittm:connecttm,refuseddelay); // this stops becoming cpu bound
  6231. }
  6232. else
  6233. { // select/poll failed
  6234. err = ERRNO();
  6235. if (err != JSE_INTR)
  6236. {
  6237. LOGERR(err,2,"CSocketConnectWait ::select/poll");
  6238. sock->errclose();
  6239. isopen = false;
  6240. if (!oneshot)
  6241. connectimedout = true;
  6242. break;
  6243. }
  6244. }
  6245. }
  6246. if (err==0)
  6247. {
  6248. err = sock->post_connect();
  6249. if (err==0)
  6250. {
  6251. successfulConnect();
  6252. return sock.getClear();
  6253. }
  6254. }
  6255. sock->errclose();
  6256. isopen = false;
  6257. } while (!waittimedout&&!oneshot);
  6258. if (connectimedout)
  6259. {
  6260. STATS.failedconnects++;
  6261. STATS.failedconnecttime+=usTick()-startt;
  6262. const char* tracename = sock->tracename;
  6263. THROWJSOCKEXCEPTION(JSOCKERR_connection_failed);
  6264. }
  6265. return NULL;
  6266. }
  6267. };
  6268. ISocketConnectWait *nonBlockingConnect(SocketEndpoint &ep,unsigned connecttimeoutms)
  6269. {
  6270. return new CSocketConnectWait(ep,connecttimeoutms);
  6271. }
  6272. int wait_multiple(bool isRead, //IN true if wait read, false it wait write
  6273. UnsignedArray &socks, //IN sockets to be checked for readiness
  6274. unsigned timeoutMS, //IN timeout
  6275. UnsignedArray &readySocks) //OUT sockets ready
  6276. {
  6277. aindex_t numSocks = socks.length();
  6278. if (numSocks == 0)
  6279. THROWJSOCKEXCEPTION2(JSOCKERR_bad_address);
  6280. #ifdef _DEBUG
  6281. StringBuffer dbgSB("wait_multiple() on sockets :");
  6282. #endif
  6283. #ifdef _USE_SELECT
  6284. SOCKET maxSocket = 0;
  6285. T_FD_SET fds;
  6286. XFD_ZERO(&fds);
  6287. //Add each SOCKET in array to T_FD_SET
  6288. for (aindex_t idx = 0; idx < numSocks; idx++)
  6289. {
  6290. #ifdef _DEBUG
  6291. dbgSB.appendf(" %d",socks.item(idx));
  6292. #endif
  6293. SOCKET s = socks.item(idx);
  6294. CHECKSOCKRANGE(s);
  6295. maxSocket = s > maxSocket ? s : maxSocket;
  6296. FD_SET((unsigned)s, &fds);
  6297. }
  6298. #else
  6299. struct pollfd *fds = nullptr;
  6300. try
  6301. {
  6302. fds = new pollfd[numSocks];
  6303. }
  6304. catch (const std::bad_alloc &e)
  6305. {
  6306. int err = ERRNO();
  6307. throw MakeStringException(-1,"wait_multiple::fds malloc failure %d", err);
  6308. }
  6309. for (aindex_t idx = 0; idx < numSocks; idx++)
  6310. {
  6311. #ifdef _DEBUG
  6312. dbgSB.appendf(" %d",socks.item(idx));
  6313. #endif
  6314. SOCKET s = socks.item(idx);
  6315. fds[idx].fd = s;
  6316. fds[idx].events = isRead ? POLLINX : POLLOUT;
  6317. fds[idx].revents = 0;
  6318. }
  6319. #endif
  6320. #ifdef _DEBUG
  6321. DBGLOG("%s",dbgSB.str());
  6322. #endif
  6323. //Check socket states
  6324. int res = 0;
  6325. #ifdef _USE_SELECT
  6326. if (timeoutMS == WAIT_FOREVER)
  6327. res = ::select( maxSocket + 1, isRead ? (fd_set *)&fds : NULL, isRead ? NULL : (fd_set *)&fds, NULL, NULL );
  6328. else
  6329. {
  6330. struct timeval tv;
  6331. tv.tv_sec = timeoutMS / 1000;
  6332. tv.tv_usec = (timeoutMS % 1000)*1000;
  6333. res = ::select( maxSocket + 1, isRead ? (fd_set *)&fds : NULL, isRead ? NULL : (fd_set *)&fds, NULL, &tv );
  6334. }
  6335. #else
  6336. res = poll(fds, numSocks, timeoutMS);
  6337. #endif
  6338. if (res > 0)
  6339. {
  6340. #ifdef _DEBUG
  6341. StringBuffer dbgSB("wait_multiple() ready socket(s) :");
  6342. #endif
  6343. //Build up list of socks which are ready for accept read/write without blocking
  6344. for (aindex_t idx = 0; idx < numSocks; idx++)
  6345. {
  6346. SOCKET s = socks.item(idx);
  6347. #ifdef _USE_SELECT
  6348. if (FD_ISSET(s, &fds))
  6349. #else
  6350. if (fds[idx].revents)
  6351. #endif
  6352. {
  6353. #ifdef _DEBUG
  6354. dbgSB.appendf(" %d",s);
  6355. #endif
  6356. readySocks.append(s);
  6357. if ((int) readySocks.length() == res)
  6358. break;
  6359. }
  6360. }
  6361. #ifdef _DEBUG
  6362. DBGLOG("%s",dbgSB.str());
  6363. #endif
  6364. res = readySocks.ordinality();
  6365. }
  6366. else if (res == SOCKET_ERROR)
  6367. {
  6368. res = 0; // dont return negative on failure
  6369. int err = ERRNO();
  6370. if (err != JSE_INTR)
  6371. {
  6372. #ifndef _USE_SELECT
  6373. delete [] fds;
  6374. #endif
  6375. throw MakeStringException(-1,"wait_multiple::select/poll error %d", err);
  6376. }
  6377. }
  6378. #ifndef _USE_SELECT
  6379. delete [] fds;
  6380. #endif
  6381. return res;
  6382. }
  6383. //Given a list of sockets, wait until any one or more are ready to be read (wont block)
  6384. //returns 0 if timeout, number of waiting sockets otherwise
  6385. int wait_read_multiple(UnsignedArray &socks, //IN sockets to be checked for readiness
  6386. unsigned timeoutMS, //IN timeout
  6387. UnsignedArray &readySocks) //OUT sockets ready to be read
  6388. {
  6389. return wait_multiple(true, socks, timeoutMS, readySocks);
  6390. }
  6391. int wait_write_multiple(UnsignedArray &socks, //IN sockets to be checked for readiness
  6392. unsigned timeoutMS, //IN timeout
  6393. UnsignedArray &readySocks) //OUT sockets ready to be written
  6394. {
  6395. return wait_multiple(false, socks, timeoutMS, readySocks);
  6396. }
  6397. inline bool isIPV4Internal(const char *ip)
  6398. {
  6399. struct sockaddr_in sa;
  6400. return 0 != inet_pton(AF_INET, ip, &sa.sin_addr);
  6401. }
  6402. inline bool isIPV6Internal(const char *ip)
  6403. {
  6404. struct sockaddr_in6 sa;
  6405. return 0 != inet_pton(AF_INET6, ip, &sa.sin6_addr);
  6406. }
  6407. bool isIPV4(const char *ip)
  6408. {
  6409. if (isEmptyString(ip))
  6410. return false;
  6411. return isIPV4Internal(ip);
  6412. }
  6413. bool isIPV6(const char *ip)
  6414. {
  6415. if (isEmptyString(ip))
  6416. return false;
  6417. return isIPV6Internal(ip);
  6418. }
  6419. bool isIPAddress(const char *ip)
  6420. {
  6421. if (isEmptyString(ip))
  6422. return false;
  6423. return isIPV4(ip) || isIPV6(ip);
  6424. }
  6425. class CAllowListHandler : public CSimpleInterfaceOf<IAllowListHandler>, implements IAllowListWriter
  6426. {
  6427. typedef CSimpleInterfaceOf<IAllowListHandler> PARENT;
  6428. struct PairHasher
  6429. {
  6430. template <class T1, class T2>
  6431. std::size_t operator () (std::pair<T1, T2> const &pair) const
  6432. {
  6433. std::size_t h1 = std::hash<T1>()(pair.first);
  6434. std::size_t h2 = std::hash<T2>()(pair.second);
  6435. return h1 ^ h2;
  6436. }
  6437. };
  6438. using AllowListHT = std::unordered_set<std::pair<std::string, unsigned __int64>, PairHasher>;
  6439. AllowListPopulateFunction populateFunc;
  6440. AllowListFormatFunction roleFormatFunc;
  6441. std::unordered_set<std::pair<std::string, unsigned __int64>, PairHasher> allowList;
  6442. std::unordered_set<std::string> IPOnlyAllowList;
  6443. bool allowAnonRoles = false;
  6444. mutable CriticalSection populatedCrit;
  6445. mutable bool populated = false;
  6446. mutable bool enabled = true;
  6447. void ensurePopulated() const
  6448. {
  6449. // should be called within CS
  6450. if (populated)
  6451. return;
  6452. // NB: want to keep this method const, as used by isXX functions that are const, but if need to refresh it's effectively mutable
  6453. enabled = populateFunc(* const_cast<IAllowListWriter *>((const IAllowListWriter *)this));
  6454. populated = true;
  6455. }
  6456. public:
  6457. IMPLEMENT_IINTERFACE_O_USING(PARENT);
  6458. CAllowListHandler(AllowListPopulateFunction _populateFunc, AllowListFormatFunction _roleFormatFunc) : populateFunc(_populateFunc), roleFormatFunc(_roleFormatFunc)
  6459. {
  6460. }
  6461. // IAllowListHandler impl.
  6462. virtual bool isAllowListed(const char *ip, unsigned __int64 role, StringBuffer *responseText) const override
  6463. {
  6464. CriticalBlock block(populatedCrit);
  6465. ensurePopulated();
  6466. if (0 == role) // unknown, can only check ip
  6467. {
  6468. if (allowAnonRoles)
  6469. {
  6470. const auto &it = IPOnlyAllowList.find(ip);
  6471. if (it != IPOnlyAllowList.end())
  6472. return true;
  6473. }
  6474. }
  6475. else
  6476. {
  6477. const auto &it = allowList.find({ip, role});
  6478. if (it != allowList.end())
  6479. return true;
  6480. }
  6481. // if !enabled and no responseText supplied, generate response and warn that disabled
  6482. StringBuffer disabledResponseText;
  6483. if (!enabled && !responseText)
  6484. responseText = &disabledResponseText;
  6485. if (responseText)
  6486. {
  6487. responseText->append("Access denied! [server ip=");
  6488. queryHostIP().getIpText(*responseText);
  6489. responseText->append(", client ip=");
  6490. responseText->append(ip);
  6491. if (role)
  6492. {
  6493. responseText->append(", role=");
  6494. if (roleFormatFunc)
  6495. roleFormatFunc(*responseText, role);
  6496. else
  6497. responseText->append(role);
  6498. }
  6499. responseText->append("] not in allowlist");
  6500. }
  6501. if (enabled)
  6502. return false;
  6503. else
  6504. {
  6505. OWARNLOG("Allowlist is disabled, ignoring: %s", responseText->str());
  6506. return true;
  6507. }
  6508. }
  6509. virtual StringBuffer &getAllowList(StringBuffer &out) const override
  6510. {
  6511. CriticalBlock block(populatedCrit);
  6512. ensurePopulated();
  6513. for (const auto &it: allowList)
  6514. {
  6515. out.append(it.first.c_str()).append(", ");
  6516. if (roleFormatFunc)
  6517. roleFormatFunc(out, it.second);
  6518. else
  6519. out.append(it.second);
  6520. out.newline();
  6521. }
  6522. out.newline().appendf("Allowlist is currently: %s", enabled ? "enabled" : "disabled").newline();
  6523. return out;
  6524. }
  6525. virtual void refresh() override
  6526. {
  6527. /* NB: clear only, so that next usage will re-populated
  6528. * Do not want to repopulate now, because refresh() is likely called within a update write transaction
  6529. */
  6530. CriticalBlock block(populatedCrit);
  6531. enabled = true;
  6532. allowList.clear();
  6533. IPOnlyAllowList.clear();
  6534. populated = false;
  6535. }
  6536. // IAllowListWriter impl.
  6537. virtual void add(const char *ip, unsigned __int64 role) override
  6538. {
  6539. // NB: called via populateFunc, which is called whilst populatedCrit is locked.
  6540. allowList.insert({ ip, role });
  6541. if (allowAnonRoles)
  6542. IPOnlyAllowList.insert(ip);
  6543. }
  6544. virtual void setAllowAnonRoles(bool tf) override
  6545. {
  6546. allowAnonRoles = tf;
  6547. refresh();
  6548. }
  6549. };
  6550. IAllowListHandler *createAllowListHandler(AllowListPopulateFunction populateFunc, AllowListFormatFunction roleFormatFunc)
  6551. {
  6552. return new CAllowListHandler(populateFunc, roleFormatFunc);
  6553. }
// Compile-time layout guards: the build fails if IpAddress stops being 16 bytes or
// SocketEndpoint stops being 20 bytes.
// NOTE(review): presumably other code relies on these exact sizes (e.g. raw copies or
// serialization) - confirm before changing either type.
static_assert(sizeof(IpAddress) == 16, "check size of IpAddress");
static_assert(sizeof(SocketEndpoint) == 20, "check size of SocketEndpoint");